TCPcommunicator Class Reference

This class handles passing TCPmessages in and out.

#include <Beowulf/TCPcommunicator.H>

Public Member Functions

 TCPcommunicator (OptionManager &mgr, const std::string &descrName="TCP Communicator", const std::string &tagName="TCPcommunicator")
 constructor; descrName and tagName have default values
virtual ~TCPcommunicator ()
 TCPcommunicator destructor.
int contact (const char *hostname, const bool blocking=true)
 contact a peer (returns fd); hostname has the form name:port
int contact (const in_addr_t ip, const short int port, const bool blocking=true)
 contact a peer using its IP and port (returns fd)
void send (const int sfd, TCPmessage &smsg)
 send a message to a known peer
bool receive (int &rfd, TCPmessage &rmsg, const int timeout=0, int *err=0)
 receive a message
int nbReceived (const int rfd=-1)
 Do we have any received messages?
void waitFor (const int rfd, const int timeout)
 Wait for availability of an incoming message.
void terminateAll ()
 terminate all open connections (except master server for new incoming)
void terminateAllButOne (const int fd)
 terminate all open connections but one (and master server)
void run ()
 thread for data transfers

Protected Member Functions

void start1 ()
 get started and initialize ourselves (before SockServ start)
void start2 ()
 get started and initialize ourselves (after SockServ start)
void stop1 ()
 get stopped (before SockServ stop)
void pollvar (volatile bool *var)
 poll var until it becomes false
bool receiveIt (int &rfd, TCPmessage &rmsg, int *err)
 internal non-blocking receive check
void terminateCli (const int fd)
 terminate a TCPcliServ

Protected Attributes

OModelParam< in_addr > myIP
 My IP address to listen on.
OModelParam< int > inqlen
 incoming queue lengths
OModelParam< int > ouqlen
 outgoing queue lengths
OModelParam< bool > indroplast
 incoming queue drop policy
OModelParam< bool > oudroplast
 outgoing queue drop policy
OModelParam< bool > disableShm
 disable shm transfers if true
nub::soft_ref< SockServ > server
 manages incoming connections and transfers
TCPcliServ ** cli
 array of monitored TCPcliServ's
pthread_t runner
 all transfers are done via a separate thread
bool to_add
 these are for communication between main and running threads:
bool to_stop
int addfd
in_addr_t addip
short int addport
rutz::atomic_int_t threadRunning

Detailed Description

This class handles passing TCPmessages in and out.

CAUTION: this is a multi-threaded class; a separate thread performs the actual data transfers. Users can establish a connection with another host that is also running a TCPcommunicator by calling contact(), which returns a file descriptor (fd) identifying the peer. They can then send TCPmessages to that host with send(). Conversely, other hosts may contact us and send us messages, which we retrieve with receive().

Definition at line 65 of file TCPcommunicator.H.


Constructor & Destructor Documentation

TCPcommunicator::TCPcommunicator ( OptionManager &  mgr,
const std::string &  descrName = "TCP Communicator",
const std::string &  tagName = "TCPcommunicator" 
)

constructor; descrName and tagName have default values

Definition at line 79 of file TCPcommunicator.C.

References ModelComponent::addSubComponent(), rutz::ix86_atomic_int::atomic_set(), and server.

TCPcommunicator::~TCPcommunicator (  )  [virtual]

TCPcommunicator destructor.

Definition at line 158 of file TCPcommunicator.C.


Member Function Documentation

int TCPcommunicator::contact ( const in_addr_t  ip,
const short int  port,
const bool  blocking = true 
)

contact a peer using its IP and port (returns fd)

Definition at line 182 of file TCPcommunicator.C.

References ASSERT, pollvar(), ModelComponent::started(), and to_add.

int TCPcommunicator::contact ( const char *  hostname,
const bool  blocking = true 
)

contact a peer (returns fd); hostname has the form name:port

Definition at line 162 of file TCPcommunicator.C.

References DEFPORT.
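
Since the hostname argument carries both name and port, a caller-side illustration of how such a string can be split may help. The sketch below is not the toolkit's code: splitHostPort and kAssumedDefaultPort are hypothetical names, the fallback to a default port when ":port" is missing is an assumption suggested only by the DEFPORT reference above, and the numeric value is a placeholder.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#include <utility>

// Hypothetical placeholder; the real DEFPORT value is not given in this reference.
constexpr int kAssumedDefaultPort = 9789;

// Split "name:port" into host name and port number.
// ASSUMPTION: when no ":port" suffix is present, a default port is used.
std::pair<std::string, int> splitHostPort(const char* hostname)
{
  assert(hostname != nullptr);
  const std::string s(hostname);
  const std::string::size_type colon = s.rfind(':');
  if (colon == std::string::npos)
    return std::make_pair(s, kAssumedDefaultPort);
  // atoi yields 0 for a non-numeric suffix; real code would validate this.
  const int port = std::atoi(s.substr(colon + 1).c_str());
  return std::make_pair(s.substr(0, colon), port);
}
```

With this helper, splitHostPort("n01:9000") yields the pair ("n01", 9000), which could then be resolved and passed to the IP-and-port overload of contact().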

int TCPcommunicator::nbReceived ( const int  rfd = -1  ) 

Do we have any received messages?

Returns the total number of messages in the incoming queues of our various TCPcliServ members. If rfd != -1, only consider a given client (fd) rather than any client.

Definition at line 374 of file TCPcommunicator.C.

References cli, and TCPcliServ::nbReceived().
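
The counting rule described above can be sketched as follows. This is only an illustration: FakeCliServ is a hypothetical stand-in for TCPcliServ, and the layout of the client table (indexed by fd, with null entries for unused slots) is an assumption, not something this reference states.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for TCPcliServ: only tracks how many messages are queued.
struct FakeCliServ
{
  int queued;
  int nbReceived() const { return queued; }
};

// Count queued incoming messages.
// ASSUMPTION: the client table is indexed by fd and unused slots are null.
int countReceived(const std::vector<FakeCliServ*>& cli, const int rfd = -1)
{
  if (rfd != -1)
  {
    // Only one client is of interest.
    assert(rfd >= 0 && static_cast<std::size_t>(rfd) < cli.size());
    return cli[rfd] ? cli[rfd]->nbReceived() : 0;
  }
  // rfd == -1: sum over every connected client.
  int total = 0;
  for (const FakeCliServ* c : cli)
    if (c) total += c->nbReceived();
  return total;
}
```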

void TCPcommunicator::pollvar ( volatile bool *  var  )  [protected]

poll var until it becomes false

Definition at line 406 of file TCPcommunicator.C.

References POLL_SLEEP.

Referenced by contact().
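
A polling wait of this kind can be sketched as below. The sketch swaps the documented volatile bool for std::atomic<bool>, which is the portable way to share a flag between threads in modern C++; pollUntilFalse and kAssumedPollSleep are hypothetical names, and the sleep interval is a placeholder because the real POLL_SLEEP value is not given here.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical placeholder; the real POLL_SLEEP value is not given in this reference.
constexpr std::chrono::microseconds kAssumedPollSleep(500);

// Block until *var becomes false, sleeping between checks so the waiting
// thread does not spin at full speed.
// Returns the number of sleeps taken (for illustration only).
int pollUntilFalse(const std::atomic<bool>* var)
{
  assert(var != nullptr);
  int sleeps = 0;
  while (var->load())
  {
    std::this_thread::sleep_for(kAssumedPollSleep);
    ++sleeps;
  }
  return sleeps;
}
```

In a request/acknowledge pattern, one thread sets the flag to true to post a request and calls the polling function; the other thread clears the flag once it has handled the request. If nothing ever clears the flag, the call does not return.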

bool TCPcommunicator::receive ( int &  rfd,
TCPmessage &  rmsg,
const int  timeout = 0,
int *  err = 0 
)

receive a message

Receive a message; returns false if no new message is available. Pass rfd = -1 to receive from any currently connected fd; rfd is then set to the fd the message came from. If timeout is zero, the call returns immediately; otherwise it may wait up to timeout milliseconds. If err is non-null, *err is set to a non-zero value when an error occurs.

Definition at line 351 of file TCPcommunicator.C.

References receiveIt(), and waitFor().
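
The timeout contract, together with the references to receiveIt() and waitFor(), suggests a "check, wait, check again" structure. The sketch below illustrates that structure only: FakeInbox is a hypothetical, single-threaded stand-in that uses std::string payloads and one shared queue, whereas the real class keeps per-client queues filled by a separate thread.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for the incoming side of a communicator.
// Not thread-safe: it only illustrates the control flow.
class FakeInbox
{
public:
  void push(int fd, const std::string& msg) { q_.emplace_back(fd, msg); }

  // Mirrors the documented contract: timeout in ms, 0 means do not wait.
  bool receive(int& rfd, std::string& rmsg, const int timeout = 0, int* err = 0)
  {
    if (err) *err = 0;                       // this sketch never fails
    if (receiveIt(rfd, rmsg)) return true;   // fast path: already queued
    if (timeout <= 0) return false;          // caller asked not to wait
    waitFor(rfd, timeout);
    return receiveIt(rfd, rmsg);             // one more try after waiting
  }

private:
  // Non-blocking check; rfd == -1 matches any sender and is overwritten.
  bool receiveIt(int& rfd, std::string& rmsg)
  {
    for (auto it = q_.begin(); it != q_.end(); ++it)
      if (rfd == -1 || it->first == rfd)
      {
        rfd = it->first;
        rmsg = it->second;
        q_.erase(it);
        return true;
      }
    return false;
  }

  bool has(const int rfd) const
  {
    for (const auto& m : q_)
      if (rfd == -1 || m.first == rfd) return true;
    return false;
  }

  // Sleep-poll until a matching message is queued or timeout ms elapse.
  void waitFor(const int rfd, const int timeout)
  {
    using clock = std::chrono::steady_clock;
    const clock::time_point deadline =
      clock::now() + std::chrono::milliseconds(timeout);
    while (!has(rfd) && clock::now() < deadline)
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  std::deque<std::pair<int, std::string> > q_;
};
```

A caller that wants to serve any peer starts with rfd = -1 and reads the sender's fd back from rfd after a successful call; it must reset rfd to -1 before the next "receive from anyone" call.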

bool TCPcommunicator::receiveIt ( int &  rfd,
TCPmessage &  rmsg,
int *  err 
) [protected]

internal non-blocking receive check

Definition at line 417 of file TCPcommunicator.C.

References ASSERT, rutz::ix86_atomic_int::atomic_get(), cli, and TCPcliServ::receive().

Referenced by receive().

void TCPcommunicator::run (  ) 

thread for data transfers

This function runs in a separate thread and performs the actual data transfers, relieving the main thread of that work. Do not call it directly!

Definition at line 218 of file TCPcommunicator.C.

References rutz::ix86_atomic_int::atomic_set(), cli, disableShm, OModelParam< T >::getVal(), indroplast, inqlen, myIP, oudroplast, ouqlen, server, SOCKSERV_IDLE, TCPDONE, terminateCli(), and to_add.

void TCPcommunicator::send ( const int  sfd,
TCPmessage &  smsg 
)

send a message to a known peer

Definition at line 337 of file TCPcommunicator.C.

References ASSERT, cli, TCPcliServ::send(), and server.

void TCPcommunicator::start1 (  )  [protected, virtual]

get started and initialize ourselves (before SockServ start)

Reimplemented from ModelComponent.

Definition at line 97 of file TCPcommunicator.C.

References OModelParam< T >::getVal(), OModelParam< T >::getValString(), myIP, and OModelParam< T >::setVal().

void TCPcommunicator::start2 (  )  [protected, virtual]

get started and initialize ourselves (after SockServ start)

Reimplemented from ModelComponent.

Definition at line 122 of file TCPcommunicator.C.

References rutz::ix86_atomic_int::atomic_set(), cli, runner, and to_add.

void TCPcommunicator::stop1 (  )  [protected, virtual]

get stopped (before SockServ stop)

Reimplemented from ModelComponent.

Definition at line 136 of file TCPcommunicator.C.

References ASSERT, rutz::ix86_atomic_int::atomic_get(), cli, and POLL_SLEEP.

void TCPcommunicator::terminateAll (  ) 

terminate all open connections (except master server for new incoming)

Definition at line 317 of file TCPcommunicator.C.

References cli, and terminateCli().

void TCPcommunicator::terminateAllButOne ( const int  fd  ) 

terminate all open connections but one (and master server)

Definition at line 321 of file TCPcommunicator.C.

References cli, and terminateCli().

void TCPcommunicator::terminateCli ( const int  fd  )  [protected]

terminate a TCPcliServ

Definition at line 325 of file TCPcommunicator.C.

References ASSERT, cli, and server.

Referenced by run(), terminateAll(), and terminateAllButOne().

void TCPcommunicator::waitFor ( const int  rfd,
const int  timeout 
)

Wait for availability of an incoming message.

Use rfd = -1 to wait for a message on any currently connected fd; timeout is in milliseconds.

Definition at line 391 of file TCPcommunicator.C.

References cli.

Referenced by receive().


Member Data Documentation

TCPcliServ** TCPcommunicator::cli [protected]

array of monitored TCPcliServ's

Definition at line 146 of file TCPcommunicator.H.

Referenced by nbReceived(), receiveIt(), run(), send(), start2(), stop1(), terminateAll(), terminateAllButOne(), terminateCli(), and waitFor().

OModelParam<bool> TCPcommunicator::disableShm [protected]

disable shm transfers if true

Definition at line 124 of file TCPcommunicator.H.

Referenced by run().

OModelParam<bool> TCPcommunicator::indroplast [protected]

incoming queue drop policy

Definition at line 122 of file TCPcommunicator.H.

Referenced by run().
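
This reference does not spell out what the drop policy does. As an illustration of what a boolean drop policy on a bounded queue could mean, the sketch below ASSUMES that a true value discards the newly arriving message when the queue is full, and a false value evicts the oldest queued message instead. BoundedQueue is a hypothetical class, not part of the toolkit, and the actual semantics of indroplast and oudroplast may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical bounded queue.
// ASSUMPTION: dropLast == true discards the newest (incoming) item when full;
// dropLast == false evicts the oldest queued item to make room.
class BoundedQueue
{
public:
  BoundedQueue(std::size_t maxLen, bool dropLast)
    : maxLen_(maxLen), dropLast_(dropLast)
  {
    assert(maxLen > 0);
  }

  // Returns true if msg was stored, false if msg itself was dropped.
  bool push(const std::string& msg)
  {
    if (q_.size() >= maxLen_)
    {
      if (dropLast_) return false;  // queue full: drop the new message
      q_.pop_front();               // queue full: evict the oldest message
    }
    q_.push_back(msg);
    return true;
  }

  std::size_t size() const { return q_.size(); }

  const std::string& front() const
  {
    assert(!q_.empty());
    return q_.front();
  }

private:
  std::size_t maxLen_;
  bool dropLast_;
  std::deque<std::string> q_;
};
```

Under this assumption, dropping the newest message preserves ordering of what was already queued, while evicting the oldest keeps the queue biased toward fresh data.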

OModelParam<int> TCPcommunicator::inqlen [protected]

incoming queue lengths

Definition at line 120 of file TCPcommunicator.H.

Referenced by run().

OModelParam<in_addr> TCPcommunicator::myIP [protected]

My IP address to listen on.

Definition at line 119 of file TCPcommunicator.H.

Referenced by run(), and start1().

OModelParam<bool> TCPcommunicator::oudroplast [protected]

outgoing queue drop policy

Definition at line 123 of file TCPcommunicator.H.

Referenced by run().

OModelParam<int> TCPcommunicator::ouqlen [protected]

outgoing queue lengths

Definition at line 121 of file TCPcommunicator.H.

Referenced by run().

pthread_t TCPcommunicator::runner [protected]

all transfers are done via a separate thread

Definition at line 149 of file TCPcommunicator.H.

Referenced by start2().

nub::soft_ref<SockServ> TCPcommunicator::server [protected]

manages incoming connections and transfers

Definition at line 127 of file TCPcommunicator.H.

Referenced by run(), send(), TCPcommunicator(), and terminateCli().

bool TCPcommunicator::to_add [mutable, protected]

used for communication between the main and running threads

Definition at line 152 of file TCPcommunicator.H.

Referenced by contact(), run(), and start2().


The documentation for this class was generated from the following files:
TCPcommunicator.H
TCPcommunicator.C
Generated on Sun May 8 08:43:50 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3