00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "Beowulf/TCPmessage.H"
00039
00040 #include "Beowulf/TCPdefs.H"
00041 #include "Image/Image.H"
00042 #include "Image/ImageSet.H"
00043 #include "Image/Pixels.H"
00044 #include "Util/log.H"
00045
00046 #include <arpa/inet.h>
00047 #include <errno.h>
00048 #include <netdb.h>
00049 #include <netinet/in.h>
00050 #include <unistd.h>
00051
00052
00053 #define MYLOGID itsHead.itsId
00054
00055
00056 TCPmessage::TCPmessage() :
00057 itsMsg(new ArrayData<char>())
00058 { itsBusy = false; reset(0, 0, 0.0F); }
00059
00060
00061 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction,
00062 const float msgeti) :
00063 itsMsg(new ArrayData<char>())
00064 { itsBusy = false; reset(msgid, msgaction, msgeti); }
00065
00066
00067 TCPmessage::TCPmessage(const int32 msgid, const int32 msgaction,
00068 const float msgeti,
00069 const void *buf, const int bufsize) :
00070 itsMsg(new ArrayData<char>())
00071 {
00072 itsBusy = false; reset(msgid, msgaction, msgeti);
00073 resize(bufsize, false);
00074 memcpy(getMsg(), buf, bufsize);
00075 itsUpkidx = 0; itsHead.itsSize = bufsize;
00076 }
00077
00078
00079 TCPmessage::TCPmessage(const TCPmessage& m) :
00080 itsMsg(m.itsMsg)
00081 { itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx; }
00082
00083
00084 TCPmessage& TCPmessage::operator=(const TCPmessage& m)
00085 {
00086 ArrayHandle<char> msgcopy(m.itsMsg); itsMsg.swap(msgcopy);
00087 itsBusy = m.itsBusy; itsHead = m.itsHead; itsUpkidx = m.itsUpkidx;
00088 return *this;
00089 }
00090
00091
00092 TCPmessage::~TCPmessage()
00093 { }
00094
00095
00096 void TCPmessage::addImage(const Image< PixRGB<byte> >& im)
00097 {
00098 int32 stuff[3];
00099 stuff[0] = TCPMSG_COLBYTEIMA; stuff[1] = im.getWidth();
00100 stuff[2] = im.getHeight(); pack(stuff, 3);
00101 pack(im.getArrayPtr(), im.getSize());
00102 }
00103
00104
00105 void TCPmessage::addImage(const Image<byte>& im)
00106 {
00107 int32 stuff[3];
00108 stuff[0] = TCPMSG_BYTEIMA; stuff[1] = im.getWidth();
00109 stuff[2] = im.getHeight(); pack(stuff, 3);
00110 pack(im.getArrayPtr(), im.getSize());
00111 }
00112
00113
00114 void TCPmessage::addImage(const Image<float>& im)
00115 {
00116 int32 stuff[3];
00117 stuff[0] = TCPMSG_FLOATIMA; stuff[1] = im.getWidth();
00118 stuff[2] = im.getHeight(); pack(stuff, 3);
00119 pack(im.getArrayPtr(), im.getSize());
00120 }
00121
00122
00123 void TCPmessage::addImageSet(const ImageSet<float>& im)
00124 {
00125 int32 stuff[2];
00126 stuff[0] = TCPMSG_FLOATIMASET; stuff[1] = im.size(); pack(stuff, 2);
00127 for (uint i = 0; i < im.size(); i ++) addImage(im[i]);
00128 }
00129
00130
00131 void TCPmessage::addFixation(const Fixation& fix)
00132 {
00133 int32 typ = TCPMSG_FIXATION;
00134 pack(&typ, 1); pack(&fix.i, 1); pack(&fix.j, 1); pack(&fix.frame, 1);
00135 }
00136
00137
00138 void TCPmessage::addString(const char* str)
00139 {
00140 int32 typ = TCPMSG_STRING; pack(&typ, 1); pack(str, strlen(str) + 1);
00141 }
00142
00143
00144 void TCPmessage::addInt32(const int32 val)
00145 {
00146 int32 typ = TCPMSG_INT32; pack(&typ, 1); pack(&val, 1);
00147 }
00148
00149
00150 void TCPmessage::addInt64(const int64 val)
00151 {
00152 int32 typ = TCPMSG_INT64; pack(&typ, 1); pack(&val, 1);
00153 }
00154
00155
00156 void TCPmessage::addFloat(const float val)
00157 {
00158 int32 typ = TCPMSG_FLOAT; pack(&typ, 1); pack(&val, 1);
00159 }
00160
00161
00162 void TCPmessage::addDouble(const double val)
00163 {
00164 int32 typ = TCPMSG_DOUBLE; pack(&typ, 1); pack(&val, 1);
00165 }
00166
00167
00168 void TCPmessage::reset(const int32 msgid, const int32 msgaction,
00169 const float msgeti)
00170 {
00171 freeMem();
00172 itsHead.itsId = msgid; itsHead.itsAction = msgaction;
00173 itsHead.itsETI = msgeti; itsHead.itsSize = 0;
00174 }
00175
00176
00177 void TCPmessage::freeMem()
00178 {
00179 while (itsBusy) { LERROR("freeMem() while busy! Sleeping..."); sleep(1); }
00180 ArrayHandle<char> h;
00181 itsMsg.swap(h);
00182 itsHead.itsId = 0; itsHead.itsAction = 0; itsHead.itsSize = 0;
00183 itsHead.itsETI = 0.0F; itsBusy = false; itsUpkidx = 0;
00184 itsHeadIdx = 0;
00185 }
00186
00187
00188 void TCPmessage::getElementRaw(void **elem, int32& typ)
00189 {
00190 if (itsUpkidx >= itsHead.itsSize)
00191 LFATAL("Trying to unpack past message end");
00192
00193 unpack(&typ, 1);
00194 switch(typ)
00195 {
00196 case TCPMSG_COLBYTEIMA:
00197 {
00198 Image<PixRGB<byte> >* im =
00199 new Image<PixRGB<byte> >(decodeColByteIma());
00200 *elem = static_cast<void*>(im);
00201 break;
00202 }
00203 case TCPMSG_BYTEIMA:
00204 {
00205 Image<byte>* im = new Image<byte>(decodeByteIma());
00206 *elem = static_cast<void*>(im);
00207 break;
00208 }
00209 case TCPMSG_FLOATIMA:
00210 {
00211 Image<float>* im = new Image<float>(decodeFloatIma());
00212 *elem = static_cast<void*>(im);
00213 break;
00214 }
00215 case TCPMSG_FLOATIMASET:
00216 {
00217 ImageSet<float>* ims = new ImageSet<float>(decodeFloatImaSet());
00218 *elem = static_cast<void*>(ims);
00219 break;
00220 }
00221 case TCPMSG_FIXATION:
00222 {
00223 Fixation* fix = new Fixation(decodeFixation());
00224 *elem = static_cast<void*>(fix);
00225 break;
00226 }
00227 case TCPMSG_STRING:
00228 {
00229 int32 s = strlen(getMsg() + itsUpkidx) + 1;
00230 char *str = new char[s];
00231 unpack(str, s); *elem = (void *)str;
00232 break;
00233 }
00234 case TCPMSG_INT32:
00235 {
00236 int32 *val = new int32; unpack(val, 1);
00237 *elem = (void *)val;
00238 break;
00239 }
00240 case TCPMSG_FLOAT:
00241 {
00242 float *val = new float; unpack(val, 1);
00243 *elem = (void *)val;
00244 break;
00245 }
00246 case TCPMSG_DOUBLE:
00247 {
00248 double *val = new double; unpack(val, 1);
00249 *elem = (void *)val;
00250 break;
00251 }
00252 case TCPMSG_INT64:
00253 {
00254 int64 *val = new int64; unpack(val, 1);
00255 *elem = (void *)val;
00256 break;
00257 }
00258 default:
00259 LFATAL("Bogus element %d", typ);
00260 }
00261 }
00262
00263
00264 Image<PixRGB<byte> > TCPmessage::getElementColByteIma()
00265 {
00266 unpackAndVerifyType(TCPMSG_COLBYTEIMA);
00267 return decodeColByteIma();
00268 }
00269
00270
00271 Image<byte> TCPmessage::getElementByteIma()
00272 {
00273 unpackAndVerifyType(TCPMSG_BYTEIMA);
00274 return decodeByteIma();
00275 }
00276
00277
00278 Image<float> TCPmessage::getElementFloatIma()
00279 {
00280 unpackAndVerifyType(TCPMSG_FLOATIMA);
00281 return decodeFloatIma();
00282 }
00283
00284
00285 ImageSet<float> TCPmessage::getElementFloatImaSet()
00286 {
00287 unpackAndVerifyType(TCPMSG_FLOATIMASET);
00288 return decodeFloatImaSet();
00289 }
00290
00291
00292 int32 TCPmessage::getElementInt32()
00293 {
00294 unpackAndVerifyType(TCPMSG_INT32);
00295 int32 result; unpack(&result, 1); return result;
00296 }
00297
00298
00299 int64 TCPmessage::getElementInt64()
00300 {
00301 unpackAndVerifyType(TCPMSG_INT64);
00302 int64 result; unpack(&result, 1); return result;
00303 }
00304
00305
00306 double TCPmessage::getElementDouble()
00307 {
00308 unpackAndVerifyType(TCPMSG_DOUBLE);
00309 double result; unpack(&result, 1); return result;
00310 }
00311
00312
00313 float TCPmessage::getElementFloat()
00314 {
00315 unpackAndVerifyType(TCPMSG_FLOAT);
00316 float result; unpack(&result, 1); return result;
00317 }
00318
00319
00320 Fixation TCPmessage::getElementFixation()
00321 {
00322 unpackAndVerifyType(TCPMSG_FIXATION);
00323 return decodeFixation();
00324 }
00325
00326
00327 std::string TCPmessage::getElementString()
00328 {
00329 unpackAndVerifyType(TCPMSG_STRING);
00330 const size_t sz = strlen(getMsg() + itsUpkidx);
00331 std::string result;
00332 result.resize(sz);
00333 unpack(result.data(), sz);
00334 char nullterm = 1;
00335 unpack(&nullterm, 1);
00336 ASSERT(nullterm == '\0');
00337 return result;
00338 }
00339
00340 namespace
00341 {
00342 const char* tcpmsgFieldTypeName(int32 typ)
00343 {
00344 switch (typ)
00345 {
00346 #define DO_CASE(x) case x: return #x
00347
00348 DO_CASE(TCPMSG_COLBYTEIMA); break;
00349 DO_CASE(TCPMSG_FIXATION); break;
00350 DO_CASE(TCPMSG_STRING); break;
00351 DO_CASE(TCPMSG_BYTEIMA); break;
00352 DO_CASE(TCPMSG_FLOATIMA); break;
00353 DO_CASE(TCPMSG_FLOATIMASET); break;
00354 DO_CASE(TCPMSG_INT32); break;
00355 DO_CASE(TCPMSG_FLOAT); break;
00356 DO_CASE(TCPMSG_DOUBLE); break;
00357 DO_CASE(TCPMSG_INT64); break;
00358
00359 #undef DO_CASE
00360 }
00361
00362
00363 return "UNKNOWN";
00364 }
00365 }
00366
00367
00368 void TCPmessage::unpackAndVerifyType(int32 expected_type)
00369 {
00370 if (itsUpkidx >= itsHead.itsSize)
00371 LFATAL("Trying to unpack past message end");
00372
00373 int32 typ; unpack(&typ, 1);
00374 if (typ != expected_type)
00375 LFATAL("expected TCPmessage field %s, but got %s",
00376 tcpmsgFieldTypeName(expected_type),
00377 tcpmsgFieldTypeName(typ));
00378 }
00379
00380
00381 Image<PixRGB<byte> > TCPmessage::decodeColByteIma()
00382 {
00383 int32 xx[2]; unpack(xx, 2);
00384 Image< PixRGB<byte> > im(xx[0], xx[1], NO_INIT);
00385 unpack(im.getArrayPtr(), xx[0] * xx[1]);
00386 return im;
00387 }
00388
00389
00390 Image<byte> TCPmessage::decodeByteIma()
00391 {
00392 int32 xx[2]; unpack(xx, 2);
00393 Image<byte> im(xx[0], xx[1], NO_INIT);
00394 unpack(im.getArrayPtr(), xx[0] * xx[1]);
00395 return im;
00396 }
00397
00398
00399 Image<float> TCPmessage::decodeFloatIma()
00400 {
00401 int32 xx[2]; unpack(xx, 2);
00402 Image<float> im(xx[0], xx[1], NO_INIT);
00403 unpack(im.getArrayPtr(), xx[0] * xx[1]);
00404 return im;
00405 }
00406
00407
00408 ImageSet<float> TCPmessage::decodeFloatImaSet()
00409 {
00410 int32 numimg; unpack(&numimg, 1);
00411 ImageSet<float> ims(numimg);
00412 for (int i = 0; i < numimg; i ++)
00413
00414 ims[i] = this->getElementFloatIma();
00415 return ims;
00416 }
00417
00418
00419 Fixation TCPmessage::decodeFixation()
00420 {
00421 Fixation fix;
00422 unpack(&fix.i, 1);
00423 unpack(&fix.j, 1);
00424 unpack(&fix.frame, 1);
00425 return fix;
00426 }
00427
00428
00429 int TCPmessage::readHeaderFrom(const int fd)
00430 {
00431 if (itsHeadIdx == 0) freeMem();
00432 int toread = int(sizeof(TCPmessageHeader) - itsHeadIdx);
00433 if (toread <= 0) { LERROR("Internal error!"); return TCPBUG; }
00434 int nb = read(fd, (char*)(&itsHead) + itsHeadIdx, toread);
00435 if (nb == toread) {
00436 if (itsHead.itsSize > 0)
00437 {
00438 resize(itsHead.itsSize, false);
00439 itsBusy = true;
00440 itsHeadIdx = 0;
00441 }
00442 return TCPDONE;
00443 } else if (nb > 0) {
00444 itsHeadIdx += nb; return TCPWAITREAD;
00445 } else if (nb == -1) {
00446 if (errno == EAGAIN) return TCPWAITREAD;
00447 else { PLDEBUG("Read error"); return TCPBUG; }
00448 } else if (nb == 0) {
00449 LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00450 } else { LERROR("What is that?"); return TCPBUG; }
00451 }
00452
00453
00454 int TCPmessage::readFrom(const int fd)
00455 {
00456 if (!itsBusy) { LERROR("not busy?"); itsBusy = true; itsUpkidx = 0; }
00457 int n = itsHead.itsSize - itsUpkidx;
00458 if (n <= 0) LFATAL("Bogus read request for %d bytes", n);
00459 int nb = read(fd, getMsg() + itsUpkidx, n);
00460
00461 if (nb == n) {
00462 itsBusy = false; itsUpkidx = 0;
00463
00464 return TCPDONE;
00465 } else if (nb > 0) {
00466 itsUpkidx += nb; return TCPWAITREAD;
00467 } else if (nb == -1) {
00468 if (errno == EAGAIN) return TCPWAITREAD;
00469 else { PLDEBUG("Read error"); return TCPBUG; }
00470 } else if (nb == 0) {
00471 LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00472 } else { LERROR("What is that?"); return TCPBUG; }
00473 }
00474
00475
00476 int TCPmessage::writeHeaderTo(const int fd)
00477 {
00478 if (!itsBusy) { itsBusy = true; itsUpkidx = 0; }
00479 int nb = write(fd, &itsHead, sizeof(TCPmessageHeader));
00480 if (nb == sizeof(TCPmessageHeader)) {
00481
00482 if (itsHead.itsSize <= 0) itsBusy = false;
00483 return TCPDONE;
00484 } else {
00485
00486 LERROR("Could not write header in one shot");
00487 return TCPBUG;
00488 }
00489 }
00490
00491
00492 int TCPmessage::writeTo(const int fd)
00493 {
00494 if (!itsBusy) { itsBusy = true; itsUpkidx = 0; }
00495 int n = itsHead.itsSize - itsUpkidx;
00496 if (n <= 0) LFATAL("Bogus write request for %d bytes", n);
00497 int nb = write(fd, getMsg() + itsUpkidx, n);
00498
00499 if (nb == n) {
00500
00501 itsBusy = false; itsUpkidx = 0; return TCPDONE;
00502 } else if (nb > 0) {
00503 itsUpkidx += nb; return TCPWAITWRITE;
00504 } else if (nb == -1) {
00505 if (errno == EAGAIN) return TCPWAITWRITE;
00506 else { PLDEBUG("Write error"); return TCPBUG; }
00507 } else if (nb == 0) {
00508 LDEBUG("Peer closed connection. Abort."); return TCPBUG;
00509 } else { LERROR("What is that?"); return TCPBUG; }
00510 }
00511
00512
00513 void TCPmessage::addShmInfo(const int shmid, const int siz)
00514 { int32 typ = TCPMSG_SHMINFO; pack(&typ, 1); pack(&shmid, 1); pack(&siz, 1); }
00515
00516
00517 void TCPmessage::addShmDone(const int shmid)
00518 { int32 typ = TCPMSG_SHMDONE; pack(&typ, 1); pack(&shmid, 1); }
00519
00520
00521 bool TCPmessage::checkShmInfo(int& shmid, int& siz) const
00522 {
00523 if (itsHead.itsSize < (int)sizeof(int32)) return false;
00524 const int32 *ptr = (const int32 *)getMsg();
00525 if (ptr[0] == TCPMSG_SHMINFO) { shmid = ptr[1]; siz = ptr[2]; return true; }
00526 return false;
00527 }
00528
00529
00530 bool TCPmessage::checkShmDone(int& shmid) const
00531 {
00532 if (itsHead.itsSize < (int)sizeof(int32)) return false;
00533 const int32 *ptr = (const int32 *)getMsg();
00534 if (ptr[0] == TCPMSG_SHMDONE) { shmid = ptr[1]; return true; }
00535 return false;
00536 }
00537
00538
00539 void TCPmessage::attach(char *msgbuf, const int siz)
00540 {
00541 ArrayHandle<char> h(new ArrayData<char>(Dims(siz, 1), msgbuf, WRITE_THRU));
00542 itsMsg.swap(h); itsHead.itsSize = siz; itsBusy = false; itsUpkidx = 0;
00543
00544 }
00545
00546
00547 void TCPmessage::detach()
00548 {
00549 while (itsBusy) { LERROR("detach() while busy! Sleeping..."); sleep(1); }
00550 ArrayHandle<char> empty; itsMsg.swap(empty);
00551 itsBusy = false; itsHead.itsSize = 0; itsUpkidx = 0;
00552 }
00553
00554
00555
00556
00557
00558