Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_partition.h" 6 : 7 : #include <list> 8 : #include <mutex> 9 : #include <atomic> 10 : 11 : #include <tbb/concurrent_queue.h> 12 : 13 : #include "base/task.h" 14 : #include "db/db.h" 15 : #include "db/db_client.h" 16 : #include "db/db_entry.h" 17 : 18 : using tbb::concurrent_queue; 19 : 20 : struct RequestQueueEntry { 21 : // Constructor takes ownership of DBRequest key, data. 22 1135120 : RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req) 23 1135120 : : tpart(tpart), client(client) { 24 1135108 : request.Swap(req); 25 1135076 : } 26 : DBTablePartBase *tpart; 27 : DBClient *client; 28 : DBRequest request; 29 : }; 30 : 31 : struct RemoveQueueEntry { 32 518657 : RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry) 33 518657 : : tpart(tpart), db_entry(db_entry) { 34 518657 : } 35 : DBTablePartBase *tpart; 36 : DBEntryBase *db_entry; 37 : }; 38 : 39 : class DBPartition::WorkQueue { 40 : public: 41 : static const int kThreshold = 1024; 42 : typedef concurrent_queue<RequestQueueEntry *> RequestQueue; 43 : typedef concurrent_queue<RemoveQueueEntry *> RemoveQueue; 44 : typedef std::list<DBTablePartBase *> TablePartList; 45 : 46 109306 : explicit WorkQueue(DBPartition *partition, int partition_id) 47 109306 : : db_partition_(partition), 48 109306 : db_partition_id_(partition_id), 49 109306 : disable_(false), 50 109306 : running_(false) { 51 109306 : request_count_ = 0; 52 109306 : max_request_queue_len_ = 0; 53 109306 : total_request_count_ = 0; 54 109306 : } 55 109242 : ~WorkQueue() { 56 109242 : for (RequestQueue::iterator iter = request_queue_.unsafe_begin(); 57 109242 : iter != request_queue_.unsafe_end();) { 58 0 : RequestQueueEntry *req_entry = *iter; 59 0 : ++iter; 60 0 : delete req_entry; 61 109242 : } 62 109242 : request_queue_.clear(); 63 109242 : } 64 : 65 1135081 : bool EnqueueRequest(RequestQueueEntry *req_entry) { 66 1135081 : request_queue_.push(req_entry); 67 1135162 : MaybeStartRunner(); 68 1135175 : uint32_t max = request_count_.fetch_add(1); 69 1135175 : if (max > max_request_queue_len_) 70 154426 : max_request_queue_len_ = max; 71 1135175 : total_request_count_++; 72 1135175 : return max < (kThreshold - 1); 73 : 74 : } 75 : 76 1976446 : bool DequeueRequest(RequestQueueEntry **req_entry) { 77 1976446 : bool success = request_queue_.try_pop(*req_entry); 78 1976165 : if (success) { 79 1135149 : request_count_.fetch_sub(1); 80 : } 81 1976165 : return success; 82 : } 83 : 84 518776 : void EnqueueRemove(RemoveQueueEntry *rm_entry) { 85 518776 : remove_queue_.push(rm_entry); 86 518784 : MaybeStartRunner(); 87 518820 : } 88 : 89 1364393 : bool DequeueRemove(RemoveQueueEntry **rm_entry) { 90 1364393 : bool success = remove_queue_.try_pop(*rm_entry); 91 1364169 : return success; 92 : } 93 : 94 : void MaybeStartRunnerUnlocked(); 95 : void MaybeStartRunner(); 96 : bool RunnerDone(); 97 : 98 : // Normally called from single task that either runs in DB context or is 99 : // exclusive with DB task, but can be called concurrently from multiple 100 : // bgp::ConfigHelper tasks. 101 1676876 : void SetActive(DBTablePartBase *tpart) { 102 1676876 : std::scoped_lock lock(mutex_); 103 1677123 : change_list_.push_back(tpart); 104 1676784 : MaybeStartRunnerUnlocked(); 105 1676870 : } 106 : 107 2518264 : DBTablePartBase *GetActiveTable() { 108 2518264 : DBTablePartBase *tpart = NULL; 109 2518264 : if (!change_list_.empty()) { 110 1676982 : tpart = change_list_.front(); 111 1676926 : change_list_.pop_front(); 112 : } 113 2518338 : return tpart; 114 : } 115 : 116 798470 : int db_partition_id() { 117 798470 : return db_partition_id_; 118 : } 119 : 120 798460 : int db_task_id() const { return db_partition_->task_id(); } 121 : 122 6257628 : bool IsDBQueueEmpty() const { 123 6257628 : return (request_queue_.empty() && change_list_.empty()); 124 : } 125 : 126 849098 : bool disable() { return disable_; } 127 90 : void set_disable(bool disable) { disable_ = disable; } 128 : 129 0 : long request_queue_len() const { 130 0 : return request_count_; 131 : } 132 : 133 0 : uint64_t total_request_count() const { 134 0 : return total_request_count_; 135 : } 136 : 137 0 : uint64_t max_request_queue_len() const { 138 0 : return max_request_queue_len_; 139 : } 140 : 141 : private: 142 : DBPartition *db_partition_; 143 : RequestQueue request_queue_; 144 : TablePartList change_list_; 145 : std::atomic<long> request_count_; 146 : uint64_t total_request_count_; 147 : uint64_t max_request_queue_len_; 148 : RemoveQueue remove_queue_; 149 : std::mutex mutex_; 150 : int db_partition_id_; 151 : bool disable_; 152 : bool running_; 153 : 154 : DISALLOW_COPY_AND_ASSIGN(WorkQueue); 155 : }; 156 : 157 6257628 : bool DBPartition::IsDBQueueEmpty() const { 158 6257628 : return work_queue_->IsDBQueueEmpty(); 159 : } 160 : 161 90 : void DBPartition::SetQueueDisable(bool disable) { 162 90 : if (disable) { 163 45 : work_queue_->set_disable(true); 164 : } else { 165 45 : work_queue_->set_disable(false); 166 45 : work_queue_->MaybeStartRunner(); 167 : } 168 90 : } 169 : 170 : class DBPartition::QueueRunner : public Task { 171 : public: 172 : static const int kMaxIterations = 32; 173 798460 : QueueRunner(WorkQueue *queue) 174 798460 : : Task(queue->db_task_id(), queue->db_partition_id()), 175 798460 : queue_(queue) { 176 798461 : } 177 : 178 849097 : virtual bool Run() { 179 849097 : int count = 0; 180 : 181 : // Skip if the queue is disabled. 182 849097 : if (queue_->disable()) 183 1113 : return queue_->RunnerDone(); 184 : 185 847986 : RemoveQueueEntry *rm_entry = NULL; 186 1364463 : while (queue_->DequeueRemove(&rm_entry)) { 187 518684 : DBEntryBase *db_entry = rm_entry->db_entry; 188 : { 189 : tbb::spin_rw_mutex::scoped_lock 190 518684 : lock(rm_entry->tpart->dbstate_mutex(), false); 191 1037512 : if (!db_entry->IsDeleted() || db_entry->is_onlist() || 192 518734 : !db_entry->is_state_empty_unlocked(rm_entry->tpart)) { 193 42 : db_entry->ClearOnRemoveQ(); 194 42 : db_entry = NULL; 195 : } 196 518759 : } 197 518842 : if (db_entry) { 198 518800 : rm_entry->tpart->Remove(db_entry); 199 : } 200 518818 : delete rm_entry; 201 518835 : if (++count == kMaxIterations) { 202 2358 : return false; 203 : } 204 : } 205 : 206 845484 : RequestQueueEntry *req_entry = NULL; 207 1976370 : while (queue_->DequeueRequest(&req_entry)) { 208 1135144 : req_entry->tpart->Process(req_entry->client, &req_entry->request); 209 1134992 : delete req_entry; 210 1135117 : if (++count == kMaxIterations) { 211 4231 : return false; 212 : } 213 : } 214 : 215 : while (true) { 216 2518191 : DBTablePartBase *tpart = queue_->GetActiveTable(); 217 2518306 : if (tpart == NULL) { 218 841375 : break; 219 : } 220 1676931 : bool done = tpart->RunNotify(); 221 1676995 : if (!done) { 222 45 : return false; 223 : } 224 1676950 : } 225 : 226 : // Running is done only if queue_ is empty. It's possible that more 227 : // entries are added into in the input or remove queues during the 228 : // time we were processing those queues. 229 841375 : return queue_->RunnerDone(); 230 : } 231 : 232 0 : std::string Description() const { 233 0 : return "DBPartition QueueRunner"; 234 : } 235 : private: 236 : WorkQueue *queue_; 237 : }; 238 : 239 3330710 : void DBPartition::WorkQueue::MaybeStartRunnerUnlocked() { 240 3330710 : if (running_) { 241 2532330 : return; 242 : } 243 798380 : running_ = true; 244 798380 : QueueRunner *runner = new QueueRunner(this); 245 798457 : TaskScheduler *scheduler = TaskScheduler::GetInstance(); 246 798439 : scheduler->Enqueue(runner); 247 : } 248 : 249 1653973 : void DBPartition::WorkQueue::MaybeStartRunner() { 250 1653973 : std::scoped_lock lock(mutex_); 251 1653986 : MaybeStartRunnerUnlocked(); 252 1654011 : } 253 : 254 842482 : bool DBPartition::WorkQueue::RunnerDone() { 255 842482 : std::scoped_lock lock(mutex_); 256 842540 : if (disable_ || (request_queue_.empty() && remove_queue_.empty())) { 257 798462 : running_ = false; 258 798462 : return true; 259 : } 260 : 261 43982 : running_ = true; 262 43982 : return false; 263 842444 : } 264 : 265 109306 : DBPartition::DBPartition(DB *db, int partition_id) 266 109306 : : db_(db), work_queue_(new WorkQueue(this, partition_id)) { 267 109306 : } 268 : 269 : // The DBPartition destructor needs to be defined after WorkQueue has 270 : // been declared. 271 109242 : DBPartition::~DBPartition() { 272 109242 : } 273 : 274 1135107 : bool DBPartition::EnqueueRequest(DBTablePartBase *tpart, DBClient *client, 275 : DBRequest *req) { 276 1135107 : RequestQueueEntry *entry = new RequestQueueEntry(tpart, client, req); 277 1135076 : return work_queue_->EnqueueRequest(entry); 278 : } 279 : 280 518656 : void DBPartition::EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry) { 281 518656 : RemoveQueueEntry *entry = new RemoveQueueEntry(tpart, db_entry); 282 518663 : db_entry->SetOnRemoveQ(); 283 518843 : work_queue_->EnqueueRemove(entry); 284 518814 : } 285 : 286 : // concurrency: called from DBPartition task. 287 1676912 : void DBPartition::OnTableChange(DBTablePartBase *tablepart) { 288 1676912 : work_queue_->SetActive(tablepart); 289 1677076 : } 290 : 291 0 : long DBPartition::request_queue_len() const { 292 0 : return work_queue_->request_queue_len(); 293 : } 294 : 295 0 : uint64_t DBPartition::total_request_count() const { 296 0 : return work_queue_->total_request_count(); 297 : } 298 : 299 0 : uint64_t DBPartition::max_request_queue_len() const { 300 0 : return work_queue_->max_request_queue_len(); 301 : } 302 : 303 798458 : int DBPartition::task_id() const { 304 798458 : return db_->task_id(); 305 : }