Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include <stdint.h>
6 : #include <string.h>
7 : #include <pugixml/pugixml.hpp>
8 : #include <xml/xml_base.h>
9 : #include <xml/xml_pugi.h>
10 :
11 : #include <base/util.h>
12 : #include <base/logging.h>
13 : #include <xmpp/xmpp_channel.h>
14 : #include <ifmap/ifmap_agent_parser.h>
15 :
16 : #include <cmn/agent_cmn.h>
17 : #include <cmn/agent_stats.h>
18 :
19 : #include <controller/controller_init.h>
20 : #include <controller/controller_ifmap.h>
21 : #include <controller/controller_peer.h>
22 : #include <controller/controller_types.h>
23 :
24 :
25 : uint64_t AgentIfMapXmppChannel::seq_number_;
26 :
27 1 : AgentIfMapXmppChannel::AgentIfMapXmppChannel(Agent *agent, XmppChannel *channel,
28 1 : uint8_t cnt) : channel_(channel),
29 1 : xs_idx_(cnt),
30 1 : agent_(agent) {
31 1 : channel_->RegisterReceive(xmps::CONFIG,
32 : boost::bind(&AgentIfMapXmppChannel::ReceiveInternal,
33 : this, _1));
34 1 : config_cleanup_timer_.reset(new ConfigCleanupTimer(agent));
35 1 : end_of_config_timer_.reset(new EndOfConfigTimer(agent, this));
36 1 : }
37 :
38 2 : AgentIfMapXmppChannel::~AgentIfMapXmppChannel() {
39 1 : channel_->UnRegisterWriteReady(xmps::CONFIG);
40 1 : channel_->UnRegisterReceive(xmps::CONFIG);
41 1 : config_cleanup_timer_.reset();
42 1 : end_of_config_timer_.reset();
43 2 : }
44 :
45 2 : uint64_t AgentIfMapXmppChannel::NewSeqNumber() {
46 :
47 2 : seq_number_++;
48 :
49 2 : if (seq_number_ == 0) {
50 0 : return ++seq_number_;
51 : }
52 :
53 2 : CONTROLLER_TRACE(IFMapSeqTrace, GetSeqNumber(), seq_number_, "New Config Seq Num");
54 2 : return seq_number_;
55 : }
56 :
57 3 : ConfigCleanupTimer *AgentIfMapXmppChannel::config_cleanup_timer() {
58 3 : return config_cleanup_timer_.get();
59 : }
60 :
61 3 : EndOfConfigTimer *AgentIfMapXmppChannel::end_of_config_timer() {
62 3 : return end_of_config_timer_.get();
63 : }
64 :
65 0 : bool AgentIfMapXmppChannel::SendUpdate(const std::string &msg) {
66 0 : if (!channel_) return false;
67 0 : return channel_->Send((const uint8_t *)msg.data(), msg.size(), xmps::CONFIG,
68 0 : boost::bind(&AgentIfMapXmppChannel::WriteReadyCb, this, _1));
69 : }
70 :
71 0 : void AgentIfMapXmppChannel::ReceiveUpdate(const XmppStanza::XmppMessage *msg) {
72 0 : if (msg && msg->type == XmppStanza::IQ_STANZA) {
73 0 : std::unique_ptr<XmlBase> impl(XmppXmlImplFactory::Instance()->GetXmlImpl());
74 0 : XmlPugi *pugi = reinterpret_cast<XmlPugi *>(impl.get());
75 0 : XmlPugi *msg_pugi = reinterpret_cast<XmlPugi *>(msg->dom.get());
76 0 : pugi->LoadXmlDoc(msg_pugi->doc());
77 : boost::shared_ptr<ControllerXmppData> data(new ControllerXmppData(xmps::CONFIG,
78 : xmps::UNKNOWN,
79 0 : xs_idx_,
80 0 : std::move(impl),
81 0 : true));
82 0 : agent_->controller()->Enqueue(data);
83 0 : end_of_config_timer()->last_config_receive_time_ = UTCTimestampUsec();
84 0 : }
85 0 : }
86 :
87 0 : void AgentIfMapXmppChannel::ReceiveConfigMessage(std::unique_ptr<XmlBase> impl) {
88 :
89 0 : if (GetXmppServerIdx() != agent_->ifmap_active_xmpp_server_index()) {
90 0 : LOG(WARN, "IFMap config on non primary channel");
91 0 : return;
92 : }
93 :
94 0 : XmlPugi *pugi = reinterpret_cast<XmlPugi *>(impl.get());
95 0 : pugi::xml_node node = pugi->FindNode("config");
96 0 : IFMapAgentParser *parser = agent_->ifmap_parser();
97 0 : assert(parser);
98 0 : parser->ConfigParse(node, seq_number_);
99 : }
100 :
101 0 : void AgentIfMapXmppChannel::ReceiveInternal(const XmppStanza::XmppMessage *msg) {
102 0 : if (agent_->stats())
103 0 : agent_->stats()->incr_xmpp_config_in_msgs(GetXmppServerIdx());
104 0 : ReceiveUpdate(msg);
105 0 : }
106 :
107 0 : std::string AgentIfMapXmppChannel::ToString() const {
108 0 : return channel_->ToString();
109 : }
110 :
111 0 : void AgentIfMapXmppChannel::WriteReadyCb(const boost::system::error_code &ec) {
112 0 : }
113 :
114 0 : void AgentIfMapXmppChannel::StartEndOfConfigTimer() {
115 : //First start for end of config identification.
116 0 : end_of_config_timer()->Start(agent_->controller_xmpp_channel(xs_idx_));
117 0 : }
118 :
119 0 : void AgentIfMapXmppChannel::StopEndOfConfigTimer() {
120 : //First start for end of config identification.
121 0 : end_of_config_timer()->Cancel();
122 0 : }
123 :
124 0 : void AgentIfMapXmppChannel::StartConfigCleanupTimer() {
125 0 : config_cleanup_timer()->Start(agent_->controller_xmpp_channel(xs_idx_));
126 0 : }
127 :
128 0 : void AgentIfMapXmppChannel::StopConfigCleanupTimer() {
129 0 : config_cleanup_timer()->Cancel();
130 0 : }
131 :
132 1 : void AgentIfMapXmppChannel::EnqueueEndOfConfig() {
133 1 : EndOfConfigDataPtr data(new EndOfConfigData(this));
134 : VNController::ControllerWorkQueueDataType base_data =
135 1 : boost::static_pointer_cast<ControllerWorkQueueData>(data);
136 1 : agent_->controller()->Enqueue(base_data);
137 1 : }
138 :
139 0 : void AgentIfMapXmppChannel::ProcessEndOfConfig() {
140 0 : config_cleanup_timer()->Start(agent_->controller_xmpp_channel(xs_idx_));
141 0 : agent_->controller()->StartEndOfRibTxTimer();
142 0 : end_of_config_timer()->end_of_config_processed_time_ = UTCTimestampUsec();
143 0 : }
144 :
145 1 : EndOfConfigData::EndOfConfigData(AgentIfMapXmppChannel *ch) :
146 1 : ControllerWorkQueueData(), channel_(ch) {
147 1 : }
148 :
149 : // Get active xmpp-peer
150 22 : static AgentXmppChannel *GetActivePeer(Agent *agent) {
151 22 : int active_index = agent->ifmap_active_xmpp_server_index();
152 22 : if (active_index == -1) {
153 0 : return NULL;
154 : }
155 :
156 22 : AgentXmppChannel *peer = agent->controller_xmpp_channel(active_index);
157 22 : if (peer == NULL)
158 0 : return NULL;
159 :
160 22 : if (AgentXmppChannel::IsBgpPeerActive(agent, peer) == false) {
161 0 : return NULL;
162 : }
163 :
164 22 : return peer;
165 : }
166 :
167 :
168 22 : static AgentIfMapXmppChannel *GetActiveChannel
169 : (Agent *agent, struct AgentIfMapVmExport::VmExportInfo *info) {
170 22 : int active_index = agent->ifmap_active_xmpp_server_index();
171 22 : AgentIfMapXmppChannel *ifmap = agent->ifmap_xmpp_channel(active_index);
172 : // Peer is valid, but channel may be down
173 22 : if (ifmap == NULL)
174 22 : return NULL;
175 :
176 0 : if (info && (info->seq_number_ != ifmap->GetSeqNumber())) {
177 0 : return NULL;
178 : }
179 :
180 0 : return ifmap;
181 : }
182 :
183 12 : void AgentIfMapVmExport::VmiDelete(const ControllerVmiSubscribeData *entry) {
184 12 : VmMap::iterator vm_it = vm_map_.find(entry->vm_uuid_);
185 12 : if (vm_map_.end() == vm_it) {
186 0 : return;
187 : }
188 :
189 12 : struct VmExportInfo *info = vm_it->second;
190 : //If delete already processed, neglect the delete
191 12 : if (!info)
192 0 : return;
193 :
194 : // Find VMI-UUID in the list of VMI
195 12 : UuidList::const_iterator vmi_it = std::find(info->vmi_list_.begin(),
196 : info->vmi_list_.end(),
197 24 : entry->vmi_uuid_);
198 12 : if (vmi_it == info->vmi_list_.end())
199 0 : return;
200 :
201 12 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_),
202 : UuidToString(entry->vmi_uuid_), "Delete");
203 12 : info->vmi_list_.remove(entry->vmi_uuid_);
204 : // Stop here if VM has more interfaces
205 12 : if (info->vmi_list_.size() != 0) {
206 2 : return;
207 : }
208 :
209 : // All interfaces deleted. Remove the VM entry
210 10 : vm_map_.erase(entry->vm_uuid_);
211 :
212 : // Unsubscribe from config if we have active channel
213 10 : AgentXmppChannel *peer = GetActivePeer(agent_);
214 10 : if (peer == NULL) {
215 0 : delete info;
216 0 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_), "",
217 : "Peer NULL skipped Unsubscribe ");
218 0 : return;
219 : }
220 :
221 10 : AgentIfMapXmppChannel *ifmap = GetActiveChannel(agent_, info);
222 10 : delete info;
223 10 : if (ifmap == NULL) {
224 10 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_), "",
225 : "Channel NULL skipped Unsubscribe ");
226 10 : return;
227 : }
228 :
229 0 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_), "",
230 : "Unsubscribe ");
231 0 : AgentXmppChannel::ControllerSendVmCfgSubscribe(peer, entry->vm_uuid_,
232 : false);
233 0 : return;
234 : }
235 :
236 12 : void AgentIfMapVmExport::VmiAdd(const ControllerVmiSubscribeData *entry) {
237 12 : struct VmExportInfo *info = NULL;
238 12 : VmMap::iterator vm_it = vm_map_.find(entry->vm_uuid_);
239 12 : if (vm_map_.end() != vm_it) {
240 2 : info = vm_it->second;
241 : } else {
242 : //If first VMI create the data
243 10 : info = new VmExportInfo(0);
244 10 : vm_map_[entry->vm_uuid_] = info;
245 : }
246 :
247 : //If VMI is not found, insert in the list
248 12 : UuidList::const_iterator vmi_it = std::find(info->vmi_list_.begin(),
249 : info->vmi_list_.end(),
250 24 : entry->vmi_uuid_);
251 12 : if (vmi_it == info->vmi_list_.end()) {
252 12 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_),
253 : UuidToString(entry->vmi_uuid_), "Add");
254 12 : info->vmi_list_.push_back(entry->vmi_uuid_);
255 : }
256 :
257 : // Ensure that peer exists and is in active state
258 12 : AgentXmppChannel *peer = GetActivePeer(agent_);
259 12 : if (peer == NULL) {
260 12 : return;
261 : }
262 :
263 : // Ensure that channel is valid
264 12 : AgentIfMapXmppChannel *ifmap = GetActiveChannel(agent_, NULL);
265 12 : if (ifmap == NULL) {
266 12 : return;
267 : }
268 :
269 : //We have already sent the subscribe
270 0 : if (info->seq_number_ == ifmap->GetSeqNumber()) {
271 0 : return;
272 : }
273 :
274 0 : CONTROLLER_TRACE(IFMapVmExportTrace, UuidToString(entry->vm_uuid_),
275 : UuidToString(entry->vmi_uuid_), "Subscribe");
276 0 : AgentXmppChannel::ControllerSendVmCfgSubscribe(peer, entry->vm_uuid_, true);
277 :
278 : //Update the sequence number
279 0 : info->seq_number_ = ifmap->GetSeqNumber();
280 : }
281 :
282 24 : void AgentIfMapVmExport::VmiEvent(const ControllerVmiSubscribeData *entry) {
283 24 : if (entry->del_) {
284 12 : VmiDelete(entry);
285 12 : return;
286 : }
287 :
288 12 : VmiAdd(entry);
289 12 : return;
290 : }
291 :
292 1 : void AgentIfMapVmExport::NotifyAll(AgentXmppChannel *peer) {
293 1 : VmMap::iterator vm_it;
294 1 : AgentIfMapXmppChannel *ifmap = NULL;
295 1 : struct VmExportInfo *info = NULL;
296 1 : Agent *agent = peer->agent();
297 :
298 1 : if (!AgentXmppChannel::IsBgpPeerActive(agent, peer)) {
299 0 : return;
300 : }
301 :
302 1 : ifmap = agent->ifmap_xmpp_channel(agent->ifmap_active_xmpp_server_index());
303 1 : if (!ifmap) {
304 1 : return;
305 : }
306 :
307 : //We have all the required data. Send config subscribe for all VM's
308 : AgentIfMapVmExport *agent_ifmap_vm_export = peer->agent()->controller()->
309 0 : agent_ifmap_vm_export();
310 0 : vm_it = agent_ifmap_vm_export->vm_map_.begin();
311 0 : for(; vm_it != agent_ifmap_vm_export->vm_map_.end(); vm_it++) {
312 0 : info = vm_it->second;
313 0 : if (info->seq_number_ == ifmap->GetSeqNumber()) {
314 0 : continue;
315 : }
316 :
317 0 : std::stringstream vmid;
318 0 : vmid << vm_it->first;
319 0 : CONTROLLER_TRACE(IFMapVmExportTrace, vmid.str(), "",
320 : "Subscribe");
321 0 : AgentXmppChannel::ControllerSendVmCfgSubscribe(peer,
322 0 : vm_it->first, true);
323 :
324 : //Update the sequence number so that we dont send duplicate
325 : //subscribe
326 0 : info->seq_number_ = ifmap->GetSeqNumber();
327 0 : }
328 :
329 0 : return;
330 : }
331 :
332 1 : AgentIfMapVmExport::AgentIfMapVmExport(Agent *agent) :
333 1 : agent_(agent) {
334 1 : }
335 :
336 1 : AgentIfMapVmExport::~AgentIfMapVmExport() {
337 1 : VmMap::iterator vm_it;
338 1 : struct VmExportInfo *info = NULL;
339 :
340 : //Destroy the vm_map and vmi list
341 1 : for(vm_it = vm_map_.begin(); vm_it != vm_map_.end(); vm_it++) {
342 0 : info = vm_it->second;
343 0 : delete info;
344 : }
345 :
346 1 : vm_map_.clear();
347 1 : }
|