00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #ifndef UTIL_WORKTHREADSERVER_C_DEFINED
00039 #define UTIL_WORKTHREADSERVER_C_DEFINED
00040
00041 #include "Util/WorkThreadServer.H"
00042
00043 #include "Util/Assert.H"
00044 #include "Util/log.H"
00045 #include "rutz/error_context.h"
00046 #include "rutz/mutex.h"
00047 #include "rutz/sfmt.h"
00048
00049 #include <algorithm>
00050 #include <cstdlib>
00051 #include <signal.h>
00052 #include <unistd.h>
00053
00054 struct WorkThreadServer::ThreadData
00055 {
00056 ThreadData()
00057 :
00058 srv(0), thread(), index(0), njobs(0), worktime(), waittime()
00059 {}
00060
00061 WorkThreadServer* srv;
00062 pthread_t thread;
00063 unsigned int index;
00064 unsigned int njobs;
00065 rutz::time worktime;
00066 rutz::time waittime;
00067 };
00068
00069 struct WorkThreadServer::Checkpoint
00070 {
00071 Checkpoint(int j, size_t qs, size_t mqs) : jobnum(j), qsize(qs), maxqsize(mqs) {}
00072
00073 int jobnum;
00074 size_t qsize;
00075 size_t maxqsize;
00076 };
00077
00078 struct WorkThreadServer::JobStats
00079 {
00080 JobStats() : nqueued(0), ndropped(0) {}
00081
00082 unsigned int nqueued;
00083 unsigned int ndropped;
00084 };
00085
00086 namespace
00087 {
00088 struct JobPriorityCompare
00089 {
00090 bool operator()(const rutz::shared_ptr<JobServer::Job>& j1,
00091 const rutz::shared_ptr<JobServer::Job>& j2)
00092 {
00093 return j1->priority() < j2->priority();
00094 }
00095 };
00096 }
00097
00098
00099 WorkThreadServer::WorkThreadServer(const char* name, unsigned int n, const bool verbose_logs)
00100 :
00101 itsName(name),
00102 itsNumThreads(0),
00103 itsVerboseLogs(verbose_logs),
00104 itsThreads(0),
00105 itsJobsMutex(),
00106 itsJobs(),
00107 itsSortJobs(false),
00108 itsJobsCounter(0u),
00109 itsMaxQueueSize(itsJobs.max_size()),
00110 itsDropPolicy(DROP_OLDEST),
00111 itsNumQueued(0),
00112 itsNumDropped(0),
00113 itsFlushBeforeStopping(false),
00114 itsJobsQuit(false),
00115 itsMaxObservedQueueSize(0),
00116 itsLastCheckpoint(-1),
00117 itsCheckpointPeriod(0),
00118 itsSleepUsecs(0),
00119 itsCheckpoints()
00120 {
00121
00122 if (0 != pthread_mutex_init(&itsJobsMutex, NULL))
00123 LFATAL("pthread_mutex_init() failed");
00124
00125 itsNumRun.atomic_set(0);
00126
00127 this->start(n);
00128 }
00129
00130
00131 WorkThreadServer::~WorkThreadServer()
00132 {
00133 this->stop();
00134
00135 if (0 != pthread_mutex_destroy(&itsJobsMutex))
00136
00137
00138 LERROR("pthread_mutex_destroy() failed");
00139 }
00140
00141
00142 void WorkThreadServer::start(unsigned int nthreads)
00143 {
00144
00145 ASSERT(itsThreads == 0);
00146 itsNumThreads = nthreads;
00147 itsThreads = new ThreadData[itsNumThreads];
00148 itsJobsQuit = false;
00149 for (uint i = 0; i < itsNumThreads; ++i)
00150 {
00151 itsThreads[i].srv = this;
00152 itsThreads[i].index = 0;
00153 if (0 != pthread_create(&(itsThreads[i].thread), NULL,
00154 &WorkThreadServer::c_run,
00155 static_cast<void*>(itsThreads+i)))
00156 LFATAL("pthread_create() failed for thread %u of %u",
00157 i+1, itsNumThreads);
00158 }
00159 itsNumQueued = 0;
00160
00161 itsCheckpoints.clear();
00162 }
00163
00164
00165 void WorkThreadServer::stop()
00166 {
00167
00168
00169 if (itsFlushBeforeStopping)
00170 this->flushQueue();
00171
00172
00173 itsJobsQuit = true;
00174
00175
00176
00177 for (uint i = 0; i < itsNumThreads; ++i)
00178 itsJobsCounter.post();
00179
00180 if (itsVerboseLogs) {
00181 LINFO("%s: %u jobs were queued (%u of those dropped) into %u worker thread(s)",
00182 itsName.c_str(), itsNumQueued, itsNumDropped, itsNumThreads);
00183
00184 for (std::map<std::string, JobStats>::iterator
00185 itr = itsJobStats.begin(), stop = itsJobStats.end(); itr != stop; ++itr)
00186 LINFO("%s: including %u jobs (%u dropped) of type %s", itsName.c_str(), (*itr).second.nqueued,
00187 (*itr).second.ndropped, (*itr).first.c_str());
00188
00189
00190 for (std::list<Checkpoint>::const_iterator
00191 itr = itsCheckpoints.begin(), stop = itsCheckpoints.end(); itr != stop; ++itr)
00192 LINFO("%s checkpoint: job #%06d - queue len: %"ZU" (max %"ZU")",
00193 itsName.c_str(), (*itr).jobnum, (*itr).qsize, (*itr).maxqsize);
00194 }
00195
00196 for (uint i = 0; i < itsNumThreads; ++i) {
00197 if (0 != pthread_join(itsThreads[i].thread, NULL))
00198 LERROR("pthread_join() failed for thread %u of %u", i+1, itsNumThreads);
00199 if (itsVerboseLogs)
00200 LINFO("%s: thread %u of %u ran %u jobs with %.3fs work time and %.3fs wait time",
00201 itsName.c_str(), i+1, itsNumThreads, itsThreads[i].njobs, itsThreads[i].worktime.sec(),
00202 itsThreads[i].waittime.sec());
00203 }
00204
00205 delete [] itsThreads;
00206 itsThreads = 0;
00207 itsNumThreads = 0;
00208
00209 if (itsJobs.size() > 0 && itsVerboseLogs)
00210 LINFO("%s: %"ZU" jobs were abandoned in the job queue", itsName.c_str(), itsJobs.size());
00211
00212 itsJobs.resize(0);
00213 }
00214
00215
00216 void WorkThreadServer::doEnqueueJob(const rutz::shared_ptr<Job>& j)
00217 {
00218
00219
00220
00221 itsJobs.push_back(j);
00222
00223 if (itsSortJobs)
00224 {
00225 std::stable_sort(itsJobs.begin(), itsJobs.end(),
00226 JobPriorityCompare());
00227 }
00228
00229 if (itsJobs.size() <= itsMaxQueueSize)
00230 {
00231 itsJobsCounter.post();
00232 }
00233 else
00234 {
00235 ++itsNumDropped;
00236
00237 ASSERT(itsJobs.size() > 1);
00238
00239 switch (itsDropPolicy)
00240 {
00241 case DROP_OLDEST:
00242
00243
00244 itsJobStats[itsJobs.front()->jobType()].ndropped++;
00245 itsJobs.front()->drop();
00246 itsJobs.pop_front();
00247 break;
00248
00249 case DROP_NEWEST:
00250
00251
00252
00253
00254 itsJobStats[itsJobs.back()->jobType()].ndropped++;
00255 itsJobs.back()->drop();
00256 itsJobs.pop_back();
00257 break;
00258
00259 case DROP_OLDEST_LOWEST_PRIORITY:
00260 case DROP_NEWEST_LOWEST_PRIORITY:
00261 {
00262
00263 int lowprio = itsJobs.front()->priority();
00264
00265 typedef std::deque<rutz::shared_ptr<Job> >::iterator iterator;
00266
00267 iterator todrop = itsJobs.begin();
00268 iterator itr = todrop; ++itr;
00269 iterator const stop = itsJobs.end();
00270
00271 for ( ; itr != stop; ++itr)
00272 if (itsDropPolicy == DROP_OLDEST_LOWEST_PRIORITY)
00273 {
00274
00275
00276
00277 if ((*itr)->priority() > lowprio)
00278 todrop = itr;
00279 }
00280 else
00281 {
00282
00283
00284
00285 if ((*itr)->priority() >= lowprio)
00286 todrop = itr;
00287 }
00288
00289
00290 itsJobStats[(*todrop)->jobType()].ndropped++;
00291 (*todrop)->drop();
00292 itsJobs.erase(todrop);
00293 }
00294 break;
00295 }
00296 }
00297 }
00298
00299
00300 void WorkThreadServer::enqueueJob(const rutz::shared_ptr<Job>& j)
00301 {
00302 if (itsNumThreads == 0 || itsThreads == NULL)
00303 LFATAL("Can't enqueue jobs into a server with no threads "
00304 "(jobs would block forever)");
00305
00306 ASSERT(itsMaxQueueSize >= 1);
00307
00308 {
00309 GVX_MUTEX_LOCK(&itsJobsMutex);
00310 this->doEnqueueJob(j);
00311 }
00312
00313 itsJobStats[j->jobType()].nqueued++;
00314
00315 ++itsNumQueued;
00316 }
00317
00318
00319 void WorkThreadServer::enqueueJobs(const rutz::shared_ptr<Job>* jobs,
00320 const size_t njobs)
00321 {
00322 if (itsNumThreads == 0 || itsThreads == NULL)
00323 LFATAL("Can't enqueue jobs into a server with no threads "
00324 "(jobs would block forever)");
00325
00326 ASSERT(itsMaxQueueSize >= 1);
00327
00328 {
00329 GVX_MUTEX_LOCK(&itsJobsMutex);
00330 for (size_t i = 0; i < njobs; ++i)
00331 this->doEnqueueJob(jobs[i]);
00332 }
00333
00334 for (size_t i = 0; i < njobs; ++i)
00335 itsJobStats[jobs[i]->jobType()].nqueued++;
00336
00337 itsNumQueued += njobs;
00338 }
00339
00340
00341 unsigned int WorkThreadServer::getParallelismHint()
00342 {
00343 return itsNumThreads;
00344 }
00345
00346
00347 void WorkThreadServer::flushQueue(uint sleepLength, bool verbose)
00348 {
00349 while (true)
00350 {
00351 const size_t sz = this->size();
00352
00353 if (sz == 0)
00354 break;
00355
00356 if(verbose) LINFO("%s: flushing queue - %"ZU" remaining",
00357 itsName.c_str(), sz);
00358 usleep(sleepLength);
00359 }
00360 }
00361
00362
00363 size_t WorkThreadServer::size()
00364 {
00365 size_t ret;
00366 {
00367 GVX_MUTEX_LOCK(&itsJobsMutex);
00368 ret = itsJobs.size();
00369 }
00370 return ret;
00371 }
00372
00373
00374 void WorkThreadServer::setMaxQueueSize(size_t maxsize)
00375 {
00376 if (maxsize == 0)
00377 LFATAL("queue size must be greater than zero");
00378
00379 GVX_MUTEX_LOCK(&itsJobsMutex);
00380 itsMaxQueueSize = maxsize;
00381 }
00382
00383
00384 void WorkThreadServer::run(ThreadData* dat)
00385 {
00386 GVX_ERR_CONTEXT(rutz::sfmt("running %s worker thread %u/%u",
00387 itsName.c_str(), dat->index + 1, itsNumThreads));
00388
00389 rutz::time prev = rutz::time::wall_clock_now();
00390
00391 while (1)
00392 {
00393 itsJobsCounter.wait();
00394
00395 {
00396 rutz::time t = rutz::time::wall_clock_now();
00397 dat->waittime += (t - prev);
00398 prev = t;
00399 }
00400
00401 if (itsJobsQuit == true)
00402 break;
00403
00404 rutz::shared_ptr<JobServer::Job> job;
00405
00406 size_t qsize = 0;
00407
00408 {
00409 GVX_MUTEX_LOCK(&itsJobsMutex);
00410
00411 if (itsJobs.size() == 0)
00412 {
00413
00414
00415
00416
00417
00418 LERROR("Oops! Received a semaphore but the job queue is empty!");
00419 continue;
00420 }
00421
00422 ASSERT(itsJobs.size() > 0);
00423
00424 if (itsJobs.size() > itsMaxObservedQueueSize)
00425 itsMaxObservedQueueSize = itsJobs.size();
00426
00427 job = itsJobs.front();
00428 itsJobs.pop_front();
00429 qsize = itsJobs.size();
00430
00431 ++(dat->njobs);
00432 }
00433
00434 job->run();
00435
00436 const int c = itsNumRun.atomic_incr_return();
00437
00438
00439 if (itsCheckpointPeriod > 0
00440 && c > itsLastCheckpoint
00441 && c % itsCheckpointPeriod == 0)
00442 {
00443 itsCheckpoints.push_back(Checkpoint(c, qsize, itsMaxObservedQueueSize));
00444 itsLastCheckpoint = c;
00445 }
00446
00447 {
00448 rutz::time t = rutz::time::wall_clock_now();
00449 dat->worktime += (t - prev);
00450 prev = t;
00451 }
00452
00453 if (itsSleepUsecs > 0)
00454 usleep(itsSleepUsecs);
00455 }
00456 }
00457
00458
00459 void* WorkThreadServer::c_run(void* p)
00460 {
00461 try
00462 {
00463
00464
00465
00466
00467
00468 sigset_t ss;
00469 if (sigfillset(&ss) != 0)
00470 PLFATAL("sigfillset() failed");
00471
00472 if (pthread_sigmask(SIG_SETMASK, &ss, 0) != 0)
00473 PLFATAL("pthread_sigmask() failed");
00474
00475 ThreadData* const dat = static_cast<ThreadData*>(p);
00476
00477 WorkThreadServer* const srv = dat->srv;
00478
00479 srv->run(dat);
00480 }
00481 catch (...)
00482 {
00483 REPORT_CURRENT_EXCEPTION;
00484 abort();
00485 }
00486
00487 return NULL;
00488 }
00489
00490
00491
00492
00493
00494
00495
00496
00497 #endif // UTIL_WORKTHREADSERVER_C_DEFINED