Line data Source code
1 : /* 2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_table_walk_mgr.h" 6 : 7 : #include <boost/bind/bind.hpp> 8 : #include <boost/foreach.hpp> 9 : 10 : #include "base/logging.h" 11 : #include "base/task.h" 12 : #include "base/task_annotations.h" 13 : #include "db/db.h" 14 : #include "db/db_partition.h" 15 : #include "db/db_table.h" 16 : #include "db/db_table_partition.h" 17 : 18 : using namespace boost::placeholders; 19 : 20 27397 : DBTableWalkMgr::DBTableWalkMgr() 21 54794 : : walk_request_trigger_(new TaskTrigger( 22 : boost::bind(&DBTableWalkMgr::ProcessWalkRequestList, this), 23 54794 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)), 24 54794 : walk_done_trigger_(new TaskTrigger( 25 : boost::bind(&DBTableWalkMgr::ProcessWalkDone, this), 26 54794 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)) { 27 27397 : } 28 : 29 328531 : bool DBTableWalkMgr::ProcessWalkRequestList() { 30 328531 : CHECK_CONCURRENCY("db::Walker"); 31 328531 : std::scoped_lock lock(mutex_); 32 328531 : if (!current_table_walk_.empty()) return true; 33 : while (true) { 34 312128 : if (walk_request_list_.empty()) break; 35 226156 : WalkRequestInfoPtr info = walk_request_list_.front(); 36 226156 : walk_request_set_.erase(info.get()); 37 226156 : walk_request_list_.pop_front(); 38 226156 : current_table_walk_.swap(info->pending_requests); 39 226156 : DBTable *table = info->table; 40 226156 : bool walk_table = false; 41 453626 : for (auto walker : current_table_walk_) { 42 227470 : if (walker->stopped()) continue; 43 227468 : walker->set_in_progress(); 44 227468 : walker->reset_walk_again(); 45 227468 : walk_table = true; 46 227470 : } 47 226156 : if (walk_table) { 48 : // start the walk 49 226155 : table->StartWalk(); 50 226155 : break; 51 : } else { 52 1 : current_table_walk_.clear(); 53 : } 54 226157 : } 55 312127 : return true; 56 328531 : } 57 : 58 226155 : bool DBTableWalkMgr::ProcessWalkDone() { 59 226155 : CHECK_CONCURRENCY("db::Walker"); 60 226155 : assert(!current_table_walk_.empty()); 61 453624 : for (auto walker : current_table_walk_) { 62 227469 : if (walker->walk_again()) 63 244 : walker->set_walk_requested(); 64 227225 : else if (!walker->stopped()) 65 227222 : walker->set_walk_done(); 66 227469 : if (walker->stopped() || walker->walk_again()) continue; 67 227222 : walker->walk_complete()(walker, walker->table()); 68 227469 : } 69 226155 : current_table_walk_.clear(); 70 226155 : walk_request_trigger_->Set(); 71 226155 : return true; 72 : } 73 : 74 218999 : DBTable::DBTableWalkRef DBTableWalkMgr::AllocWalker(DBTable *table, 75 : DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete) { 76 218999 : table->incr_walker_count(); 77 218999 : DBTableWalk *walker = new DBTableWalk(table, walk_fn, walk_complete); 78 218999 : return DBTable::DBTableWalkRef(walker); 79 : } 80 : 81 157601 : void DBTableWalkMgr::ReleaseWalker(DBTable::DBTableWalkRef &ref) { 82 157601 : ref->set_walk_stopped(); 83 157601 : ref.reset(); 84 157601 : } 85 : 86 28746 : void DBTableWalkMgr::WalkAgain(DBTable::DBTableWalkRef ref) { 87 28746 : WalkTable(ref); 88 28746 : } 89 : 90 255670 : void DBTableWalkMgr::WalkTable(DBTable::DBTableWalkRef walk) { 91 255670 : std::scoped_lock lock(mutex_); 92 255670 : DBTable *table = walk->table(); 93 : 94 255670 : if (walk->in_progress()) { 95 461 : table->incr_walk_again_count(); 96 461 : walk->set_walk_again(); 97 : } else { 98 255209 : table->incr_walk_request_count(); 99 255209 : walk->set_walk_requested(); 100 : } 101 : 102 255670 : WalkRequestInfo tmp_info = WalkRequestInfo(table); 103 255670 : WalkRequestInfoSet::iterator it = walk_request_set_.find(&tmp_info); 104 255670 : if (it != walk_request_set_.end()) { 105 29514 : (*it)->AppendWalkReq(walk); 106 29514 : return; 107 : } 108 : 109 226156 : WalkRequestInfo *new_info = new WalkRequestInfo(table); 110 226156 : new_info->AppendWalkReq(walk); 111 226156 : walk_request_list_.push_back(WalkRequestInfoPtr(new_info)); 112 226156 : walk_request_set_.insert(new_info); 113 226156 : walk_request_trigger_->Set(); 114 285184 : } 115 : 116 226155 : void DBTableWalkMgr::WalkDone() { 117 226155 : walk_done_trigger_->Set(); 118 226155 : } 119 : 120 792756 : bool DBTableWalkMgr::InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry) { 121 792756 : uint32_t skip_walk_count = 0; 122 1587709 : for (auto walker : current_table_walk_) { 123 794870 : if (walker->done() || walker->stopped() || walker->walk_again()) { 124 383 : skip_walk_count++; 125 383 : continue; 126 : } 127 793935 : bool more = walker->walk_fn()(part, entry); 128 794227 : if (!more) { 129 5 : skip_walk_count++; 130 5 : if (!walker->stopped()) walker->set_walk_done(); 131 : } 132 794609 : } 133 792823 : return (skip_walk_count < current_table_walk_.size()); 134 : }