Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "base/regex.h" 6 : #include "xmpp/xmpp_session.h" 7 : 8 : #include "xmpp/xmpp_connection.h" 9 : #include "xmpp/xmpp_log.h" 10 : #include "xmpp/xmpp_proto.h" 11 : #include "xmpp/xmpp_server.h" 12 : #include "xmpp/xmpp_state_machine.h" 13 : 14 : #include "sandesh/sandesh_trace.h" 15 : #include "sandesh/xmpp_trace_sandesh_types.h" 16 : 17 : using namespace std; 18 : using contrail::regex; 19 : using contrail::regex_match; 20 : using contrail::regex_search; 21 : 22 : using boost::asio::mutable_buffer; 23 : 24 : const regex XmppSession::patt_(rXMPP_MESSAGE); 25 : const regex XmppSession::stream_patt_(rXMPP_STREAM_START); 26 : const regex XmppSession::stream_res_end_(rXMPP_STREAM_END); 27 : const regex XmppSession::whitespace_(sXMPP_WHITESPACE); 28 : const regex XmppSession::stream_features_patt_(rXMPP_STREAM_FEATURES); 29 : const regex XmppSession::starttls_patt_(rXMPP_STREAM_STARTTLS); 30 : const regex XmppSession::proceed_patt_(rXMPP_STREAM_PROCEED); 31 : const regex XmppSession::end_patt_(rXMPP_STREAM_STANZA_END); 32 : 33 14048 : XmppSession::XmppSession(XmppConnectionManager *manager, SslSocket *socket, 34 14048 : bool async_ready) 35 : : SslSession(manager, socket, async_ready), 36 14048 : manager_(manager), 37 14048 : connection_(NULL), 38 14048 : tag_known_(0), 39 14048 : task_instance_(-1), 40 14048 : stats_(XmppStanza::RESERVED_STANZA, XmppSession::StatsPair(0, 0)), 41 14048 : keepalive_probes_(kSessionKeepaliveProbes) { 42 14047 : buf_.reserve(kMaxMessageSize); 43 14048 : offset_ = buf_.begin(); 44 14048 : stream_open_matched_ = false; 45 14048 : } 46 : 47 28094 : XmppSession::~XmppSession() { 48 14048 : set_observer(NULL); 49 14047 : connection_ = NULL; 50 28093 : } 51 : 52 14127 : void XmppSession::SetConnection(XmppConnection *connection) { 53 14127 : assert(connection); 54 14127 : connection_ = connection; 55 14127 : task_instance_ = connection_->GetTaskInstance(); 56 14127 : } 57 : 58 : // 59 : // Dissociate the connection from the this XmppSession. 60 : // Do not invalidate the task_instance since it can be used to spawn an 61 : // io::ReaderTask while this method is being executed. 62 : // 63 33256 : void XmppSession::ClearConnection() { 64 33256 : connection_ = NULL; 65 33256 : } 66 : 67 : // 68 : // Concurrency: called in the context of bgp::Config task. 69 : // 70 : // Process write ready callback. 71 : // 72 0 : void XmppSession::ProcessWriteReady() { 73 0 : if (!connection_) 74 0 : return; 75 0 : connection_->WriteReady(); 76 : } 77 : 78 : // 79 : // Concurrency: called in the context of io thread. 80 : // 81 : // Handle write ready callback. 82 : // 83 : // Enqueue session to the XmppConnectionManager. The session is added to a 84 : // WorkQueue gets processed in the context of bgp::Config task. Doing this 85 : // ensures that we don't access the XmppConnection while the XmppConnection 86 : // is trying to clear our back pointer to it. 87 : // 88 : // We can ignore any errors since the StateMachine will get informed of the 89 : // TcpSession close independently and react to it. 90 : // 91 0 : void XmppSession::WriteReady(const boost::system::error_code &error) { 92 0 : if (error) 93 0 : return; 94 0 : manager_->EnqueueSession(this); 95 : } 96 : 97 30 : XmppSession::StatsPair XmppSession::Stats(unsigned int type) const { 98 30 : assert (type < (unsigned int)XmppStanza::RESERVED_STANZA); 99 30 : return stats_[type]; 100 : } 101 : 102 4164843 : void XmppSession::IncStats(unsigned int type, uint64_t bytes) { 103 4164843 : assert (type < (unsigned int)XmppStanza::RESERVED_STANZA); 104 4164843 : stats_[type].first++; 105 4164843 : stats_[type].second += bytes; 106 4164843 : } 107 : 108 6585 : boost::system::error_code XmppSession::EnableTcpKeepalive(int hold_time) { 109 6585 : char *keepalive_time_str = getenv("TCP_KEEPALIVE_SECONDS"); 110 6585 : if (keepalive_time_str) { 111 0 : hold_time = strtoul(keepalive_time_str, NULL, 0) * 3; 112 0 : if (!hold_time) 113 0 : return boost::system::error_code(); 114 : } 115 : 116 6585 : if (hold_time <= 9) { 117 0 : hold_time = 9; // min hold-time in secs. 118 : } 119 6585 : hold_time = ((hold_time > 18)? hold_time/2 : hold_time); 120 6585 : keepalive_idle_time_ = hold_time/3; 121 6585 : keepalive_interval_ = 122 6585 : ((hold_time - keepalive_idle_time_)/keepalive_probes_); 123 6585 : tcp_user_timeout_ = (hold_time * 1000); // msec 124 : 125 6585 : return (SetSocketKeepaliveOptions(keepalive_idle_time_, 126 : keepalive_interval_, 127 : keepalive_probes_, 128 6585 : tcp_user_timeout_)); 129 : } 130 : 131 1713320 : regex XmppSession::tag_to_pattern(const char *tag) { 132 1713320 : std::string token("</"); 133 1713307 : token += ++tag; 134 1713309 : token += "[\\s\\t\\r\\n]*>"; 135 : 136 3426647 : regex exp(token.c_str()); 137 3426646 : return exp; 138 1713323 : } 139 : 140 1177511 : void XmppSession::SetBuf(const std::string &str) { 141 1177511 : if (buf_.empty()) { 142 1124243 : ReplaceBuf(str); 143 : } else { 144 53272 : int pos = offset_ - buf_.begin(); 145 53272 : buf_ += str; 146 53272 : offset_ = buf_.begin() + pos; 147 : } 148 1177513 : } 149 : 150 4312411 : void XmppSession::ReplaceBuf(const std::string &str) { 151 4312411 : buf_ = str; 152 4312403 : buf_.reserve(kMaxMessageSize+8); 153 4312411 : offset_ = buf_.begin(); 154 4312412 : } 155 : 156 4164863 : bool XmppSession::LeftOver() const { 157 4164863 : if (buf_.empty()) 158 0 : return false; 159 4164863 : return (buf_.end() != offset_); 160 : } 161 : 162 : // Match a pattern in the buffer. Partially matched string is 163 : // kept in buf_ for use in conjucntion with next buffer read. 164 3410945 : int XmppSession::MatchRegex(const regex &patt) { 165 : 166 3410945 : std::string::const_iterator end = buf_.end(); 167 : 168 3410934 : if (regex_search(offset_, end, res_, patt, 169 3410934 : boost::match_default | boost::match_partial) == 0) { 170 47078 : return -1; 171 : } 172 3363856 : if(res_[0].matched == false) { 173 : // partial match 174 6206 : offset_ = res_[0].first; 175 6206 : return 1; 176 : } else { 177 3357648 : begin_tag_ = string(res_[0].first, res_[0].second); 178 3357651 : offset_ = res_[0].second; 179 3357648 : return 0; 180 : } 181 : } 182 : 183 4365658 : bool XmppSession::Match(Buffer buffer, int *result, bool NewBuf) { 184 4365658 : const XmppConnection *connection = this->Connection(); 185 : 186 4365658 : if (connection == NULL) { 187 0 : return true; 188 : } 189 : 190 4365658 : xmsm::XmState state = connection->GetStateMcState(); 191 : xmsm::XmOpenConfirmState oc_state = 192 4365645 : connection->GetStateMcOpenConfirmState(); 193 : 194 4365640 : if (NewBuf) { 195 1177485 : const uint8_t *cp = BufferData(buffer); 196 : // TODO Avoid this copy 197 1177481 : std::string str(cp, cp + BufferSize(buffer)); 198 1177512 : XmppSession::SetBuf(str); 199 1177510 : } 200 : 201 4365664 : int m = -1; 202 4365664 : *result = 0; 203 : do { 204 6044497 : if (!tag_known_) { 205 : // check for whitespaces 206 4312565 : size_t pos = buf_.find_first_not_of(sXMPP_VALIDWS); 207 4312574 : if (pos != 0) { 208 2633577 : if (pos == string::npos) pos = buf_.size(); 209 2633580 : offset_ = buf_.begin() + pos; 210 2633585 : return false; 211 : } 212 : } 213 : 214 3410929 : if (state == xmsm::ACTIVE || state == xmsm::IDLE) { 215 13032 : m = MatchRegex(tag_known_ ? stream_res_end_:stream_patt_); 216 3397897 : } else if (state == xmsm::CONNECT || state == xmsm::OPENSENT) { 217 : // Note, these are client only states 218 14928 : if (!stream_open_matched_) { 219 12562 : m = MatchRegex(tag_known_ ? stream_res_end_:stream_patt_); 220 12562 : if ((m == 0) && (tag_known_)) { 221 6281 : stream_open_matched_ = true; 222 : } 223 : } else { 224 2366 : m = MatchRegex(tag_known_ ? tag_to_pattern(begin_tag_.c_str()): 225 : stream_features_patt_); 226 : } 227 3382969 : } else if ((state == xmsm::OPENCONFIRM) && !(IsSslDisabled())) { 228 12184 : if (connection->IsClient()) { 229 6364 : if (oc_state == xmsm::OPENCONFIRM_FEATURE_NEGOTIATION) { 230 2910 : m = MatchRegex(tag_known_ ? end_patt_: proceed_patt_); 231 2910 : if ((m == 0) && (tag_known_)) { 232 : // set the flag, as we do not want OnRead function to 233 : // read any more data from basic socket. 234 1455 : SetSslHandShakeInProgress(true); 235 : } 236 3454 : } else if (oc_state == xmsm::OPENCONFIRM_FEATURE_SUCCESS) { 237 2910 : m = MatchRegex(tag_known_ ? stream_res_end_:stream_patt_); 238 : } else { 239 544 : m = MatchRegex(tag_known_ ? tag_to_pattern(begin_tag_.c_str()): 240 : stream_features_patt_); 241 : } 242 : } else { 243 5820 : if (oc_state == xmsm::OPENCONFIRM_FEATURE_SUCCESS) { 244 2910 : m = MatchRegex(tag_known_ ? stream_res_end_:stream_patt_); 245 : } else { 246 2910 : m = MatchRegex(tag_known_ ? end_patt_:starttls_patt_); 247 2910 : if ((m == 0) && (tag_known_)) { 248 1455 : SetSslHandShakeInProgress(true); 249 : } 250 : } 251 : } 252 3370786 : } else if (state == xmsm::OPENCONFIRM || state == xmsm::ESTABLISHED) { 253 3370796 : m = MatchRegex(tag_known_ ? tag_to_pattern(begin_tag_.c_str()):patt_); 254 : } 255 : 256 3410939 : if (m == 0) { // full match 257 3357659 : *result = 0; 258 3357659 : tag_known_ ^= 1; 259 3357659 : if (!tag_known_) { 260 : // Found well formed xml 261 1678826 : return false; 262 : } 263 53280 : } else if (m == -1) { // no match 264 47076 : return true; 265 : } else { 266 6204 : return true; // partial. read more 267 : } 268 1678833 : } while (true); 269 : 270 : return true; 271 : } 272 : 273 : // Read the socket stream and send messages to the connection object. 274 : // The buffer is copied to local string for regex match. 275 : // TODO Code need to change st Match() is done on buffer itself. 276 1177572 : void XmppSession::OnRead(Buffer buffer) { 277 1177572 : if (this->Connection() == NULL || !connection_) { 278 : // Connection is deleted. Session is being deleted as well 279 : // Drop the packet. 280 82 : ReleaseBuffer(buffer); 281 82 : return; 282 : } 283 : 284 1177487 : if (connection_->disable_read()) { 285 0 : ReleaseBuffer(buffer); 286 : 287 : // Reset the hold timer as we did receive some thing from the peer 288 0 : connection_->state_machine()->StartHoldTimer(); 289 0 : return; 290 : } 291 : 292 1177491 : int result = 0; 293 1177491 : bool more = Match(buffer, &result, true); 294 : do { 295 4365683 : if (more == false) { 296 4312403 : if (result < 0) { 297 : // TODO generate error, close connection. 298 147539 : break; 299 : } 300 : 301 : // We got good match. Process the message 302 4312403 : std::string::const_iterator st = buf_.begin(); 303 4312395 : std::string xml = string(st, offset_); 304 : // Ensure we have not reached the end 305 4312384 : if (buf_.begin() == offset_) { // xml.size() == 0 306 147541 : buf_.clear(); 307 147541 : break; 308 : } 309 : 310 4164845 : connection_->ReceiveMsg(this, xml); 311 : 312 4312407 : } else { 313 : // Read more data. Either we have partial match 314 : // or no match but in this state we need to keep 315 : // reading data. 316 53280 : break; 317 : } 318 : 319 4164864 : if (LeftOver()) { 320 3188171 : std::string::const_iterator st = buf_.end(); 321 3188167 : ReplaceBuf(string(offset_, st)); 322 3188171 : more = Match(buffer, &result, false); 323 : } else { 324 : // No more data in the Buffer 325 976684 : buf_.clear(); 326 976693 : break; 327 : } 328 3188166 : } while (true); 329 : 330 1177512 : ReleaseBuffer(buffer); 331 1177524 : return; 332 : }