TCPcliServ.C

Go to the documentation of this file.
00001 /*!@file Beowulf/TCPcliServ.C A client/server to receive/send TCPmessage */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
00005 // University of Southern California (USC) and the iLab at USC.         //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPcliServ.C $
00035 // $Id: TCPcliServ.C 11538 2009-07-30 06:23:37Z itti $
00036 //
00037 
00038 #include "Beowulf/TCPcliServ.H"
00039 
00040 #include "Beowulf/TCPdefs.H"
00041 #include "Util/log.H"
00042 #include <arpa/inet.h>
00043 #include <errno.h>
00044 #include <fcntl.h>
00045 #ifdef HAVE_NET_ETHERNET_H
00046 #include <net/ethernet.h>
00047 #else
00048 // workaround if we don't have <net/ethernet.h>
00049 #define ETHERMTU 1500
00050 #endif
00051 #include <netdb.h>
00052 #include <netinet/in.h>
00053 #include <netinet/tcp.h>
00054 #include <stdlib.h>
00055 #include <string.h>
00056 #include <sys/shm.h>
00057 #include <sys/socket.h>
00058 #include <sys/types.h>
00059 #include <unistd.h>
00060 
00061 //! for ID-logging. See log.H:
00062 #define MYLOGID fd
00063 
00064 //! internal states:
00065 #define TCPCS_UNKNOWN   0
00066 #define TCPCS_READING   1
00067 #define TCPCS_WRITING   2
00068 #define TCPCS_BOGUS     4
00069 
00070 // ######################################################################
00071 TCPcliServ::TCPcliServ()
00072 { state = TCPCS_BOGUS; }
00073 
00074 // ######################################################################
00075 void TCPcliServ::init(const int connected_fd,
00076                       const in_addr_t my_ipaddr,
00077                       const short int my_port,
00078                       const in_addr_t cli_ipaddr,
00079                       const short int cli_port,
00080                       const int inqlen, const int outqlen,
00081                       const bool indlast, const bool outdlast,
00082                       const bool disableShm)
00083 {
00084   shmcount = (getpid() << 24) + (size_t(this) << 16);
00085   fd = connected_fd; myIP = my_ipaddr; myPort = my_port;
00086   cliIP = cli_ipaddr; cliPort = cli_port;
00087   inmsgqlen = inqlen; outmsgqlen = outqlen;
00088   indroplast = indlast; outdroplast = outdlast;
00089 
00090   // switch fd to non-blocking and TCP_NODELAY modes:
00091   int flags = fcntl(fd, F_GETFL, 0);
00092   if (flags == -1)
00093     { IDPLERROR("Cannot get socket flags"); state = TCPCS_BOGUS; }
00094   if (fcntl(fd, F_SETFL, O_NONBLOCK | flags) == -1)
00095     { IDPLERROR("Cannot set socket flags"); state = TCPCS_BOGUS; }
00096   int w = 1;  // watermark size
00097   if (setsockopt(fd, 6, TCP_NODELAY, (void *)&w, sizeof(int)))
00098     { IDPLERROR("Cannot set socket TCP_NODELAY"); state = TCPCS_BOGUS; }
00099 
00100   // init mutexes: FIXME: bogus if init called several times...
00101   pthread_mutex_init(&mutin, NULL);
00102   pthread_mutex_init(&mutou, NULL);
00103 
00104   // do other standard initializations:
00105   reset();
00106 
00107   // can we use shared memory?
00108   if (myIP == cliIP && disableShm == false) useShm = true; else useShm = false;
00109 
00110   // show some debug info:
00111   struct in_addr me; me.s_addr = htonl(myIP);
00112   struct in_addr him; him.s_addr = htonl(cliIP);
00113   char buf[32]; strncpy(buf, inet_ntoa(me), 32);
00114   if (useShm)
00115     IDLDEBUG("Ready: %s:%hu <-SHM-> %s:%hu", buf, myPort,
00116              inet_ntoa(him), cliPort);
00117   else
00118     IDLDEBUG("Ready: %s:%hu <-TCP-> %s:%hu", buf, myPort,
00119              inet_ntoa(him), cliPort);
00120 }
00121 
00122 // ######################################################################
00123 TCPcliServ::TCPcliServ(const int connected_fd,
00124                        const in_addr_t my_ipaddr,
00125                        const short int my_port,
00126                        const in_addr_t cli_ipaddr,
00127                        const short int cli_port,
00128                        const int inqlen, const int outqlen,
00129                        const bool indlast, const bool outdlast,
00130                        const bool disableShm)
00131 {
00132   state = TCPCS_BOGUS;
00133   init(connected_fd, my_ipaddr, my_port, cli_ipaddr, cli_port,
00134        inqlen, outqlen, indlast, outdlast, disableShm);
00135 }
00136 
00137 // ######################################################################
00138 TCPcliServ::~TCPcliServ()
00139 {
00140   disconnect(); pthread_mutex_destroy(&mutin); pthread_mutex_destroy(&mutou);
00141 
00142   // release all our shared memory segments:
00143   if (shmmap.empty() == false)
00144     for (std::map<int, void *>::const_iterator
00145            m = shmmap.begin(); m != shmmap.end(); m ++)
00146       if (shmdt(m->second) == -1) IDPLERROR("Error detaching Shm %d",m->first);
00147 }
00148 
00149 // ######################################################################
00150 void TCPcliServ::send(const TCPmessage& msg)
00151 {
00152   pthread_mutex_lock(&mutou);
00153 
00154   // check if our outgoing queue is full, and take action. Be careful
00155   // not to drop Shm acknowledge messages, though:
00156   int xx;
00157   if (outmsgqlen > 0 && oumsg.size() >= outmsgqlen &&
00158       msg.checkShmDone(xx) == false)
00159     {
00160       if (outdroplast)
00161       {
00162        // IDLERROR("Outgoing queue full -- DROPPING MOST RECENT MESSAGE");
00163       }
00164       else
00165         {
00166          // IDLERROR("Outgoing queue full -- DROPPING LEAST RECENT MESSAGE");
00167           if (oumsg.front().checkShmDone(xx) == false) oumsg.pop_front();
00168           oumsg.push_back(msg);  // keep local copy of message
00169         }
00170     }
00171   else
00172     oumsg.push_back(msg);  // keep local copy of message
00173 
00174   pthread_mutex_unlock(&mutou);
00175   // message will be sent out whenever its turn comes in check()...
00176 }
00177 
00178 // ######################################################################
00179 bool TCPcliServ::receive(TCPmessage& msg)
00180 {
00181   bool ret = false;
00182   pthread_mutex_lock(&mutin);
00183   if (inmsg.size() && inmsg.front().isBusy() == false)
00184     { msg = inmsg.front(); inmsg.pop_front(); ret = true; }
00185   pthread_mutex_unlock(&mutin);
00186   return ret;
00187 }
00188 
00189 // ######################################################################
00190 int TCPcliServ::nbReceived()
00191 {
00192   pthread_mutex_lock(&mutin);
00193   int ret = inmsg.size();
00194   if (ret > 0 && inmsg.front().isBusy()) ret = 0; // one msg, still receiving
00195   pthread_mutex_unlock(&mutin);
00196   return ret;
00197 }
00198 
00199 // ######################################################################
00200 int TCPcliServ::check()
00201 {
00202   if (state == TCPCS_BOGUS) return TCPBUG;  // I am bogus... leave me alone!
00203 
00204   // are we reading in a message?
00205   if (state & TCPCS_READING)
00206     {
00207       int r = im.readFrom(fd);  // read more; returns TCPWAITREAD, TCPDONE, or TCPBUG
00208       switch(r)
00209         {
00210         case TCPWAITREAD:  // need more data for current message
00211           break;
00212         case TCPDONE:      // current message done; save it and wait for new one
00213           storeReceivedMessage();  // store what we have received
00214           state &= ~TCPCS_READING;  // not reading a message any more...
00215           break;
00216         case TCPBUG:       // something went wrong; suicide
00217           state = TCPCS_BOGUS;
00218           break;
00219         default:
00220           LFATAL("Unknown return from TCPmessage::readFrom()");
00221         }
00222     }
00223 
00224   // if we are not reading anything, is there a new incoming message?
00225   if ((state & TCPCS_READING) == 0)
00226     {
00227       // attempt a read and see if there is anything coming in:
00228       int r = im.readHeaderFrom(fd);
00229       switch(r)
00230         {
00231         case TCPDONE:     // got a new message and we have its header ready
00232           // does the message have a non-empty body? if so, let's get
00233           // ready to read it. Otherwise we are done with this message.
00234           if (im.getSize() > 0) state |= TCPCS_READING;  // will start reading
00235           else storeReceivedMessage(); // store what we have received
00236           break;
00237         case TCPWAITREAD: // nothing new
00238           break;
00239         default:       // something bogus happened
00240           state = TCPCS_BOGUS;
00241         }
00242     }
00243 
00244   // are we writing out a message?
00245   if (state & TCPCS_WRITING)
00246     {
00247       int r = om.writeTo(fd);  // returns WAITWRITE, DONE, or BUG
00248       switch(r)
00249         {
00250         case TCPWAITWRITE: // send more data for current message
00251           break;
00252         case TCPDONE:      // current message done
00253           state &= ~TCPCS_WRITING;
00254           break;
00255         case TCPBUG:       // something went wrong; suicide
00256           state = TCPCS_BOGUS;
00257           break;
00258         default:
00259           LFATAL("Unknown return from TCPmessage::writeTo()");
00260         }
00261     }
00262 
00263   // if not writing, do we have pending messages in out queue?
00264   if ((state & TCPCS_WRITING) == 0)
00265     {
00266       // we are not writing any message; do we have one in the queue?
00267       pthread_mutex_lock(&mutou);
00268       if (oumsg.size())
00269         {
00270           int xx; bool shmdone = oumsg.front().checkShmDone(xx);
00271           int siz = oumsg.front().getSize();
00272 
00273           // are we using shared memory, and this message is not an shm
00274           // message acknowledge (which must go via TCP), and the
00275           // message is not very small (otherwise, TCP will be faster)?
00276           if (useShm &&
00277               shmdone == false &&
00278               siz > int(ETHERMTU - sizeof(TCPmessage::TCPmessageHeader)))
00279             {
00280               key_t key = shmcount ++;
00281 
00282               // create new shared memory segment:
00283               int shmid = shmget(key, siz, 0666 | IPC_CREAT | IPC_EXCL);
00284               if (shmid == -1)
00285                 IDPLFATAL("Cannot create shared memory segment");
00286 
00287               // attach segment to our address space:
00288               char *shmbuf = (char *)shmat(shmid, NULL, 0);
00289               if (shmbuf == (char *)(-1))
00290                 IDPLFATAL("Cannot attach shared memory segment");
00291 
00292               // keep track of this new segment:
00293               shmmap[shmid] = shmbuf;
00294 
00295               // copy message into shared memory zone:
00296               memcpy(shmbuf, (void *)(oumsg.front().getMsg()), siz);
00297 
00298               // create a TCPmessage of type TCPMSG_SHMINFO
00299               om.reset(oumsg.front().getID(), oumsg.front().getAction());
00300               om.addShmInfo(shmid, siz);
00301 
00302               // pop message to be sent from send queue:
00303               oumsg.pop_front();
00304             }
00305           else
00306             {
00307               // regular TCP transfer:
00308               om = oumsg.front(); // get next message to write out
00309               oumsg.pop_front();  // not in the queue any more...
00310             }
00311           if (shmdone == false)
00312             LDEBUG("Sending off [%d, %d] to %d",om.getID(),om.getAction(),fd);
00313 
00314           // write out the message header:
00315           if (om.writeHeaderTo(fd) != TCPDONE)
00316             { state = TCPCS_BOGUS; return TCPBUG; }
00317 
00318           // does the message have a non-empty body? if so, we will
00319           // write it out at the next check(). Otherwise we are done.
00320           if (om.getSize() > 0)
00321             state |= TCPCS_WRITING;  // ok, we are writing this guy out
00322         }
00323       pthread_mutex_unlock(&mutou);
00324     }
00325 
00326   // ok, now what do we return to SockServ? we always want to monitor for read,
00327   // and also for write if we are writing something out.
00328   if (state == TCPCS_BOGUS) return TCPBUG;
00329   else if (state & TCPCS_WRITING) return TCPWAITRW;
00330   else return TCPWAITREAD;
00331 }
00332 
00333 // ######################################################################
00334 int TCPcliServ::disconnect()
00335 { reset(); return TCPFINISH; }
00336 
00337 // ######################################################################
00338 int TCPcliServ::reset()
00339 {
00340   state = TCPCS_UNKNOWN;
00341   return TCPDONE;
00342 }
00343 
00344 // ######################################################################
00345 void TCPcliServ::storeReceivedMessage()
00346 {
00347   int shmid, siz;
00348 
00349   // is it a shared memory message?
00350   if (im.checkShmInfo(shmid, siz))
00351     {
00352       // attach to shared memory segment:
00353       char *msgbuf = (char *)shmat(shmid, NULL, 0);
00354       if (msgbuf == (char *)(-1))
00355         IDPLERROR("Cannot attach to shared memory segment");
00356       else
00357         {
00358           // make a deep copy of the shared memory segment into our
00359           // private memory, so that we can immediately release the
00360           // shared memory:
00361           TCPmessage insm(im.getID(), im.getAction(), im.getETI(),
00362                           msgbuf, siz);
00363 
00364           // push the TCPmessage into our received queue:
00365           queueIncomingMessage(insm);
00366 
00367           // build an acknowledgement message:
00368           TCPmessage ack(insm.getID(), insm.getAction());
00369           ack.addShmDone(shmid);
00370 
00371           // release shared memory:
00372           if (shmdt(msgbuf) == -1)
00373             IDPLERROR("Error detaching Shm segment %d", shmid);
00374 
00375           // send acknowledgement (will be forced to go via TCP):
00376           send(ack);
00377         }
00378     }
00379   // is it a shared memory segment release?
00380   else if (im.checkShmDone(shmid))
00381     {
00382       std::map<int, void *>::const_iterator m = shmmap.find(shmid);
00383       if (m != shmmap.end())
00384         {
00385           // ok, we do have this segment; let's release it:
00386           if (shmdt(m->second) == -1)
00387             IDPLERROR("Error detaching Shm segment %d", m->first);
00388 
00389           // nobody needs it anymore; let's destroy it:
00390           if (shmctl(shmid, IPC_RMID, 0) == -1)
00391             IDPLERROR("Error deleting Shm segment %d", m->first);
00392 
00393           // let's forget about this segment:
00394           shmmap.erase(shmid);
00395         }
00396       else
00397         IDLERROR("Attempt to release unknown Shm %d -- IGNORED", shmid);
00398     }
00399   // then it's a regular TCP message
00400   else
00401     queueIncomingMessage(im);
00402 
00403   // we are done with the contents of this message:
00404   im.freeMem();
00405 }
00406 
00407 // ######################################################################
00408 void TCPcliServ::queueIncomingMessage(TCPmessage& imsg)
00409 {
00410   pthread_mutex_lock(&mutin);
00411 
00412   // check if our incoming queue is full, and take action. Be careful
00413   // not to drop Shm acknowledge messages, though:
00414   int xx;
00415   if (inmsgqlen > 0 && inmsg.size() >= inmsgqlen &&
00416       imsg.checkShmDone(xx) == false)
00417     {
00418       if (indroplast)
00419       {
00420        // IDLERROR("Incoming queue full -- DROPPING MOST RECENT MESSAGE");
00421       }
00422       else
00423         {
00424          // IDLERROR("Incoming queue full -- DROPPING LEAST RECENT MESSAGE");
00425           if (inmsg.front().checkShmDone(xx) == false) inmsg.pop_front();
00426           inmsg.push_back(imsg);
00427         }
00428     }
00429   else
00430     inmsg.push_back(imsg);
00431 
00432   pthread_mutex_unlock(&mutin);
00433 }
00434 
00435 // ######################################################################
00436 /* So things look consistent in everyone's emacs... */
00437 /* Local Variables: */
00438 /* indent-tabs-mode: nil */
00439 /* End: */
Generated on Sun May 8 08:04:33 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3