WorkThreadServer Class Reference

Generic low-level thread-server class.

#include <Util/WorkThreadServer.H>

Classes

struct  Checkpoint
struct  JobStats
struct  ThreadData

Public Types

enum  DropPolicy { DROP_OLDEST, DROP_NEWEST, DROP_OLDEST_LOWEST_PRIORITY, DROP_NEWEST_LOWEST_PRIORITY }

Public Member Functions

 WorkThreadServer (const char *name, unsigned int n, const bool verbose_logs=true)
 Construct with a descriptive name and a number of worker threads.
virtual ~WorkThreadServer ()
 Virtual destructor for safe inheritance.
virtual void enqueueJob (const rutz::shared_ptr< Job > &job)
 Put a Job in the queue.
virtual void enqueueJobs (const rutz::shared_ptr< Job > *jobs, const size_t njobs)
 Submit multiple jobs in a batch.
virtual unsigned int getParallelismHint ()
 Returns the number of worker threads.
void flushQueue (uint sleepLength=250000, bool verbose=true)
 Block until all jobs in the queue have been completed.
size_t size ()
 Get the number of jobs currently waiting in the queue.
int getNumRun ()
 Get the number of jobs that have been run.
void setCheckpointPeriod (int p)
 Set how often (in number of jobs) a checkpoint log entry is written.
void setSleepUsecs (unsigned int us)
 Number of microseconds to usleep() in between running each job.
void setMaxQueueSize (size_t maxsize)
 Change the maximum size for the internal queue.
void setFlushBeforeStopping (bool v)
 Whether to wait for the job queue to become empty before stopping the worker threads.
void setSortJobs (bool v)
 Whether to sort jobs by priority.
void setDropPolicy (DropPolicy p)
 Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size.

Detailed Description

Generic low-level thread-server class.

To use this class, derive a job class from JobServer::Job whose run() function does the work that you want done. Then instantiate a WorkThreadServer object with your desired number of worker threads and start calling enqueueJob() with your Job objects. See DiskDataStream for an example of usage.

Definition at line 60 of file WorkThreadServer.H.
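
The usage pattern described above can be illustrated with a self-contained sketch. It does not use the toolkit headers: SketchJob, SketchServer, AddJob and sumWithServer() are hypothetical stand-ins, and std::shared_ptr takes the place of rutz::shared_ptr. Unlike the documented default of WorkThreadServer, this sketch always drains its queue before the workers stop.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for JobServer::Job: one virtual run() to override.
struct SketchJob {
  virtual ~SketchJob() {}
  virtual void run() = 0;
};

// Hypothetical stand-in for WorkThreadServer: n workers draining a FIFO queue.
class SketchServer {
public:
  explicit SketchServer(unsigned int n) : stop_(false) {
    assert(n > 0);
    for (unsigned int i = 0; i < n; ++i)
      workers_.emplace_back([this] { workerLoop(); });
  }

  // Lets the workers finish the remaining jobs, then joins them.
  ~SketchServer() {
    { std::lock_guard<std::mutex> lock(mutex_); stop_ = true; }
    cond_.notify_all();
    for (std::thread& t : workers_) t.join();
  }

  void enqueueJob(const std::shared_ptr<SketchJob>& job) {
    { std::lock_guard<std::mutex> lock(mutex_); queue_.push_back(job); }
    cond_.notify_one();
  }

private:
  void workerLoop() {
    for (;;) {
      std::shared_ptr<SketchJob> job;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return stop_ || !queue_.empty(); });
        if (queue_.empty()) return;  // only reached once stop_ is set
        job = queue_.front();
        queue_.pop_front();
      }
      job->run();  // run outside the lock so workers proceed in parallel
    }
  }

  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<std::shared_ptr<SketchJob> > queue_;
  std::vector<std::thread> workers_;
  bool stop_;
};

// Example job: adds one value to a shared counter.
struct AddJob : SketchJob {
  AddJob(std::atomic<int>& total, int v) : total_(total), v_(v) {}
  void run() override { total_ += v_; }
  std::atomic<int>& total_;
  int v_;
};

// Enqueue the values 1..count and return their sum once all jobs have run.
int sumWithServer(unsigned int nthreads, int count) {
  std::atomic<int> total(0);
  {
    SketchServer server(nthreads);
    for (int i = 1; i <= count; ++i)
      server.enqueueJob(std::make_shared<AddJob>(total, i));
  }  // destructor drains the queue and joins the workers
  return total.load();
}
```

With the real class, the same shape would apply: the derived job overrides run(), and the caller hands shared pointers to enqueueJob().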


Member Enumeration Documentation

enum WorkThreadServer::DropPolicy

Enumerator:
DROP_OLDEST

Drop the oldest job (or, if jobs are being sorted by priority, the highest-priority job).

DROP_NEWEST

Drop the newest job (or, if jobs are being sorted by priority, the lowest-priority job).

DROP_OLDEST_LOWEST_PRIORITY

Drop the oldest of the lowest-priority jobs, regardless of whether jobs are being sorted by priority.

DROP_NEWEST_LOWEST_PRIORITY

Drop the newest of the lowest-priority jobs, regardless of whether jobs are being sorted by priority.

Definition at line 123 of file WorkThreadServer.H.
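
As a rough illustration of how the four policies differ, the sketch below picks the index of the job to drop from a queue held in arrival order (front = oldest). QueuedJob and pickVictim() are hypothetical names. The convention that a larger priority value means a lower priority is an assumption of this sketch, not something stated by the class documentation. The sketch also does not model the priority-sorted case, in which DROP_OLDEST and DROP_NEWEST act on the ends of the sorted queue instead.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum DropPolicy {
  DROP_OLDEST,
  DROP_NEWEST,
  DROP_OLDEST_LOWEST_PRIORITY,
  DROP_NEWEST_LOWEST_PRIORITY
};

// Hypothetical queue entry; the queue is kept in arrival order.
struct QueuedJob {
  int id;
  int priority;  // assumption: larger value = lower priority
};

// Return the index of the entry to drop from a non-empty queue.
std::size_t pickVictim(const std::deque<QueuedJob>& q, DropPolicy policy) {
  assert(!q.empty());
  switch (policy) {
    case DROP_OLDEST: return 0;             // front of the queue
    case DROP_NEWEST: return q.size() - 1;  // back of the queue
    default: break;
  }
  // Remaining policies: scan for the lowest-priority entry.
  std::size_t victim = 0;
  for (std::size_t i = 1; i < q.size(); ++i) {
    const bool lower = q[i].priority > q[victim].priority;
    const bool tie = q[i].priority == q[victim].priority;
    // On a tie, a later index is newer, so only the NEWEST variant moves on.
    if (lower || (tie && policy == DROP_NEWEST_LOWEST_PRIORITY)) victim = i;
  }
  return victim;
}
```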


Constructor & Destructor Documentation

WorkThreadServer::WorkThreadServer ( const char *  name,
unsigned int  n,
const bool  verbose_logs = true 
)

Construct with a descriptive name and a number of worker threads.

Definition at line 99 of file WorkThreadServer.C.

References rutz::ix86_atomic_int::atomic_set().

WorkThreadServer::~WorkThreadServer (  )  [virtual]

Virtual destructor for safe inheritance.

Definition at line 131 of file WorkThreadServer.C.


Member Function Documentation

void WorkThreadServer::enqueueJob ( const rutz::shared_ptr< Job > &  job  )  [virtual]

Put a Job in the queue.

Worker threads process Job requests in the order they are received.

Implements JobServer.

Definition at line 300 of file WorkThreadServer.C.

References ASSERT.

Referenced by VisualObjectDB::getObjectMatchesParallel(), StimController::start2(), and DiskDataStream::writeFrame().

void WorkThreadServer::enqueueJobs ( const rutz::shared_ptr< Job > *  jobs,
const size_t  njobs 
) [virtual]

Submit multiple jobs in a batch.

This is more efficient than multiple individual enqueueJob() calls, since we can lock/unlock the queue mutex just once for the entire batch.

Reimplemented from JobServer.

Definition at line 319 of file WorkThreadServer.C.

References ASSERT.
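
The locking argument can be made concrete with a small sketch. SketchQueue is a hypothetical class that stores plain ints instead of Job pointers and counts how often its mutex is acquired for enqueueing. It is not the toolkit's implementation, only an illustration of why a batch call needs a single lock/unlock pair.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical queue that counts mutex acquisitions made while enqueueing.
class SketchQueue {
public:
  SketchQueue() : locks_(0) {}

  // One lock/unlock pair per job.
  void enqueueJob(int job) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++locks_;
    jobs_.push_back(job);
  }

  // One lock/unlock pair for the whole batch.
  void enqueueJobs(const int* jobs, std::size_t njobs) {
    assert(jobs != 0 || njobs == 0);
    std::lock_guard<std::mutex> lock(mutex_);
    ++locks_;
    jobs_.insert(jobs_.end(), jobs, jobs + njobs);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return jobs_.size();
  }

  // Number of times an enqueue call has taken the mutex.
  std::size_t lockCount() {
    std::lock_guard<std::mutex> lock(mutex_);
    return locks_;
  }

private:
  std::mutex mutex_;
  std::deque<int> jobs_;
  std::size_t locks_;
};
```

Enqueueing three jobs one at a time takes the mutex three times in this sketch, while a single batch call takes it once.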

void WorkThreadServer::flushQueue ( uint  sleepLength = 250000,
bool  verbose = true 
)

Block until all jobs in the queue have been completed.

Definition at line 347 of file WorkThreadServer.C.

References size().

Referenced by VisualObjectDB::getObjectMatchesParallel().
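
Since flushQueue() references size() and takes a sleep length, one plausible reading is a polling loop. The sketch below shows that pattern only. pollUntilEmpty() is a hypothetical helper, and treating sleepLength as microseconds is an assumption based on the default value of 250000. A real implementation may also need to wait for jobs that have left the queue but are still running, which this sketch does not attempt.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Poll queueSize() until it reports 0, sleeping sleepUsecs between polls.
// Returns the number of polls that found a non-empty queue.
unsigned int pollUntilEmpty(const std::function<std::size_t()>& queueSize,
                            unsigned int sleepUsecs) {
  unsigned int waits = 0;
  while (queueSize() > 0) {
    ++waits;
    std::this_thread::sleep_for(std::chrono::microseconds(sleepUsecs));
  }
  return waits;
}
```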

int WorkThreadServer::getNumRun (  )  [inline]

Get the number of jobs that have been run.

Definition at line 99 of file WorkThreadServer.H.

References rutz::ix86_atomic_int::atomic_get().

unsigned int WorkThreadServer::getParallelismHint (  )  [virtual]

Returns the number of worker threads.

Implements JobServer.

Definition at line 341 of file WorkThreadServer.C.

void WorkThreadServer::setCheckpointPeriod ( int  p  )  [inline]

Set how often (in number of jobs) a checkpoint log entry is written.

If p <= 0, checkpoint entries are never written.

Definition at line 103 of file WorkThreadServer.H.

Referenced by DiskDataStream::start2().
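
The period rule can be read as a simple modulus test. The helper below is a hypothetical sketch of that reading, not code taken from WorkThreadServer. It assumes a checkpoint falls due each time the count of completed jobs reaches a positive multiple of the period.

```cpp
// Hypothetical helper: is a checkpoint entry due after jobsRun completed jobs?
bool isCheckpointDue(int jobsRun, int period) {
  if (period <= 0) return false;  // non-positive period disables checkpoints
  return jobsRun > 0 && jobsRun % period == 0;
}
```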

void WorkThreadServer::setDropPolicy ( DropPolicy  p  )  [inline]

Specify the policy to be used to drop jobs to avoid exceeding the maximum queue size.

Definition at line 132 of file WorkThreadServer.H.

void WorkThreadServer::setFlushBeforeStopping ( bool  v  )  [inline]

Whether to wait for the job queue to become empty before stopping the worker threads.

If false (the default), then any jobs left in the queue will be abandoned when stop() or the destructor is called.

Definition at line 118 of file WorkThreadServer.H.

void WorkThreadServer::setMaxQueueSize ( size_t  maxsize  ) 

Change the maximum size for the internal queue.

If an enqueueJob() or enqueueJobs() call would otherwise cause the maximum queue size to be exceeded, then jobs will be dropped from the queue in accordance with the current drop policy.

Definition at line 374 of file WorkThreadServer.C.

void WorkThreadServer::setSleepUsecs ( unsigned int  us  )  [inline]

Number of microseconds to usleep() in between running each job.

If 0, then usleep() is not called at all.

Definition at line 107 of file WorkThreadServer.H.

Referenced by DiskDataStream::start2().

void WorkThreadServer::setSortJobs ( bool  v  )  [inline]

Whether to sort jobs by priority.

Definition at line 121 of file WorkThreadServer.H.

size_t WorkThreadServer::size (  ) 

Get the number of jobs currently waiting in the queue.

Definition at line 363 of file WorkThreadServer.C.

Referenced by flushQueue(), and DiskDataStream::stop1().


Generated on Sun May 8 08:44:03 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3