00001 /*!@file Util/WorkThreadServer.C Generic low-level worker-thread server class */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.C $ 00035 // $Id: WorkThreadServer.C 14129 2010-10-13 04:45:06Z itti $ 00036 // 00037 00038 #ifndef UTIL_WORKTHREADSERVER_C_DEFINED 00039 #define UTIL_WORKTHREADSERVER_C_DEFINED 00040 00041 #include "Util/WorkThreadServer.H" 00042 00043 #include "Util/Assert.H" 00044 #include "Util/log.H" 00045 #include "rutz/error_context.h" 00046 #include "rutz/mutex.h" 00047 #include "rutz/sfmt.h" 00048 00049 #include <algorithm> // for stable_sort() 00050 #include <cstdlib> 00051 #include <signal.h> 00052 #include <unistd.h> 00053 00054 struct WorkThreadServer::ThreadData 00055 { 00056 ThreadData() 00057 : 00058 srv(0), thread(), index(0), njobs(0), worktime(), waittime() 00059 {} 00060 00061 WorkThreadServer* srv; 00062 pthread_t thread; 00063 unsigned int index; 00064 unsigned int njobs; 00065 rutz::time worktime; 00066 rutz::time waittime; 00067 }; 00068 00069 struct WorkThreadServer::Checkpoint 00070 { 00071 Checkpoint(int j, size_t qs, size_t mqs) : jobnum(j), qsize(qs), maxqsize(mqs) {} 00072 00073 int jobnum; 00074 size_t qsize; 00075 size_t maxqsize; 00076 }; 00077 00078 struct WorkThreadServer::JobStats 00079 { 00080 JobStats() : nqueued(0), ndropped(0) {} 00081 00082 unsigned int nqueued; 00083 unsigned int ndropped; 00084 }; 00085 00086 namespace 00087 { 00088 struct JobPriorityCompare 00089 { 00090 bool operator()(const rutz::shared_ptr<JobServer::Job>& j1, 00091 const rutz::shared_ptr<JobServer::Job>& j2) 00092 { 00093 return j1->priority() < j2->priority(); 00094 } 00095 }; 00096 } 00097 00098 // ###################################################################### 00099 WorkThreadServer::WorkThreadServer(const char* name, unsigned int n, const bool verbose_logs) 00100 : 00101 itsName(name), 00102 itsNumThreads(0), 00103 itsVerboseLogs(verbose_logs), 00104 itsThreads(0), 00105 itsJobsMutex(), 00106 itsJobs(), 00107 itsSortJobs(false), 00108 itsJobsCounter(0u), 00109 itsMaxQueueSize(itsJobs.max_size()), 00110 itsDropPolicy(DROP_OLDEST), 00111 itsNumQueued(0), 00112 itsNumDropped(0), 00113 itsFlushBeforeStopping(false), 00114 itsJobsQuit(false), 00115 itsMaxObservedQueueSize(0), 00116 itsLastCheckpoint(-1), 00117 itsCheckpointPeriod(0), 00118 itsSleepUsecs(0), 00119 itsCheckpoints() 00120 { 00121 // init mutex 00122 if (0 != pthread_mutex_init(&itsJobsMutex, NULL)) 00123 LFATAL("pthread_mutex_init() failed"); 00124 00125 itsNumRun.atomic_set(0); 00126 00127 this->start(n); 00128 } 00129 00130 // ###################################################################### 00131 WorkThreadServer::~WorkThreadServer() 00132 { 00133 this->stop(); 00134 00135 if (0 != pthread_mutex_destroy(&itsJobsMutex)) 00136 // not LFATAL(), because we never want to generate an exception in 00137 // a destructor 00138 LERROR("pthread_mutex_destroy() failed"); 00139 } 00140 00141 // ###################################################################### 00142 void WorkThreadServer::start(unsigned int nthreads) 00143 { 00144 // start threads 00145 ASSERT(itsThreads == 0); 00146 itsNumThreads = nthreads; 00147 itsThreads = new ThreadData[itsNumThreads]; 00148 itsJobsQuit = false; 00149 for (uint i = 0; i < itsNumThreads; ++i) 00150 { 00151 itsThreads[i].srv = this; 00152 itsThreads[i].index = 0; 00153 if (0 != pthread_create(&(itsThreads[i].thread), NULL, 00154 &WorkThreadServer::c_run, 00155 static_cast<void*>(itsThreads+i))) 00156 LFATAL("pthread_create() failed for thread %u of %u", 00157 i+1, itsNumThreads); 00158 } 00159 itsNumQueued = 0; 00160 00161 itsCheckpoints.clear(); 00162 } 00163 00164 // ###################################################################### 00165 void WorkThreadServer::stop() 00166 { 00167 // do we want to wait for pending jobs to complete before we shut 00168 // down the worker threads? 00169 if (itsFlushBeforeStopping) 00170 this->flushQueue(); 00171 00172 // tell the worker threads to quit when they wake up next 00173 itsJobsQuit = true; 00174 00175 // post a bunch of fake jobs to make sure the worker threads wake up 00176 // and realize that we want to quit 00177 for (uint i = 0; i < itsNumThreads; ++i) 00178 itsJobsCounter.post(); 00179 00180 if (itsVerboseLogs) { 00181 LINFO("%s: %u jobs were queued (%u of those dropped) into %u worker thread(s)", 00182 itsName.c_str(), itsNumQueued, itsNumDropped, itsNumThreads); 00183 00184 for (std::map<std::string, JobStats>::iterator 00185 itr = itsJobStats.begin(), stop = itsJobStats.end(); itr != stop; ++itr) 00186 LINFO("%s: including %u jobs (%u dropped) of type %s", itsName.c_str(), (*itr).second.nqueued, 00187 (*itr).second.ndropped, (*itr).first.c_str()); 00188 00189 // dump the contents of our checkpoint cache: 00190 for (std::list<Checkpoint>::const_iterator 00191 itr = itsCheckpoints.begin(), stop = itsCheckpoints.end(); itr != stop; ++itr) 00192 LINFO("%s checkpoint: job #%06d - queue len: %"ZU" (max %"ZU")", 00193 itsName.c_str(), (*itr).jobnum, (*itr).qsize, (*itr).maxqsize); 00194 } 00195 00196 for (uint i = 0; i < itsNumThreads; ++i) { 00197 if (0 != pthread_join(itsThreads[i].thread, NULL)) 00198 LERROR("pthread_join() failed for thread %u of %u", i+1, itsNumThreads); 00199 if (itsVerboseLogs) 00200 LINFO("%s: thread %u of %u ran %u jobs with %.3fs work time and %.3fs wait time", 00201 itsName.c_str(), i+1, itsNumThreads, itsThreads[i].njobs, itsThreads[i].worktime.sec(), 00202 itsThreads[i].waittime.sec()); 00203 } 00204 00205 delete [] itsThreads; 00206 itsThreads = 0; 00207 itsNumThreads = 0; 00208 00209 if (itsJobs.size() > 0 && itsVerboseLogs) 00210 LINFO("%s: %"ZU" jobs were abandoned in the job queue", itsName.c_str(), itsJobs.size()); 00211 00212 itsJobs.resize(0); 00213 } 00214 00215 // ###################################################################### 00216 void WorkThreadServer::doEnqueueJob(const rutz::shared_ptr<Job>& j) 00217 { 00218 // the caller must ensure that itsJobsMutex is already locked when 00219 // this function is called 00220 00221 itsJobs.push_back(j); 00222 00223 if (itsSortJobs) 00224 { 00225 std::stable_sort(itsJobs.begin(), itsJobs.end(), 00226 JobPriorityCompare()); 00227 } 00228 00229 if (itsJobs.size() <= itsMaxQueueSize) 00230 { 00231 itsJobsCounter.post(); 00232 } 00233 else 00234 { 00235 ++itsNumDropped; 00236 00237 ASSERT(itsJobs.size() > 1); // because itsMaxQueueSize must be >= 1 00238 00239 switch (itsDropPolicy) 00240 { 00241 case DROP_OLDEST: 00242 00243 // drop the job at the front of the queue 00244 itsJobStats[itsJobs.front()->jobType()].ndropped++; 00245 itsJobs.front()->drop(); 00246 itsJobs.pop_front(); 00247 break; 00248 00249 case DROP_NEWEST: 00250 00251 // drop the job at the back of the queue (this may or may 00252 // not be the same job that we have just added, depending on 00253 // whether or not we have done priority sort) 00254 itsJobStats[itsJobs.back()->jobType()].ndropped++; 00255 itsJobs.back()->drop(); 00256 itsJobs.pop_back(); 00257 break; 00258 00259 case DROP_OLDEST_LOWEST_PRIORITY: // fall-through 00260 case DROP_NEWEST_LOWEST_PRIORITY: 00261 { 00262 // find the lowest priority job in the queue... 00263 int lowprio = itsJobs.front()->priority(); 00264 00265 typedef std::deque<rutz::shared_ptr<Job> >::iterator iterator; 00266 00267 iterator todrop = itsJobs.begin(); 00268 iterator itr = todrop; ++itr; 00269 iterator const stop = itsJobs.end(); 00270 00271 for ( ; itr != stop; ++itr) 00272 if (itsDropPolicy == DROP_OLDEST_LOWEST_PRIORITY) 00273 { 00274 // use strictly-greater-than here, so that we get 00275 // the item with lowest priority that is closest to 00276 // the front of the queue (i.e., oldest) 00277 if ((*itr)->priority() > lowprio) 00278 todrop = itr; 00279 } 00280 else // itsDropPolicy == DROP_NEWEST_LOWEST_PRIORITY 00281 { 00282 // use greater-than-or-equal here, so that we get 00283 // the item with lowest priority that is closest to 00284 // the back of the queue (i.e., newest) 00285 if ((*itr)->priority() >= lowprio) 00286 todrop = itr; 00287 } 00288 00289 // and drop it: 00290 itsJobStats[(*todrop)->jobType()].ndropped++; 00291 (*todrop)->drop(); 00292 itsJobs.erase(todrop); 00293 } 00294 break; 00295 } 00296 } 00297 } 00298 00299 // ###################################################################### 00300 void WorkThreadServer::enqueueJob(const rutz::shared_ptr<Job>& j) 00301 { 00302 if (itsNumThreads == 0 || itsThreads == NULL) 00303 LFATAL("Can't enqueue jobs into a server with no threads " 00304 "(jobs would block forever)"); 00305 00306 ASSERT(itsMaxQueueSize >= 1); 00307 00308 { 00309 GVX_MUTEX_LOCK(&itsJobsMutex); 00310 this->doEnqueueJob(j); 00311 } 00312 00313 itsJobStats[j->jobType()].nqueued++; 00314 00315 ++itsNumQueued; 00316 } 00317 00318 // ###################################################################### 00319 void WorkThreadServer::enqueueJobs(const rutz::shared_ptr<Job>* jobs, 00320 const size_t njobs) 00321 { 00322 if (itsNumThreads == 0 || itsThreads == NULL) 00323 LFATAL("Can't enqueue jobs into a server with no threads " 00324 "(jobs would block forever)"); 00325 00326 ASSERT(itsMaxQueueSize >= 1); 00327 00328 { 00329 GVX_MUTEX_LOCK(&itsJobsMutex); 00330 for (size_t i = 0; i < njobs; ++i) 00331 this->doEnqueueJob(jobs[i]); 00332 } 00333 00334 for (size_t i = 0; i < njobs; ++i) 00335 itsJobStats[jobs[i]->jobType()].nqueued++; 00336 00337 itsNumQueued += njobs; 00338 } 00339 00340 // ###################################################################### 00341 unsigned int WorkThreadServer::getParallelismHint() 00342 { 00343 return itsNumThreads; 00344 } 00345 00346 // ###################################################################### 00347 void WorkThreadServer::flushQueue(uint sleepLength, bool verbose) 00348 { 00349 while (true) 00350 { 00351 const size_t sz = this->size(); 00352 00353 if (sz == 0) 00354 break; 00355 00356 if(verbose) LINFO("%s: flushing queue - %"ZU" remaining", 00357 itsName.c_str(), sz); 00358 usleep(sleepLength); 00359 } 00360 } 00361 00362 // ###################################################################### 00363 size_t WorkThreadServer::size() 00364 { 00365 size_t ret; 00366 { 00367 GVX_MUTEX_LOCK(&itsJobsMutex); 00368 ret = itsJobs.size(); 00369 } 00370 return ret; 00371 } 00372 00373 // ###################################################################### 00374 void WorkThreadServer::setMaxQueueSize(size_t maxsize) 00375 { 00376 if (maxsize == 0) 00377 LFATAL("queue size must be greater than zero"); 00378 00379 GVX_MUTEX_LOCK(&itsJobsMutex); 00380 itsMaxQueueSize = maxsize; 00381 } 00382 00383 // ###################################################################### 00384 void WorkThreadServer::run(ThreadData* dat) 00385 { 00386 GVX_ERR_CONTEXT(rutz::sfmt("running %s worker thread %u/%u", 00387 itsName.c_str(), dat->index + 1, itsNumThreads)); 00388 00389 rutz::time prev = rutz::time::wall_clock_now(); 00390 00391 while (1) 00392 { 00393 itsJobsCounter.wait(); 00394 00395 { 00396 rutz::time t = rutz::time::wall_clock_now(); 00397 dat->waittime += (t - prev); 00398 prev = t; 00399 } 00400 00401 if (itsJobsQuit == true) 00402 break; 00403 00404 rutz::shared_ptr<JobServer::Job> job; 00405 00406 size_t qsize = 0; 00407 00408 { 00409 GVX_MUTEX_LOCK(&itsJobsMutex); 00410 00411 if (itsJobs.size() == 0) 00412 { 00413 // If this happens, it probably represents an OS bug 00414 // (somehow the semaphore counts have gotten corrupted), 00415 // but we will issue an LERROR() instead of an LFATAL() 00416 // and try to keep limping along with the goal of not 00417 // unnecessarily bombing out of a long analysis job. 00418 LERROR("Oops! Received a semaphore but the job queue is empty!"); 00419 continue; 00420 } 00421 00422 ASSERT(itsJobs.size() > 0); 00423 00424 if (itsJobs.size() > itsMaxObservedQueueSize) 00425 itsMaxObservedQueueSize = itsJobs.size(); 00426 00427 job = itsJobs.front(); 00428 itsJobs.pop_front(); 00429 qsize = itsJobs.size(); 00430 00431 ++(dat->njobs); 00432 } 00433 00434 job->run(); 00435 00436 const int c = itsNumRun.atomic_incr_return(); 00437 00438 // show our queue once in a while: 00439 if (itsCheckpointPeriod > 0 00440 && c > itsLastCheckpoint 00441 && c % itsCheckpointPeriod == 0) 00442 { 00443 itsCheckpoints.push_back(Checkpoint(c, qsize, itsMaxObservedQueueSize)); 00444 itsLastCheckpoint = c; 00445 } 00446 00447 { 00448 rutz::time t = rutz::time::wall_clock_now(); 00449 dat->worktime += (t - prev); 00450 prev = t; 00451 } 00452 00453 if (itsSleepUsecs > 0) 00454 usleep(itsSleepUsecs); 00455 } 00456 } 00457 00458 // ###################################################################### 00459 void* WorkThreadServer::c_run(void* p) 00460 { 00461 try 00462 { 00463 // block all signals in this worker thread; instead of receiving 00464 // signals here, we rely on the main thread to catch any 00465 // important signals and then twiddle with the WorkThreadServer 00466 // object as needed (for example, destroying it to cleanly shut 00467 // down all worker threads) 00468 sigset_t ss; 00469 if (sigfillset(&ss) != 0) 00470 PLFATAL("sigfillset() failed"); 00471 00472 if (pthread_sigmask(SIG_SETMASK, &ss, 0) != 0) 00473 PLFATAL("pthread_sigmask() failed"); 00474 00475 ThreadData* const dat = static_cast<ThreadData*>(p); 00476 00477 WorkThreadServer* const srv = dat->srv; 00478 00479 srv->run(dat); 00480 } 00481 catch (...) 00482 { 00483 REPORT_CURRENT_EXCEPTION; 00484 abort(); 00485 } 00486 00487 return NULL; 00488 } 00489 00490 // ###################################################################### 00491 /* So things look consistent in everyone's emacs... */ 00492 /* Local Variables: */ 00493 /* mode: c++ */ 00494 /* indent-tabs-mode: nil */ 00495 /* End: */ 00496 00497 #endif // UTIL_WORKTHREADSERVER_C_DEFINED