00001
00002
00003
00004 #include "osl/search/parallelSearch.h"
00005 #include "osl/misc/ncores.h"
00006 #include <bitset>
00007 #include <iostream>
00008 #include <time.h>
00009
00010 #ifndef OSL_NCPUS
00011 static const int default_ncpus = osl::misc::ncores();
00012 #else
00013 static const int default_ncpus = OSL_NCPUS;
00014 #endif
00015
00016 osl::search::ParallelSearch osl::search::parallelSearch;
00017
00018 osl::search::StopNow::StopNow()
00019 : std::runtime_error("StopNow")
00020 {
00021 }
00022
00023
00024
00025 osl::search::DecActiveHelper::DecActiveHelper(ParallelSearch & parallel_search)
00026 : parallel_search(parallel_search){
00027 parallel_search.decActives();
00028 }
00029 osl::search::DecActiveHelper::~DecActiveHelper(){
00030 parallel_search.incActives();
00031 }
00032
00033 osl::search::IncActiveHelper::IncActiveHelper(ParallelSearch & parallel_search)
00034 : parallel_search(parallel_search){
00035 parallel_search.incActives();
00036 }
00037 osl::search::IncActiveHelper::~IncActiveHelper(){
00038 parallel_search.decActives();
00039 }
00040
00041 osl::search::IncWaitingHelper::IncWaitingHelper(ParallelSearch & parallel_search)
00042 : parallel_search(parallel_search){
00043 parallel_search.incWaiting();
00044 }
00045 osl::search::IncWaitingHelper::~IncWaitingHelper(){
00046 parallel_search.decWaiting();
00047 }
00048
00049 osl::search::DecWaitingHelper::DecWaitingHelper(ParallelSearch & parallel_search)
00050 : parallel_search(parallel_search){
00051 parallel_search.decWaiting();
00052 }
00053 osl::search::DecWaitingHelper::~DecWaitingHelper(){
00054 parallel_search.incWaiting();
00055 }
00056
00057
00058
00059 osl::search::JobContent::~JobContent()
00060 {
00061 }
00062
00063 void osl::search::JobContent::waitResult()
00064 {
00065 boost::mutex::scoped_lock lk(mutex);
00066 if (isFinished())
00067 return;
00068 DecActiveHelper decActiveHelper(parallelSearch);
00069 parallelSearch.makeWorkers();
00070 condition.wait(lk);
00071 parallelSearch.checkStop();
00072 }
00073
00074 void osl::search::JobContent::checkStop(){
00075 if (must_stop_now || parallelSearch.stopping()) {
00076 throw StopNow();
00077 }
00078 }
00079
00080 void osl::search::JobContent::stopNow() {
00081 must_stop_now=true;
00082 boost::mutex::scoped_lock lk(mutex);
00083 for (list_t::iterator p=children.begin(); p!=children.end(); ++p) {
00084 if (boost::shared_ptr<JobContent> ptr = p->lock()) {
00085 if (! ptr->stopping()) {
00086 ptr->stopNow();
00087 Worker *worker = ptr->getWorker();
00088 if (worker)
00089 worker->block_condition.notify_one();
00090 }
00091 }
00092 }
00093 }
00094
00095 void osl::search::JobContent::dump() const {
00096 std::cerr << " id " << id() << " w " << worker
00097 << " f " << std::boolalpha << finished
00098 << " p " << priority << " b " <<blocking
00099 << " stop " << must_stop_now << " children ";
00100 for (list_t::const_iterator p=children.begin(); p!=children.end(); ++p) {
00101 if (boost::shared_ptr<JobContent> ptr = p->lock()) {
00102 std::cerr << "job id " << (ptr ? ptr->id() : -1) << " "
00103 << std::boolalpha << ptr->stopping() << " ";
00104 }
00105 }
00106 std::cerr << "\n";
00107 }
00108
00109
00110 osl::search::JobPool::iterator
00111 osl::search::JobPool::getBlockingJob(uint64_t blocking, uint64_t sibling)
00112 {
00113 for (iterator p=jobs.begin(); p!=jobs.end(); ++p) {
00114 if ((blocking & p->getBlocking())
00115 != blocking)
00116 continue;
00117 if ((sibling | p->getBlocking())
00118 == 0)
00119 continue;
00120 return p;
00121 }
00122 return end();
00123 }
00124
00125
00126 void osl::search::Worker::operator()(void) {
00127 th=new boost::thread();
00128 for (;;) {
00129 setWaiting(true);
00130 assert(waitingBlock() == 0);
00131 try{
00132 Job job=parent->getJob(this);
00133 setWaiting(false);
00134 SetCurrentJobLock lock(*this,job);
00135 job();
00136 }
00137 catch (StopNow& e) {
00138 std::cerr << "ignore StopNow at root " << threadId() << "\n";
00139 }
00140 catch (std::exception& e) {
00141 std::cerr << "\ncaught exception at root " << threadId() << " "
00142 << e.what() << "\n";
00143 stopMaster();
00144 }
00145 catch (...)
00146 {
00147 std::cerr << "\ncaught unknown exception at root " << threadId() << "\n";
00148 stopMaster();
00149 }
00150 }
00151 }
00152
00153 void osl::search::Worker::stopMaster() {
00154 std::cerr << "\a";
00155 const bool success = parent->stopMaster();
00156 if (! success) {
00157 std::cerr << "stopMaster failed in worker " << threadId()
00158 << std::endl;
00159 }
00160 }
00161
00162 void osl::search::Worker::push_back(Job job) {
00163 {
00164 boost::mutex::scoped_lock lk(parent->shared_mutex);
00165 job_pool->push_back(job);
00166 }
00167 parent->makeWorkers();
00168 }
00169
00170 bool osl::search::Worker::pop_back_job() {
00171 boost::mutex::scoped_lock lk(parent->shared_mutex);
00172 return job_pool->pop_back();
00173 }
00174
00175 bool osl::search::Worker::pop_back_job(JobContent *content) {
00176 boost::mutex::scoped_lock lk(parent->shared_mutex);
00177 return job_pool->pop_back(content);
00178 }
00179
00180 void osl::search::Worker::checkStop() const {
00181 if (!current_job.isNull()) {
00182 current_job.checkStop();
00183 }
00184 else if (parallelSearch.stopping()) {
00185 throw StopNow();
00186 }
00187 }
00188
00189
00190
00191 osl::search::
00192 ParallelSearch::ParallelSearch()
00193 : num_actives(1), waiting(0), num_cpus(default_ncpus), master_stop_flag(0),
00194 verbose(false)
00195 {
00196 #ifdef ONE_THREAD_PER_PROC
00197 workers.reserve(num_cpus);
00198 #else
00199 workers.reserve(num_cpus*8);
00200 #endif
00201 Worker *self=new Worker(this, new boost::thread());
00202 addInLock(self);
00203 assert(self == getWorker());
00204 }
00205
00206 osl::search::
00207 ParallelSearch::~ParallelSearch()
00208 {
00209 if (verbose)
00210 std::cerr << "ParallelSearch"
00211 << " max_num_actives " << max_num_actives.value()
00212 << " workers " << workers.size() << std::endl;
00213 }
00214
00215 const osl::search::Job osl::search::
00216 ParallelSearch::getJobInLock(Worker *w)
00217 {
00218 int min_priority=JobContent::NotFoundPriority;
00219 int min_i= -1;
00220 for (size_t i=0;i<workers.size();i++) {
00221 int priority=workers[i]->getPriority();
00222 if (priority<min_priority) {
00223 min_priority=priority;
00224 min_i=i;
00225 }
00226 }
00227 if (min_i>=0) {
00228 Job ret=workers[min_i]->getJob(w);
00229 assert(!ret.isNull());
00230 return ret;
00231 }
00232 return Job();
00233 }
00234
00235 const osl::search::Job osl::search::
00236 ParallelSearch::getJobNonBlock(Worker *w)
00237 {
00238 w->checkStop();
00239 Job ret;
00240 {
00241 boost::mutex::scoped_lock lk(shared_mutex);
00242 ret=getJobInLock(w);
00243 }
00244 return ret;
00245 }
00246
00247 const osl::search::Job osl::search::
00248 ParallelSearch::getJobNonBlock(Worker *w, uint64_t blocking, uint64_t sibling)
00249 {
00250 w->checkStop();
00251 {
00252 boost::mutex::scoped_lock lk(shared_mutex);
00253
00254 int min_priority=JobContent::NotFoundPriority;
00255 int min_i= -1;
00256 JobPool::iterator min_p;
00257 for (size_t i=0;i<workers.size();i++) {
00258 JobPool::iterator p = workers[i]->job_pool->getBlockingJob(blocking, sibling);
00259 if (p == workers[i]->job_pool->end())
00260 continue;
00261 int priority=p->getPriority();
00262 if (priority<min_priority) {
00263 min_priority=priority;
00264 min_i=i;
00265 min_p = p;
00266 }
00267 }
00268 if (min_i>=0) {
00269 Job ret=workers[min_i]->job_pool->getJob(w, min_p);
00270 assert(!ret.isNull());
00271 w->setWaiting(false);
00272 return ret;
00273 }
00274 }
00275 return Job();
00276 }
00277
00278 const osl::search::Job osl::search::
00279 ParallelSearch::getJob(Worker *w)
00280 {
00281 IncWaitingHelper incWaitingHelper(parallelSearch);
00282 DecActiveHelper decActiveHelper(parallelSearch);
00283 for (;;) {
00284 boost::mutex::scoped_lock lk(shared_mutex);
00285 Job ret=getJobInLock(w);
00286 if (!ret.isNull())
00287 return ret;
00288 shared_condition.wait(lk);
00289 w->checkStop();
00290 }
00291 }
00292
00293 osl::search::Worker* osl::search::
00294 ParallelSearch::getWorker() {
00295 boost::mutex::scoped_lock lk(shared_mutex);
00296 boost::thread self;
00297 for (size_t i=0;i<workers.size();i++) {
00298 boost::thread* th=workers[i]->getThread();
00299 if (th!=0 && *th==self)
00300 return workers[i];
00301 }
00302 Worker *worker=new Worker(this,new boost::thread());
00303 addInLock(worker);
00304 return worker;
00305 }
00306
00307 void osl::search::
00308 ParallelSearch::checkStop() {
00309 Worker *worker=getWorker();
00310 assert(worker!=0);
00311 worker->checkStop();
00312 }
00313
00314 namespace osl
00315 {
00316 namespace search
00317 {
00318 struct WorkerProxy
00319 {
00320 Worker *original;
00321 explicit WorkerProxy(Worker *o) : original(o)
00322 {
00323 }
00324 void operator()()
00325 {
00326 original->operator()();
00327 }
00328 };
00329 }
00330 }
00331
00332
00333 void osl::search::
00334 ParallelSearch::makeWorkers() {
00335 {
00336 boost::mutex::scoped_lock lk(shared_mutex);
00337 #ifdef ONE_THREAD_PER_PROC
00338 while(thread_count.value()<num_cpus-1){
00339 thread_count.inc();
00340 incActives();
00341 Worker *worker=new Worker(this);
00342 addInLock(worker);
00343 new boost::thread(WorkerProxy(worker));
00344 }
00345 if (waiting.value()!=0)
00346 shared_condition.notify_one();
00347 #else
00348 if (waiting.value()==0) {
00349 if (num_actives.value()<num_cpus) {
00350 thread_count.inc();
00351 incActives();
00352 Worker *worker=new Worker(this);
00353 addInLock(worker);
00354 new boost::thread(*worker);
00355 }
00356 }
00357 else if (num_actives.value()<num_cpus) {
00358 notify_one();
00359 }
00360 #endif
00361 }
00362 checkTime();
00363 }
00364
00365 void osl::search::
00366 ParallelSearch::checkTime() {
00367 if (! verbose)
00368 return;
00369 time_t new_time=time(NULL);
00370 if (last_time != new_time) {
00371 last_time=new_time;
00372 if (num_actives.value() != num_cpus)
00373 std::cerr << "num_actives=" << num_actives.value() <<
00374 ",waiting=" << waiting.value() << std::endl;
00375 }
00376 }
00377
00378 void osl::search::
00379 ParallelSearch::incActives() {
00380 num_actives.inc();
00381 max_num_actives.max(num_actives.value());
00382 checkTime();
00383 }
00384 void osl::search::
00385 ParallelSearch::decActives() {
00386 checkTime();
00387 num_actives.dec();
00388 }
00389 void osl::search::
00390 ParallelSearch::incWaiting() {
00391 waiting.inc();
00392 }
00393 void osl::search::
00394 ParallelSearch::decWaiting() {
00395 waiting.dec();
00396 }
00397
00398
00399 void osl::search::
00400 ParallelSearch::waitAll() {
00401 if (verbose)
00402 std::cerr << "ParallelSearch::waitAll\n";
00403 struct timespec t = { 0, 100000 };
00404 for (;;) {
00405 {
00406 boost::mutex::scoped_lock lk(shared_mutex);
00407 stop_all=true;
00408
00409 for (size_t i=0;i<workers.size();i++)
00410 workers[i]->clearJobs();
00411
00412 #ifdef ONE_THREAD_PER_PROC
00413 const int n=1+thread_count.value()-waiting.value();
00414 #else
00415 const int n=num_actives.value();
00416 #endif
00417 if (n<=1)
00418 break;
00419 }
00420 nanosleep(&t, 0);
00421 }
00422 stop_all=false;
00423 master_stop_flag = AlarmSwitch(0);
00424 }
00425
00426 bool osl::search::
00427 ParallelSearch::stopMaster()
00428 {
00429 if (! master_stop_flag.notify)
00430 {
00431 std::cerr << "ParallelSearch master_stop_flag is null\n";
00432 return false;
00433 }
00434 *master_stop_flag.notify = Alarm::TIMEOUT;
00435 return true;
00436 }
00437
00438 bool osl::search::
00439 ParallelSearch::hasWaiting(uint64_t blocking, osl::search::Worker *& who) {
00440 who = 0;
00441 if (waiting.value())
00442 return true;
00443 if (numActives() >= numCpus())
00444 return false;
00445
00446 boost::mutex::scoped_lock lk(shared_mutex);
00447 for (size_t i=0;i<workers.size();i++) {
00448 if (! workers[i]->isWaiting())
00449 continue;
00450 if ((workers[i]->waitingBlock() & blocking)
00451 == workers[i]->waitingBlock()) {
00452 if (workers[i]->waitingSibling() | blocking) {
00453 who = workers[i];
00454 return true;
00455 }
00456 }
00457 }
00458 return false;
00459 }
00460
00461 void osl::search::
00462 ParallelSearch::reportWorkers()
00463 {
00464 boost::mutex::scoped_lock lk(shared_mutex);
00465 std::cerr << "+++ workers\n";
00466 for (size_t i=0;i<workers.size();i++) {
00467 std::cerr << "worker " << i << " ";
00468 if (workers[i]->current_job.isNull()) {
00469 std::cerr << "null\n";
00470 continue;
00471 }
00472 std::cerr << std::bitset<64>(workers[i]->current_job.getBlocking())
00473 << std::endl;
00474 std::cerr << "pool " << i << "\n";
00475 for (JobPool::const_iterator p=workers[i]->job_pool->jobs.begin();
00476 p!=workers[i]->job_pool->jobs.end(); ++p) {
00477 std::cerr << p->getPriority() << " " << std::bitset<64>(p->getBlocking())
00478 << "\n";
00479 }
00480 }
00481 }
00482
00483
00484
00485
00486
00487