Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "xmpp/xmpp_connection.h"
6 :
7 : #include <boost/date_time/posix_time/posix_time.hpp>
8 : #include <sstream>
9 :
10 : #include "base/lifetime.h"
11 : #include "base/task_annotations.h"
12 : #include "io/event_manager.h"
13 : #include "xml/xml_base.h"
14 : #include "xmpp/xmpp_client.h"
15 : #include "xmpp/xmpp_config.h"
16 : #include "xmpp/xmpp_factory.h"
17 : #include "xmpp/xmpp_log.h"
18 : #include "xmpp/xmpp_server.h"
19 : #include "xmpp/xmpp_session.h"
20 :
21 : #include "sandesh/common/vns_types.h"
22 : #include "sandesh/common/vns_constants.h"
23 : #include "sandesh/xmpp_client_server_sandesh_types.h"
24 : #include "sandesh/xmpp_message_sandesh_types.h"
25 : #include "sandesh/xmpp_server_types.h"
26 : #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 : #include "sandesh/xmpp_trace_sandesh_types.h"
28 : #include "sandesh/xmpp_peer_info_types.h"
29 :
30 : using namespace std;
31 : using boost::system::error_code;
32 :
33 : const char *XmppConnection::kAuthTypeNil = "NIL";
34 : const char *XmppConnection::kAuthTypeTls = "TLS";
35 :
36 : // Maximum XMPP control message size. Typically this is less then 256 bytes. But
37 : // in scenarios where host names are quite long, we need a larger buffer size.
38 : #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
39 :
40 11605 : XmppConnection::XmppConnection(TcpServer *server,
41 11605 : const XmppChannelConfig *config)
42 11605 : : server_(server),
43 11605 : session_(NULL),
44 11605 : endpoint_(config->endpoint),
45 11605 : local_endpoint_(config->local_endpoint),
46 11605 : config_(NULL),
47 23210 : keepalive_timer_(TimerManager::CreateTimer(
48 11605 : *server->event_manager()->io_service(),
49 : "Xmpp keepalive timer",
50 : TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
51 11605 : GetTaskInstance(config->ClientOnly()))),
52 11605 : is_client_(config->ClientOnly()),
53 11605 : log_uve_(config->logUVE),
54 11605 : admin_down_(false),
55 11605 : disable_read_(false),
56 11605 : from_(config->FromAddr),
57 11605 : to_(config->ToAddr),
58 11605 : auth_enabled_(config->auth_enabled),
59 11605 : dscp_value_(config->dscp_value), xmlns_(config->xmlns),
60 11605 : state_machine_(XmppStaticObjectFactory::Create<XmppStateMachine>(
61 11605 : this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
62 69630 : mux_(XmppStaticObjectFactory::Create<XmppChannelMux>(this)) {
63 11605 : ostringstream oss;
64 11605 : oss << FromString() << ":" << endpoint().address().to_string();
65 11605 : uve_key_str_ = oss.str();
66 11605 : }
67 :
68 11605 : XmppConnection::~XmppConnection() {
69 11605 : StopKeepAliveTimer();
70 11605 : TimerManager::DeleteTimer(keepalive_timer_);
71 11605 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
72 : "XmppConnection destructor", FromString(), ToString());
73 11605 : }
74 :
75 1046 : std::string XmppConnection::GetXmppAuthenticationType() const {
76 1046 : if (auth_enabled_) {
77 36 : return (XmppConnection::kAuthTypeTls);
78 : } else {
79 1010 : return (XmppConnection::kAuthTypeNil);
80 : }
81 : }
82 :
83 0 : void XmppConnection::SetConfig(const XmppChannelConfig *config) {
84 0 : config_ = config;
85 0 : }
86 :
87 19320 : void XmppConnection::set_session(XmppSession *session) {
88 19320 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
89 19320 : assert(session);
90 19320 : session_ = session;
91 19320 : if (session_ && dscp_value_) {
92 8 : session_->SetDscpSocketOption(dscp_value_);
93 : }
94 19320 : }
95 :
96 20617 : void XmppConnection::clear_session() {
97 20617 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
98 20620 : if (!session_)
99 1405 : return;
100 19215 : session_->ClearConnection();
101 19210 : session_ = NULL;
102 20615 : }
103 :
104 40 : const XmppSession *XmppConnection::session() const {
105 40 : return session_;
106 : }
107 :
108 17462 : XmppSession *XmppConnection::session() {
109 17462 : return session_;
110 : }
111 :
112 0 : void XmppConnection::WriteReady() {
113 0 : boost::system::error_code ec;
114 0 : mux_->WriteReady(ec);
115 0 : }
116 :
117 128 : void XmppConnection::Shutdown() {
118 128 : ManagedDelete();
119 128 : }
120 :
121 118187 : bool XmppConnection::IsDeleted() const {
122 118187 : return deleter()->IsDeleted();
123 : }
124 :
125 17660 : bool XmppConnection::MayDelete() const {
126 17660 : return !mux_->ReceiverCount() && !mux_->RefererCount();
127 : }
128 :
129 7531 : XmppSession *XmppConnection::CreateSession() {
130 7531 : TcpSession *session = server_->CreateSession();
131 7531 : XmppSession *xmpp_session = static_cast<XmppSession *>(session);
132 7531 : xmpp_session->SetConnection(this);
133 7531 : return xmpp_session;
134 : }
135 :
136 : //
137 : // Return the task instance for this XmppConnection.
138 : // Calculate from the remote IpAddress so that a restarting session uses the
139 : // same value as before.
140 : // Do not make this method virtual since it gets called from the constructor.
141 : //
142 :
143 95537 : int XmppConnection::GetTaskInstance(bool is_client) const {
144 95537 : if (is_client)
145 36527 : return 0;
146 59010 : IpAddress address = endpoint().address();
147 59006 : int thread_count = TaskScheduler::GetInstance()->HardwareThreadCount();
148 59006 : if (address.is_v4()) {
149 59005 : return address.to_v4().to_ulong() % thread_count;
150 : } else {
151 0 : return 0;
152 : }
153 : }
154 :
155 7913498 : xmsm::XmState XmppConnection::GetStateMcState() const {
156 7913498 : const XmppStateMachine *sm = state_machine();
157 7913477 : assert(sm);
158 7913477 : return sm->StateType();
159 : }
160 :
161 4369647 : xmsm::XmOpenConfirmState XmppConnection::GetStateMcOpenConfirmState() const {
162 4369647 : const XmppStateMachine *sm = state_machine();
163 4369641 : assert(sm);
164 4369641 : return sm->OpenConfirmStateType();
165 : }
166 :
167 :
168 59010 : boost::asio::ip::tcp::endpoint XmppConnection::endpoint() const {
169 59010 : return endpoint_;
170 : }
171 :
172 7531 : boost::asio::ip::tcp::endpoint XmppConnection::local_endpoint() const {
173 7531 : return local_endpoint_;
174 : }
175 :
176 72 : string XmppConnection::endpoint_string() const {
177 72 : ostringstream oss;
178 72 : oss << endpoint_;
179 144 : return oss.str();
180 72 : }
181 :
182 72 : string XmppConnection::local_endpoint_string() const {
183 72 : ostringstream oss;
184 72 : oss << local_endpoint_;
185 144 : return oss.str();
186 72 : }
187 :
188 34215 : const string &XmppConnection::FromString() const {
189 34215 : return from_;
190 : }
191 :
192 1837462 : const string &XmppConnection::ToString() const {
193 1837462 : return to_;
194 : }
195 :
196 6235041 : const std::string &XmppConnection::ToUVEKey() const {
197 6235041 : return uve_key_str_;
198 : }
199 :
200 0 : static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info) {
201 0 : assert(!peer_info.get_name().empty());
202 0 : XMPPPeerInfo::Send(peer_info);
203 0 : }
204 :
205 15771 : void XmppConnection::SetTo(const string &to) {
206 15771 : if ((to_.size() == 0) && (to.size() != 0)) {
207 6564 : to_ = to;
208 6564 : if (!logUVE()) return;
209 0 : XmppPeerInfoData peer_info;
210 0 : peer_info.set_name(ToUVEKey());
211 0 : peer_info.set_identifier(to_);
212 0 : XMPPPeerInfoSend(peer_info);
213 0 : }
214 : }
215 :
216 0 : void XmppConnection::SetAdminDown(bool toggle) {
217 : // TODO: generate state machine event.
218 0 : admin_down_ = toggle;
219 0 : }
220 :
221 6523 : bool XmppConnection::AcceptSession(XmppSession *session) {
222 6523 : session->SetConnection(this);
223 6523 : return state_machine_->PassiveOpen(session);
224 : }
225 :
226 1631366 : bool XmppConnection::Send(const uint8_t *data, size_t size,
227 : const string *msg_str) {
228 1631366 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
229 1631338 : if (session_ == NULL) {
230 13 : return false;
231 : }
232 :
233 1631325 : TcpSession::Endpoint endpoint = session_->remote_endpoint();
234 1631198 : const string &endpoint_addr_str = session_->remote_addr_string();
235 1631159 : string str;
236 1631155 : if (!msg_str) {
237 57145 : str.append(reinterpret_cast<const char *>(data), size);
238 57145 : msg_str = &str;
239 : }
240 :
241 1630781 : if (!(mux_ &&
242 1631063 : (mux_->TxMessageTrace(endpoint_addr_str, endpoint.port(),
243 1631155 : size, *msg_str, NULL)))) {
244 1630783 : XMPP_MESSAGE_TRACE(XmppTxStream,
245 : endpoint_addr_str, endpoint.port(), size, *msg_str);
246 : }
247 :
248 1631477 : stats_[1].update++;
249 : size_t sent;
250 1631486 : return session_->Send(data, size, &sent);
251 1631297 : }
252 :
253 11 : int XmppConnection::SetDscpValue(uint8_t value) {
254 11 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
255 11 : dscp_value_ = value;
256 11 : if (!session_) {
257 0 : return 0;
258 : }
259 11 : return session_->SetDscpSocketOption(value);
260 11 : }
261 :
262 8047 : bool XmppConnection::SendOpen(XmppSession *session) {
263 8047 : if (!session) return false;
264 8047 : XmppProto::XmppStanza::XmppStreamMessage openstream;
265 8047 : openstream.strmtype = XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER;
266 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
267 8047 : int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
268 : sizeof(data));
269 8047 : if (len <= 0) {
270 0 : inc_open_fail();
271 0 : return false;
272 : } else {
273 8047 : XMPP_UTDEBUG(XmppOpen, ToUVEKey(), XMPP_PEER_DIR_OUT, len, from_, to_,
274 : xmlns_);
275 8047 : session->Send(data, len, NULL);
276 8047 : stats_[1].open++;
277 8047 : return true;
278 : }
279 8047 : }
280 :
281 7819 : bool XmppConnection::SendOpenConfirm(XmppSession *session) {
282 7819 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
283 7819 : if (!session_) return false;
284 7819 : XmppStanza::XmppStreamMessage openstream;
285 7819 : openstream.strmtype = XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER_RESP;
286 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
287 7819 : int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
288 : sizeof(data));
289 7819 : if (len <= 0) {
290 0 : inc_open_fail();
291 0 : return false;
292 : } else {
293 7819 : XMPP_UTDEBUG(XmppOpenConfirm, ToUVEKey(), XMPP_PEER_DIR_OUT, len,
294 : from_, to_);
295 7819 : session_->Send(data, len, NULL);
296 7819 : stats_[1].open++;
297 7819 : return true;
298 : }
299 7819 : }
300 :
301 1512 : bool XmppConnection::SendStreamFeatureRequest(XmppSession *session) {
302 1512 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
303 1512 : if (!session_) return false;
304 1512 : XmppStanza::XmppStreamMessage featurestream;
305 1512 : featurestream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
306 1512 : featurestream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_FEATURE_REQUEST;
307 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
308 1512 : int len = XmppProto::EncodeStream(featurestream, to_, from_, xmlns_, data,
309 : sizeof(data));
310 1512 : if (len <= 0) {
311 0 : inc_stream_feature_fail();
312 0 : return false;
313 : } else {
314 1512 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
315 : "Send Stream Feature Request", len, from_, to_);
316 1512 : session_->Send(data, len, NULL);
317 : //stats_[1].open++;
318 1512 : return true;
319 : }
320 1512 : }
321 :
322 1469 : bool XmppConnection::SendStartTls(XmppSession *session) {
323 1469 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
324 1469 : if (!session_) return false;
325 1469 : XmppStanza::XmppStreamMessage stream;
326 1469 : stream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
327 1469 : stream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_START;
328 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
329 1469 : int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
330 : sizeof(data));
331 1469 : if (len <= 0) {
332 0 : inc_stream_feature_fail();
333 0 : return false;
334 : } else {
335 1469 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
336 : "Send Start Tls", len, from_, to_);
337 1469 : session_->Send(data, len, NULL);
338 : //stats_[1].open++;
339 1469 : return true;
340 : }
341 1469 : }
342 :
343 1464 : bool XmppConnection::SendProceedTls(XmppSession *session) {
344 1464 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
345 1464 : if (!session_) return false;
346 1464 : XmppStanza::XmppStreamMessage stream;
347 1464 : stream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
348 1464 : stream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_PROCEED;
349 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
350 1464 : int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
351 : sizeof(data));
352 1464 : if (len <= 0) {
353 0 : inc_stream_feature_fail();
354 0 : return false;
355 : } else {
356 1464 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
357 : "Send Proceed Tls", len, from_, to_);
358 1464 : session_->Send(data, len, NULL);
359 : //stats_[1].open++;
360 1464 : return true;
361 : }
362 1464 : }
363 :
364 2 : void XmppConnection::SendClose(XmppSession *session) {
365 2 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
366 2 : if (!session_) return;
367 1 : string str("</stream:stream>");
368 : uint8_t data[64];
369 1 : memcpy(data, str.data(), str.size());
370 1 : XMPP_UTDEBUG(XmppClose, ToUVEKey(), XMPP_PEER_DIR_OUT, str.size(), from_,
371 : to_);
372 1 : session_->Send(data, str.size(), NULL);
373 1 : stats_[1].close++;
374 2 : }
375 :
376 2910 : void XmppConnection::ProcessSslHandShakeResponse(SslSessionPtr session,
377 : const boost::system::error_code& error) {
378 2910 : if (!state_machine())
379 0 : return;
380 :
381 2910 : if (error) {
382 0 : inc_handshake_failure();
383 :
384 0 : if (error.category() == boost::asio::error::get_ssl_category()) {
385 0 : string err = error.message();
386 0 : err = string(" (")
387 0 : +boost::lexical_cast<string>(ERR_GET_LIB(error.value()))+","
388 0 : +boost::lexical_cast<string>(ERR_GET_REASON(error.value()))+") ";
389 :
390 : char buf[128];
391 0 : ::ERR_error_string_n(error.value(), buf, sizeof(buf));
392 0 : err += buf;
393 0 : XMPP_ALERT(XmppSslHandShakeFailure, ToUVEKey(), XMPP_PEER_DIR_IN,
394 : "failure", err);
395 0 : }
396 :
397 0 : state_machine()->OnEvent(session.get(), xmsm::EvTLSHANDSHAKE_FAILURE);
398 :
399 : } else {
400 2910 : XMPP_DEBUG(XmppSslHandShakeMessage, session->ToUVEKey(),
401 : XMPP_PEER_DIR_IN, "success", "");
402 2910 : state_machine()->OnEvent(session.get(), xmsm::EvTLSHANDSHAKE_SUCCESS);
403 : }
404 : }
405 :
406 0 : void XmppConnection::LogMsg(std::string msg) {
407 0 : log4cplus::Logger logger = log4cplus::Logger::getRoot();
408 0 : LOG4CPLUS_DEBUG(logger, msg << ToString() << " " <<
409 : local_endpoint_.address() << ":" << local_endpoint_.port() << "::" <<
410 : endpoint_.address() << ":" << endpoint_.port());
411 0 : }
412 :
413 6442 : void XmppConnection::LogKeepAliveSend() {
414 : static bool init_ = false;
415 : static bool log_ = false;
416 :
417 6442 : if (!init_) {
418 61 : char *str = getenv("XMPP_ASSERT_ON_HOLD_TIMEOUT");
419 61 : if (str && strtoul(str, NULL, 0) != 0) log_ = true;
420 61 : init_ = true;
421 : }
422 :
423 6442 : if (log_) LogMsg("SEND KEEPALIVE: ");
424 6442 : }
425 :
426 6442 : void XmppConnection::SendKeepAlive() {
427 6442 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
428 6442 : if (!session_) return;
429 6442 : XmppStanza::XmppMessage msg(XmppStanza::WHITESPACE_MESSAGE_STANZA);
430 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
431 6442 : int len = XmppProto::EncodeStream(msg, data, sizeof(data));
432 6442 : assert(len > 0);
433 6442 : session_->Send(data, len, NULL);
434 6442 : stats_[1].keepalive++;
435 6442 : LogKeepAliveSend();
436 6442 : }
437 :
438 144 : bool XmppConnection::KeepAliveTimerExpired() {
439 144 : if (state_machine_->get_state() != xmsm::ESTABLISHED)
440 0 : return false;
441 :
442 : // TODO: check timestamp of last received packet.
443 144 : SendKeepAlive();
444 :
445 : //
446 : // Start the timer again, by returning true
447 : //
448 144 : return true;
449 : }
450 :
451 0 : void XmppConnection::KeepaliveTimerErrorHanlder(string error_name,
452 : string error_message) {
453 0 : XMPP_WARNING(XmppKeepaliveTimeError, ToUVEKey(), XMPP_PEER_DIR_NA,
454 : error_name, error_message);
455 0 : }
456 :
457 12629 : void XmppConnection::StartKeepAliveTimer() {
458 12629 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
459 12629 : if (!session_)
460 0 : return;
461 :
462 12629 : int holdtime_msecs = state_machine_->hold_time_msecs();
463 12629 : if (holdtime_msecs <= 0)
464 0 : return;
465 :
466 12629 : keepalive_timer_->Start(holdtime_msecs / 3,
467 : boost::bind(&XmppConnection::KeepAliveTimerExpired, this),
468 : boost::bind(&XmppConnection::KeepaliveTimerErrorHanlder, this, _1, _2));
469 12629 : }
470 :
471 24064 : void XmppConnection::StopKeepAliveTimer() {
472 24064 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
473 24062 : keepalive_timer_->Cancel();
474 24062 : }
475 :
476 24 : void XmppConnection::UpdateKeepAliveTimer(uint8_t time_out) {
477 24 : state_machine_->set_hold_time(time_out);
478 24 : StopKeepAliveTimer();
479 24 : StartKeepAliveTimer();
480 24 : }
481 :
482 75350 : XmppStateMachine *XmppConnection::state_machine() {
483 75350 : return state_machine_.get();
484 : }
485 :
486 12282814 : const XmppStateMachine *XmppConnection::state_machine() const {
487 12282814 : return state_machine_.get();
488 : }
489 :
490 72 : const XmppChannelMux *XmppConnection::channel_mux() const {
491 72 : return mux_.get();
492 : }
493 :
494 4169168 : void XmppConnection::IncProtoStats(unsigned int type) {
495 4169168 : switch (type) {
496 20075 : case XmppStanza::STREAM_HEADER:
497 20075 : stats_[0].open++;
498 20075 : break;
499 2486616 : case XmppStanza::WHITESPACE_MESSAGE_STANZA:
500 2486616 : stats_[0].keepalive++;
501 2486617 : break;
502 98591 : case XmppStanza::IQ_STANZA:
503 98591 : stats_[0].update++;
504 98592 : break;
505 1563920 : case XmppStanza::MESSAGE_STANZA:
506 1563920 : stats_[0].update++;
507 1563920 : break;
508 : }
509 4169170 : }
510 :
511 942 : void XmppConnection::inc_connect_error() {
512 942 : error_stats_.connect_error++;
513 942 : }
514 :
515 6435 : void XmppConnection::inc_session_close() {
516 6435 : error_stats_.session_close++;
517 6435 : }
518 :
519 0 : void XmppConnection::inc_open_fail() {
520 0 : error_stats_.open_fail++;
521 0 : }
522 :
523 0 : void XmppConnection::inc_stream_feature_fail() {
524 0 : error_stats_.stream_feature_fail++;
525 0 : }
526 :
527 0 : void XmppConnection::inc_handshake_failure() {
528 0 : error_stats_.handshake_fail++;
529 0 : }
530 :
531 792 : size_t XmppConnection::get_connect_error() {
532 792 : return error_stats_.connect_error;
533 : }
534 :
535 574 : size_t XmppConnection::get_session_close() {
536 574 : return error_stats_.session_close;
537 : }
538 :
539 0 : size_t XmppConnection::get_open_fail() {
540 0 : return error_stats_.open_fail;
541 : }
542 :
543 0 : size_t XmppConnection::get_stream_feature_fail() {
544 0 : return error_stats_.stream_feature_fail;
545 : }
546 :
547 0 : size_t XmppConnection::get_handshake_failure() {
548 0 : return error_stats_.handshake_fail;
549 : }
550 :
551 72 : size_t XmppConnection::get_sm_connect_attempts() {
552 72 : return state_machine_->get_connect_attempts();
553 : }
554 :
555 265 : size_t XmppConnection::get_sm_keepalive_count() {
556 265 : return state_machine_->get_keepalive_count();
557 : }
558 :
559 4169176 : void XmppConnection::ReceiveMsg(XmppSession *session, const string &msg) {
560 4169176 : XmppStanza::XmppMessage *minfo = XmppDecode(msg);
561 :
562 4169150 : if (minfo) {
563 4127509 : session->IncStats((unsigned int)minfo->type, msg.size());
564 4127505 : if (minfo->type != XmppStanza::WHITESPACE_MESSAGE_STANZA) {
565 1640900 : if (!(mux_ &&
566 3281800 : (mux_->RxMessageTrace(session->
567 3281792 : remote_endpoint().address().to_string(),
568 3281795 : session->remote_endpoint().port(),
569 3281793 : msg.size(), msg, minfo)))) {
570 1640900 : XMPP_MESSAGE_TRACE(XmppRxStream,
571 : session->
572 : remote_endpoint().address().to_string(),
573 : session->
574 : remote_endpoint().port(), msg.size(), msg);
575 : }
576 : }
577 4127507 : IncProtoStats((unsigned int)minfo->type);
578 4127519 : state_machine_->OnMessage(session, minfo);
579 41641 : } else if ((minfo = last_msg_.get()) != NULL) {
580 41681 : session->IncStats((unsigned int)minfo->type, msg.size());
581 41681 : IncProtoStats((unsigned int)minfo->type);
582 : } else {
583 0 : session->IncStats(XmppStanza::INVALID, msg.size());
584 0 : XMPP_MESSAGE_TRACE(XmppRxStreamInvalid,
585 : session->remote_endpoint().address().to_string(),
586 : session->remote_endpoint().port(), msg.size(), msg);
587 : }
588 4169197 : return;
589 : }
590 :
591 4169177 : XmppStanza::XmppMessage *XmppConnection::XmppDecode(const string &msg) {
592 4169177 : unique_ptr<XmppStanza::XmppMessage> minfo(XmppProto::Decode(this, msg));
593 4169154 : if (minfo.get() == NULL) {
594 0 : XMPP_INFO(XmppSessionDelete, ToUVEKey(), XMPP_PEER_DIR_IN, "Server",
595 : FromString(), ToString());
596 0 : Clear();
597 0 : return NULL;
598 : }
599 :
600 4169140 : if (minfo->type == XmppStanza::IQ_STANZA) {
601 : const XmppStanza::XmppMessageIq *iq =
602 98590 : static_cast<const XmppStanza::XmppMessageIq *>(minfo.get());
603 :
604 :
605 98590 : if (iq->action.compare("publish") == 0) {
606 41680 : last_msg_.reset(minfo.release());
607 41681 : return NULL;
608 : }
609 :
610 56911 : if (iq->action.compare("collection") == 0) {
611 41682 : if (last_msg_.get() != NULL) {
612 : XmppStanza::XmppMessageIq *last_iq =
613 41682 : static_cast<XmppStanza::XmppMessageIq *>(last_msg_.get());
614 :
615 41682 : if (last_iq->node.compare(iq->as_node) == 0) {
616 41682 : XmlBase *impl = last_iq->dom.get();
617 41682 : impl->ReadNode("publish");
618 41682 : impl->ModifyAttribute("node", iq->node);
619 41682 : last_iq->node = impl->ReadAttrib("node");
620 41681 : last_iq->is_as_node = iq->is_as_node;
621 : //Save the complete ass/dissociate node
622 41681 : last_iq->as_node = iq->as_node;
623 : } else {
624 0 : XMPP_WARNING(XmppIqMessageInvalid, ToUVEKey(),
625 : XMPP_PEER_DIR_IN);
626 0 : goto error;
627 : }
628 : } else {
629 0 : XMPP_ERROR(XmppIqCollectionError, ToUVEKey(), XMPP_PEER_DIR_IN);
630 0 : goto error;
631 : }
632 : // iq message merged with collection info
633 41682 : return last_msg_.release();
634 : }
635 : }
636 4085767 : return minfo.release();
637 :
638 0 : error:
639 0 : last_msg_.reset();
640 0 : return NULL;
641 4169193 : }
642 :
643 1563819 : int XmppConnection::ProcessXmppChatMessage(
644 : const XmppStanza::XmppChatMessage *msg) {
645 1563819 : mux_->ProcessXmppMessage(msg);
646 1563819 : return 0;
647 : }
648 :
649 56905 : int XmppConnection::ProcessXmppIqMessage(const XmppStanza::XmppMessage *msg) {
650 56905 : mux_->ProcessXmppMessage(msg);
651 56905 : return 0;
652 : }
653 :
654 : class XmppServerConnection::DeleteActor : public LifetimeActor {
655 : public:
656 6570 : DeleteActor(XmppServer *server, XmppServerConnection *parent)
657 6570 : : LifetimeActor(server->lifetime_manager()),
658 6570 : server_(server), parent_(parent) {
659 6570 : }
660 :
661 12649 : virtual bool MayDelete() const {
662 12649 : return (!parent_->on_work_queue() && parent_->MayDelete());
663 : }
664 :
665 6570 : virtual void Shutdown() {
666 6570 : CHECK_CONCURRENCY("bgp::Config");
667 :
668 : // If the connection is still on the WorkQueue, simply add it to the
669 : // ConnectionSet. It won't be on the ConnectionMap.
670 6570 : if (parent_->on_work_queue()) {
671 24 : server_->InsertDeletedConnection(parent_);
672 : }
673 :
674 : // If the connection was rejected as duplicate, it will already be in
675 : // the ConnectionSet. Non-duplicate connections need to be moved from
676 : // from the ConnectionMap into the ConnectionSet. We add it to the
677 : // ConnectionSet and then remove it from ConnectionMap to ensure that
678 : // the XmppServer connection count doesn't temporarily become 0. This
679 : // is friendly to tests that wait for the XmppServer connection count
680 : // to become 0.
681 : //
682 : // Breaking association with the XmppConnectionEndpoint here allows
683 : // a new XmppServerConnection with the same Endpoint to come up. We
684 : // may end up leaking memory if current XmppServerConnection doesn't
685 : // get cleaned up completely, but we at least prevent the other end
686 : // from getting stuck forever.
687 6546 : else if (!parent_->duplicate()) {
688 6546 : server_->InsertDeletedConnection(parent_);
689 6546 : server_->RemoveConnection(parent_);
690 6546 : server_->ReleaseConnectionEndpoint(parent_);
691 : }
692 :
693 6570 : if (parent_->state_machine()) {
694 6570 : parent_->state_machine()->Clear();
695 : }
696 :
697 6570 : XmppSession *session = NULL;
698 6570 : if (parent_->state_machine()) {
699 6570 : session = parent_->state_machine()->session();
700 6570 : parent_->state_machine()->clear_session();
701 : }
702 6570 : if (session) {
703 1479 : server_->DeleteSession(session);
704 : }
705 6570 : }
706 :
707 6570 : virtual void Destroy() {
708 6570 : delete parent_;
709 6570 : }
710 :
711 : private:
712 : XmppServer *server_;
713 : XmppServerConnection *parent_;
714 : };
715 :
716 6570 : XmppServerConnection::XmppServerConnection(XmppServer *server,
717 6570 : const XmppChannelConfig *config)
718 : : XmppConnection(server, config),
719 6570 : duplicate_(false),
720 6570 : on_work_queue_(false),
721 6570 : conn_endpoint_(NULL),
722 6570 : deleter_(new DeleteActor(server, this)),
723 13140 : server_delete_ref_(this, server->deleter()) {
724 6570 : assert(!config->ClientOnly());
725 6570 : XMPP_INFO(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_IN,
726 : "Server", FromString(), ToString());
727 6570 : }
728 :
729 13127 : XmppServerConnection::~XmppServerConnection() {
730 6570 : CHECK_CONCURRENCY("bgp::Config");
731 :
732 6570 : XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA, "Server",
733 : FromString(), ToString());
734 6570 : server()->RemoveDeletedConnection(this);
735 13127 : }
736 :
737 8226 : void XmppServerConnection::ManagedDelete() {
738 8226 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
739 : "Managed server connection delete", FromString(), ToString());
740 8226 : deleter_->Delete();
741 8226 : }
742 :
743 6192 : void XmppServerConnection::RetryDelete() {
744 6192 : if (!deleter()->IsDeleted())
745 17 : return;
746 6175 : deleter()->RetryDelete();
747 : }
748 :
749 0 : LifetimeManager *XmppServerConnection::lifetime_manager() {
750 0 : return server()->lifetime_manager();
751 : }
752 :
753 8391 : XmppServer *XmppServerConnection::server() {
754 8391 : return static_cast<XmppServer *>(server_);
755 : }
756 :
757 12415 : LifetimeActor *XmppServerConnection::deleter() {
758 12415 : return deleter_.get();
759 : }
760 :
761 109306 : const LifetimeActor *XmppServerConnection::deleter() const {
762 109306 : return deleter_.get();
763 : }
764 :
765 4 : void XmppServerConnection::set_close_reason(const string &close_reason) {
766 4 : if (conn_endpoint_)
767 0 : conn_endpoint_->set_close_reason(close_reason);
768 :
769 4 : if (!logUVE())
770 4 : return;
771 :
772 0 : XmppPeerInfoData peer_info;
773 0 : peer_info.set_name(ToUVEKey());
774 0 : peer_info.set_close_reason(close_reason);
775 0 : XMPPPeerInfoSend(peer_info);
776 0 : }
777 :
778 1461 : uint32_t XmppServerConnection::flap_count() const {
779 1461 : return conn_endpoint_ ? conn_endpoint_->flap_count() : 0;
780 : }
781 :
782 6303 : void XmppServerConnection::increment_flap_count() {
783 6303 : XmppConnectionEndpoint *conn_endpoint = conn_endpoint_;
784 6303 : if (!conn_endpoint)
785 1447 : conn_endpoint = server()->FindConnectionEndpoint(ToString());
786 6303 : if (!conn_endpoint)
787 6304 : return;
788 6303 : conn_endpoint->increment_flap_count();
789 :
790 6304 : if (!logUVE())
791 6304 : return;
792 :
793 0 : XmppPeerInfoData peer_info;
794 0 : peer_info.set_name(ToUVEKey());
795 0 : PeerFlapInfo flap_info;
796 0 : flap_info.set_flap_count(conn_endpoint->flap_count());
797 0 : flap_info.set_flap_time(conn_endpoint->last_flap());
798 0 : peer_info.set_flap_info(flap_info);
799 0 : XMPPPeerInfoSend(peer_info);
800 0 : }
801 :
802 1461 : const std::string XmppServerConnection::last_flap_at() const {
803 1461 : return conn_endpoint_ ? conn_endpoint_->last_flap_at() : "";
804 : }
805 :
806 72 : void XmppServerConnection::FillShowInfo(
807 : ShowXmppConnection *show_connection) const {
808 72 : show_connection->set_name(ToString());
809 72 : show_connection->set_deleted(IsDeleted());
810 72 : show_connection->set_remote_endpoint(endpoint_string());
811 72 : show_connection->set_local_endpoint(local_endpoint_string());
812 72 : show_connection->set_state(StateName());
813 72 : show_connection->set_last_event(LastEvent());
814 72 : show_connection->set_last_state(LastStateName());
815 72 : show_connection->set_last_state_at(LastStateChangeAt());
816 72 : show_connection->set_receivers(channel_mux()->GetReceiverList());
817 72 : show_connection->set_server_auth_type(GetXmppAuthenticationType());
818 72 : show_connection->set_dscp_value(dscp_value());
819 72 : }
820 :
821 : class XmppClientConnection::DeleteActor : public LifetimeActor {
822 : public:
823 5035 : DeleteActor(XmppClient *client, XmppClientConnection *parent)
824 5035 : : LifetimeActor(client->lifetime_manager()),
825 5035 : client_(client), parent_(parent) {
826 5035 : }
827 :
828 5035 : virtual bool MayDelete() const {
829 5035 : return parent_->MayDelete();
830 : }
831 :
832 5035 : virtual void Shutdown() {
833 5035 : if (parent_->session()) {
834 541 : client_->NotifyConnectionEvent(parent_->ChannelMux(),
835 : xmps::NOT_READY);
836 : }
837 :
838 5035 : XmppSession *session = NULL;
839 5035 : if (parent_->state_machine()) {
840 5035 : session = parent_->state_machine()->session();
841 5035 : parent_->state_machine()->clear_session();
842 : }
843 5035 : if (session) {
844 563 : client_->DeleteSession(session);
845 : }
846 5035 : }
847 :
848 5035 : virtual void Destroy() {
849 5035 : delete parent_;
850 5035 : }
851 :
852 : private:
853 : XmppClient *client_;
854 : XmppClientConnection *parent_;
855 : };
856 :
857 5035 : XmppClientConnection::XmppClientConnection(XmppClient *server,
858 5035 : const XmppChannelConfig *config)
859 : : XmppConnection(server, config),
860 5035 : flap_count_(0),
861 5035 : deleter_(new DeleteActor(server, this)),
862 10070 : server_delete_ref_(this, server->deleter()) {
863 5035 : assert(config->ClientOnly());
864 5035 : XMPP_UTDEBUG(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_NA, "Client",
865 : FromString(), ToString());
866 5035 : }
867 :
868 10043 : XmppClientConnection::~XmppClientConnection() {
869 5035 : CHECK_CONCURRENCY("bgp::Config");
870 :
871 5035 : XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
872 : "Client", FromString(), ToString());
873 5035 : server()->RemoveConnection(this);
874 10043 : }
875 :
876 5035 : void XmppClientConnection::ManagedDelete() {
877 5035 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
878 : "Managed Client Delete", FromString(), ToString());
879 5035 : deleter_->Delete();
880 5035 : }
881 :
882 3820 : void XmppClientConnection::RetryDelete() {
883 3820 : if (!deleter()->IsDeleted())
884 3820 : return;
885 0 : deleter()->RetryDelete();
886 : }
887 :
888 0 : LifetimeManager *XmppClientConnection::lifetime_manager() {
889 0 : return server()->lifetime_manager();
890 : }
891 :
892 5035 : XmppClient *XmppClientConnection::server() {
893 5035 : return static_cast<XmppClient *>(server_);
894 : }
895 :
896 3820 : LifetimeActor *XmppClientConnection::deleter() {
897 3820 : return deleter_.get();
898 : }
899 :
900 8882 : const LifetimeActor *XmppClientConnection::deleter() const {
901 8882 : return deleter_.get();
902 : }
903 :
904 3365 : void XmppClientConnection::set_close_reason(const string &close_reason) {
905 3365 : close_reason_ = close_reason;
906 3365 : if (!logUVE())
907 3365 : return;
908 :
909 0 : XmppPeerInfoData peer_info;
910 0 : peer_info.set_name(ToUVEKey());
911 0 : peer_info.set_close_reason(close_reason_);
912 0 : XMPPPeerInfoSend(peer_info);
913 0 : }
914 :
915 3489 : uint32_t XmppClientConnection::flap_count() const {
916 3489 : return flap_count_;
917 : }
918 :
919 5762 : void XmppClientConnection::increment_flap_count() {
920 5762 : flap_count_++;
921 5762 : last_flap_ = UTCTimestampUsec();
922 :
923 5762 : if (!logUVE())
924 5762 : return;
925 :
926 0 : XmppPeerInfoData peer_info;
927 0 : peer_info.set_name(ToUVEKey());
928 0 : PeerFlapInfo flap_info;
929 0 : flap_info.set_flap_count(flap_count_);
930 0 : flap_info.set_flap_time(last_flap_);
931 0 : peer_info.set_flap_info(flap_info);
932 0 : XMPPPeerInfoSend(peer_info);
933 0 : }
934 :
935 0 : const std::string XmppClientConnection::last_flap_at() const {
936 0 : return last_flap_ ? integerToString(UTCUsecToPTime(last_flap_)) : "";
937 : }
938 :
939 4876 : XmppConnectionEndpoint::XmppConnectionEndpoint(const string &client)
940 4876 : : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
941 4876 : }
942 :
943 0 : void XmppConnectionEndpoint::set_close_reason(const string &close_reason) {
944 0 : close_reason_ = close_reason;
945 0 : }
946 :
947 2484 : uint32_t XmppConnectionEndpoint::flap_count() const {
948 2484 : return flap_count_;
949 : }
950 :
951 6303 : void XmppConnectionEndpoint::increment_flap_count() {
952 6303 : flap_count_++;
953 6303 : last_flap_ = UTCTimestampUsec();
954 6304 : }
955 :
956 0 : uint64_t XmppConnectionEndpoint::last_flap() const {
957 0 : return last_flap_;
958 : }
959 :
960 1425 : const std::string XmppConnectionEndpoint::last_flap_at() const {
961 1425 : return last_flap_ ? integerToString(UTCUsecToPTime(last_flap_)) : "";
962 : }
963 :
964 11125 : XmppConnection *XmppConnectionEndpoint::connection() {
965 11125 : return connection_;
966 : }
967 :
968 47 : const XmppConnection *XmppConnectionEndpoint::connection() const {
969 47 : return connection_;
970 : }
971 :
972 6203 : void XmppConnectionEndpoint::set_connection(XmppConnection *connection) {
973 6203 : assert(!connection_);
974 6203 : connection_ = connection;
975 6203 : }
976 :
977 6203 : void XmppConnectionEndpoint::reset_connection() {
978 6203 : assert(connection_);
979 6203 : connection_ = NULL;
980 6203 : }
981 :
982 : // Swap relavent contents between two XmppConnection objects.
983 104 : void XmppConnection::SwapContents(XmppConnection *other) {
984 104 : assert(!IsClient());
985 104 : assert(!other->IsClient());
986 : // Update the ConnectionMap in the server as the endpoints are the keys.
987 104 : XmppServer *server = dynamic_cast<XmppServerConnection *>(this)->server();
988 104 : server->SwapXmppConnectionMapEntries(this, other);
989 : // Swap all other connection related information.
990 104 : swap(local_endpoint_, other->local_endpoint_);
991 104 : stats_[0].swap(other->stats_[0]);
992 104 : stats_[1].swap(other->stats_[1]);
993 104 : error_stats_.swap(other->error_stats_);
994 104 : swap(last_msg_, other->last_msg_);
995 104 : swap(to_, other->to_);
996 104 : swap(from_, other->from_);
997 104 : swap(xmlns_, other->xmlns_);
998 104 : swap(dscp_value_, other->dscp_value_);
999 104 : swap(disable_read_, other->disable_read_);
1000 104 : }
|