WorkThreadServer.H

Go to the documentation of this file.
00001 /*!@file Util/WorkThreadServer.H Generic low-level worker-thread server class */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005   //
00005 // by the University of Southern California (USC) and the iLab at USC.  //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.H $
00035 // $Id: WorkThreadServer.H 14387 2011-01-12 17:15:26Z lior $
00036 //
00037 
00038 #ifndef UTIL_WORKTHREADSERVER_H_DEFINED
00039 #define UTIL_WORKTHREADSERVER_H_DEFINED
00040 
00041 #include "Util/JobServer.H"
00042 #include "Util/Semaphore.H"
00043 #include "rutz/shared_ptr.h"
00044 #include "rutz/atomic.h"
00045 #include "rutz/time.h"
00046 
00047 #include <deque>
00048 #include <list>
00049 #include <map>
00050 #include <pthread.h>
00051 #include <string>
00052 
00053 /// Generic low-level thread-server class
00054 /** To use this class, derive a job class from JobServer::Job
00055     whose run() function does the work that you want done. Then
00056     instantiate a WorkThreadServer object, start() it with your
00057     desired number of work threads, and start calling enqueueJob()
00058     with your Job objects. See DiskDataStream for an
00059     example of usage. */
00060 class WorkThreadServer : public JobServer
00061 {
00062 public:
00063 
00064   /// Construct with a descriptive nameand a number of worker threads
00065   WorkThreadServer(const char* name, unsigned int n, const bool verbose_logs = true);
00066 
00067   /// Virtual destructor for safe inheritance
00068   virtual ~WorkThreadServer();
00069 
00070 private:
00071   /// Start \a n worker threads ready to receive Job requests.
00072   void start(unsigned int n);
00073 
00074   /// Stop all worker threads (and wait for them to actually stop).
00075   void stop();
00076 
00077 public:
00078   /// Put a Job in the queue.
00079   /** Worker threads process Job requests in the order they are received. */
00080   virtual void enqueueJob(const rutz::shared_ptr<Job>& job);
00081 
00082   /// Submit multiple jobs in a batch
00083   /** This is more efficient than multiple individual enqueueJob()
00084       calls, since we can lock/unlock the queue mutex just once for
00085       the entire batch. */
00086   virtual void enqueueJobs(const rutz::shared_ptr<Job>* jobs,
00087                            const size_t njobs);
00088 
00089   /// Returns the number of worker threads
00090   virtual unsigned int getParallelismHint();
00091 
00092   /// Wait now for all jobs in the queue to be completed
00093   void flushQueue(uint sleepLength = 250000, bool verbose = true);
00094 
00095   /// Get the number of jobs currently waiting in the queue.
00096   size_t size();
00097 
00098   //! Get the number of jobs ran
00099   int getNumRun() { return itsNumRun.atomic_get(); }
00100 
00101   /// How often (in number of jobs) should a checkpoint log entry be written
00102   /** If <= 0, then never print checkpoint entries. */
00103   void setCheckpointPeriod(int p) { itsCheckpointPeriod = p; }
00104 
00105   /// Number of microseconds to usleep() in between running each job
00106   /** If <= 0, then don't usleep() at all. */
00107   void setSleepUsecs(unsigned int us) { itsSleepUsecs = us; }
00108 
00109   /// Change the maximum size for the internal queue
00110   /** If an enqueueJob() or enqueueJobs() call would otherwise cause
00111       the maximum queue size to be exceeded, then jobs will be dropped
00112       from the queue in accordance with the current drop policy. */
00113   void setMaxQueueSize(size_t maxsize);
00114 
00115   /// Whether to wait for the job queue to become empty before stopping the worker threads
00116   /** If false (the default), then any jobs left in the queue will be
00117       abandoned when stop() or the destructor is called. */
00118   void setFlushBeforeStopping(bool v) { itsFlushBeforeStopping = v; }
00119 
00120   /// Whether to sort jobs by priority
00121   void setSortJobs(bool v) { itsSortJobs = v; }
00122 
00123   enum DropPolicy
00124     {
00125       DROP_OLDEST,  ///< drop oldest job (OR, if we are sorting, that means the highest priority job)
00126       DROP_NEWEST,  ///< drop newest job (OR, if we are sorting, that means the lowest priority job)
00127       DROP_OLDEST_LOWEST_PRIORITY, ///< drop oldest+lowest priority job regardless of whether we are sorting by priority
00128       DROP_NEWEST_LOWEST_PRIORITY  ///< drop newest+lowest priority job regardless of whether we are sorting by priority
00129     };
00130 
00131   /// Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size
00132   void setDropPolicy(DropPolicy p) { itsDropPolicy = p; }
00133 
00134 private:
00135   struct ThreadData;
00136   struct Checkpoint;
00137   struct JobStats;
00138 
00139   // For internal use only; this must be called only when itsJobsMutex
00140   // is already locked
00141   void doEnqueueJob(const rutz::shared_ptr<Job>& job);
00142 
00143   static void* c_run(void* p);
00144 
00145   void run(ThreadData* dat);
00146 
00147   std::string                   itsName;
00148 
00149   unsigned int                  itsNumThreads;
00150   bool                          itsVerboseLogs;
00151   ThreadData*                   itsThreads;
00152 
00153   pthread_mutex_t               itsJobsMutex;
00154   std::deque<rutz::shared_ptr<Job> >   itsJobs;
00155   bool                          itsSortJobs;
00156   Semaphore                     itsJobsCounter;
00157   size_t                        itsMaxQueueSize;
00158   DropPolicy                    itsDropPolicy;
00159   unsigned int                  itsNumQueued;
00160   unsigned int                  itsNumDropped;
00161   bool                          itsFlushBeforeStopping;
00162   bool                          itsJobsQuit;
00163   rutz::atomic_int_t            itsNumRun;
00164   size_t                        itsMaxObservedQueueSize;
00165   int                           itsLastCheckpoint;
00166   int                           itsCheckpointPeriod;
00167   unsigned int                  itsSleepUsecs;
00168   std::list<Checkpoint>         itsCheckpoints;
00169   std::map<std::string, JobStats> itsJobStats;
00170 };
00171 
00172 // ######################################################################
00173 /* So things look consistent in everyone's emacs... */
00174 /* Local Variables: */
00175 /* mode: c++ */
00176 /* indent-tabs-mode: nil */
00177 /* End: */
00178 
00179 #endif // UTIL_WORKTHREADSERVER_H_DEFINED
Generated on Sun May 8 08:42:32 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3