
Go to the documentation of this file.
00001 /*!@file Beowulf/TCPmessage.C Direct message passing over TCP connections */
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
00005 // University of Southern California (USC) and the iLab at USC.         //
00006 // See for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Laurent Itti <>
00034 // $HeadURL: svn:// $
00035 // $Id: TCPmessage.C 11538 2009-07-30 06:23:37Z itti $
00036 //
00038 #include "Beowulf/TCPmessage.H"
00040 #include "Beowulf/TCPdefs.H"
00041 #include "Image/Image.H"
00042 #include "Image/ImageSet.H"
00043 #include "Image/Pixels.H"
00044 #include "Util/log.H"
00046 #include <arpa/inet.h>
00047 #include <errno.h>
00048 #include <netdb.h>
00049 #include <netinet/in.h>
00050 #include <unistd.h>
00052 //! use message ID for ID-logging; see log.H:
00053 #define MYLOGID itsHead.itsId
00055 // ######################################################################
00056 TCPmessage::TCPmessage() :
00057   itsMsg(new ArrayData<char>())
00058 { itsBusy = false; reset(0, 0, 0.0F); }
00060 // ######################################################################
00061 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction,
00062                        const float msgeti) :
00063   itsMsg(new ArrayData<char>())
00064 { itsBusy = false; reset(msgid, msgaction, msgeti); }
00066 // ######################################################################
00067 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction,
00068                        const float msgeti,
00069                        const void *buf, const int bufsize) :
00070   itsMsg(new ArrayData<char>())
00071 {
00072   itsBusy = false; reset(msgid, msgaction, msgeti);
00073   resize(bufsize, false);
00074   memcpy(getMsg(), buf, bufsize);
00075   itsUpkidx = 0; itsHead.itsSize = bufsize;
00076 }
00078 // ######################################################################
00079 TCPmessage::TCPmessage(const TCPmessage& m) :
00080   itsMsg(m.itsMsg)
00081 { itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx; }
00083 // ######################################################################
00084 TCPmessage& TCPmessage::operator=(const TCPmessage& m)
00085 {
00086   ArrayHandle<char> msgcopy(m.itsMsg); itsMsg.swap(msgcopy);
00087   itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx;
00088   return *this;
00089 }
00091 // ######################################################################
00092 TCPmessage::~TCPmessage()
00093 { }
00095 // ######################################################################
00096 void TCPmessage::addImage(const Image< PixRGB<byte> >& im)
00097 {
00098   int32 stuff[3];
00099   stuff[0] = TCPMSG_COLBYTEIMA; stuff[1] = im.getWidth();
00100   stuff[2] = im.getHeight(); pack(stuff, 3);
00101   pack(im.getArrayPtr(), im.getSize());
00102 }
00104 // ######################################################################
00105 void TCPmessage::addImage(const Image<byte>& im)
00106 {
00107   int32 stuff[3];
00108   stuff[0] = TCPMSG_BYTEIMA; stuff[1] = im.getWidth();
00109   stuff[2] = im.getHeight(); pack(stuff, 3);
00110   pack(im.getArrayPtr(), im.getSize());
00111 }
00113 // ######################################################################
00114 void TCPmessage::addImage(const Image<float>& im)
00115 {
00116   int32 stuff[3];
00117   stuff[0] = TCPMSG_FLOATIMA; stuff[1] = im.getWidth();
00118   stuff[2] = im.getHeight(); pack(stuff, 3);
00119   pack(im.getArrayPtr(), im.getSize());
00120 }
00122 // ######################################################################
00123 void TCPmessage::addImageSet(const ImageSet<float>& im)
00124 {
00125   int32 stuff[2];
00126   stuff[0] = TCPMSG_FLOATIMASET; stuff[1] = im.size(); pack(stuff, 2);
00127   for (uint i = 0; i < im.size(); i ++) addImage(im[i]);
00128 }
00130 // ######################################################################
00131 void TCPmessage::addFixation(const Fixation& fix)
00132 {
00133   int32 typ = TCPMSG_FIXATION;
00134   pack(&typ, 1); pack(&fix.i, 1); pack(&fix.j, 1); pack(&fix.frame, 1);
00135 }
00137 // ######################################################################
00138 void TCPmessage::addString(const char* str)
00139 {
00140   int32 typ = TCPMSG_STRING; pack(&typ, 1); pack(str, strlen(str) + 1);
00141 }
00143 // ######################################################################
00144 void TCPmessage::addInt32(const int32 val)
00145 {
00146   int32 typ = TCPMSG_INT32; pack(&typ, 1); pack(&val, 1);
00147 }
00149 // ######################################################################
00150 void TCPmessage::addInt64(const int64 val)
00151 {
00152   int32 typ = TCPMSG_INT64; pack(&typ, 1); pack(&val, 1);
00153 }
00155 // ######################################################################
00156 void TCPmessage::addFloat(const float val)
00157 {
00158   int32 typ = TCPMSG_FLOAT; pack(&typ, 1); pack(&val, 1);
00159 }
00161 // ######################################################################
00162 void TCPmessage::addDouble(const double val)
00163 {
00164   int32 typ = TCPMSG_DOUBLE; pack(&typ, 1); pack(&val, 1);
00165 }
00167 // ######################################################################
00168 void TCPmessage::reset(const int32 msgid, const int32 msgaction,
00169                        const float msgeti)
00170 {
00171   freeMem();           // delete old message if any
00172   itsHead.itsId = msgid; itsHead.itsAction = msgaction;
00173   itsHead.itsETI = msgeti; itsHead.itsSize = 0;
00174 }
00176 // ######################################################################
00177 void TCPmessage::freeMem()
00178 {
00179   while (itsBusy) { LERROR("freeMem() while busy! Sleeping..."); sleep(1); }
00180   ArrayHandle<char> h;  // handle with empty array
00181   itsMsg.swap(h);
00182   itsHead.itsId = 0; itsHead.itsAction = 0; itsHead.itsSize = 0;
00183   itsHead.itsETI = 0.0F; itsBusy = false; itsUpkidx = 0;
00184   itsHeadIdx = 0;
00185 }
00187 // ######################################################################
00188 void TCPmessage::getElementRaw(void **elem, int32& typ)
00189 {
00190   if (itsUpkidx >= itsHead.itsSize)
00191     LFATAL("Trying to unpack past message end");
00193   unpack(&typ, 1);  // get element type
00194   switch(typ)
00195     {
00196     case TCPMSG_COLBYTEIMA:
00197       {
00198         Image<PixRGB<byte> >* im =
00199           new Image<PixRGB<byte> >(decodeColByteIma());
00200         *elem = static_cast<void*>(im);
00201         break;
00202       }
00203     case TCPMSG_BYTEIMA:
00204       {
00205         Image<byte>* im = new Image<byte>(decodeByteIma());
00206         *elem = static_cast<void*>(im);
00207         break;
00208       }
00209     case TCPMSG_FLOATIMA:
00210       {
00211         Image<float>* im = new Image<float>(decodeFloatIma());
00212         *elem = static_cast<void*>(im);
00213         break;
00214       }
00215     case TCPMSG_FLOATIMASET:
00216       {
00217         ImageSet<float>* ims = new ImageSet<float>(decodeFloatImaSet());
00218         *elem = static_cast<void*>(ims);
00219         break;
00220       }
00221     case TCPMSG_FIXATION:
00222       {
00223         Fixation* fix = new Fixation(decodeFixation());
00224         *elem = static_cast<void*>(fix);
00225         break;
00226       }
00227     case TCPMSG_STRING:
00228       {
00229         int32 s = strlen(getMsg() + itsUpkidx) + 1;
00230         char *str = new char[s];
00231         unpack(str, s); *elem = (void *)str;
00232         break;
00233       }
00234     case TCPMSG_INT32:
00235       {
00236         int32 *val = new int32; unpack(val, 1);
00237         *elem = (void *)val;
00238         break;
00239       }
00240     case TCPMSG_FLOAT:
00241       {
00242         float *val = new float; unpack(val, 1);
00243         *elem = (void *)val;
00244         break;
00245       }
00246     case TCPMSG_DOUBLE:
00247       {
00248         double *val = new double; unpack(val, 1);
00249         *elem = (void *)val;
00250         break;
00251       }
00252     case TCPMSG_INT64:
00253       {
00254         int64 *val = new int64; unpack(val, 1);
00255         *elem = (void *)val;
00256         break;
00257       }
00258     default:
00259       LFATAL("Bogus element %d", typ);
00260     }
00261 }
00263 // ######################################################################
00264 Image<PixRGB<byte> > TCPmessage::getElementColByteIma()
00265 {
00266   unpackAndVerifyType(TCPMSG_COLBYTEIMA);
00267   return decodeColByteIma();
00268 }
00270 // ######################################################################
00271 Image<byte> TCPmessage::getElementByteIma()
00272 {
00273   unpackAndVerifyType(TCPMSG_BYTEIMA);
00274   return decodeByteIma();
00275 }
00277 // ######################################################################
00278 Image<float> TCPmessage::getElementFloatIma()
00279 {
00280   unpackAndVerifyType(TCPMSG_FLOATIMA);
00281   return decodeFloatIma();
00282 }
00284 // ######################################################################
00285 ImageSet<float> TCPmessage::getElementFloatImaSet()
00286 {
00287   unpackAndVerifyType(TCPMSG_FLOATIMASET);
00288   return decodeFloatImaSet();
00289 }
00291 // ######################################################################
00292 int32 TCPmessage::getElementInt32()
00293 {
00294   unpackAndVerifyType(TCPMSG_INT32);
00295   int32 result; unpack(&result, 1); return result;
00296 }
00298 // ######################################################################
00299 int64 TCPmessage::getElementInt64()
00300 {
00301   unpackAndVerifyType(TCPMSG_INT64);
00302   int64 result; unpack(&result, 1); return result;
00303 }
00305 // ######################################################################
00306 double TCPmessage::getElementDouble()
00307 {
00308   unpackAndVerifyType(TCPMSG_DOUBLE);
00309   double result; unpack(&result, 1); return result;
00310 }
00312 // ######################################################################
00313 float TCPmessage::getElementFloat()
00314 {
00315   unpackAndVerifyType(TCPMSG_FLOAT);
00316   float result; unpack(&result, 1); return result;
00317 }
00319 // ######################################################################
00320 Fixation TCPmessage::getElementFixation()
00321 {
00322   unpackAndVerifyType(TCPMSG_FIXATION);
00323   return decodeFixation();
00324 }
00326 // ######################################################################
00327 std::string TCPmessage::getElementString()
00328 {
00329   unpackAndVerifyType(TCPMSG_STRING);
00330   const size_t sz = strlen(getMsg() + itsUpkidx);
00331   std::string result;
00332   result.resize(sz);
00333   unpack(, sz);
00334   char nullterm = 1;
00335   unpack(&nullterm, 1);
00336   ASSERT(nullterm == '\0');
00337   return result;
00338 }
00340 namespace
00341 {
00342   const char* tcpmsgFieldTypeName(int32 typ)
00343   {
00344     switch (typ)
00345       {
00346 #define DO_CASE(x) case x: return #x
00348         DO_CASE(TCPMSG_COLBYTEIMA); break;
00349         DO_CASE(TCPMSG_FIXATION); break;
00350         DO_CASE(TCPMSG_STRING); break;
00351         DO_CASE(TCPMSG_BYTEIMA); break;
00352         DO_CASE(TCPMSG_FLOATIMA); break;
00353         DO_CASE(TCPMSG_FLOATIMASET); break;
00354         DO_CASE(TCPMSG_INT32); break;
00355         DO_CASE(TCPMSG_FLOAT); break;
00356         DO_CASE(TCPMSG_DOUBLE); break;
00357         DO_CASE(TCPMSG_INT64); break;
00359 #undef DO_CASE
00360       }
00362     // default:
00363     return "UNKNOWN";
00364   }
00365 }
00367 // ######################################################################
00368 void TCPmessage::unpackAndVerifyType(int32 expected_type)
00369 {
00370   if (itsUpkidx >= itsHead.itsSize)
00371     LFATAL("Trying to unpack past message end");
00373   int32 typ; unpack(&typ, 1);
00374   if (typ != expected_type)
00375     LFATAL("expected TCPmessage field %s, but got %s",
00376            tcpmsgFieldTypeName(expected_type),
00377            tcpmsgFieldTypeName(typ));
00378 }
00380 // ######################################################################
00381 Image<PixRGB<byte> > TCPmessage::decodeColByteIma()
00382 {
00383   int32 xx[2]; unpack(xx, 2);
00384   Image< PixRGB<byte> > im(xx[0], xx[1], NO_INIT);
00385   unpack(im.getArrayPtr(), xx[0] * xx[1]);
00386   return im;
00387 }
00389 // ######################################################################
00390 Image<byte> TCPmessage::decodeByteIma()
00391 {
00392   int32 xx[2]; unpack(xx, 2);
00393   Image<byte> im(xx[0], xx[1], NO_INIT);
00394   unpack(im.getArrayPtr(), xx[0] * xx[1]);
00395   return im;
00396 }
00398 // ######################################################################
00399 Image<float> TCPmessage::decodeFloatIma()
00400 {
00401   int32 xx[2]; unpack(xx, 2);
00402   Image<float> im(xx[0], xx[1], NO_INIT);
00403   unpack(im.getArrayPtr(), xx[0] * xx[1]);
00404   return im;
00405 }
00407 // ######################################################################
00408 ImageSet<float> TCPmessage::decodeFloatImaSet()
00409 {
00410   int32 numimg; unpack(&numimg, 1);
00411   ImageSet<float> ims(numimg);
00412   for (int i = 0; i < numimg; i ++)
00413     // store the image into the set
00414     ims[i] = this->getElementFloatIma();
00415   return ims;
00416 }
00418 // ######################################################################
00419 Fixation TCPmessage::decodeFixation()
00420 {
00421   Fixation fix;
00422   unpack(&fix.i, 1);
00423   unpack(&fix.j, 1);
00424   unpack(&fix.frame, 1);
00425   return fix;
00426 }
00428 // ######################################################################
00429 int TCPmessage::readHeaderFrom(const int fd)
00430 {
00431   if (itsHeadIdx == 0) freeMem(); // delete old message if any
00432   int toread = int(sizeof(TCPmessageHeader) - itsHeadIdx);
00433   if (toread <= 0) { LERROR("Internal error!"); return TCPBUG; }
00434   int nb = read(fd, (char*)(&itsHead) + itsHeadIdx, toread);
00435   if (nb == toread) { // ok, done reading
00436     if (itsHead.itsSize > 0)
00437       {
00438         resize(itsHead.itsSize, false);  // allocate memory for body
00439         itsBusy = true;                  // ready to receive the body
00440         itsHeadIdx = 0;                  // not reading header anymore
00441       }
00442     return TCPDONE;                       // done with header part
00443   } else if (nb > 0) {  // got only partial header data
00444     itsHeadIdx += nb; return TCPWAITREAD;
00445   } else if (nb == -1) {                                        // error?
00446     if (errno == EAGAIN) return TCPWAITREAD;   // ok, nothing received; wait
00447     else { PLDEBUG("Read error"); return TCPBUG; }
00448   } else if (nb == 0) {
00449     LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00450   } else { LERROR("What is that?"); return TCPBUG; }
00451 }
00453 // ######################################################################
00454 int TCPmessage::readFrom(const int fd)
00455 {
00456   if (!itsBusy) { LERROR("not busy?"); itsBusy = true; itsUpkidx = 0; }
00457   int n = itsHead.itsSize - itsUpkidx; // number of bytes to read
00458   if (n <= 0) LFATAL("Bogus read request for %d bytes",  n);
00459   int nb = read(fd, getMsg() + itsUpkidx, n);
00460   //LDEBUG("%d/%d bytes from %d", nb + itsUpkidx, itsHead.itsSize, fd);
00461   if (nb == n) {                                      // ok, done reading
00462     itsBusy = false; itsUpkidx = 0;
00463     //LDEBUG("Receive complete");
00464     return TCPDONE;
00465   } else if (nb > 0) {             // ok, we received some; wait for more
00466     itsUpkidx += nb; return TCPWAITREAD;
00467   } else if (nb == -1) {                                        // error?
00468     if (errno == EAGAIN) return TCPWAITREAD;   // ok, nothing received; wait
00469     else { PLDEBUG("Read error"); return TCPBUG; }
00470   } else if (nb == 0) {
00471     LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00472   } else { LERROR("What is that?"); return TCPBUG; }
00473 }
00475 // ######################################################################
00476 int TCPmessage::writeHeaderTo(const int fd)
00477 {
00478   if (!itsBusy) { itsBusy = true; itsUpkidx = 0; }      // start transfer
00479   int nb = write(fd, &itsHead, sizeof(TCPmessageHeader));
00480   if (nb == sizeof(TCPmessageHeader)) {                 // ok, done writing
00481     // do we have an empty body?
00482     if (itsHead.itsSize <= 0) itsBusy = false;
00483     return TCPDONE;
00484   } else {
00485     // ideally we should wait & retry but this seems to never happen
00486     LERROR("Could not write header in one shot");
00487     return TCPBUG;
00488   }
00489 }
00491 // ######################################################################
00492 int TCPmessage::writeTo(const int fd)
00493 {
00494   if (!itsBusy) { itsBusy = true; itsUpkidx = 0; }      // start transfer
00495   int n = itsHead.itsSize - itsUpkidx; // number of bytes to write
00496   if (n <= 0) LFATAL("Bogus write request for %d bytes",  n);
00497   int nb = write(fd, getMsg() + itsUpkidx, n);
00498   //LDEBUG("%d/%d bytes to %d", nb + itsUpkidx, itsHead.itsSize, fd);
00499   if (nb == n) {                                      // ok, done writing
00500     //LDEBUG("Done writing %d/%d", itsHead.itsId, itsHead.itsAction);
00501     itsBusy = false; itsUpkidx = 0; return TCPDONE;
00502   } else if (nb > 0) {          // we could write some, but not all; wait
00503     itsUpkidx += nb; return TCPWAITWRITE;
00504   } else if (nb == -1) {                   // we could not write anything
00505     if (errno == EAGAIN) return TCPWAITWRITE;         // ok, we'll try again
00506     else { PLDEBUG("Write error"); return TCPBUG; }
00507   } else if (nb == 0) {
00508     LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00509   } else { LERROR("What is that?"); return TCPBUG; }
00510 }
00512 // ######################################################################
00513 void TCPmessage::addShmInfo(const int shmid, const int siz)
00514 { int32 typ = TCPMSG_SHMINFO; pack(&typ, 1); pack(&shmid, 1); pack(&siz, 1); }
00516 // ######################################################################
00517 void TCPmessage::addShmDone(const int shmid)
00518 { int32 typ = TCPMSG_SHMDONE; pack(&typ, 1); pack(&shmid, 1); }
00520 // ######################################################################
00521 bool TCPmessage::checkShmInfo(int& shmid, int& siz) const
00522 {
00523   if (itsHead.itsSize < (int)sizeof(int32)) return false; // no message body
00524   const int32 *ptr = (const int32 *)getMsg();
00525   if (ptr[0] == TCPMSG_SHMINFO) { shmid = ptr[1]; siz = ptr[2]; return true; }
00526   return false;
00527 }
00529 // ######################################################################
00530 bool TCPmessage::checkShmDone(int& shmid) const
00531 {
00532   if (itsHead.itsSize < (int)sizeof(int32)) return false; // no message body
00533   const int32 *ptr = (const int32 *)getMsg();
00534   if (ptr[0] == TCPMSG_SHMDONE) { shmid = ptr[1]; return true; }
00535   return false;
00536 }
00538 // ######################################################################
00539 void TCPmessage::attach(char *msgbuf, const int siz)
00540 {
00541   ArrayHandle<char> h(new ArrayData<char>(Dims(siz, 1), msgbuf, WRITE_THRU));
00542   itsMsg.swap(h); itsHead.itsSize = siz; itsBusy = false; itsUpkidx = 0;
00543   //LDEBUG("attached to %d bytes at 0x%lx", siz, msgbuf);
00544 }
00546 // ######################################################################
00547 void TCPmessage::detach()
00548 {
00549   while (itsBusy) { LERROR("detach() while busy! Sleeping..."); sleep(1); }
00550   ArrayHandle<char> empty; itsMsg.swap(empty);
00551   itsBusy = false; itsHead.itsSize = 0; itsUpkidx = 0;
00552 }
00554 // ######################################################################
00555 /* So things look consistent in everyone's emacs... */
00556 /* Local Variables: */
00557 /* indent-tabs-mode: nil */
00558 /* End: */
Generated on Sun May 8 08:40:20 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3