This class handles passing TCPmessages in and out.
#include <Beowulf/TCPcommunicator.H>
Public Member Functions
TCPcommunicator (OptionManager &mgr, const std::string &descrName="TCP Communicator", const std::string &tagName="TCPcommunicator")
    default constructor
virtual ~TCPcommunicator ()
    TCPcommunicator destructor.
int contact (const char *hostname, const bool blocking=true)
    contact a peer (returns fd). hostname contains name:port
int contact (const in_addr_t ip, const short int port, const bool blocking=true)
    contact a peer using its IP and port (returns fd)
void send (const int sfd, TCPmessage &smsg)
    send a message to a known peer
bool receive (int &rfd, TCPmessage &rmsg, const int timeout=0, int *err=0)
    receive a message
int nbReceived (const int rfd=-1)
    Do we have any received messages?
void waitFor (const int rfd, const int timeout)
    Wait for availability of an incoming message.
void terminateAll ()
    terminate all open connections (except master server for new incoming)
void terminateAllButOne (const int fd)
    terminate all open connections but one (and master server)
void run ()
    thread for data transfers
Protected Member Functions
void start1 ()
    get started and initialize ourselves (before SockServ start)
void start2 ()
    get started and initialize ourselves (after SockServ start)
void stop1 ()
    get stopped (before SockServ stop)
void pollvar (volatile bool *var)
    poll var until it becomes false
bool receiveIt (int &rfd, TCPmessage &rmsg, int *err)
    internal non-blocking receive check
void terminateCli (const int fd)
    terminate a TCPcliServ
Protected Attributes
OModelParam< in_addr > myIP
    My IP address to listen on.
OModelParam< int > inqlen
    incoming queue lengths
OModelParam< int > ouqlen
    outgoing queue lengths
OModelParam< bool > indroplast
    incoming queue drop policy
OModelParam< bool > oudroplast
    outgoing queue drop policy
OModelParam< bool > disableShm
    disable shm transfers if true
nub::soft_ref< SockServ > server
    manages incoming connections and transfers
TCPcliServ ** cli
    array of monitored TCPcliServ's
pthread_t runner
    all transfers are done via a separate thread
bool to_add
    these are for communication between main and running threads:
bool to_stop
int addfd
in_addr_t addip
short int addport
rutz::atomic_int_t threadRunning
This class handles passing TCPmessages in and out. CAUTION: it is a multi-threaded class. Users can establish a connection with another host that is also running a TCPcommunicator by calling contact(), and can then send TCPmessages to that host using send(). Similarly, other hosts may contact us and send us messages, which we can then retrieve using receive().
Definition at line 65 of file TCPcommunicator.H.
TCPcommunicator::TCPcommunicator (OptionManager &mgr, const std::string &descrName = "TCP Communicator", const std::string &tagName = "TCPcommunicator")
default constructor
Definition at line 79 of file TCPcommunicator.C.
References ModelComponent::addSubComponent(), rutz::ix86_atomic_int::atomic_set(), and server.
TCPcommunicator::~TCPcommunicator () [virtual]
TCPcommunicator destructor.
Definition at line 158 of file TCPcommunicator.C.
int TCPcommunicator::contact (const in_addr_t ip, const short int port, const bool blocking = true)
contact a peer using its IP and port (returns fd)
Definition at line 182 of file TCPcommunicator.C.
References ASSERT, pollvar(), ModelComponent::started(), and to_add.
int TCPcommunicator::contact (const char *hostname, const bool blocking = true)
contact a peer (returns fd). hostname contains name:port
Definition at line 162 of file TCPcommunicator.C.
References DEFPORT.
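The hostname argument packs the peer name and port into a single "name:port" string. The following is a minimal sketch of how such a string might be split, not the code from TCPcommunicator.C. `HostPort`, `splitHostPort`, and the `defaultPort` parameter are hypothetical names introduced for illustration; the fallback to a default port when no port is given is an assumption suggested by the DEFPORT reference, and the actual DEFPORT value is not shown on this page.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type, not part of the toolkit.
struct HostPort {
  std::string host;
  int port;
};

// Split "name:port" into its two parts. When no port is present, fall back
// to defaultPort (a stand-in for DEFPORT, whose real value is not shown here).
inline HostPort splitHostPort(const std::string& spec, int defaultPort) {
  assert(!spec.empty());
  const std::string::size_type colon = spec.rfind(':');
  if (colon == std::string::npos) return HostPort{spec, defaultPort};
  const std::string portText = spec.substr(colon + 1);
  if (portText.empty()) return HostPort{spec.substr(0, colon), defaultPort};
  // std::stoi throws std::invalid_argument if portText is not a number.
  return HostPort{spec.substr(0, colon), std::stoi(portText)};
}
```

With this helper, `splitHostPort("n01:5001", 5000)` yields host `n01` and port 5001, while `splitHostPort("n02", 5000)` keeps the fallback port.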
int TCPcommunicator::nbReceived (const int rfd = -1)
Do we have any received messages?
Returns the total number of messages in the incoming queues of our various TCPcliServ members. If rfd != -1, only the given client (fd) is considered rather than all clients.
Definition at line 374 of file TCPcommunicator.C.
References cli, and TCPcliServ::nbReceived().
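The counting rule described above can be sketched as follows. This is an illustration under stated assumptions, not the toolkit's implementation: `IncomingQueues` and `countReceived` are hypothetical names, and a map from fd to a queue of strings stands in for the array of TCPcliServ objects.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>

// Hypothetical stand-in: one incoming message queue per connection fd.
using IncomingQueues = std::map<int, std::deque<std::string>>;

// Count queued messages: across all connections when rfd == -1,
// otherwise only for the connection identified by rfd.
inline int countReceived(const IncomingQueues& queues, int rfd = -1) {
  assert(rfd >= -1);
  if (rfd != -1) {
    const auto it = queues.find(rfd);
    return it == queues.end() ? 0 : static_cast<int>(it->second.size());
  }
  int total = 0;
  for (const auto& entry : queues) {
    total += static_cast<int>(entry.second.size());
  }
  return total;
}
```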
void TCPcommunicator::pollvar (volatile bool *var) [protected]
poll var until it becomes false
Definition at line 406 of file TCPcommunicator.C.
References POLL_SLEEP.
Referenced by contact().
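The idea of polling a flag until another thread clears it can be sketched as below. This is not the toolkit's code: it swaps the `volatile bool *` of pollvar() for a `std::atomic<bool>`, and `kPollSleep`, `pollUntilFalse`, and `requestAndWait` are hypothetical names. The interval is a placeholder, since the value of POLL_SLEEP is not shown on this page, and the handshake (caller raises a flag, worker thread clears it) is an assumption about how contact() uses to_add.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Placeholder polling interval; the real POLL_SLEEP value is not shown here.
constexpr std::chrono::milliseconds kPollSleep(1);

// Block until flag becomes false, sleeping between checks.
inline void pollUntilFalse(const std::atomic<bool>& flag) {
  while (flag.load(std::memory_order_acquire)) {
    std::this_thread::sleep_for(kPollSleep);
  }
}

// Example handshake: the caller raises a flag, a worker thread does the
// requested work and clears the flag, and the caller polls until then.
inline int requestAndWait() {
  std::atomic<bool> pending(true);
  int result = 0;
  std::thread worker([&] {
    result = 42;  // work done on behalf of the caller
    pending.store(false, std::memory_order_release);
  });
  pollUntilFalse(pending);  // the acquire load pairs with the release store
  worker.join();
  return result;
}
```

Polling with a sleep is simple but adds up to one interval of latency per request; a condition variable would avoid that at the cost of more bookkeeping.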
bool TCPcommunicator::receive (int &rfd, TCPmessage &rmsg, const int timeout = 0, int *err = 0)
receive a message
Returns false if there is no new message. Use rfd = -1 to receive from any currently connected fd; the fd the message was received from is then written to rfd. If timeout is zero, the call returns immediately; otherwise it may wait up to timeout ms. If err is non-null, (*err) is set to a non-zero value if an error occurs.
Definition at line 351 of file TCPcommunicator.C.
References receiveIt(), and waitFor().
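The receive semantics described above (optional wait, then a non-blocking check) can be sketched with a small in-memory stand-in. Nothing here is the toolkit's implementation: `Inbox` and its methods are hypothetical, messages are plain strings rather than TCPmessage objects, the wait uses a condition variable instead of whatever waitFor() does internally, and the err output parameter is left out.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for the per-connection incoming queues.
class Inbox {
public:
  // Queue a message as if it had arrived on connection fd.
  void push(int fd, const std::string& msg) {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      queues_[fd].push_back(msg);
    }
    ready_.notify_all();
  }

  // Receive one message. rfd == -1 accepts any connection and is overwritten
  // with the source fd. timeoutMs == 0 returns immediately; otherwise the
  // call waits up to timeoutMs for a matching message.
  bool receive(int& rfd, std::string& msg, int timeoutMs = 0) {
    std::unique_lock<std::mutex> lock(mtx_);
    if (timeoutMs > 0) {
      ready_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
                      [&] { return hasMessage(rfd); });
    }
    return takeMessage(rfd, msg);  // the non-blocking check
  }

private:
  bool hasMessage(int rfd) const {
    for (const auto& entry : queues_) {
      if ((rfd == -1 || entry.first == rfd) && !entry.second.empty()) return true;
    }
    return false;
  }

  bool takeMessage(int& rfd, std::string& msg) {
    for (auto& entry : queues_) {
      if ((rfd != -1 && entry.first != rfd) || entry.second.empty()) continue;
      rfd = entry.first;
      msg = entry.second.front();
      entry.second.pop_front();
      return true;
    }
    return false;
  }

  std::mutex mtx_;
  std::condition_variable ready_;
  std::map<int, std::deque<std::string>> queues_;
};
```

A caller that wants messages from any peer resets rfd to -1 before each call, since a successful receive overwrites it with the source fd.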
bool TCPcommunicator::receiveIt (int &rfd, TCPmessage &rmsg, int *err) [protected]
internal non-blocking receive check
Definition at line 417 of file TCPcommunicator.C.
References ASSERT, rutz::ix86_atomic_int::atomic_get(), cli, and TCPcliServ::receive().
Referenced by receive().
void TCPcommunicator::run ()
thread for data transfers
This runs in a separate thread and takes care of the actual data transfers, off-loading them from the main thread. Do not call this directly!
Definition at line 218 of file TCPcommunicator.C.
References rutz::ix86_atomic_int::atomic_set(), cli, disableShm, OModelParam< T >::getVal(), indroplast, inqlen, myIP, oudroplast, ouqlen, server, SOCKSERV_IDLE, TCPDONE, terminateCli(), and to_add.
void TCPcommunicator::send (const int sfd, TCPmessage &smsg)
send a message to a known peer
Definition at line 337 of file TCPcommunicator.C.
References ASSERT, cli, TCPcliServ::send(), and server.
void TCPcommunicator::start1 () [protected, virtual]
get started and initialize ourselves (before SockServ start)
Reimplemented from ModelComponent.
Definition at line 97 of file TCPcommunicator.C.
References OModelParam< T >::getVal(), OModelParam< T >::getValString(), myIP, and OModelParam< T >::setVal().
void TCPcommunicator::start2 () [protected, virtual]
get started and initialize ourselves (after SockServ start)
Reimplemented from ModelComponent.
Definition at line 122 of file TCPcommunicator.C.
References rutz::ix86_atomic_int::atomic_set(), cli, runner, and to_add.
void TCPcommunicator::stop1 () [protected, virtual]
get stopped (before SockServ stop)
Reimplemented from ModelComponent.
Definition at line 136 of file TCPcommunicator.C.
References ASSERT, rutz::ix86_atomic_int::atomic_get(), cli, and POLL_SLEEP.
void TCPcommunicator::terminateAll ()
terminate all open connections (except master server for new incoming)
Definition at line 317 of file TCPcommunicator.C.
References cli, and terminateCli().
void TCPcommunicator::terminateAllButOne (const int fd)
terminate all open connections but one (and master server)
Definition at line 321 of file TCPcommunicator.C.
References cli, and terminateCli().
void TCPcommunicator::terminateCli (const int fd) [protected]
terminate a TCPcliServ
Definition at line 325 of file TCPcommunicator.C.
References ASSERT, cli, and server.
Referenced by run(), terminateAll(), and terminateAllButOne().
void TCPcommunicator::waitFor (const int rfd, const int timeout)
Wait for availability of an incoming message.
Use rfd = -1 to wait for a message on any currently connected fd; timeout is in milliseconds.
Definition at line 391 of file TCPcommunicator.C.
References cli.
Referenced by receive().
TCPcliServ** TCPcommunicator::cli [protected]
array of monitored TCPcliServ's
Definition at line 146 of file TCPcommunicator.H.
Referenced by nbReceived(), receiveIt(), run(), send(), start2(), stop1(), terminateAll(), terminateAllButOne(), terminateCli(), and waitFor().
OModelParam<bool> TCPcommunicator::disableShm [protected]
disable shm transfers if true
Definition at line 124 of file TCPcommunicator.H.
Referenced by run().
OModelParam<bool> TCPcommunicator::indroplast [protected]
OModelParam<int> TCPcommunicator::inqlen [protected]
OModelParam<in_addr> TCPcommunicator::myIP [protected]
My IP address to listen on.
Definition at line 119 of file TCPcommunicator.H.
OModelParam<bool> TCPcommunicator::oudroplast [protected]
OModelParam<int> TCPcommunicator::ouqlen [protected]
pthread_t TCPcommunicator::runner [protected]
all transfers are done via a separate thread
Definition at line 149 of file TCPcommunicator.H.
Referenced by start2().
nub::soft_ref<SockServ> TCPcommunicator::server [protected]
manages incoming connections and transfers
Definition at line 127 of file TCPcommunicator.H.
Referenced by run(), send(), TCPcommunicator(), and terminateCli().
bool TCPcommunicator::to_add [mutable, protected]
these are for communication between main and running threads:
Definition at line 152 of file TCPcommunicator.H.