Line data Source code
1 : /*
2 : * Copyright (c) 2014 Juniper Networks, Inc. All right reserved.
3 : */
4 :
5 : #include "oper/instance_task.h"
6 : #include "base/logging.h"
7 : #include "io/event_manager.h"
8 :
9 1 : InstanceTask::InstanceTask()
10 1 : : is_running_(false), start_time_(0), reattempts_(0)
11 1 : {}
12 :
13 1 : InstanceTaskExecvp::InstanceTaskExecvp(const std::string &name,
14 : const std::string &cmd,
15 1 : int cmd_type, EventManager *evm) :
16 1 : name_(name), cmd_(cmd), input_(*(evm->io_service())),
17 1 : setup_done_(false), pid_(0), cmd_type_(cmd_type), pipe_stdout_(false) {
18 1 : }
19 :
20 98 : void InstanceTaskExecvp::ReadData(const boost::system::error_code &ec,
21 : size_t read_bytes) {
22 98 : if (read_bytes) {
23 49 : std::string data(rx_buff_, read_bytes);
24 49 : if (!on_data_cb_.empty()) {
25 49 : on_data_cb_(this, data);
26 : }
27 49 : LOG(DEBUG, "Command output: " + data);
28 49 : }
29 :
30 98 : if (ec) {
31 49 : boost::system::error_code close_ec;
32 49 : input_.close(close_ec);
33 :
34 49 : if (!on_exit_cb_.empty()) {
35 49 : on_exit_cb_(this, ec);
36 : }
37 49 : LOG(DEBUG, "Command code: " + ec.message());
38 49 : return;
39 : }
40 :
41 49 : bzero(rx_buff_, sizeof(rx_buff_));
42 49 : input_.async_read_some(boost::asio::buffer(rx_buff_, kBufLen),
43 98 : boost::bind(&InstanceTaskExecvp::ReadData,
44 : this, boost::asio::placeholders::error,
45 : boost::asio::placeholders::bytes_transferred));
46 : }
47 :
48 0 : void InstanceTaskExecvp::Stop() {
49 0 : assert(pid_);
50 0 : kill(pid_, SIGTERM);
51 0 : }
52 :
53 0 : void InstanceTaskExecvp::Terminate() {
54 0 : assert(pid_);
55 0 : kill(pid_, SIGKILL);
56 0 : }
57 :
58 0 : bool InstanceTaskExecvp::IsSetup() {
59 0 : return setup_done_;
60 : }
61 :
62 : // If there is an error before the fork, task is set to "not running"
63 : // and "false" is returned to caller so that caller can take appropriate
64 : // action on task. If an error is encounted after fork, it is very
65 : // likely that child process is running so we keep the task status as
66 : // "running" and return "false" to caller, so that caller does not
67 : // attempt to run the same task again. In this case, the child process
68 : // exit notification can not be received by instance manager, hence
69 : // instance manager has to rely on TaskTimeout delete the task.
70 50 : bool InstanceTaskExecvp::Run() {
71 50 : std::vector<std::string> argv;
72 :
73 50 : is_running_ = true;
74 :
75 50 : boost::split(argv, cmd_, boost::is_any_of(" "), boost::token_compress_on);
76 50 : std::vector<const char *> c_argv(argv.size() + 1);
77 150 : for (std::size_t i = 0; i != argv.size(); ++i) {
78 100 : c_argv[i] = argv[i].c_str();
79 : }
80 :
81 : int err[2];
82 50 : if (pipe(err) < 0) {
83 0 : return is_running_ = false;
84 : }
85 :
86 50 : pid_ = vfork();
87 100 : if (pid_ == 0) {
88 50 : close(err[0]);
89 50 : if (pipe_stdout_) {
90 50 : dup2(err[1], STDOUT_FILENO);
91 : } else {
92 0 : dup2(err[1], STDERR_FILENO);
93 : }
94 50 : close(err[1]);
95 :
96 50 : if (!pipe_stdout_) {
97 0 : close(STDOUT_FILENO);
98 0 : close(STDIN_FILENO);
99 : }
100 :
101 : /* Close all the open fds before execvp */
102 50 : CloseTaskFds();
103 50 : execvp(c_argv[0], (char **) c_argv.data());
104 0 : perror("execvp");
105 :
106 0 : _exit(127);
107 : }
108 :
109 50 : close(err[1]);
110 :
111 50 : start_time_ = time(NULL);
112 :
113 50 : int fd = ::dup(err[0]);
114 50 : close(err[0]);
115 50 : if (fd == -1) {
116 : //is_running_ is still true indicating the child process is
117 : //running. Caller needs to verify the status before acting on
118 : //the task again
119 0 : return false;
120 : }
121 50 : boost::system::error_code ec;
122 50 : input_.assign(fd, ec);
123 50 : if (ec) {
124 0 : close(fd);
125 :
126 : //is_running_ is still true indicating the child process is
127 : //running. Caller needs to verify the status before acting on
128 : //the task again
129 0 : return false;
130 : }
131 50 : setup_done_ = true;
132 :
133 50 : bzero(rx_buff_, sizeof(rx_buff_));
134 50 : input_.async_read_some(boost::asio::buffer(rx_buff_, kBufLen),
135 50 : boost::bind(&InstanceTaskExecvp::ReadData,
136 : this, boost::asio::placeholders::error,
137 : boost::asio::placeholders::bytes_transferred));
138 50 : return true;
139 :
140 50 : }
141 :
142 1 : InstanceTaskQueue::InstanceTaskQueue(EventManager *evm) : evm_(evm),
143 2 : timeout_timer_(TimerManager::CreateTimer(
144 1 : *evm_->io_service(),
145 : "Instance Manager Task Timeout",
146 : TaskScheduler::GetInstance()->GetTaskId(
147 1 : INSTANCE_MANAGER_TASK_NAME), 0)) {
148 1 : }
149 :
150 1 : InstanceTaskQueue::~InstanceTaskQueue() {
151 1 : TimerManager::DeleteTimer(timeout_timer_);
152 1 : }
153 :
154 0 : void InstanceTaskQueue::StartTimer(int time) {
155 0 : timeout_timer_->Start(time,
156 : boost::bind(&InstanceTaskQueue::OnTimerTimeout,
157 : this),
158 : boost::bind(&InstanceTaskQueue::TimerErrorHandler,
159 : this, _1, _2));
160 0 : }
161 :
162 0 : void InstanceTaskQueue::StopTimer() {
163 0 : timeout_timer_->Cancel();
164 0 : }
165 :
166 0 : bool InstanceTaskQueue::OnTimerTimeout() {
167 0 : if (! on_timeout_cb_.empty()) {
168 0 : on_timeout_cb_(this);
169 : }
170 :
171 0 : return true;
172 : }
173 :
174 0 : void InstanceTaskQueue::TimerErrorHandler(const std::string &name, std::string error) {
175 0 : LOG(ERROR, "NetNS timeout error: " << error);
176 0 : }
177 :
178 1 : void InstanceTaskQueue::Clear() {
179 1 : timeout_timer_->Cancel();
180 :
181 1 : while(!task_queue_.empty()) {
182 0 : InstanceTask *task = task_queue_.front();
183 0 : task_queue_.pop();
184 0 : delete task;
185 : }
186 1 : }
|