00001 #ifndef _PARALLEL_SEARCH_H
00002 #define _PARALLEL_SEARCH_H
00003 #include "osl/misc/lightMutex.h"
00004 #include "osl/misc/atomicCounter.h"
00005 #include "osl/misc/alarm.h"
00006 #include "osl/stl/vector.h"
00007 #include "osl/stl/slist.h"
00008 #include "osl/misc/cstdint.h"
00009 #include <boost/shared_ptr.hpp>
00010 #include <boost/weak_ptr.hpp>
00011 #include <boost/thread.hpp>
00012 #include <deque>
00013 #include <stdexcept>
00014
00015 #ifndef NOT_ONE_THREAD_PER_PROC
00016
00017 # define ONE_THREAD_PER_PROC
00018 #endif
00019
00020 namespace osl
00021 {
00022 namespace search
00023 {
00024 typedef LightMutex Mutex;
00025
00026 class Worker;
00027 class Job;
/**
 * Exception type derived from std::runtime_error.
 *
 * Job::operator() catches and discards it around the job body.
 * The constructor is only declared here; its definition (and therefore the
 * what() message) lives outside this header.
 */
struct StopNow : public std::runtime_error
{
  StopNow();
};
00032 class JobContent
00033 {
00034 public:
00035 static const int DefaultPriority=100;
00036 static const int NotFoundPriority=100000;
00037 private:
00038 boost::condition condition;
00039 boost::mutex mutex;
00040 Worker* worker;
00041 volatile bool finished;
00042 int priority;
00043 uint64_t blocking;
00044 volatile bool must_stop_now;
00045 typedef slist<boost::weak_ptr<JobContent> > list_t;
00046 list_t children;
00047 public:
00048 JobContent() : worker(0), finished(0), priority(DefaultPriority),
00049 blocking(0),
00050 must_stop_now(false)
00051 {
00052 }
00053 explicit JobContent(int priority, uint64_t b=0)
00054 : worker(0), finished(0), priority(priority), blocking(b),
00055 must_stop_now(false)
00056 {
00057 assert(priority<NotFoundPriority);
00058 }
00059 virtual ~JobContent();
00060 virtual void operator()(void)=0;
00061 virtual int id() const { return 0; }
00062 void setFinished(bool val) {
00063 boost::mutex::scoped_lock lk(mutex);
00064 finished=val;
00065 condition.notify_all();
00066 }
00067 void dump() const;
00068 bool isFinished() const{
00069 return finished;
00070 }
00071 void setWorker(Worker* w) { worker=w; }
00072 Worker *getWorker() const { return worker; }
00073 void checkStop();
00074 void waitResult();
00075 int getPriority() const{
00076 return priority;
00077 }
00078 uint64_t getBlocking() const{
00079 return blocking;
00080 }
00081 void setPriority(int p) {
00082 priority=p;
00083 }
00084 void stopNow();
00085 void addChildJob(const boost::shared_ptr<JobContent>& new_child) {
00086 boost::mutex::scoped_lock lk(mutex);
00087 children.push_front(new_child);
00088 }
00089 bool stopping() const {
00090 return must_stop_now;
00091 }
00092 };
00093 class ExitNullJobLock
00094 {
00095 boost::shared_ptr<JobContent> job_content;
00096 public:
00097 ExitNullJobLock(const boost::shared_ptr<JobContent>& job_content)
00098 : job_content(job_content) {}
00099 ~ExitNullJobLock() {
00100 job_content->setWorker(0);
00101 job_content->setFinished(true);
00102 }
00103 };
00104 class Job
00105 {
00106 boost::shared_ptr<JobContent> job_content;
00107 public:
00108 Job() {}
00109 explicit Job(const boost::shared_ptr<JobContent>& job_content)
00110 : job_content(job_content) {
00111 }
00112 bool isNull() const{
00113 return !job_content;
00114 }
00115 JobContent* getContent() { return job_content.get(); }
00116 void operator()(void) {
00117 ExitNullJobLock lock(job_content);
00118 assert(!isNull());
00119 try{
00120 (*job_content)();
00121 }
00122 catch (StopNow& e) {
00123 }
00124 }
00125 void setWorker(Worker* w) {
00126 job_content->setWorker(w);
00127 }
00128 void setFinished(bool val) {
00129 job_content->setFinished(val);
00130 }
00131 void waitResult() {
00132 job_content->waitResult();
00133 }
00134 int getPriority() const{
00135 return job_content->getPriority();
00136 }
00137 uint64_t getBlocking() const{
00138 return job_content->getBlocking();
00139 }
00140 void stopNow() const {
00141 job_content->stopNow();
00142 }
00143 void checkStop() const {
00144 job_content->checkStop();
00145 }
00146 void addChildJob(const boost::shared_ptr<JobContent>& new_child) {
00147 if (job_content)
00148 job_content->addChildJob(new_child);
00149 }
00150 friend class JobPool;
00151 };
00152 class JobPool
00153 {
00154 std::list<Job> jobs;
00155 friend class ParallelSearch;
00156 public:
00157 typedef std::list<Job>::iterator iterator;
00158 typedef std::list<Job>::const_iterator const_iterator;
00159 void push_back(Job job) {
00160 jobs.push_back(job);
00161 }
00162 bool pop_back() {
00163 if (jobs.empty())
00164 return false;
00165 jobs.pop_back();
00166 return true;
00167 }
00168 bool pop_back(JobContent *content) {
00169 for (iterator p=jobs.begin(); p!=jobs.end(); ++p) {
00170 if (p->job_content.get() == content) {
00171 jobs.erase(p);
00172 return true;
00173 }
00174 }
00175 return false;
00176 }
00177 int getPriority() const{
00178 if (jobs.empty())
00179 return JobContent::NotFoundPriority;
00180 return jobs.front().getPriority();
00181 }
00182 int getPriorityMax() const{
00183 if (jobs.empty())
00184 return -1;
00185 return jobs.back().getPriority();
00186 }
00188 const Job getJob(Worker *worker) {
00189 if (jobs.empty())
00190 return Job();
00191 Job ret=jobs.front();
00192 jobs.pop_front();
00193 ret.setWorker(worker);
00194 return ret;
00195 }
00196
00197 iterator getBlockingJob(uint64_t, uint64_t);
00198 const Job getJob(Worker *worker, iterator p)
00199 {
00200 assert(p!=jobs.end());
00201 Job ret = *p;
00202 jobs.erase(p);
00203 ret.setWorker(worker);
00204 return ret;
00205 }
00206 iterator end() { return jobs.end(); }
00207 void clear(){
00208 jobs.clear();
00209 }
00210 };
00211 class ParallelSearch;
00215 class Worker
00216 {
00217 ParallelSearch* parent;
00218 boost::thread *th;
00219 boost::shared_ptr<JobPool> job_pool;
00220 Job current_job;
00221 unsigned int thread_id;
00222 volatile bool waiting;
00223 volatile uint64_t waiting_block;
00224 volatile uint64_t sibling;
00225 public:
00226 explicit Worker(ParallelSearch* parent,boost::thread* th=0)
00227 : parent(parent),th(th),job_pool(new JobPool()),
00228 thread_id(static_cast<unsigned int>(-1)), waiting(true), waiting_block(0)
00229 {
00230 }
00231 void operator()(void);
00233 const Job getJob(Worker *w) {
00234 Job ret=job_pool->getJob(w);
00235 return ret;
00236 }
00237 int getPriority() const{
00238 return job_pool->getPriority();
00239 }
00240 int getPriorityMax() const{
00241 return job_pool->getPriorityMax();
00242 }
00243 boost::thread* getThread() const{
00244 return th;
00245 }
00247 void push_back(Job job);
00249 bool pop_back_job();
00250 bool pop_back_job(JobContent*);
00251 public:
00252 boost::mutex block_mutex;
00253 boost::condition block_condition;
00254
00255 void checkStop() const;
00256 const Job getCurrentJob() const{
00257 return current_job;
00258 }
00259 void setCurrentJob(Job const& job) {
00260 current_job=job;
00261 }
00262 void resetCrurrentJob() {
00263 current_job=Job();
00264 }
00265 unsigned int threadId() const { return thread_id; }
00266 friend class ParallelSearch;
00267
00268 void stopMaster();
00269 bool isWaiting() const { return waiting; }
00270 void setWaiting(bool wait) { waiting = wait; }
00271 void setWaitingBlock(uint64_t block, uint64_t sib) { waiting = true; waiting_block = block; sibling = sib; }
00272 void resetWaitingBlock() { waiting = false; waiting_block = 0; }
00273 uint64_t waitingBlock() const { return waiting_block; }
00274 uint64_t waitingSibling() const { return sibling; }
00275 private:
00276 void setThreadId(unsigned int id) { thread_id = id; }
00278 void clearJobs() { job_pool->clear(); };
00279 };
00280
00281 class SetCurrentJobLock
00282 {
00283 Worker& worker;
00284 Job backup;
00285 public:
00286 SetCurrentJobLock(Worker& w,Job const& job) :worker(w),backup(w.getCurrentJob()){
00287 worker.setCurrentJob(job);
00288 }
00289 ~SetCurrentJobLock(){
00290 worker.setCurrentJob(backup);
00291 }
00292 };
00293
00294 struct DecActiveHelper{
00295 ParallelSearch & parallel_search;
00296 explicit DecActiveHelper(ParallelSearch & parallel_search);
00297 ~DecActiveHelper();
00298 };
00299 struct IncActiveHelper{
00300 ParallelSearch & parallel_search;
00301 explicit IncActiveHelper(ParallelSearch & parallel_search);
00302 ~IncActiveHelper();
00303 };
00304 struct IncWaitingHelper{
00305 ParallelSearch & parallel_search;
00306 explicit IncWaitingHelper(ParallelSearch & parallel_search);
00307 ~IncWaitingHelper();
00308 };
00309 struct DecWaitingHelper{
00310 ParallelSearch & parallel_search;
00311 explicit DecWaitingHelper(ParallelSearch & parallel_search);
00312 ~DecWaitingHelper();
00313 };
00314
00315 class ParallelSearch
00316 {
00317 AtomicCounter num_actives;
00318 AtomicCounter max_num_actives;
00319 AtomicCounter thread_count;
00320 AtomicCounter waiting;
00321 vector<Worker*> workers;
00322 int num_cpus;
00323 time_t last_time;
00324 AlarmSwitch master_stop_flag;
00325 bool verbose;
00326 volatile bool stop_all;
00327 void checkTime();
00328 public:
00329 static const int MaxCPUs = 16;
00330
00331 boost::condition shared_condition;
00332 boost::mutex shared_mutex;
00333
00334 ParallelSearch();
00335 ~ParallelSearch();
00336
00337 void setVerbose(bool v=true) { verbose = v; }
00338 void addInLock(Worker* worker) {
00339 workers.push_back(worker);
00340 worker->setThreadId(workers.size()-1);
00341 }
00342 void add(Worker* worker) {
00343 boost::mutex::scoped_lock lk(shared_mutex);
00344 addInLock(worker);
00345 }
00346 const Job getJob(Worker *w);
00347 const Job getJobInLock(Worker *w);
00348 const Job getJobNonBlock(Worker *w);
00350 const Job getJobNonBlock(Worker *w, uint64_t blocking, uint64_t sibling);
00355 Worker* getWorker();
00356 void notify_one() {
00357 shared_condition.notify_one();
00358 }
00359 void makeWorkers();
00360 void setNumCPUs(int n) {
00361 assert(n <= MaxCPUs);
00362 num_cpus=n;
00363 }
00364 int numCpus() const { return num_cpus; }
00365 void checkStop();
00366 bool stopping() const {
00367 return stop_all
00368 || (master_stop_flag.notify
00369 && *master_stop_flag.notify == Alarm::TIMEOUT);
00370 }
00371 void waitAll();
00372
00373 void setMasterAlarm(AlarmSwitch alarm) { master_stop_flag = alarm; }
00374 bool stopMaster();
00375 void reportWorkers();
00376
00377 int numActives() const { return num_actives.value(); }
00378
00379
00380
00381 bool hasWaiting(uint64_t blocking, Worker *& who);
00382 private:
00383 friend class JobContent;
00384 friend class IncActiveHelper;
00385 friend class DecActiveHelper;
00386 friend class IncWaitingHelper;
00387 friend class DecWaitingHelper;
00388 void incActives();
00389 void decActives();
00390 void incWaiting();
00391 void decWaiting();
00392 };
00393
00394 extern osl::search::ParallelSearch parallelSearch;
00395 }
00396 }
00397 #endif
00398
00399
00400
00401