Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "ifmap/ifmap_update_sender.h"
6 : #include "base/task.h"
7 : #include "ifmap/ifmap_client.h"
8 : #include "ifmap/ifmap_server.h"
9 : #include "ifmap/ifmap_exporter.h"
10 : #include "ifmap/ifmap_log.h"
11 : #include "ifmap/ifmap_log_types.h"
12 : #include "ifmap/ifmap_update.h"
13 : #include "ifmap/ifmap_update_queue.h"
14 :
15 : using namespace std;
16 :
17 191 : IFMapUpdateSender::IFMapUpdateSender(IFMapServer *server,
18 191 : IFMapUpdateQueue *queue)
19 191 : : server_(server), queue_(queue), message_(new IFMapMessage()),
20 382 : task_scheduled_(false), queue_active_(false) {
21 191 : }
22 :
23 370 : IFMapUpdateSender::~IFMapUpdateSender() {
24 191 : delete(message_);
25 370 : }
26 :
27 : class IFMapUpdateSender::SendTask : public Task {
28 : public:
29 47 : explicit SendTask(IFMapUpdateSender *sender)
30 47 : : Task(TaskScheduler::GetInstance()->GetTaskId("db::IFMapTable"), 0),
31 47 : sender_(sender) {
32 47 : }
33 47 : virtual bool Run() {
34 47 : BitSet send_scheduled;
35 47 : sender_->GetSendScheduled(&send_scheduled);
36 47 : sender_->send_blocked_.Reset(send_scheduled);
37 47 : for (size_t i = send_scheduled.find_first(); i != BitSet::npos;
38 0 : i = send_scheduled.find_next(i)) {
39 : // Dequeue from client marker (i).
40 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSSendScheduled, "Send scheduled for",
41 : send_scheduled.ToNumberedString(), "client", i,
42 : sender_->queue_->GetMarker(i)->ToString());
43 0 : sender_->Send(sender_->queue_->GetMarker(i));
44 : }
45 47 : if (sender_->queue_active_) {
46 : // Dequeue from tail marker.
47 : // Reset queue_active_
48 47 : IFMAP_UPD_SENDER_TRACE(IFMapUSQueueActive, "Queue active for",
49 : sender_->queue_->tail_marker()->ToString());
50 47 : sender_->Send(sender_->queue_->tail_marker());
51 47 : sender_->queue_active_ = false;
52 : }
53 47 : return true;
54 47 : }
55 :
56 0 : std::string Description() const { return "IFMapUpdateSender::SendTask"; }
57 : private:
58 : IFMapUpdateSender *sender_;
59 : };
60 :
61 47 : void IFMapUpdateSender::StartTask() {
62 47 : if (!task_scheduled_) {
63 : // create new task
64 47 : SendTask *send_task = new SendTask(this);
65 47 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
66 47 : scheduler->Enqueue(send_task);
67 47 : task_scheduled_ = true;
68 : }
69 47 : }
70 :
71 1013 : void IFMapUpdateSender::QueueActive() {
72 1013 : if (queue_active_) {
73 966 : return;
74 : }
75 47 : queue_active_ = true;
76 47 : std::scoped_lock lock(mutex_);
77 47 : StartTask();
78 47 : }
79 :
80 0 : void IFMapUpdateSender::SendActive(int index) {
81 0 : std::scoped_lock lock(mutex_);
82 0 : send_scheduled_.set(index);
83 0 : StartTask();
84 0 : }
85 :
86 47 : void IFMapUpdateSender::GetSendScheduled(BitSet *current) {
87 47 : std::scoped_lock lock(mutex_);
88 47 : *current = send_scheduled_;
89 47 : send_scheduled_.clear();
90 47 : task_scheduled_ = false;
91 47 : }
92 :
93 6 : void IFMapUpdateSender::CleanupClient(int index) {
94 6 : std::scoped_lock lock(mutex_);
95 6 : send_scheduled_.reset(index);
96 6 : send_blocked_.reset(index);
97 6 : }
98 :
99 : // We return only under 2 conditions:
100 : // 1. All the clients in the marker are blocked.
101 : // 2. We have finished traversing the Q.
102 : // Invariant: while we are traversing the Q, the marker that we are working
103 : // with only has ready clients. As soon as a client blocks, we split it out and
104 : // continue with the ready set.
105 47 : void IFMapUpdateSender::Send(IFMapMarker *imarker) {
106 47 : IFMapMarker *marker = imarker;
107 :
108 : // Get the clients in this marker that are blocked. If all of the clients in
109 : // this marker are blocked, we are done.
110 47 : BitSet blocked_clients;
111 47 : blocked_clients = (marker->mask & send_blocked_);
112 47 : if (blocked_clients == marker->mask) {
113 0 : return;
114 : }
115 :
116 : // If any of the clients are blocked, create a new marker for the set of
117 : // blocked clients, insert it before marker and continue with the ready
118 : // set.
119 47 : if (!blocked_clients.empty()) {
120 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSSplitBlocked, "Splitting blocked clients",
121 : blocked_clients.ToNumberedString(), "from", marker->ToString());
122 0 : queue_->MarkerSplitBefore(marker, marker, blocked_clients);
123 : }
124 :
125 47 : IFMapListEntry *next = queue_->Next(marker);
126 47 : BitSet base_send_set;
127 :
128 : // Start with the node after the 'marker'
129 948 : for (IFMapListEntry *curr = next; curr != NULL; curr = next) {
130 901 : next = queue_->Next(curr);
131 :
132 901 : if (curr->IsMarker()) {
133 0 : IFMapMarker *next_marker = static_cast<IFMapMarker *>(curr);
134 : // Processing the next_marker can change the send_set and all
135 : // clients in the next_marker should have already seen the updates
136 : // currently sitting in the buffer. So, flush the buffer to the
137 : // existing client-set before processing the marker so that we dont
138 : // send duplicates.
139 0 : if (!message_->IsEmpty()) {
140 0 : BitSet blocked_set;
141 0 : SendUpdate(base_send_set, &blocked_set);
142 0 : }
143 : bool done;
144 0 : marker = ProcessMarker(marker, next_marker, &done);
145 0 : if (done) {
146 : // All the clients in this marker are blocked. We are done.
147 0 : return;
148 : }
149 : // marker has the ready clients. Continue as if we are starting
150 : // fresh.
151 0 : base_send_set.clear();
152 0 : continue;
153 0 : }
154 :
155 : // ...else its an update or delete
156 :
157 901 : IFMapUpdate *update = static_cast<IFMapUpdate *>(curr);
158 901 : BitSet send_set = update->advertise() & marker->mask;
159 901 : if (send_set.empty()) {
160 0 : continue;
161 : }
162 :
163 901 : if (base_send_set.empty()) {
164 47 : base_send_set = send_set;
165 : }
166 :
167 : // Flush the message to all possible clients if:
168 : // 1. The buffer is full OR
169 : // 2. The send_set is changing and buffer is filled.
170 1745 : if (message_->IsFull() ||
171 844 : ((base_send_set != send_set) && !message_->IsEmpty())) {
172 :
173 65 : BitSet blocked_set;
174 65 : SendUpdate(base_send_set, &blocked_set);
175 65 : if (!blocked_set.empty()) {
176 : // All the clients in this marker are blocked. We are done.
177 0 : if (blocked_set == marker->mask) {
178 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSAllBlocked, marker->ToString(),
179 : "blocked before", curr->ToString());
180 0 : queue_->MoveMarkerBefore(marker, curr);
181 0 : return;
182 : }
183 : // Only a subset of clients in this marker are blocked. Insert
184 : // a marker for them 'before' curr since they have seen
185 : // everything before curr. Let the ready clients continue the
186 : // traversal.
187 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSSubsetBlocked, "Clients",
188 : blocked_set.ToNumberedString(), "blocked before",
189 : curr->ToString(), "and split from", marker->ToString());
190 0 : queue_->MarkerSplitBefore(marker, curr, blocked_set);
191 0 : send_set.Reset(blocked_set);
192 : }
193 :
194 : // The send_set for this marker is changing. Pick up the new one.
195 65 : base_send_set = send_set;
196 65 : }
197 :
198 : // base_send_set is same as send_set at this point.
199 901 : ProcessUpdate(update, base_send_set);
200 901 : }
201 :
202 : // The buffer will be filled in the common case of updates being added
203 : // after the tail_marker.
204 47 : BitSet blk_set;
205 47 : if (!message_->IsEmpty()) {
206 47 : SendUpdate(base_send_set, &blk_set);
207 : }
208 : // If the last node in the Q was the tail_marker, we would have already
209 : // flushed the buffer and merged with it and we would be the last node in
210 : // the Q.
211 47 : IFMapListEntry *last = queue_->GetLast();
212 47 : if (marker != last) {
213 : // Since we have reached the end of the Q, we better be the tail_marker
214 0 : assert(marker == queue_->tail_marker());
215 : // If we have any blocked clients, splitting markers for them is not
216 : // useful at this point. Just move the marker to the end of the Q,
217 : // immediately after last, even if it has blocked clients. Being lazy
218 : // is advantageous since by the time we get the next trigger, a blocked
219 : // client could have become ready and splitting the marker now would be
220 : // useless.
221 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSMoveAfterLast, "Moving", marker->ToString(),
222 : "before", last->ToString(), "with blocked_set",
223 : blk_set.ToNumberedString());
224 0 : queue_->MoveMarkerAfter(marker, last);
225 : }
226 47 : return;
227 47 : }
228 :
229 901 : void IFMapUpdateSender::ProcessUpdate(IFMapUpdate *update,
230 : const BitSet &base_send_set) {
231 901 : LogAndCountSentUpdate(update, base_send_set);
232 :
233 : // Append the contents of the update-node to the message.
234 901 : message_->EncodeUpdate(update);
235 :
236 : // Clean up the node if everybody has seen it.
237 901 : update->AdvertiseReset(base_send_set);
238 901 : if (update->advertise().empty()) {
239 901 : queue_->Dequeue(update);
240 : }
241 : // Update may be freed.
242 1802 : server_->exporter()->StateUpdateOnDequeue(update, base_send_set,
243 901 : update->IsDelete());
244 901 : }
245 :
246 : // blocked_set is a subset of send_set
247 112 : void IFMapUpdateSender::SendUpdate(BitSet send_set, BitSet *blocked_set) {
248 : IFMapClient *client;
249 : bool send_result;
250 :
251 112 : assert(!message_->IsEmpty());
252 :
253 226 : for (size_t i = send_set.find_first(); i != BitSet::npos;
254 114 : i = send_set.find_next(i)) {
255 114 : assert(!send_blocked_.test(i));
256 114 : client = server_->GetClient(i);
257 114 : assert(client);
258 :
259 114 : message_->SetReceiverInMsg(client->identifier());
260 : // Close the message to save the document as string
261 114 : message_->Close();
262 :
263 : // Send the string version of the message to the client.
264 114 : send_result = client->SendUpdate(message_->get_string());
265 :
266 : // Keep track of all the clients whose buffers are full.
267 114 : if (!send_result) {
268 0 : blocked_set->set(i);
269 0 : send_blocked_.set(i);
270 : }
271 : }
272 : // Reset the message to init things for the next message
273 112 : message_->Reset();
274 112 : }
275 :
276 : // marker is before next_marker in the Q. next_marker could be the tail_marker.
277 : // 'done' is set to true only if all the clients in the union of the
278 : // client-sets of the 2 markers are blocked.
279 0 : IFMapMarker* IFMapUpdateSender::ProcessMarker(IFMapMarker *marker,
280 : IFMapMarker *next_marker,
281 : bool *done) {
282 : // There should never be a marker beyond the tail_marker
283 0 : assert(marker != queue_->tail_marker());
284 :
285 : // Get the union (total_set) of the client-sets in the 2 markers. Then, get
286 : // the subset of clients in the union that are blocked (blocked_set). The
287 : // remaining subset of clients are ready (ready_set).
288 0 : BitSet total_set = (marker->mask | next_marker->mask);
289 0 : BitSet blocked_set = (total_set & send_blocked_);
290 0 : BitSet ready_set;
291 0 : ready_set.BuildComplement(total_set, blocked_set); // *this = lhs & ~rhs
292 :
293 : // If all the clients are ready or all are blocked, merge marker into
294 : // next_marker. marker will be deleted.
295 0 : if (blocked_set.empty() || ready_set.empty()) {
296 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
297 : "into", next_marker->ToString());
298 0 : queue_->MarkerMerge(next_marker, marker, marker->mask);
299 0 : assert(next_marker->mask == total_set);
300 : } else {
301 : // We have both, ready and blocked, clients. First, merge both the
302 : // markers into next_marker so that next_marker has the total_set. Then
303 : // split next_marker into 2 markers: first with the blocked_set and the
304 : // second with the ready_set, with first(blocked) preceding the
305 : // second(ready).
306 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
307 : "into", next_marker->ToString());
308 0 : queue_->MarkerMerge(next_marker, marker, marker->mask);
309 0 : assert(next_marker->mask == total_set);
310 0 : IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerSplit, "Splitting blocked clients",
311 : blocked_set.ToNumberedString(), "from", next_marker->ToString());
312 0 : queue_->MarkerSplitBefore(next_marker, next_marker, blocked_set);
313 : }
314 0 : if (ready_set.empty()) {
315 : // If all the clients are blocked, we are done.
316 0 : *done = true;
317 : } else {
318 : // Atleast some clients are ready to continue.
319 0 : *done = false;
320 : }
321 :
322 : // next_marker has the ready_set if done is false
323 0 : return next_marker;
324 0 : }
325 :
326 901 : void IFMapUpdateSender::LogAndCountSentUpdate(IFMapUpdate *update,
327 : const BitSet &base_send_set) {
328 901 : size_t total = base_send_set.count();
329 : // Avoid dealing with return value of BitSet::npos
330 901 : if (total) {
331 901 : string name = update->ConfigName();
332 901 : string operation = update->TypeToString();
333 901 : size_t client_id = base_send_set.find_first();
334 1804 : while (total--) {
335 903 : IFMapClient *client = server_->GetClient(client_id);
336 903 : if (client) {
337 903 : IFMAP_DEBUG_ONLY(IFMapClientSendInfo, operation, name,
338 : client->identifier(), client->name());
339 903 : if (update->IsNode()) {
340 435 : if (update->IsUpdate()) {
341 435 : client->incr_update_nodes_sent();
342 0 : } else if (update->IsDelete()) {
343 0 : client->incr_delete_nodes_sent();
344 : } else {
345 0 : assert(0);
346 : }
347 468 : } else if (update->IsLink()) {
348 468 : if (update->IsUpdate()) {
349 468 : client->incr_update_links_sent();
350 0 : } else if (update->IsDelete()) {
351 0 : client->incr_delete_links_sent();
352 : } else {
353 0 : assert(0);
354 : }
355 : }
356 : }
357 903 : client_id = base_send_set.find_next(client_id);
358 : }
359 901 : }
360 901 : }
361 :
|