Line data Source code
1 : /*
2 : * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "ksync_sock.h"
6 :
7 : #include <boost/asio.hpp>
8 : #include <boost/bind/bind.hpp>
9 :
10 : using namespace boost::asio;
11 : using namespace boost::placeholders;
12 :
13 : /////////////////////////////////////////////////////////////////////////////
14 : // KSyncSockTcp routines
15 : /////////////////////////////////////////////////////////////////////////////
16 : //TCP socket class for interacting with vrouter
17 0 : KSyncSockTcp::KSyncSockTcp(EventManager *evm,
18 0 : boost::asio::ip::address ip_address, int port) : TcpServer(evm), evm_(evm),
19 0 : session_(NULL), server_ep_(ip_address, port), connect_complete_(false) {
20 :
21 0 : reset_use_wait_tree();
22 0 : set_process_data_inline();
23 0 : if (rx_buff_ == NULL) {
24 0 : rx_buff_ = new char[kBufLen];
25 : }
26 0 : rx_buff_rem_ = new char[kBufLen];
27 0 : remain_ = 0;
28 :
29 0 : session_ = CreateSession();
30 0 : Connect(session_, server_ep_);
31 0 : }
32 :
33 0 : void KSyncSockTcp::Init(EventManager *evm, boost::asio::ip::address ip_addr,
34 : int port, const std::string &cpu_pin_policy) {
35 0 : KSyncSock::SetSockTableEntry(new KSyncSockTcp(evm, ip_addr, port));
36 0 : SetNetlinkFamilyId(10);
37 0 : KSyncSock::Init(false, cpu_pin_policy);
38 0 : }
39 :
40 0 : TcpSession* KSyncSockTcp::AllocSession(Socket *socket) {
41 0 : TcpSession *session = new KSyncSockTcpSession(this, socket, false);
42 0 : session->set_observer(boost::bind(&KSyncSockTcp::OnSessionEvent,
43 : this, _1, _2));
44 0 : return session;
45 : }
46 :
47 0 : uint32_t KSyncSockTcp::GetSeqno(char *data) {
48 0 : return GetNetlinkSeqno(data);
49 : }
50 :
51 0 : bool KSyncSockTcp::IsMoreData(char *data) {
52 0 : return NetlinkMsgDone(data);
53 : }
54 :
55 0 : size_t KSyncSockTcp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
56 0 : size_t len = 0, ret;
57 : struct msghdr msg;
58 0 : struct iovec iov[max_bulk_msg_count_*2];
59 : int i, fd;
60 :
61 0 : memset(&msg, 0, sizeof(msg));
62 0 : msg.msg_iov = iov;
63 :
64 0 : ResetNetlink(nl_client_);
65 0 : int offset = nl_client_->cl_buf_offset;
66 0 : UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
67 :
68 0 : KSyncBufferList::iterator it = iovec->begin();
69 0 : iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
70 :
71 0 : int count = iovec->size();
72 0 : for(i = 0; i < count; i++) {
73 0 : mutable_buffers_1 buf = iovec->at(i);
74 0 : size_t buf_size = boost::asio::buffer_size(buf);
75 0 : void* cbuf = boost::asio::buffer_cast<void*>(buf);
76 0 : len += buf_size;
77 0 : iov[i].iov_base = cbuf;
78 0 : iov[i].iov_len = buf_size;
79 : }
80 :
81 0 : msg.msg_iovlen = i;
82 0 : fd = tcp_socket_->native_handle();
83 0 : ret = sendmsg(fd, &msg, 0);
84 0 : if (ret != len) {
85 0 : LOG(ERROR, "sendmsg failure " << ret << "len " << len);
86 : }
87 0 : return len;
88 0 : }
89 :
90 0 : void KSyncSockTcp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
91 : HandlerCb cb) {
92 0 : SendTo(iovec, seq_no);
93 0 : return;
94 : }
95 :
96 0 : bool KSyncSockTcp::Validate(char *data) {
97 0 : return ValidateNetlink(data);
98 : }
99 :
100 0 : bool KSyncSockTcp::Decoder(char *data, AgentSandeshContext *context) {
101 0 : KSyncSockNetlink::NetlinkDecoder(data, context);
102 0 : return true;
103 : }
104 :
105 0 : bool KSyncSockTcp::BulkDecoder(char *data,
106 : KSyncBulkSandeshContext *bulk_sandesh_context) {
107 : // Get sandesh buffer and buffer-length
108 0 : uint32_t buf_len = 0;
109 0 : char *buf = NULL;
110 0 : GetNetlinkPayload(data, &buf, &buf_len);
111 0 : return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
112 : }
113 :
114 0 : void KSyncSockTcp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
115 : //Data would be read from ksync tcp session
116 : //hence no socket operation would be required
117 0 : }
118 :
119 0 : void KSyncSockTcp::Receive(mutable_buffers_1 buf) {
120 0 : uint32_t bytes_read = 0;
121 0 : boost::system::error_code ec;
122 0 : const struct nlmsghdr *nlh = NULL;
123 :
124 : //Create a buffer to read netlink header first
125 : mutable_buffers_1 netlink_header(buffer_cast<void *>(buf),
126 0 : sizeof(struct nlmsghdr));
127 :
128 0 : bool blocking_socket = session_->socket()->non_blocking();
129 0 : session_->socket()->non_blocking(false, ec);
130 0 : while (bytes_read < sizeof(struct nlmsghdr)) {
131 : mutable_buffers_1 buffer =
132 0 : static_cast<mutable_buffers_1>(netlink_header + bytes_read);
133 0 : bytes_read += session_->socket()->receive(buffer, 0, ec);
134 0 : if (ec.failed()) {
135 0 : assert(0);
136 : }
137 : //Data read is lesser than netlink header
138 : //continue reading
139 0 : if (bytes_read == sizeof(struct nlmsghdr)) {
140 0 : nlh = buffer_cast<struct nlmsghdr *>(buf);
141 : }
142 : }
143 :
144 0 : if (nlh->nlmsg_type == NLMSG_ERROR) {
145 0 : LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
146 : << " len " << nlh->nlmsg_len);
147 0 : assert(0);
148 : }
149 :
150 0 : bytes_read = 0;
151 0 : uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
152 : //Read data
153 0 : mutable_buffers_1 data(buffer_cast<void *>(buf + sizeof(struct nlmsghdr)),
154 0 : payload_size);
155 :
156 0 : while (bytes_read < payload_size) {
157 : mutable_buffers_1 buffer =
158 0 : static_cast<mutable_buffers_1>(data + bytes_read);
159 0 : bytes_read += session_->socket()->receive(buffer, 0, ec);
160 0 : if (ec.failed()) {
161 0 : assert(0);
162 : }
163 : }
164 0 : session_->socket()->non_blocking(blocking_socket, ec);
165 0 : }
166 :
167 0 : bool KSyncSockTcp::ReceiveMsg(const u_int8_t *msg, size_t size) {
168 0 : AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
169 0 : ctxt->SetErrno(0);
170 0 : ProcessDataInline((char *) msg);
171 0 : return true;
172 : }
173 :
174 0 : bool KSyncSockTcp::Run() {
175 0 : AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
176 0 : int fd = tcp_socket_->native_handle();
177 :
178 : while (1) {
179 0 : char *bufp = rx_buff_;
180 0 : struct nlmsghdr *nlh = NULL;
181 : struct nlmsghdr tnlh;
182 0 : int offset = 0;
183 0 : int bytes_transferred = 0;
184 :
185 0 : bytes_transferred = recv(fd, rx_buff_, kBufLen, 0);
186 0 : if (bytes_transferred <= 0) {
187 0 : LOG(ERROR, "Connection to dpdk-vrouter lost.");
188 0 : sleep(10);
189 0 : exit(EXIT_FAILURE);
190 : }
191 :
192 0 : if (remain_ != 0) {
193 0 : if (remain_ < sizeof(struct nlmsghdr)) {
194 0 : memcpy((char *)&tnlh, rx_buff_rem_, remain_);
195 0 : memcpy(((char *)&tnlh) + remain_, rx_buff_,
196 0 : (sizeof(struct nlmsghdr) - remain_));
197 0 : nlh = &tnlh;
198 : } else {
199 0 : nlh = (struct nlmsghdr *)rx_buff_rem_;
200 : }
201 :
202 0 : if (remain_ > nlh->nlmsg_len)
203 0 : assert(0);
204 :
205 0 : memcpy(rx_buff_rem_+remain_, rx_buff_, nlh->nlmsg_len - remain_);
206 0 : bufp += (nlh->nlmsg_len - remain_);
207 0 : ctxt->SetErrno(0);
208 0 : ProcessDataInline(rx_buff_rem_);
209 0 : offset = nlh->nlmsg_len - remain_;
210 : }
211 :
212 0 : while (offset < bytes_transferred) {
213 0 : if ((unsigned int)(bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
214 0 : nlh = (struct nlmsghdr *)(rx_buff_ + offset);
215 0 : if ((unsigned int)(bytes_transferred - offset) > nlh->nlmsg_len) {
216 0 : ctxt->SetErrno(0);
217 0 : ProcessDataInline(rx_buff_ + offset);
218 0 : offset += nlh->nlmsg_len;
219 : } else {
220 0 : break;
221 : }
222 : } else {
223 0 : break;
224 : }
225 : }
226 :
227 0 : remain_ = bytes_transferred - offset;
228 0 : if (remain_) {
229 0 : memcpy(rx_buff_rem_, rx_buff_ + offset, bytes_transferred - offset);
230 : }
231 0 : }
232 :
233 : return true;
234 : }
235 :
236 : class KSyncSockTcpReadTask : public Task {
237 : public:
238 0 : KSyncSockTcpReadTask(TaskScheduler *scheduler, KSyncSockTcp *sock) :
239 0 : Task(scheduler->GetTaskId("Ksync::KSyncSockTcpRead"), 0), sock_(sock) {
240 0 : }
241 0 : ~KSyncSockTcpReadTask() {
242 0 : }
243 :
244 0 : bool Run() {
245 0 : sock_->Run();
246 0 : return true;
247 : }
248 0 : std::string Description() const { return "KSyncSockTcpRead"; }
249 : private:
250 : KSyncSockTcp *sock_;
251 :
252 : };
253 :
254 0 : void KSyncSockTcp::AsyncReadStart() {
255 : static int started = 0;
256 0 : boost::system::error_code ec;
257 :
258 0 : if (!started) {
259 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
260 0 : KSyncSockTcpReadTask *task = new KSyncSockTcpReadTask(scheduler, this);
261 0 : tcp_socket_->non_blocking(false, ec);
262 0 : scheduler->Enqueue(task);
263 0 : started = 1;
264 : }
265 0 : }
266 :
267 0 : void KSyncSockTcp::OnSessionEvent(TcpSession *session,
268 : TcpSession::Event event) {
269 0 : switch (event) {
270 0 : case TcpSession::CONNECT_FAILED:
271 : //Retry
272 0 : Connect(session_, server_ep_);
273 0 : break;
274 0 : case TcpSession::CLOSE:
275 0 : LOG(ERROR, "Connection to dpdk-vrouter lost.");
276 0 : sleep(10);
277 0 : exit(EXIT_FAILURE);
278 : break;
279 0 : case TcpSession::CONNECT_COMPLETE:
280 0 : tcp_socket_ = session_->socket();
281 0 : connect_complete_ = true;
282 0 : session_->SetTcpNoDelay();
283 0 : session_->SetTcpSendBufSize(max_bulk_buf_size_*16);
284 0 : session_->SetTcpRecvBufSize(max_bulk_buf_size_*16);
285 0 : default:
286 0 : break;
287 : }
288 0 : }
289 :
290 : /////////////////////////////////////////////////////////////////////////////
291 : // KSyncSockTcpSession routines
292 : /////////////////////////////////////////////////////////////////////////////
293 0 : KSyncSockTcpSession::KSyncSockTcpSession(TcpServer *server, Socket *sock,
294 0 : bool async_ready) : TcpSession(server, sock, async_ready) {
295 0 : KSyncSockTcp *tcp_ptr = static_cast<KSyncSockTcp *>(server);
296 0 : reader_ = new KSyncSockTcpSessionReader(this,
297 0 : boost::bind(&KSyncSockTcp::ReceiveMsg, tcp_ptr, _1, _2));
298 0 : }
299 :
300 0 : KSyncSockTcpSession::~KSyncSockTcpSession() {
301 0 : if (reader_) {
302 0 : delete reader_;
303 : }
304 0 : }
305 :
306 0 : void KSyncSockTcpSession::OnRead(Buffer buffer) {
307 0 : reader_->OnRead(buffer);
308 0 : }
309 :
310 0 : KSyncSockTcpSessionReader::KSyncSockTcpSessionReader(
311 0 : TcpSession *session, ReceiveCallback callback) :
312 0 : TcpMessageReader(session, callback) {
313 0 : }
314 :
315 0 : int KSyncSockTcpSessionReader::MsgLength(Buffer buffer, int offset) {
316 0 : size_t size = TcpSession::BufferSize(buffer);
317 0 : int remain = size - offset;
318 0 : if (remain < GetHeaderLenSize()) {
319 0 : return -1;
320 : }
321 :
322 : //Byte ordering?
323 : const struct nlmsghdr *nlh =
324 0 : (const struct nlmsghdr *)(TcpSession::BufferData(buffer) + offset);
325 0 : return nlh->nlmsg_len;
326 : }
|