/*!@file Beowulf/TCPcommunicator.H A class to handle multiple TCPmessage
  communications */

// //////////////////////////////////////////////////////////////////// //
// The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
// University of Southern California (USC) and the iLab at USC.         //
// See http://iLab.usc.edu for information about this project.          //
// //////////////////////////////////////////////////////////////////// //
// Major portions of the iLab Neuromorphic Vision Toolkit are protected //
// under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
// in Visual Environments, and Applications'' by Christof Koch and      //
// Laurent Itti, California Institute of Technology, 2001 (patent       //
// pending; application number 09/912,225 filed July 23, 2001; see      //
// http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
// //////////////////////////////////////////////////////////////////// //
// This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
//                                                                      //
// The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
// redistribute it and/or modify it under the terms of the GNU General  //
// Public License as published by the Free Software Foundation; either  //
// version 2 of the License, or (at your option) any later version.     //
//                                                                      //
// The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
// that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
// PURPOSE. See the GNU General Public License for more details.        //
// 00027 // // 00028 // You should have received a copy of the GNU General Public License // 00029 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00030 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00031 // Boston, MA 02111-1307 USA. // 00032 // //////////////////////////////////////////////////////////////////// // 00033 // 00034 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00035 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPcommunicator.H $ 00036 // $Id: TCPcommunicator.H 6986 2006-08-10 23:51:03Z rjpeters $ 00037 // 00038 00039 #ifndef TCPCOMMUNICATOR_H_DEFINED 00040 #define TCPCOMMUNICATOR_H_DEFINED 00041 00042 #include "Beowulf/SockServ.H" 00043 #include "Beowulf/TCPcliServ.H" 00044 #include "Beowulf/TCPdefs.H" 00045 #include "Component/ModelComponent.H" 00046 #include "Component/ModelParam.H" 00047 #include "rutz/atomic.h" 00048 00049 #include <pthread.h> 00050 #include <sys/time.h> 00051 #include <sys/types.h> 00052 #include <unistd.h> 00053 #include <unistd.h> 00054 00055 //! delay in us of sleep while polling for running thread: 00056 #define POLL_SLEEP 100 00057 00058 //! This class handles passing TCPmessages in and out 00059 /*! This class handles passing TCPmessages in and out; CAUTION: it is a 00060 multi-threaded class. Users can establish a connection with 00061 another host also running a TCPcommunicator by using 00062 contact(). Then, they can send out TCPmessages to that host using 00063 send(). Similarly, other hosts may contact us and send us 00064 messages, which we can then retrieve using receive(). */ 00065 class TCPcommunicator : public ModelComponent { 00066 public: 00067 //! default constructor 00068 TCPcommunicator(OptionManager& mgr, 00069 const std::string& descrName = "TCP Communicator", 00070 const std::string& tagName = "TCPcommunicator"); 00071 00072 //! TCPcommunicator destructor 00073 virtual ~TCPcommunicator(); 00074 00075 //! 
contact a peer (return fd). hostname contains name:port 00076 int contact(const char *hostname, const bool blocking = true); 00077 00078 //! contact a peer using its IP and port (returns fd) 00079 int contact(const in_addr_t ip, const short int port, 00080 const bool blocking = true); 00081 00082 //! send a message to a known peer: 00083 void send(const int sfd, TCPmessage& smsg); 00084 00085 //! receive a message 00086 /*! receive a message (returns false if no new message); use rfd = 00087 -1 to receive from any currently connected fd (fd received from 00088 will then be put in rfd). If timeout is zero, the call returns 00089 immediately, otherwise may wait up to timeout ms. If err is 00090 non-null, then (*err) will be set to non-zero if an error 00091 occurs. */ 00092 bool receive(int& rfd, TCPmessage& rmsg, const int timeout = 0, 00093 int* err = 0); 00094 00095 //! Do we have any received messages? 00096 /*! Returns the total number of messages in the incoming queues of 00097 our various TCPcliServ members. If rfd != -1, only consider a 00098 given client (fd) rather than any client. */ 00099 int nbReceived(const int rfd = -1); 00100 00101 //! Wait for availability of an incoming message 00102 /*! use rfd = -1 to wait for message on any currently connected fd; 00103 timeout is in milliseconds. */ 00104 void waitFor(const int rfd, const int timeout); 00105 00106 //! terminate all open connections (except master server for new incoming) 00107 void terminateAll(); 00108 00109 //! terminate all open connections but one (and master server) 00110 void terminateAllButOne(const int fd); 00111 00112 //! thread for data transfers 00113 /*! this will run in a different thread and take care of the actual 00114 data transfers, off-loading the main thread from them. Do not call 00115 this directly! 
*/ 00116 void run(); 00117 00118 protected: 00119 OModelParam<in_addr> myIP; //!< My IP address to listen on 00120 OModelParam<int> inqlen; //!< incoming queue lengths 00121 OModelParam<int> ouqlen; //!< outgoing queue lengths 00122 OModelParam<bool> indroplast; //!< incoming queue drop policy 00123 OModelParam<bool> oudroplast; //!< outgoing queue drop policy 00124 OModelParam<bool> disableShm; //!< disbale shm transfers if true 00125 00126 //! manages incoming connections and transfers 00127 nub::soft_ref<SockServ> server; 00128 00129 //! get started and initialize ourselves (before SockServ start) 00130 void start1(); 00131 //! get started and initialize ourselves (after SockServ start) 00132 void start2(); 00133 //! get stopped (before SockServ stop) 00134 void stop1(); 00135 00136 //! poll var until it becomes false 00137 void pollvar(volatile bool *var); 00138 00139 //! internal non-blocking receive check 00140 bool receiveIt(int& rfd, TCPmessage& rmsg, int* err); 00141 00142 //! terminate a TCPcliServ 00143 void terminateCli(const int fd); 00144 00145 //! array of monitored TCPcliServ's 00146 TCPcliServ** cli; 00147 00148 //! all transfers are done via a separate thread 00149 pthread_t runner; 00150 00151 //! these are for communication between main and running threads: 00152 mutable bool to_add, to_stop; 00153 int addfd; 00154 in_addr_t addip; 00155 short int addport; 00156 rutz::atomic_int_t threadRunning; 00157 }; 00158 00159 #endif 00160 00161 // ###################################################################### 00162 /* So things look consistent in everyone's emacs... */ 00163 /* Local Variables: */ 00164 /* indent-tabs-mode: nil */ 00165 /* End: */