LCOV - code coverage report
Current view: top level - ksync - ksync_sock.h (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 56 88 63.6 %
Date: 2026-06-08 02:02:55 Functions: 35 62 56.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #ifndef ctrlplane_ksync_sock_h
       6             : #define ctrlplane_ksync_sock_h
       7             : 
       8             : #include <atomic>
       9             : #include <vector>
      10             : #include <mutex>
      11             : 
      12             : #include <boost/asio.hpp>
      13             : #include <boost/asio/buffer.hpp>
      14             : 
      15             : #include <boost/asio/netlink_protocol.hpp>
      16             : #include <boost/asio/netlink_endpoint.hpp>
      17             : 
      18             : #include <base/queue_task.h>
      19             : #include <sandesh/common/vns_constants.h>
      20             : #include <sandesh/common/vns_types.h>
      21             : #include <io/tcp_session.h>
      22             : #include <vr_types.h>
      23             : #include <nl_util.h>
      24             : #include "ksync_entry.h"
      25             : #include "ksync_tx_queue.h"
      26             : 
      27             : #define KSYNC_DEFAULT_MSG_SIZE    4096
      28             : #define KSYNC_DEFAULT_Q_ID_SEQ    0x00000001
      29             : #define KSYNC_ACK_WAIT_THRESHOLD  200
      30             : #define KSYNC_SOCK_RECV_BUFF_SIZE (256 * 1024)
      31             : #define KSYNC_BMC_ARR_SIZE        1024
      32             : 
      33             : class KSyncEntry;
      34             : class KSyncIoContext;
      35             : class KSyncSockTcpSession;
      36             : struct nl_client;
      37             : class KSyncBulkSandeshContext;
      38             : 
      39             : typedef std::vector<boost::asio::mutable_buffers_1> KSyncBufferList;
      40             : 
      41             : uint32_t GetNetlinkSeqno(char *data);
      42             : bool NetlinkMsgDone(char *data);
      43             : bool ValidateNetlink(char *data);
      44             : void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len);
      45             : void InitNetlink(nl_client *client);
      46             : void ResetNetlink(nl_client *client);
      47             : void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no);
      48             : void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
      49             :                            uint32_t alignment);
      50             : 
      51             : /* Base class to hold sandesh context information which is passed to
      52             :  * Sandesh decode
      53             :  */
      54             : class AgentSandeshContext : public SandeshContext {
      55             : public:
      56        1822 :     AgentSandeshContext() : errno_(0), ksync_io_ctx_(NULL) { }
      57       82858 :     virtual ~AgentSandeshContext() { }
      58             : 
      59             :     virtual void IfMsgHandler(vr_interface_req *req) = 0;
      60             :     virtual void NHMsgHandler(vr_nexthop_req *req) = 0;
      61             :     virtual void RouteMsgHandler(vr_route_req *req) = 0;
      62             :     virtual void MplsMsgHandler(vr_mpls_req *req) = 0;
      63             :     virtual int VrResponseMsgHandler(vr_response *resp) = 0;
      64             :     virtual void MirrorMsgHandler(vr_mirror_req *req) = 0;
      65             :     virtual void FlowMsgHandler(vr_flow_req *req) = 0;
      66           0 :     virtual void FlowResponseHandler(vr_flow_response *req) { assert(0); }
      67           0 :     virtual void FlowTableInfoHandler(vr_flow_table_data *r) { assert(0); }
      68           0 :     virtual void BridgeTableInfoHandler(vr_bridge_table_data *r) { assert(0);}
      69             :     virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req) = 0;
      70             :     virtual void VrfMsgHandler(vr_vrf_req *req) = 0;
      71             :     virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req) = 0;
      72             :     virtual void DropStatsMsgHandler(vr_drop_stats_req *req) = 0;
      73             :     virtual void VxLanMsgHandler(vr_vxlan_req *req) = 0;
      74           0 :     virtual void VrouterHugePageHandler(vr_hugepage_config *req) {}
      75             :     virtual void VrouterOpsMsgHandler(vrouter_ops *req) = 0;
      76             :     virtual void QosConfigMsgHandler(vr_qos_map_req *req) = 0;
      77             :     virtual void ForwardingClassMsgHandler(vr_fc_map_req *req) = 0;
      78       42651 :     virtual void SetErrno(int err) {errno_ = err;}
      79             : 
      80        1082 :     int GetErrno() const {return errno_;}
      81        1874 :     void set_ksync_io_ctx(const KSyncIoContext *ioc) {ksync_io_ctx_ = ioc;}
      82         144 :     const KSyncIoContext *ksync_io_ctx() const {return ksync_io_ctx_;}
      83             : private:
      84             :     int errno_;
      85             :     const KSyncIoContext *ksync_io_ctx_;
      86             : };
      87             : 
      88             : 
      89             : /* Base class for context management. Used while sending and
      90             :  * receiving data via ksync socket
      91             :  */
      92             : class  IoContext {
      93             : public:
      94             :     // Type of IoContext. The work-queue used for processing response is based
      95             :     // on this
      96             :     // IOC_UVE : Used for UVEs
      97             :     // IOC_KSYNC : Used for KSync objects
      98             :     enum Type {
      99             :         IOC_UVE,
     100             :         IOC_KSYNC,
     101             :         MAX_WORK_QUEUES // This should always be last
     102             :     };
     103             :     // Work-queue neames used for the ksync receive work-queues
     104             :     static const char *io_wq_names[MAX_WORK_QUEUES];
     105             : 
     106             :     IoContext() :
     107             :         sandesh_context_(NULL), msg_(NULL), msg_len_(0), seqno_(0),
     108             :         type_(IOC_KSYNC), index_(0), rx_buffer1_(NULL), rx_buffer2_(NULL) {
     109             :     }
     110           0 :     IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx,
     111           0 :               Type type) :
     112           0 :         sandesh_context_(ctx), msg_(msg), msg_len_(len), seqno_(seq),
     113           0 :         type_(type), index_(0), rx_buffer1_(NULL), rx_buffer2_(NULL) {
     114           0 :     }
     115         937 :     IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx,
     116         937 :               Type type, uint32_t index) :
     117         937 :         sandesh_context_(ctx), msg_(msg), msg_len_(len), seqno_(seq),
     118         937 :         type_(type), index_(index), rx_buffer1_(NULL), rx_buffer2_(NULL) {
     119         937 :     }
     120         937 :     virtual ~IoContext() {
     121         937 :         if (msg_ != NULL)
     122         937 :             free(msg_);
     123         937 :         assert(rx_buffer1_ == NULL);
     124         937 :         assert(rx_buffer2_ == NULL);
     125         937 :     }
     126             : 
     127             :     bool operator<(const IoContext &rhs) const {
     128             :         return seqno_ < rhs.seqno_;
     129             :     }
     130             : 
     131           0 :     virtual void Handler() {}
     132           0 :     virtual void ErrorHandler(int err) {}
     133             : 
     134        3892 :     AgentSandeshContext *GetSandeshContext() { return sandesh_context_; }
     135        2811 :     Type type() { return type_; }
     136             : 
     137         937 :     void SetSeqno(uint32_t seqno) {seqno_ = seqno;}
     138         937 :     uint32_t GetSeqno() const {return seqno_;}
     139         937 :     char *GetMsg() const { return msg_; }
     140        1874 :     uint32_t GetMsgLen() const { return msg_len_; }
     141        1081 :     char *rx_buffer1() const { return rx_buffer1_; }
     142         144 :     void reset_rx_buffer1() { rx_buffer1_ = NULL; }
     143        1081 :     char *rx_buffer2() { return rx_buffer2_; }
     144         144 :     void reset_rx_buffer2() { rx_buffer2_ = NULL; }
     145        2811 :     uint32_t index() const { return index_; }
     146             : 
     147             :     boost::intrusive::list_member_hook<> node_;
     148             : 
     149             : protected:
     150             :     AgentSandeshContext *sandesh_context_;
     151             : 
     152             : private:
     153             :     char *msg_;
     154             :     uint32_t msg_len_;
     155             :     uint32_t seqno_;
     156             :     Type type_;
     157             :     uint32_t index_;
     158             :     // Buffers allocated to read the ksync responses for this IoContext.
     159             :     // As an optimization, KSync Tx Queue will use these buffers to minimize
     160             :     // computation in KSync Tx Queue context.
     161             :     char *rx_buffer1_;
     162             :     char *rx_buffer2_;
     163             : 
     164             :     friend class KSyncSock;
     165             : };
     166             : 
     167             : /* IoContext tied to KSyncEntry */
     168             : class  KSyncIoContext : public IoContext {
     169             : public:
     170             :     KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len,
     171             :                    char *msg, KSyncEntry::KSyncEvent event);
     172        1874 :     virtual ~KSyncIoContext() {
     173        1874 :     }
     174             : 
     175             :     virtual void Handler();
     176             : 
     177             :     void ErrorHandler(int err);
     178         144 :     KSyncEntry *GetKSyncEntry() const {return entry_;}
     179         144 :     KSyncEntry::KSyncEvent event() const {return event_;}
     180             : private:
     181             :     KSyncEntry *entry_;
     182             :     KSyncEntry::KSyncEvent event_;
     183             :     AgentSandeshContext *agent_sandesh_ctx_;
     184             :     KSyncSock *sock_;
     185             : };
     186             : 
     187             : /*
     188             :  * KSync implementation of bunching messages.
     189             :  *
     190             :  * Message bunching has two parts,
     191             :  * Encoding messages
     192             :  *   KSyncTxQueue is responsible to bunch KSync requests into single message
     193             :  *
     194             :  *   KSyncTxQueue uses KSyncBulkMsgContext to bunch KSync events.
     195             :  *   The IoContext for bunched KSync events are stored as list inside
     196             :  *   KSyncBulkMessageContext
     197             :  *
     198             :  *   KSync Events are bunched if pass following constraints,
     199             :  *   - Number of KSync events is less than max_bulk_msg_count_
     200             :  *   - Total size of buffer for events is less than max_bulk_buf_size_
     201             :  *   - Type of IoContext is same.
     202             :  *     When type of IoContext is same, it also ensures that KSync Responses
     203             :  *     are processed in same WorkQueue.
     204             :  *   - Each IoContext type has multiple work-queues. Flows are sprayed across
     205             :  *     the work-queues based on flow-table index. Entries are bunched only if
     206             :  *     they belong to same work-queue.
     207             :  *     This also ensures that flows from a flow-table partition are processed
     208             :  *     in single KSync Response Work-Queue
     209             :  *   - The input queue for KSyncTxQueue is not empty.
     210             :  *
     211             :  * Decoding messages
     212             :  *   KSync response are processed in context of KSyncSock::receive_work_queue_.
     213             :  *   Each IoContext type has its own set of receive work-queue.
     214             :  *
     215             :  *   Bulk decoder works on following assumption,
     216             :  *   - Expects KSync Responses for each IoContext in KSyncBulkMsgContext
     217             :  *   - Response for each IoContext can itself be more than one Sandesh Response.
     218             :  *   - The sequence of response is same as sequence of IoContext entries
     219             :  *   - Response for each IoContext starts with VrResponseMsg followed
     220             :  *     optionally by other response. Hence, decoder moves to next IoContext
     221             :  *     after getting VrResponse message
     222             :  *
     223             :  *     class KSyncBulkSandeshContext is used to decode the Sandesh Responses
     224             :  *     and move the IoContext on getting VrResponse
     225             :  */
     226             : typedef boost::intrusive::member_hook<IoContext,
     227             :         boost::intrusive::list_member_hook<>,
     228             :         &IoContext::node_> KSyncSockNode;
     229             : typedef boost::intrusive::list<IoContext, KSyncSockNode> IoContextList;
     230             : 
     231             : class KSyncBulkMsgContext {
     232             : public:
     233             :     const static unsigned kMaxRxBufferCount = 64;
     234             :     KSyncBulkMsgContext(IoContext::Type type, uint32_t index);
     235             :     KSyncBulkMsgContext(const KSyncBulkMsgContext &rhs);
     236             :     ~KSyncBulkMsgContext();
     237             : 
     238             :     void Insert(IoContext *ioc);
     239             :     void Data(KSyncBufferList *iovec);
     240        1906 :     IoContext::Type io_context_type() const {
     241        1906 :         return io_context_type_;
     242             :     }
     243             :     void AddReceiveBuffer(char *buff);
     244             :     char *GetReceiveBuffer();
     245        1906 :     uint32_t work_queue_index() const { return work_queue_index_; }
     246           0 :     void set_seqno(uint32_t seq) { seqno_ = seq; }
     247           0 :     uint32_t seqno() { return seqno_; }
     248             : private:
     249             :     friend class KSyncBulkSandeshContext;
     250             :     // List of IoContext to be processed in this context
     251             :     IoContextList io_context_list_;
     252             :     // Type of message
     253             :     IoContext::Type io_context_type_;
     254             :     // Index of work-queue
     255             :     uint32_t work_queue_index_;
     256             :     // List of rx-buffers
     257             :     // The buffers are taken from IoContext added to the list above
     258             :     // If IoContext does not have buffer, then it dyamically allocates
     259             :     // buffer
     260             :     char *rx_buffers_[kMaxRxBufferCount];
     261             :     // Index of next buffer to process
     262             :     uint32_t rx_buffer_index_;
     263             : 
     264             :     ///////////////////////////////////////////////////////////////////////
     265             :     // Following fields are used for decode processing
     266             :     ///////////////////////////////////////////////////////////////////////
     267             :     // Number of responses already processed
     268             :     uint32_t vr_response_count_;
     269             :     // Iterator to IoContext being processed
     270             :     IoContextList::iterator io_context_list_it_;
     271             :     uint32_t seqno_;
     272             : };
     273             : 
     274             : class KSyncBulkSandeshContext : public AgentSandeshContext {
     275             : public:
     276             :     KSyncBulkSandeshContext();
     277             :     virtual ~KSyncBulkSandeshContext();
     278             : 
     279             :     void IfMsgHandler(vr_interface_req *req);
     280             :     void NHMsgHandler(vr_nexthop_req *req);
     281             :     void RouteMsgHandler(vr_route_req *req);
     282             :     void MplsMsgHandler(vr_mpls_req *req);
     283             :     int VrResponseMsgHandler(vr_response *resp);
     284             :     void MirrorMsgHandler(vr_mirror_req *req);
     285             :     void FlowMsgHandler(vr_flow_req *req);
     286             :     void FlowResponseHandler(vr_flow_response *req);
     287             :     void VrfAssignMsgHandler(vr_vrf_assign_req *req);
     288             :     void VrfMsgHandler(vr_vrf_req *req);
     289             :     void VrfStatsMsgHandler(vr_vrf_stats_req *req);
     290             :     void DropStatsMsgHandler(vr_drop_stats_req *req);
     291             :     void VxLanMsgHandler(vr_vxlan_req *req);
     292             :     void VrouterOpsMsgHandler(vrouter_ops *req);
     293             :     void QosConfigMsgHandler(vr_qos_map_req *req);
     294             :     void ForwardingClassMsgHandler(vr_fc_map_req *req);
     295             :     void SetErrno(int err);
     296             : 
     297             :     bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more);
     298             :     AgentSandeshContext *GetSandeshContext();
     299         969 :     void set_bulk_message_context(KSyncBulkMsgContext *bulk_context) {
     300         969 :         bulk_msg_context_ = bulk_context;
     301         969 :     }
     302             : 
     303             :     void IoContextStart();
     304             :     void IoContextDone();
     305             : 
     306             : private:
     307             :     KSyncBulkMsgContext *bulk_msg_context_;
     308             :     DISALLOW_COPY_AND_ASSIGN(KSyncBulkSandeshContext);
     309             : };
     310             : 
     311             : class KSyncSock {
     312             : public:
     313             :     // Number of flow receive queues
     314             :     const static int kRxWorkQueueCount = 2;
     315             :     const static int kMsgGrowSize = 16;
     316             :     const static unsigned kBufLen = (4*1024);
     317             : 
     318             :     // Number of messages that can be bunched together
     319             :     const static unsigned kMaxBulkMsgCount = 16;
     320             :     // Sequence number to denote invalid builk-context
     321             :     const static unsigned kInvalidBulkSeqNo = 0xFFFFFFFF;
     322             : 
     323             :     typedef std::map<uint32_t, KSyncBulkMsgContext> WaitTree;
     324             :     typedef std::pair<uint32_t, KSyncBulkMsgContext> WaitTreePair;
     325             :     typedef boost::function<void(const boost::system::error_code &, size_t)>
     326             :         HandlerCb;
     327             : 
     328             :     // Request structure in the KSync Response Queue
     329             :     struct KSyncRxData {
     330             :         // buffer having KSync response
     331             :         char *buff_;
     332             :         // bulk context for decoding response
     333             :         KSyncBulkMsgContext *bulk_msg_context_;
     334             : 
     335         329 :         KSyncRxData() : buff_(NULL), bulk_msg_context_(NULL) { }
     336        3876 :         KSyncRxData(const KSyncRxData &rhs) :
     337        3876 :             buff_(rhs.buff_), bulk_msg_context_(rhs.bulk_msg_context_) {
     338        3876 :         }
     339         969 :         KSyncRxData(char *buff, KSyncBulkMsgContext *ctxt) :
     340         969 :             buff_(buff), bulk_msg_context_(ctxt) {
     341         969 :         }
     342             :     };
     343             :     typedef WorkQueue<KSyncRxData> KSyncReceiveQueue;
     344             :     // structure for ksyncrprocess Rx queue
     345             :     struct KSyncRxQueueData {
     346             :          KSyncEntry             *entry_;
     347             :          KSyncEntry::KSyncEvent event_;
     348         329 :          KSyncRxQueueData():entry_(NULL),event_(KSyncEntry::INVALID) {}
     349         937 :          KSyncRxQueueData(KSyncEntry *entry, KSyncEntry::KSyncEvent event) :
     350         937 :                  entry_(entry), event_(event) {
     351         937 :          }
     352             :      };
     353             :     typedef WorkQueue<KSyncRxQueueData> KSyncRxWorkQueue;
     354             : 
     355             :     KSyncSock();
     356             :     virtual ~KSyncSock();
     357             : 
     358             :     // Virtual methods
     359             :     virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt) = 0;
     360             :     virtual bool Decoder(char *data, AgentSandeshContext *ctxt) = 0;
     361             : 
     362             :     // Write a KSyncEntry to kernel
     363             :     void SendAsync(KSyncEntry *entry, int msg_len, char *msg,
     364             :                    KSyncEntry::KSyncEvent event);
     365             :     std::size_t BlockingSend(char *msg, int msg_len);
     366             :     bool BlockingRecv();
     367             :     void GenericSend(IoContext *ctx);
     368             :     uint32_t AllocSeqNo(IoContext::Type type);
     369             :     uint32_t AllocSeqNo(IoContext::Type type, uint32_t instance);
     370             :     KSyncReceiveQueue *GetReceiveQueue(IoContext::Type type, uint32_t instance);
     371             :     KSyncReceiveQueue *GetReceiveQueue(uint32_t seqno);
     372             : 
     373             :     // Bulk Messaging methods
     374             :     KSyncBulkMsgContext *LocateBulkContext(uint32_t seqno,
     375             :                              IoContext::Type io_context_type,
     376             :                              uint32_t work_queue_index);
     377             :     int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno);
     378             :     bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc);
     379             :     void OnEmptyQueue(bool done);
     380           2 :     int tx_count() const { return tx_count_; }
     381             : 
     382             :     // Start Ksync Asio operations
     383             :     static void Start(bool read_inline);
     384             :     static void Shutdown();
     385             : 
     386             :     // Partition to KSyncSock mapping
     387             :     static KSyncSock *Get(DBTablePartBase *partition);
     388             :     static KSyncSock *Get(int partition_id);
     389             : 
     390           0 :     static uint32_t GetPid() {return pid_;};
     391        1877 :     static int GetNetlinkFamilyId() {return vnsw_netlink_family_id_;}
     392             :     static void SetNetlinkFamilyId(int id);
     393             : 
     394         940 :     static AgentSandeshContext *GetAgentSandeshContext(uint32_t type) {
     395         940 :         return agent_sandesh_ctx_[type % kRxWorkQueueCount];
     396             :     }
     397           4 :     static void SetAgentSandeshContext(AgentSandeshContext *ctx, uint32_t idx) {
     398           4 :         agent_sandesh_ctx_[idx] = ctx;
     399           4 :     }
     400             : 
     401           0 :     const KSyncTxQueue *send_queue() const { return &send_queue_; }
     402           0 :     const KSyncReceiveQueue *get_receive_work_queue(uint16_t index) const {
     403           0 :         return ksync_rx_queue[index];
     404             :     }
     405             :     // Allocate a recieve work-queue
     406             : 
     407             :     KSyncReceiveQueue *AllocQueue(KSyncBulkSandeshContext ctxt[],
     408             :                                   uint32_t task_id, uint32_t instance,
     409             :                                   const char *name);
     410             : 
     411             :     uint32_t WaitTreeSize() const;
     412             :     void SetSeqno(uint32_t seq);
     413             :     void SetMeasureQueueDelay(bool val);
     414           0 :     void reset_use_wait_tree() { use_wait_tree_ = false; }
     415           0 :     void set_process_data_inline() { process_data_inline_ = true; }
     416             :     // API to enqueue ksync events to rx process work queue
     417             :     void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event);
     418             : protected:
     419             :     static void Init(bool use_work_queue, const std::string &cpu_pin_policy);
     420             :     static void SetSockTableEntry(KSyncSock *sock);
     421             :     bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context);
     422             :     KSyncBulkSandeshContext *GetBulkSandeshContext(uint32_t seqno);
     423             :     void ProcessDataInline(char *data);
     424             : 
     425             :     std::mutex mutex_;
     426             :     nl_client *nl_client_;
     427             :     // Tree of all IoContext pending ksync response
     428             :     WaitTree wait_tree_;
     429             :     KSyncTxQueue send_queue_;
     430             :     KSyncReceiveQueue *uve_rx_queue[kRxWorkQueueCount];
     431             :     KSyncReceiveQueue *ksync_rx_queue[kRxWorkQueueCount];
     432             : 
     433             :     // Information maintained for bulk processing
     434             : 
     435             :     // Max messages in one bulk context
     436             :     uint32_t max_bulk_msg_count_;
     437             :     // Max buffer size in one bulk context
     438             :     uint32_t max_bulk_buf_size_;
     439             : 
     440             :     // Sequence number of first message in bulk context. Entry in WaitTree is
     441             :     // added based on this sequence number
     442             :     uint32_t bulk_seq_no_;
     443             :     // Current buffer size in bulk context
     444             :     uint32_t bulk_buf_size_;
     445             :     // Current message count in bulk context
     446             :     uint32_t bulk_msg_count_;
     447             : 
     448             :     uint32_t bmca_prod_;
     449             :     uint32_t bmca_cons_;
     450             :     KSyncBulkMsgContext *bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE];
     451             : 
     452             : private:
     453             :     friend class KSyncTxQueue;
     454             :     virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb) = 0;
     455             :     virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     456             :                              HandlerCb cb) = 0;
     457             :     virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no) = 0;
     458             :     virtual void Receive(boost::asio::mutable_buffers_1) = 0;
     459             :     virtual uint32_t GetSeqno(char *data) = 0;
     460             :     virtual bool IsMoreData(char *data) = 0;
     461             :     virtual bool Validate(char *data) = 0;
     462             : 
     463             :     // Read handler registered with boost::asio. Demux done based on seqno_
     464             :     void ReadHandler(const boost::system::error_code& error,
     465             :                      size_t bytes_transferred);
     466             : 
     467             :     // Write handler registered with boost::asio. Demux done based on seqno_
     468             :     void WriteHandler(const boost::system::error_code& error,
     469             :                       size_t bytes_transferred);
     470             : 
     471             :     bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context,
     472             :                            const KSyncRxData &data);
     473             :     bool ProcessRxData(KSyncRxQueueData data);
     474             :     bool SendAsyncImpl(IoContext *ioc);
     475             :     bool SendAsyncStart() {
     476             :         std::scoped_lock lock(mutex_);
     477             :         return (wait_tree_.size() <= KSYNC_ACK_WAIT_THRESHOLD);
     478             :     }
     479             : 
     480             : private:
     481             :     char *rx_buff_;
     482             :     std::atomic<uint32_t> seqno_;
     483             :     std::atomic<uint32_t> uve_seqno_;
     484             :     // Read ksync responses inline
     485             :     // The IoContext WaitTree is not used when response is read-inline
     486             :     bool read_inline_;
     487             :     KSyncBulkMsgContext *bulk_msg_context_;
     488             :     bool use_wait_tree_;
     489             :     bool process_data_inline_;
     490             :     KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount];
     491             :     KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount];
     492             : 
     493             :     // Debug stats
     494             :     int tx_count_;
     495             :     int ack_count_;
     496             :     int err_count_;
     497             :     
     498             :     // IO context can defer ksync event processing 
     499             :     // by defering them to this work queue, this queue gets 
     500             :     // processed in Agent::KSync context
     501             :     KSyncRxWorkQueue  rx_process_queue_;
     502             :     static std::unique_ptr<KSyncSock> sock_;
     503             :     static pid_t pid_;
     504             :     static int vnsw_netlink_family_id_;
     505             :     // AgentSandeshContext used for KSync response handling
     506             :     // AgentSandeshContext used for decode is picked based on work-queue index
     507             :     // Picking AgentSandeshContext based on work-queue index also makes it
     508             :     // thread safe
     509             :     static AgentSandeshContext *agent_sandesh_ctx_[kRxWorkQueueCount];
     510             :     static std::atomic<bool> shutdown_;
     511             : 
     512             :     DISALLOW_COPY_AND_ASSIGN(KSyncSock);
     513             : };
     514             : 
     515             : //netlink socket class for interacting with kernel
     516             : class KSyncSockNetlink : public KSyncSock {
     517             : public:
     518             :     KSyncSockNetlink(boost::asio::io_context &ios, int protocol);
     519             :     virtual ~KSyncSockNetlink();
     520             : 
     521             :     virtual uint32_t GetSeqno(char *data);
     522             :     virtual bool IsMoreData(char *data);
     523             :     virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
     524             :     virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
     525             :     virtual bool Validate(char *data);
     526             :     virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
     527             :     virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     528             :                              HandlerCb cb);
     529             :     virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
     530             :     virtual void Receive(boost::asio::mutable_buffers_1);
     531             : 
     532             :     static void NetlinkDecoder(char *data, SandeshContext *ctxt);
     533             :     static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more);
     534             :     static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue,
     535             :                      const std::string &cpu_pin_policy);
     536             : private:
     537             :     boost::asio::netlink::raw::socket sock_;
     538             : };
     539             : 
     540             : //udp socket class for interacting with user vrouter
     541             : class KSyncSockUdp : public KSyncSock {
     542             : public:
     543             :     KSyncSockUdp(boost::asio::io_context &ios, int port);
     544           0 :     virtual ~KSyncSockUdp() { }
     545             : 
     546             :     virtual uint32_t GetSeqno(char *data);
     547             :     virtual bool IsMoreData(char *data);
     548             :     virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
     549             :     virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
     550             :     virtual bool Validate(char *data);
     551             :     virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
     552             :     virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     553             :                              HandlerCb cb);
     554             :     virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
     555             :     virtual void Receive(boost::asio::mutable_buffers_1);
     556             : 
     557             :     static void Init(boost::asio::io_context &ios, int port,
     558             :                      const std::string &cpu_pin_policy);
     559             : private:
     560             :     boost::asio::ip::udp::socket sock_;
     561             :     boost::asio::ip::udp::endpoint server_ep_;
     562             : };
     563             : 
     564             : //Unix domain socket class for interacting with user vrouter
     565             : #define KSYNC_AGENT_VROUTER_SOCK_PATH "/var/run/vrouter/dpdk_netlink"
     566             : class KSyncSockUds : public KSyncSock {
     567             : public:
     568             :     KSyncSockUds(boost::asio::io_context &ios);
     569           0 :     virtual ~KSyncSockUds() {
     570           0 :         delete rx_buff_;
     571           0 :         delete rx_buff_q_;
     572           0 :     }
     573             : 
     574             :     virtual uint32_t GetSeqno(char *data);
     575             :     virtual bool IsMoreData(char *data);
     576             :     virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
     577             :     virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
     578             :     virtual bool Validate(char *data);
     579             :     virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
     580             :     virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     581             :                              HandlerCb cb);
     582             :     virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
     583             :     virtual void Receive(boost::asio::mutable_buffers_1);
     584             :     virtual bool Run(void);
     585             : 
     586             :     static void Init(boost::asio::io_context &ios,
     587             :                      const std::string &cpu_pin_policy,
     588             :                      const std::string &sockpathvr="");
     589             : private:
     590             :     boost::asio::local::stream_protocol::socket sock_;
     591             :     boost::asio::local::stream_protocol::endpoint server_ep_;
     592             :     char *rx_buff_;
     593             :     char *rx_buff_q_;
     594             :     size_t remain_;
     595             :     int socket_;
     596             :     int connected_;
     597             :     static string sockpath_;
     598             : };
     599             : 
     600             : class KSyncSockTcpSessionReader : public TcpMessageReader {
     601             : public:
     602             :      KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback);
     603           0 :      virtual ~KSyncSockTcpSessionReader() { }
     604             : 
     605             : protected:
     606             :     virtual int MsgLength(Buffer buffer, int offset);
     607             : 
     608           0 :     virtual const int GetHeaderLenSize() {
     609           0 :         return sizeof(struct nlmsghdr);
     610             :     }
     611             : 
     612           0 :     virtual const int GetMaxMessageSize() {
     613           0 :         return kMaxMessageSize;
     614             :     }
     615             : 
     616             : private:
     617             :     static const int kMaxMessageSize = 4096;
     618             : };
     619             : 
     620             : class KSyncSockTcpSession : public TcpSession {
     621             : public:
     622             :     KSyncSockTcpSession(TcpServer *server, Socket *sock,
     623             :                         bool async_ready = false);
     624             :     virtual ~KSyncSockTcpSession();
     625             : protected:
     626             :     virtual void OnRead(Buffer buffer);
     627             : private:
     628             :     KSyncSockTcpSessionReader *reader_;
     629             : };
     630             : 
     631             : class KSyncSockTcp : public KSyncSock, public TcpServer {
     632             : public:
     633             :     KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr,
     634             :                  int port);
     635           0 :     virtual ~KSyncSockTcp() { }
     636             : 
     637             :     virtual uint32_t GetSeqno(char *data);
     638             :     virtual bool IsMoreData(char *data);
     639             :     virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
     640             :     virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
     641             :     virtual bool Validate(char *data);
     642             :     virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
     643             :     virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     644             :                              HandlerCb cb);
     645             :     virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
     646             :     virtual void Receive(boost::asio::mutable_buffers_1);
     647             :     virtual TcpSession *AllocSession(Socket *socket);
     648             :     virtual bool Run(void);
     649             : 
     650             :     bool ReceiveMsg(const u_int8_t *msg, size_t size);
     651             :     void OnSessionEvent(TcpSession *session, TcpSession::Event event);
     652           0 :     bool connect_complete() const {
     653           0 :         return connect_complete_;
     654             :     }
     655             :     void AsyncReadStart();
     656             : 
     657             :     static void Init(EventManager *evm,
     658             :                      boost::asio::ip::address ip_addr, int port,
     659             :                      const std::string &cpu_pin_policy);
     660             : private:
     661             :     EventManager *evm_;
     662             :     TcpSession *session_;
     663             :     boost::asio::ip::tcp::endpoint server_ep_;
     664             :     boost::asio::ip::tcp::socket *tcp_socket_;
     665             :     bool connect_complete_;
     666             :     char *rx_buff_;
     667             :     char *rx_buff_rem_;
     668             :     size_t remain_;
     669             : };
     670             : #endif // ctrlplane_ksync_sock_h

Generated by: LCOV version 1.14