00001 /*!@file Beowulf/TCPmessage.C Direct message passing over TCP connections */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the // 00005 // University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPmessage.C $ 00035 // $Id: TCPmessage.C 11538 2009-07-30 06:23:37Z itti $ 00036 // 00037 00038 #include "Beowulf/TCPmessage.H" 00039 00040 #include "Beowulf/TCPdefs.H" 00041 #include "Image/Image.H" 00042 #include "Image/ImageSet.H" 00043 #include "Image/Pixels.H" 00044 #include "Util/log.H" 00045 00046 #include <arpa/inet.h> 00047 #include <errno.h> 00048 #include <netdb.h> 00049 #include <netinet/in.h> 00050 #include <unistd.h> 00051 00052 //! use message ID for ID-logging; see log.H: 00053 #define MYLOGID itsHead.itsId 00054 00055 // ###################################################################### 00056 TCPmessage::TCPmessage() : 00057 itsMsg(new ArrayData<char>()) 00058 { itsBusy = false; reset(0, 0, 0.0F); } 00059 00060 // ###################################################################### 00061 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction, 00062 const float msgeti) : 00063 itsMsg(new ArrayData<char>()) 00064 { itsBusy = false; reset(msgid, msgaction, msgeti); } 00065 00066 // ###################################################################### 00067 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction, 00068 const float msgeti, 00069 const void *buf, const int bufsize) : 00070 itsMsg(new ArrayData<char>()) 00071 { 00072 itsBusy = false; reset(msgid, msgaction, msgeti); 00073 resize(bufsize, false); 00074 memcpy(getMsg(), buf, bufsize); 00075 itsUpkidx = 0; itsHead.itsSize = bufsize; 00076 } 00077 00078 // ###################################################################### 00079 TCPmessage::TCPmessage(const TCPmessage& m) : 00080 itsMsg(m.itsMsg) 00081 { itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx; } 00082 00083 // ###################################################################### 00084 TCPmessage& TCPmessage::operator=(const TCPmessage& m) 00085 { 00086 ArrayHandle<char> msgcopy(m.itsMsg); itsMsg.swap(msgcopy); 00087 itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx; 00088 return *this; 00089 } 00090 00091 // ###################################################################### 00092 TCPmessage::~TCPmessage() 00093 { } 00094 00095 // ###################################################################### 00096 void TCPmessage::addImage(const Image< PixRGB<byte> >& im) 00097 { 00098 int32 stuff[3]; 00099 stuff[0] = TCPMSG_COLBYTEIMA; stuff[1] = im.getWidth(); 00100 stuff[2] = im.getHeight(); pack(stuff, 3); 00101 pack(im.getArrayPtr(), im.getSize()); 00102 } 00103 00104 // ###################################################################### 00105 void TCPmessage::addImage(const Image<byte>& im) 00106 { 00107 int32 stuff[3]; 00108 stuff[0] = TCPMSG_BYTEIMA; stuff[1] = im.getWidth(); 00109 stuff[2] = im.getHeight(); pack(stuff, 3); 00110 pack(im.getArrayPtr(), im.getSize()); 00111 } 00112 00113 // ###################################################################### 00114 void TCPmessage::addImage(const Image<float>& im) 00115 { 00116 int32 stuff[3]; 00117 stuff[0] = TCPMSG_FLOATIMA; stuff[1] = im.getWidth(); 00118 stuff[2] = im.getHeight(); pack(stuff, 3); 00119 pack(im.getArrayPtr(), im.getSize()); 00120 } 00121 00122 // ###################################################################### 00123 void TCPmessage::addImageSet(const ImageSet<float>& im) 00124 { 00125 int32 stuff[2]; 00126 stuff[0] = TCPMSG_FLOATIMASET; stuff[1] = im.size(); pack(stuff, 2); 00127 for (uint i = 0; i < im.size(); i ++) addImage(im[i]); 00128 } 00129 00130 // ###################################################################### 00131 void TCPmessage::addFixation(const Fixation& fix) 00132 { 00133 int32 typ = TCPMSG_FIXATION; 00134 pack(&typ, 1); pack(&fix.i, 1); pack(&fix.j, 1); pack(&fix.frame, 1); 00135 } 00136 00137 // ###################################################################### 00138 void TCPmessage::addString(const char* str) 00139 { 00140 int32 typ = TCPMSG_STRING; pack(&typ, 1); pack(str, strlen(str) + 1); 00141 } 00142 00143 // ###################################################################### 00144 void TCPmessage::addInt32(const int32 val) 00145 { 00146 int32 typ = TCPMSG_INT32; pack(&typ, 1); pack(&val, 1); 00147 } 00148 00149 // ###################################################################### 00150 void TCPmessage::addInt64(const int64 val) 00151 { 00152 int32 typ = TCPMSG_INT64; pack(&typ, 1); pack(&val, 1); 00153 } 00154 00155 // ###################################################################### 00156 void TCPmessage::addFloat(const float val) 00157 { 00158 int32 typ = TCPMSG_FLOAT; pack(&typ, 1); pack(&val, 1); 00159 } 00160 00161 // ###################################################################### 00162 void TCPmessage::addDouble(const double val) 00163 { 00164 int32 typ = TCPMSG_DOUBLE; pack(&typ, 1); pack(&val, 1); 00165 } 00166 00167 // ###################################################################### 00168 void TCPmessage::reset(const int32 msgid, const int32 msgaction, 00169 const float msgeti) 00170 { 00171 freeMem(); // delete old message if any 00172 itsHead.itsId = msgid; itsHead.itsAction = msgaction; 00173 itsHead.itsETI = msgeti; itsHead.itsSize = 0; 00174 } 00175 00176 // ###################################################################### 00177 void TCPmessage::freeMem() 00178 { 00179 while (itsBusy) { LERROR("freeMem() while busy! Sleeping..."); sleep(1); } 00180 ArrayHandle<char> h; // handle with empty array 00181 itsMsg.swap(h); 00182 itsHead.itsId = 0; itsHead.itsAction = 0; itsHead.itsSize = 0; 00183 itsHead.itsETI = 0.0F; itsBusy = false; itsUpkidx = 0; 00184 itsHeadIdx = 0; 00185 } 00186 00187 // ###################################################################### 00188 void TCPmessage::getElementRaw(void **elem, int32& typ) 00189 { 00190 if (itsUpkidx >= itsHead.itsSize) 00191 LFATAL("Trying to unpack past message end"); 00192 00193 unpack(&typ, 1); // get element type 00194 switch(typ) 00195 { 00196 case TCPMSG_COLBYTEIMA: 00197 { 00198 Image<PixRGB<byte> >* im = 00199 new Image<PixRGB<byte> >(decodeColByteIma()); 00200 *elem = static_cast<void*>(im); 00201 break; 00202 } 00203 case TCPMSG_BYTEIMA: 00204 { 00205 Image<byte>* im = new Image<byte>(decodeByteIma()); 00206 *elem = static_cast<void*>(im); 00207 break; 00208 } 00209 case TCPMSG_FLOATIMA: 00210 { 00211 Image<float>* im = new Image<float>(decodeFloatIma()); 00212 *elem = static_cast<void*>(im); 00213 break; 00214 } 00215 case TCPMSG_FLOATIMASET: 00216 { 00217 ImageSet<float>* ims = new ImageSet<float>(decodeFloatImaSet()); 00218 *elem = static_cast<void*>(ims); 00219 break; 00220 } 00221 case TCPMSG_FIXATION: 00222 { 00223 Fixation* fix = new Fixation(decodeFixation()); 00224 *elem = static_cast<void*>(fix); 00225 break; 00226 } 00227 case TCPMSG_STRING: 00228 { 00229 int32 s = strlen(getMsg() + itsUpkidx) + 1; 00230 char *str = new char[s]; 00231 unpack(str, s); *elem = (void *)str; 00232 break; 00233 } 00234 case TCPMSG_INT32: 00235 { 00236 int32 *val = new int32; unpack(val, 1); 00237 *elem = (void *)val; 00238 break; 00239 } 00240 case TCPMSG_FLOAT: 00241 { 00242 float *val = new float; unpack(val, 1); 00243 *elem = (void *)val; 00244 break; 00245 } 00246 case TCPMSG_DOUBLE: 00247 { 00248 double *val = new double; unpack(val, 1); 00249 *elem = (void *)val; 00250 break; 00251 } 00252 case TCPMSG_INT64: 00253 { 00254 int64 *val = new int64; unpack(val, 1); 00255 *elem = (void *)val; 00256 break; 00257 } 00258 default: 00259 LFATAL("Bogus element %d", typ); 00260 } 00261 } 00262 00263 // ###################################################################### 00264 Image<PixRGB<byte> > TCPmessage::getElementColByteIma() 00265 { 00266 unpackAndVerifyType(TCPMSG_COLBYTEIMA); 00267 return decodeColByteIma(); 00268 } 00269 00270 // ###################################################################### 00271 Image<byte> TCPmessage::getElementByteIma() 00272 { 00273 unpackAndVerifyType(TCPMSG_BYTEIMA); 00274 return decodeByteIma(); 00275 } 00276 00277 // ###################################################################### 00278 Image<float> TCPmessage::getElementFloatIma() 00279 { 00280 unpackAndVerifyType(TCPMSG_FLOATIMA); 00281 return decodeFloatIma(); 00282 } 00283 00284 // ###################################################################### 00285 ImageSet<float> TCPmessage::getElementFloatImaSet() 00286 { 00287 unpackAndVerifyType(TCPMSG_FLOATIMASET); 00288 return decodeFloatImaSet(); 00289 } 00290 00291 // ###################################################################### 00292 int32 TCPmessage::getElementInt32() 00293 { 00294 unpackAndVerifyType(TCPMSG_INT32); 00295 int32 result; unpack(&result, 1); return result; 00296 } 00297 00298 // ###################################################################### 00299 int64 TCPmessage::getElementInt64() 00300 { 00301 unpackAndVerifyType(TCPMSG_INT64); 00302 int64 result; unpack(&result, 1); return result; 00303 } 00304 00305 // ###################################################################### 00306 double TCPmessage::getElementDouble() 00307 { 00308 unpackAndVerifyType(TCPMSG_DOUBLE); 00309 double result; unpack(&result, 1); return result; 00310 } 00311 00312 // ###################################################################### 00313 float TCPmessage::getElementFloat() 00314 { 00315 unpackAndVerifyType(TCPMSG_FLOAT); 00316 float result; unpack(&result, 1); return result; 00317 } 00318 00319 // ###################################################################### 00320 Fixation TCPmessage::getElementFixation() 00321 { 00322 unpackAndVerifyType(TCPMSG_FIXATION); 00323 return decodeFixation(); 00324 } 00325 00326 // ###################################################################### 00327 std::string TCPmessage::getElementString() 00328 { 00329 unpackAndVerifyType(TCPMSG_STRING); 00330 const size_t sz = strlen(getMsg() + itsUpkidx); 00331 std::string result; 00332 result.resize(sz); 00333 unpack(result.data(), sz); 00334 char nullterm = 1; 00335 unpack(&nullterm, 1); 00336 ASSERT(nullterm == '\0'); 00337 return result; 00338 } 00339 00340 namespace 00341 { 00342 const char* tcpmsgFieldTypeName(int32 typ) 00343 { 00344 switch (typ) 00345 { 00346 #define DO_CASE(x) case x: return #x 00347 00348 DO_CASE(TCPMSG_COLBYTEIMA); break; 00349 DO_CASE(TCPMSG_FIXATION); break; 00350 DO_CASE(TCPMSG_STRING); break; 00351 DO_CASE(TCPMSG_BYTEIMA); break; 00352 DO_CASE(TCPMSG_FLOATIMA); break; 00353 DO_CASE(TCPMSG_FLOATIMASET); break; 00354 DO_CASE(TCPMSG_INT32); break; 00355 DO_CASE(TCPMSG_FLOAT); break; 00356 DO_CASE(TCPMSG_DOUBLE); break; 00357 DO_CASE(TCPMSG_INT64); break; 00358 00359 #undef DO_CASE 00360 } 00361 00362 // default: 00363 return "UNKNOWN"; 00364 } 00365 } 00366 00367 // ###################################################################### 00368 void TCPmessage::unpackAndVerifyType(int32 expected_type) 00369 { 00370 if (itsUpkidx >= itsHead.itsSize) 00371 LFATAL("Trying to unpack past message end"); 00372 00373 int32 typ; unpack(&typ, 1); 00374 if (typ != expected_type) 00375 LFATAL("expected TCPmessage field %s, but got %s", 00376 tcpmsgFieldTypeName(expected_type), 00377 tcpmsgFieldTypeName(typ)); 00378 } 00379 00380 // ###################################################################### 00381 Image<PixRGB<byte> > TCPmessage::decodeColByteIma() 00382 { 00383 int32 xx[2]; unpack(xx, 2); 00384 Image< PixRGB<byte> > im(xx[0], xx[1], NO_INIT); 00385 unpack(im.getArrayPtr(), xx[0] * xx[1]); 00386 return im; 00387 } 00388 00389 // ###################################################################### 00390 Image<byte> TCPmessage::decodeByteIma() 00391 { 00392 int32 xx[2]; unpack(xx, 2); 00393 Image<byte> im(xx[0], xx[1], NO_INIT); 00394 unpack(im.getArrayPtr(), xx[0] * xx[1]); 00395 return im; 00396 } 00397 00398 // ###################################################################### 00399 Image<float> TCPmessage::decodeFloatIma() 00400 { 00401 int32 xx[2]; unpack(xx, 2); 00402 Image<float> im(xx[0], xx[1], NO_INIT); 00403 unpack(im.getArrayPtr(), xx[0] * xx[1]); 00404 return im; 00405 } 00406 00407 // ###################################################################### 00408 ImageSet<float> TCPmessage::decodeFloatImaSet() 00409 { 00410 int32 numimg; unpack(&numimg, 1); 00411 ImageSet<float> ims(numimg); 00412 for (int i = 0; i < numimg; i ++) 00413 // store the image into the set 00414 ims[i] = this->getElementFloatIma(); 00415 return ims; 00416 } 00417 00418 // ###################################################################### 00419 Fixation TCPmessage::decodeFixation() 00420 { 00421 Fixation fix; 00422 unpack(&fix.i, 1); 00423 unpack(&fix.j, 1); 00424 unpack(&fix.frame, 1); 00425 return fix; 00426 } 00427 00428 // ###################################################################### 00429 int TCPmessage::readHeaderFrom(const int fd) 00430 { 00431 if (itsHeadIdx == 0) freeMem(); // delete old message if any 00432 int toread = int(sizeof(TCPmessageHeader) - itsHeadIdx); 00433 if (toread <= 0) { LERROR("Internal error!"); return TCPBUG; } 00434 int nb = read(fd, (char*)(&itsHead) + itsHeadIdx, toread); 00435 if (nb == toread) { // ok, done reading 00436 if (itsHead.itsSize > 0) 00437 { 00438 resize(itsHead.itsSize, false); // allocate memory for body 00439 itsBusy = true; // ready to receive the body 00440 itsHeadIdx = 0; // not reading header anymore 00441 } 00442 return TCPDONE; // done with header part 00443 } else if (nb > 0) { // got only partial header data 00444 itsHeadIdx += nb; return TCPWAITREAD; 00445 } else if (nb == -1) { // error? 00446 if (errno == EAGAIN) return TCPWAITREAD; // ok, nothing received; wait 00447 else { PLDEBUG("Read error"); return TCPBUG; } 00448 } else if (nb == 0) { 00449 LDEBUG("Peer closed connection. Abort."); return TCPBUG; 00450 } else { LERROR("What is that?"); return TCPBUG; } 00451 } 00452 00453 // ###################################################################### 00454 int TCPmessage::readFrom(const int fd) 00455 { 00456 if (!itsBusy) { LERROR("not busy?"); itsBusy = true; itsUpkidx = 0; } 00457 int n = itsHead.itsSize - itsUpkidx; // number of bytes to read 00458 if (n <= 0) LFATAL("Bogus read request for %d bytes", n); 00459 int nb = read(fd, getMsg() + itsUpkidx, n); 00460 //LDEBUG("%d/%d bytes from %d", nb + itsUpkidx, itsHead.itsSize, fd); 00461 if (nb == n) { // ok, done reading 00462 itsBusy = false; itsUpkidx = 0; 00463 //LDEBUG("Receive complete"); 00464 return TCPDONE; 00465 } else if (nb > 0) { // ok, we received some; wait for more 00466 itsUpkidx += nb; return TCPWAITREAD; 00467 } else if (nb == -1) { // error? 00468 if (errno == EAGAIN) return TCPWAITREAD; // ok, nothing received; wait 00469 else { PLDEBUG("Read error"); return TCPBUG; } 00470 } else if (nb == 0) { 00471 LDEBUG("Peer closed connection. Abort."); return TCPBUG; 00472 } else { LERROR("What is that?"); return TCPBUG; } 00473 } 00474 00475 // ###################################################################### 00476 int TCPmessage::writeHeaderTo(const int fd) 00477 { 00478 if (!itsBusy) { itsBusy = true; itsUpkidx = 0; } // start transfer 00479 int nb = write(fd, &itsHead, sizeof(TCPmessageHeader)); 00480 if (nb == sizeof(TCPmessageHeader)) { // ok, done writing 00481 // do we have an empty body? 00482 if (itsHead.itsSize <= 0) itsBusy = false; 00483 return TCPDONE; 00484 } else { 00485 // ideally we should wait & retry but this seems to never happen 00486 LERROR("Could not write header in one shot"); 00487 return TCPBUG; 00488 } 00489 } 00490 00491 // ###################################################################### 00492 int TCPmessage::writeTo(const int fd) 00493 { 00494 if (!itsBusy) { itsBusy = true; itsUpkidx = 0; } // start transfer 00495 int n = itsHead.itsSize - itsUpkidx; // number of bytes to write 00496 if (n <= 0) LFATAL("Bogus write request for %d bytes", n); 00497 int nb = write(fd, getMsg() + itsUpkidx, n); 00498 //LDEBUG("%d/%d bytes to %d", nb + itsUpkidx, itsHead.itsSize, fd); 00499 if (nb == n) { // ok, done writing 00500 //LDEBUG("Done writing %d/%d", itsHead.itsId, itsHead.itsAction); 00501 itsBusy = false; itsUpkidx = 0; return TCPDONE; 00502 } else if (nb > 0) { // we could write some, but not all; wait 00503 itsUpkidx += nb; return TCPWAITWRITE; 00504 } else if (nb == -1) { // we could not write anything 00505 if (errno == EAGAIN) return TCPWAITWRITE; // ok, we'll try again 00506 else { PLDEBUG("Write error"); return TCPBUG; } 00507 } else if (nb == 0) { 00508 LDEBUG("Peer closed connection. Abort."); return TCPBUG; 00509 } else { LERROR("What is that?"); return TCPBUG; } 00510 } 00511 00512 // ###################################################################### 00513 void TCPmessage::addShmInfo(const int shmid, const int siz) 00514 { int32 typ = TCPMSG_SHMINFO; pack(&typ, 1); pack(&shmid, 1); pack(&siz, 1); } 00515 00516 // ###################################################################### 00517 void TCPmessage::addShmDone(const int shmid) 00518 { int32 typ = TCPMSG_SHMDONE; pack(&typ, 1); pack(&shmid, 1); } 00519 00520 // ###################################################################### 00521 bool TCPmessage::checkShmInfo(int& shmid, int& siz) const 00522 { 00523 if (itsHead.itsSize < (int)sizeof(int32)) return false; // no message body 00524 const int32 *ptr = (const int32 *)getMsg(); 00525 if (ptr[0] == TCPMSG_SHMINFO) { shmid = ptr[1]; siz = ptr[2]; return true; } 00526 return false; 00527 } 00528 00529 // ###################################################################### 00530 bool TCPmessage::checkShmDone(int& shmid) const 00531 { 00532 if (itsHead.itsSize < (int)sizeof(int32)) return false; // no message body 00533 const int32 *ptr = (const int32 *)getMsg(); 00534 if (ptr[0] == TCPMSG_SHMDONE) { shmid = ptr[1]; return true; } 00535 return false; 00536 } 00537 00538 // ###################################################################### 00539 void TCPmessage::attach(char *msgbuf, const int siz) 00540 { 00541 ArrayHandle<char> h(new ArrayData<char>(Dims(siz, 1), msgbuf, WRITE_THRU)); 00542 itsMsg.swap(h); itsHead.itsSize = siz; itsBusy = false; itsUpkidx = 0; 00543 //LDEBUG("attached to %d bytes at 0x%lx", siz, msgbuf); 00544 } 00545 00546 // ###################################################################### 00547 void TCPmessage::detach() 00548 { 00549 while (itsBusy) { LERROR("detach() while busy! Sleeping..."); sleep(1); } 00550 ArrayHandle<char> empty; itsMsg.swap(empty); 00551 itsBusy = false; itsHead.itsSize = 0; itsUpkidx = 0; 00552 } 00553 00554 // ###################################################################### 00555 /* So things look consistent in everyone's emacs... */ 00556 /* Local Variables: */ 00557 /* indent-tabs-mode: nil */ 00558 /* End: */