Line data Source code
1 : /*
2 : * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "ksync_sock.h"
6 :
7 : #include <boost/asio.hpp>
8 : #include <boost/bind/bind.hpp>
9 : #include <errno.h>
10 :
11 : class AgentParam;
12 :
13 : using namespace boost::asio;
14 : using namespace boost::placeholders;
15 :
16 : /////////////////////////////////////////////////////////////////////////////
17 : // KSyncSockUds routines
18 : /////////////////////////////////////////////////////////////////////////////
19 : //Unix domain socket class for interacting with dpdk vrouter
20 :
21 : class KSyncSockUdsReadTask : public Task {
22 : public:
23 0 : KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue) :
24 0 : Task(scheduler->GetTaskId("Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
25 0 : }
26 0 : ~KSyncSockUdsReadTask() {
27 0 : }
28 :
29 0 : bool Run() {
30 0 : queue_->Run();
31 0 : return true;
32 : }
33 0 : std::string Description() const { return "KSyncSockUdsRead"; }
34 : private:
35 : KSyncSockUds *queue_;
36 :
37 : };
38 :
39 :
40 :
41 : string KSyncSockUds::sockpath_= KSYNC_AGENT_VROUTER_SOCK_PATH;
42 :
43 0 : KSyncSockUds::KSyncSockUds(boost::asio::io_context &ios) :
44 0 : sock_(ios),
45 0 : server_ep_(sockpath_),
46 0 : rx_buff_(NULL),
47 0 : rx_buff_q_(NULL),
48 0 : remain_(0),
49 0 : socket_(0),
50 0 : connected_(false) {
51 0 : boost::system::error_code ec;
52 0 : reset_use_wait_tree();
53 0 : set_process_data_inline();
54 0 : retry:;
55 0 : sock_.connect(server_ep_, ec);
56 0 : if (ec) {
57 0 : sleep(1);
58 0 : goto retry;
59 : }
60 0 : socket_ = sock_.native_handle();
61 0 : connected_ = true;
62 0 : rx_buff_ = new char[10*kBufLen];
63 0 : rx_buff_q_ = new char[10*kBufLen];
64 0 : }
65 :
66 0 : bool KSyncSockUds::Run() {
67 0 : AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
68 0 : char *ret_buff = new char[1024*kBufLen];
69 0 : boost::system::error_code ec;
70 :
71 : // Read data from the socket and append it to the existing
72 : // unprocessed data in the local buffer.
73 : while (1) {
74 0 : char *bufp = rx_buff_;
75 0 : struct nlmsghdr *nlh = NULL;
76 : struct nlmsghdr tnlh;
77 0 : size_t offset = 0;
78 : int ret_val;
79 0 : size_t bytes_transferred = 0;
80 0 : bytes_transferred = ret_val = recv(socket_, rx_buff_, 10*kBufLen, 0);
81 0 : if (ret_val == 0) {
82 : // connection reset by peer
83 : // close socket and exit
84 0 : sock_.close(ec);
85 0 : LOG(INFO, " dpdk vrouter is down, exiting.. errno:" << errno);
86 0 : exit(0);
87 : }
88 0 : if (ret_val < 0) {
89 0 : if (errno != EAGAIN) {
90 0 : sock_.close(ec);
91 0 : connected_ = false;
92 0 : retry:;
93 0 : sock_.connect(server_ep_, ec);
94 0 : if (ec) {
95 0 : sleep(1);
96 0 : goto retry;
97 : }
98 0 : socket_ = sock_.native_handle();
99 0 : connected_ = true;
100 : }
101 0 : continue;
102 : }
103 :
104 0 : if (remain_ != 0) {
105 0 : if (remain_ < sizeof(struct nlmsghdr)) {
106 0 : memcpy((char *)&tnlh, rx_buff_q_, remain_);
107 0 : memcpy(((char *)&tnlh) + remain_, rx_buff_,
108 0 : (sizeof(struct nlmsghdr) - remain_));
109 0 : nlh = &tnlh;
110 : } else {
111 0 : nlh = (struct nlmsghdr *)rx_buff_q_;
112 : }
113 0 : if (remain_ > nlh->nlmsg_len)
114 0 : assert(0);
115 0 : memcpy(ret_buff, rx_buff_q_, remain_);
116 0 : memcpy(ret_buff+remain_, rx_buff_, nlh->nlmsg_len - remain_);
117 0 : bufp += (nlh->nlmsg_len - remain_);
118 0 : ctxt->SetErrno(0);
119 0 : ProcessDataInline(ret_buff);
120 0 : offset = nlh->nlmsg_len - remain_;
121 : }
122 0 : while (offset < bytes_transferred) {
123 0 : if ((bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
124 0 : nlh = (struct nlmsghdr *)(rx_buff_ + offset);
125 0 : if ((bytes_transferred - offset) > nlh->nlmsg_len) {
126 0 : memcpy(ret_buff, rx_buff_ + offset, nlh->nlmsg_len);
127 0 : ctxt->SetErrno(0);
128 0 : ProcessDataInline(ret_buff);
129 0 : offset += nlh->nlmsg_len;
130 : } else {
131 0 : break;
132 : }
133 : } else {
134 0 : break;
135 : }
136 : }
137 0 : memcpy(rx_buff_q_, rx_buff_ + offset, bytes_transferred - offset);
138 0 : remain_ = bytes_transferred - offset;
139 0 : }
140 : return true;
141 : }
142 :
143 0 : void KSyncSockUds::Init(io_service &ios, const std::string &cpu_pin_policy,
144 : const std::string &sockpathvr) {
145 0 : KSyncSock::SetSockTableEntry(new KSyncSockUds(ios));
146 0 : SetNetlinkFamilyId(10);
147 0 : KSyncSock::Init(false, cpu_pin_policy);
148 0 : sockpath_ = sockpathvr;
149 0 : }
150 :
151 0 : uint32_t KSyncSockUds::GetSeqno(char *data) {
152 0 : return GetNetlinkSeqno(data);
153 : }
154 :
155 0 : bool KSyncSockUds::IsMoreData(char *data) {
156 0 : return NetlinkMsgDone(data);
157 : }
158 :
159 0 : bool KSyncSockUds::Decoder(char *data, AgentSandeshContext *context) {
160 0 : KSyncSockNetlink::NetlinkDecoder(data, context);
161 0 : return true;
162 : }
163 :
164 0 : bool KSyncSockUds::BulkDecoder(char *data,
165 : KSyncBulkSandeshContext *bulk_sandesh_context) {
166 : // Get sandesh buffer and buffer-length
167 0 : uint32_t buf_len = 0;
168 0 : char *buf = NULL;
169 0 : GetNetlinkPayload(data, &buf, &buf_len);
170 0 : return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
171 : }
172 :
173 0 : void KSyncSockUds::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
174 : HandlerCb cb) {
175 0 : if (connected_ == true)
176 0 : SendTo(iovec, seq_no);
177 0 : }
178 :
179 0 : size_t KSyncSockUds::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
180 0 : size_t len = 0, ret;
181 : struct msghdr msg;
182 0 : struct iovec iov[max_bulk_msg_count_*2];
183 : int i;
184 :
185 0 : memset(&msg, 0, sizeof(msg));
186 0 : msg.msg_iov = iov;
187 :
188 0 : ResetNetlink(nl_client_);
189 0 : int offset = nl_client_->cl_buf_offset;
190 0 : UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
191 :
192 0 : KSyncBufferList::iterator it = iovec->begin();
193 0 : iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
194 :
195 0 : int count = iovec->size();
196 0 : for(i = 0; i < count; i++) {
197 0 : mutable_buffers_1 buf = iovec->at(i);
198 0 : size_t buf_size = boost::asio::buffer_size(buf);
199 0 : void* cbuf = boost::asio::buffer_cast<void*>(buf);
200 0 : len += buf_size;
201 0 : iov[i].iov_base = cbuf;
202 0 : iov[i].iov_len = buf_size;
203 : }
204 :
205 0 : msg.msg_iovlen = i;
206 0 : ret = sendmsg(socket_, &msg, 0);
207 0 : if (ret != len) {
208 0 : LOG(ERROR, "sendmsg failure " << ret << "len " << len);
209 : }
210 0 : return len;
211 0 : }
212 :
213 0 : bool KSyncSockUds::Validate(char *data) {
214 0 : return true;
215 : }
216 :
217 0 : void KSyncSockUds::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
218 : static int started = 0;
219 0 : if (!started) {
220 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
221 : //Receive is handled in a separate Run() thread
222 0 : KSyncSockUdsReadTask *task = new KSyncSockUdsReadTask(scheduler, this);
223 0 : scheduler->Enqueue(task);
224 0 : started = 1;
225 : }
226 0 : }
227 :
228 0 : void KSyncSockUds::Receive(mutable_buffers_1 buf) {
229 0 : boost::system::error_code ec;
230 0 : uint32_t bytes_read = 0;
231 0 : const struct nlmsghdr *nlh = NULL;
232 :
233 0 : char *netlink_header(buffer_cast<char *>(buf));
234 :
235 0 : while (bytes_read < sizeof(struct nlmsghdr)) {
236 0 : char *buffer = netlink_header + bytes_read;
237 0 : bytes_read += recv(socket_, buffer, sizeof(struct nlmsghdr) - bytes_read, 0);
238 : //Data read is lesser than netlink header
239 : //continue reading
240 0 : if (bytes_read == sizeof(struct nlmsghdr)) {
241 0 : nlh = buffer_cast<struct nlmsghdr *>(buf);
242 : }
243 : }
244 :
245 0 : if (nlh->nlmsg_type == NLMSG_ERROR) {
246 0 : LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
247 : << " len " << nlh->nlmsg_len);
248 0 : assert(0);
249 : }
250 :
251 0 : bytes_read = 0;
252 0 : uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
253 0 : char *data(buffer_cast<char *>(buf + sizeof(struct nlmsghdr)));
254 :
255 0 : while (bytes_read < payload_size) {
256 0 : char *buffer = data + bytes_read;
257 0 : bytes_read += recv(socket_, buffer, payload_size - bytes_read, 0);
258 : }
259 0 : }
|