LCOV - code coverage report
Current view: top level - ifmap - ifmap_update_sender.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 121 183 66.1 %
Date: 2026-06-22 02:21:21 Functions: 13 16 81.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "ifmap/ifmap_update_sender.h"
       6             : #include "base/task.h"
       7             : #include "ifmap/ifmap_client.h"
       8             : #include "ifmap/ifmap_server.h"
       9             : #include "ifmap/ifmap_exporter.h"
      10             : #include "ifmap/ifmap_log.h"
      11             : #include "ifmap/ifmap_log_types.h"
      12             : #include "ifmap/ifmap_update.h"
      13             : #include "ifmap/ifmap_update_queue.h"
      14             : 
      15             : using namespace std;
      16             : 
      17         191 : IFMapUpdateSender::IFMapUpdateSender(IFMapServer *server,
      18         191 :                                      IFMapUpdateQueue *queue)
      19         191 :     : server_(server), queue_(queue), message_(new IFMapMessage()),
      20         382 :       task_scheduled_(false), queue_active_(false) {
      21         191 : }
      22             : 
      23         370 : IFMapUpdateSender::~IFMapUpdateSender() {
      24         191 :     delete(message_);
      25         370 : }
      26             : 
      27             : class IFMapUpdateSender::SendTask : public Task {
      28             : public:
      29          47 :     explicit SendTask(IFMapUpdateSender *sender)
      30          47 :         : Task(TaskScheduler::GetInstance()->GetTaskId("db::IFMapTable"), 0),
      31          47 :           sender_(sender) {
      32          47 :     }
      33          47 :     virtual bool Run() {
      34          47 :         BitSet send_scheduled;
      35          47 :         sender_->GetSendScheduled(&send_scheduled);
      36          47 :         sender_->send_blocked_.Reset(send_scheduled);
      37          47 :         for (size_t i = send_scheduled.find_first(); i != BitSet::npos;
      38           0 :              i = send_scheduled.find_next(i)) {
      39             :             // Dequeue from client marker (i).
      40           0 :             IFMAP_UPD_SENDER_TRACE(IFMapUSSendScheduled, "Send scheduled for",
      41             :                 send_scheduled.ToNumberedString(), "client", i,
      42             :                 sender_->queue_->GetMarker(i)->ToString());
      43           0 :             sender_->Send(sender_->queue_->GetMarker(i));
      44             :         }
      45          47 :         if (sender_->queue_active_) {
      46             :             // Dequeue from tail marker.
      47             :             // Reset queue_active_
      48          47 :             IFMAP_UPD_SENDER_TRACE(IFMapUSQueueActive, "Queue active for",
      49             :                 sender_->queue_->tail_marker()->ToString());
      50          47 :             sender_->Send(sender_->queue_->tail_marker());
      51          47 :             sender_->queue_active_ = false;
      52             :         }
      53          47 :         return true;
      54          47 :     }
      55             : 
      56           0 :     std::string Description() const { return "IFMapUpdateSender::SendTask"; }
      57             : private:
      58             :     IFMapUpdateSender *sender_;
      59             : };
      60             : 
      61          47 : void IFMapUpdateSender::StartTask() {
      62          47 :     if (!task_scheduled_) {
      63             :         // create new task
      64          47 :         SendTask *send_task = new SendTask(this);
      65          47 :         TaskScheduler *scheduler = TaskScheduler::GetInstance();
      66          47 :         scheduler->Enqueue(send_task);
      67          47 :         task_scheduled_ = true;
      68             :     }
      69          47 : }
      70             : 
      71        1013 : void IFMapUpdateSender::QueueActive() {
      72        1013 :     if (queue_active_) {
      73         966 :         return;
      74             :     }
      75          47 :     queue_active_ = true;
      76          47 :     std::scoped_lock lock(mutex_);
      77          47 :     StartTask();
      78          47 : }
      79             : 
      80           0 : void IFMapUpdateSender::SendActive(int index) {
      81           0 :     std::scoped_lock lock(mutex_);
      82           0 :     send_scheduled_.set(index);
      83           0 :     StartTask();
      84           0 : }
      85             : 
      86          47 : void IFMapUpdateSender::GetSendScheduled(BitSet *current) {
      87          47 :     std::scoped_lock lock(mutex_);
      88          47 :     *current = send_scheduled_;
      89          47 :     send_scheduled_.clear();
      90          47 :     task_scheduled_ = false;
      91          47 : }
      92             : 
      93           6 : void IFMapUpdateSender::CleanupClient(int index) {
      94           6 :     std::scoped_lock lock(mutex_);
      95           6 :     send_scheduled_.reset(index);
      96           6 :     send_blocked_.reset(index);
      97           6 : }
      98             : 
      99             : // We return only under 2 conditions:
     100             : // 1. All the clients in the marker are blocked.
     101             : // 2. We have finished traversing the Q.
     102             : // Invariant: while we are traversing the Q, the marker that we are working
     103             : // with only has ready clients. As soon as a client blocks, we split it out and
     104             : // continue with the ready set.
     105          47 : void IFMapUpdateSender::Send(IFMapMarker *imarker) {
     106          47 :     IFMapMarker *marker = imarker;
     107             : 
     108             :     // Get the clients in this marker that are blocked. If all of the clients in
     109             :     // this marker are blocked, we are done.
     110          47 :     BitSet blocked_clients;
     111          47 :     blocked_clients = (marker->mask & send_blocked_);
     112          47 :     if (blocked_clients == marker->mask) {
     113           0 :         return;
     114             :     }
     115             : 
     116             :     // If any of the clients are blocked, create a new marker for the set of
     117             :     // blocked clients, insert it before marker and continue with the ready
     118             :     // set.
     119          47 :     if (!blocked_clients.empty()) {
     120           0 :         IFMAP_UPD_SENDER_TRACE(IFMapUSSplitBlocked, "Splitting blocked clients",
     121             :             blocked_clients.ToNumberedString(), "from", marker->ToString());
     122           0 :         queue_->MarkerSplitBefore(marker, marker, blocked_clients);
     123             :     }
     124             : 
     125          47 :     IFMapListEntry *next = queue_->Next(marker);
     126          47 :     BitSet base_send_set;
     127             : 
     128             :     // Start with the node after the 'marker'
     129         948 :     for (IFMapListEntry *curr = next; curr != NULL; curr = next) {
     130         901 :         next = queue_->Next(curr);
     131             : 
     132         901 :         if (curr->IsMarker()) {
     133           0 :             IFMapMarker *next_marker = static_cast<IFMapMarker *>(curr);
     134             :             // Processing the next_marker can change the send_set and all
     135             :             // clients in the next_marker should have already seen the updates
     136             :             // currently sitting in the buffer. So, flush the buffer to the
     137             :             // existing client-set before processing the marker so that we dont
     138             :             // send duplicates.
     139           0 :             if (!message_->IsEmpty()) {
     140           0 :                 BitSet blocked_set;
     141           0 :                 SendUpdate(base_send_set, &blocked_set);
     142           0 :             }
     143             :             bool done;
     144           0 :             marker = ProcessMarker(marker, next_marker, &done);
     145           0 :             if (done) {
     146             :                 // All the clients in this marker are blocked. We are done.
     147           0 :                 return;
     148             :             }
     149             :             // marker has the ready clients. Continue as if we are starting
     150             :             // fresh.
     151           0 :             base_send_set.clear();
     152           0 :             continue;
     153           0 :         }
     154             : 
     155             :         // ...else its an update or delete
     156             : 
     157         901 :         IFMapUpdate *update = static_cast<IFMapUpdate *>(curr);
     158         901 :         BitSet send_set = update->advertise() & marker->mask;
     159         901 :         if (send_set.empty()) {
     160           0 :             continue;
     161             :         }
     162             : 
     163         901 :         if (base_send_set.empty()) {
     164          47 :             base_send_set = send_set;
     165             :         }
     166             : 
     167             :         // Flush the message to all possible clients if:
     168             :         // 1. The buffer is full OR
     169             :         // 2. The send_set is changing and buffer is filled.
     170        1745 :         if (message_->IsFull() ||
     171         844 :             ((base_send_set != send_set) && !message_->IsEmpty())) {
     172             : 
     173          65 :             BitSet blocked_set;
     174          65 :             SendUpdate(base_send_set, &blocked_set);
     175          65 :             if (!blocked_set.empty()) {
     176             :                 // All the clients in this marker are blocked. We are done.
     177           0 :                 if (blocked_set == marker->mask) {
     178           0 :                     IFMAP_UPD_SENDER_TRACE(IFMapUSAllBlocked, marker->ToString(),
     179             :                         "blocked before", curr->ToString());
     180           0 :                     queue_->MoveMarkerBefore(marker, curr);
     181           0 :                     return;
     182             :                 }
     183             :                 // Only a subset of clients in this marker are blocked. Insert
     184             :                 // a marker for them 'before' curr since they have seen
     185             :                 // everything before curr. Let the ready clients continue the
     186             :                 // traversal.
     187           0 :                 IFMAP_UPD_SENDER_TRACE(IFMapUSSubsetBlocked, "Clients",
     188             :                     blocked_set.ToNumberedString(), "blocked before",
     189             :                     curr->ToString(), "and split from", marker->ToString());
     190           0 :                 queue_->MarkerSplitBefore(marker, curr, blocked_set);
     191           0 :                 send_set.Reset(blocked_set);
     192             :             }
     193             : 
     194             :             // The send_set for this marker is changing. Pick up the new one.
     195          65 :             base_send_set = send_set;
     196          65 :         }
     197             : 
     198             :         // base_send_set is same as send_set at this point.
     199         901 :         ProcessUpdate(update, base_send_set);
     200         901 :     }
     201             : 
     202             :     // The buffer will be filled in the common case of updates being added
     203             :     // after the tail_marker.
     204          47 :     BitSet blk_set;
     205          47 :     if (!message_->IsEmpty()) {
     206          47 :         SendUpdate(base_send_set, &blk_set);
     207             :     }
     208             :     // If the last node in the Q was the tail_marker, we would have already
     209             :     // flushed the buffer and merged with it and we would be the last node in
     210             :     // the Q.
     211          47 :     IFMapListEntry *last = queue_->GetLast();
     212          47 :     if (marker != last) {
     213             :         // Since we have reached the end of the Q, we better be the tail_marker
     214           0 :         assert(marker == queue_->tail_marker());
     215             :         // If we have any blocked clients, splitting markers for them is not
     216             :         // useful at this point. Just move the marker to the end of the Q,
     217             :         // immediately after last, even if it has blocked clients. Being lazy
     218             :         // is advantageous since by the time we get the next trigger, a blocked
     219             :         // client could have become ready and splitting the marker now would be
     220             :         // useless.
     221           0 :         IFMAP_UPD_SENDER_TRACE(IFMapUSMoveAfterLast, "Moving", marker->ToString(),
     222             :             "before", last->ToString(), "with blocked_set",
     223             :             blk_set.ToNumberedString());
     224           0 :         queue_->MoveMarkerAfter(marker, last);
     225             :     }
     226          47 :     return;
     227          47 : }
     228             : 
     229         901 : void IFMapUpdateSender::ProcessUpdate(IFMapUpdate *update,
     230             :                                       const BitSet &base_send_set) {
     231         901 :     LogAndCountSentUpdate(update, base_send_set);
     232             : 
     233             :     // Append the contents of the update-node to the message.
     234         901 :     message_->EncodeUpdate(update);
     235             : 
     236             :     // Clean up the node if everybody has seen it.
     237         901 :     update->AdvertiseReset(base_send_set);
     238         901 :     if (update->advertise().empty()) {
     239         901 :         queue_->Dequeue(update);
     240             :     }
     241             :     // Update may be freed.
     242        1802 :     server_->exporter()->StateUpdateOnDequeue(update, base_send_set,
     243         901 :                                               update->IsDelete());
     244         901 : }
     245             : 
     246             : // blocked_set is a subset of send_set
     247         112 : void IFMapUpdateSender::SendUpdate(BitSet send_set, BitSet *blocked_set) {
     248             :     IFMapClient *client;
     249             :     bool send_result;
     250             : 
     251         112 :     assert(!message_->IsEmpty());
     252             : 
     253         226 :     for (size_t i = send_set.find_first(); i != BitSet::npos;
     254         114 :          i = send_set.find_next(i)) {
     255         114 :         assert(!send_blocked_.test(i));
     256         114 :         client = server_->GetClient(i);
     257         114 :         assert(client);
     258             : 
     259         114 :         message_->SetReceiverInMsg(client->identifier());
     260             :         // Close the message to save the document as string
     261         114 :         message_->Close();
     262             : 
     263             :         // Send the string version of the message to the client.
     264         114 :         send_result = client->SendUpdate(message_->get_string());
     265             : 
     266             :         // Keep track of all the clients whose buffers are full.
     267         114 :         if (!send_result) {
     268           0 :             blocked_set->set(i);
     269           0 :             send_blocked_.set(i);
     270             :         }
     271             :     }
     272             :     // Reset the message to init things for the next message
     273         112 :     message_->Reset();
     274         112 : }
     275             : 
     276             : // marker is before next_marker in the Q. next_marker could be the tail_marker.
     277             : // 'done' is set to true only if all the clients in the union of the
     278             : // client-sets of the 2 markers are blocked.
     279           0 : IFMapMarker* IFMapUpdateSender::ProcessMarker(IFMapMarker *marker,
     280             :                                               IFMapMarker *next_marker,
     281             :                                               bool *done) {
     282             :     // There should never be a marker beyond the tail_marker
     283           0 :     assert(marker != queue_->tail_marker());
     284             : 
     285             :     // Get the union (total_set) of the client-sets in the 2 markers. Then, get
     286             :     // the subset of clients in the union that are blocked (blocked_set). The
     287             :     // remaining subset of clients are ready (ready_set).
     288           0 :     BitSet total_set = (marker->mask | next_marker->mask);
     289           0 :     BitSet blocked_set = (total_set & send_blocked_);
     290           0 :     BitSet ready_set;
     291           0 :     ready_set.BuildComplement(total_set, blocked_set); // *this = lhs & ~rhs
     292             : 
     293             :     // If all the clients are ready or all are blocked, merge marker into
     294             :     // next_marker. marker will be deleted.
     295           0 :     if (blocked_set.empty() || ready_set.empty()) {
     296           0 :         IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
     297             :             "into", next_marker->ToString());
     298           0 :         queue_->MarkerMerge(next_marker, marker, marker->mask);
     299           0 :         assert(next_marker->mask == total_set);
     300             :     } else {
     301             :         // We have both, ready and blocked, clients. First, merge both the
     302             :         // markers into next_marker so that next_marker has the total_set. Then
     303             :         // split next_marker into 2 markers: first with the blocked_set and the
     304             :         // second with the ready_set, with first(blocked) preceding the
     305             :         // second(ready).
     306           0 :         IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
     307             :             "into", next_marker->ToString());
     308           0 :         queue_->MarkerMerge(next_marker, marker, marker->mask);
     309           0 :         assert(next_marker->mask == total_set);
     310           0 :         IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerSplit, "Splitting blocked clients",
     311             :             blocked_set.ToNumberedString(), "from", next_marker->ToString());
     312           0 :         queue_->MarkerSplitBefore(next_marker, next_marker, blocked_set);
     313             :     }
     314           0 :     if (ready_set.empty()) {
     315             :         // If all the clients are blocked, we are done.
     316           0 :         *done = true;
     317             :     } else {
     318             :         // Atleast some clients are ready to continue.
     319           0 :         *done = false;
     320             :     }
     321             : 
     322             :     // next_marker has the ready_set if done is false
     323           0 :     return next_marker;
     324           0 : }
     325             : 
     326         901 : void IFMapUpdateSender::LogAndCountSentUpdate(IFMapUpdate *update,
     327             :                                               const BitSet &base_send_set) {
     328         901 :     size_t total = base_send_set.count();
     329             :     // Avoid dealing with return value of BitSet::npos
     330         901 :     if (total) {
     331         901 :         string name = update->ConfigName();
     332         901 :         string operation = update->TypeToString();
     333         901 :         size_t client_id = base_send_set.find_first();
     334        1804 :         while (total--) {
     335         903 :             IFMapClient *client = server_->GetClient(client_id);
     336         903 :             if (client) {
     337         903 :                 IFMAP_DEBUG_ONLY(IFMapClientSendInfo, operation, name,
     338             :                                  client->identifier(), client->name());
     339         903 :                 if (update->IsNode()) {
     340         435 :                     if (update->IsUpdate()) {
     341         435 :                         client->incr_update_nodes_sent();
     342           0 :                     } else if (update->IsDelete()) {
     343           0 :                         client->incr_delete_nodes_sent();
     344             :                     } else {
     345           0 :                         assert(0);
     346             :                     }
     347         468 :                 } else if (update->IsLink()) {
     348         468 :                     if (update->IsUpdate()) {
     349         468 :                         client->incr_update_links_sent();
     350           0 :                     } else if (update->IsDelete()) {
     351           0 :                         client->incr_delete_links_sent();
     352             :                     } else {
     353           0 :                         assert(0);
     354             :                     }
     355             :                 }
     356             :             }
     357         903 :             client_id = base_send_set.find_next(client_id);
     358             :         }
     359         901 :     }
     360         901 : }
     361             : 

Generated by: LCOV version 1.14