Line data Source code
1 : /*
2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 : */
4 : #include <base/address_util.h>
5 : #include <boost/functional/hash.hpp>
6 : #include <init/agent_param.h>
7 : #include <cmn/agent_stats.h>
8 : #include <oper/agent_profile.h>
9 : #include <vrouter/ksync/flowtable_ksync.h>
10 : #include <vrouter/ksync/ksync_init.h>
11 : #include <vrouter/ksync/ksync_flow_index_manager.h>
12 : #include "vrouter/flow_stats/flow_stats_collector.h"
13 : #include "flow_proto.h"
14 : #include "flow_mgmt.h"
15 : #include "flow_event.h"
16 :
17 : //////////////////////////////////////////////////////////////////////////////
18 : // FlowEventQueue routines
19 : //////////////////////////////////////////////////////////////////////////////
20 10 : FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
21 : const std::string &name,
22 : uint32_t task_id, int task_instance,
23 : FlowTokenPool *pool,
24 : uint16_t latency_limit,
25 10 : uint32_t max_iterations) :
26 10 : flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
27 10 : events_processed_(0), latency_limit_(latency_limit) {
28 20 : queue_ = new Queue(task_id, task_instance,
29 : boost::bind(&FlowEventQueueBase::Handler, this, _1),
30 10 : Queue::kMaxSize, max_iterations);
31 : char buff[100];
32 10 : sprintf(buff, "%s-%d", name.c_str(), task_instance);
33 10 : queue_->set_name(buff);
34 10 : if (token_pool_)
35 8 : queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck,
36 : this));
37 10 : queue_->set_measure_busy_time(proto->agent()->MeasureQueueDelay());
38 10 : if (latency_limit_) {
39 0 : queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry,
40 : this));
41 0 : queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit,
42 : this, _1));
43 : }
44 10 : }
45 :
46 10 : FlowEventQueueBase::~FlowEventQueueBase() {
47 10 : delete queue_;
48 10 : }
49 :
50 10 : void FlowEventQueueBase::Shutdown() {
51 10 : queue_->Shutdown();
52 10 : }
53 :
54 1046 : void FlowEventQueueBase::Enqueue(FlowEvent *event) {
55 1046 : if (CanEnqueue(event) == false) {
56 47 : delete event;
57 47 : return;
58 : }
59 999 : queue_->Enqueue(event);
60 : }
61 :
62 286 : bool FlowEventQueueBase::TokenCheck() {
63 286 : return flow_proto_->TokenCheck(token_pool_);
64 : }
65 :
66 0 : bool FlowEventQueueBase::TaskEntry() {
67 0 : count_ = 0;
68 0 : task_start_ = ClockMonotonicUsec();
69 0 : getrusage(RUSAGE_THREAD, &rusage_);
70 0 : return true;
71 : }
72 :
73 0 : void FlowEventQueueBase::TaskExit(bool done) {
74 0 : if (task_start_ == 0)
75 0 : return;
76 :
77 0 : uint64_t t = ClockMonotonicUsec();
78 0 : if (((t - task_start_) / 1000) >= latency_limit_) {
79 : struct rusage r;
80 0 : getrusage(RUSAGE_THREAD, &r);
81 :
82 0 : uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000;
83 0 : user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000);
84 :
85 0 : uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000;
86 0 : sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000);
87 :
88 0 : LOG(ERROR, queue_->Description()
89 : << " Time exceeded " << ((t - task_start_) / 1000)
90 : << " Count " << count_
91 : << " User " << user << " Sys " << sys);
92 : }
93 0 : return;
94 : }
95 :
96 998 : bool FlowEventQueueBase::Handler(FlowEvent *event) {
97 998 : std::unique_ptr<FlowEvent> event_ptr(event);
98 997 : count_++;
99 997 : if (CanProcess(event) == false) {
100 26 : ProcessDone(event, false);
101 26 : return true;
102 : }
103 :
104 972 : HandleEvent(event);
105 :
106 973 : ProcessDone(event, true);
107 973 : return true;
108 999 : }
109 :
110 1046 : bool FlowEventQueueBase::CanEnqueue(FlowEvent *event) {
111 1046 : FlowEntry *flow = event->flow();
112 1046 : bool ret = true;
113 1046 : switch (event->event()) {
114 :
115 58 : case FlowEvent::DELETE_DBENTRY:
116 : case FlowEvent::DELETE_FLOW: {
117 58 : std::scoped_lock mutext(flow->mutex());
118 58 : ret = flow->GetPendingAction()->SetDelete();
119 58 : break;
120 58 : }
121 :
122 : // lock already token for the flow
123 27 : case FlowEvent::FLOW_MESSAGE: {
124 27 : ret = flow->GetPendingAction()->SetRecompute();
125 27 : break;
126 : }
127 :
128 71 : case FlowEvent::RECOMPUTE_FLOW: {
129 71 : std::scoped_lock mutext(flow->mutex());
130 71 : ret = flow->GetPendingAction()->SetRecomputeDBEntry();
131 71 : break;
132 71 : }
133 :
134 15 : case FlowEvent::REVALUATE_DBENTRY: {
135 15 : std::scoped_lock mutext(flow->mutex());
136 15 : ret = flow->GetPendingAction()->SetRevaluate();
137 15 : break;
138 15 : }
139 :
140 875 : default:
141 875 : break;
142 : }
143 :
144 1046 : return ret;
145 : }
146 :
147 997 : bool FlowEventQueueBase::CanProcess(FlowEvent *event) {
148 997 : FlowEntry *flow = event->flow();
149 997 : bool ret = true;
150 997 : switch (event->event()) {
151 :
152 37 : case FlowEvent::DELETE_DBENTRY:
153 : case FlowEvent::DELETE_FLOW: {
154 37 : std::scoped_lock mutext(flow->mutex());
155 37 : events_processed_++;
156 37 : ret = flow->GetPendingAction()->CanDelete();
157 37 : break;
158 37 : }
159 :
160 25 : case FlowEvent::FLOW_MESSAGE: {
161 25 : std::scoped_lock mutext(flow->mutex());
162 25 : events_processed_++;
163 25 : ret = flow->GetPendingAction()->CanRecompute();
164 25 : break;
165 25 : }
166 :
167 59 : case FlowEvent::RECOMPUTE_FLOW: {
168 59 : std::scoped_lock mutext(flow->mutex());
169 59 : events_processed_++;
170 59 : ret = flow->GetPendingAction()->CanRecomputeDBEntry();
171 59 : break;
172 59 : }
173 :
174 3 : case FlowEvent::REVALUATE_DBENTRY: {
175 3 : events_processed_++;
176 3 : std::scoped_lock mutext(flow->mutex());
177 3 : ret = flow->GetPendingAction()->CanRevaluate();
178 3 : break;
179 3 : }
180 :
181 874 : default:
182 874 : break;
183 : }
184 :
185 998 : return ret;
186 : }
187 :
188 999 : void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) {
189 999 : FlowEntry *flow = event->flow();
190 999 : FlowEntry *rflow = NULL;
191 999 : if (flow && update_rev_flow)
192 142 : rflow = flow->reverse_flow_entry();
193 :
194 999 : switch (event->event()) {
195 :
196 37 : case FlowEvent::DELETE_DBENTRY:
197 : case FlowEvent::DELETE_FLOW: {
198 74 : FLOW_LOCK(flow, rflow, event->event());
199 37 : flow->GetPendingAction()->ResetDelete();
200 37 : if (rflow)
201 0 : rflow->GetPendingAction()->ResetDelete();
202 37 : break;
203 37 : }
204 :
205 25 : case FlowEvent::FLOW_MESSAGE: {
206 25 : FLOW_LOCK(flow, rflow, event->event());
207 25 : flow->GetPendingAction()->ResetRecompute();
208 25 : if (rflow)
209 25 : rflow->GetPendingAction()->ResetRecompute();
210 25 : break;
211 25 : }
212 :
213 59 : case FlowEvent::RECOMPUTE_FLOW: {
214 59 : std::scoped_lock mutext(flow->mutex());
215 59 : flow->GetPendingAction()->ResetRecomputeDBEntry();
216 59 : break;
217 59 : }
218 :
219 3 : case FlowEvent::REVALUATE_DBENTRY: {
220 3 : FLOW_LOCK(flow, rflow, event->event());
221 3 : flow->GetPendingAction()->ResetRevaluate();
222 3 : if (rflow)
223 3 : rflow->GetPendingAction()->ResetRevaluate();
224 3 : break;
225 3 : }
226 :
227 875 : default:
228 875 : break;
229 : }
230 :
231 999 : return;
232 : }
233 :
234 4 : FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
235 : FlowTable *table, FlowTokenPool *pool,
236 : uint16_t latency_limit,
237 4 : uint32_t max_iterations) :
238 : FlowEventQueueBase(proto, "Flow Event Queue",
239 8 : agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
240 4 : table->table_index(), pool, latency_limit,
241 : max_iterations),
242 8 : flow_table_(table) {
243 4 : }
244 :
245 8 : FlowEventQueue::~FlowEventQueue() {
246 8 : }
247 :
248 756 : bool FlowEventQueue::HandleEvent(FlowEvent *event) {
249 756 : return flow_proto_->FlowEventHandler(event, flow_table_);
250 : }
251 :
252 2 : DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
253 : FlowTable *table,
254 : FlowTokenPool *pool,
255 : uint16_t latency_limit,
256 2 : uint32_t max_iterations) :
257 : FlowEventQueueBase(proto, "Flow Delete Queue",
258 4 : agent->task_scheduler()->GetTaskId(kTaskFlowDelete),
259 2 : table->table_index(), pool, latency_limit,
260 : max_iterations),
261 4 : flow_table_(table) {
262 2 : }
263 :
264 4 : DeleteFlowEventQueue::~DeleteFlowEventQueue() {
265 4 : }
266 :
267 0 : bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) {
268 0 : return flow_proto_->FlowDeleteHandler(event, flow_table_);
269 : }
270 :
271 2 : KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
272 : FlowTable *table,
273 : FlowTokenPool *pool,
274 : uint16_t latency_limit,
275 2 : uint32_t max_iterations) :
276 : FlowEventQueueBase(proto, "Flow KSync Queue",
277 4 : agent->task_scheduler()->GetTaskId(kTaskFlowKSync),
278 2 : table->table_index(), pool, latency_limit,
279 : max_iterations),
280 4 : flow_table_(table) {
281 2 : }
282 :
283 4 : KSyncFlowEventQueue::~KSyncFlowEventQueue() {
284 4 : }
285 :
286 144 : bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) {
287 144 : return flow_proto_->FlowKSyncMsgHandler(event, flow_table_);
288 : }
289 :
290 2 : UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
291 : FlowTokenPool *pool,
292 : uint16_t latency_limit,
293 2 : uint32_t max_iterations) :
294 : FlowEventQueueBase(proto, "Flow Update Queue",
295 4 : agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
296 4 : pool, latency_limit, max_iterations) {
297 2 : }
298 :
299 2 : UpdateFlowEventQueue::~UpdateFlowEventQueue() {
300 2 : }
301 :
302 73 : bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) {
303 73 : return flow_proto_->FlowUpdateHandler(event);
304 : }
|