LCOV - code coverage report
Current view: top level - ksync - ksync_sock.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 0 639 0.0 %
Date: 2026-06-22 02:21:21 Functions: 0 104 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include <atomic>
       6             : #include <string>
       7             : 
       8             : #include "base/os.h"
       9             : 
      10             : #if defined(__linux__)
      11             : #include <asm/types.h>
      12             : #include <linux/netlink.h>
      13             : #include <linux/rtnetlink.h>
      14             : #include <linux/genetlink.h>
      15             : #include <linux/sockios.h>
      16             : #endif
      17             : 
      18             : #include <sys/socket.h>
      19             : 
      20             : #include <boost/bind/bind.hpp>
      21             : 
      22             : #include <base/logging.h>
      23             : #include <db/db.h>
      24             : #include <db/db_entry.h>
      25             : #include <db/db_table.h>
      26             : #include <db/db_table_partition.h>
      27             : 
      28             : #include "ksync_index.h"
      29             : #include "ksync_entry.h"
      30             : #include "ksync_object.h"
      31             : #include "ksync_sock.h"
      32             : #include "ksync_sock_user.h"
      33             : #include "ksync_types.h"
      34             : 
      35             : #include "nl_util.h"
      36             : #include "udp_util.h"
      37             : #include "vr_genetlink.h"
      38             : #include "vr_types.h"
      39             : 
      40             : using namespace boost::asio;
      41             : using namespace boost::placeholders;
      42             : 
      43             : /* Note SO_RCVBUFFORCE is supported only for linux version 2.6.14 and above */
      44             : typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
      45             :         SO_RCVBUFFORCE> ReceiveBuffForceSize;
      46             : 
      47             : int KSyncSock::vnsw_netlink_family_id_;
      48             : AgentSandeshContext *KSyncSock::agent_sandesh_ctx_[kRxWorkQueueCount];
      49             : std::unique_ptr<KSyncSock> KSyncSock::sock_;
      50             : pid_t KSyncSock::pid_;
      51             : std::atomic<bool> KSyncSock::shutdown_;
      52             : 
      53             : // Name of task used in KSync Response work-queues
      54             : const char* IoContext::io_wq_names[IoContext::MAX_WORK_QUEUES] =
      55             :                                                 {
      56             :                                                     "Agent::Uve",
      57             :                                                     "Agent::KSync"
      58             :                                                 };
      59             : /////////////////////////////////////////////////////////////////////////////
      60             : // Netlink utilities
      61             : /////////////////////////////////////////////////////////////////////////////
      62           0 : uint32_t GetNetlinkSeqno(char *data) {
      63           0 :     struct nlmsghdr *nlh = (struct nlmsghdr *)data;
      64           0 :     return nlh->nlmsg_seq;
      65             : }
      66             : 
      67           0 : bool NetlinkMsgDone(char *data) {
      68           0 :     struct nlmsghdr *nlh = (struct nlmsghdr *)data;
      69           0 :     return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
      70             : }
      71             : 
      72             : // Common validation for netlink messages
      73           0 : bool ValidateNetlink(char *data) {
      74           0 :     struct nlmsghdr *nlh = (struct nlmsghdr *)data;
      75           0 :     if (nlh->nlmsg_type == NLMSG_ERROR) {
      76           0 :         LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq << " len "
      77             :             << nlh->nlmsg_len);
      78           0 :         assert(0);
      79             :         return false;
      80             :     }
      81             : 
      82           0 :     if (nlh->nlmsg_len > KSyncSock::kBufLen) {
      83           0 :         LOG(ERROR, "Length of " << nlh->nlmsg_len << " is more than expected "
      84             :             "length of " << KSyncSock::kBufLen);
      85           0 :         assert(0);
      86             :         return false;
      87             :     }
      88             : 
      89           0 :     if (nlh->nlmsg_type == NLMSG_DONE) {
      90           0 :         return true;
      91             :     }
      92             : 
      93             :     // Sanity checks for generic-netlink message
      94           0 :     if (nlh->nlmsg_type != KSyncSock::GetNetlinkFamilyId()) {
      95           0 :         LOG(ERROR, "Netlink unknown message type : " << nlh->nlmsg_type);
      96           0 :         assert(0);
      97             :         return false;
      98             :     }
      99             : 
     100           0 :     struct genlmsghdr *genlh = (struct genlmsghdr *) (data + NLMSG_HDRLEN);
     101           0 :     if (genlh->cmd != SANDESH_REQUEST) {
     102           0 :         LOG(ERROR, "Unknown generic netlink cmd : " << genlh->cmd);
     103           0 :         assert(0);
     104             :         return false;
     105             :     }
     106             : 
     107           0 :     struct nlattr * attr = (struct nlattr *)(data + NLMSG_HDRLEN
     108             :                                              + GENL_HDRLEN);
     109           0 :     if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
     110           0 :         LOG(ERROR, "Unknown generic netlink TLV type : " << attr->nla_type);
     111           0 :         assert(0);
     112             :         return false;
     113             :     }
     114             : 
     115           0 :     return true;
     116             : }
     117             : 
     118           0 : void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len) {
     119           0 :     struct nlmsghdr *nlh = (struct nlmsghdr *)data;
     120           0 :     int len = 0;
     121           0 :     if (nlh->nlmsg_type == NLMSG_DONE) {
     122           0 :         len = NLMSG_HDRLEN;
     123             :     } else {
     124           0 :         len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
     125             :     }
     126             : 
     127           0 :     *buf = data + len;
     128           0 :     *buf_len = nlh->nlmsg_len - len;
     129           0 : }
     130             : 
     131           0 : void InitNetlink(nl_client *client) {
     132           0 :     nl_init_generic_client_req(client, KSyncSock::GetNetlinkFamilyId());
     133             :     unsigned char *nl_buf;
     134             :     uint32_t nl_buf_len;
     135           0 :     assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
     136           0 : }
     137             : 
     138           0 : void ResetNetlink(nl_client *client) {
     139             :     unsigned char *nl_buf;
     140             :     uint32_t nl_buf_len;
     141           0 :     client->cl_buf_offset = 0;
     142           0 :     nl_build_header(client, &nl_buf, &nl_buf_len);
     143           0 : }
     144             : 
     145           0 : void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no) {
     146           0 :     nl_update_header(client, len);
     147           0 :     struct nlmsghdr *nlh = (struct nlmsghdr *)client->cl_buf;
     148           0 :     nlh->nlmsg_pid = KSyncSock::GetPid();
     149           0 :     nlh->nlmsg_seq = seq_no;
     150           0 : }
     151             : 
     152           0 : void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
     153             :                            uint32_t alignment) {
     154           0 :     while (buf_len > (alignment - 1)) {
     155             :         int error;
     156           0 :         int decode_len = Sandesh::ReceiveBinaryMsgOne((uint8_t *)buf, buf_len,
     157             :                                                       &error, sandesh_context);
     158           0 :         if (decode_len < 0) {
     159           0 :             LOG(DEBUG, "Incorrect decode len " << decode_len);
     160           0 :             break;
     161             :         }
     162           0 :         buf += decode_len;
     163           0 :         buf_len -= decode_len;
     164             :     }
     165           0 : }
     166             : 
     167             : /////////////////////////////////////////////////////////////////////////////
     168             : // KSyncSock routines
     169             : /////////////////////////////////////////////////////////////////////////////
     170           0 : KSyncSock::KSyncSock() :
     171           0 :     nl_client_(NULL), wait_tree_(), send_queue_(this),
     172           0 :     max_bulk_msg_count_(kMaxBulkMsgCount),
     173           0 :     bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
     174           0 :     rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
     175           0 :     use_wait_tree_(true), process_data_inline_(false),
     176           0 :     ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
     177           0 :     tx_count_(0), ack_count_(0), err_count_(0), 
     178           0 :     rx_process_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0,
     179           0 :                     boost::bind(&KSyncSock::ProcessRxData, this, _1)) {
     180           0 :     TaskScheduler *scheduler = TaskScheduler::GetInstance();
     181             : 
     182             :     uint32_t uve_task_id =
     183           0 :         scheduler->GetTaskId(IoContext::io_wq_names[IoContext::IOC_UVE]);
     184             :     uint32_t ksync_task_id =
     185           0 :         scheduler->GetTaskId(IoContext::io_wq_names[IoContext::IOC_KSYNC]);
     186           0 :     for(uint32_t i = 0; i < kRxWorkQueueCount; i++) {
     187           0 :         ksync_rx_queue[i] = AllocQueue(ksync_bulk_sandesh_context_,
     188             :                                        ksync_task_id, i, "KSync Receive Queue");
     189           0 :         uve_rx_queue[i] = AllocQueue(uve_bulk_sandesh_context_,
     190             :                                      uve_task_id, i, "KSync UVE Receive Queue");
     191             :     }
     192             : 
     193           0 :     nl_client_ = (nl_client *)malloc(sizeof(nl_client));
     194           0 :     memset(nl_client_, 0, sizeof(nl_client));
     195           0 :     rx_buff_ = NULL;
     196           0 :     seqno_ = 0;
     197           0 :     uve_seqno_ = 0;
     198             : 
     199           0 :     memset(bulk_mctx_arr_, 0, sizeof(bulk_mctx_arr_));
     200           0 :     bmca_prod_ = bmca_cons_ = 0;
     201           0 : }
     202             : 
     203           0 : KSyncSock::~KSyncSock() {
     204           0 :     assert(wait_tree_.size() == 0);
     205             : 
     206           0 :     if (rx_buff_) {
     207           0 :         delete [] rx_buff_;
     208           0 :         rx_buff_ = NULL;
     209             :     }
     210             : 
     211           0 :     for(int i = 0; i < kRxWorkQueueCount; i++) {
     212           0 :         ksync_rx_queue[i]->Shutdown();
     213           0 :         delete ksync_rx_queue[i];
     214             : 
     215           0 :         uve_rx_queue[i]->Shutdown();
     216           0 :         delete uve_rx_queue[i];
     217             :     }
     218             : 
     219           0 :     if (nl_client_->cl_buf) {
     220           0 :         free(nl_client_->cl_buf);
     221             :     }
     222           0 :     free(nl_client_);
     223           0 : }
     224             : 
     225           0 : void KSyncSock::Shutdown() {
     226           0 :     shutdown_ = true;
     227           0 :     sock_->send_queue_.Shutdown();
     228           0 :     sock_.release();
     229           0 : }
     230             : 
     231           0 : void KSyncSock::Init(bool use_work_queue, const std::string &cpu_pin_policy) {
     232           0 :     sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
     233           0 :     pid_ = getpid();
     234           0 :     shutdown_ = false;
     235           0 : }
     236             : 
     237           0 : KSyncSock::KSyncReceiveQueue *KSyncSock::AllocQueue
     238             : (KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance,
     239             :  const char *name) {
     240             :     KSyncReceiveQueue *queue;
     241           0 :     queue = new KSyncReceiveQueue
     242             :         (task_id, instance, boost::bind(&KSyncSock::ProcessKernelData, this,
     243           0 :                                         &ctxt[instance], _1));
     244             :     char tmp[128];
     245           0 :     sprintf(tmp, "%s-%d", name, instance);
     246           0 :     queue->set_name(tmp);
     247           0 :     return queue;
     248             : }
     249             : 
     250           0 : void KSyncSock::SetMeasureQueueDelay(bool val) {
     251           0 :     sock_->send_queue_.set_measure_busy_time(val);
     252           0 :     for (int i = 0; i < kRxWorkQueueCount; i++) {
     253           0 :         ksync_rx_queue[i]->set_measure_busy_time(val);
     254             :     }
     255           0 : }
     256             : 
     257           0 : void KSyncSock::Start(bool read_inline) {
     258           0 :     sock_->read_inline_ = read_inline;
     259           0 :     if (sock_->read_inline_) {
     260           0 :         return;
     261             :     }
     262           0 :     sock_->rx_buff_ = new char[kBufLen];
     263           0 :     sock_->AsyncReceive(boost::asio::buffer(sock_->rx_buff_, kBufLen),
     264             :                         boost::bind(&KSyncSock::ReadHandler, sock_.get(),
     265             :                                     placeholders::error,
     266             :                                     placeholders::bytes_transferred));
     267             : }
     268             : 
     269           0 : void KSyncSock::SetSockTableEntry(KSyncSock *sock) {
     270           0 :     assert(sock_.get() == NULL);
     271           0 :     sock_.reset(sock);
     272           0 : }
     273             : 
     274           0 : void KSyncSock::SetNetlinkFamilyId(int id) {
     275           0 :     vnsw_netlink_family_id_ = id;
     276           0 :     InitNetlink(sock_->nl_client_);
     277           0 : }
     278             : 
     279           0 : uint32_t KSyncSock::WaitTreeSize() const {
     280           0 :     return wait_tree_.size();
     281             : }
     282             : 
     283           0 : void KSyncSock::SetSeqno(uint32_t seq) {
     284           0 :     seqno_ = seq;
     285           0 :     uve_seqno_ = seq;
     286           0 : }
     287             : 
     288           0 : uint32_t KSyncSock::AllocSeqNo(IoContext::Type type, uint32_t instance) {
     289             :     uint32_t seq;
     290           0 :     if (type == IoContext::IOC_UVE) {
     291           0 :         seq = uve_seqno_.fetch_add(1);
     292           0 :         seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
     293             :     } else {
     294           0 :         seq = seqno_.fetch_add(1);
     295           0 :         seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
     296           0 :         seq |= KSYNC_DEFAULT_Q_ID_SEQ;
     297             :     }
     298           0 :     if (seq == kInvalidBulkSeqNo) {
     299           0 :         return AllocSeqNo(type, instance);
     300             :     }
     301           0 :     return seq;
     302             : }
     303             : 
     304           0 : uint32_t KSyncSock::AllocSeqNo(IoContext::Type type) {
     305           0 :     return AllocSeqNo(type, 0);
     306             : }
     307             : 
     308           0 : KSyncSock::KSyncReceiveQueue *KSyncSock::GetReceiveQueue(IoContext::Type type,
     309             :                                                          uint32_t instance) {
     310           0 :     if (type == IoContext::IOC_UVE) {
     311           0 :         return uve_rx_queue[instance % kRxWorkQueueCount];
     312             :     } else {
     313           0 :         return ksync_rx_queue[instance % kRxWorkQueueCount];
     314             :     }
     315             : }
     316             : 
     317           0 : KSyncSock::KSyncReceiveQueue *KSyncSock::GetReceiveQueue(uint32_t seqno) {
     318             :     IoContext::Type type;
     319           0 :     if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
     320           0 :         type = IoContext::IOC_KSYNC;
     321             :     else
     322           0 :         type = IoContext::IOC_UVE;
     323             : 
     324           0 :     uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
     325           0 :     return GetReceiveQueue(type, instance);
     326             : }
     327           0 : void KSyncSock::EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event) {
     328           0 :     rx_process_queue_.Enqueue(KSyncRxQueueData(entry, event));
     329           0 : }
     330           0 : bool KSyncSock::ProcessRxData(KSyncRxQueueData data) {
     331           0 :     assert(data.event_ != KSyncEntry::INVALID);
     332           0 :     KSyncObject *object = data.entry_->GetObject();
     333           0 :     object->NetlinkAck(data.entry_, data.event_);
     334           0 :     return true;
     335             : }
     336           0 : KSyncBulkSandeshContext *KSyncSock::GetBulkSandeshContext(uint32_t seqno) {
     337             : 
     338           0 :     uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
     339           0 :     if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
     340           0 :         return &ksync_bulk_sandesh_context_[instance];
     341             :     else
     342           0 :         return &uve_bulk_sandesh_context_[instance];
     343             : }
     344             : 
     345           0 : KSyncSock *KSyncSock::Get(DBTablePartBase *partition) {
     346           0 :     return sock_.get();
     347             : }
     348             : 
     349           0 : KSyncSock *KSyncSock::Get(int idx) {
     350           0 :     assert(idx == 0);
     351           0 :     return sock_.get();
     352             : }
     353             : 
     354           0 : bool KSyncSock::ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context) {
     355           0 :     Validate(data);
     356             : 
     357             :     KSyncReceiveQueue *queue;
     358           0 :     if (context) {
     359           0 :         queue = GetReceiveQueue(context->io_context_type(),
     360             :                                 context->work_queue_index());
     361             :     } else {
     362           0 :         queue = GetReceiveQueue(GetSeqno(data));
     363             :     }
     364           0 :     queue->Enqueue(KSyncRxData(data, context));
     365           0 :     return true;
     366             : }
     367             : 
     368             : // Read handler registered with boost::asio. Demux done based on seqno_
     369           0 : void KSyncSock::ReadHandler(const boost::system::error_code& error,
     370             :                             size_t bytes_transferred) {
     371           0 :     if (error) {
     372           0 :         LOG(ERROR, "Error reading from Ksync sock. Error : " <<
     373             :             boost::system::system_error(error).what());
     374           0 :         if (shutdown_ == false) {
     375           0 :             assert(0);
     376             :         }
     377           0 :         return;
     378             :     }
     379             : 
     380           0 :     ValidateAndEnqueue(rx_buff_, NULL);
     381             : 
     382           0 :     rx_buff_ = new char[kBufLen];
     383           0 :     AsyncReceive(boost::asio::buffer(rx_buff_, kBufLen),
     384             :                  boost::bind(&KSyncSock::ReadHandler, this,
     385             :                              placeholders::error,
     386             :                              placeholders::bytes_transferred));
     387             : }
     388             : 
     389             : // Process kernel data - executes in the task specified by IoContext
     390             : // Currently only Agent::KSync and Agent::Uve are possibilities
     391           0 : bool KSyncSock::ProcessKernelData(KSyncBulkSandeshContext *bulk_sandesh_context,
     392             :                                   const KSyncRxData &data) {
     393           0 :     KSyncBulkMsgContext *bulk_message_context = data.bulk_msg_context_;
     394           0 :     WaitTree::iterator it;
     395           0 :     if (data.bulk_msg_context_ == NULL) {
     396           0 :         uint32_t seqno = GetSeqno(data.buff_);
     397             :         {
     398           0 :             std::scoped_lock lock(mutex_);
     399           0 :             it = wait_tree_.find(seqno);
     400           0 :         }
     401           0 :         if (it == wait_tree_.end()) {
     402           0 :             LOG(ERROR, "KSync error in finding for sequence number : "
     403             :                 << seqno);
     404           0 :             assert(0);
     405             :         }
     406           0 :         bulk_message_context = &(it->second);
     407             :     }
     408             : 
     409           0 :     bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
     410           0 :     BulkDecoder(data.buff_, bulk_sandesh_context);
     411             :     // Remove the IoContext only on last netlink message
     412           0 :     if (IsMoreData(data.buff_) == false) {
     413           0 :         if (data.bulk_msg_context_ != NULL) {
     414           0 :             delete data.bulk_msg_context_;
     415             :         } else {
     416           0 :             std::scoped_lock lock(mutex_);
     417           0 :             wait_tree_.erase(it);
     418           0 :         }
     419             :     }
     420           0 :     delete[] data.buff_;
     421           0 :     return true;
     422             : }
     423             : 
     424           0 : bool KSyncSock::BlockingRecv() {
     425             :     char data[kBufLen];
     426           0 :     bool ret = false;
     427             : 
     428             :     do {
     429           0 :         Receive(boost::asio::buffer(data, kBufLen));
     430           0 :         AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
     431           0 :         ctxt->SetErrno(0);
     432             :         // BlockingRecv used only during Init and doesnt support bulk messages
     433             :         // Use non-bulk version of decoder
     434           0 :         Decoder(data, ctxt);
     435           0 :         if (ctxt->GetErrno() != 0 && ctxt->GetErrno() != EEXIST) {
     436           0 :             KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <",
     437             :                         ctxt->GetErrno(), ":",
     438             :                         KSyncEntry::VrouterErrorToString(ctxt->GetErrno()),
     439             :                         ">. Object <", "N/A", ">. State <", "N/A",
     440             :                         ">. Message number :", 0);
     441           0 :             ret = true;
     442             :         }
     443           0 :     } while (IsMoreData(data));
     444             : 
     445           0 :     return ret;
     446             : }
     447             : 
     448             : // BlockingSend does not support bulk messages.
     449           0 : size_t KSyncSock::BlockingSend(char *msg, int msg_len) {
     450           0 :     KSyncBufferList iovec;
     451           0 :     iovec.push_back(buffer(msg, msg_len));
     452           0 :     bulk_buf_size_ = msg_len;
     453           0 :     return SendTo(&iovec, 0);
     454           0 : }
     455             : 
     456           0 : void KSyncSock::GenericSend(IoContext *ioc) {
     457           0 :     send_queue_.Enqueue(ioc);
     458           0 : }
     459             : 
     460           0 : void KSyncSock::SendAsync(KSyncEntry *entry, int msg_len, char *msg,
     461             :                           KSyncEntry::KSyncEvent event) {
     462           0 :     KSyncIoContext *ioc = new KSyncIoContext(this, entry, msg_len, msg, event);
     463             :     // Pre-allocate buffers to minimize processing in KSyncTxQueue context
     464           0 :     if (read_inline_ && entry->pre_alloc_rx_buffer()) {
     465           0 :         ioc->rx_buffer1_ = new char [kBufLen];
     466           0 :         ioc->rx_buffer2_ = new char [kBufLen];
     467             :     } else {
     468           0 :         ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
     469             :     }
     470           0 :     send_queue_.Enqueue(ioc);
     471           0 : }
     472             : 
     473             : // Write handler registered with boost::asio
     474           0 : void KSyncSock::WriteHandler(const boost::system::error_code& error,
     475             :                              size_t bytes_transferred) {
     476           0 :     if (error) {
     477           0 :         LOG(ERROR, "Ksync sock write error : " <<
     478             :             boost::system::system_error(error).what());
     479           0 :         if (shutdown_ == false) {
     480           0 :             assert(0);
     481             :         }
     482             :     }
     483           0 : }
     484             : 
     485             : // End of messages in the work-queue. Send messages pending in bulk context
     486           0 : void KSyncSock::OnEmptyQueue(bool done) {
     487           0 :     if (bulk_seq_no_ == kInvalidBulkSeqNo)
     488           0 :         return;
     489             : 
     490           0 :     KSyncBulkMsgContext *bulk_message_context = NULL;
     491           0 :     if (use_wait_tree_) {
     492           0 :         if (read_inline_ == false) {
     493           0 :             std::scoped_lock lock(mutex_);
     494           0 :             WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
     495           0 :             assert(it != wait_tree_.end());
     496           0 :             bulk_message_context = &it->second;
     497           0 :         } else {
     498           0 :             bulk_message_context = bulk_msg_context_;
     499             :         }
     500             :     } else {
     501           0 :         bulk_message_context = bulk_mctx_arr_[bmca_prod_];
     502             :     }
     503             : 
     504           0 :     SendBulkMessage(bulk_message_context, bulk_seq_no_);
     505             : }
     506             : 
     507             : // Send messages accumilated in bulk context
     508           0 : int KSyncSock::SendBulkMessage(KSyncBulkMsgContext *bulk_message_context,
     509             :                                uint32_t seqno) {
     510           0 :     KSyncBufferList iovec;
     511             :     // Get all buffers to send into single io-vector
     512           0 :     bulk_message_context->Data(&iovec);
     513           0 :     tx_count_++;
     514             : 
     515           0 :     if (!read_inline_) {
     516           0 :         if (!use_wait_tree_) {
     517           0 :             bmca_prod_++;
     518           0 :             if (bmca_prod_ >= KSYNC_BMC_ARR_SIZE) {
     519           0 :                 bmca_prod_ = 0;
     520             :             }
     521             :         }
     522             : 
     523           0 :         AsyncSendTo(&iovec, seqno,
     524             :                     boost::bind(&KSyncSock::WriteHandler, this,
     525             :                                 placeholders::error,
     526             :                                 placeholders::bytes_transferred));
     527             :     } else {
     528           0 :         SendTo(&iovec, seqno);
     529           0 :         bool more_data = false;
     530           0 :         do {
     531           0 :             char *rxbuf = bulk_message_context->GetReceiveBuffer();
     532           0 :             Receive(boost::asio::buffer(rxbuf, kBufLen));
     533           0 :             more_data = IsMoreData(rxbuf);
     534           0 :             if (!process_data_inline_) {
     535           0 :                 ValidateAndEnqueue(rxbuf, bulk_message_context);
     536             :             } else {
     537           0 :                 ProcessDataInline(rxbuf);
     538             :             }
     539             :         } while(more_data);
     540             :     }
     541             : 
     542           0 :     bulk_msg_context_ = NULL;
     543           0 :     bulk_seq_no_ = kInvalidBulkSeqNo;
     544           0 :     return true;
     545           0 : }
     546             : 
     547             : // Get the bulk-context for sequence-number
     548           0 : KSyncBulkMsgContext *KSyncSock::LocateBulkContext
     549             : (uint32_t seqno, IoContext::Type io_context_type,
     550             :  uint32_t work_queue_index) {
     551           0 :     if (read_inline_) {
     552           0 :         if (bulk_seq_no_ == kInvalidBulkSeqNo) {
     553           0 :             assert(bulk_msg_context_ == NULL);
     554           0 :             bulk_seq_no_ = seqno;
     555           0 :             bulk_buf_size_ = 0;
     556           0 :             bulk_msg_count_ = 0;
     557           0 :             bulk_msg_context_ = new KSyncBulkMsgContext(io_context_type,
     558           0 :                                                         work_queue_index);
     559             :         }
     560           0 :         return bulk_msg_context_;
     561             :     }
     562             : 
     563           0 :     if (use_wait_tree_) {
     564           0 :         std::scoped_lock lock(mutex_);
     565           0 :         if (bulk_seq_no_ == kInvalidBulkSeqNo) {
     566           0 :             bulk_seq_no_ = seqno;
     567           0 :             bulk_buf_size_ = 0;
     568           0 :             bulk_msg_count_ = 0;
     569             : 
     570           0 :             wait_tree_.insert(WaitTreePair(seqno,
     571           0 :                                        KSyncBulkMsgContext(io_context_type,
     572             :                                                            work_queue_index)));
     573             :         }
     574             : 
     575           0 :         WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
     576           0 :         assert(it != wait_tree_.end());
     577           0 :         return &it->second;
     578           0 :     } else {
     579           0 :         if (bulk_seq_no_ == kInvalidBulkSeqNo) {
     580           0 :             bulk_seq_no_ = seqno;
     581           0 :             bulk_buf_size_ = 0;
     582           0 :             bulk_msg_count_ = 0;
     583             : 
     584           0 :             bulk_mctx_arr_[bmca_prod_] = new KSyncBulkMsgContext(io_context_type,
     585           0 :                                                             work_queue_index);
     586           0 :             bulk_mctx_arr_[bmca_prod_]->set_seqno(seqno);
     587             :         }
     588             : 
     589           0 :         return bulk_mctx_arr_[bmca_prod_];
     590             :     }
     591             : 
     592             :     return NULL;
     593             : }
     594             : 
     595             : // Try adding an io-context to bulk context. Returns
     596             : //  - true  : if message can be added to bulk context
     597             : //  - false : if message cannot be added to bulk context
     598           0 : bool KSyncSock::TryAddToBulk(KSyncBulkMsgContext *bulk_message_context,
     599             :                              IoContext *ioc) {
     600           0 :     if (bulk_msg_count_ >= max_bulk_msg_count_)
     601           0 :         return false;
     602             : 
     603           0 :     if (bulk_message_context->io_context_type() != ioc->type())
     604           0 :         return false;
     605             : 
     606           0 :     if (use_wait_tree_) {
     607           0 :         if (bulk_message_context->work_queue_index() != ioc->index())
     608           0 :             return false;
     609             :     }
     610             : 
     611           0 :     bulk_buf_size_ += ioc->GetMsgLen();
     612           0 :     bulk_msg_count_++;
     613             : 
     614           0 :     bulk_message_context->Insert(ioc);
     615           0 :     if (ioc->rx_buffer1()) {
     616           0 :         bulk_message_context->AddReceiveBuffer(ioc->rx_buffer1());
     617           0 :         ioc->reset_rx_buffer1();
     618             : 
     619             :     }
     620           0 :     if (ioc->rx_buffer2()) {
     621           0 :         bulk_message_context->AddReceiveBuffer(ioc->rx_buffer2());
     622           0 :         ioc->reset_rx_buffer2();
     623             :     }
     624           0 :     return true;
     625             : }
     626             : 
     627           0 : bool KSyncSock::SendAsyncImpl(IoContext *ioc) {
     628             :     KSyncBulkMsgContext *bulk_message_context =
     629           0 :         LocateBulkContext(ioc->GetSeqno(), ioc->type(), ioc->index());
     630             :     // Try adding message to bulk-message list
     631           0 :     if (TryAddToBulk(bulk_message_context, ioc)) {
     632             :         // Message added to bulk-list. Nothing more to do
     633           0 :         return true;
     634             :     }
     635             : 
     636             :     // Message cannot be added to bulk-list. Send the current list
     637           0 :     SendBulkMessage(bulk_message_context, bulk_seq_no_);
     638             : 
     639             :     // Allocate a new context and add message to it
     640           0 :     bulk_message_context = LocateBulkContext(ioc->GetSeqno(), ioc->type(),
     641             :                                              ioc->index());
     642           0 :     assert(TryAddToBulk(bulk_message_context, ioc));
     643           0 :     return true;
     644             : }
     645             : 
     646             : 
     647             : /////////////////////////////////////////////////////////////////////////////
     648             : // KSyncSockNetlink routines
     649             : /////////////////////////////////////////////////////////////////////////////
     650           0 : KSyncSockNetlink::KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
     651           0 :     : sock_(ios, protocol) {
     652           0 :     ReceiveBuffForceSize set_rcv_buf;
     653           0 :     set_rcv_buf = KSYNC_SOCK_RECV_BUFF_SIZE;
     654           0 :     boost::system::error_code ec;
     655           0 :     sock_.set_option(set_rcv_buf, ec);
     656           0 :     if (ec.value() != 0) {
     657           0 :         LOG(ERROR, "Error Changing netlink receive sock buffer size to " <<
     658             :                 set_rcv_buf.value() << " error = " <<
     659             :                 boost::system::system_error(ec).what());
     660             :     }
     661           0 :     boost::asio::socket_base::receive_buffer_size rcv_buf_size;
     662           0 :     boost::system::error_code ec1;
     663           0 :     sock_.get_option(rcv_buf_size, ec);
     664           0 :     LOG(INFO, "Current receive sock buffer size is " << rcv_buf_size.value());
     665           0 : }
     666             : 
     667           0 : KSyncSockNetlink::~KSyncSockNetlink() {
     668           0 : }
     669             : 
     670           0 : void KSyncSockNetlink::Init(io_service &ios, int protocol, bool use_work_queue,
     671             :                             const std::string &cpu_pin_policy) {
     672           0 :     KSyncSock::SetSockTableEntry(new KSyncSockNetlink(ios, protocol));
     673           0 :     KSyncSock::Init(use_work_queue, cpu_pin_policy);
     674           0 : }
     675             : 
     676           0 : uint32_t KSyncSockNetlink::GetSeqno(char *data) {
     677           0 :     return GetNetlinkSeqno(data);
     678             : }
     679             : 
     680           0 : bool KSyncSockNetlink::IsMoreData(char *data) {
     681           0 :     return NetlinkMsgDone(data);
     682             : }
     683             : 
     684           0 : bool KSyncSockNetlink::Validate(char *data) {
     685           0 :     return ValidateNetlink(data);
     686             : }
     687             : 
     688             : //netlink socket class for interacting with kernel
     689           0 : void KSyncSockNetlink::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     690             :                                    HandlerCb cb) {
     691           0 :     ResetNetlink(nl_client_);
     692           0 :     KSyncBufferList::iterator it = iovec->begin();
     693           0 :     iovec->insert(it, buffer((char *)nl_client_->cl_buf,
     694           0 :                              nl_client_->cl_buf_offset));
     695           0 :     UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
     696             : 
     697           0 :     boost::asio::netlink::raw::endpoint ep;
     698           0 :     sock_.async_send_to(*iovec, ep, cb);
     699           0 : }
     700             : 
     701           0 : size_t KSyncSockNetlink::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
     702           0 :     ResetNetlink(nl_client_);
     703           0 :     KSyncBufferList::iterator it = iovec->begin();
     704           0 :     iovec->insert(it, buffer((char *)nl_client_->cl_buf,
     705           0 :                              nl_client_->cl_buf_offset));
     706           0 :     UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
     707             : 
     708           0 :     boost::asio::netlink::raw::endpoint ep;
     709           0 :     return sock_.send_to(*iovec, ep);
     710             : }
     711             : 
     712             : // Static method to decode non-bulk message
     713           0 : void KSyncSockNetlink::NetlinkDecoder(char *data, SandeshContext *ctxt) {
     714           0 :     assert(ValidateNetlink(data));
     715           0 :     char *buf = NULL;
     716           0 :     uint32_t buf_len = 0;
     717           0 :     GetNetlinkPayload(data, &buf, &buf_len);
     718           0 :     DecodeSandeshMessages(buf, buf_len, ctxt, NLA_ALIGNTO);
     719           0 : }
     720             : 
     721           0 : bool KSyncSockNetlink::Decoder(char *data, AgentSandeshContext *context) {
     722           0 :     NetlinkDecoder(data, context);
     723           0 :     return true;
     724             : }
     725             : 
     726             : // Static method used in ksync_sock_user only
     727           0 : void KSyncSockNetlink::NetlinkBulkDecoder(char *data, SandeshContext *ctxt,
     728             :                                           bool more) {
     729           0 :     assert(ValidateNetlink(data));
     730           0 :     char *buf = NULL;
     731           0 :     uint32_t buf_len = 0;
     732           0 :     GetNetlinkPayload(data, &buf, &buf_len);
     733             :     KSyncBulkSandeshContext *bulk_sandesh_context =
     734           0 :         dynamic_cast<KSyncBulkSandeshContext *>(ctxt);
     735           0 :     bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, more);
     736           0 : }
     737             : 
     738           0 : bool KSyncSockNetlink::BulkDecoder(char *data,
     739             :                                    KSyncBulkSandeshContext *bulk_sandesh_context) {
     740             :     // Get sandesh buffer and buffer-length
     741           0 :     uint32_t buf_len = 0;
     742           0 :     char *buf = NULL;
     743           0 :     GetNetlinkPayload(data, &buf, &buf_len);
     744           0 :     return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
     745             : }
     746             : 
     747           0 : void KSyncSockNetlink::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
     748           0 :     sock_.async_receive(buf, cb);
     749           0 : }
     750             : 
     751           0 : void KSyncSockNetlink::Receive(mutable_buffers_1 buf) {
     752           0 :     sock_.receive(buf);
     753           0 :     struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
     754           0 :     if (nlh->nlmsg_type == NLMSG_ERROR) {
     755           0 :         LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
     756             :                 << " len " << nlh->nlmsg_len);
     757           0 :         assert(0);
     758             :     }
     759           0 : }
     760             : 
     761             : /////////////////////////////////////////////////////////////////////////////
     762             : // KSyncSockUdp routines
     763             : /////////////////////////////////////////////////////////////////////////////
     764             : //Udp socket class for interacting with kernel
     765           0 : KSyncSockUdp::KSyncSockUdp(boost::asio::io_context &ios, int port) :
     766           0 :     sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
     767           0 :     server_ep_(boost::asio::ip::address::from_string("127.0.0.1"), port) {
     768           0 : }
     769             : 
     770           0 : void KSyncSockUdp::Init(io_service &ios, int port,
     771             :                         const std::string &cpu_pin_policy) {
     772           0 :     KSyncSock::SetSockTableEntry(new KSyncSockUdp(ios, port));
     773           0 :     KSyncSock::Init(false, cpu_pin_policy);
     774           0 : }
     775             : 
     776           0 : uint32_t KSyncSockUdp::GetSeqno(char *data) {
     777           0 :     struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
     778           0 :     return hdr->seq_no;
     779             : }
     780             : 
     781           0 : bool KSyncSockUdp::IsMoreData(char *data) {
     782           0 :     struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
     783           0 :     return ((hdr->flags & UVR_MORE) == UVR_MORE);
     784             : }
     785             : 
     786             : // We dont expect any non-bulk operation on UDP
     787           0 : bool KSyncSockUdp::Decoder(char *data, AgentSandeshContext *context) {
     788           0 :     assert(0);
     789             :     return false;
     790             : }
     791             : 
     792           0 : bool KSyncSockUdp::BulkDecoder(char *data,
     793             :                                KSyncBulkSandeshContext *bulk_sandesh_context) {
     794           0 :     struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
     795           0 :     uint32_t buf_len = hdr->msg_len;
     796           0 :     char *buf = data + sizeof(struct uvr_msg_hdr);
     797           0 :     return bulk_sandesh_context->Decoder(buf, buf_len, 1, IsMoreData(data));
     798             : }
     799             : 
     800           0 : void KSyncSockUdp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     801             :                                HandlerCb cb) {
     802             :     struct uvr_msg_hdr hdr;
     803           0 :     hdr.seq_no = seq_no;
     804           0 :     hdr.flags = 0;
     805           0 :     hdr.msg_len = bulk_buf_size_;
     806             : 
     807           0 :     KSyncBufferList::iterator it = iovec->begin();
     808           0 :     iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
     809             : 
     810           0 :     sock_.async_send_to(*iovec, server_ep_, cb);
     811           0 : }
     812             : 
     813           0 : size_t KSyncSockUdp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
     814             :     struct uvr_msg_hdr hdr;
     815           0 :     hdr.seq_no = seq_no;
     816           0 :     hdr.flags = 0;
     817           0 :     hdr.msg_len = bulk_buf_size_;
     818             : 
     819           0 :     KSyncBufferList::iterator it = iovec->begin();
     820           0 :     iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
     821             : 
     822           0 :     return sock_.send_to(*iovec, server_ep_, MSG_DONTWAIT);
     823             : }
     824             : 
     825           0 : bool KSyncSockUdp::Validate(char *data) {
     826           0 :     return true;
     827             : }
     828             : 
     829           0 : void KSyncSockUdp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
     830           0 :     boost::asio::ip::udp::endpoint ep;
     831           0 :     sock_.async_receive_from(buf, ep, cb);
     832           0 : }
     833             : 
     834           0 : void KSyncSockUdp::Receive(mutable_buffers_1 buf) {
     835           0 :     boost::asio::ip::udp::endpoint ep;
     836           0 :     sock_.receive_from(buf, ep);
     837           0 : }
     838             : 
     839           0 : void KSyncSock::ProcessDataInline(char *data) {
     840           0 :     KSyncBulkMsgContext *bulk_message_context = NULL;
     841             :     KSyncBulkSandeshContext *bulk_sandesh_context;
     842           0 :     uint32_t seqno = GetSeqno(data);
     843             : 
     844           0 :     assert(!use_wait_tree_);
     845           0 :     Validate(data);
     846             : 
     847           0 :     bulk_sandesh_context = GetBulkSandeshContext(seqno);
     848           0 :     bulk_message_context = bulk_mctx_arr_[bmca_cons_];
     849           0 :     assert(bulk_message_context->seqno() == seqno);
     850             : 
     851           0 :     bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
     852           0 :     BulkDecoder(data, bulk_sandesh_context);
     853             : 
     854             :     // Remove the IoContext only on last netlink message
     855           0 :     if (IsMoreData(data) == false) {
     856           0 :         delete bulk_message_context;
     857           0 :         bmca_cons_++;
     858           0 :         if (bmca_cons_ >= KSYNC_BMC_ARR_SIZE) {
     859           0 :             bmca_cons_ = 0;
     860             :         }
     861             :     }
     862             : 
     863           0 :     return;
     864             : }
     865             : 
     866             : /////////////////////////////////////////////////////////////////////////////
     867             : // KSyncIoContext routines
     868             : /////////////////////////////////////////////////////////////////////////////
     869           0 : KSyncIoContext::KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry,
     870             :                                int msg_len, char *msg,
     871           0 :                                KSyncEntry::KSyncEvent event) :
     872             :     IoContext(msg, msg_len, 0,
     873           0 :               sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
     874           0 :               IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
     875           0 :     entry_(sync_entry), event_(event), sock_(sock) {
     876           0 :     SetSeqno(sock->AllocSeqNo(type(), index()));
     877           0 : }
     878             : 
     879           0 : void KSyncIoContext::Handler() {
     880           0 :     sock_->EnqueueRxProcessData(entry_, event_);
     881           0 : }
     882             : 
     883           0 : void KSyncIoContext::ErrorHandler(int err) {
     884           0 :     entry_->ErrorHandler(err, GetSeqno(), event_);
     885           0 : }
     886             : 
     887             : /////////////////////////////////////////////////////////////////////////////
     888             : // Routines for KSyncBulkSandeshContext
     889             : /////////////////////////////////////////////////////////////////////////////
     890           0 : KSyncBulkSandeshContext::KSyncBulkSandeshContext() :
     891           0 :     AgentSandeshContext(), bulk_msg_context_(NULL)  { }
     892             : 
     893           0 : KSyncBulkSandeshContext::~KSyncBulkSandeshContext() {
     894           0 : }
     895             : 
     896             : // Sandesh responses for old context are done. Check for any errors
     897           0 : void KSyncBulkSandeshContext::IoContextDone() {
     898           0 :     IoContext *io_context = &(*bulk_msg_context_->io_context_list_it_);
     899           0 :     AgentSandeshContext *sandesh_context = io_context->GetSandeshContext();
     900             : 
     901           0 :     sandesh_context->set_ksync_io_ctx(NULL);
     902           0 :     if (sandesh_context->GetErrno() != 0 &&
     903           0 :         sandesh_context->GetErrno() != EEXIST) {
     904           0 :         io_context->ErrorHandler(sandesh_context->GetErrno());
     905             :     }
     906           0 :     io_context->Handler();
     907           0 : }
     908             : 
     909           0 : void KSyncBulkSandeshContext::IoContextStart() {
     910           0 :     bulk_msg_context_->vr_response_count_++;
     911           0 :     IoContext &io_context = *bulk_msg_context_->io_context_list_it_;
     912           0 :     AgentSandeshContext *sandesh_context = io_context.GetSandeshContext();
     913             :     sandesh_context->set_ksync_io_ctx
     914           0 :         (static_cast<KSyncIoContext *>(&io_context));
     915           0 : }
     916             : 
     917             : // Process the sandesh messages
     918             : // There can be more then one sandesh messages in the netlink buffer.
     919             : // Iterate and process all of them
     920           0 : bool KSyncBulkSandeshContext::Decoder(char *data, uint32_t len,
     921             :                                       uint32_t alignment, bool more) {
     922           0 :     DecodeSandeshMessages(data, len, this, alignment);
     923           0 :     assert(bulk_msg_context_->io_context_list_it_ !=
     924             :            bulk_msg_context_->io_context_list_.end());
     925           0 :     if (more == true)
     926           0 :         return false;
     927             : 
     928           0 :     IoContextDone();
     929             : 
     930             :     // No more netlink messages. Validate that iterator points to last element
     931             :     // in IoContextList
     932           0 :     bulk_msg_context_->io_context_list_it_++;
     933           0 :     assert(bulk_msg_context_->io_context_list_it_ ==
     934             :            bulk_msg_context_->io_context_list_.end());
     935           0 :     return true;
     936             : }
     937             : 
     938           0 : void KSyncBulkSandeshContext::SetErrno(int err) {
     939           0 :     AgentSandeshContext *context = GetSandeshContext();
     940           0 :     context->SetErrno(err);
     941           0 : }
     942             : 
     943           0 : AgentSandeshContext *KSyncBulkSandeshContext::GetSandeshContext() {
     944           0 :     assert(bulk_msg_context_->vr_response_count_);
     945           0 :     return bulk_msg_context_->io_context_list_it_->GetSandeshContext();
     946             : }
     947             : 
     948           0 : void KSyncBulkSandeshContext::IfMsgHandler(vr_interface_req *req) {
     949           0 :     AgentSandeshContext *context = GetSandeshContext();
     950           0 :     context->IfMsgHandler(req);
     951           0 : }
     952             : 
     953           0 : void KSyncBulkSandeshContext::NHMsgHandler(vr_nexthop_req *req) {
     954           0 :     AgentSandeshContext *context = GetSandeshContext();
     955           0 :     context->NHMsgHandler(req);
     956           0 : }
     957             : 
     958           0 : void KSyncBulkSandeshContext::RouteMsgHandler(vr_route_req *req) {
     959           0 :     AgentSandeshContext *context = GetSandeshContext();
     960           0 :     context->RouteMsgHandler(req);
     961           0 : }
     962             : 
     963           0 : void KSyncBulkSandeshContext::MplsMsgHandler(vr_mpls_req *req) {
     964           0 :     AgentSandeshContext *context = GetSandeshContext();
     965           0 :     context->MplsMsgHandler(req);
     966           0 : }
     967             : 
     968           0 : void KSyncBulkSandeshContext::QosConfigMsgHandler(vr_qos_map_req *req) {
     969           0 :     AgentSandeshContext *context = GetSandeshContext();
     970           0 :     context->QosConfigMsgHandler(req);
     971           0 : }
     972             : 
     973           0 : void KSyncBulkSandeshContext::ForwardingClassMsgHandler(vr_fc_map_req *req) {
     974           0 :     AgentSandeshContext *context = GetSandeshContext();
     975           0 :     context->ForwardingClassMsgHandler(req);
     976           0 : }
     977             : 
     978             : // vr_response message is treated as delimiter in a bulk-context. So, move to
     979             : // next io-context within bulk-message context.
     980           0 : int KSyncBulkSandeshContext::VrResponseMsgHandler(vr_response *resp) {
     981           0 :     AgentSandeshContext *sandesh_context = NULL;
     982             :     // If this is first vr_reponse received, move io-context to first entry in
     983             :     // bulk context
     984           0 :     if (bulk_msg_context_->vr_response_count_ == 0) {
     985           0 :         bulk_msg_context_->io_context_list_it_ =
     986           0 :             bulk_msg_context_->io_context_list_.begin();
     987             :         sandesh_context =
     988           0 :             bulk_msg_context_->io_context_list_it_->GetSandeshContext();
     989           0 :         IoContextStart();
     990             :     } else {
     991             :         // Sandesh responses for old io-context are done.
     992             :         // Check for any errors and trigger state-machine for old io-context
     993           0 :         IoContextDone();
     994             :         // Move to the next io-context
     995           0 :         bulk_msg_context_->io_context_list_it_++;
     996           0 :         assert(bulk_msg_context_->io_context_list_it_ !=
     997             :                bulk_msg_context_->io_context_list_.end());
     998             :         sandesh_context =
     999           0 :             bulk_msg_context_->io_context_list_it_->GetSandeshContext();
    1000           0 :         IoContextStart();
    1001             :     }
    1002           0 :     return sandesh_context->VrResponseMsgHandler(resp);
    1003             : }
    1004             : 
    1005           0 : void KSyncBulkSandeshContext::MirrorMsgHandler(vr_mirror_req *req) {
    1006           0 :     AgentSandeshContext *context = GetSandeshContext();
    1007           0 :     context->MirrorMsgHandler(req);
    1008           0 : }
    1009             : 
    1010           0 : void KSyncBulkSandeshContext::FlowMsgHandler(vr_flow_req *req) {
    1011           0 :     AgentSandeshContext *context = GetSandeshContext();
    1012           0 :     context->FlowMsgHandler(req);
    1013           0 : }
    1014             : 
    1015           0 : void KSyncBulkSandeshContext::FlowResponseHandler(vr_flow_response *req) {
    1016           0 :     AgentSandeshContext *context = GetSandeshContext();
    1017           0 :     context->FlowResponseHandler(req);
    1018           0 : }
    1019             : 
    1020           0 : void KSyncBulkSandeshContext::VrfAssignMsgHandler(vr_vrf_assign_req *req) {
    1021           0 :     AgentSandeshContext *context = GetSandeshContext();
    1022           0 :     context->VrfAssignMsgHandler(req);
    1023           0 : }
    1024             : 
    1025           0 : void KSyncBulkSandeshContext::VrfMsgHandler(vr_vrf_req *req) {
    1026           0 :     AgentSandeshContext *context = GetSandeshContext();
    1027           0 :     context->VrfMsgHandler(req);
    1028           0 : }
    1029             : 
    1030           0 : void KSyncBulkSandeshContext::VrfStatsMsgHandler(vr_vrf_stats_req *req) {
    1031           0 :     AgentSandeshContext *context = GetSandeshContext();
    1032           0 :     context->VrfStatsMsgHandler(req);
    1033           0 : }
    1034             : 
    1035           0 : void KSyncBulkSandeshContext::DropStatsMsgHandler(vr_drop_stats_req *req) {
    1036           0 :     AgentSandeshContext *context = GetSandeshContext();
    1037           0 :     context->DropStatsMsgHandler(req);
    1038           0 : }
    1039             : 
    1040           0 : void KSyncBulkSandeshContext::VxLanMsgHandler(vr_vxlan_req *req) {
    1041           0 :     AgentSandeshContext *context = GetSandeshContext();
    1042           0 :     context->VxLanMsgHandler(req);
    1043           0 : }
    1044             : 
    1045           0 : void KSyncBulkSandeshContext::VrouterOpsMsgHandler(vrouter_ops *req) {
    1046           0 :     AgentSandeshContext *context = GetSandeshContext();
    1047           0 :     context->VrouterOpsMsgHandler(req);
    1048           0 : }
    1049             : 
    1050             : /////////////////////////////////////////////////////////////////////////////
    1051             : // KSyncBulkMsgContext routines
    1052             : /////////////////////////////////////////////////////////////////////////////
    1053           0 : KSyncBulkMsgContext::KSyncBulkMsgContext(IoContext::Type type,
    1054           0 :                                          uint32_t index) :
    1055           0 :     io_context_list_(), io_context_type_(type), work_queue_index_(index),
    1056           0 :     rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
    1057           0 : }
    1058             : 
    1059           0 : KSyncBulkMsgContext::KSyncBulkMsgContext(const KSyncBulkMsgContext &rhs) :
    1060           0 :     io_context_list_(), io_context_type_(rhs.io_context_type_),
    1061           0 :     work_queue_index_(rhs.work_queue_index_),
    1062           0 :     rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
    1063           0 :     assert(rhs.vr_response_count_ == 0);
    1064           0 :     assert(rhs.rx_buffer_index_ == 0);
    1065           0 :     assert(rhs.io_context_list_.size() == 0);
    1066           0 : }
    1067             : 
    1068             : struct IoContextDisposer {
    1069           0 :     void operator() (IoContext *io_context) { delete io_context; }
    1070             : };
    1071             : 
    1072           0 : KSyncBulkMsgContext::~KSyncBulkMsgContext() {
    1073           0 :     assert(vr_response_count_ == io_context_list_.size());
    1074           0 :     io_context_list_.clear_and_dispose(IoContextDisposer());
    1075           0 :     for (uint32_t i = 0; i < rx_buffer_index_; i++) {
    1076           0 :         delete[] rx_buffers_[i];
    1077             :     }
    1078           0 : }
    1079             : 
    1080           0 : char *KSyncBulkMsgContext::GetReceiveBuffer() {
    1081           0 :     if (rx_buffer_index_ == 0)
    1082           0 :         return new char[KSyncSock::kBufLen];
    1083             : 
    1084           0 :     return rx_buffers_[--rx_buffer_index_];
    1085             : }
    1086             : 
    1087           0 : void KSyncBulkMsgContext::AddReceiveBuffer(char *buff) {
    1088           0 :     assert(rx_buffer_index_ < kMaxRxBufferCount);
    1089           0 :     rx_buffers_[rx_buffer_index_++] = buff;
    1090           0 : }
    1091             : 
    1092           0 : void KSyncBulkMsgContext::Insert(IoContext *ioc) {
    1093           0 :     io_context_list_.push_back(*ioc);
    1094           0 :     return;
    1095             : }
    1096             : 
    1097           0 : void KSyncBulkMsgContext::Data(KSyncBufferList *iovec) {
    1098           0 :     IoContextList::iterator it = io_context_list_.begin();
    1099           0 :     while (it != io_context_list_.end()) {
    1100           0 :         iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
    1101           0 :         it++;
    1102             :     }
    1103           0 : }

Generated by: LCOV version 1.14