Line data Source code
1 : /* 2 : * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : // ksync_tx_queue.h 6 : // 7 : // Implementation of a shared transmit queue between agent and KSync. 8 : // 9 : // KSync i/o operations are done on KSync Netlink socket. Even if the socket is 10 : // set to non-blocking mode, the KSync socket i/o call will block till VRouter 11 : // completes message processing. So, netlink i/o is not done in agent context. 12 : // All messages are enqueued to a transmit queue and i/o is done through 13 : // the transmit queue. This ensures agent doesn't block due to ksync processing 14 : // 15 : // There are two implementations of transmit queue 16 : // 17 : // WorkQueue based 18 : // --------------- 19 : // WorkQueue based implementation is used only for UT cases. WorkQueue based 20 : // implementation is not used in production environment due to performance 21 : // reasons. When we have slow producer and fast consumer, WorkQueue will 22 : // result in spawning of a "task" for every message. The task spawning happens 23 : // in producer context. In case of flow processing, agent is slow producer and 24 : // vrouter is fast consumer. As a result, using WorkQueue will result in many 25 : // task spawns thereby introducing latencies. 26 : // 27 : // We retain WorkQueue based implementations since UT code depends on 28 : // WaitForIdle() APIs to continue with validations. Using Event-FD based 29 : // implementations cannot rely on WaitForIdle() 30 : // 31 : // 32 : // Event-FD based 33 : // -------------- 34 : // This implementation is based on event_fd. The producer will add the entry 35 : // into a tbb::concurrent_queue and notify the consumer on an event_fd. The 36 : // consumer will block with "read" on event_fd. On getting an event, it will 37 : // drain the queue of all messages in a tight loop. Consumer blocks on "read" 38 : // when there is no data in the queue. 
This is an efficient implementation of 39 : // queue between agent and ksync 40 : // 41 : #ifndef controller_src_ksync_ksync_tx_queue_h 42 : #define controller_src_ksync_ksync_tx_queue_h 43 : 44 : #include <sys/eventfd.h> 45 : #include <pthread.h> 46 : #include <algorithm> 47 : #include <atomic> 48 : #include <vector> 49 : #include <set> 50 : 51 : #include <tbb/concurrent_queue.h> 52 : class KSyncSock; 53 : class IoContext; 54 : 55 : class KSyncTxQueue { 56 : public: 57 : typedef tbb::concurrent_queue<IoContext *> Queue; 58 : 59 : KSyncTxQueue(KSyncSock *sock); 60 : ~KSyncTxQueue(); 61 : 62 : void Init(bool use_work_queue, const std::string &cpu_pin_policy); 63 : void Shutdown(); 64 : bool Run(); 65 : 66 0 : size_t enqueues() const { return enqueues_; } 67 0 : size_t dequeues() const { return dequeues_; } 68 : uint32_t write_events() const { return write_events_; } 69 0 : uint32_t read_events() const { return read_events_; } 70 0 : size_t queue_len() const { return queue_len_; } 71 0 : uint64_t busy_time() const { return busy_time_; } 72 0 : uint32_t max_queue_len() const { return max_queue_len_; } 73 0 : void set_measure_busy_time(bool val) const { measure_busy_time_ = val; } 74 0 : void ClearStats() const { 75 0 : max_queue_len_ = 0; 76 0 : enqueues_ = 0; 77 0 : dequeues_ = 0; 78 0 : busy_time_ = 0; 79 0 : read_events_ = 0; 80 0 : } 81 : 82 0 : bool Enqueue(IoContext *io_context) { 83 0 : return EnqueueInternal(io_context); 84 : } 85 : 86 : private: 87 : bool EnqueueInternal(IoContext *io_context); 88 : 89 : WorkQueue<IoContext *> *work_queue_; 90 : int event_fd_; 91 : // CPU pinning policy for netlink task 92 : std::string cpu_pin_policy_; 93 : KSyncSock *sock_; 94 : Queue queue_; 95 : std::atomic<bool> shutdown_; 96 : pthread_t event_thread_; 97 : std::atomic<size_t> queue_len_; 98 : mutable size_t max_queue_len_; 99 : 100 : mutable size_t enqueues_; 101 : mutable size_t dequeues_; 102 : mutable size_t write_events_; 103 : mutable size_t read_events_; 104 : 
mutable uint64_t busy_time_; 105 : mutable bool measure_busy_time_; 106 : 107 : DISALLOW_COPY_AND_ASSIGN(KSyncTxQueue); 108 : }; 109 : 110 : #endif // controller_src_ksync_ksync_tx_queue_h