Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "bgp/bgp_ribout_updates.h"
6 :
7 : #include <string>
8 :
9 : #include "sandesh/sandesh_trace.h"
10 : #include "base/task_annotations.h"
11 : #include "bgp/bgp_log.h"
12 : #include "bgp/bgp_peer_types.h"
13 : #include "bgp/bgp_ribout.h"
14 : #include "bgp/bgp_route.h"
15 : #include "bgp/bgp_update_queue.h"
16 : #include "bgp/bgp_update_monitor.h"
17 : #include "bgp/bgp_update_sender.h"
18 : #include "bgp/message_builder.h"
19 :
20 : using std::unique_ptr;
21 : using std::vector;
22 :
// Static scratch messages shared by every RibOutUpdates instance: sized by
// Initialize(), filled on demand by GetMessage() and freed by Terminate().
23 : vector<Message *> RibOutUpdates::bgp_messages_;
24 : vector<Message *> RibOutUpdates::xmpp_messages_;
25 :
26 : //
27 : // Create a new RibOutUpdates for the given RibOut and index. Also create
28 : // one UpdateQueue per queue id in [0, QCOUNT) and add them to queue_vec_.
// The monitor is handed a pointer to queue_vec_ and all stats start at zero.
29 : //
30 135680 : RibOutUpdates::RibOutUpdates(RibOut *ribout, int index)
31 135680 : : ribout_(ribout),
32 135680 : index_(index) {
33 407040 : for (int i = 0; i < QCOUNT; i++) { // queue_vec_[i] is the queue for id i
34 271360 : UpdateQueue *queue = new UpdateQueue(ribout, i);
35 271360 : queue_vec_.push_back(queue);
36 : }
37 135680 : monitor_.reset(new RibUpdateMonitor(ribout, &queue_vec_));
38 135680 : memset(&stats_, 0, sizeof(stats_)); // zero every per-queue counter
39 135680 : }
40 :
41 : //
42 : // Destructor. Get rid of all the UpdateQueues created by the constructor.
43 : //
44 271060 : RibOutUpdates::~RibOutUpdates() {
45 135680 : STLDeleteValues(&queue_vec_);
46 271060 : }
47 :
48 : //
49 : // Size the static vectors of bgp/xmpp message pointers to DB::PartitionCount()
50 : // entries, with newly added entries set to NULL. GetMessage creates the
// actual Message objects on demand.
51 : //
51 158 : void RibOutUpdates::Initialize() {
52 158 : bgp_messages_.resize(DB::PartitionCount(), NULL);
53 158 : xmpp_messages_.resize(DB::PartitionCount(), NULL);
54 158 : }
55 :
56 : //
57 : // Free any memory allocated for bgp/xmpp messages. This applies to the static
// vectors, so it affects all RibOutUpdates instances, not just one.
58 : //
59 62 : void RibOutUpdates::Terminate() {
60 62 : STLDeleteValues(&bgp_messages_);
61 62 : STLDeleteValues(&xmpp_messages_);
62 62 : }
63 :
64 : //
65 : // Create if needed, and return the bgp/xmpp message for this RibOutUpdates.
66 : // Note that we use static vectors of bgp/xmpp messages, one per partition,
67 : // so that we don't need to allocate and free messages repeatedly.
//
// The vector is chosen by the encoding of the RibOut and the slot by index_.
// Although the method is const, it stores a newly created Message in the
// static vector the first time a slot is used. Returns NULL if the RibOut
// is neither BGP nor XMPP encoded.
68 : //
69 616133 : Message *RibOutUpdates::GetMessage() const {
70 616133 : if (ribout_->IsEncodingBgp()) {
71 159015 : if (!bgp_messages_[index_]) { // first use of this slot
72 : MessageBuilder *builder =
73 154 : MessageBuilder::GetInstance(RibExportPolicy::BGP);
74 154 : Message *message = builder->Create();
75 154 : bgp_messages_[index_] = message;
76 : }
77 159015 : return bgp_messages_[index_];
78 : }
79 457126 : if (ribout_->IsEncodingXmpp()) {
80 457175 : if (!xmpp_messages_[index_]) { // first use of this slot
81 : MessageBuilder *builder =
82 144 : MessageBuilder::GetInstance(RibExportPolicy::XMPP);
83 144 : Message *message = builder->Create();
84 144 : xmpp_messages_[index_] = message;
85 : }
86 457172 : return xmpp_messages_[index_];
87 : }
88 0 : return NULL; // unknown encoding; DequeueCommon asserts on this
89 : }
90 :
91 : //
92 : // Concurrency: Called in the context of the routing table partition task.
93 : //
94 : // Enqueue the RouteUpdate corresponding to the DBEntryBase into the queue.
95 : // The enqueue itself is delegated to the monitor, which is responsible for
96 : // handling all the concurrency issues.
97 : //
98 : // If the UpdateQueue corresponding to the RouteUpdate previously had no
99 : // updates after the tail marker, we kick the BgpUpdateSender to perform
100 : // a tail dequeue for the RibOut.
101 : //
102 900148 : void RibOutUpdates::Enqueue(DBEntryBase *db_entry, RouteUpdate *rt_update) {
103 900148 : CHECK_CONCURRENCY("db::DBTable");
104 :
// The monitor's return value tells us whether a tail dequeue is needed.
105 900083 : bool need_tail_dequeue = monitor_->EnqueueUpdate(db_entry, rt_update);
106 900370 : if (need_tail_dequeue) {
107 536209 : ribout_->sender()->RibOutActive(index_, ribout_, rt_update->queue_id());
108 : }
109 900448 : }
110 :
111 : //
112 : // Concurrency: Called in the context of the bgp::SendUpdate task.
113 : //
114 : // Common dequeue routine invoked by tail dequeue and peer dequeue. It builds
115 : // and sends updates for each UpdateInfo element in the list hanging off the
116 : // RouteUpdate. For each update that it builds, it also includes prefixes for
117 : // other UpdateInfo elements that share the same attributes, provided that
118 : // the associated RouteUpdate was enqueued after the original one.
119 : //
120 : // Each update is targeted at the peers in the RibPeerSet of the UpdateMarker
121 : // passed in to us. This set of peers is subsequently culled based on the
122 : // RibPeerSet in each UpdateInfo. IOW, the update is sent only to the set
123 : // of peers in the intersection of the UpdateMarker and the UpdateInfo. Note
124 : // that the UpdateMarker could specify a single peer if we are called from
125 : // peer dequeue.
126 : //
127 : // Return false if all the peers in the marker get blocked. In any case, the
128 : // blocked parameter is populated with the set of peers that are send blocked.
// Peers are only ever added to blocked (it is OR-ed into, never cleared here).
129 : //
130 627406 : bool RibOutUpdates::DequeueCommon(UpdateQueue *queue, UpdateMarker *marker,
131 : RouteUpdate *rt_update, RibPeerSet *blocked) {
132 627406 : CHECK_CONCURRENCY("bgp::SendUpdate");
133 :
134 : // Pass a hint to the Message telling it whether it needs to cache the
135 : // formatted version of each route. This is used only for xmpp messages.
136 : // Heuristic is to cache if there are markers other than the tail marker.
137 : // The reasoning is that the cached version can be used later when the
138 : // markers in question are being processed. Put another way - there's
139 : // no need to cache if route is not going to be advertised to any peers
140 : // other than the ones in the given UpdateMarker.
141 627444 : bool cache_routes = queue->marker_count() != 0;
142 :
143 : // Go through all UpdateInfo elements for the RouteUpdate.
144 627445 : int queue_id = rt_update->queue_id();
145 627441 : RibPeerSet rt_blocked; // peers blocked while processing this RouteUpdate
146 627442 : for (UpdateInfoSList::List::iterator iter = rt_update->Updates()->begin();
147 2518825 : iter != rt_update->Updates()->end();) {
148 : // Get the UpdateInfo and move the iterator to next one before doing
149 : // any processing, since we may delete the UpdateInfo further down.
150 631901 : UpdateInfo *uinfo = iter.operator->();
151 : ++iter;
152 :
153 : // Skip if there's no overlap between the UpdateMarker and the targets
154 : // for the UpdateInfo. The intersection is the set of peers to which
155 : // the message we are about to build will be sent.
156 631901 : RibPeerSet msgset;
157 631926 : msgset.BuildIntersection(uinfo->target, marker->members);
158 631807 : if (msgset.empty()) {
159 15665 : continue;
160 : }
161 :
162 : // Generate the update, merge additional updates into that message and
163 : // send the message to the target RibPeerSet.
164 : //
165 : // In the rare case that the first route and its attributes don't fit
166 : // into the message, clear the target bits in the UpdateInfo to ensure
167 : // that the UpdateQueue doesn't get wedged. However, don't update the
168 : // history bits in the RouteUpdate since the message did not get sent.
169 : //
170 : // The Create routine has the responsibility of logging an error and
171 : // incrementing any counters.
172 616131 : RibPeerSet msg_blocked; // peers blocked by this one message
173 616127 : stats_[queue_id].messages_built_count_++; // counted even if Start fails
174 616127 : Message *message = GetMessage();
175 616110 : assert(message);
176 1848510 : bool msg_built = message->Start(
177 616110 : ribout_, cache_routes, &uinfo->roattr, rt_update->route());
178 616290 : if (msg_built) {
179 616186 : UpdatePack(queue_id, message, uinfo, msgset);
180 616221 : message->Finish();
181 616213 : UpdateSend(queue_id, message, msgset, &msg_blocked);
182 : }
183 :
184 : // Reset bits in the UpdateInfo. Note that this has already been done
185 : // via UpdatePack for all the other UpdateInfo elements that we packed
186 : // into this message. History is updated only if the message was built.
187 616376 : bool empty = ClearAdvertisedBits(rt_update, uinfo, msgset, msg_built);
188 616369 : if (empty) {
189 615539 : rt_update->RemoveUpdateInfo(uinfo);
190 : }
191 :
192 : // Update RibPeerSet of peers that got blocked while processing this
193 : // RouteUpdate. Since there's no overlap of peers between UpdateInfos
194 : // for the same RouteUpdate, we can update the markers for all blocked
195 : // peers in one shot i.e. outside this loop.
196 616358 : rt_blocked |= msg_blocked;
197 632010 : }
198 :
199 : // Update the markers for any peers that got blocked while processing this
200 : // RouteUpdate. If all peers in the UpdateMarker got blocked, we shouldn't
201 : // build any more update messages. Return false to let the callers know
202 : // that this has happened.
203 627451 : if (rt_blocked.empty()) {
204 625789 : return true;
205 : } else {
206 1753 : *blocked |= rt_blocked;
// UpdateMarkersOnBlocked returns true when every marker peer is blocked,
// hence the negation to produce our own return value.
207 1755 : return !UpdateMarkersOnBlocked(marker, rt_update, &rt_blocked);
208 : }
209 627545 : }
210 :
211 : //
212 : // Concurrency: Called in the context of the bgp::SendUpdate task.
213 : //
214 : // Dequeue and build updates for the in-sync peers in the RibPeerSet of the
215 : // tail marker for the given queue id.
216 : //
217 : // Return false if all the peers in the marker get blocked. In any case, the
218 : // blocked parameter is populated with the set of peers that are send blocked
219 : // and the unsync parameter is populated with the set of peers from the tail
220 : // marker that are not in the msync set passed in to the method.
//
// Note that when there is no update after the tail marker we return true
// before unsync is computed, so unsync is left untouched in that case.
221 : //
222 615499 : bool RibOutUpdates::TailDequeue(int queue_id, const RibPeerSet &msync,
223 : RibPeerSet *blocked, RibPeerSet *unsync) {
224 615499 : CHECK_CONCURRENCY("bgp::SendUpdate");
225 :
226 615587 : stats_[queue_id].tail_dequeue_count_++;
227 615587 : UpdateQueue *queue = queue_vec_[queue_id];
228 615584 : UpdateMarker *start_marker = queue->tail_marker();
229 615583 : RouteUpdatePtr update = monitor_->GetNextUpdate(queue_id, start_marker);
230 :
// Nothing queued after the tail marker - there is no work to do.
231 615621 : if (update.get() == NULL) {
232 1476 : return true;
233 : }
234 :
235 : // Intersect marker membership and in-sync peers to come up with the
236 : // unsync peers. If all the peers are unsync return right away. The
237 : // BgpUpdateSender will take care of triggering a TailDequeue again
238 : // when at least one peer becomes in-sync.
239 614145 : unsync->BuildComplement(start_marker->members, msync);
240 614097 : if (*unsync == start_marker->members) {
241 4907 : return false;
242 : }
243 :
244 : // Split the unsync peers from the tail marker. Note that this updates
245 : // the RibPeerSet in the tail marker.
246 609186 : if (!unsync->empty()) {
247 2655 : stats_[queue_id].marker_split_count_++;
248 2655 : queue->MarkerSplit(start_marker, *unsync);
249 : }
250 :
251 : // Update send loop. Select next update to send, format a message.
252 : // Add other updates with the same attributes and replicate the
253 : // packet.
// The marker membership is copied up front since DequeueCommon may modify
// the marker when peers get blocked.
254 609186 : RibPeerSet members = start_marker->members;
255 609179 : RouteUpdatePtr next_update;
256 1235160 : for (; update.get() != NULL; update = next_update) {
257 626987 : if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
258 : // Be sure to get rid of the RouteUpdate if it's empty.
259 1036 : if (update->empty()) {
260 921 : ClearUpdate(&update);
261 : }
262 :
// All peers are blocked; note that UpdateFlush is not called on this path.
263 1036 : return false;
264 : }
265 :
266 : // Iterate to the next update before we potentially delete the
267 : // current one. If there are no more updates in the queue, the
268 : // marker will get moved so that it's after the current update.
269 626069 : next_update = monitor_->GetNextUpdate(queue_id, update.get());
270 :
271 : // Be sure to get rid of the RouteUpdate if it's empty.
272 626050 : if (update->empty()) {
273 609879 : ClearUpdate(&update);
274 : }
275 : }
276 :
277 : // Request peers to flush accumulated update messages.
278 : // Return false if all peers got blocked.
279 608102 : UpdateFlush(members, blocked);
280 608232 : return (members != *blocked);
281 615650 : }
282 :
283 : //
284 : // Concurrency: Called in the context of the bgp::SendUpdate task.
285 : //
286 : // Dequeue and build updates for all the peers that share the same marker as
287 : // the specified peer. This routine has some extra intelligence beyond the
288 : // TailDequeue. As it encounters update markers, it merges in any send ready
289 : // peers from those with the marker being processed for dequeue. This is done
290 : // to reduce the number of times we build an update message containing
291 : // the same information.
292 : //
293 : // Return false if all the peers in the marker get blocked. In any case, the
294 : // blocked parameter is populated with the set of peers that are send blocked.
295 : //
296 299 : bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
297 : RibPeerSet *blocked) {
298 299 : CHECK_CONCURRENCY("bgp::SendUpdate");
299 :
300 299 : stats_[queue_id].peer_dequeue_count_++;
301 299 : UpdateQueue *queue = queue_vec_[queue_id];
302 299 : int peer_idx = ribout_->GetPeerIndex(peer);
303 299 : UpdateMarker *start_marker = queue->GetMarker(peer_idx);
304 :
305 : // We're done if this is the same as the tail marker. Updates will be
306 : // built subsequently via TailDequeue.
307 299 : assert(start_marker);
308 299 : if (start_marker == queue->tail_marker()) {
309 209 : return true;
310 : }
311 :
312 : // We're done if the lead peer is not send ready. This can happen if
313 : // the peer got blocked when processing updates in another partition.
314 90 : RibPeerSet mready;
315 90 : ribout_->BuildSendReadyBitSet(start_marker->members, &mready);
316 90 : if (!mready.test(peer_idx)) {
317 0 : blocked->set(peer_idx);
318 0 : return false;
319 : }
320 :
321 : // Split out any peers from the marker that are not send ready. Note that
322 : // this updates the RibPeerSet in the marker.
323 90 : RibPeerSet notready;
324 90 : notready.BuildComplement(start_marker->members, mready);
325 90 : if (!notready.empty()) {
326 3 : stats_[queue_id].marker_split_count_++;
327 3 : queue->MarkerSplit(start_marker, notready);
328 : }
329 :
330 : // Get the encapsulator for the first RouteUpdate. Even if there's no
331 : // RouteUpdate, we should find another marker or the tail marker.
332 : UpdateEntry *upentry;
333 : RouteUpdatePtr update =
334 90 : monitor_->GetNextEntry(queue_id, start_marker, &upentry);
335 90 : assert(upentry);
336 :
337 : // Update loop. Keep going till we reach the tail marker or till all the
338 : // peers get blocked.
// The members set tracks every peer handled here, including peers merged in
// from other markers, so that they all get flushed at the end.
339 90 : RibPeerSet members = start_marker->members;
340 90 : RouteUpdatePtr next_update;
341 : UpdateEntry *next_upentry; // assigned by GetNextEntry inside the loop
342 507 : for (; upentry != NULL; upentry = next_upentry, update = next_update) {
343 507 : UpdateMarker *marker = NULL;
344 507 : if (upentry->IsMarker()) {
345 : // The queue entry is a marker. We're done if we've reached the
346 : // tail marker. Updates will be built later via TailDequeue.
347 88 : marker = static_cast<UpdateMarker *>(upentry);
348 88 : if (marker == queue->tail_marker()) {
349 75 : stats_[queue_id].marker_merge_count_++;
350 75 : queue->MarkerMerge(queue->tail_marker(), start_marker,
351 75 : start_marker->members);
352 75 : break;
353 : }
354 : } else {
355 : // The queue entry is a RouteUpdate. Go ahead and build an update
356 : // message. Bail if all the peers in the marker get blocked.
357 419 : if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
358 : // Be sure to get rid of the RouteUpdate if it's empty.
359 15 : if (update->empty()) {
360 14 : ClearUpdate(&update);
361 : }
362 15 : break;
363 : }
364 : }
365 :
366 : // Iterate to the next element before we potentially delete the
367 : // current one.
368 : next_update =
369 417 : monitor_->GetNextEntry(queue_id, upentry, &next_upentry);
370 :
371 417 : if (upentry->IsMarker()) {
372 : // As the entry is a marker, merge send-ready peers from it
373 : // with the marker that is being processed for dequeue. Note
374 : // that this updates the RibPeerSet in the marker.
375 13 : RibPeerSet mmove;
376 13 : ribout_->BuildSendReadyBitSet(marker->members, &mmove);
377 13 : if (!mmove.empty()) {
378 9 : stats_[queue_id].marker_merge_count_++;
379 9 : queue->MarkerMerge(start_marker, marker, mmove);
380 9 : members |= mmove;
381 : }
382 417 : } else if (update->empty()) {
383 : // Be sure to get rid of the RouteUpdate since it's empty.
384 359 : ClearUpdate(&update);
385 : }
386 : }
387 :
388 : // Request peers to flush accumulated update messages.
389 : // Return false if all peers got blocked.
// Unlike TailDequeue, both break paths above still reach this flush.
390 90 : UpdateFlush(members, blocked);
391 90 : return (members != *blocked);
392 90 : }
393 :
394 : //
395 : // Concurrency: Called in the context of the bgp::SendUpdate task.
396 : //
397 : // Go through all the UpdateInfo elements that have the same attribute as
398 : // the start parameter and pack the corresponding prefixes into the Message.
399 : // The attributes and the prefix associated with start are already in the
400 : // Message when this method is invoked.
401 : //
402 : // The set of peers for which this update is being built is represented by
403 : // the msgset parameter. As the msgset has already been determined by the
404 : // caller, we should only add prefixes that need to go to all the peers in
405 : // the msgset.
406 : //
407 616167 : void RibOutUpdates::UpdatePack(int queue_id, Message *message,
408 : UpdateInfo *start_uinfo, const RibPeerSet &msgset) {
409 616167 : CHECK_CONCURRENCY("bgp::SendUpdate");
410 :
411 : UpdateInfo *uinfo, *next_uinfo; // both are set by GetAttrNext below
412 616121 : RouteUpdatePtr next_update;
413 :
414 : // Walk through all the UpdateInfo elements with the same attribute in
415 : // enqueue order.
416 : RouteUpdatePtr update =
417 616117 : monitor_->GetAttrNext(queue_id, start_uinfo, &uinfo);
418 927211 : for (; update.get() != NULL; update = next_update, uinfo = next_uinfo) {
419 : // Iterate to the next element before we potentially delete the
420 : // current one.
421 311247 : next_update = monitor_->GetAttrNext(queue_id, uinfo, &next_uinfo);
422 :
423 : // Skip if the msgset RibPeerSet is not a subset of the target in
424 : // UpdateInfo.
425 311194 : if (!uinfo->target.Contains(msgset))
426 3817 : continue;
427 :
428 : // Go ahead and add the route to the message. Terminate the loop
429 : // if the message doesn't have room for the route. The route will
430 : // get included in another update message.
431 307361 : bool success = message->AddRoute(update->route(), &uinfo->roattr);
432 307430 : if (!success) {
433 255 : break;
434 : }
435 :
436 : // First clear the advertised bits as represented by msgset from
437 : // the target RibPeerSet in the UpdateInfo. If the target is now
438 : // empty, remove the UpdateInfo from the list container in the
439 : // underlying RouteUpdate.
440 : //
441 : // If the RouteUpdate itself is now empty i.e. there are no more
442 : // UpdateInfo elements associated with it, we can get rid of it.
// RemoveUpdateInfo is only called when the target became empty, because of
// the short-circuit evaluation of the condition below.
443 307175 : bool empty = ClearAdvertisedBits(update.get(), uinfo, msgset, true);
444 307152 : if (empty && update->RemoveUpdateInfo(uinfo)) {
445 306385 : ClearUpdate(&update);
446 : }
447 : }
448 616180 : }
449 :
450 : //
451 : // Concurrency: Called in the context of the bgp::SendUpdate task.
452 : //
453 : // Go through all the peers in the specified RibPeerSet and send the given
454 : // message to each of them. Update the blocked RibPeerSet with peers that
455 : // become blocked after sending the message.
//
// The per-queue send/reach/unreach statistics are bumped once per peer, as
// are the peer's own debug stats when the peer provides them.
456 : //
457 616184 : void RibOutUpdates::UpdateSend(int queue_id, Message *message,
458 : const RibPeerSet &dst, RibPeerSet *blocked) {
459 616184 : CHECK_CONCURRENCY("bgp::SendUpdate");
460 :
461 616143 : RibOut::PeerIterator iter(ribout_, dst);
462 2360372 : while (iter.HasNext()) {
// Read the index before Next() moves the iterator past this peer.
463 1744036 : int ix_current = iter.index();
464 1744009 : IPeerUpdate *peer = iter.Next();
465 1743635 : size_t msgsize = 0;
466 1743635 : const string *msg_str = NULL;
// NOTE(review): temp is presumably scratch storage for GetData - confirm.
467 1743635 : string temp;
468 1743624 : const uint8_t *data = message->GetData(peer, &msgsize, &msg_str, &temp);
// Only emit the log when the logging level is at or above the UT level.
469 1743731 : if (Sandesh::LoggingLevel() >= Sandesh::LoggingUtLevel()) {
470 2113210 : BGP_LOG_PEER(Message, peer, Sandesh::LoggingUtLevel(),
471 : BGP_LOG_FLAG_SYSLOG, BGP_PEER_DIR_OUT,
472 : "Update size " << msgsize <<
473 : " reach " << message->num_reach_routes() <<
474 : " unreach " << message->num_unreach_routes());
475 : }
476 1744117 : stats_[queue_id].messages_sent_count_++;
477 1744117 : stats_[queue_id].reach_count_ += message->num_reach_routes();
478 1744086 : stats_[queue_id].unreach_count_ += message->num_unreach_routes();
// A false return from SendUpdate means the peer is now send blocked.
479 1744077 : bool more = peer->SendUpdate(data, msgsize, msg_str);
480 1744169 : if (!more) {
481 3857 : blocked->set(ix_current);
482 : }
// Peers that are not IPeer instances have no debug stats to update.
483 1744205 : IPeer *ipeer = dynamic_cast<IPeer *>(peer);
484 1744205 : if (!ipeer) {
485 10472 : continue;
486 : }
487 1733733 : IPeerDebugStats *stats = ipeer->peer_stats();
488 1733708 : if (stats) {
489 1733725 : stats->UpdateTxReachRoute(message->num_reach_routes());
490 1733792 : stats->UpdateTxUnreachRoute(message->num_unreach_routes());
491 : }
492 1744272 : }
493 616187 : }
494 :
495 : //
496 : // Concurrency: Called in the context of the bgp::SendUpdate task.
497 : //
498 : // Go through all the peers in the specified RibPeerSet and ask them to flush
499 : // i.e. send immediately, any accumulated updates. Update blocked RibPeerSet
500 : // with peers that become blocked after flushing.
501 : //
502 : // Skip if the RibOut is XMPP. In that case blocked is left unchanged.
503 : //
504 608251 : void RibOutUpdates::UpdateFlush(const RibPeerSet &dst, RibPeerSet *blocked) {
505 608251 : CHECK_CONCURRENCY("bgp::SendUpdate");
506 :
507 608288 : if (ribout_->IsEncodingXmpp())
508 465652 : return;
509 :
510 142637 : RibOut::PeerIterator iter(ribout_, dst);
511 300726 : while (iter.HasNext()) {
// Read the index before Next() moves the iterator past this peer.
512 158048 : int ix_current = iter.index();
513 158050 : IPeerUpdate *peer = iter.Next();
// A false return from FlushUpdate means the peer is now send blocked.
514 158048 : bool more = peer->FlushUpdate();
515 158063 : if (!more) {
516 19 : blocked->set(ix_current);
517 : }
518 : }
519 : }
520 :
521 : //
// Concurrency: Called in the context of the bgp::SendUpdate task.
//
522 : // Take the AdvertisedInfo history in the RouteUpdate and move it to a new
523 : // RouteState. Go through the monitor to associate the new RouteState as the
524 : // listener state for the Route.
525 : //
526 583696 : void RibOutUpdates::StoreHistory(RouteUpdate *rt_update) {
527 583696 : CHECK_CONCURRENCY("bgp::SendUpdate");
528 :
529 583670 : BgpRoute *route = rt_update->route();
// The new RouteState is not deleted here; it is handed to the monitor.
530 583668 : RouteState *rstate = new RouteState();
531 583655 : rt_update->MoveHistory(rstate);
532 583671 : monitor_->SetEntryState(route, rstate);
533 583755 : }
534 :
535 : //
// Concurrency: Called in the context of the bgp::SendUpdate task.
//
536 : // Go through the monitor to clear the listener state for the underlying Route
537 : // of the RouteUpdate. The RouteUpdate itself is not modified or deleted here.
538 : //
539 333640 : void RibOutUpdates::ClearState(RouteUpdate *rt_update) {
540 333640 : CHECK_CONCURRENCY("bgp::SendUpdate");
541 :
542 333604 : BgpRoute *route = rt_update->route();
543 333601 : monitor_->ClearEntryState(route);
544 333762 : }
545 :
546 : //
// Concurrency: Called in the context of the bgp::SendUpdate task.
//
547 : // Called when the RouteUpdate encapsulated by the RouteUpdatePtr has no more
548 : // UpdateInfo elements. Releases ownership of the RouteUpdate and deletes the
549 : // RouteUpdate, as well as any associated UpdateList if appropriate.
//
// On every path the RouteUpdatePtr is released before the RouteUpdate is
// deleted, so the pointer wrapper never refers to a deleted object.
550 : //
551 917551 : void RibOutUpdates::ClearUpdate(RouteUpdatePtr *update) {
552 917551 : CHECK_CONCURRENCY("bgp::SendUpdate");
553 :
554 : // Dequeue the route update.
555 917405 : RouteUpdate *rt_update = update->get();
556 917398 : monitor_->DequeueUpdate(rt_update);
557 :
558 917511 : if (rt_update->OnUpdateList()) {
559 : // Remove the route update from the update list and check if the list
560 : // can now be downgraded back to a route update.
561 110 : UpdateList *uplist = rt_update->GetUpdateList(ribout_);
562 110 : uplist->RemoveUpdate(rt_update);
563 110 : RouteUpdate *last_rt_update = uplist->MakeRouteUpdate();
564 :
565 : // If we were able to downgrade, set the DBEntry to point to the last
566 : // remaining route update and get rid of the current route update and
567 : // the update list. Otherwise, just get rid of the route update.
568 110 : if (last_rt_update) {
569 110 : monitor_->SetEntryState(rt_update->route(), last_rt_update);
570 110 : update->release();
571 110 : delete rt_update;
572 110 : delete uplist;
573 : } else {
// The update list still has other route updates, so it is kept.
574 0 : update->release();
575 0 : delete rt_update;
576 : }
577 : } else {
578 : // Store the history from the route update or clear the state for the
579 : // DBEntry depending on whether we advertised the route. In either
580 : // case, get rid of the route update.
581 917396 : if (rt_update->IsAdvertised()) {
582 583704 : StoreHistory(rt_update);
583 : } else {
584 333623 : ClearState(rt_update);
585 : }
586 917506 : update->release();
587 917436 : delete rt_update;
588 : }
589 917437 : }
590 :
591 : //
// Concurrency: Called in the context of the bgp::SendUpdate task.
//
592 : // Clear the advertised bits specified by isect from the target RibPeerSet in
593 : // the UpdateInfo. If the target is now empty, remove the UpdateInfo from the
594 : // set container in the UpdateQueue. Note that the UpdateInfo will still be
595 : // on the SList in the RouteUpdate.
//
// If update_history is true, the history in the RouteUpdate is updated for
// the peers in isect before the bits are cleared.
596 : //
597 : // Return true if the UpdateInfo was removed from the set container.
598 : //
599 923547 : bool RibOutUpdates::ClearAdvertisedBits(RouteUpdate *rt_update,
600 : UpdateInfo *uinfo, const RibPeerSet &isect, bool update_history) {
601 923547 : CHECK_CONCURRENCY("bgp::SendUpdate");
602 :
603 923408 : if (update_history) {
604 923296 : rt_update->UpdateHistory(ribout_, &uinfo->roattr, isect);
605 : }
606 923412 : uinfo->target.Reset(isect);
607 923350 : bool empty = uinfo->target.empty();
608 923339 : if (empty) {
// No targets remain, so take the UpdateInfo out of its queue's container.
609 922225 : UpdateQueue *queue = queue_vec_[rt_update->queue_id()];
610 922218 : queue->AttrDequeue(uinfo);
611 : }
612 923491 : return empty;
613 : }
614 :
615 : //
616 : // Concurrency: Called in the context of the bgp::SendUpdate task.
617 : //
618 : // Update the markers for all the peers in the blocked RibPeerSet. In the
619 : // general case we clear the blocked RibPeerSet from the UpdateMarker and
620 : // create a new UpdateMarker for the blocked peers.
621 : //
622 : // Return true in the special case where all peers in the UpdateMarker have
623 : // become blocked, and false otherwise. The blocked set must not be empty.
624 : //
625 1755 : bool RibOutUpdates::UpdateMarkersOnBlocked(UpdateMarker *marker,
626 : RouteUpdate *rt_update,
627 : const RibPeerSet *blocked) {
628 1755 : CHECK_CONCURRENCY("bgp::SendUpdate");
629 :
630 1756 : assert(!blocked->empty());
631 1756 : int queue_id = rt_update->queue_id();
632 1756 : UpdateQueue *queue = queue_vec_[queue_id];
633 :
634 : // If all the peers in the UpdateMarker are blocked, we simply move the
635 : // marker after the RouteUpdate.
636 1756 : if (marker->members == *blocked) {
637 1051 : stats_[queue_id].marker_move_count_++;
638 1051 : queue->MoveMarker(marker, rt_update);
639 1051 : return true;
640 : }
641 :
642 : // Reset bits in the specified UpdateMarker, create a new one for the
643 : // blocked peers and insert the new one after the RouteUpdate.
644 705 : marker->members.Reset(*blocked);
645 705 : assert(!marker->members.empty());
// The new marker is handed to the queue via AddMarker, not deleted here.
646 705 : UpdateMarker *new_marker = new UpdateMarker();
647 705 : new_marker->members = *blocked;
648 705 : stats_[queue_id].marker_split_count_++;
649 705 : queue->AddMarker(new_marker, rt_update);
650 :
651 705 : return false;
652 : }
653 :
//
// Return true if every UpdateQueue of this RibOutUpdates is empty, and false
// as soon as one queue that is not empty is found.
//
654 8 : bool RibOutUpdates::Empty() const {
655 24 : for (int i = 0; i < RibOutUpdates::QCOUNT; ++i) {
656 16 : UpdateQueue *queue = queue_vec_[i];
657 16 : if (!queue->empty()) {
658 0 : return false;
659 : }
660 : }
661 8 : return true;
662 : }
663 :
//
// Return the size() of the UpdateQueue for the given queue id. The queue id
// is used as an index into queue_vec_ without any bounds check.
//
664 13494 : size_t RibOutUpdates::queue_size(int queue_id) const {
665 13494 : const UpdateQueue *queue = queue_vec_[queue_id];
666 13494 : return queue->size();
667 : }
668 :
//
// Return the marker_count() of the UpdateQueue for the given queue id. The
// queue id is used as an index into queue_vec_ without any bounds check.
//
669 13494 : size_t RibOutUpdates::queue_marker_count(int queue_id) const {
670 13494 : const UpdateQueue *queue = queue_vec_[queue_id];
671 13494 : return queue->marker_count();
672 : }
673 :
//
// Forward a Join for the given bit to the UpdateQueue for the given queue id
// and return whatever the queue returns.
// NOTE(review): bit is presumably the peer's index in the RibOut - confirm.
//
674 735010 : bool RibOutUpdates::QueueJoin(int queue_id, int bit) {
675 735010 : UpdateQueue *queue = queue_vec_[queue_id];
676 735010 : return queue->Join(bit);
677 : }
678 :
//
// Forward a Leave for the given bit to the UpdateQueue for the given queue id.
// NOTE(review): bit is presumably the peer's index in the RibOut - confirm.
//
679 735010 : void RibOutUpdates::QueueLeave(int queue_id, int bit) {
680 735010 : UpdateQueue *queue = queue_vec_[queue_id];
681 735010 : queue->Leave(bit);
682 735010 : }
683 :
684 : //
685 : // Add statistics information for the given queue id to the provided Stats
// structure. Every counter is accumulated into (not assigned to) the fields of
// stats, so the caller decides what the starting values are.
686 : //
687 16 : void RibOutUpdates::AddStatisticsInfo(int queue_id, Stats *stats) const {
688 16 : stats->messages_built_count_ += stats_[queue_id].messages_built_count_;
689 16 : stats->messages_sent_count_ += stats_[queue_id].messages_sent_count_;
690 16 : stats->reach_count_ += stats_[queue_id].reach_count_;
691 16 : stats->unreach_count_ += stats_[queue_id].unreach_count_;
692 16 : stats->tail_dequeue_count_ += stats_[queue_id].tail_dequeue_count_;
693 16 : stats->peer_dequeue_count_ += stats_[queue_id].peer_dequeue_count_;
694 16 : stats->marker_split_count_ += stats_[queue_id].marker_split_count_;
695 16 : stats->marker_merge_count_ += stats_[queue_id].marker_merge_count_;
696 16 : stats->marker_move_count_ += stats_[queue_id].marker_move_count_;
697 16 : }
|