00001 /*!@file Util/WorkThreadServer.H Generic low-level worker-thread server class */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Util/WorkThreadServer.H $ 00035 // $Id: WorkThreadServer.H 14387 2011-01-12 17:15:26Z lior $ 00036 // 00037 00038 #ifndef UTIL_WORKTHREADSERVER_H_DEFINED 00039 #define UTIL_WORKTHREADSERVER_H_DEFINED 00040 00041 #include "Util/JobServer.H" 00042 #include "Util/Semaphore.H" 00043 #include "rutz/shared_ptr.h" 00044 #include "rutz/atomic.h" 00045 #include "rutz/time.h" 00046 00047 #include <deque> 00048 #include <list> 00049 #include <map> 00050 #include <pthread.h> 00051 #include <string> 00052 00053 /// Generic low-level thread-server class 00054 /** To use this class, derive a job class from JobServer::Job 00055 whose run() function does the work that you want done. Then 00056 instantiate a WorkThreadServer object, start() it with your 00057 desired number of work threads, and start calling enqueueJob() 00058 with your Job objects. See DiskDataStream for an 00059 example of usage. */ 00060 class WorkThreadServer : public JobServer 00061 { 00062 public: 00063 00064 /// Construct with a descriptive nameand a number of worker threads 00065 WorkThreadServer(const char* name, unsigned int n, const bool verbose_logs = true); 00066 00067 /// Virtual destructor for safe inheritance 00068 virtual ~WorkThreadServer(); 00069 00070 private: 00071 /// Start \a n worker threads ready to receive Job requests. 00072 void start(unsigned int n); 00073 00074 /// Stop all worker threads (and wait for them to actually stop). 00075 void stop(); 00076 00077 public: 00078 /// Put a Job in the queue. 00079 /** Worker threads process Job requests in the order they are received. */ 00080 virtual void enqueueJob(const rutz::shared_ptr<Job>& job); 00081 00082 /// Submit multiple jobs in a batch 00083 /** This is more efficient than multiple individual enqueueJob() 00084 calls, since we can lock/unlock the queue mutex just once for 00085 the entire batch. */ 00086 virtual void enqueueJobs(const rutz::shared_ptr<Job>* jobs, 00087 const size_t njobs); 00088 00089 /// Returns the number of worker threads 00090 virtual unsigned int getParallelismHint(); 00091 00092 /// Wait now for all jobs in the queue to be completed 00093 void flushQueue(uint sleepLength = 250000, bool verbose = true); 00094 00095 /// Get the number of jobs currently waiting in the queue. 00096 size_t size(); 00097 00098 //! Get the number of jobs ran 00099 int getNumRun() { return itsNumRun.atomic_get(); } 00100 00101 /// How often (in number of jobs) should a checkpoint log entry be written 00102 /** If <= 0, then never print checkpoint entries. */ 00103 void setCheckpointPeriod(int p) { itsCheckpointPeriod = p; } 00104 00105 /// Number of microseconds to usleep() in between running each job 00106 /** If <= 0, then don't usleep() at all. */ 00107 void setSleepUsecs(unsigned int us) { itsSleepUsecs = us; } 00108 00109 /// Change the maximum size for the internal queue 00110 /** If an enqueueJob() or enqueueJobs() call would otherwise cause 00111 the maximum queue size to be exceeded, then jobs will be dropped 00112 from the queue in accordance with the current drop policy. */ 00113 void setMaxQueueSize(size_t maxsize); 00114 00115 /// Whether to wait for the job queue to become empty before stopping the worker threads 00116 /** If false (the default), then any jobs left in the queue will be 00117 abandoned when stop() or the destructor is called. */ 00118 void setFlushBeforeStopping(bool v) { itsFlushBeforeStopping = v; } 00119 00120 /// Whether to sort jobs by priority 00121 void setSortJobs(bool v) { itsSortJobs = v; } 00122 00123 enum DropPolicy 00124 { 00125 DROP_OLDEST, ///< drop oldest job (OR, if we are sorting, that means the highest priority job) 00126 DROP_NEWEST, ///< drop newest job (OR, if we are sorting, that means the lowest priority job) 00127 DROP_OLDEST_LOWEST_PRIORITY, ///< drop oldest+lowest priority job regardless of whether we are sorting by priority 00128 DROP_NEWEST_LOWEST_PRIORITY ///< drop newest+lowest priority job regardless of whether we are sorting by priority 00129 }; 00130 00131 /// Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size 00132 void setDropPolicy(DropPolicy p) { itsDropPolicy = p; } 00133 00134 private: 00135 struct ThreadData; 00136 struct Checkpoint; 00137 struct JobStats; 00138 00139 // For internal use only; this must be called only when itsJobsMutex 00140 // is already locked 00141 void doEnqueueJob(const rutz::shared_ptr<Job>& job); 00142 00143 static void* c_run(void* p); 00144 00145 void run(ThreadData* dat); 00146 00147 std::string itsName; 00148 00149 unsigned int itsNumThreads; 00150 bool itsVerboseLogs; 00151 ThreadData* itsThreads; 00152 00153 pthread_mutex_t itsJobsMutex; 00154 std::deque<rutz::shared_ptr<Job> > itsJobs; 00155 bool itsSortJobs; 00156 Semaphore itsJobsCounter; 00157 size_t itsMaxQueueSize; 00158 DropPolicy itsDropPolicy; 00159 unsigned int itsNumQueued; 00160 unsigned int itsNumDropped; 00161 bool itsFlushBeforeStopping; 00162 bool itsJobsQuit; 00163 rutz::atomic_int_t itsNumRun; 00164 size_t itsMaxObservedQueueSize; 00165 int itsLastCheckpoint; 00166 int itsCheckpointPeriod; 00167 unsigned int itsSleepUsecs; 00168 std::list<Checkpoint> itsCheckpoints; 00169 std::map<std::string, JobStats> itsJobStats; 00170 }; 00171 00172 // ###################################################################### 00173 /* So things look consistent in everyone's emacs... */ 00174 /* Local Variables: */ 00175 /* mode: c++ */ 00176 /* indent-tabs-mode: nil */ 00177 /* End: */ 00178 00179 #endif // UTIL_WORKTHREADSERVER_H_DEFINED