Line data Source code
1 : /* 2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_table_walk_mgr.h" 6 : 7 : #include <boost/bind/bind.hpp> 8 : #include <boost/foreach.hpp> 9 : 10 : #include "base/logging.h" 11 : #include "base/task.h" 12 : #include "base/task_annotations.h" 13 : #include "db/db.h" 14 : #include "db/db_partition.h" 15 : #include "db/db_table.h" 16 : #include "db/db_table_partition.h" 17 : 18 : using namespace boost::placeholders; 19 : 20 27398 : DBTableWalkMgr::DBTableWalkMgr() 21 54796 : : walk_request_trigger_(new TaskTrigger( 22 : boost::bind(&DBTableWalkMgr::ProcessWalkRequestList, this), 23 54796 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)), 24 54796 : walk_done_trigger_(new TaskTrigger( 25 : boost::bind(&DBTableWalkMgr::ProcessWalkDone, this), 26 54796 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)) { 27 27398 : } 28 : 29 327924 : bool DBTableWalkMgr::ProcessWalkRequestList() { 30 327924 : CHECK_CONCURRENCY("db::Walker"); 31 327924 : std::scoped_lock lock(mutex_); 32 327924 : if (!current_table_walk_.empty()) return true; 33 : while (true) { 34 311609 : if (walk_request_list_.empty()) break; 35 226166 : WalkRequestInfoPtr info = walk_request_list_.front(); 36 226166 : walk_request_set_.erase(info.get()); 37 226166 : walk_request_list_.pop_front(); 38 226166 : current_table_walk_.swap(info->pending_requests); 39 226166 : DBTable *table = info->table; 40 226166 : bool walk_table = false; 41 453677 : for (auto walker : current_table_walk_) { 42 227511 : if (walker->stopped()) continue; 43 227509 : walker->set_in_progress(); 44 227509 : walker->reset_walk_again(); 45 227509 : walk_table = true; 46 227511 : } 47 226166 : if (walk_table) { 48 : // start the walk 49 226165 : table->StartWalk(); 50 226165 : break; 51 : } else { 52 1 : current_table_walk_.clear(); 53 : } 54 226167 : } 55 311608 : return true; 56 327924 : } 57 : 58 226165 : bool DBTableWalkMgr::ProcessWalkDone() { 59 226165 : CHECK_CONCURRENCY("db::Walker"); 60 226165 : assert(!current_table_walk_.empty()); 61 453675 : for (auto walker : current_table_walk_) { 62 227510 : if (walker->walk_again()) 63 234 : walker->set_walk_requested(); 64 227276 : else if (!walker->stopped()) 65 227273 : walker->set_walk_done(); 66 227510 : if (walker->stopped() || walker->walk_again()) continue; 67 227273 : walker->walk_complete()(walker, walker->table()); 68 227510 : } 69 226165 : current_table_walk_.clear(); 70 226165 : walk_request_trigger_->Set(); 71 226165 : return true; 72 : } 73 : 74 219039 : DBTable::DBTableWalkRef DBTableWalkMgr::AllocWalker(DBTable *table, 75 : DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete) { 76 219039 : table->incr_walker_count(); 77 219039 : DBTableWalk *walker = new DBTableWalk(table, walk_fn, walk_complete); 78 219039 : return DBTable::DBTableWalkRef(walker); 79 : } 80 : 81 157605 : void DBTableWalkMgr::ReleaseWalker(DBTable::DBTableWalkRef &ref) { 82 157605 : ref->set_walk_stopped(); 83 157605 : ref.reset(); 84 157605 : } 85 : 86 28858 : void DBTableWalkMgr::WalkAgain(DBTable::DBTableWalkRef ref) { 87 28858 : WalkTable(ref); 88 28858 : } 89 : 90 255742 : void DBTableWalkMgr::WalkTable(DBTable::DBTableWalkRef walk) { 91 255742 : std::scoped_lock lock(mutex_); 92 255742 : DBTable *table = walk->table(); 93 : 94 255742 : if (walk->in_progress()) { 95 442 : table->incr_walk_again_count(); 96 442 : walk->set_walk_again(); 97 : } else { 98 255300 : table->incr_walk_request_count(); 99 255300 : walk->set_walk_requested(); 100 : } 101 : 102 255742 : WalkRequestInfo tmp_info = WalkRequestInfo(table); 103 255742 : WalkRequestInfoSet::iterator it = walk_request_set_.find(&tmp_info); 104 255742 : if (it != walk_request_set_.end()) { 105 29576 : (*it)->AppendWalkReq(walk); 106 29576 : return; 107 : } 108 : 109 226166 : WalkRequestInfo *new_info = new WalkRequestInfo(table); 110 226166 : new_info->AppendWalkReq(walk); 111 226166 : walk_request_list_.push_back(WalkRequestInfoPtr(new_info)); 112 226166 : walk_request_set_.insert(new_info); 113 226166 : walk_request_trigger_->Set(); 114 285318 : } 115 : 116 226165 : void DBTableWalkMgr::WalkDone() { 117 226165 : walk_done_trigger_->Set(); 118 226165 : } 119 : 120 795721 : bool DBTableWalkMgr::InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry) { 121 795721 : uint32_t skip_walk_count = 0; 122 1593603 : for (auto walker : current_table_walk_) { 123 797817 : if (walker->done() || walker->stopped() || walker->walk_again()) { 124 363 : skip_walk_count++; 125 363 : continue; 126 : } 127 797038 : bool more = walker->walk_fn()(part, entry); 128 797247 : if (!more) { 129 5 : skip_walk_count++; 130 5 : if (!walker->stopped()) walker->set_walk_done(); 131 : } 132 797611 : } 133 795654 : return (skip_walk_count < current_table_walk_.size()); 134 : }