Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include <sandesh/request_pipeline.h> 6 : 7 : #include <boost/bind/bind.hpp> 8 : #include <boost/assign/list_of.hpp> 9 : #include "base/logging.h" 10 : 11 : #include "ifmap/ifmap_client.h" 12 : #include "ifmap/ifmap_sandesh_context.h" 13 : #include "ifmap/ifmap_server.h" 14 : #include "ifmap/ifmap_xmpp.h" 15 : #include "ifmap/ifmap_server_show_types.h" // sandesh 16 : 17 : using namespace boost::assign; 18 : using namespace std; 19 : using namespace boost::placeholders; 20 : 21 : class ShowIFMapXmppClientInfo { 22 : public: 23 : static const int kMaxElementsPerRound = 50; 24 : 25 : struct ShowData : public RequestPipeline::InstData { 26 : vector<IFMapXmppClientInfo> send_buffer; 27 : }; 28 : 29 0 : static RequestPipeline::InstData *AllocBuffer(int stage) { 30 0 : return static_cast<RequestPipeline::InstData *>(new ShowData); 31 : } 32 : 33 : struct TrackerData : public RequestPipeline::InstData { 34 : // init as 1 indicates we need to init 'first' to begin() since there is 35 : // no way to initialize an iterator here. 36 0 : TrackerData() : init(1) { } 37 : int init; 38 : vector<IFMapXmppClientInfo>::const_iterator first; 39 : }; 40 : 41 0 : static RequestPipeline::InstData *AllocTracker(int stage) { 42 0 : return static_cast<RequestPipeline::InstData *>(new TrackerData); 43 : } 44 : 45 : static bool BufferStage(const Sandesh *sr, 46 : const RequestPipeline::PipeSpec ps, int stage, 47 : int instNum, RequestPipeline::InstData *data); 48 : static void CopyNode(IFMapXmppClientInfo *dest, IFMapClient *src); 49 : static bool SendStage(const Sandesh *sr, const RequestPipeline::PipeSpec ps, 50 : int stage, int instNum, 51 : RequestPipeline::InstData *data); 52 : }; 53 : 54 0 : void ShowIFMapXmppClientInfo::CopyNode(IFMapXmppClientInfo *dest, 55 : IFMapClient *src) { 56 0 : dest->set_client_name(src->identifier()); 57 0 : dest->set_client_index(src->index()); 58 0 : dest->set_msgs_sent(src->msgs_sent()); 59 0 : dest->set_msgs_blocked(src->msgs_blocked()); 60 0 : dest->set_update_nodes_sent(src->update_nodes_sent()); 61 0 : dest->set_delete_nodes_sent(src->delete_nodes_sent()); 62 0 : dest->set_update_links_sent(src->update_links_sent()); 63 0 : dest->set_delete_links_sent(src->delete_links_sent()); 64 0 : dest->set_bytes_sent(src->bytes_sent()); 65 0 : dest->set_is_blocked(src->send_is_blocked()); 66 : 67 0 : VmRegInfo vm_reg_info; 68 0 : vm_reg_info.vm_list = src->vm_list(); 69 0 : vm_reg_info.vm_count = vm_reg_info.vm_list.size(); 70 0 : dest->set_vm_reg_info(vm_reg_info); 71 0 : } 72 : 73 0 : bool ShowIFMapXmppClientInfo::BufferStage(const Sandesh *sr, 74 : const RequestPipeline::PipeSpec ps, 75 : int stage, int instNum, 76 : RequestPipeline::InstData *data) { 77 : const IFMapXmppClientInfoShowReq *request = 78 0 : static_cast<const IFMapXmppClientInfoShowReq *>(ps.snhRequest_.get()); 79 : IFMapSandeshContext *sctx = 80 0 : static_cast<IFMapSandeshContext *>(request->module_context("IFMap")); 81 0 : IFMapServer* server = sctx->ifmap_server(); 82 : 83 0 : IFMapServer::ClientMap client_map = server->GetClientMap(); 84 : 85 0 : ShowData *show_data = static_cast<ShowData *>(data); 86 0 : show_data->send_buffer.reserve(client_map.size()); 87 : 88 0 : for (IFMapServer::ClientMap::iterator iter = client_map.begin(); 89 0 : iter != client_map.end(); ++iter) { 90 0 : IFMapXmppClientInfo dest; 91 0 : IFMapClient *src = iter->second; 92 0 : CopyNode(&dest, src); 93 0 : show_data->send_buffer.push_back(dest); 94 0 : } 95 : 96 0 : return true; 97 0 : } 98 : 99 : // Can be called multiple times i.e. approx total/kMaxElementsPerRound 100 0 : bool ShowIFMapXmppClientInfo::SendStage(const Sandesh *sr, 101 : const RequestPipeline::PipeSpec ps, 102 : int stage, int instNum, 103 : RequestPipeline::InstData *data) { 104 0 : const RequestPipeline::StageData *prev_stage_data = ps.GetStageData(0); 105 : const ShowIFMapXmppClientInfo::ShowData &show_data = 106 : static_cast<const ShowIFMapXmppClientInfo::ShowData &> 107 0 : (prev_stage_data->at(0)); 108 : 109 : // Data for this stage 110 0 : TrackerData *tracker_data = static_cast<TrackerData *>(data); 111 : 112 0 : vector<IFMapXmppClientInfo> dest_buffer; 113 0 : vector<IFMapXmppClientInfo>::const_iterator first, last; 114 0 : bool more = false; 115 : 116 0 : if (tracker_data->init) { 117 0 : first = show_data.send_buffer.begin(); 118 0 : tracker_data->init = 0; 119 : } else { 120 0 : first = tracker_data->first; 121 : } 122 0 : int rem_num = show_data.send_buffer.end() - first; 123 0 : int send_num = (rem_num < kMaxElementsPerRound) ? rem_num : 124 : kMaxElementsPerRound; 125 0 : last = first + send_num; 126 0 : copy(first, last, back_inserter(dest_buffer)); 127 : // Decide if we want to be called again. 128 0 : if ((rem_num - send_num) > 0) { 129 0 : more = true; 130 : } else { 131 0 : more = false; 132 : } 133 : const IFMapXmppClientInfoShowReq *request = 134 0 : static_cast<const IFMapXmppClientInfoShowReq *>(ps.snhRequest_.get()); 135 0 : IFMapXmppClientInfoShowResp *response = new IFMapXmppClientInfoShowResp(); 136 0 : response->set_client_stats(dest_buffer); 137 0 : response->set_context(request->context()); 138 0 : response->set_more(more); 139 0 : response->Response(); 140 0 : tracker_data->first = first + send_num; 141 : 142 : // Return 'false' to be called again 143 0 : return (!more); 144 0 : } 145 : 146 0 : void IFMapXmppClientInfoShowReq::HandleRequest() const { 147 : 148 0 : RequestPipeline::StageSpec s0, s1; 149 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance(); 150 : 151 : // 2 stages - first: gather/read, second: send 152 : 153 0 : s0.taskId_ = scheduler->GetTaskId("db::IFMapTable"); 154 0 : s0.allocFn_ = ShowIFMapXmppClientInfo::AllocBuffer; 155 0 : s0.cbFn_ = ShowIFMapXmppClientInfo::BufferStage; 156 0 : s0.instances_.push_back(0); 157 : 158 : // control-node ifmap show command task 159 0 : s1.taskId_ = scheduler->GetTaskId("ifmap::ShowCommandSendStage"); 160 0 : s1.allocFn_ = ShowIFMapXmppClientInfo::AllocTracker; 161 0 : s1.cbFn_ = ShowIFMapXmppClientInfo::SendStage; 162 0 : s1.instances_.push_back(0); 163 : 164 0 : RequestPipeline::PipeSpec ps(this); 165 0 : ps.stages_= list_of(s0)(s1) 166 0 : .convert_to_container<vector<RequestPipeline::StageSpec> >(); 167 0 : RequestPipeline rp(ps); 168 0 : }