00001 /*!@file Beowulf/TCPmessage.H Direct message passing over TCP connections */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the // 00005 // University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPmessage.H $ 00035 // $Id: TCPmessage.H 9412 2008-03-10 23:10:15Z farhan $ 00036 // 00037 00038 #ifndef TCPMESSAGE_H_DEFINED 00039 #define TCPMESSAGE_H_DEFINED 00040 00041 #include "Util/log.H" 00042 #include "Util/Types.H" // for int32, etc 00043 #include "Image/ArrayData.H" 00044 #include "Image/Point2D.H" 00045 00046 #include <string.h> 00047 #include <stdlib.h> 00048 #include <sys/shm.h> 00049 00050 template <class T> class Image; 00051 template <class T> class ImageSet; 00052 template <class T> class PixRGB; 00053 00054 //! types of possible message fields: 00055 #define TCPMSG_END 0 00056 #define TCPMSG_SHMINFO 1 00057 #define TCPMSG_SHMDONE 2 00058 00059 #define TCPMSG_COLBYTEIMA 100 00060 #define TCPMSG_FIXATION 101 00061 #define TCPMSG_STRING 102 00062 #define TCPMSG_BYTEIMA 103 00063 #define TCPMSG_FLOATIMA 104 00064 #define TCPMSG_FLOATIMASET 105 00065 #define TCPMSG_INT32 106 00066 #define TCPMSG_FLOAT 107 00067 #define TCPMSG_DOUBLE 108 00068 #define TCPMSG_INT64 109 00069 00070 struct Fixation 00071 { 00072 Fixation() : i(-1), j(-1), frame(-1) {} 00073 00074 Fixation(const Point2D<int>& p, int f) : i(p.i), j(p.j), frame(f) {} 00075 00076 int32 i; 00077 int32 j; 00078 int32 frame; 00079 }; 00080 00081 //! This class defines a modular message to be passed over TCP connections. 00082 /*! The basic message structure is as follows: 00083 - one TCPmessageHeader (see private section of TCPmessage definition) 00084 - [repeat as needed:] 00085 - field type (int32) 00086 - field data (size and type depending on field type) 00087 - field type = TCPMSG_END 00088 CAUTION: to optimize speed, there is no semaphore when modifying 00089 the contents of the message (e.g., adding fields) or when parsing it. 00090 So, be careful with multi-threaded uses! 00091 */ 00092 class TCPmessage 00093 { 00094 public: 00095 //! uninitialized constructor (for pointers) 00096 TCPmessage(); 00097 00098 //! constructor with empty message body 00099 /*! @param msgid a message identification (user-defined) 00100 @param msgaction additional field where users can specify an 00101 action to be executed when this message is received 00102 @param msgeti estimated time to idle, in seconds. This is used for 00103 automatic load balancing, where the Beowulf class will dispatch 00104 processing to nodes with shorter eti's in priority, when a 00105 load-balanced send is requested. If you are not doing load 00106 balancing, just use the default of 0.0. */ 00107 TCPmessage(const int32 msgid, const int32 msgaction, 00108 const float msgeti = 0.0F); 00109 00110 //! constructor with copy of given message body 00111 /*! NOTE: we do a deep copy of the given buffer, so that it can be 00112 freed immediately upon return from this constructor */ 00113 TCPmessage(const int32 msgid, const int32 msgaction, const float msgeti, 00114 const void *buf, const int bufsize); 00115 00116 //! copy constructor 00117 TCPmessage(const TCPmessage& m); 00118 00119 //! assignment 00120 TCPmessage& operator=(const TCPmessage& m); 00121 00122 //! destructor 00123 ~TCPmessage(); 00124 00125 //! method to add elements into a message to be sent out 00126 void addImage(const Image< PixRGB<byte> >& im); 00127 //! method to add elements into a message to be sent out 00128 void addImage(const Image<byte>& im); 00129 //! method to add elements into a message to be sent out 00130 void addImage(const Image<float>& im); 00131 //! method to add elements into a message to be sent out 00132 void addImageSet(const ImageSet<float>& im); 00133 //! method to add elements into a message to be sent out 00134 void addFixation(const Fixation& fix); 00135 //! method to add elements into a message to be sent out 00136 void addString(const char* str); 00137 //! method to add elements into a message to be sent out 00138 void addInt32(const int32 val); 00139 //! method to add elements into a message to be sent out 00140 void addFloat(const float val); 00141 //! method to add elements into a message to be sent out 00142 void addDouble(const double val); 00143 //! method to add elements into a message to be sent out 00144 void addInt64(const int64 val); 00145 00146 //! modify message ID 00147 inline void setID(const int32 id); 00148 //! get message ID 00149 inline int32 getID() const; 00150 00151 //! modify message action 00152 inline void setAction(const int32 ac); 00153 //! get message action 00154 inline int32 getAction() const; 00155 00156 //! modify message eti (estimated time to idle) 00157 inline void setETI(const float eti); 00158 //! get message eti (estimated time to idle) 00159 inline float getETI() const; 00160 00161 //! get total message size (mostly for debuging purposes) 00162 inline int32 getSize() const; 00163 00164 //! check whether message is busy (sending or receiving data): 00165 inline bool isBusy() const; 00166 00167 //! empty old contents and get ready for a re-use: 00168 void reset(const int32 msgid, const int32 msgaction, 00169 const float msgeti = 0.0F); 00170 00171 //! free memory: 00172 void freeMem(); 00173 00174 //! DONT'T USE THIS; use one of the type-specific getElement functions below 00175 /*! OK, only use this function if you need to handle multiple 00176 possible message types; if you know for sure which type of 00177 element you expect, then it is safer and more efficient to call 00178 one of the type-specific functions like getElementByteIma() or 00179 getElementInt32(); see below. 00180 00181 If you do need to use this raw function, then after calling it, 00182 typ will contain TCPMSG_xxx so that the type of elem can be 00183 inferred. CAUTION: object will be allocated with new; you must 00184 delete it when done with it! 00185 */ 00186 void getElementRaw(void **elem, int32& typ); 00187 00188 //! Get a color byte image from the message stream 00189 /*! The current message element must have type TCPMSG_COLBYTEIMA */ 00190 Image<PixRGB<byte> > getElementColByteIma(); 00191 00192 //! Get a byte image from the message stream 00193 /*! The current message element must have type TCPMSG_BYTEIMA */ 00194 Image<byte> getElementByteIma(); 00195 00196 //! Get a float image from the message stream 00197 /*! The current message element must have type TCPMSG_FLOATIMA */ 00198 Image<float> getElementFloatIma(); 00199 00200 //! Get a float image set from the message stream 00201 /*! The current message element must have type TCPMSG_FLOATIMASET */ 00202 ImageSet<float> getElementFloatImaSet(); 00203 00204 //! Get an int32 value from the message stream 00205 /*! The current message element must have type TCPMSG_INT32 */ 00206 int32 getElementInt32(); 00207 00208 //! Get an int64 value from the message stream 00209 /*! The current message element must have type TCPMSG_INT64 */ 00210 int64 getElementInt64(); 00211 00212 //! Get a double value from the message stream 00213 /*! The current message element must have type TCPMSG_DOUBLE */ 00214 double getElementDouble(); 00215 00216 //! Get a float value from the message stream 00217 /*! The current message element must have type TCPMSG_FLOAT */ 00218 float getElementFloat(); 00219 00220 //! Get a Fixation from the message stream 00221 /*! The current message element must have type TCPMSG_FIXATION */ 00222 Fixation getElementFixation(); 00223 00224 //! Get a string value from the message stream 00225 /*! The current message element must have type TCPMSG_STRING */ 00226 std::string getElementString(); 00227 00228 private: 00229 //! This private struct is the common header in all TCPmessage objects 00230 struct TCPmessageHeader 00231 { 00232 int32 itsId; //!< user-defined message ID 00233 int32 itsAction; //!< user-defined message action 00234 float itsETI; //!< user-defined estimated time to idle 00235 int32 itsSize; //!< total size of message data, not including header 00236 }; 00237 00238 //! pack some data into message: 00239 template <class T> inline void pack(const T *data, const int nitem); 00240 00241 //! resize message to new size: 00242 inline void resize(const int target_size, const bool do_copy = true); 00243 00244 //! unpack some data from message (memory must be pre-allocated): 00245 public: 00246 template <class T> inline void unpack(const T *data, const int nitem); 00247 private: 00248 00249 //! get a type field from the buffer and make sure it's what we expected 00250 /*! will generate an LFATAL() if the type doesn't match expected_type */ 00251 void unpackAndVerifyType(int32 expected_type); 00252 00253 //! decode a color byte image from the message buffer 00254 /*! this assumes that you have already extracted the message type 00255 and verified that it is TCPMSG_COLBYTEIMA */ 00256 Image<PixRGB<byte> > decodeColByteIma(); 00257 00258 //! decode a byte image from the message buffer 00259 /*! this assumes that you have already extracted the message type 00260 and verified that it is TCPMSG_BYTEIMA */ 00261 Image<byte> decodeByteIma(); 00262 00263 //! decode a float image from the message buffer 00264 /*! this assumes that you have already extracted the message type 00265 and verified that it is TCPMSG_FLOATIMA */ 00266 Image<float> decodeFloatIma(); 00267 00268 //! decode a float image from the message buffer 00269 /*! this assumes that you have already extracted the message type 00270 and verified that it is TCPMSG_FLOATIMASET */ 00271 ImageSet<float> decodeFloatImaSet(); 00272 00273 //! decode a Fixation from the message buffer 00274 /*! this assumes that you have already extracted the message type 00275 and verified that it is TCPMSG_FIXATION */ 00276 Fixation decodeFixation(); 00277 00278 //! read header in from a socket: 00279 int readHeaderFrom(const int fd); 00280 00281 //! read more data in from a socket: 00282 int readFrom(const int fd); 00283 00284 //! write header to a socket: 00285 int writeHeaderTo(const int fd); 00286 00287 //! write more data to a socket: 00288 int writeTo(const int fd); 00289 00290 //! make our private methods accessible to TCPcliServ: 00291 friend class TCPcliServ; 00292 00293 //! add shared memory segment info into message 00294 void addShmInfo(const int shmid, const int siz); 00295 00296 //! add shared memory segment release order into message 00297 void addShmDone(const int shmid); 00298 00299 //! check if message refers to a shared memory segment (true if so) 00300 bool checkShmInfo(int& shmid, int& siz) const; 00301 00302 //! check if message refers to a shared memory segment (true if so) 00303 bool checkShmDone(int& shmid) const; 00304 00305 //! get pointer to message body (dangerous!) 00306 inline const char *getMsg() const; 00307 00308 //! get pointer to message body (dangerous!) 00309 inline char *getMsg(); 00310 00311 //! attach to an existing message buffer 00312 /*! CAUTION: you need to detach() before going out of scope 00313 otherwise your buffer will be free'd. CAUTION: msgbuf must contain 00314 a valid finalized message size. */ 00315 void attach(char *msgbuf, const int siz); 00316 00317 //! detach from attached message buffer (no memory freeing) 00318 void detach(); 00319 00320 bool itsBusy; //!< being sent/received 00321 int itsUpkidx; //!< index for unpacking message contents 00322 int itsHeadIdx; //!< index for receiving header data 00323 TCPmessageHeader itsHead; //!< message header 00324 ArrayHandle<char> itsMsg; //!< message data (ref-counted) 00325 }; 00326 00327 // ###################################################################### 00328 // ##### INLINED METHODS: 00329 // ###################################################################### 00330 00331 // ###################################################################### 00332 inline void TCPmessage::setID(const int32 id) 00333 { itsHead.itsId = id; } 00334 00335 // ###################################################################### 00336 inline int32 TCPmessage::getID() const 00337 { return itsHead.itsId; } 00338 00339 // ###################################################################### 00340 inline void TCPmessage::setAction(const int32 action) 00341 { itsHead.itsAction = action; } 00342 00343 // ###################################################################### 00344 inline int32 TCPmessage::getAction() const 00345 { return itsHead.itsAction; } 00346 00347 // ###################################################################### 00348 inline void TCPmessage::setETI(const float eti) 00349 { itsHead.itsETI = eti; } 00350 00351 // ###################################################################### 00352 inline float TCPmessage::getETI() const 00353 { return itsHead.itsETI; } 00354 00355 // ###################################################################### 00356 inline int32 TCPmessage::getSize() const 00357 { return itsHead.itsSize; } 00358 00359 // ###################################################################### 00360 inline bool TCPmessage::isBusy() const 00361 { return itsBusy; } 00362 00363 // ###################################################################### 00364 template <class T> inline 00365 void TCPmessage::pack(const T *data, const int nitem) 00366 { 00367 if (nitem == 0) return; // nothing to do 00368 resize(itsHead.itsSize + nitem * sizeof(T)); 00369 memcpy((void *)(getMsg() + itsHead.itsSize), 00370 (const void *)data, nitem * sizeof(T)); 00371 itsHead.itsSize += nitem * sizeof(T); 00372 } 00373 00374 // ###################################################################### 00375 inline void TCPmessage::resize(const int target_size, const bool do_copy) 00376 { 00377 if (itsBusy) LFATAL("Attempted to resize() busy TCPmessage!"); 00378 int siz = itsMsg.get().w() * itsMsg.get().h(); // current allocation 00379 00380 // do a reallocation of memory if necessary: 00381 if (siz < target_size) { 00382 // use exponential size growth to ensure that append operations are 00383 // amortized in O(N): 00384 int newsiz = (siz * 3) / 2 + 2048; 00385 if (newsiz < target_size + 2048) newsiz = target_size + 2048; 00386 Dims newdims(newsiz, 1); 00387 ArrayData<char> *a = new ArrayData<char>(newdims); 00388 ArrayHandle<char> h(a); itsMsg.swap(h); 00389 00390 // now copy the data if there was any: 00391 if (do_copy && itsHead.itsSize > 0) 00392 memcpy(getMsg(), h.get().data(), itsHead.itsSize); 00393 } 00394 } 00395 00396 // ###################################################################### 00397 template <class T> inline 00398 void TCPmessage::unpack(const T *data, const int nitem) 00399 { 00400 if (nitem == 0) return; // nothing to do 00401 memcpy((void *)data, (const void *)(getMsg() + itsUpkidx), 00402 nitem * sizeof(T)); 00403 itsUpkidx += nitem * sizeof(T); 00404 } 00405 00406 // ###################################################################### 00407 inline const char* TCPmessage::getMsg() const 00408 { return itsMsg.get().data(); } 00409 00410 // ###################################################################### 00411 inline char* TCPmessage::getMsg() 00412 { return itsMsg.uniq().dataw(); } 00413 00414 #endif 00415 00416 // ###################################################################### 00417 /* So things look consistent in everyone's emacs... */ 00418 /* Local Variables: */ 00419 /* indent-tabs-mode: nil */ 00420 /* End: */