Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "xmpp/xmpp_channel_mux.h" 6 : 7 : #include <boost/foreach.hpp> 8 : 9 : #include "base/task_annotations.h" 10 : #include "xmpp/xmpp_init.h" 11 : #include "xmpp/xmpp_connection.h" 12 : 13 : using namespace std; 14 : using namespace xmsm; 15 : 16 11604 : XmppChannelMux::XmppChannelMux(XmppConnection *connection) 17 11604 : : connection_(connection), rx_message_trace_cb_(NULL), 18 23208 : tx_message_trace_cb_(NULL) { 19 11604 : last_received_ = 0; 20 11604 : last_sent_ = 0; 21 11604 : } 22 : 23 11701 : XmppChannelMux::~XmppChannelMux() { 24 11604 : assert(map_.empty()); 25 11701 : } 26 : 27 540 : void XmppChannelMux::Close() { 28 540 : connection_->Clear(); 29 540 : } 30 : 31 9068 : bool XmppChannelMux::LastReceived(time_t duration) const { 32 9068 : return (UTCTimestamp() - last_received_) <= duration; 33 : } 34 : 35 2969 : bool XmppChannelMux::LastSent(time_t duration) const { 36 2969 : return (UTCTimestamp() - last_sent_) <= duration; 37 : } 38 : 39 3516613 : xmps::PeerState XmppChannelMux::GetPeerState() const { 40 3516613 : xmsm::XmState st = connection_->GetStateMcState(); 41 3516532 : return (st == xmsm::ESTABLISHED) ? xmps::READY : 42 3516532 : xmps::NOT_READY; 43 : } 44 : 45 2 : void XmppChannelMux::WriteReady(const boost::system::error_code &ec) { 46 2 : std::scoped_lock lock(mutex_); 47 : 48 2 : WriteReadyCbMap::iterator iter = map_.begin(); 49 2 : WriteReadyCbMap::iterator next = iter; 50 4 : for (; iter != map_.end(); iter = next) { 51 2 : ++next; 52 2 : SendReadyCb cb = iter->second; 53 2 : cb(ec); 54 2 : map_.erase(iter); 55 2 : } 56 2 : } 57 : 58 1617541 : bool XmppChannelMux::Send(const uint8_t *msg, size_t msgsize, 59 : const string *msg_str, xmps::PeerId id, 60 : SendReadyCb cb) { 61 1617541 : if (!connection_) return false; 62 : 63 1617541 : std::scoped_lock lock(mutex_); 64 1617950 : last_sent_ = UTCTimestamp(); 65 1618021 : bool res = connection_->Send(msg, msgsize, msg_str); 66 1617869 : if (res == false) { 67 69 : RegisterWriteReady(id, cb); 68 : } 69 1617869 : return res; 70 1617869 : } 71 : 72 22364 : int XmppChannelMux::GetTaskInstance() const { 73 22364 : return connection_->GetTaskInstance(); 74 : } 75 : 76 9998 : void XmppChannelMux::RegisterReferer(xmps::PeerId id) { 77 9998 : referers_.insert(id); 78 9998 : } 79 : 80 9998 : void XmppChannelMux::UnRegisterReferer(xmps::PeerId id) { 81 9998 : referers_.erase(id); 82 9998 : } 83 : 84 13945 : void XmppChannelMux::RegisterReceive(xmps::PeerId id, ReceiveCb cb) { 85 13945 : rxmap_.insert(make_pair(id, cb)); 86 13945 : } 87 : 88 14065 : void XmppChannelMux::UnRegisterReceive(xmps::PeerId id) { 89 14065 : ReceiveCbMap::iterator it = rxmap_.find(id); 90 14065 : if (it != rxmap_.end()) { 91 13941 : rxmap_.erase(it); 92 : } 93 : 94 14065 : if (ReceiverCount()) 95 4074 : return; 96 : 97 : XmppServerConnection *server_connection = 98 10243 : dynamic_cast<XmppServerConnection *>(connection_); 99 : 100 : // If GracefulRestart helper mode close process is complete, restart the 101 : // state machine to form new session with the client. 102 10512 : if (!connection_->IsDeleted() && server_connection && 103 269 : server_connection->server()->IsGRHelperModeEnabled()) { 104 252 : server_connection->state_machine()->Initialize(); 105 252 : return; 106 : } 107 : 108 9991 : connection_->RetryDelete(); 109 : } 110 : 111 11666 : size_t XmppChannelMux::RefererCount() const { 112 11666 : return referers_.size(); 113 : } 114 : 115 31895 : size_t XmppChannelMux::ReceiverCount() const { 116 31895 : return rxmap_.size(); 117 : } 118 : 119 72 : vector<string> XmppChannelMux::GetReceiverList() const { 120 72 : vector<string> receivers; 121 96 : for (const auto& value : rxmap_) { 122 24 : receivers.push_back(xmps::PeerIdToName(value.first)); 123 : } 124 72 : return receivers; 125 0 : } 126 : 127 : // 128 : // To be called after acquiring mutex 129 : // 130 70 : void XmppChannelMux::RegisterWriteReady(xmps::PeerId id, SendReadyCb cb) { 131 70 : map_.insert(make_pair(id, cb)); 132 70 : } 133 : 134 : // 135 : // To be called after acquiring mutex 136 : // 137 13841 : void XmppChannelMux::UnRegisterWriteReady(xmps::PeerId id) { 138 13841 : map_.erase(id); 139 13841 : } 140 : 141 1673446 : const std::string &XmppChannelMux::ToString() const { 142 1673446 : return connection_->ToString(); 143 : } 144 : 145 0 : const std::string &XmppChannelMux::FromString() const { 146 0 : return connection_->FromString(); 147 : } 148 : 149 974 : std::string XmppChannelMux::StateName() const { 150 974 : return connection_->StateName(); 151 : } 152 : 153 0 : std::string XmppChannelMux::AuthType() const { 154 0 : return connection_->GetXmppAuthenticationType(); 155 : } 156 : 157 0 : std::string XmppChannelMux::PeerAddress() const { 158 0 : return connection_->endpoint_string(); 159 : } 160 : 161 3156010 : inline bool MatchCallback(string to, xmps::PeerId peer) { 162 3156010 : if ((to.find(XmppInit::kBgpPeer) != string::npos) && 163 : (peer == xmps::BGP)) { 164 1606432 : return true; 165 : } 166 1549587 : if ((to.find(XmppInit::kConfigPeer) != string::npos) && 167 : (peer == xmps::CONFIG)) { 168 0 : return true; 169 : } 170 1549587 : if ((to.find(XmppInit::kDnsPeer) != string::npos) && 171 : (peer == xmps::DNS)) { 172 0 : return true; 173 : } 174 1549587 : if ((to.find(XmppInit::kOtherPeer) != string::npos) && 175 : (peer == xmps::OTHER)) { 176 4 : return true; 177 : } 178 1549583 : return false; 179 : } 180 : 181 1606622 : void XmppChannelMux::ProcessXmppMessage(const XmppStanza::XmppMessage *msg) { 182 1606622 : last_received_ = UTCTimestamp(); 183 1606622 : ReceiveCbMap::iterator iter = rxmap_.begin(); 184 4762639 : for (; iter != rxmap_.end(); ++iter) { 185 3156013 : if (MatchCallback(msg->to, iter->first)) { 186 1606433 : ReceiveCb cb = iter->second; 187 1606431 : cb(msg, GetPeerState()); 188 1606434 : } 189 : } 190 1606620 : } 191 : 192 25191 : void XmppChannelMux::HandleStateEvent(xmsm::XmState state) { 193 25191 : CHECK_CONCURRENCY("xmpp::StateMachine"); 194 25189 : xmps::PeerState st = xmps::NOT_READY; 195 25189 : if (state == xmsm::ESTABLISHED) { 196 12604 : st = xmps::READY; 197 12585 : } else if (state == xmsm::ACTIVE) { 198 233 : st = xmps::TIMEDOUT; 199 : } 200 : 201 25189 : if (connection_->IsClient()) { 202 12292 : XmppClient *client = static_cast<XmppClient *>(connection_->server()); 203 12292 : client->NotifyConnectionEvent(this, st); 204 : } else { 205 : // Event to create the peer on server 206 12896 : XmppServer *server = static_cast<XmppServer *>(connection_->server()); 207 12901 : server->NotifyConnectionEvent(this, st); 208 : } 209 25197 : } 210 : 211 490 : std::string XmppChannelMux::LastStateName() const { 212 490 : return connection_->LastStateName(); 213 : } 214 487 : std::string XmppChannelMux::LastStateChangeAt() const { 215 487 : return connection_->LastStateChangeAt(); 216 : } 217 487 : std::string XmppChannelMux::LastEvent() const { 218 487 : return connection_->LastEvent(); 219 : } 220 546 : uint32_t XmppChannelMux::rx_open() const { 221 546 : return connection_->rx_open(); 222 : } 223 546 : uint32_t XmppChannelMux::rx_close() const { 224 546 : return connection_->rx_close(); 225 : } 226 546 : uint32_t XmppChannelMux::rx_update() const { 227 546 : return connection_->rx_update(); 228 : } 229 546 : uint32_t XmppChannelMux::rx_keepalive() const { 230 546 : return connection_->rx_keepalive(); 231 : } 232 546 : uint32_t XmppChannelMux::tx_open() const { 233 546 : return connection_->tx_open(); 234 : } 235 546 : uint32_t XmppChannelMux::tx_close() const { 236 546 : return connection_->tx_close(); 237 : } 238 546 : uint32_t XmppChannelMux::tx_update() const { 239 546 : return connection_->tx_update(); 240 : } 241 546 : uint32_t XmppChannelMux::tx_keepalive() const { 242 546 : return connection_->tx_keepalive(); 243 : } 244 1461 : uint32_t XmppChannelMux::FlapCount() const { 245 1461 : return connection_->flap_count(); 246 : } 247 1461 : std::string XmppChannelMux::LastFlap() const { 248 1461 : return connection_->last_flap_at(); 249 : } 250 : 251 1 : void XmppChannelMux::RegisterRxMessageTraceCallback(RxMessageTraceCb cb) { 252 1 : rx_message_trace_cb_ = cb; 253 1 : } 254 1 : void XmppChannelMux::RegisterTxMessageTraceCallback(TxMessageTraceCb cb) { 255 1 : tx_message_trace_cb_ = cb; 256 1 : } 257 : 258 1626829 : bool XmppChannelMux::RxMessageTrace(const std::string &to_address, 259 : int port, 260 : int msg_size, 261 : const std::string &msg, 262 : const XmppStanza::XmppMessage *xmpp_msg) { 263 1626829 : if (rx_message_trace_cb_) { 264 0 : return rx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 265 : } 266 1626827 : return false; 267 : } 268 : 269 1617607 : bool XmppChannelMux::TxMessageTrace(const std::string &to_address, 270 : int port, 271 : int msg_size, 272 : const std::string &msg, 273 : const XmppStanza::XmppMessage *xmpp_msg) { 274 1617607 : if (tx_message_trace_cb_) { 275 0 : return tx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 276 : } 277 1617585 : return false; 278 : }