Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

WorkThreadServer.H

Go to the documentation of this file.
00001 /*!@file Util/WorkThreadServer.H Generic low-level worker-thread server class */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005   //
00005 // by the University of Southern California (USC) and the iLab at USC.  //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.H $
00035 // $Id: WorkThreadServer.H 9865 2008-07-02 00:11:32Z beobot $
00036 //
00037 
00038 #ifndef UTIL_WORKTHREADSERVER_H_DEFINED
00039 #define UTIL_WORKTHREADSERVER_H_DEFINED
00040 
00041 #include "Util/JobServer.H"
00042 #include "Util/Semaphore.H"
00043 #include "rutz/shared_ptr.h"
00044 #include "rutz/atomic.h"
00045 #include "rutz/time.h"
00046 
00047 #include <deque>
00048 #include <list>
00049 #include <map>
00050 #include <pthread.h>
00051 #include <string>
00052 
00053 /// Generic low-level thread-server class
00054 /** To use this class, derive a job class from JobServer::Job
00055     whose run() function does the work that you want done. Then
00056     instantiate a WorkThreadServer object, start() it with your
00057     desired number of work threads, and start calling enqueueJob()
00058     with your Job objects. See DiskDataStream for an
00059     example of usage. */
00060 class WorkThreadServer : public JobServer
00061 {
00062 public:
00063 
00064   /// Construct with a descriptive nameand a number of worker threads
00065   WorkThreadServer(const char* name, unsigned int n);
00066 
00067   /// Virtual destructor for safe inheritance
00068   virtual ~WorkThreadServer();
00069 
00070 private:
00071   /// Start \a n worker threads ready to receive Job requests.
00072   void start(unsigned int n);
00073 
00074   /// Stop all worker threads (and wait for them to actually stop).
00075   void stop();
00076 
00077 public:
00078   /// Put a Job in the queue.
00079   /** Worker threads process Job requests in the order they are received. */
00080   virtual void enqueueJob(const rutz::shared_ptr<Job>& job);
00081 
00082   /// Submit multiple jobs in a batch
00083   /** This is more efficient than multiple individual enqueueJob()
00084       calls, since we can lock/unlock the queue mutex just once for
00085       the entire batch. */
00086   virtual void enqueueJobs(const rutz::shared_ptr<Job>* jobs,
00087                            const size_t njobs);
00088 
00089   /// Returns the number of worker threads
00090   virtual unsigned int getParallelismHint();
00091 
00092   /// Wait now for all jobs in the queue to be completed
00093   void flushQueue(uint sleepLength = 250000, bool verbose = true);
00094 
00095   /// Get the number of jobs currently waiting in the queue.
00096   size_t size();
00097 
00098   /// How often (in number of jobs) should a checkpoint log entry be written
00099   /** If <= 0, then never print checkpoint entries. */
00100   void setCheckpointPeriod(int p) { itsCheckpointPeriod = p; }
00101 
00102   /// Number of microseconds to usleep() in between running each job
00103   /** If <= 0, then don't usleep() at all. */
00104   void setSleepUsecs(unsigned int us) { itsSleepUsecs = us; }
00105 
00106   /// Change the maximum size for the internal queue
00107   /** If an enqueueJob() or enqueueJobs() call would otherwise cause
00108       the maximum queue size to be exceeded, then jobs will be dropped
00109       from the queue in accordance with the current drop policy. */
00110   void setMaxQueueSize(size_t maxsize);
00111 
00112   /// Whether to wait for the job queue to become empty before stopping the worker threads
00113   /** If false (the default), then any jobs left in the queue will be
00114       abandoned when stop() or the destructor is called. */
00115   void setFlushBeforeStopping(bool v) { itsFlushBeforeStopping = v; }
00116 
00117   /// Whether to sort jobs by priority
00118   void setSortJobs(bool v) { itsSortJobs = v; }
00119 
00120   enum DropPolicy
00121     {
00122       DROP_OLDEST,  ///< drop oldest job (OR, if we are sorting, that means the highest priority job)
00123       DROP_NEWEST,  ///< drop newest job (OR, if we are sorting, that means the lowest priority job)
00124       DROP_OLDEST_LOWEST_PRIORITY, ///< drop oldest+lowest priority job regardless of whether we are sorting by priority
00125       DROP_NEWEST_LOWEST_PRIORITY  ///< drop newest+lowest priority job regardless of whether we are sorting by priority
00126     };
00127 
00128   /// Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size
00129   void setDropPolicy(DropPolicy p) { itsDropPolicy = p; }
00130 
00131 private:
00132   struct ThreadData;
00133   struct Checkpoint;
00134   struct JobStats;
00135 
00136   // For internal use only; this must be called only when itsJobsMutex
00137   // is already locked
00138   void doEnqueueJob(const rutz::shared_ptr<Job>& job);
00139 
00140   static void* c_run(void* p);
00141 
00142   void run(ThreadData* dat);
00143 
00144   std::string                   itsName;
00145 
00146   unsigned int                  itsNumThreads;
00147   ThreadData*                   itsThreads;
00148 
00149   pthread_mutex_t               itsJobsMutex;
00150   std::deque<rutz::shared_ptr<Job> >   itsJobs;
00151   bool                          itsSortJobs;
00152   Semaphore                     itsJobsCounter;
00153   size_t                        itsMaxQueueSize;
00154   DropPolicy                    itsDropPolicy;
00155   unsigned int                  itsNumQueued;
00156   unsigned int                  itsNumDropped;
00157   bool                          itsFlushBeforeStopping;
00158   bool                          itsJobsQuit;
00159   rutz::atomic_int_t            itsNumRun;
00160   size_t                        itsMaxObservedQueueSize;
00161   int                           itsLastCheckpoint;
00162   int                           itsCheckpointPeriod;
00163   unsigned int                  itsSleepUsecs;
00164   std::list<Checkpoint>         itsCheckpoints;
00165   std::map<std::string, JobStats> itsJobStats;
00166 };
00167 
00168 // ######################################################################
00169 /* So things look consistent in everyone's emacs... */
00170 /* Local Variables: */
00171 /* mode: c++ */
00172 /* indent-tabs-mode: nil */
00173 /* End: */
00174 
00175 #endif // UTIL_WORKTHREADSERVER_H_DEFINED

Generated on Sun Nov 22 13:43:17 2009 for iLab Neuromorphic Vision Toolkit by  doxygen 1.4.4