Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "xmpp/xmpp_connection.h"
6 :
7 : #include <boost/date_time/posix_time/posix_time.hpp>
8 : #include <sstream>
9 :
10 : #include "base/lifetime.h"
11 : #include "base/task_annotations.h"
12 : #include "io/event_manager.h"
13 : #include "xml/xml_base.h"
14 : #include "xmpp/xmpp_client.h"
15 : #include "xmpp/xmpp_config.h"
16 : #include "xmpp/xmpp_factory.h"
17 : #include "xmpp/xmpp_log.h"
18 : #include "xmpp/xmpp_server.h"
19 : #include "xmpp/xmpp_session.h"
20 :
21 : #include "sandesh/common/vns_types.h"
22 : #include "sandesh/common/vns_constants.h"
23 : #include "sandesh/xmpp_client_server_sandesh_types.h"
24 : #include "sandesh/xmpp_message_sandesh_types.h"
25 : #include "sandesh/xmpp_server_types.h"
26 : #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 : #include "sandesh/xmpp_trace_sandesh_types.h"
28 : #include "sandesh/xmpp_peer_info_types.h"
29 :
30 : using namespace std;
31 : using boost::system::error_code;
32 :
33 : const char *XmppConnection::kAuthTypeNil = "NIL";
34 : const char *XmppConnection::kAuthTypeTls = "TLS";
35 :
36 : // Maximum XMPP control message size. Typically this is less then 256 bytes. But
37 : // in scenarios where host names are quite long, we need a larger buffer size.
38 : #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
39 :
40 11603 : XmppConnection::XmppConnection(TcpServer *server,
41 11603 : const XmppChannelConfig *config)
42 11603 : : server_(server),
43 11603 : session_(NULL),
44 11603 : endpoint_(config->endpoint),
45 11603 : local_endpoint_(config->local_endpoint),
46 11603 : config_(NULL),
47 23206 : keepalive_timer_(TimerManager::CreateTimer(
48 11603 : *server->event_manager()->io_service(),
49 : "Xmpp keepalive timer",
50 : TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
51 11603 : GetTaskInstance(config->ClientOnly()))),
52 11603 : is_client_(config->ClientOnly()),
53 11603 : log_uve_(config->logUVE),
54 11603 : admin_down_(false),
55 11603 : disable_read_(false),
56 11603 : from_(config->FromAddr),
57 11603 : to_(config->ToAddr),
58 11603 : auth_enabled_(config->auth_enabled),
59 11603 : dscp_value_(config->dscp_value), xmlns_(config->xmlns),
60 11603 : state_machine_(XmppStaticObjectFactory::Create<XmppStateMachine>(
61 11603 : this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
62 69618 : mux_(XmppStaticObjectFactory::Create<XmppChannelMux>(this)) {
63 11603 : ostringstream oss;
64 11603 : oss << FromString() << ":" << endpoint().address().to_string();
65 11603 : uve_key_str_ = oss.str();
66 11603 : }
67 :
68 11603 : XmppConnection::~XmppConnection() {
69 11603 : StopKeepAliveTimer();
70 11603 : TimerManager::DeleteTimer(keepalive_timer_);
71 11603 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
72 : "XmppConnection destructor", FromString(), ToString());
73 11603 : }
74 :
75 1046 : std::string XmppConnection::GetXmppAuthenticationType() const {
76 1046 : if (auth_enabled_) {
77 36 : return (XmppConnection::kAuthTypeTls);
78 : } else {
79 1010 : return (XmppConnection::kAuthTypeNil);
80 : }
81 : }
82 :
83 0 : void XmppConnection::SetConfig(const XmppChannelConfig *config) {
84 0 : config_ = config;
85 0 : }
86 :
87 19314 : void XmppConnection::set_session(XmppSession *session) {
88 19314 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
89 19315 : assert(session);
90 19315 : session_ = session;
91 19315 : if (session_ && dscp_value_) {
92 8 : session_->SetDscpSocketOption(dscp_value_);
93 : }
94 19315 : }
95 :
96 20590 : void XmppConnection::clear_session() {
97 20590 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
98 20590 : if (!session_)
99 1379 : return;
100 19211 : session_->ClearConnection();
101 19210 : session_ = NULL;
102 20589 : }
103 :
104 40 : const XmppSession *XmppConnection::session() const {
105 40 : return session_;
106 : }
107 :
108 17460 : XmppSession *XmppConnection::session() {
109 17460 : return session_;
110 : }
111 :
112 1 : void XmppConnection::WriteReady() {
113 1 : boost::system::error_code ec;
114 1 : mux_->WriteReady(ec);
115 1 : }
116 :
117 128 : void XmppConnection::Shutdown() {
118 128 : ManagedDelete();
119 128 : }
120 :
121 118151 : bool XmppConnection::IsDeleted() const {
122 118151 : return deleter()->IsDeleted();
123 : }
124 :
125 17660 : bool XmppConnection::MayDelete() const {
126 17660 : return !mux_->ReceiverCount() && !mux_->RefererCount();
127 : }
128 :
129 7506 : XmppSession *XmppConnection::CreateSession() {
130 7506 : TcpSession *session = server_->CreateSession();
131 7506 : XmppSession *xmpp_session = static_cast<XmppSession *>(session);
132 7506 : xmpp_session->SetConnection(this);
133 7506 : return xmpp_session;
134 : }
135 :
136 : //
137 : // Return the task instance for this XmppConnection.
138 : // Calculate from the remote IpAddress so that a restarting session uses the
139 : // same value as before.
140 : // Do not make this method virtual since it gets called from the constructor.
141 : //
142 :
143 95504 : int XmppConnection::GetTaskInstance(bool is_client) const {
144 95504 : if (is_client)
145 36507 : return 0;
146 58997 : IpAddress address = endpoint().address();
147 58997 : int thread_count = TaskScheduler::GetInstance()->HardwareThreadCount();
148 58997 : if (address.is_v4()) {
149 58997 : return address.to_v4().to_ulong() % thread_count;
150 : } else {
151 0 : return 0;
152 : }
153 : }
154 :
155 7854680 : xmsm::XmState XmppConnection::GetStateMcState() const {
156 7854680 : const XmppStateMachine *sm = state_machine();
157 7854660 : assert(sm);
158 7854660 : return sm->StateType();
159 : }
160 :
161 4338313 : xmsm::XmOpenConfirmState XmppConnection::GetStateMcOpenConfirmState() const {
162 4338313 : const XmppStateMachine *sm = state_machine();
163 4338303 : assert(sm);
164 4338303 : return sm->OpenConfirmStateType();
165 : }
166 :
167 :
168 58997 : boost::asio::ip::tcp::endpoint XmppConnection::endpoint() const {
169 58997 : return endpoint_;
170 : }
171 :
172 7506 : boost::asio::ip::tcp::endpoint XmppConnection::local_endpoint() const {
173 7506 : return local_endpoint_;
174 : }
175 :
176 72 : string XmppConnection::endpoint_string() const {
177 72 : ostringstream oss;
178 72 : oss << endpoint_;
179 144 : return oss.str();
180 72 : }
181 :
182 72 : string XmppConnection::local_endpoint_string() const {
183 72 : ostringstream oss;
184 72 : oss << local_endpoint_;
185 144 : return oss.str();
186 72 : }
187 :
188 34207 : const string &XmppConnection::FromString() const {
189 34207 : return from_;
190 : }
191 :
192 1824302 : const string &XmppConnection::ToString() const {
193 1824302 : return to_;
194 : }
195 :
196 6192019 : const std::string &XmppConnection::ToUVEKey() const {
197 6192019 : return uve_key_str_;
198 : }
199 :
200 0 : static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info) {
201 0 : assert(!peer_info.get_name().empty());
202 0 : XMPPPeerInfo::Send(peer_info);
203 0 : }
204 :
205 15763 : void XmppConnection::SetTo(const string &to) {
206 15763 : if ((to_.size() == 0) && (to.size() != 0)) {
207 6558 : to_ = to;
208 6558 : if (!logUVE()) return;
209 0 : XmppPeerInfoData peer_info;
210 0 : peer_info.set_name(ToUVEKey());
211 0 : peer_info.set_identifier(to_);
212 0 : XMPPPeerInfoSend(peer_info);
213 0 : }
214 : }
215 :
216 0 : void XmppConnection::SetAdminDown(bool toggle) {
217 : // TODO: generate state machine event.
218 0 : admin_down_ = toggle;
219 0 : }
220 :
221 6520 : bool XmppConnection::AcceptSession(XmppSession *session) {
222 6520 : session->SetConnection(this);
223 6520 : return state_machine_->PassiveOpen(session);
224 : }
225 :
226 1618134 : bool XmppConnection::Send(const uint8_t *data, size_t size,
227 : const string *msg_str) {
228 1618134 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
229 1618104 : if (session_ == NULL) {
230 6 : return false;
231 : }
232 :
233 1618098 : TcpSession::Endpoint endpoint = session_->remote_endpoint();
234 1617941 : const string &endpoint_addr_str = session_->remote_addr_string();
235 1617905 : string str;
236 1617901 : if (!msg_str) {
237 57137 : str.append(reinterpret_cast<const char *>(data), size);
238 57137 : msg_str = &str;
239 : }
240 :
241 1617562 : if (!(mux_ &&
242 1617823 : (mux_->TxMessageTrace(endpoint_addr_str, endpoint.port(),
243 1617901 : size, *msg_str, NULL)))) {
244 1617569 : XMPP_MESSAGE_TRACE(XmppTxStream,
245 : endpoint_addr_str, endpoint.port(), size, *msg_str);
246 : }
247 :
248 1618255 : stats_[1].update++;
249 : size_t sent;
250 1618267 : return session_->Send(data, size, &sent);
251 1618164 : }
252 :
253 11 : int XmppConnection::SetDscpValue(uint8_t value) {
254 11 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
255 11 : dscp_value_ = value;
256 11 : if (!session_) {
257 0 : return 0;
258 : }
259 11 : return session_->SetDscpSocketOption(value);
260 11 : }
261 :
262 8041 : bool XmppConnection::SendOpen(XmppSession *session) {
263 8041 : if (!session) return false;
264 8041 : XmppProto::XmppStanza::XmppStreamMessage openstream;
265 8041 : openstream.strmtype = XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER;
266 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
267 8041 : int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
268 : sizeof(data));
269 8041 : if (len <= 0) {
270 0 : inc_open_fail();
271 0 : return false;
272 : } else {
273 8041 : XMPP_UTDEBUG(XmppOpen, ToUVEKey(), XMPP_PEER_DIR_OUT, len, from_, to_,
274 : xmlns_);
275 8041 : session->Send(data, len, NULL);
276 8041 : stats_[1].open++;
277 8041 : return true;
278 : }
279 8041 : }
280 :
281 7818 : bool XmppConnection::SendOpenConfirm(XmppSession *session) {
282 7818 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
283 7818 : if (!session_) return false;
284 7818 : XmppStanza::XmppStreamMessage openstream;
285 7817 : openstream.strmtype = XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER_RESP;
286 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
287 7817 : int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
288 : sizeof(data));
289 7818 : if (len <= 0) {
290 0 : inc_open_fail();
291 0 : return false;
292 : } else {
293 7818 : XMPP_UTDEBUG(XmppOpenConfirm, ToUVEKey(), XMPP_PEER_DIR_OUT, len,
294 : from_, to_);
295 7818 : session_->Send(data, len, NULL);
296 7818 : stats_[1].open++;
297 7818 : return true;
298 : }
299 7818 : }
300 :
301 1510 : bool XmppConnection::SendStreamFeatureRequest(XmppSession *session) {
302 1510 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
303 1510 : if (!session_) return false;
304 1510 : XmppStanza::XmppStreamMessage featurestream;
305 1510 : featurestream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
306 1510 : featurestream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_FEATURE_REQUEST;
307 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
308 1510 : int len = XmppProto::EncodeStream(featurestream, to_, from_, xmlns_, data,
309 : sizeof(data));
310 1510 : if (len <= 0) {
311 0 : inc_stream_feature_fail();
312 0 : return false;
313 : } else {
314 1510 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
315 : "Send Stream Feature Request", len, from_, to_);
316 1510 : session_->Send(data, len, NULL);
317 : //stats_[1].open++;
318 1510 : return true;
319 : }
320 1510 : }
321 :
322 1469 : bool XmppConnection::SendStartTls(XmppSession *session) {
323 1469 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
324 1469 : if (!session_) return false;
325 1469 : XmppStanza::XmppStreamMessage stream;
326 1469 : stream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
327 1469 : stream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_START;
328 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
329 1469 : int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
330 : sizeof(data));
331 1469 : if (len <= 0) {
332 0 : inc_stream_feature_fail();
333 0 : return false;
334 : } else {
335 1469 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
336 : "Send Start Tls", len, from_, to_);
337 1469 : session_->Send(data, len, NULL);
338 : //stats_[1].open++;
339 1469 : return true;
340 : }
341 1469 : }
342 :
343 1464 : bool XmppConnection::SendProceedTls(XmppSession *session) {
344 1464 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
345 1464 : if (!session_) return false;
346 1464 : XmppStanza::XmppStreamMessage stream;
347 1464 : stream.strmtype = XmppStanza::XmppStreamMessage::FEATURE_TLS;
348 1464 : stream.strmtlstype = XmppStanza::XmppStreamMessage::TLS_PROCEED;
349 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
350 1464 : int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
351 : sizeof(data));
352 1464 : if (len <= 0) {
353 0 : inc_stream_feature_fail();
354 0 : return false;
355 : } else {
356 1464 : XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
357 : "Send Proceed Tls", len, from_, to_);
358 1464 : session_->Send(data, len, NULL);
359 : //stats_[1].open++;
360 1464 : return true;
361 : }
362 1464 : }
363 :
364 2 : void XmppConnection::SendClose(XmppSession *session) {
365 2 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
366 2 : if (!session_) return;
367 1 : string str("</stream:stream>");
368 : uint8_t data[64];
369 1 : memcpy(data, str.data(), str.size());
370 1 : XMPP_UTDEBUG(XmppClose, ToUVEKey(), XMPP_PEER_DIR_OUT, str.size(), from_,
371 : to_);
372 1 : session_->Send(data, str.size(), NULL);
373 1 : stats_[1].close++;
374 2 : }
375 :
376 2910 : void XmppConnection::ProcessSslHandShakeResponse(SslSessionPtr session,
377 : const boost::system::error_code& error) {
378 2910 : if (!state_machine())
379 0 : return;
380 :
381 2910 : if (error) {
382 0 : inc_handshake_failure();
383 :
384 0 : if (error.category() == boost::asio::error::get_ssl_category()) {
385 0 : string err = error.message();
386 0 : err = string(" (")
387 0 : +boost::lexical_cast<string>(ERR_GET_LIB(error.value()))+","
388 0 : +boost::lexical_cast<string>(ERR_GET_REASON(error.value()))+") ";
389 :
390 : char buf[128];
391 0 : ::ERR_error_string_n(error.value(), buf, sizeof(buf));
392 0 : err += buf;
393 0 : XMPP_ALERT(XmppSslHandShakeFailure, ToUVEKey(), XMPP_PEER_DIR_IN,
394 : "failure", err);
395 0 : }
396 :
397 0 : state_machine()->OnEvent(session.get(), xmsm::EvTLSHANDSHAKE_FAILURE);
398 :
399 : } else {
400 2910 : XMPP_DEBUG(XmppSslHandShakeMessage, session->ToUVEKey(),
401 : XMPP_PEER_DIR_IN, "success", "");
402 2910 : state_machine()->OnEvent(session.get(), xmsm::EvTLSHANDSHAKE_SUCCESS);
403 : }
404 : }
405 :
406 0 : void XmppConnection::LogMsg(std::string msg) {
407 0 : log4cplus::Logger logger = log4cplus::Logger::getRoot();
408 0 : LOG4CPLUS_DEBUG(logger, msg << ToString() << " " <<
409 : local_endpoint_.address() << ":" << local_endpoint_.port() << "::" <<
410 : endpoint_.address() << ":" << endpoint_.port());
411 0 : }
412 :
413 6440 : void XmppConnection::LogKeepAliveSend() {
414 : static bool init_ = false;
415 : static bool log_ = false;
416 :
417 6440 : if (!init_) {
418 61 : char *str = getenv("XMPP_ASSERT_ON_HOLD_TIMEOUT");
419 61 : if (str && strtoul(str, NULL, 0) != 0) log_ = true;
420 61 : init_ = true;
421 : }
422 :
423 6440 : if (log_) LogMsg("SEND KEEPALIVE: ");
424 6440 : }
425 :
426 6440 : void XmppConnection::SendKeepAlive() {
427 6440 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
428 6440 : if (!session_) return;
429 6440 : XmppStanza::XmppMessage msg(XmppStanza::WHITESPACE_MESSAGE_STANZA);
430 : uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
431 6440 : int len = XmppProto::EncodeStream(msg, data, sizeof(data));
432 6440 : assert(len > 0);
433 6440 : session_->Send(data, len, NULL);
434 6440 : stats_[1].keepalive++;
435 6440 : LogKeepAliveSend();
436 6440 : }
437 :
438 143 : bool XmppConnection::KeepAliveTimerExpired() {
439 143 : if (state_machine_->get_state() != xmsm::ESTABLISHED)
440 0 : return false;
441 :
442 : // TODO: check timestamp of last received packet.
443 143 : SendKeepAlive();
444 :
445 : //
446 : // Start the timer again, by returning true
447 : //
448 143 : return true;
449 : }
450 :
451 0 : void XmppConnection::KeepaliveTimerErrorHanlder(string error_name,
452 : string error_message) {
453 0 : XMPP_WARNING(XmppKeepaliveTimeError, ToUVEKey(), XMPP_PEER_DIR_NA,
454 : error_name, error_message);
455 0 : }
456 :
457 12629 : void XmppConnection::StartKeepAliveTimer() {
458 12629 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
459 12629 : if (!session_)
460 0 : return;
461 :
462 12629 : int holdtime_msecs = state_machine_->hold_time_msecs();
463 12629 : if (holdtime_msecs <= 0)
464 0 : return;
465 :
466 12629 : keepalive_timer_->Start(holdtime_msecs / 3,
467 : boost::bind(&XmppConnection::KeepAliveTimerExpired, this),
468 : boost::bind(&XmppConnection::KeepaliveTimerErrorHanlder, this, _1, _2));
469 12629 : }
470 :
471 24057 : void XmppConnection::StopKeepAliveTimer() {
472 24057 : tbb::spin_mutex::scoped_lock lock(spin_mutex_);
473 24059 : keepalive_timer_->Cancel();
474 24057 : }
475 :
476 24 : void XmppConnection::UpdateKeepAliveTimer(uint8_t time_out) {
477 24 : state_machine_->set_hold_time(time_out);
478 24 : StopKeepAliveTimer();
479 24 : StartKeepAliveTimer();
480 24 : }
481 :
482 75326 : XmppStateMachine *XmppConnection::state_machine() {
483 75326 : return state_machine_.get();
484 : }
485 :
486 12192781 : const XmppStateMachine *XmppConnection::state_machine() const {
487 12192781 : return state_machine_.get();
488 : }
489 :
490 72 : const XmppChannelMux *XmppConnection::channel_mux() const {
491 72 : return mux_.get();
492 : }
493 :
494 4139868 : void XmppConnection::IncProtoStats(unsigned int type) {
495 4139868 : switch (type) {
496 20070 : case XmppStanza::STREAM_HEADER:
497 20070 : stats_[0].open++;
498 20069 : break;
499 2471401 : case XmppStanza::WHITESPACE_MESSAGE_STANZA:
500 2471401 : stats_[0].keepalive++;
501 2471401 : break;
502 98547 : case XmppStanza::IQ_STANZA:
503 98547 : stats_[0].update++;
504 98552 : break;
505 1549872 : case XmppStanza::MESSAGE_STANZA:
506 1549872 : stats_[0].update++;
507 1549872 : break;
508 : }
509 4139872 : }
510 :
511 920 : void XmppConnection::inc_connect_error() {
512 920 : error_stats_.connect_error++;
513 920 : }
514 :
515 6440 : void XmppConnection::inc_session_close() {
516 6440 : error_stats_.session_close++;
517 6440 : }
518 :
519 0 : void XmppConnection::inc_open_fail() {
520 0 : error_stats_.open_fail++;
521 0 : }
522 :
523 0 : void XmppConnection::inc_stream_feature_fail() {
524 0 : error_stats_.stream_feature_fail++;
525 0 : }
526 :
527 0 : void XmppConnection::inc_handshake_failure() {
528 0 : error_stats_.handshake_fail++;
529 0 : }
530 :
531 792 : size_t XmppConnection::get_connect_error() {
532 792 : return error_stats_.connect_error;
533 : }
534 :
535 576 : size_t XmppConnection::get_session_close() {
536 576 : return error_stats_.session_close;
537 : }
538 :
539 0 : size_t XmppConnection::get_open_fail() {
540 0 : return error_stats_.open_fail;
541 : }
542 :
543 0 : size_t XmppConnection::get_stream_feature_fail() {
544 0 : return error_stats_.stream_feature_fail;
545 : }
546 :
547 0 : size_t XmppConnection::get_handshake_failure() {
548 0 : return error_stats_.handshake_fail;
549 : }
550 :
551 72 : size_t XmppConnection::get_sm_connect_attempts() {
552 72 : return state_machine_->get_connect_attempts();
553 : }
554 :
555 261 : size_t XmppConnection::get_sm_keepalive_count() {
556 261 : return state_machine_->get_keepalive_count();
557 : }
558 :
559 4139866 : void XmppConnection::ReceiveMsg(XmppSession *session, const string &msg) {
560 4139866 : XmppStanza::XmppMessage *minfo = XmppDecode(msg);
561 :
562 4139823 : if (minfo) {
563 4098213 : session->IncStats((unsigned int)minfo->type, msg.size());
564 4098210 : if (minfo->type != XmppStanza::WHITESPACE_MESSAGE_STANZA) {
565 1626827 : if (!(mux_ &&
566 3253654 : (mux_->RxMessageTrace(session->
567 3253652 : remote_endpoint().address().to_string(),
568 3253651 : session->remote_endpoint().port(),
569 3253649 : msg.size(), msg, minfo)))) {
570 1626827 : XMPP_MESSAGE_TRACE(XmppRxStream,
571 : session->
572 : remote_endpoint().address().to_string(),
573 : session->
574 : remote_endpoint().port(), msg.size(), msg);
575 : }
576 : }
577 4098212 : IncProtoStats((unsigned int)minfo->type);
578 4098230 : state_machine_->OnMessage(session, minfo);
579 41610 : } else if ((minfo = last_msg_.get()) != NULL) {
580 41656 : session->IncStats((unsigned int)minfo->type, msg.size());
581 41656 : IncProtoStats((unsigned int)minfo->type);
582 : } else {
583 0 : session->IncStats(XmppStanza::INVALID, msg.size());
584 0 : XMPP_MESSAGE_TRACE(XmppRxStreamInvalid,
585 : session->remote_endpoint().address().to_string(),
586 : session->remote_endpoint().port(), msg.size(), msg);
587 : }
588 4139879 : return;
589 : }
590 :
591 4139865 : XmppStanza::XmppMessage *XmppConnection::XmppDecode(const string &msg) {
592 4139865 : unique_ptr<XmppStanza::XmppMessage> minfo(XmppProto::Decode(this, msg));
593 4139845 : if (minfo.get() == NULL) {
594 0 : XMPP_INFO(XmppSessionDelete, ToUVEKey(), XMPP_PEER_DIR_IN, "Server",
595 : FromString(), ToString());
596 0 : Clear();
597 0 : return NULL;
598 : }
599 :
600 4139836 : if (minfo->type == XmppStanza::IQ_STANZA) {
601 : const XmppStanza::XmppMessageIq *iq =
602 98546 : static_cast<const XmppStanza::XmppMessageIq *>(minfo.get());
603 :
604 :
605 98545 : if (iq->action.compare("publish") == 0) {
606 41656 : last_msg_.reset(minfo.release());
607 41655 : return NULL;
608 : }
609 :
610 56891 : if (iq->action.compare("collection") == 0) {
611 41661 : if (last_msg_.get() != NULL) {
612 : XmppStanza::XmppMessageIq *last_iq =
613 41661 : static_cast<XmppStanza::XmppMessageIq *>(last_msg_.get());
614 :
615 41661 : if (last_iq->node.compare(iq->as_node) == 0) {
616 41661 : XmlBase *impl = last_iq->dom.get();
617 41661 : impl->ReadNode("publish");
618 41659 : impl->ModifyAttribute("node", iq->node);
619 41660 : last_iq->node = impl->ReadAttrib("node");
620 41661 : last_iq->is_as_node = iq->is_as_node;
621 : //Save the complete ass/dissociate node
622 41661 : last_iq->as_node = iq->as_node;
623 : } else {
624 0 : XMPP_WARNING(XmppIqMessageInvalid, ToUVEKey(),
625 : XMPP_PEER_DIR_IN);
626 0 : goto error;
627 : }
628 : } else {
629 0 : XMPP_ERROR(XmppIqCollectionError, ToUVEKey(), XMPP_PEER_DIR_IN);
630 0 : goto error;
631 : }
632 : // iq message merged with collection info
633 41660 : return last_msg_.release();
634 : }
635 : }
636 4056505 : return minfo.release();
637 :
638 0 : error:
639 0 : last_msg_.reset();
640 0 : return NULL;
641 4139876 : }
642 :
643 1549743 : int XmppConnection::ProcessXmppChatMessage(
644 : const XmppStanza::XmppChatMessage *msg) {
645 1549743 : mux_->ProcessXmppMessage(msg);
646 1549743 : return 0;
647 : }
648 :
649 56879 : int XmppConnection::ProcessXmppIqMessage(const XmppStanza::XmppMessage *msg) {
650 56879 : mux_->ProcessXmppMessage(msg);
651 56879 : return 0;
652 : }
653 :
654 : class XmppServerConnection::DeleteActor : public LifetimeActor {
655 : public:
656 6567 : DeleteActor(XmppServer *server, XmppServerConnection *parent)
657 6567 : : LifetimeActor(server->lifetime_manager()),
658 6567 : server_(server), parent_(parent) {
659 6567 : }
660 :
661 12648 : virtual bool MayDelete() const {
662 12648 : return (!parent_->on_work_queue() && parent_->MayDelete());
663 : }
664 :
665 6567 : virtual void Shutdown() {
666 6567 : CHECK_CONCURRENCY("bgp::Config");
667 :
668 : // If the connection is still on the WorkQueue, simply add it to the
669 : // ConnectionSet. It won't be on the ConnectionMap.
670 6567 : if (parent_->on_work_queue()) {
671 24 : server_->InsertDeletedConnection(parent_);
672 : }
673 :
674 : // If the connection was rejected as duplicate, it will already be in
675 : // the ConnectionSet. Non-duplicate connections need to be moved from
676 : // from the ConnectionMap into the ConnectionSet. We add it to the
677 : // ConnectionSet and then remove it from ConnectionMap to ensure that
678 : // the XmppServer connection count doesn't temporarily become 0. This
679 : // is friendly to tests that wait for the XmppServer connection count
680 : // to become 0.
681 : //
682 : // Breaking association with the XmppConnectionEndpoint here allows
683 : // a new XmppServerConnection with the same Endpoint to come up. We
684 : // may end up leaking memory if current XmppServerConnection doesn't
685 : // get cleaned up completely, but we at least prevent the other end
686 : // from getting stuck forever.
687 6543 : else if (!parent_->duplicate()) {
688 6543 : server_->InsertDeletedConnection(parent_);
689 6543 : server_->RemoveConnection(parent_);
690 6543 : server_->ReleaseConnectionEndpoint(parent_);
691 : }
692 :
693 6567 : if (parent_->state_machine()) {
694 6567 : parent_->state_machine()->Clear();
695 : }
696 :
697 6567 : XmppSession *session = NULL;
698 6567 : if (parent_->state_machine()) {
699 6567 : session = parent_->state_machine()->session();
700 6567 : parent_->state_machine()->clear_session();
701 : }
702 6567 : if (session) {
703 1475 : server_->DeleteSession(session);
704 : }
705 6567 : }
706 :
707 6567 : virtual void Destroy() {
708 6567 : delete parent_;
709 6567 : }
710 :
711 : private:
712 : XmppServer *server_;
713 : XmppServerConnection *parent_;
714 : };
715 :
716 6567 : XmppServerConnection::XmppServerConnection(XmppServer *server,
717 6567 : const XmppChannelConfig *config)
718 : : XmppConnection(server, config),
719 6567 : duplicate_(false),
720 6567 : on_work_queue_(false),
721 6567 : conn_endpoint_(NULL),
722 6567 : deleter_(new DeleteActor(server, this)),
723 13134 : server_delete_ref_(this, server->deleter()) {
724 6567 : assert(!config->ClientOnly());
725 6567 : XMPP_INFO(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_IN,
726 : "Server", FromString(), ToString());
727 6567 : }
728 :
729 13121 : XmppServerConnection::~XmppServerConnection() {
730 6567 : CHECK_CONCURRENCY("bgp::Config");
731 :
732 6567 : XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA, "Server",
733 : FromString(), ToString());
734 6567 : server()->RemoveDeletedConnection(this);
735 13121 : }
736 :
737 8209 : void XmppServerConnection::ManagedDelete() {
738 8209 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
739 : "Managed server connection delete", FromString(), ToString());
740 8209 : deleter_->Delete();
741 8209 : }
742 :
743 6194 : void XmppServerConnection::RetryDelete() {
744 6194 : if (!deleter()->IsDeleted())
745 17 : return;
746 6177 : deleter()->RetryDelete();
747 : }
748 :
749 0 : LifetimeManager *XmppServerConnection::lifetime_manager() {
750 0 : return server()->lifetime_manager();
751 : }
752 :
753 8381 : XmppServer *XmppServerConnection::server() {
754 8381 : return static_cast<XmppServer *>(server_);
755 : }
756 :
757 12419 : LifetimeActor *XmppServerConnection::deleter() {
758 12419 : return deleter_.get();
759 : }
760 :
761 109269 : const LifetimeActor *XmppServerConnection::deleter() const {
762 109269 : return deleter_.get();
763 : }
764 :
765 4 : void XmppServerConnection::set_close_reason(const string &close_reason) {
766 4 : if (conn_endpoint_)
767 0 : conn_endpoint_->set_close_reason(close_reason);
768 :
769 4 : if (!logUVE())
770 4 : return;
771 :
772 0 : XmppPeerInfoData peer_info;
773 0 : peer_info.set_name(ToUVEKey());
774 0 : peer_info.set_close_reason(close_reason);
775 0 : XMPPPeerInfoSend(peer_info);
776 0 : }
777 :
778 1461 : uint32_t XmppServerConnection::flap_count() const {
779 1461 : return conn_endpoint_ ? conn_endpoint_->flap_count() : 0;
780 : }
781 :
782 6303 : void XmppServerConnection::increment_flap_count() {
783 6303 : XmppConnectionEndpoint *conn_endpoint = conn_endpoint_;
784 6303 : if (!conn_endpoint)
785 1441 : conn_endpoint = server()->FindConnectionEndpoint(ToString());
786 6304 : if (!conn_endpoint)
787 6304 : return;
788 6304 : conn_endpoint->increment_flap_count();
789 :
790 6304 : if (!logUVE())
791 6304 : return;
792 :
793 0 : XmppPeerInfoData peer_info;
794 0 : peer_info.set_name(ToUVEKey());
795 0 : PeerFlapInfo flap_info;
796 0 : flap_info.set_flap_count(conn_endpoint->flap_count());
797 0 : flap_info.set_flap_time(conn_endpoint->last_flap());
798 0 : peer_info.set_flap_info(flap_info);
799 0 : XMPPPeerInfoSend(peer_info);
800 0 : }
801 :
802 1461 : const std::string XmppServerConnection::last_flap_at() const {
803 1461 : return conn_endpoint_ ? conn_endpoint_->last_flap_at() : "";
804 : }
805 :
806 72 : void XmppServerConnection::FillShowInfo(
807 : ShowXmppConnection *show_connection) const {
808 72 : show_connection->set_name(ToString());
809 72 : show_connection->set_deleted(IsDeleted());
810 72 : show_connection->set_remote_endpoint(endpoint_string());
811 72 : show_connection->set_local_endpoint(local_endpoint_string());
812 72 : show_connection->set_state(StateName());
813 72 : show_connection->set_last_event(LastEvent());
814 72 : show_connection->set_last_state(LastStateName());
815 72 : show_connection->set_last_state_at(LastStateChangeAt());
816 72 : show_connection->set_receivers(channel_mux()->GetReceiverList());
817 72 : show_connection->set_server_auth_type(GetXmppAuthenticationType());
818 72 : show_connection->set_dscp_value(dscp_value());
819 72 : }
820 :
821 : class XmppClientConnection::DeleteActor : public LifetimeActor {
822 : public:
823 5036 : DeleteActor(XmppClient *client, XmppClientConnection *parent)
824 5036 : : LifetimeActor(client->lifetime_manager()),
825 5036 : client_(client), parent_(parent) {
826 5036 : }
827 :
828 5036 : virtual bool MayDelete() const {
829 5036 : return parent_->MayDelete();
830 : }
831 :
832 5036 : virtual void Shutdown() {
833 5036 : if (parent_->session()) {
834 541 : client_->NotifyConnectionEvent(parent_->ChannelMux(),
835 : xmps::NOT_READY);
836 : }
837 :
838 5036 : XmppSession *session = NULL;
839 5036 : if (parent_->state_machine()) {
840 5036 : session = parent_->state_machine()->session();
841 5036 : parent_->state_machine()->clear_session();
842 : }
843 5036 : if (session) {
844 562 : client_->DeleteSession(session);
845 : }
846 5036 : }
847 :
848 5036 : virtual void Destroy() {
849 5036 : delete parent_;
850 5036 : }
851 :
852 : private:
853 : XmppClient *client_;
854 : XmppClientConnection *parent_;
855 : };
856 :
857 5036 : XmppClientConnection::XmppClientConnection(XmppClient *server,
858 5036 : const XmppChannelConfig *config)
859 : : XmppConnection(server, config),
860 5036 : flap_count_(0),
861 5036 : deleter_(new DeleteActor(server, this)),
862 10072 : server_delete_ref_(this, server->deleter()) {
863 5036 : assert(config->ClientOnly());
864 5036 : XMPP_UTDEBUG(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_NA, "Client",
865 : FromString(), ToString());
866 5036 : }
867 :
868 10045 : XmppClientConnection::~XmppClientConnection() {
869 5036 : CHECK_CONCURRENCY("bgp::Config");
870 :
871 5036 : XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
872 : "Client", FromString(), ToString());
873 5036 : server()->RemoveConnection(this);
874 10045 : }
875 :
876 5036 : void XmppClientConnection::ManagedDelete() {
877 5036 : XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
878 : "Managed Client Delete", FromString(), ToString());
879 5036 : deleter_->Delete();
880 5036 : }
881 :
882 3821 : void XmppClientConnection::RetryDelete() {
883 3821 : if (!deleter()->IsDeleted())
884 3821 : return;
885 0 : deleter()->RetryDelete();
886 : }
887 :
888 0 : LifetimeManager *XmppClientConnection::lifetime_manager() {
889 0 : return server()->lifetime_manager();
890 : }
891 :
892 5036 : XmppClient *XmppClientConnection::server() {
893 5036 : return static_cast<XmppClient *>(server_);
894 : }
895 :
896 3821 : LifetimeActor *XmppClientConnection::deleter() {
897 3821 : return deleter_.get();
898 : }
899 :
900 8884 : const LifetimeActor *XmppClientConnection::deleter() const {
901 8884 : return deleter_.get();
902 : }
903 :
904 3363 : void XmppClientConnection::set_close_reason(const string &close_reason) {
905 3363 : close_reason_ = close_reason;
906 3363 : if (!logUVE())
907 3363 : return;
908 :
909 0 : XmppPeerInfoData peer_info;
910 0 : peer_info.set_name(ToUVEKey());
911 0 : peer_info.set_close_reason(close_reason_);
912 0 : XMPPPeerInfoSend(peer_info);
913 0 : }
914 :
915 3417 : uint32_t XmppClientConnection::flap_count() const {
916 3417 : return flap_count_;
917 : }
918 :
919 5761 : void XmppClientConnection::increment_flap_count() {
920 5761 : flap_count_++;
921 5761 : last_flap_ = UTCTimestampUsec();
922 :
923 5761 : if (!logUVE())
924 5761 : return;
925 :
926 0 : XmppPeerInfoData peer_info;
927 0 : peer_info.set_name(ToUVEKey());
928 0 : PeerFlapInfo flap_info;
929 0 : flap_info.set_flap_count(flap_count_);
930 0 : flap_info.set_flap_time(last_flap_);
931 0 : peer_info.set_flap_info(flap_info);
932 0 : XMPPPeerInfoSend(peer_info);
933 0 : }
934 :
935 0 : const std::string XmppClientConnection::last_flap_at() const {
936 0 : return last_flap_ ? integerToString(UTCUsecToPTime(last_flap_)) : "";
937 : }
938 :
939 4876 : XmppConnectionEndpoint::XmppConnectionEndpoint(const string &client)
940 4876 : : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
941 4876 : }
942 :
943 0 : void XmppConnectionEndpoint::set_close_reason(const string &close_reason) {
944 0 : close_reason_ = close_reason;
945 0 : }
946 :
947 2480 : uint32_t XmppConnectionEndpoint::flap_count() const {
948 2480 : return flap_count_;
949 : }
950 :
951 6304 : void XmppConnectionEndpoint::increment_flap_count() {
952 6304 : flap_count_++;
953 6304 : last_flap_ = UTCTimestampUsec();
954 6304 : }
955 :
956 0 : uint64_t XmppConnectionEndpoint::last_flap() const {
957 0 : return last_flap_;
958 : }
959 :
960 1425 : const std::string XmppConnectionEndpoint::last_flap_at() const {
961 1425 : return last_flap_ ? integerToString(UTCUsecToPTime(last_flap_)) : "";
962 : }
963 :
964 11106 : XmppConnection *XmppConnectionEndpoint::connection() {
965 11106 : return connection_;
966 : }
967 :
968 47 : const XmppConnection *XmppConnectionEndpoint::connection() const {
969 47 : return connection_;
970 : }
971 :
972 6204 : void XmppConnectionEndpoint::set_connection(XmppConnection *connection) {
973 6204 : assert(!connection_);
974 6204 : connection_ = connection;
975 6204 : }
976 :
977 6204 : void XmppConnectionEndpoint::reset_connection() {
978 6204 : assert(connection_);
979 6204 : connection_ = NULL;
980 6204 : }
981 :
982 : // Swap relavent contents between two XmppConnection objects.
983 104 : void XmppConnection::SwapContents(XmppConnection *other) {
984 104 : assert(!IsClient());
985 104 : assert(!other->IsClient());
986 : // Update the ConnectionMap in the server as the endpoints are the keys.
987 104 : XmppServer *server = dynamic_cast<XmppServerConnection *>(this)->server();
988 104 : server->SwapXmppConnectionMapEntries(this, other);
989 : // Swap all other connection related information.
990 104 : swap(local_endpoint_, other->local_endpoint_);
991 104 : stats_[0].swap(other->stats_[0]);
992 104 : stats_[1].swap(other->stats_[1]);
993 104 : error_stats_.swap(other->error_stats_);
994 104 : swap(last_msg_, other->last_msg_);
995 104 : swap(to_, other->to_);
996 104 : swap(from_, other->from_);
997 104 : swap(xmlns_, other->xmlns_);
998 104 : swap(dscp_value_, other->dscp_value_);
999 104 : swap(disable_read_, other->disable_read_);
1000 104 : }
|