Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #ifndef SRC_BGP_BGP_XMPP_CHANNEL_H_
6 : #define SRC_BGP_BGP_XMPP_CHANNEL_H_
7 :
8 : #include <map>
9 : #include <set>
10 : #include <string>
11 : #include <utility>
12 : #include <mutex>
13 : #include <atomic>
14 :
15 : #include <boost/function.hpp>
16 : #include <boost/system/error_code.hpp>
17 : #include <boost/scoped_ptr.hpp>
18 :
19 : #include "base/queue_task.h"
20 : #include "bgp/bgp_rib_policy.h"
21 : #include "bgp/routing-instance/routing_instance.h"
22 : #include "io/tcp_session.h"
23 : #include "net/rd.h"
24 : #include "schema/xmpp_mvpn_types.h"
25 : #include "xmpp/xmpp_channel.h"
26 :
27 : namespace pugi {
28 : class xml_node;
29 : }
30 :
31 : class BgpGlobalSystemConfig;
32 : class BgpRouterState;
33 : class BgpServer;
34 : class BgpXmppRTargetManager;
35 : struct DBRequest;
36 : class IPeer;
37 : class PeerCloseManager;
38 : class XmppServer;
39 : class BgpXmppChannelMock;
40 : class BgpXmppChannelManager;
41 : class BgpXmppChannelManagerMock;
42 : class BgpXmppPeerClose;
43 : class Timer;
44 : class XmppConfigUpdater;
45 : class XmppPeerInfoData;
46 : class XmppSession;
47 :
// BGP-side state attached to one XmppChannel: counters, membership and
// subscription bookkeeping, and end-of-rib timers are all declared in this
// class. Most member functions are only declared here.
48 : class BgpXmppChannel {
49 : public:
// Presumably the retry interval of the end-of-rib send timer; the unit is
// given by the trailing comment. TODO confirm against the timer code.
50 : static const int kEndOfRibSendRetryTime = 1; // Seconds
// Index into the two-element stats_ array (stats_[RX], stats_[TX]).
51 : enum StatsIndex {
52 : RX,
53 : TX,
54 : };
// Route-update counters for one direction. BgpXmppChannel keeps two of
// these in stats_, indexed by StatsIndex (RX, TX).
//
// Every counter is a std::atomic and is direct-initialised to zero in the
// member-initializer list, so no counter is ever observable in a
// default-constructed (indeterminate) state.
struct Stats {
    Stats() : rt_updates(0), reach(0), unreach(0), end_of_rib(0) { }

    std::atomic<uint64_t> rt_updates;
    std::atomic<uint64_t> reach;
    std::atomic<uint64_t> unreach;
    std::atomic<uint64_t> end_of_rib;
};
// Per-channel counters named after instance and table (un)subscribe
// events, including the "complete" variants for tables.
//
// Every counter is a std::atomic and is direct-initialised to zero in the
// member-initializer list rather than assigned in the constructor body.
struct ChannelStats {
    ChannelStats()
        : instance_subscribe(0),
          instance_unsubscribe(0),
          table_subscribe(0),
          table_subscribe_complete(0),
          table_unsubscribe(0),
          table_unsubscribe_complete(0) {
    }

    std::atomic<uint64_t> instance_subscribe;
    std::atomic<uint64_t> instance_unsubscribe;
    std::atomic<uint64_t> table_subscribe;
    std::atomic<uint64_t> table_subscribe_complete;
    std::atomic<uint64_t> table_unsubscribe;
    std::atomic<uint64_t> table_unsubscribe_complete;
};
83 :
// Counters for malformed inet6 input received on the channel (bad XML
// token, prefix, nexthop, afi/safi), as named by the members.
//
// The incr_*/get_* members are declared here only; their definitions are
// not part of this header. All counters are std::atomic and are
// direct-initialised to zero in the member-initializer list.
struct ErrorStats {
    ErrorStats()
        : inet6_rx_bad_xml_token_count(0),
          inet6_rx_bad_prefix_count(0),
          inet6_rx_bad_nexthop_count(0),
          inet6_rx_bad_afi_safi_count(0) {
    }

    void incr_inet6_rx_bad_xml_token_count();
    void incr_inet6_rx_bad_prefix_count();
    void incr_inet6_rx_bad_nexthop_count();
    void incr_inet6_rx_bad_afi_safi_count();
    uint64_t get_inet6_rx_bad_xml_token_count() const;
    uint64_t get_inet6_rx_bad_prefix_count() const;
    uint64_t get_inet6_rx_bad_nexthop_count() const;
    uint64_t get_inet6_rx_bad_afi_safi_count() const;

    std::atomic<uint64_t> inet6_rx_bad_xml_token_count;
    std::atomic<uint64_t> inet6_rx_bad_prefix_count;
    std::atomic<uint64_t> inet6_rx_bad_nexthop_count;
    std::atomic<uint64_t> inet6_rx_bad_afi_safi_count;
};
105 :
// Construction/destruction. bgp_server and manager are optional and
// default to NULL.
106 : explicit BgpXmppChannel(XmppChannel *channel, BgpServer *bgp_server = NULL,
107 : BgpXmppChannelManager *manager = NULL);
108 : virtual ~BgpXmppChannel();
109 :
// The members below that end in ';' are declarations only; their
// behaviour is defined outside this header.
110 : void Close();
111 : IPeer *Peer();
112 : const IPeer *Peer() const;
113 : virtual TcpSession::Endpoint endpoint() const;
114 :
115 : const std::string &ToString() const;
116 : const std::string &ToUVEKey() const;
117 : std::string StateName() const;
118 : TcpSession::Endpoint remote_endpoint() const;
119 : TcpSession::Endpoint local_endpoint() const;
120 : std::string transport_address_string() const;
121 :
122 : void set_peer_closed(bool flag);
123 : bool peer_deleted() const;
124 : uint64_t peer_closed_at() const;
125 : bool table_membership_request_map_empty() const;
126 : size_t GetMembershipRequestQueueSize() const;
127 :
128 : const XmppSession *GetSession() const;
// Direct references to the per-direction counters (stats_[RX], stats_[TX])
// and to the error counters.
129 : const Stats &rx_stats() const { return stats_[RX]; }
130 : const Stats &tx_stats() const { return stats_[TX]; }
131 81 : ErrorStats &error_stats() { return error_stats_; }
132 : const ErrorStats &error_stats() const { return error_stats_; }
// Accessors for the deleted_ flag.
// NOTE(review): deleted() only reads deleted_ but is not declared const.
133 9995 : void set_deleted(bool deleted) { deleted_ = deleted; }
134 6180 : bool deleted() { return deleted_; }
135 : void RoutingInstanceCallback(std::string vrf_name, int op);
136 : void ASNUpdateCallback(as_t old_asn, as_t old_local_asn);
137 : void FillInstanceMembershipInfo(BgpNeighborResp *resp) const;
138 : void FillTableMembershipInfo(BgpNeighborResp *resp) const;
139 : void FillCloseInfo(BgpNeighborResp *resp) const;
140 : void StaleCurrentSubscriptions();
141 : void LlgrStaleCurrentSubscriptions();
142 : void SweepCurrentSubscriptions();
143 : void XMPPPeerInfoSend(const XmppPeerInfoData &peer_info) const;
// Const and non-const access to channel_; xmpp_channel() further down
// returns the same pointer.
144 974 : const XmppChannel *channel() const { return channel_; }
145 25710 : XmppChannel *channel() { return channel_; }
146 : void StartEndOfRibReceiveTimer();
147 : void ResetEndOfRibSendState();
148 : bool EndOfRibSendTimerExpired();
149 : bool MembershipResponseHandler(std::string table_name);
// Plain getters for eor_send_timer_ and eor_sent_.
150 18 : Timer *eor_send_timer() const { return eor_send_timer_; }
151 9 : bool eor_sent() const { return eor_sent_; }
152 : size_t table_membership_requests() const;
153 : void ClearEndOfRibState();
// Raw pointer obtained from the close_manager_ scoped_ptr via get().
154 72768 : PeerCloseManager *close_manager() { return close_manager_.get(); }
155 :
// Individual RX counters, read from the atomics in stats_[RX].
156 7 : uint64_t get_rx_route_reach() const { return stats_[RX].reach; }
157 7 : uint64_t get_rx_route_unreach() const { return stats_[RX].unreach; }
158 : uint64_t get_rx_update() const { return stats_[RX].rt_updates; }
159 :
// Individual TX counters, read from the atomics in stats_[TX].
160 7 : uint64_t get_tx_route_reach() const { return stats_[TX].reach; }
161 7 : uint64_t get_tx_route_unreach() const { return stats_[TX].unreach; }
162 7 : uint64_t get_tx_update() const { return stats_[TX].rt_updates; }
163 : bool SkipUpdateSend();
// Accessors for the delete_in_progress_ flag.
164 41994 : bool delete_in_progress() const { return delete_in_progress_; }
165 6180 : void set_delete_in_progress(bool flag) { delete_in_progress_ = flag; }
166 :
// Raw pointer obtained from the rtarget_manager_ scoped_ptr via get().
167 7239 : BgpXmppRTargetManager *rtarget_manager() {
168 7239 : return rtarget_manager_.get();
169 : }
170 : bool IsSubscriptionGrStale(RoutingInstance *instance) const;
171 : bool IsSubscriptionLlgrStale(RoutingInstance *instance) const;
172 : bool IsSubscriptionEmpty() const;
173 : const RoutingInstance::RouteTargetList &GetSubscribedRTargets(
174 : RoutingInstance *instance) const;
// Removes every entry from routing_instances_.
175 6180 : void ClearSubscriptions() { routing_instances_.clear(); }
// Plain getters for bgp_server_, manager_ and channel_.
176 273287 : BgpServer *bgp_server() { return bgp_server_; }
177 27395 : const BgpXmppChannelManager *manager() const { return manager_; }
178 37990 : BgpXmppChannelManager *manager() { return manager_; }
179 : XmppChannel *xmpp_channel() const { return channel_; }
180 : void ReceiveEndOfRIB(Address::Family family);
181 : void ProcessPendingSubscriptions();
182 : int GetPrimaryInstanceID(const std::string &s,
183 : bool expect_prefix_len) const;
184 :
185 : protected:
// Value stored in InstanceMembershipRequestMap: the instance id and the
// no_ribout flag that came with a subscription request for a routing
// instance.
//
// Both constructor arguments are optional; a default-constructed object has
// instance_id == -1 and no_ribout == false.
struct InstanceMembershipRequestState {
    InstanceMembershipRequestState(int instance_id = -1,
                                   bool no_ribout = false)
        : instance_id(instance_id), no_ribout(no_ribout) {
    }

    int instance_id;
    bool no_ribout;
};
195 :
196 :
// The XMPP channel this object wraps; returned by channel() and
// xmpp_channel(). Declared in the protected section.
197 : XmppChannel *channel_;
198 :
199 : private:
// Classes granted access to private members; the names suggest mocks and
// unit tests plus the channel manager.
200 : friend class BgpXmppChannelMock;
201 : friend class BgpXmppChannelManager;
202 : friend class BgpXmppParseTest;
203 : friend class BgpXmppUnitTest;
// Nested classes, forward-declared only; defined outside this header.
204 : class XmppPeer;
205 : class PeerClose;
206 : class PeerStats;
207 :
208 : //
209 : // State the instance id received in Membership subscription request
210 : // Also remember we received unregister request
211 : //
// Kind of table membership request; used for the current_req and
// pending_req fields of TableMembershipRequestState.
212 : enum RequestType {
213 : NONE,
214 : SUBSCRIBE,
215 : UNSUBSCRIBE,
216 : };
217 : struct TableMembershipRequestState {
218 72862 : TableMembershipRequestState(RequestType current, int id,
219 : bool no_ribout = false)
220 72862 : : current_req(current),
221 72862 : instance_id(id),
222 72862 : pending_req(current),
223 72862 : no_ribout(no_ribout) {
224 72862 : }
225 : RequestType current_req;
226 : int instance_id;
227 : RequestType pending_req;
228 : bool no_ribout;
229 : };
230 :
231 : // Map of routing instances to which this BgpXmppChannel is subscribed.
232 : struct SubscriptionState {
233 : enum State {
234 : NONE = 0,
235 : GR_STALE = 1 << 0,
236 : LLGR_STALE = 1 << 1
237 : };
238 :
239 : SubscriptionState() : index(-1), state(NONE) { }
240 14235 : SubscriptionState(const RoutingInstance::RouteTargetList &targets,
241 : int index)
242 14235 : : targets(targets), index(index), state(NONE) { }
243 :
244 10051 : bool IsGrStale() const { return((state & GR_STALE) != 0); }
245 740 : void SetGrStale() { state |= GR_STALE; }
246 :
247 121 : void SetLlgrStale() { state |= LLGR_STALE; }
248 9242 : bool IsLlgrStale() const { return((state & LLGR_STALE) != 0); }
249 :
250 252 : void ClearStale() { state &= ~(GR_STALE | LLGR_STALE); }
251 :
252 : RoutingInstance::RouteTargetList targets;
253 : int index;
254 : uint32_t state;
255 : };
256 :
// Routing instance pointer -> SubscriptionState; type of the
// routing_instances_ member.
257 : typedef std::map<RoutingInstance *, SubscriptionState>
258 : SubscribedRoutingInstanceList;
259 :
260 : // Map of table name to subscription request state.
261 : typedef std::map<std::string,
262 : TableMembershipRequestState> TableMembershipRequestMap;
263 :
264 : // Map of routing-instance name to subscription request state. This map
265 : // maintains list of requests that are received for subscription before
266 : // routing instance is created or recreated (in case the instance is in
267 : // the process of being deleted).
268 : typedef std::map<std::string,
269 : InstanceMembershipRequestState> InstanceMembershipRequestMap;
270 :
271 : // The code assumes that multimap preserves insertion order for duplicate
272 : // values of same key.
// DeferQ is keyed by a pair of strings (VrfTableName) and stores raw
// DBRequest pointers; it is the type of the defer_q_ member.
273 : typedef std::pair<const std::string, const std::string> VrfTableName;
274 : typedef std::multimap<VrfTableName, DBRequest *> DeferQ;
275 :
// Private member functions. Everything in this run is a declaration only;
// the definitions are not part of this header. The virtual ones can be
// overridden by subclasses (BgpXmppChannelMock is declared a friend).
276 : virtual void ReceiveUpdate(const XmppStanza::XmppMessage *msg);
277 :
// Membership lookups. Results are returned through the pointer
// parameters; the bool return presumably signals success — TODO confirm
// in the implementation.
278 : virtual bool GetMembershipInfo(BgpTable *table,
279 : int *instance_id, uint64_t *subscribed_at, RequestType *req_type);
280 : bool VerifyMembership(const std::string &vrf_name, Address::Family family,
281 : BgpTable **table, int *instance_id, uint64_t *subscribed_at,
282 : bool *subscribe_pending, bool add_change);
283 :
// Add/delete/lookup helpers keyed by table name (TableMembershipRequestState).
284 : void AddTableMembershipState(const std::string &table_name,
285 : TableMembershipRequestState tmr_state);
286 : bool DeleteTableMembershipState(const std::string &table_name);
287 : TableMembershipRequestState *GetTableMembershipState(
288 : const std::string &table_name);
289 : const TableMembershipRequestState *GetTableMembershipState(
290 : const std::string &table_name) const;
291 :
// Add/delete/lookup helpers keyed by instance name
// (InstanceMembershipRequestState).
292 : void AddInstanceMembershipState(const std::string &instance,
293 : InstanceMembershipRequestState imr_state);
294 : bool DeleteInstanceMembershipState(const std::string &instance);
295 : virtual const InstanceMembershipRequestState *GetInstanceMembershipState(
296 : const std::string &instance) const;
297 :
// Per-item processing entry points taking a pugi::xml_node. add_change
// presumably distinguishes add/change from delete — TODO confirm.
298 : bool ProcessItem(std::string vrf_name, const pugi::xml_node &node,
299 : bool add_change, int primary_instance_id = 0);
300 : bool ProcessInet6Item(std::string vrf_name, const pugi::xml_node &node,
301 : bool add_change);
302 : bool ProcessMcastItem(std::string vrf_name,
303 : const pugi::xml_node &item, bool add_change);
304 : bool ProcessMvpnItem(std::string vrf_name,
305 : const pugi::xml_node &item, bool add_change);
// Both helpers take the DBRequest by non-const reference (output
// parameter `req`).
306 : void CreateType7MvpnRouteRequest(IpAddress grp_address,
307 : IpAddress src_address, bool add_change, uint64_t subscription_gen_id,
308 : DBRequest &req);
309 : void CreateType5MvpnRouteRequest(IpAddress grp_address,
310 : IpAddress src_address, bool add_change, uint64_t subscription_gen_id,
311 : int instance_id, DBRequest &req, const autogen::MvpnNextHopType &nh);
312 : bool ProcessEnetItem(std::string vrf_name,
313 : const pugi::xml_node &item, bool add_change);
314 : void ProcessSubscriptionRequest(std::string rt_instance,
315 : const XmppStanza::XmppMessageIq *iq,
316 : bool add_change);
// Add/delete/lookup helpers keyed by RoutingInstance pointer
// (SubscriptionState).
317 : void AddSubscriptionState(RoutingInstance *rt_instance, int index);
318 : void DeleteSubscriptionState(RoutingInstance *rt_instance);
319 : SubscriptionState *GetSubscriptionState(RoutingInstance *rt_instance);
320 : const SubscriptionState *GetSubscriptionState(
321 : RoutingInstance *rt_instance) const;
322 :
// The `line` argument is presumably the caller's source line, for
// tracing — TODO confirm.
323 : void RegisterTable(int line, BgpTable *table,
324 : const TableMembershipRequestState *tmr_state);
325 : void UnregisterTable(int line, BgpTable *table);
326 : void MembershipRequestCallback(BgpTable *table);
327 : void DequeueRequest(const std::string &table_name, DBRequest *request);
328 : bool XmppDecodeAddress(int af, const std::string &address,
329 : IpAddress *addrp, bool zero_ok = false);
330 : bool ResumeClose();
// Two overloads: by VRF name only, or by VRF name and table name.
331 : void FlushDeferQ(std::string vrf_name);
332 : void FlushDeferQ(std::string vrf_name, std::string table_name);
333 : void ProcessDeferredSubscribeRequest(RoutingInstance *rt_instance,
334 : const InstanceMembershipRequestState &imr_state);
335 : void ClearStaledSubscription(RoutingInstance *rt_instance,
336 : SubscriptionState *sub_state);
337 : bool ProcessMembershipResponse(std::string table_name,
338 : TableMembershipRequestState *tmr_state);
// End-of-rib helpers; GetEndOfRibSendTime is virtual and so can be
// overridden by subclasses.
339 : bool EndOfRibReceiveTimerExpired();
340 : void EndOfRibTimerErrorHandler(std::string error_name,
341 : std::string error_message);
342 : void SendEndOfRIB();
343 : virtual time_t GetEndOfRibSendTime() const;
344 :
// Data members. Helper objects are held through boost::scoped_ptr;
// bgp_server_ and manager_ are raw pointers supplied to the constructor.
345 : xmps::PeerId peer_id_;
346 : boost::scoped_ptr<BgpXmppRTargetManager> rtarget_manager_;
347 : BgpServer *bgp_server_;
348 : boost::scoped_ptr<XmppPeer> peer_;
349 : boost::scoped_ptr<BgpXmppPeerClose> peer_close_;
350 : boost::scoped_ptr<PeerCloseManager> close_manager_;
351 : boost::scoped_ptr<PeerStats> peer_stats_;
352 : RibExportPolicy bgp_policy_;
353 :
354 : // DB Requests pending membership request response.
355 : DeferQ defer_q_;
356 :
357 : TableMembershipRequestMap table_membership_request_map_;
358 : InstanceMembershipRequestMap instance_membership_request_map_;
359 : BgpXmppChannelManager *manager_;
// Boolean flags. delete_in_progress_, deleted_ and eor_sent_ are exposed
// through inline accessors; the others have no accessor in this header.
360 : bool delete_in_progress_;
361 : bool deleted_;
362 : bool defer_peer_close_;
363 : bool skip_update_send_;
364 : bool skip_update_send_cached_;
365 : bool eor_sent_;
// End-of-rib timers (raw Timer pointers) and their recorded start times.
366 : Timer *eor_receive_timer_;
367 : Timer *eor_send_timer_;
368 : time_t eor_receive_timer_start_time_;
369 : time_t eor_send_timer_start_time_;
// Work queue of strings; presumably table names, matching the parameter
// of MembershipResponseHandler — TODO confirm.
370 : WorkQueue<std::string> membership_response_worker_;
371 : SubscribedRoutingInstanceList routing_instances_;
372 :
373 : // statistics
// stats_ is indexed by StatsIndex (RX, TX).
374 : Stats stats_[2];
375 : ChannelStats channel_stats_;
376 : ErrorStats error_stats_;
377 :
378 : // Label block manager for multicast labels.
379 : LabelBlockManagerPtr lb_mgr_;
380 :
// Macro defined elsewhere; by its name it disables copy construction and
// assignment — confirm in its defining header.
381 : DISALLOW_COPY_AND_ASSIGN(BgpXmppChannel);
382 : };
383 :
// Keeps the set of BgpXmppChannel objects, indexed both by XmppChannel
// pointer (channel_map_) and by name (channel_name_map_), together with a
// work queue of channels and a few atomic counters.
384 : class BgpXmppChannelManager {
385 : public:
386 : typedef std::map<const XmppChannel *, BgpXmppChannel *> XmppChannelMap;
387 : typedef std::map<std::string, BgpXmppChannel *> XmppChannelNameMap;
388 : typedef XmppChannelNameMap::const_iterator const_name_iterator;
389 : typedef boost::function<void(BgpXmppChannel *)> VisitorFn;
390 :
391 : BgpXmppChannelManager(XmppServer *, BgpServer *);
392 : virtual ~BgpXmppChannelManager();
393 :
// Read-only iteration over channel_name_map_ (ordered by name).
// NOTE(review): unlike count()/NumUpPeer(), these do not take mutex_.
394 : const_name_iterator name_cbegin() const {
395 : return channel_name_map_.begin();
396 : }
397 2506 : const_name_iterator name_cend() const {
398 2506 : return channel_name_map_.end();
399 : }
400 386 : const_name_iterator name_clower_bound(const std::string &name) const {
401 386 : return channel_name_map_.lower_bound(name);
402 : }
403 :
// Declarations only; defined outside this header.
404 : void VisitChannels(BgpXmppChannelManager::VisitorFn);
405 : void VisitChannels(BgpXmppChannelManager::VisitorFn) const;
406 : BgpXmppChannel *FindChannel(const XmppChannel *channel);
407 : BgpXmppChannel *FindChannel(std::string client);
408 : void RemoveChannel(XmppChannel *channel);
409 : virtual void XmppHandleChannelEvent(XmppChannel *channel,
410 : xmps::PeerState state);
411 :
// Direct reference to channel_map_ (no locking here).
412 260 : const XmppChannelMap &channel_map() const { return channel_map_; }
// Hands the channel to queue_.
413 6180 : void Enqueue(BgpXmppChannel *bx_channel) {
414 6180 : queue_.Enqueue(bx_channel);
415 6180 : }
416 : bool IsReadyForDeletion();
417 : void SetQueueDisable(bool disabled);
418 : size_t GetQueueSize() const;
419 : void AdminDownCallback();
420 : void ASNUpdateCallback(as_t old_asn, as_t old_local_asn);
421 : void IdentifierUpdateCallback(Ip4Address old_identifier);
422 : void RoutingInstanceCallback(std::string vrf_name, int op);
423 : void DSCPUpdateCallback(uint8_t value);
424 :
// Both functions return channel_map_.size() while holding mutex_.
// NOTE(review): NumUpPeer() is identical to count(); the size_t result is
// narrowed to uint32_t.
425 192 : uint32_t count() const {
426 192 : std::scoped_lock lock(mutex_);
427 384 : return channel_map_.size();
428 192 : }
429 656 : uint32_t NumUpPeer() const {
430 656 : std::scoped_lock lock(mutex_);
431 1312 : return channel_map_.size();
432 656 : }
433 :
// Accessors for the atomic deleting_count_.
// NOTE(review): in decrement_deleting_count() the assert and the decrement
// are two separate atomic operations, not one combined check-and-decrement.
434 192 : int32_t deleting_count() const { return deleting_count_; }
435 6180 : void increment_deleting_count() { deleting_count_++; }
436 6180 : void decrement_deleting_count() {
437 6180 : assert(deleting_count_);
438 6180 : deleting_count_--;
439 6180 : }
440 :
441 326 : BgpServer *bgp_server() { return bgp_server_; }
442 18632 : XmppServer *xmpp_server() { return xmpp_server_; }
443 18264 : const XmppServer *xmpp_server() const { return xmpp_server_; }
// Atomically increments subscription_gen_id_ and returns the value it held
// before the increment (fetch_add semantics).
444 68275 : uint64_t get_subscription_gen_id() {
445 136550 : return subscription_gen_id_.fetch_add(1);
446 : }
447 : bool CollectStats(BgpRouterState *state, bool first) const;
448 :
449 : protected:
// Virtual creation/deletion hooks; BgpXmppChannelManagerMock is declared
// a friend below.
450 : virtual BgpXmppChannel *CreateChannel(XmppChannel *channel);
451 : virtual bool DeleteChannel(BgpXmppChannel *channel);
452 :
453 : private:
454 : friend class BgpXmppChannelManagerMock;
455 : friend class BgpXmppUnitTest;
456 :
457 : void FillPeerInfo(const BgpXmppChannel *channel) const;
458 :
459 : XmppServer *xmpp_server_;
460 : BgpServer *bgp_server_;
461 : WorkQueue<BgpXmppChannel *> queue_;
// mutable so that the const members count()/NumUpPeer() can lock it.
462 : mutable std::mutex mutex_;
463 : XmppChannelMap channel_map_;
464 : XmppChannelNameMap channel_name_map_;
// Integer ids; the *_listener_id_ names suggest handles returned by
// listener registration — TODO confirm in the implementation.
465 : int id_;
466 : int admin_down_listener_id_;
467 : int asn_listener_id_;
468 : int identifier_listener_id_;
469 : int dscp_listener_id_;
470 : std::atomic<int32_t> deleting_count_;
471 : // Generation number for subscription tracking
472 : std::atomic<uint64_t> subscription_gen_id_;
473 :
474 : DISALLOW_COPY_AND_ASSIGN(BgpXmppChannelManager);
475 : };
476 :
477 : #endif // SRC_BGP_BGP_XMPP_CHANNEL_H_
|