
00001 /*!@file Util/WorkThreadServer.H Generic low-level worker-thread server class */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. 
// 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.H $ 00035 // $Id: WorkThreadServer.H 9865 2008-07-02 00:11:32Z beobot $ 00036 // 00037 00038 #ifndef UTIL_WORKTHREADSERVER_H_DEFINED 00039 #define UTIL_WORKTHREADSERVER_H_DEFINED 00040 00041 #include "Util/JobServer.H" 00042 #include "Util/Semaphore.H" 00043 #include "rutz/shared_ptr.h" 00044 #include "rutz/atomic.h" 00045 #include "rutz/time.h" 00046 00047 #include <deque> 00048 #include <list> 00049 #include <map> 00050 #include <pthread.h> 00051 #include <string> 00052 00053 /// Generic low-level thread-server class 00054 /** To use this class, derive a job class from JobServer::Job 00055 whose run() function does the work that you want done. Then 00056 instantiate a WorkThreadServer object, start() it with your 00057 desired number of work threads, and start calling enqueueJob() 00058 with your Job objects. See DiskDataStream for an 00059 example of usage. */ 00060 class WorkThreadServer : public JobServer 00061 { 00062 public: 00063 00064 /// Construct with a descriptive nameand a number of worker threads 00065 WorkThreadServer(const char* name, unsigned int n); 00066 00067 /// Virtual destructor for safe inheritance 00068 virtual ~WorkThreadServer(); 00069 00070 private: 00071 /// Start \a n worker threads ready to receive Job requests. 00072 void start(unsigned int n); 00073 00074 /// Stop all worker threads (and wait for them to actually stop). 
00075 void stop(); 00076 00077 public: 00078 /// Put a Job in the queue. 00079 /** Worker threads process Job requests in the order they are received. */ 00080 virtual void enqueueJob(const rutz::shared_ptr<Job>& job); 00081 00082 /// Submit multiple jobs in a batch 00083 /** This is more efficient than multiple individual enqueueJob() 00084 calls, since we can lock/unlock the queue mutex just once for 00085 the entire batch. */ 00086 virtual void enqueueJobs(const rutz::shared_ptr<Job>* jobs, 00087 const size_t njobs); 00088 00089 /// Returns the number of worker threads 00090 virtual unsigned int getParallelismHint(); 00091 00092 /// Wait now for all jobs in the queue to be completed 00093 void flushQueue(uint sleepLength = 250000, bool verbose = true); 00094 00095 /// Get the number of jobs currently waiting in the queue. 00096 size_t size(); 00097 00098 /// How often (in number of jobs) should a checkpoint log entry be written 00099 /** If <= 0, then never print checkpoint entries. */ 00100 void setCheckpointPeriod(int p) { itsCheckpointPeriod = p; } 00101 00102 /// Number of microseconds to usleep() in between running each job 00103 /** If <= 0, then don't usleep() at all. */ 00104 void setSleepUsecs(unsigned int us) { itsSleepUsecs = us; } 00105 00106 /// Change the maximum size for the internal queue 00107 /** If an enqueueJob() or enqueueJobs() call would otherwise cause 00108 the maximum queue size to be exceeded, then jobs will be dropped 00109 from the queue in accordance with the current drop policy. */ 00110 void setMaxQueueSize(size_t maxsize); 00111 00112 /// Whether to wait for the job queue to become empty before stopping the worker threads 00113 /** If false (the default), then any jobs left in the queue will be 00114 abandoned when stop() or the destructor is called. 
*/ 00115 void setFlushBeforeStopping(bool v) { itsFlushBeforeStopping = v; } 00116 00117 /// Whether to sort jobs by priority 00118 void setSortJobs(bool v) { itsSortJobs = v; } 00119 00120 enum DropPolicy 00121 { 00122 DROP_OLDEST, ///< drop oldest job (OR, if we are sorting, that means the highest priority job) 00123 DROP_NEWEST, ///< drop newest job (OR, if we are sorting, that means the lowest priority job) 00124 DROP_OLDEST_LOWEST_PRIORITY, ///< drop oldest+lowest priority job regardless of whether we are sorting by priority 00125 DROP_NEWEST_LOWEST_PRIORITY ///< drop newest+lowest priority job regardless of whether we are sorting by priority 00126 }; 00127 00128 /// Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size 00129 void setDropPolicy(DropPolicy p) { itsDropPolicy = p; } 00130 00131 private: 00132 struct ThreadData; 00133 struct Checkpoint; 00134 struct JobStats; 00135 00136 // For internal use only; this must be called only when itsJobsMutex 00137 // is already locked 00138 void doEnqueueJob(const rutz::shared_ptr<Job>& job); 00139 00140 static void* c_run(void* p); 00141 00142 void run(ThreadData* dat); 00143 00144 std::string itsName; 00145 00146 unsigned int itsNumThreads; 00147 ThreadData* itsThreads; 00148 00149 pthread_mutex_t itsJobsMutex; 00150 std::deque<rutz::shared_ptr<Job> > itsJobs; 00151 bool itsSortJobs; 00152 Semaphore itsJobsCounter; 00153 size_t itsMaxQueueSize; 00154 DropPolicy itsDropPolicy; 00155 unsigned int itsNumQueued; 00156 unsigned int itsNumDropped; 00157 bool itsFlushBeforeStopping; 00158 bool itsJobsQuit; 00159 rutz::atomic_int_t itsNumRun; 00160 size_t itsMaxObservedQueueSize; 00161 int itsLastCheckpoint; 00162 int itsCheckpointPeriod; 00163 unsigned int itsSleepUsecs; 00164 std::list<Checkpoint> itsCheckpoints; 00165 std::map<std::string, JobStats> itsJobStats; 00166 }; 00167 00168 // ###################################################################### 00169 /* So things look 
consistent in everyone's emacs... */ 00170 /* Local Variables: */ 00171 /* mode: c++ */ 00172 /* indent-tabs-mode: nil */ 00173 /* End: */ 00174 00175 #endif // UTIL_WORKTHREADSERVER_H_DEFINED
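// 1.4.4  (stray footer text from the rendered source listing -- presumably the Doxygen generator version; not part of the header)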