Line data Source code
1 : /*
2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 : */
4 : #include <base/address_util.h>
5 : #include <boost/functional/hash.hpp>
6 : #include <init/agent_param.h>
7 : #include <cmn/agent_stats.h>
8 : #include <oper/agent_profile.h>
9 : #include <vrouter/ksync/flowtable_ksync.h>
10 : #include <vrouter/ksync/ksync_init.h>
11 : #include <vrouter/ksync/ksync_flow_index_manager.h>
12 : #include "vrouter/flow_stats/flow_stats_collector.h"
13 : #include "flow_proto.h"
14 : #include "flow_mgmt.h"
15 : #include "flow_event.h"
16 :
17 : //////////////////////////////////////////////////////////////////////////////
18 : // FlowEventQueue routines
19 : //////////////////////////////////////////////////////////////////////////////
20 0 : FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
21 : const std::string &name,
22 : uint32_t task_id, int task_instance,
23 : FlowTokenPool *pool,
24 : uint16_t latency_limit,
25 0 : uint32_t max_iterations) :
26 0 : flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
27 0 : events_processed_(0), latency_limit_(latency_limit) {
28 0 : queue_ = new Queue(task_id, task_instance,
29 : boost::bind(&FlowEventQueueBase::Handler, this, _1),
30 0 : Queue::kMaxSize, max_iterations);
31 : char buff[100];
32 0 : sprintf(buff, "%s-%d", name.c_str(), task_instance);
33 0 : queue_->set_name(buff);
34 0 : if (token_pool_)
35 0 : queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck,
36 : this));
37 0 : queue_->set_measure_busy_time(proto->agent()->MeasureQueueDelay());
38 0 : if (latency_limit_) {
39 0 : queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry,
40 : this));
41 0 : queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit,
42 : this, _1));
43 : }
44 0 : }
45 :
46 0 : FlowEventQueueBase::~FlowEventQueueBase() {
47 0 : delete queue_;
48 0 : }
49 :
50 0 : void FlowEventQueueBase::Shutdown() {
51 0 : queue_->Shutdown();
52 0 : }
53 :
54 0 : void FlowEventQueueBase::Enqueue(FlowEvent *event) {
55 0 : if (CanEnqueue(event) == false) {
56 0 : delete event;
57 0 : return;
58 : }
59 0 : queue_->Enqueue(event);
60 : }
61 :
62 0 : bool FlowEventQueueBase::TokenCheck() {
63 0 : return flow_proto_->TokenCheck(token_pool_);
64 : }
65 :
66 0 : bool FlowEventQueueBase::TaskEntry() {
67 0 : count_ = 0;
68 0 : task_start_ = ClockMonotonicUsec();
69 0 : getrusage(RUSAGE_THREAD, &rusage_);
70 0 : return true;
71 : }
72 :
73 0 : void FlowEventQueueBase::TaskExit(bool done) {
74 0 : if (task_start_ == 0)
75 0 : return;
76 :
77 0 : uint64_t t = ClockMonotonicUsec();
78 0 : if (((t - task_start_) / 1000) >= latency_limit_) {
79 : struct rusage r;
80 0 : getrusage(RUSAGE_THREAD, &r);
81 :
82 0 : uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000;
83 0 : user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000);
84 :
85 0 : uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000;
86 0 : sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000);
87 :
88 0 : LOG(ERROR, queue_->Description()
89 : << " Time exceeded " << ((t - task_start_) / 1000)
90 : << " Count " << count_
91 : << " User " << user << " Sys " << sys);
92 : }
93 0 : return;
94 : }
95 :
96 0 : bool FlowEventQueueBase::Handler(FlowEvent *event) {
97 0 : std::unique_ptr<FlowEvent> event_ptr(event);
98 0 : count_++;
99 0 : if (CanProcess(event) == false) {
100 0 : ProcessDone(event, false);
101 0 : return true;
102 : }
103 :
104 0 : HandleEvent(event);
105 :
106 0 : ProcessDone(event, true);
107 0 : return true;
108 0 : }
109 :
110 0 : bool FlowEventQueueBase::CanEnqueue(FlowEvent *event) {
111 0 : FlowEntry *flow = event->flow();
112 0 : bool ret = true;
113 0 : switch (event->event()) {
114 :
115 0 : case FlowEvent::DELETE_DBENTRY:
116 : case FlowEvent::DELETE_FLOW: {
117 0 : std::scoped_lock mutext(flow->mutex());
118 0 : ret = flow->GetPendingAction()->SetDelete();
119 0 : break;
120 0 : }
121 :
122 : // lock already token for the flow
123 0 : case FlowEvent::FLOW_MESSAGE: {
124 0 : ret = flow->GetPendingAction()->SetRecompute();
125 0 : break;
126 : }
127 :
128 0 : case FlowEvent::RECOMPUTE_FLOW: {
129 0 : std::scoped_lock mutext(flow->mutex());
130 0 : ret = flow->GetPendingAction()->SetRecomputeDBEntry();
131 0 : break;
132 0 : }
133 :
134 0 : case FlowEvent::REVALUATE_DBENTRY: {
135 0 : std::scoped_lock mutext(flow->mutex());
136 0 : ret = flow->GetPendingAction()->SetRevaluate();
137 0 : break;
138 0 : }
139 :
140 0 : default:
141 0 : break;
142 : }
143 :
144 0 : return ret;
145 : }
146 :
147 0 : bool FlowEventQueueBase::CanProcess(FlowEvent *event) {
148 0 : FlowEntry *flow = event->flow();
149 0 : bool ret = true;
150 0 : switch (event->event()) {
151 :
152 0 : case FlowEvent::DELETE_DBENTRY:
153 : case FlowEvent::DELETE_FLOW: {
154 0 : std::scoped_lock mutext(flow->mutex());
155 0 : events_processed_++;
156 0 : ret = flow->GetPendingAction()->CanDelete();
157 0 : break;
158 0 : }
159 :
160 0 : case FlowEvent::FLOW_MESSAGE: {
161 0 : std::scoped_lock mutext(flow->mutex());
162 0 : events_processed_++;
163 0 : ret = flow->GetPendingAction()->CanRecompute();
164 0 : break;
165 0 : }
166 :
167 0 : case FlowEvent::RECOMPUTE_FLOW: {
168 0 : std::scoped_lock mutext(flow->mutex());
169 0 : events_processed_++;
170 0 : ret = flow->GetPendingAction()->CanRecomputeDBEntry();
171 0 : break;
172 0 : }
173 :
174 0 : case FlowEvent::REVALUATE_DBENTRY: {
175 0 : events_processed_++;
176 0 : std::scoped_lock mutext(flow->mutex());
177 0 : ret = flow->GetPendingAction()->CanRevaluate();
178 0 : break;
179 0 : }
180 :
181 0 : default:
182 0 : break;
183 : }
184 :
185 0 : return ret;
186 : }
187 :
188 0 : void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) {
189 0 : FlowEntry *flow = event->flow();
190 0 : FlowEntry *rflow = NULL;
191 0 : if (flow && update_rev_flow)
192 0 : rflow = flow->reverse_flow_entry();
193 :
194 0 : switch (event->event()) {
195 :
196 0 : case FlowEvent::DELETE_DBENTRY:
197 : case FlowEvent::DELETE_FLOW: {
198 0 : FLOW_LOCK(flow, rflow, event->event());
199 0 : flow->GetPendingAction()->ResetDelete();
200 0 : if (rflow)
201 0 : rflow->GetPendingAction()->ResetDelete();
202 0 : break;
203 0 : }
204 :
205 0 : case FlowEvent::FLOW_MESSAGE: {
206 0 : FLOW_LOCK(flow, rflow, event->event());
207 0 : flow->GetPendingAction()->ResetRecompute();
208 0 : if (rflow)
209 0 : rflow->GetPendingAction()->ResetRecompute();
210 0 : break;
211 0 : }
212 :
213 0 : case FlowEvent::RECOMPUTE_FLOW: {
214 0 : std::scoped_lock mutext(flow->mutex());
215 0 : flow->GetPendingAction()->ResetRecomputeDBEntry();
216 0 : break;
217 0 : }
218 :
219 0 : case FlowEvent::REVALUATE_DBENTRY: {
220 0 : FLOW_LOCK(flow, rflow, event->event());
221 0 : flow->GetPendingAction()->ResetRevaluate();
222 0 : if (rflow)
223 0 : rflow->GetPendingAction()->ResetRevaluate();
224 0 : break;
225 0 : }
226 :
227 0 : default:
228 0 : break;
229 : }
230 :
231 0 : return;
232 : }
233 :
234 0 : FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
235 : FlowTable *table, FlowTokenPool *pool,
236 : uint16_t latency_limit,
237 0 : uint32_t max_iterations) :
238 : FlowEventQueueBase(proto, "Flow Event Queue",
239 0 : agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
240 0 : table->table_index(), pool, latency_limit,
241 : max_iterations),
242 0 : flow_table_(table) {
243 0 : }
244 :
245 0 : FlowEventQueue::~FlowEventQueue() {
246 0 : }
247 :
248 0 : bool FlowEventQueue::HandleEvent(FlowEvent *event) {
249 0 : return flow_proto_->FlowEventHandler(event, flow_table_);
250 : }
251 :
252 0 : DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
253 : FlowTable *table,
254 : FlowTokenPool *pool,
255 : uint16_t latency_limit,
256 0 : uint32_t max_iterations) :
257 : FlowEventQueueBase(proto, "Flow Delete Queue",
258 0 : agent->task_scheduler()->GetTaskId(kTaskFlowDelete),
259 0 : table->table_index(), pool, latency_limit,
260 : max_iterations),
261 0 : flow_table_(table) {
262 0 : }
263 :
264 0 : DeleteFlowEventQueue::~DeleteFlowEventQueue() {
265 0 : }
266 :
267 0 : bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) {
268 0 : return flow_proto_->FlowDeleteHandler(event, flow_table_);
269 : }
270 :
271 0 : KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
272 : FlowTable *table,
273 : FlowTokenPool *pool,
274 : uint16_t latency_limit,
275 0 : uint32_t max_iterations) :
276 : FlowEventQueueBase(proto, "Flow KSync Queue",
277 0 : agent->task_scheduler()->GetTaskId(kTaskFlowKSync),
278 0 : table->table_index(), pool, latency_limit,
279 : max_iterations),
280 0 : flow_table_(table) {
281 0 : }
282 :
283 0 : KSyncFlowEventQueue::~KSyncFlowEventQueue() {
284 0 : }
285 :
286 0 : bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) {
287 0 : return flow_proto_->FlowKSyncMsgHandler(event, flow_table_);
288 : }
289 :
290 0 : UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
291 : FlowTokenPool *pool,
292 : uint16_t latency_limit,
293 0 : uint32_t max_iterations) :
294 : FlowEventQueueBase(proto, "Flow Update Queue",
295 0 : agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
296 0 : pool, latency_limit, max_iterations) {
297 0 : }
298 :
299 0 : UpdateFlowEventQueue::~UpdateFlowEventQueue() {
300 0 : }
301 :
302 0 : bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) {
303 0 : return flow_proto_->FlowUpdateHandler(event);
304 : }
|