Line data Source code
1 : /*
2 : * Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "oper/instance_manager.h"
6 :
7 : #include <boost/bind/bind.hpp>
8 : #include <boost/functional/hash.hpp>
9 : #include <boost/filesystem.hpp>
10 : #include <boost/tokenizer.hpp>
11 : #include <sys/wait.h>
12 : #include "cmn/agent.h"
13 : #include "db/db.h"
14 : #include "io/event_manager.h"
15 : #include "oper/instance_task.h"
16 : #include "oper/operdb_init.h"
17 : #include "oper/service_instance.h"
18 : #include "oper/vm.h"
19 : #include "oper/docker_instance_adapter.h"
20 : #include "oper/netns_instance_adapter.h"
21 : #ifdef WITH_LIBVIRT
22 : #include "oper/libvirt_instance_adapter.h"
23 : #endif
24 : #include "base/util.h"
25 :
26 : using namespace boost::placeholders;
27 :
28 : using boost::uuids::uuid;
29 : SandeshTraceBufferPtr InstanceManagerTraceBuf(
30 : SandeshTraceBufferCreate("InstanceManager", 1000));
31 :
32 : static const char loadbalancer_config_path_default[] =
33 : "/var/lib/contrail/loadbalancer/";
34 : static const char namespace_store_path_default[] =
35 : "/var/run/netns";
36 : static const char namespace_prefix[] = "vrouter-";
37 :
38 : class InstanceManager::NamespaceStaleCleaner {
39 : public:
40 1 : NamespaceStaleCleaner(Agent *agent, InstanceManager *manager)
41 1 : : agent_(agent), manager_(manager) {
42 1 : }
43 :
44 1 : void CleanStaleEntries() {
45 : namespace fs = boost::filesystem;
46 :
47 : //Read all the Namepaces in the system
48 1 : fs::path ns(manager_->namespace_store_path_);
49 1 : if ( !fs::exists(ns) || !fs::is_directory(ns)) {
50 1 : return;
51 : }
52 :
53 : typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
54 0 : boost::char_separator<char> slash_sep("/");
55 0 : boost::char_separator<char> colon_sep(":");
56 0 : fs::directory_iterator end_iter;
57 0 : for(fs::directory_iterator iter(ns); iter != end_iter; iter++) {
58 :
59 : // Get to the name of namespace by removing complete path
60 0 : tokenizer tokens(iter->path().string(), slash_sep);
61 0 : std::string ns_name;
62 0 : for(tokenizer::iterator it=tokens.begin(); it!=tokens.end(); it++){
63 0 : ns_name = *it;
64 0 : }
65 :
66 : //We are interested only in namespaces starting with a given
67 : //prefix
68 : std::size_t vrouter_found;
69 0 : vrouter_found = ns_name.find(namespace_prefix);
70 0 : if (vrouter_found == std::string::npos) {
71 0 : continue;
72 : }
73 :
74 : //Remove the standard prefix
75 0 : ns_name.replace(vrouter_found, strlen(namespace_prefix), "");
76 :
77 : //Namespace might have a ":". Extract both left and right of
78 : //":" Left of ":" is the VM uuid. If not found in Agent's VM
79 : //DB, it can be deleted
80 0 : tokenizer tok(ns_name, colon_sep);
81 0 : boost::uuids::uuid vm_uuid = StringToUuid(*tok.begin());
82 0 : VmKey key(vm_uuid);
83 0 : if (agent_->vm_table()->Find(&key, true)) {
84 0 : continue;
85 : }
86 :
87 0 : ServiceInstance::Properties prop;
88 0 : prop.instance_id = vm_uuid;
89 0 : prop.service_type = ServiceInstance::SourceNAT;
90 0 : tokenizer::iterator next_tok = ++(tok.begin());
91 : //Loadbalancer namespace
92 0 : if (next_tok != tok.end()) {
93 0 : prop.loadbalancer_id = *next_tok;
94 0 : prop.service_type = ServiceInstance::LoadBalancer;
95 : }
96 :
97 : //Delete Namespace
98 0 : manager_->StopStaleNetNS(prop);
99 :
100 : //If Loadbalncer, delete the config files as well
101 0 : if (prop.service_type == ServiceInstance::LoadBalancer) {
102 : //Delete the complete directory
103 0 : std::stringstream pathgen;
104 0 : pathgen << manager_->loadbalancer_config_path_
105 0 : << prop.loadbalancer_id << ".conf";
106 :
107 0 : boost::system::error_code error;
108 0 : if (fs::exists(pathgen.str())) {
109 0 : fs::remove(pathgen.str(), error);
110 0 : if (error) {
111 0 : std::stringstream ss;
112 0 : ss << "Stale loadbalancer cfg fle delete error ";
113 0 : ss << error.message();
114 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
115 0 : }
116 : }
117 0 : }
118 0 : }
119 1 : }
120 :
121 : private:
122 : Agent *agent_;
123 : InstanceManager *manager_;
124 : };
125 :
126 2 : InstanceManager::~InstanceManager() {
127 2 : TimerManager::DeleteTimer(stale_timer_);
128 2 : STLDeleteValues(&adapters_);
129 2 : }
130 :
131 2 : InstanceManager::InstanceManager(Agent *agent)
132 2 : : si_listener_(DBTableBase::kInvalidId),
133 2 : netns_timeout_(-1),
134 2 : work_queue_(TaskScheduler::GetInstance()->GetTaskId(INSTANCE_MANAGER_TASK_NAME), 0,
135 : boost::bind(&InstanceManager::DequeueEvent, this, _1)),
136 2 : loadbalancer_config_path_(loadbalancer_config_path_default),
137 2 : namespace_store_path_(namespace_store_path_default),
138 2 : stale_timer_interval_(5 * 60 * 1000),
139 2 : stale_timer_(TimerManager::CreateTimer(*(agent->event_manager()->io_service()),
140 : "NameSpaceStaleTimer", TaskScheduler::GetInstance()->
141 6 : GetTaskId(INSTANCE_MANAGER_TASK_NAME), 0)), agent_(agent) {
142 2 : if (agent->isMockMode()) {
143 0 : loadbalancer_config_path_ = "/tmp/" + agent->AgentGUID()
144 0 : + loadbalancer_config_path_default;
145 0 : namespace_store_path_ = "/tmp/" + agent->AgentGUID()
146 0 : + namespace_store_path_default;
147 : }
148 2 : work_queue_.set_name("Instance Manager");
149 :
150 2 : }
151 :
152 2 : void InstanceManager::Initialize(DB *database, const std::string &netns_cmd,
153 : const std::string &docker_cmd,
154 : const int netns_workers,
155 : const int netns_timeout) {
156 2 : DBTableBase *si_table = agent_->service_instance_table();
157 2 : assert(si_table);
158 2 : si_listener_ = si_table->Register(
159 : boost::bind(&InstanceManager::EventObserver, this, _1, _2));
160 :
161 2 : netns_cmd_ = netns_cmd;
162 2 : if (netns_cmd_.length() == 0) {
163 2 : LOG(ERROR, "NetNS path for network namespace command not specified "
164 : "in the config file, the namespaces won't be started");
165 : }
166 2 : if (docker_cmd.length() == 0) {
167 2 : LOG(ERROR, "Path for Docker starter command not specified "
168 : "in the config file, the Docker instances won't be started");
169 : }
170 :
171 2 : std::stringstream pathgen;
172 2 : pathgen << loadbalancer_config_path_;
173 2 : boost::filesystem::path dir(pathgen.str());
174 2 : boost::system::error_code error;
175 2 : boost::filesystem::create_directories(dir, error);
176 2 : if (error) {
177 0 : LOG(ERROR, "Falied to create Loadbalancer Directory " << pathgen.str());
178 : }
179 :
180 2 : adapters_.push_back(new DockerInstanceAdapter(docker_cmd, agent_));
181 2 : adapters_.push_back(new NetNSInstanceAdapter(netns_cmd,
182 2 : loadbalancer_config_path_, agent_));
183 : #ifdef WITH_LIBVIRT
184 : adapters_.push_back(new LibvirtInstanceAdapter(agent_,
185 : "qemu:///system"));
186 : #endif
187 :
188 2 : netns_timeout_ = kTimeoutDefault;
189 2 : if (netns_timeout >= 1) {
190 0 : netns_timeout_ = netns_timeout;
191 : }
192 :
193 2 : netns_reattempts_ = kReattemptsDefault;
194 :
195 2 : int workers = kWorkersDefault;
196 2 : if (netns_workers > 0) {
197 0 : workers = netns_workers;
198 : }
199 :
200 2 : task_queues_.resize(workers);
201 2 : for (std::vector<InstanceTaskQueue *>::iterator iter = task_queues_.begin();
202 4 : iter != task_queues_.end(); ++iter) {
203 : InstanceTaskQueue *task_queue =
204 2 : new InstanceTaskQueue(agent_->event_manager());
205 2 : assert(task_queue);
206 2 : task_queue->set_on_timeout_cb(
207 : boost::bind(&InstanceManager::OnTaskTimeout,
208 : this, _1));
209 2 : *iter = task_queue;
210 : }
211 2 : stale_timer_->Start(StaleTimerInterval(),
212 : boost::bind(&InstanceManager::StaleTimeout, this));
213 :
214 2 : }
215 :
216 0 : void InstanceManager::SetNetNSCmd(const std::string &netns_cmd) {
217 0 : ServiceInstance::Properties prop;
218 0 : prop.virtualization_type =
219 : ServiceInstance::ServiceInstance::NetworkNamespace;
220 : NetNSInstanceAdapter *adapter = static_cast<NetNSInstanceAdapter
221 0 : *>(FindApplicableAdapter(prop));
222 0 : if (adapter)
223 0 : adapter->set_cmd(netns_cmd);
224 0 : }
225 :
226 0 : void InstanceManager::SetStaleTimerInterval(int minutes) {
227 0 : stale_timer_interval_ = minutes * 60 * 1000;
228 0 : }
229 :
230 0 : void InstanceManager::OnTaskTimeout(InstanceTaskQueue *task_queue) {
231 0 : InstanceManagerChildEvent event;
232 0 : event.type = OnTaskTimeoutEvent;
233 0 : event.task_queue = task_queue;
234 :
235 0 : work_queue_.Enqueue(event);
236 0 : }
237 :
238 0 : void InstanceManager::OnTaskTimeoutEventHandler(InstanceManagerChildEvent event) {
239 0 : std::stringstream ss;
240 0 : ss << "TaskTimeOut for the TaskQ " << event.task_queue;
241 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
242 :
243 0 : ScheduleNextTask(event.task_queue);
244 0 : }
245 :
246 0 : void InstanceManager::OnErrorEventHandler(InstanceManagerChildEvent event) {
247 0 : ServiceInstance *svc_instance = GetSvcInstance(event.task);
248 0 : if (!svc_instance) {
249 0 : return;
250 : }
251 :
252 0 : std::stringstream ss;
253 0 : ss << "Error for the Task " << event.task << " " << event.errors;
254 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
255 :
256 0 : InstanceState *state = GetState(svc_instance);
257 0 : if (state != NULL) {
258 0 : state->set_errors(event.errors);
259 : }
260 0 : }
261 :
262 0 : void InstanceManager::OnExitEventHandler(InstanceManagerChildEvent event) {
263 0 : ServiceInstance *svc_instance = GetSvcInstance(event.task);
264 0 : if (!svc_instance) {
265 0 : return;
266 : }
267 :
268 0 : std::stringstream ss;
269 0 : ss << "Exit event for the Task " << event.task;
270 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
271 :
272 0 : UpdateStateStatusType(event);
273 0 : for (std::vector<InstanceTaskQueue *>::iterator iter =
274 0 : task_queues_.begin();
275 0 : iter != task_queues_.end(); ++iter) {
276 0 : InstanceTaskQueue *task_queue = *iter;
277 0 : if (!task_queue->Empty()) {
278 0 : if (task_queue->Front() == event.task) {
279 0 : task_queue->Pop();
280 0 : delete event.task;
281 0 : task_queue->StopTimer();
282 0 : DeleteState(svc_instance);
283 0 : ScheduleNextTask(task_queue);
284 0 : return;
285 : }
286 : }
287 : }
288 :
289 0 : }
290 :
291 0 : bool InstanceManager::DequeueEvent(InstanceManagerChildEvent event) {
292 0 : if (event.type == OnErrorEvent) {
293 0 : OnErrorEventHandler(event);
294 0 : } else if (event.type == OnTaskTimeoutEvent) {
295 0 : OnTaskTimeoutEventHandler(event);
296 0 : } else if (event.type == OnExitEvent) {
297 0 : OnExitEventHandler(event);
298 : }
299 :
300 0 : return true;
301 : }
302 :
303 0 : void InstanceManager::UpdateStateStatusType(InstanceManagerChildEvent event) {
304 0 : ServiceInstance* svc_instance = UnregisterSvcInstance(event.task);
305 0 : if (svc_instance) {
306 0 : InstanceState *state = GetState(svc_instance);
307 :
308 : // The below code might not really capture the errors that
309 : // occured in the child process. As we are relying on the pipe
310 : // status to identify child exit, even if child process ends up
311 : // in error state, pipe might return "success" because pipe is
312 : // closed without erros. But what we want to reflect is the
313 : // error of child process. If there is any error in the pipe
314 : // status, we show that as is. If not, if there is any error string
315 : // in error_, we mark the error status as -1.
316 : // pipe returning boost::system::errc::no_such_file_or_directory
317 : // error is considered as no pipe errors
318 :
319 0 : if (state != NULL) {
320 0 : int error_status = event.error_val;
321 0 : if (error_status ==
322 : boost::system::errc::no_such_file_or_directory) {
323 0 : error_status = 0;
324 : }
325 0 : if (!state->errors().empty()) {
326 0 : if (error_status == 0) {
327 0 : error_status = -1;
328 : }
329 : }
330 :
331 0 : state->set_status(error_status);
332 :
333 0 : if (error_status != 0) {
334 0 : if (state->status_type() != InstanceState::Timeout) {
335 0 : state->set_status_type(InstanceState::Error);
336 : }
337 0 : } else if (state->status_type() == InstanceState::Starting) {
338 0 : state->set_status_type(InstanceState::Started);
339 0 : } else if (state->status_type() == InstanceState::Stopping) {
340 0 : state->set_status_type(InstanceState::Stopped);
341 : }
342 :
343 0 : std::stringstream ss;
344 0 : ss << "For the task " << event.task << " error status " <<
345 0 : error_status << " status type " << state->status_type();
346 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
347 0 : }
348 : }
349 0 : }
350 :
351 0 : InstanceState *InstanceManager::GetState(ServiceInstance *svc_instance) const {
352 : return static_cast<InstanceState *>(
353 0 : svc_instance->GetState(agent_->service_instance_table(),
354 0 : si_listener_));
355 : }
356 :
357 0 : InstanceState *InstanceManager::GetState(InstanceTask* task) const {
358 0 : ServiceInstance* svc_instance = GetSvcInstance(task);
359 0 : if (svc_instance) {
360 0 : InstanceState *state = GetState(svc_instance);
361 0 : return state;
362 : }
363 0 : return NULL;
364 : }
365 :
366 0 : void InstanceManager::SetState(ServiceInstance *svc_instance,
367 : InstanceState *state) {
368 0 : svc_instance->SetState(agent_->service_instance_table(),
369 : si_listener_,state);
370 0 : }
371 :
372 0 : void InstanceManager::ClearState(ServiceInstance *svc_instance) {
373 0 : svc_instance->ClearState(agent_->service_instance_table(), si_listener_);
374 0 : }
375 :
376 0 : bool InstanceManager::DeleteState(ServiceInstance *svc_instance) {
377 :
378 0 : if (!svc_instance || !svc_instance->IsDeleted()) {
379 0 : return false;
380 : }
381 :
382 0 : InstanceState *state = GetState(svc_instance);
383 0 : if (state && !state->tasks_running()) {
384 0 : ClearState(svc_instance);
385 0 : delete state;
386 0 : ClearLastCmdType(svc_instance);
387 0 : return true;
388 : }
389 :
390 0 : return false;
391 : }
392 :
393 :
394 2 : void InstanceManager::StateClear() {
395 : DBTablePartition *partition = static_cast<DBTablePartition *>(
396 2 : agent_->service_instance_table()->GetTablePartition(0));
397 :
398 2 : if (!partition)
399 0 : return;
400 :
401 2 : DBEntryBase *next = NULL;
402 2 : for (DBEntryBase *entry = partition->GetFirst(); entry; entry = next) {
403 0 : next = partition->GetNext(entry);
404 : DBState *state =
405 0 : entry->GetState(agent_->service_instance_table(), si_listener_);
406 0 : if (state != NULL) {
407 0 : entry->ClearState(agent_->service_instance_table(), si_listener_);
408 0 : delete state;
409 0 : ClearLastCmdType(static_cast<ServiceInstance *>(entry));
410 : }
411 : }
412 : }
413 :
414 2 : void InstanceManager::Terminate() {
415 2 : StateClear();
416 2 : agent_->service_instance_table()->Unregister(si_listener_);
417 2 : agent_->service_instance_table()->Clear();
418 :
419 : InstanceTaskQueue *task_queue;
420 2 : for (std::vector<InstanceTaskQueue *>::iterator iter = task_queues_.begin();
421 4 : iter != task_queues_.end(); ++iter) {
422 2 : if ((task_queue = *iter) == NULL) {
423 0 : continue;
424 : }
425 2 : task_queue->Clear();
426 :
427 2 : delete task_queue;
428 : }
429 2 : work_queue_.Shutdown();
430 2 : }
431 :
432 0 : void InstanceManager::Enqueue(InstanceTask *task,
433 : const boost::uuids::uuid &uuid) {
434 0 : std::stringstream ss;
435 0 : ss << uuid;
436 0 : InstanceTaskQueue *task_queue = GetTaskQueue(ss.str());
437 0 : task_queue->Push(task);
438 0 : ScheduleNextTask(task_queue);
439 0 : }
440 :
441 0 : InstanceTaskQueue *InstanceManager::GetTaskQueue(const std::string &str) {
442 : boost::hash<std::string> hash;
443 0 : int index = hash(str) % task_queues_.size();
444 0 : return task_queues_[index];
445 : }
446 :
447 : //After Run(), if child process is running, we keep the task status as
448 : //"Starting" or "Stopping". We start a timer to track TaskTimeout time.
449 : //if process is not runnning, we verify how many times we already attempted
450 : //to run. If netns_reattempts_ are already crossed, we return false,
451 : // so that caller deletes the task without running any further.
452 : //If required reattempts are not done, we start a timer and return true
453 : // so that same task is run again after the timeout. The task status is set to
454 : //"reattempt" to track reattempt case
455 0 : bool InstanceManager::StartTask(InstanceTaskQueue *task_queue,
456 : InstanceTask *task) {
457 :
458 :
459 0 : InstanceState *state = GetState(task);
460 0 : if (state) {
461 0 : state->reset_errors();
462 : }
463 :
464 : pid_t pid;
465 0 : bool status = task->Run();
466 :
467 0 : std::stringstream ss;
468 0 : ss << "Run status for the task " << task << " " << status;
469 0 : ss << " With running status " << task->is_running();
470 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
471 :
472 0 : if (status || task->is_running()) {
473 0 : pid = task->pid();
474 0 : if (state != NULL) {
475 0 : state->set_pid(pid);
476 0 : state->set_cmd(task->cmd());
477 0 : if (task->cmd_type() == Start) {
478 0 : state->set_status_type(InstanceState::Starting);
479 : } else {
480 0 : state->set_status_type(InstanceState::Stopping);
481 : }
482 : }
483 : } else {
484 :
485 0 : ss.str(std::string());
486 0 : ss << "Run failure for the task " << task << " attempt " << task->reattempts();
487 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
488 :
489 0 : if (state) {
490 0 : state->set_status_type(InstanceState::Reattempt);
491 0 : state->set_cmd(task->cmd());
492 : }
493 0 : if (task->incr_reattempts() > netns_reattempts_) {
494 0 : ss.str(std::string());
495 0 : ss << "Run failure for the task " << task << " attempts exceeded";
496 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
497 0 : return false;
498 : }
499 : }
500 :
501 0 : task_queue->StartTimer(netns_timeout_ * 1000);
502 :
503 0 : return true;
504 0 : }
505 :
506 : //If Starting the task succeds we wait for another event on that task.
507 : //If not the task is removed from the front of the queue and is delted.
508 0 : void InstanceManager::ScheduleNextTask(InstanceTaskQueue *task_queue) {
509 0 : while (!task_queue->Empty()) {
510 :
511 0 : InstanceTask *task = task_queue->Front();
512 0 : InstanceState *state = GetState(task);
513 :
514 0 : if (!task->is_running()) {
515 0 : bool status = StartTask(task_queue, task);
516 0 : if (status) {
517 0 : return;
518 : }
519 : } else {
520 0 : int delay = time(NULL) - task->start_time();
521 0 : if (delay < netns_timeout_) {
522 0 : return;
523 : }
524 0 : if (state) {
525 0 : state->set_status_type(InstanceState::Timeout);
526 : }
527 :
528 0 : std::stringstream ss;
529 0 : ss << "Timeout for the Task " << task << " delay " << delay;
530 0 : ss << " netns timeout " << netns_timeout_ << " ";
531 0 : ss << task->cmd();
532 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
533 :
534 0 : if (delay >= (netns_timeout_ * 2)) {
535 0 : task->Terminate();
536 0 : if (task->IsSetup())
537 0 : return;
538 : } else {
539 0 : task->Stop();
540 0 : return;
541 : }
542 0 : }
543 :
544 0 : task_queue->StopTimer();
545 0 : task_queue->Pop();
546 :
547 0 : ServiceInstance* svc_instance = GetSvcInstance(task);
548 0 : if (state && svc_instance)
549 0 : state->decr_tasks_running();
550 :
551 0 : task_svc_instances_.erase(task);
552 :
553 0 : std::stringstream ss;
554 0 : ss << "Delete of the Task " << task;
555 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
556 :
557 0 : DeleteState(svc_instance);
558 :
559 0 : delete task;
560 0 : }
561 : }
562 :
563 0 : ServiceInstance *InstanceManager::GetSvcInstance(InstanceTask *task) const {
564 : TaskSvcMap::const_iterator iter =
565 0 : task_svc_instances_.find(task);
566 0 : if (iter != task_svc_instances_.end()) {
567 0 : return iter->second;
568 : }
569 0 : return NULL;
570 : }
571 :
572 0 : void InstanceManager::RegisterSvcInstance(InstanceTask *task,
573 : ServiceInstance *svc_instance) {
574 : pair<TaskSvcMap::iterator, bool> result =
575 0 : task_svc_instances_.insert(std::make_pair(task, svc_instance));
576 0 : assert(result.second);
577 :
578 0 : InstanceState *state = GetState(svc_instance);
579 0 : assert(state);
580 0 : state->incr_tasks_running();
581 0 : }
582 :
583 0 : ServiceInstance *InstanceManager::UnregisterSvcInstance(InstanceTask *task) {
584 0 : for (TaskSvcMap::iterator iter =
585 0 : task_svc_instances_.begin();
586 0 : iter != task_svc_instances_.end(); ++iter) {
587 0 : if (task == iter->first) {
588 0 : ServiceInstance *svc_instance = iter->second;
589 0 : InstanceState *state = GetState(svc_instance);
590 0 : assert(state);
591 0 : state->decr_tasks_running();
592 0 : task_svc_instances_.erase(iter);
593 0 : return svc_instance;
594 : }
595 : }
596 :
597 0 : return NULL;
598 : }
599 :
600 0 : void InstanceManager::UnregisterSvcInstance(ServiceInstance *svc_instance) {
601 :
602 0 : InstanceState *state = GetState(svc_instance);
603 0 : assert(state);
604 :
605 : TaskSvcMap::iterator iter =
606 0 : task_svc_instances_.begin();
607 0 : while(iter != task_svc_instances_.end()) {
608 0 : if (svc_instance == iter->second) {
609 0 : task_svc_instances_.erase(iter++);
610 0 : state->decr_tasks_running();
611 : } else {
612 0 : ++iter;
613 : }
614 : }
615 0 : }
616 :
617 0 : InstanceManagerAdapter* InstanceManager::FindApplicableAdapter(const ServiceInstance::Properties &props) {
618 0 : for (std::vector<InstanceManagerAdapter *>::iterator iter = adapters_.begin();
619 0 : iter != adapters_.end(); ++iter) {
620 0 : InstanceManagerAdapter *adapter = *iter;
621 0 : if (adapter != NULL && adapter->isApplicable(props)) {
622 0 : return adapter;
623 : }
624 : }
625 0 : return NULL;
626 : }
627 :
628 0 : void InstanceManager::StartServiceInstance(ServiceInstance *svc_instance,
629 : InstanceState *state, bool update) {
630 0 : const ServiceInstance::Properties &props = svc_instance->properties();
631 0 : InstanceManagerAdapter *adapter = this->FindApplicableAdapter(props);
632 0 : std::stringstream ss;
633 0 : if (adapter != NULL) {
634 0 : InstanceTask *task = adapter->CreateStartTask(props, update);
635 0 : if (task != NULL) {
636 0 : ss << "Starting the Task " << task << " " << task->cmd();
637 0 : ss << " for " << props.instance_id;
638 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
639 0 : task->set_on_data_cb(boost::bind(&InstanceManager::OnError,
640 : this, _1, _2));
641 0 : task->set_on_exit_cb(boost::bind(&InstanceManager::OnExit,
642 : this, _1, _2));
643 0 : state->set_properties(props);
644 0 : RegisterSvcInstance(task, svc_instance);
645 0 : std::stringstream info;
646 0 : Enqueue(task, props.instance_id);
647 0 : } else {
648 0 : ss << "Error Starting the Task for " << props.instance_id;
649 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
650 : }
651 : } else {
652 0 : ss << "Unknown virtualization type " << props.virtualization_type;
653 0 : ss << " for " << svc_instance->ToString();
654 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
655 : }
656 0 : }
657 :
658 :
659 0 : void InstanceManager::StopServiceInstance(ServiceInstance *svc_instance,
660 : InstanceState *state) {
661 0 : const ServiceInstance::Properties &props = state->properties();
662 0 : InstanceManagerAdapter *adapter = this->FindApplicableAdapter(props);
663 0 : std::stringstream ss;
664 0 : if (adapter != NULL) {
665 0 : InstanceTask *task = adapter->CreateStopTask(props);
666 0 : if (task != NULL) {
667 0 : ss << "Stopping the Task " << task << " " << task->cmd();
668 0 : ss << " for " << props.instance_id;
669 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
670 0 : task->set_on_data_cb(boost::bind(&InstanceManager::OnError,
671 : this, _1, _2));
672 0 : task->set_on_exit_cb(boost::bind(&InstanceManager::OnExit,
673 : this, _1, _2));
674 0 : RegisterSvcInstance(task, svc_instance);
675 0 : std::stringstream info;
676 0 : Enqueue(task, props.instance_id);
677 0 : } else {
678 0 : std::stringstream ss;
679 0 : ss << "Error Stopping the Task for " << props.instance_id;
680 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
681 0 : }
682 : } else {
683 0 : ss << "Unknown virtualization type " << props.virtualization_type;
684 0 : ss << " for " << svc_instance->ToString();
685 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
686 : }
687 0 : }
688 :
689 0 : void InstanceManager::OnError(InstanceTask *task,
690 : const std::string errors) {
691 :
692 0 : InstanceManagerChildEvent event;
693 0 : event.type = OnErrorEvent;
694 0 : event.task = task;
695 0 : event.errors = errors;
696 :
697 0 : work_queue_.Enqueue(event);
698 0 : }
699 :
700 0 : void InstanceManager::OnExit(InstanceTask *task,
701 : const boost::system::error_code &ec) {
702 :
703 0 : InstanceManagerChildEvent event;
704 0 : event.type = OnExitEvent;
705 0 : event.task = task;
706 0 : event.error_val = ec.value();
707 :
708 0 : work_queue_.Enqueue(event);
709 0 : }
710 :
711 0 : void InstanceManager::StopStaleNetNS(ServiceInstance::Properties &props) {
712 0 : std::stringstream cmd_str;
713 :
714 0 : if (netns_cmd_.length() == 0) {
715 0 : return;
716 : }
717 0 : cmd_str << netns_cmd_ << " destroy";
718 :
719 0 : cmd_str << " " << props.ServiceTypeString();
720 0 : cmd_str << " " << UuidToString(props.instance_id);
721 0 : cmd_str << " " << UuidToString(boost::uuids::nil_uuid());
722 0 : cmd_str << " " << UuidToString(boost::uuids::nil_uuid());
723 0 : if (props.service_type == ServiceInstance::LoadBalancer) {
724 0 : if (props.loadbalancer_id.empty()) {
725 0 : LOG(ERROR, "loadbalancer id is missing for service instance: "
726 : << UuidToString(props.instance_id));
727 0 : return;
728 : }
729 0 : cmd_str << " --loadbalancer-id " << props.loadbalancer_id;
730 : }
731 :
732 0 : std::string cmd = cmd_str.str();
733 0 : std::vector<std::string> argv;
734 0 : boost::split(argv, cmd, boost::is_any_of(" "), boost::token_compress_on);
735 0 : std::vector<const char *> c_argv(argv.size() + 1);
736 0 : for (std::size_t i = 0; i != argv.size(); ++i) {
737 0 : c_argv[i] = argv[i].c_str();
738 : }
739 :
740 0 : std::stringstream ss;
741 0 : ss << "StaleNetNS " << cmd;
742 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
743 :
744 0 : pid_t pid = vfork();
745 0 : if (pid == 0) {
746 0 : CloseTaskFds();
747 0 : execvp(c_argv[0], (char **) c_argv.data());
748 0 : perror("execvp");
749 :
750 0 : _exit(127);
751 : }
752 0 : }
753 :
754 0 : void InstanceManager::SetLastCmdType(ServiceInstance *svc_instance,
755 : int last_cmd_type) {
756 0 : std::string uuid = UuidToString(svc_instance->uuid());
757 : std::map<std::string, int>::iterator iter =
758 0 : last_cmd_types_.find(uuid);
759 0 : if (iter != last_cmd_types_.end()) {
760 0 : iter->second = last_cmd_type;
761 : } else {
762 0 : last_cmd_types_.insert(std::make_pair(uuid, last_cmd_type));
763 : }
764 0 : }
765 :
766 0 : int InstanceManager::GetLastCmdType(ServiceInstance *svc_instance) const {
767 0 : std::string uuid = UuidToString(svc_instance->uuid());
768 : std::map<std::string, int>::const_iterator iter =
769 0 : last_cmd_types_.find(uuid);
770 0 : if (iter != last_cmd_types_.end()) {
771 0 : return iter->second;
772 : }
773 :
774 0 : return 0;
775 0 : }
776 :
777 0 : void InstanceManager::ClearLastCmdType(ServiceInstance *svc_instance) {
778 0 : std::string uuid = UuidToString(svc_instance->uuid());
779 : std::map<std::string, int>::iterator iter =
780 0 : last_cmd_types_.find(uuid);
781 0 : if (iter != last_cmd_types_.end()) {
782 0 : last_cmd_types_.erase(iter);
783 : }
784 0 : }
785 :
786 0 : void InstanceManager::EventObserver(
787 : DBTablePartBase *db_part, DBEntryBase *entry) {
788 0 : ServiceInstance *svc_instance = static_cast<ServiceInstance *>(entry);
789 :
790 0 : InstanceState *state = GetState(svc_instance);
791 0 : if (svc_instance->IsDeleted()) {
792 0 : if (state) {
793 0 : if (GetLastCmdType(svc_instance) == Start) {
794 0 : StopServiceInstance(svc_instance, state);
795 0 : SetLastCmdType(svc_instance, Stop);
796 : }
797 0 : if (DeleteState(svc_instance)) {
798 0 : return;
799 : }
800 : }
801 0 : ClearLastCmdType(svc_instance);
802 : } else {
803 0 : if (state == NULL) {
804 0 : state = new InstanceState();
805 0 : SetState(svc_instance, state);
806 : }
807 :
808 0 : bool usable = svc_instance->IsUsable();
809 :
810 0 : std::stringstream ss;
811 0 : ss << "NetNS event notification for uuid: " << svc_instance->ToString();
812 0 : ss << (usable ? " usable" : " not usable");
813 0 : INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
814 :
815 0 : if (!usable && GetLastCmdType(svc_instance) == Start) {
816 0 : StopServiceInstance(svc_instance, state);
817 0 : SetLastCmdType(svc_instance, Stop);
818 0 : } else if (usable) {
819 0 : if (GetLastCmdType(svc_instance) == Start && state->properties().CompareTo(
820 : svc_instance->properties()) != 0) {
821 0 : StartServiceInstance(svc_instance, state, true);
822 0 : } else if (GetLastCmdType(svc_instance) != Start) {
823 0 : StartServiceInstance(svc_instance, state, false);
824 0 : SetLastCmdType(svc_instance, Start);
825 : }
826 : }
827 0 : }
828 : }
829 :
830 1 : bool InstanceManager::StaleTimeout() {
831 :
832 1 : if (stale_cleaner_.get())
833 0 : return false;
834 1 : stale_cleaner_.reset(new NamespaceStaleCleaner(agent_, this));
835 1 : stale_cleaner_->CleanStaleEntries();
836 1 : stale_cleaner_.reset(NULL);
837 1 : return false;
838 : }
839 :
840 0 : void InstanceManager::SetNamespaceStorePath(std::string path) {
841 0 : namespace_store_path_ = path;
842 0 : }
843 :
844 : /*
845 : * InstanceState class
846 : */
847 0 : InstanceState::InstanceState() : DBState(),
848 0 : pid_(0), status_(0), status_type_(0), tasks_running_(0) {
849 0 : }
850 :
851 0 : void InstanceState::Clear() {
852 0 : pid_ = 0;
853 0 : status_ = 0;
854 0 : errors_.clear();
855 0 : cmd_.clear();
856 0 : }
|