00001 /*!@file Beowulf/TCPcliServ.C A client/server to receive/send TCPmessage */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the // 00005 // University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPcliServ.C $ 00035 // $Id: TCPcliServ.C 11538 2009-07-30 06:23:37Z itti $ 00036 // 00037 00038 #include "Beowulf/TCPcliServ.H" 00039 00040 #include "Beowulf/TCPdefs.H" 00041 #include "Util/log.H" 00042 #include <arpa/inet.h> 00043 #include <errno.h> 00044 #include <fcntl.h> 00045 #ifdef HAVE_NET_ETHERNET_H 00046 #include <net/ethernet.h> 00047 #else 00048 // workaround if we don't have <net/ethernet.h> 00049 #define ETHERMTU 1500 00050 #endif 00051 #include <netdb.h> 00052 #include <netinet/in.h> 00053 #include <netinet/tcp.h> 00054 #include <stdlib.h> 00055 #include <string.h> 00056 #include <sys/shm.h> 00057 #include <sys/socket.h> 00058 #include <sys/types.h> 00059 #include <unistd.h> 00060 00061 //! for ID-logging. See log.H: 00062 #define MYLOGID fd 00063 00064 //! internal states: 00065 #define TCPCS_UNKNOWN 0 00066 #define TCPCS_READING 1 00067 #define TCPCS_WRITING 2 00068 #define TCPCS_BOGUS 4 00069 00070 // ###################################################################### 00071 TCPcliServ::TCPcliServ() 00072 { state = TCPCS_BOGUS; } 00073 00074 // ###################################################################### 00075 void TCPcliServ::init(const int connected_fd, 00076 const in_addr_t my_ipaddr, 00077 const short int my_port, 00078 const in_addr_t cli_ipaddr, 00079 const short int cli_port, 00080 const int inqlen, const int outqlen, 00081 const bool indlast, const bool outdlast, 00082 const bool disableShm) 00083 { 00084 shmcount = (getpid() << 24) + (size_t(this) << 16); 00085 fd = connected_fd; myIP = my_ipaddr; myPort = my_port; 00086 cliIP = cli_ipaddr; cliPort = cli_port; 00087 inmsgqlen = inqlen; outmsgqlen = outqlen; 00088 indroplast = indlast; outdroplast = outdlast; 00089 00090 // switch fd to non-blocking and TCP_NODELAY modes: 00091 int flags = fcntl(fd, F_GETFL, 0); 00092 if (flags == -1) 00093 { IDPLERROR("Cannot get socket flags"); state = TCPCS_BOGUS; } 00094 if (fcntl(fd, F_SETFL, O_NONBLOCK | flags) == -1) 00095 { IDPLERROR("Cannot set socket flags"); state = TCPCS_BOGUS; } 00096 int w = 1; // watermark size 00097 if (setsockopt(fd, 6, TCP_NODELAY, (void *)&w, sizeof(int))) 00098 { IDPLERROR("Cannot set socket TCP_NODELAY"); state = TCPCS_BOGUS; } 00099 00100 // init mutexes: FIXME: bogus if init called several times... 00101 pthread_mutex_init(&mutin, NULL); 00102 pthread_mutex_init(&mutou, NULL); 00103 00104 // do other standard initializations: 00105 reset(); 00106 00107 // can we use shared memory? 00108 if (myIP == cliIP && disableShm == false) useShm = true; else useShm = false; 00109 00110 // show some debug info: 00111 struct in_addr me; me.s_addr = htonl(myIP); 00112 struct in_addr him; him.s_addr = htonl(cliIP); 00113 char buf[32]; strncpy(buf, inet_ntoa(me), 32); 00114 if (useShm) 00115 IDLDEBUG("Ready: %s:%hu <-SHM-> %s:%hu", buf, myPort, 00116 inet_ntoa(him), cliPort); 00117 else 00118 IDLDEBUG("Ready: %s:%hu <-TCP-> %s:%hu", buf, myPort, 00119 inet_ntoa(him), cliPort); 00120 } 00121 00122 // ###################################################################### 00123 TCPcliServ::TCPcliServ(const int connected_fd, 00124 const in_addr_t my_ipaddr, 00125 const short int my_port, 00126 const in_addr_t cli_ipaddr, 00127 const short int cli_port, 00128 const int inqlen, const int outqlen, 00129 const bool indlast, const bool outdlast, 00130 const bool disableShm) 00131 { 00132 state = TCPCS_BOGUS; 00133 init(connected_fd, my_ipaddr, my_port, cli_ipaddr, cli_port, 00134 inqlen, outqlen, indlast, outdlast, disableShm); 00135 } 00136 00137 // ###################################################################### 00138 TCPcliServ::~TCPcliServ() 00139 { 00140 disconnect(); pthread_mutex_destroy(&mutin); pthread_mutex_destroy(&mutou); 00141 00142 // release all our shared memory segments: 00143 if (shmmap.empty() == false) 00144 for (std::map<int, void *>::const_iterator 00145 m = shmmap.begin(); m != shmmap.end(); m ++) 00146 if (shmdt(m->second) == -1) IDPLERROR("Error detaching Shm %d",m->first); 00147 } 00148 00149 // ###################################################################### 00150 void TCPcliServ::send(const TCPmessage& msg) 00151 { 00152 pthread_mutex_lock(&mutou); 00153 00154 // check if our outgoing queue is full, and take action. Be careful 00155 // not to drop Shm acknowledge messages, though: 00156 int xx; 00157 if (outmsgqlen > 0 && oumsg.size() >= outmsgqlen && 00158 msg.checkShmDone(xx) == false) 00159 { 00160 if (outdroplast) 00161 { 00162 // IDLERROR("Outgoing queue full -- DROPPING MOST RECENT MESSAGE"); 00163 } 00164 else 00165 { 00166 // IDLERROR("Outgoing queue full -- DROPPING LEAST RECENT MESSAGE"); 00167 if (oumsg.front().checkShmDone(xx) == false) oumsg.pop_front(); 00168 oumsg.push_back(msg); // keep local copy of message 00169 } 00170 } 00171 else 00172 oumsg.push_back(msg); // keep local copy of message 00173 00174 pthread_mutex_unlock(&mutou); 00175 // message will be sent out whenever its turn comes in check()... 00176 } 00177 00178 // ###################################################################### 00179 bool TCPcliServ::receive(TCPmessage& msg) 00180 { 00181 bool ret = false; 00182 pthread_mutex_lock(&mutin); 00183 if (inmsg.size() && inmsg.front().isBusy() == false) 00184 { msg = inmsg.front(); inmsg.pop_front(); ret = true; } 00185 pthread_mutex_unlock(&mutin); 00186 return ret; 00187 } 00188 00189 // ###################################################################### 00190 int TCPcliServ::nbReceived() 00191 { 00192 pthread_mutex_lock(&mutin); 00193 int ret = inmsg.size(); 00194 if (ret > 0 && inmsg.front().isBusy()) ret = 0; // one msg, still receiving 00195 pthread_mutex_unlock(&mutin); 00196 return ret; 00197 } 00198 00199 // ###################################################################### 00200 int TCPcliServ::check() 00201 { 00202 if (state == TCPCS_BOGUS) return TCPBUG; // I am bogus... leave me alone! 00203 00204 // are we reading in a message? 00205 if (state & TCPCS_READING) 00206 { 00207 int r = im.readFrom(fd); // read more; returns TCPWAITREAD, TCPDONE, or TCPBUG 00208 switch(r) 00209 { 00210 case TCPWAITREAD: // need more data for current message 00211 break; 00212 case TCPDONE: // current message done; save it and wait for new one 00213 storeReceivedMessage(); // store what we have received 00214 state &= ~TCPCS_READING; // not reading a message any more... 00215 break; 00216 case TCPBUG: // something went wrong; suicide 00217 state = TCPCS_BOGUS; 00218 break; 00219 default: 00220 LFATAL("Unknown return from TCPmessage::readFrom()"); 00221 } 00222 } 00223 00224 // if we are not reading anything, is there a new incoming message? 00225 if ((state & TCPCS_READING) == 0) 00226 { 00227 // attempt a read and see if there is anything coming in: 00228 int r = im.readHeaderFrom(fd); 00229 switch(r) 00230 { 00231 case TCPDONE: // got a new message and we have its header ready 00232 // does the message have a non-empty body? if so, let's get 00233 // ready to read it. Otherwise we are done with this message. 00234 if (im.getSize() > 0) state |= TCPCS_READING; // will start reading 00235 else storeReceivedMessage(); // store what we have received 00236 break; 00237 case TCPWAITREAD: // nothing new 00238 break; 00239 default: // something bogus happened 00240 state = TCPCS_BOGUS; 00241 } 00242 } 00243 00244 // are we writing out a message? 00245 if (state & TCPCS_WRITING) 00246 { 00247 int r = om.writeTo(fd); // returns WAITWRITE, DONE, or BUG 00248 switch(r) 00249 { 00250 case TCPWAITWRITE: // send more data for current message 00251 break; 00252 case TCPDONE: // current message done 00253 state &= ~TCPCS_WRITING; 00254 break; 00255 case TCPBUG: // something went wrong; suicide 00256 state = TCPCS_BOGUS; 00257 break; 00258 default: 00259 LFATAL("Unknown return from TCPmessage::writeTo()"); 00260 } 00261 } 00262 00263 // if not writing, do we have pending messages in out queue? 00264 if ((state & TCPCS_WRITING) == 0) 00265 { 00266 // we are not writing any message; do we have one in the queue? 00267 pthread_mutex_lock(&mutou); 00268 if (oumsg.size()) 00269 { 00270 int xx; bool shmdone = oumsg.front().checkShmDone(xx); 00271 int siz = oumsg.front().getSize(); 00272 00273 // are we using shared memory, and this message is not an shm 00274 // message acknowledge (which must go via TCP), and the 00275 // message is not very small (otherwise, TCP will be faster)? 00276 if (useShm && 00277 shmdone == false && 00278 siz > int(ETHERMTU - sizeof(TCPmessage::TCPmessageHeader))) 00279 { 00280 key_t key = shmcount ++; 00281 00282 // create new shared memory segment: 00283 int shmid = shmget(key, siz, 0666 | IPC_CREAT | IPC_EXCL); 00284 if (shmid == -1) 00285 IDPLFATAL("Cannot create shared memory segment"); 00286 00287 // attach segment to our address space: 00288 char *shmbuf = (char *)shmat(shmid, NULL, 0); 00289 if (shmbuf == (char *)(-1)) 00290 IDPLFATAL("Cannot attach shared memory segment"); 00291 00292 // keep track of this new segment: 00293 shmmap[shmid] = shmbuf; 00294 00295 // copy message into shared memory zone: 00296 memcpy(shmbuf, (void *)(oumsg.front().getMsg()), siz); 00297 00298 // create a TCPmessage of type TCPMSG_SHMINFO 00299 om.reset(oumsg.front().getID(), oumsg.front().getAction()); 00300 om.addShmInfo(shmid, siz); 00301 00302 // pop message to be sent from send queue: 00303 oumsg.pop_front(); 00304 } 00305 else 00306 { 00307 // regular TCP transfer: 00308 om = oumsg.front(); // get next message to write out 00309 oumsg.pop_front(); // not in the queue any more... 00310 } 00311 if (shmdone == false) 00312 LDEBUG("Sending off [%d, %d] to %d",om.getID(),om.getAction(),fd); 00313 00314 // write out the message header: 00315 if (om.writeHeaderTo(fd) != TCPDONE) 00316 { state = TCPCS_BOGUS; return TCPBUG; } 00317 00318 // does the message have a non-empty body? if so, we will 00319 // write it out at the next check(). Otherwise we are done. 00320 if (om.getSize() > 0) 00321 state |= TCPCS_WRITING; // ok, we are writing this guy out 00322 } 00323 pthread_mutex_unlock(&mutou); 00324 } 00325 00326 // ok, now what do we return to SockServ? we always want to monitor for read, 00327 // and also for write if we are writing something out. 00328 if (state == TCPCS_BOGUS) return TCPBUG; 00329 else if (state & TCPCS_WRITING) return TCPWAITRW; 00330 else return TCPWAITREAD; 00331 } 00332 00333 // ###################################################################### 00334 int TCPcliServ::disconnect() 00335 { reset(); return TCPFINISH; } 00336 00337 // ###################################################################### 00338 int TCPcliServ::reset() 00339 { 00340 state = TCPCS_UNKNOWN; 00341 return TCPDONE; 00342 } 00343 00344 // ###################################################################### 00345 void TCPcliServ::storeReceivedMessage() 00346 { 00347 int shmid, siz; 00348 00349 // is it a shared memory message? 00350 if (im.checkShmInfo(shmid, siz)) 00351 { 00352 // attach to shared memory segment: 00353 char *msgbuf = (char *)shmat(shmid, NULL, 0); 00354 if (msgbuf == (char *)(-1)) 00355 IDPLERROR("Cannot attach to shared memory segment"); 00356 else 00357 { 00358 // make a deep copy of the shared memory segment into our 00359 // private memory, so that we can immediately release the 00360 // shared memory: 00361 TCPmessage insm(im.getID(), im.getAction(), im.getETI(), 00362 msgbuf, siz); 00363 00364 // push the TCPmessage into our received queue: 00365 queueIncomingMessage(insm); 00366 00367 // build an acknowledgement message: 00368 TCPmessage ack(insm.getID(), insm.getAction()); 00369 ack.addShmDone(shmid); 00370 00371 // release shared memory: 00372 if (shmdt(msgbuf) == -1) 00373 IDPLERROR("Error detaching Shm segment %d", shmid); 00374 00375 // send acknowledgement (will be forced to go via TCP): 00376 send(ack); 00377 } 00378 } 00379 // is it a shared memory segment release? 00380 else if (im.checkShmDone(shmid)) 00381 { 00382 std::map<int, void *>::const_iterator m = shmmap.find(shmid); 00383 if (m != shmmap.end()) 00384 { 00385 // ok, we do have this segment; let's release it: 00386 if (shmdt(m->second) == -1) 00387 IDPLERROR("Error detaching Shm segment %d", m->first); 00388 00389 // nobody needs it anymore; let's destroy it: 00390 if (shmctl(shmid, IPC_RMID, 0) == -1) 00391 IDPLERROR("Error deleting Shm segment %d", m->first); 00392 00393 // let's forget about this segment: 00394 shmmap.erase(shmid); 00395 } 00396 else 00397 IDLERROR("Attempt to release unknown Shm %d -- IGNORED", shmid); 00398 } 00399 // then it's a regular TCP message 00400 else 00401 queueIncomingMessage(im); 00402 00403 // we are done with the contents of this message: 00404 im.freeMem(); 00405 } 00406 00407 // ###################################################################### 00408 void TCPcliServ::queueIncomingMessage(TCPmessage& imsg) 00409 { 00410 pthread_mutex_lock(&mutin); 00411 00412 // check if our incoming queue is full, and take action. Be careful 00413 // not to drop Shm acknowledge messages, though: 00414 int xx; 00415 if (inmsgqlen > 0 && inmsg.size() >= inmsgqlen && 00416 imsg.checkShmDone(xx) == false) 00417 { 00418 if (indroplast) 00419 { 00420 // IDLERROR("Incoming queue full -- DROPPING MOST RECENT MESSAGE"); 00421 } 00422 else 00423 { 00424 // IDLERROR("Incoming queue full -- DROPPING LEAST RECENT MESSAGE"); 00425 if (inmsg.front().checkShmDone(xx) == false) inmsg.pop_front(); 00426 inmsg.push_back(imsg); 00427 } 00428 } 00429 else 00430 inmsg.push_back(imsg); 00431 00432 pthread_mutex_unlock(&mutin); 00433 } 00434 00435 // ###################################################################### 00436 /* So things look consistent in everyone's emacs... */ 00437 /* Local Variables: */ 00438 /* indent-tabs-mode: nil */ 00439 /* End: */