WorkThreadServer.C

Go to the documentation of this file.
00001 /*!@file Util/WorkThreadServer.C Generic low-level worker-thread server class */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005   //
00005 // by the University of Southern California (USC) and the iLab at USC.  //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.C $
00035 // $Id: WorkThreadServer.C 14129 2010-10-13 04:45:06Z itti $
00036 //
00037 
00038 #ifndef UTIL_WORKTHREADSERVER_C_DEFINED
00039 #define UTIL_WORKTHREADSERVER_C_DEFINED
00040 
00041 #include "Util/WorkThreadServer.H"
00042 
00043 #include "Util/Assert.H"
00044 #include "Util/log.H"
00045 #include "rutz/error_context.h"
00046 #include "rutz/mutex.h"
00047 #include "rutz/sfmt.h"
00048 
00049 #include <algorithm> // for stable_sort()
00050 #include <cstdlib>
00051 #include <signal.h>
00052 #include <unistd.h>
00053 
00054 struct WorkThreadServer::ThreadData
00055 {
00056   ThreadData()
00057     :
00058     srv(0), thread(), index(0), njobs(0), worktime(), waittime()
00059   {}
00060 
00061   WorkThreadServer* srv;
00062   pthread_t thread;
00063   unsigned int index;
00064   unsigned int njobs;
00065   rutz::time worktime;
00066   rutz::time waittime;
00067 };
00068 
00069 struct WorkThreadServer::Checkpoint
00070 {
00071   Checkpoint(int j, size_t qs, size_t mqs) : jobnum(j), qsize(qs), maxqsize(mqs) {}
00072 
00073   int jobnum;
00074   size_t qsize;
00075   size_t maxqsize;
00076 };
00077 
00078 struct WorkThreadServer::JobStats
00079 {
00080   JobStats() : nqueued(0), ndropped(0) {}
00081 
00082   unsigned int nqueued;
00083   unsigned int ndropped;
00084 };
00085 
00086 namespace
00087 {
00088   struct JobPriorityCompare
00089   {
00090     bool operator()(const rutz::shared_ptr<JobServer::Job>& j1,
00091                     const rutz::shared_ptr<JobServer::Job>& j2)
00092     {
00093       return j1->priority() < j2->priority();
00094     }
00095   };
00096 }
00097 
00098 // ######################################################################
00099 WorkThreadServer::WorkThreadServer(const char* name, unsigned int n, const bool verbose_logs)
00100   :
00101   itsName(name),
00102   itsNumThreads(0),
00103   itsVerboseLogs(verbose_logs),
00104   itsThreads(0),
00105   itsJobsMutex(),
00106   itsJobs(),
00107   itsSortJobs(false),
00108   itsJobsCounter(0u),
00109   itsMaxQueueSize(itsJobs.max_size()),
00110   itsDropPolicy(DROP_OLDEST),
00111   itsNumQueued(0),
00112   itsNumDropped(0),
00113   itsFlushBeforeStopping(false),
00114   itsJobsQuit(false),
00115   itsMaxObservedQueueSize(0),
00116   itsLastCheckpoint(-1),
00117   itsCheckpointPeriod(0),
00118   itsSleepUsecs(0),
00119   itsCheckpoints()
00120 {
00121   // init mutex
00122   if (0 != pthread_mutex_init(&itsJobsMutex, NULL))
00123     LFATAL("pthread_mutex_init() failed");
00124 
00125   itsNumRun.atomic_set(0);
00126 
00127   this->start(n);
00128 }
00129 
00130 // ######################################################################
00131 WorkThreadServer::~WorkThreadServer()
00132 {
00133   this->stop();
00134 
00135   if (0 != pthread_mutex_destroy(&itsJobsMutex))
00136     // not LFATAL(), because we never want to generate an exception in
00137     // a destructor
00138     LERROR("pthread_mutex_destroy() failed");
00139 }
00140 
00141 // ######################################################################
00142 void WorkThreadServer::start(unsigned int nthreads)
00143 {
00144   // start threads
00145   ASSERT(itsThreads == 0);
00146   itsNumThreads = nthreads;
00147   itsThreads = new ThreadData[itsNumThreads];
00148   itsJobsQuit = false;
00149   for (uint i = 0; i < itsNumThreads; ++i)
00150     {
00151       itsThreads[i].srv = this;
00152       itsThreads[i].index = 0;
00153       if (0 != pthread_create(&(itsThreads[i].thread), NULL,
00154                               &WorkThreadServer::c_run,
00155                               static_cast<void*>(itsThreads+i)))
00156         LFATAL("pthread_create() failed for thread %u of %u",
00157                i+1, itsNumThreads);
00158     }
00159   itsNumQueued = 0;
00160 
00161   itsCheckpoints.clear();
00162 }
00163 
00164 // ######################################################################
00165 void WorkThreadServer::stop()
00166 {
00167   // do we want to wait for pending jobs to complete before we shut
00168   // down the worker threads?
00169   if (itsFlushBeforeStopping)
00170     this->flushQueue();
00171 
00172   // tell the worker threads to quit when they wake up next
00173   itsJobsQuit = true;
00174 
00175   // post a bunch of fake jobs to make sure the worker threads wake up
00176   // and realize that we want to quit
00177   for (uint i = 0; i < itsNumThreads; ++i)
00178     itsJobsCounter.post();
00179 
00180   if (itsVerboseLogs) {
00181     LINFO("%s: %u jobs were queued (%u of those dropped) into %u worker thread(s)",
00182           itsName.c_str(), itsNumQueued, itsNumDropped, itsNumThreads);
00183 
00184     for (std::map<std::string, JobStats>::iterator
00185            itr = itsJobStats.begin(), stop = itsJobStats.end(); itr != stop; ++itr)
00186       LINFO("%s: including %u jobs (%u dropped) of type %s", itsName.c_str(), (*itr).second.nqueued,
00187             (*itr).second.ndropped, (*itr).first.c_str());
00188 
00189     // dump the contents of our checkpoint cache:
00190     for (std::list<Checkpoint>::const_iterator
00191            itr = itsCheckpoints.begin(), stop = itsCheckpoints.end(); itr != stop; ++itr)
00192       LINFO("%s checkpoint: job #%06d - queue len: %"ZU" (max %"ZU")",
00193             itsName.c_str(), (*itr).jobnum, (*itr).qsize, (*itr).maxqsize);
00194   }
00195 
00196   for (uint i = 0; i < itsNumThreads; ++i) {
00197       if (0 != pthread_join(itsThreads[i].thread, NULL))
00198         LERROR("pthread_join() failed for thread %u of %u", i+1, itsNumThreads);
00199       if (itsVerboseLogs)
00200         LINFO("%s: thread %u of %u ran %u jobs with %.3fs work time and %.3fs wait time",
00201               itsName.c_str(), i+1, itsNumThreads, itsThreads[i].njobs, itsThreads[i].worktime.sec(),
00202               itsThreads[i].waittime.sec());
00203   }
00204 
00205   delete [] itsThreads;
00206   itsThreads = 0;
00207   itsNumThreads = 0;
00208 
00209   if (itsJobs.size() > 0 && itsVerboseLogs)
00210     LINFO("%s: %"ZU" jobs were abandoned in the job queue", itsName.c_str(), itsJobs.size());
00211 
00212   itsJobs.resize(0);
00213 }
00214 
00215 // ######################################################################
00216 void WorkThreadServer::doEnqueueJob(const rutz::shared_ptr<Job>& j)
00217 {
00218   // the caller must ensure that itsJobsMutex is already locked when
00219   // this function is called
00220 
00221   itsJobs.push_back(j);
00222 
00223   if (itsSortJobs)
00224     {
00225       std::stable_sort(itsJobs.begin(), itsJobs.end(),
00226                        JobPriorityCompare());
00227     }
00228 
00229   if (itsJobs.size() <= itsMaxQueueSize)
00230     {
00231       itsJobsCounter.post();
00232     }
00233   else
00234     {
00235       ++itsNumDropped;
00236 
00237       ASSERT(itsJobs.size() > 1); // because itsMaxQueueSize must be >= 1
00238 
00239       switch (itsDropPolicy)
00240         {
00241         case DROP_OLDEST:
00242 
00243           // drop the job at the front of the queue
00244           itsJobStats[itsJobs.front()->jobType()].ndropped++;
00245           itsJobs.front()->drop();
00246           itsJobs.pop_front();
00247           break;
00248 
00249         case DROP_NEWEST:
00250 
00251           // drop the job at the back of the queue (this may or may
00252           // not be the same job that we have just added, depending on
00253           // whether or not we have done priority sort)
00254           itsJobStats[itsJobs.back()->jobType()].ndropped++;
00255           itsJobs.back()->drop();
00256           itsJobs.pop_back();
00257           break;
00258 
00259         case DROP_OLDEST_LOWEST_PRIORITY: // fall-through
00260         case DROP_NEWEST_LOWEST_PRIORITY:
00261           {
00262             // find the lowest priority job in the queue...
00263             int lowprio = itsJobs.front()->priority();
00264 
00265             typedef std::deque<rutz::shared_ptr<Job> >::iterator iterator;
00266 
00267             iterator todrop = itsJobs.begin();
00268             iterator itr = todrop; ++itr;
00269             iterator const stop = itsJobs.end();
00270 
00271             for ( ; itr != stop; ++itr)
00272               if (itsDropPolicy == DROP_OLDEST_LOWEST_PRIORITY)
00273                 {
00274                   // use strictly-greater-than here, so that we get
00275                   // the item with lowest priority that is closest to
00276                   // the front of the queue (i.e., oldest)
00277                   if ((*itr)->priority() > lowprio)
00278                     todrop = itr;
00279                 }
00280               else // itsDropPolicy == DROP_NEWEST_LOWEST_PRIORITY
00281                 {
00282                   // use greater-than-or-equal here, so that we get
00283                   // the item with lowest priority that is closest to
00284                   // the back of the queue (i.e., newest)
00285                   if ((*itr)->priority() >= lowprio)
00286                     todrop = itr;
00287                 }
00288 
00289             // and drop it:
00290             itsJobStats[(*todrop)->jobType()].ndropped++;
00291             (*todrop)->drop();
00292             itsJobs.erase(todrop);
00293           }
00294           break;
00295         }
00296     }
00297 }
00298 
00299 // ######################################################################
00300 void WorkThreadServer::enqueueJob(const rutz::shared_ptr<Job>& j)
00301 {
00302   if (itsNumThreads == 0 || itsThreads == NULL)
00303     LFATAL("Can't enqueue jobs into a server with no threads "
00304            "(jobs would block forever)");
00305 
00306   ASSERT(itsMaxQueueSize >= 1);
00307 
00308   {
00309     GVX_MUTEX_LOCK(&itsJobsMutex);
00310     this->doEnqueueJob(j);
00311   }
00312 
00313   itsJobStats[j->jobType()].nqueued++;
00314 
00315   ++itsNumQueued;
00316 }
00317 
00318 // ######################################################################
00319 void WorkThreadServer::enqueueJobs(const rutz::shared_ptr<Job>* jobs,
00320                                    const size_t njobs)
00321 {
00322   if (itsNumThreads == 0 || itsThreads == NULL)
00323     LFATAL("Can't enqueue jobs into a server with no threads "
00324            "(jobs would block forever)");
00325 
00326   ASSERT(itsMaxQueueSize >= 1);
00327 
00328   {
00329     GVX_MUTEX_LOCK(&itsJobsMutex);
00330     for (size_t i = 0; i < njobs; ++i)
00331       this->doEnqueueJob(jobs[i]);
00332   }
00333 
00334   for (size_t i = 0; i < njobs; ++i)
00335     itsJobStats[jobs[i]->jobType()].nqueued++;
00336 
00337   itsNumQueued += njobs;
00338 }
00339 
00340 // ######################################################################
00341 unsigned int WorkThreadServer::getParallelismHint()
00342 {
00343   return itsNumThreads;
00344 }
00345 
00346 // ######################################################################
00347 void WorkThreadServer::flushQueue(uint sleepLength, bool verbose)
00348 {
00349   while (true)
00350     {
00351       const size_t sz = this->size();
00352 
00353       if (sz == 0)
00354         break;
00355 
00356       if(verbose) LINFO("%s: flushing queue - %"ZU" remaining",
00357             itsName.c_str(), sz);
00358       usleep(sleepLength);
00359     }
00360 }
00361 
00362 // ######################################################################
00363 size_t WorkThreadServer::size()
00364 {
00365   size_t ret;
00366   {
00367     GVX_MUTEX_LOCK(&itsJobsMutex);
00368     ret = itsJobs.size();
00369   }
00370   return ret;
00371 }
00372 
00373 // ######################################################################
00374 void WorkThreadServer::setMaxQueueSize(size_t maxsize)
00375 {
00376   if (maxsize == 0)
00377     LFATAL("queue size must be greater than zero");
00378 
00379   GVX_MUTEX_LOCK(&itsJobsMutex);
00380   itsMaxQueueSize = maxsize;
00381 }
00382 
00383 // ######################################################################
00384 void WorkThreadServer::run(ThreadData* dat)
00385 {
00386   GVX_ERR_CONTEXT(rutz::sfmt("running %s worker thread %u/%u",
00387                              itsName.c_str(), dat->index + 1, itsNumThreads));
00388 
00389   rutz::time prev = rutz::time::wall_clock_now();
00390 
00391   while (1)
00392     {
00393       itsJobsCounter.wait();
00394 
00395       {
00396         rutz::time t = rutz::time::wall_clock_now();
00397         dat->waittime += (t - prev);
00398         prev = t;
00399       }
00400 
00401       if (itsJobsQuit == true)
00402         break;
00403 
00404       rutz::shared_ptr<JobServer::Job> job;
00405 
00406       size_t qsize = 0;
00407 
00408       {
00409         GVX_MUTEX_LOCK(&itsJobsMutex);
00410 
00411         if (itsJobs.size() == 0)
00412           {
00413             // If this happens, it probably represents an OS bug
00414             // (somehow the semaphore counts have gotten corrupted),
00415             // but we will issue an LERROR() instead of an LFATAL()
00416             // and try to keep limping along with the goal of not
00417             // unnecessarily bombing out of a long analysis job.
00418             LERROR("Oops! Received a semaphore but the job queue is empty!");
00419             continue;
00420           }
00421 
00422         ASSERT(itsJobs.size() > 0);
00423 
00424         if (itsJobs.size() > itsMaxObservedQueueSize)
00425           itsMaxObservedQueueSize = itsJobs.size();
00426 
00427         job = itsJobs.front();
00428         itsJobs.pop_front();
00429         qsize = itsJobs.size();
00430 
00431         ++(dat->njobs);
00432       }
00433 
00434       job->run();
00435 
00436       const int c = itsNumRun.atomic_incr_return();
00437 
00438       // show our queue once in a while:
00439       if (itsCheckpointPeriod > 0
00440           && c > itsLastCheckpoint
00441           && c % itsCheckpointPeriod == 0)
00442         {
00443           itsCheckpoints.push_back(Checkpoint(c, qsize, itsMaxObservedQueueSize));
00444           itsLastCheckpoint = c;
00445         }
00446 
00447       {
00448         rutz::time t = rutz::time::wall_clock_now();
00449         dat->worktime += (t - prev);
00450         prev = t;
00451       }
00452 
00453       if (itsSleepUsecs > 0)
00454         usleep(itsSleepUsecs);
00455     }
00456 }
00457 
00458 // ######################################################################
00459 void* WorkThreadServer::c_run(void* p)
00460 {
00461   try
00462     {
00463       // block all signals in this worker thread; instead of receiving
00464       // signals here, we rely on the main thread to catch any
00465       // important signals and then twiddle with the WorkThreadServer
00466       // object as needed (for example, destroying it to cleanly shut
00467       // down all worker threads)
00468       sigset_t ss;
00469       if (sigfillset(&ss) != 0)
00470         PLFATAL("sigfillset() failed");
00471 
00472       if (pthread_sigmask(SIG_SETMASK, &ss, 0) != 0)
00473         PLFATAL("pthread_sigmask() failed");
00474 
00475       ThreadData* const dat = static_cast<ThreadData*>(p);
00476 
00477       WorkThreadServer* const srv = dat->srv;
00478 
00479       srv->run(dat);
00480     }
00481   catch (...)
00482     {
00483       REPORT_CURRENT_EXCEPTION;
00484       abort();
00485     }
00486 
00487   return NULL;
00488 }
00489 
00490 // ######################################################################
00491 /* So things look consistent in everyone's emacs... */
00492 /* Local Variables: */
00493 /* mode: c++ */
00494 /* indent-tabs-mode: nil */
00495 /* End: */
00496 
00497 #endif // UTIL_WORKTHREADSERVER_C_DEFINED
Generated on Sun May 8 08:07:01 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3