TCPcommunicator.C

Go to the documentation of this file.
00001 /*!@file Beowulf/TCPcommunicator.C A class to handle multiple TCPmessage
00002   communications */
00003 
00004 // //////////////////////////////////////////////////////////////////// //
00005 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
00006 // University of Southern California (USC) and the iLab at USC.         //
00007 // See http://iLab.usc.edu for information about this project.          //
00008 // //////////////////////////////////////////////////////////////////// //
00009 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00010 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00011 // in Visual Environments, and Applications'' by Christof Koch and      //
00012 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00013 // pending; application number 09/912,225 filed July 23, 2001; see      //
00014 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00015 // //////////////////////////////////////////////////////////////////// //
00016 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00017 //                                                                      //
00018 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00019 // redistribute it and/or modify it under the terms of the GNU General  //
00020 // Public License as published by the Free Software Foundation; either  //
00021 // version 2 of the License, or (at your option) any later version.     //
00022 //                                                                      //
00023 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00024 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00025 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00026 // PURPOSE.  See the GNU General Public License for more details.       //
00027 //                                                                      //
00028 // You should have received a copy of the GNU General Public License    //
00029 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00030 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00031 // Boston, MA 02111-1307 USA.                                           //
00032 // //////////////////////////////////////////////////////////////////// //
00033 //
00034 // Primary maintainer for this file: Laurent Itti <itti@usc.edu>
00035 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPcommunicator.C $
00036 // $Id: TCPcommunicator.C 11538 2009-07-30 06:23:37Z itti $
00037 //
00038 
00039 #include "Beowulf/TCPcommunicator.H"
00040 
00041 #include "Beowulf/BeowulfOpts.H"
00042 #include "Beowulf/SockServ.H"
00043 #include "Beowulf/TCPdefs.H"
00044 #include "Component/OptionManager.H"
00045 #include "Util/Assert.H"
00046 #include "Util/log.H"
00047 
00048 #include <arpa/inet.h>
00049 #include <fcntl.h>
00050 #include <netdb.h>
00051 #include <netinet/in.h>
00052 #include <signal.h>
00053 #include <sys/socket.h>
00054 #include <unistd.h>
00055 
00056 void* TCPcommunicator_run(void *c0);  // will live in a separate thread
00057 
00058 //! for id-logging; see log.H:
00059 #define MYLOGID fd
00060 
00061 namespace
00062 {
00063   bool operator==(const in_addr& addr1, const in_addr& addr2)
00064   {
00065     return addr1.s_addr == addr2.s_addr;
00066   }
00067 }
00068 
00069 // ######################################################################
00070 void* TCPcommunicator_run(void *c0)
00071 {
00072   // this is just a wrapper so that we call the member function run() on
00073   // the TCPcommunicator object passed as argument:
00074   TCPcommunicator *c = (TCPcommunicator *)c0;
00075   c->run(); return NULL;
00076 }
00077 
00078 // ######################################################################
00079 TCPcommunicator::TCPcommunicator(OptionManager& mgr,
00080                                  const std::string& descrName,
00081                                  const std::string& tagName) :
00082   ModelComponent(mgr, descrName, tagName),
00083   myIP(&OPT_TCPcommunicatorIPaddr, this),
00084   inqlen(&OPT_TCPcommunicatorInQlen, this),
00085   ouqlen(&OPT_TCPcommunicatorOuQlen, this),
00086   indroplast(&OPT_TCPcommunicatorInDropLast, this),
00087   oudroplast(&OPT_TCPcommunicatorOuDropLast, this),
00088   disableShm(&OPT_TCPcommunicatorDisableShm, this),
00089   server(new SockServ(mgr))
00090 {
00091   // make our server a subcomponent of us:
00092   addSubComponent(server);
00093   threadRunning.atomic_set(0);
00094 }
00095 
00096 // ######################################################################
00097 void TCPcommunicator::start1()
00098 {
00099   // determine my IP if not given:
00100   if (myIP.getVal().s_addr == 0)
00101     {
00102       char hostname[32]; gethostname(hostname, 32);
00103       struct hostent *he = gethostbyname(hostname);
00104       if (he == NULL) LFATAL("Cannot determine my IP address");
00105 
00106       in_addr myip;
00107       myip.s_addr = ntohl( ((in_addr *)(he->h_addr_list[0]))->s_addr );
00108       myIP.setVal(myip);
00109       LINFO("Using IP address %s", myIP.getValString().c_str());
00110     }
00111 
00112   // block signals:
00113   sigset_t sset; sigemptyset(&sset);
00114   sigaddset(&sset, SIGHUP); sigaddset(&sset, SIGPIPE);
00115   int s = sigprocmask(SIG_BLOCK, &sset, NULL);
00116   if (s != 0) PLERROR("Sigprocmask failed");
00117 
00118   // let now our SockServ start, and we'll be back in start2()
00119 }
00120 
00121 // ######################################################################
00122 void TCPcommunicator::start2()
00123 {
00124   // setup client array:
00125   typedef TCPcliServ* TCPcliServPtr;
00126   cli = new TCPcliServPtr[FD_SETSIZE];
00127   for (int i = 0; i < FD_SETSIZE; i ++) cli[i] = NULL; // no client
00128   to_add = false; to_stop = false;
00129 
00130   // start thread for run():
00131   threadRunning.atomic_set(1);
00132   pthread_create(&runner, NULL, &TCPcommunicator_run, (void *)this);
00133 }
00134 
00135 // ######################################################################
00136 void TCPcommunicator::stop1()
00137 {
00138   // we are going to signal to the worker thread that we want it to
00139   // stop by setting to_stop=true, then we wait for it to respond by
00140   // setting threadRunning=0; however this will also work if the
00141   // worker thread has already quit before we even begin stop1(), in
00142   // which case threadRunning will be already 0
00143 
00144   struct timespec ts, ts2;
00145   while (threadRunning.atomic_get() != 0)
00146     {
00147       to_stop = true;
00148       ts.tv_sec = 0; ts.tv_nsec = 1000 * POLL_SLEEP;
00149       nanosleep(&ts, &ts2);
00150     }
00151 
00152   ASSERT(threadRunning.atomic_get() == 0);
00153 
00154   delete [] cli; cli = 0;
00155 }
00156 
00157 // ######################################################################
00158 TCPcommunicator::~TCPcommunicator()
00159 { }
00160 
00161 // ######################################################################
00162 int TCPcommunicator::contact(const char *hostname, const bool blocking)
00163 {
00164   // extract port information from name:port format in hostname:
00165   char hn2[strlen(hostname) + 1]; strcpy(hn2, hostname);
00166   int port = DEFPORT;
00167   for (int i = strlen(hn2) - 1; i > 0; i --)
00168     if (hn2[i] == ':') // ok, we have port information
00169       { hn2[i] = '\0'; port = atoi(&(hn2[i + 1])); }
00170 
00171   // get IP address:
00172   struct hostent *he = gethostbyname(hn2);
00173   if (he == NULL)
00174     { LERROR("Cannot gethostbyname(%s): %d", hn2, h_errno); return -1; }
00175 
00176   // ready to contact:
00177   return contact(ntohl( ((in_addr *)(he->h_addr_list[0]))->s_addr ),
00178                  port, blocking);
00179 }
00180 
00181 // ######################################################################
00182 int TCPcommunicator::contact(const in_addr_t ip, const short int port,
00183                              const bool blocking)
00184 {
00185   ASSERT(started());
00186 
00187   int fd = socket(AF_INET, SOCK_STREAM, 0);
00188   if (fd == -1) { PLERROR("Cannot create socket"); return -1; }
00189 
00190   // attempt a connect to server:
00191   struct sockaddr_in addr;
00192   addr.sin_family = AF_INET;
00193   addr.sin_addr.s_addr = htonl(ip);
00194   addr.sin_port = htons(port);
00195 
00196   struct in_addr him; him.s_addr = htonl(ip);
00197   if (connect(fd, (struct sockaddr*)(&addr), sizeof(addr)) == -1) {
00198     IDPLERROR("Error connecting to %s:%hu", inet_ntoa(him), port);
00199     return -1;
00200   }
00201   IDLDEBUG("Connected to %s:%hu", inet_ntoa(him), port);
00202 
00203   // switch to non-blocking mode:
00204   int flags = fcntl(fd, F_GETFL, 0);
00205   if (flags == -1)
00206     { IDPLERROR("Cannot get socket flags"); close(fd); return -1; }
00207   if (fcntl(fd, F_SETFL, O_NONBLOCK | flags) == -1)
00208     { IDPLERROR("Cannot set socket flags"); close(fd); return -1; }
00209 
00210   // get ready to add new TCPcliServ (will be done in run thread):
00211   addfd = fd; addip = ip; addport = port;
00212   pollvar(&to_add); to_add = true; if (blocking) pollvar(&to_add);
00213 
00214   return fd;
00215 }
00216 
00217 // ######################################################################
00218 void TCPcommunicator::run()
00219 {
00220   bool running = true;
00221   const short myPort = server->getModelParamVal<short>("SockServPort");
00222 
00223   while(running) {
00224     // were we instructed to stop?
00225     if (to_stop) { running = false; break; }
00226 
00227     // check if we have a new client added through contact():
00228     if (to_add) {
00229       if (cli[addfd]) {
00230         LERROR("Already have client %d! Killing.", addfd);
00231         terminateCli(addfd);
00232       }
00233       cli[addfd] = new
00234         TCPcliServ(addfd, myIP.getVal().s_addr, myPort, addip, addport,
00235                    inqlen.getVal(), ouqlen.getVal(),
00236                    indroplast.getVal(), oudroplast.getVal(),
00237                    disableShm.getVal());
00238       server->addClient(addfd); server->monitorRead(addfd);
00239       server->monitorError(addfd); to_add = false;
00240     }
00241 
00242     // check what's going on with all our clients:
00243     int client;
00244     int result = server->check(TCP_TIMEOUT_SEC, TCP_TIMEOUT_USEC);
00245 
00246     switch(result) {
00247 
00248     case SOCKSERV_BUG:                        // server got bogus. The end...
00249       LERROR("Server messed-up. ABORT."); running = false;
00250       break;
00251     case SOCKSERV_IDLE:                                // nothing happened...
00252       break;
00253     case SOCKSERV_ACTIV:                 // some activity! let's check it out
00254 
00255       // ########## first, check for new clients and configure them:
00256       while((client = server->getNewClient())) {
00257         if (cli[client]) {
00258           LDEBUG("Already have TCPcliServ %d? Killing", client);
00259           terminateCli(client);
00260         }
00261         cli[client] = new TCPcliServ(client, myIP.getVal().s_addr, myPort,
00262                                      server->getClientIP(client),
00263                                      server->getClientPort(client),
00264                                      inqlen.getVal(), ouqlen.getVal(),
00265                                      indroplast.getVal(), oudroplast.getVal(),
00266                                      disableShm.getVal());
00267         server->monitorRead(client); server->monitorWrite(client, false);
00268         server->monitorError(client);
00269         // wait until this client has some data for us to read
00270       }
00271 
00272       // ########## check for clients that got errored:
00273       while((client = server->getErrorClient())) {
00274         if (cli[client] == NULL) continue;  // already gone
00275         LDEBUG("Connection to client %d messed-up. Terminating.", client);
00276         terminateCli(client);
00277       }
00278 
00279       // ########## check for clients ready to be read from / written to:
00280       while((client = server->getRWClient()))
00281         {
00282           if (cli[client] == NULL) continue;  // already gone
00283           switch(cli[client]->check())
00284             {
00285             case TCPDONE:                 // transaction done -> wait for next one
00286             case TCPWAITREAD:                          // this guy needs more data
00287               server->monitorRead(client);
00288               server->monitorWrite(client, false);
00289               break;
00290             case TCPWAITWRITE:                 // this guy wants to send more data
00291               server->monitorWrite(client);
00292               server->monitorRead(client);  // always be open to incoming stuff
00293               break;
00294             case TCPWAITRW:             // this guy wants to send & read more data
00295               server->monitorWrite(client);
00296               server->monitorRead(client);
00297               break;
00298             case TCPBUG:                                  // this guy is messed-up
00299               LDEBUG("TCPcliServ %d messed-up. Terminating.", client);
00300               terminateCli(client);
00301               break;
00302             case TCPFINISH:                                // this guy is finished
00303               terminateCli(client);
00304               break;
00305             }
00306         }
00307       break;
00308     default:
00309       LERROR("Server gave unknown result %d. Ignored.", result);
00310     }
00311   }
00312   threadRunning.atomic_set(0);
00313   pthread_exit(0);
00314 }
00315 
00316 // ######################################################################
00317 void TCPcommunicator::terminateAll()
00318 { for (int i = 0; i < FD_SETSIZE; i ++) if (cli[i]) terminateCli(i); }
00319 
00320 // ######################################################################
00321 void TCPcommunicator::terminateAllButOne(const int fd)
00322 { for (int i = 0; i < FD_SETSIZE; i ++) if (cli[i] && i!=fd) terminateCli(i); }
00323 
00324 // ######################################################################
00325 void TCPcommunicator::terminateCli(const int fd)
00326 {
00327   ASSERT(fd >= 0 && fd < FD_SETSIZE);
00328   if (cli[fd])
00329     {
00330       server->disconnect(fd);
00331       if (cli[fd]) delete cli[fd];
00332       cli[fd] = NULL;
00333     }
00334 }
00335 
00336 // ######################################################################
00337 void TCPcommunicator::send(const int sfd, TCPmessage& smsg)
00338 {
00339   ASSERT(sfd >= 0 && sfd < FD_SETSIZE);
00340   if (cli[sfd] == NULL)
00341     LERROR("No client %d to send to!", sfd);
00342   else
00343     {
00344       cli[sfd]->send(smsg);  // thread-safe
00345       server->monitorWrite(sfd); server->monitorRead(sfd);
00346       server->monitorError(sfd);
00347     }
00348 }
00349 
00350 // ######################################################################
00351 bool TCPcommunicator::receive(int& rfd, TCPmessage& rmsg,
00352                               const int timeout, int* err)
00353 {
00354   // is there any incoming message already?
00355   bool got_one = receiveIt(rfd, rmsg, err);
00356   if (got_one) return true;  // got a message!
00357 
00358   // if we have a non-zero timeout, let's do a select() and try again:
00359   if (timeout) {
00360     // wait for the fds to have something ready
00361     waitFor(rfd, timeout);
00362 
00363     // if something came in, it should have also been picked up by the
00364     // receiver thread, and it may be available now.  Try once more
00365     // and return status:
00366     return receiveIt(rfd, rmsg, err);
00367   }
00368 
00369   // nothing received:
00370   return false;
00371 }
00372 
00373 // ######################################################################
00374 int TCPcommunicator::nbReceived(const int rfd)
00375 {
00376   int nb = 0;
00377 
00378   if (rfd != -1)  // only check specific fd
00379     {
00380       if (cli[rfd] == NULL) { LERROR("No client %d!", rfd); nb = 0; }
00381       else nb = cli[rfd]->nbReceived();
00382     }
00383   else  // rfd = -1, then check all fds
00384     for (int i = 0; i < FD_SETSIZE; i ++)
00385       if (cli[i] != NULL)
00386         nb += cli[i]->nbReceived();
00387   return nb;
00388 }
00389 
00390 // ######################################################################
00391 void TCPcommunicator::waitFor(const int rfd, const int timeout)
00392 {
00393   // we'll do a blocking select on those fds we are interested in:
00394   fd_set rfds, wfds, efds; FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
00395   if (rfd == -1) {  // monitor all our fds
00396     for (int i = 0; i < FD_SETSIZE; i ++)
00397       if (cli[i] != NULL) FD_SET(i, &rfds);
00398   } else FD_SET(rfd, &rfds);
00399   struct timeval to; to.tv_sec = 0; to.tv_usec = timeout * 1000;
00400 
00401   // don't care about the result; we just want to wait:
00402   select(FD_SETSIZE, &rfds, &wfds, &efds, &to);
00403 }
00404 
00405 // ######################################################################
00406 void TCPcommunicator::pollvar(volatile bool *var)
00407 {
00408   struct timespec ts, ts2;
00409   while (*var == true)
00410     {
00411        ts.tv_sec = 0; ts.tv_nsec = 1000 * POLL_SLEEP;
00412        nanosleep(&ts, &ts2);
00413     }
00414 }
00415 
00416 // ######################################################################
00417 bool TCPcommunicator::receiveIt(int& rfd, TCPmessage& rmsg, int* err)
00418 {
00419   if (threadRunning.atomic_get() != 1)
00420     LFATAL("Oops! The communicator is not running anymore");
00421 
00422   ASSERT(rfd >= -1 && rfd < FD_SETSIZE); bool got_one = false;
00423 
00424   if (rfd != -1)  // receive from specific fd:
00425     {
00426       if (cli[rfd] == NULL)
00427         {
00428           LERROR("No client %d to receive from!", rfd);
00429           if (err != 0) *err = 1;
00430           got_one = false;
00431         }
00432       else
00433         got_one = cli[rfd]->receive(rmsg);
00434     }
00435   else  // rfd = -1, then receive from any fd:
00436     {
00437       for (int i = 0; i < FD_SETSIZE; i ++)
00438         if (cli[i] != NULL)
00439           {
00440             got_one = cli[i]->receive(rmsg);
00441             if (got_one) { rfd = i; break; }
00442           }
00443     }
00444   return got_one;
00445 }
00446 
00447 // ######################################################################
00448 /* So things look consistent in everyone's emacs... */
00449 /* Local Variables: */
00450 /* indent-tabs-mode: nil */
00451 /* End: */
Generated on Sun May 8 08:40:20 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3