/*!@file Beowulf/TCPcommunicator.C A class to handle multiple TCPmessage
  communications */

// //////////////////////////////////////////////////////////////////// //
// The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
// University of Southern California (USC) and the iLab at USC.         //
// See http://iLab.usc.edu for information about this project.          //
// //////////////////////////////////////////////////////////////////// //
// Major portions of the iLab Neuromorphic Vision Toolkit are protected //
// under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
// in Visual Environments, and Applications'' by Christof Koch and      //
// Laurent Itti, California Institute of Technology, 2001 (patent       //
// pending; application number 09/912,225 filed July 23, 2001; see      //
// http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
// //////////////////////////////////////////////////////////////////// //
// This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
//                                                                      //
// The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
// redistribute it and/or modify it under the terms of the GNU General  //
// Public License as published by the Free Software Foundation; either  //
// version 2 of the License, or (at your option) any later version.     //
//                                                                      //
// The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
// that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
// PURPOSE. See the GNU General Public License for more details.        //
// 00027 // // 00028 // You should have received a copy of the GNU General Public License // 00029 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00030 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00031 // Boston, MA 02111-1307 USA. // 00032 // //////////////////////////////////////////////////////////////////// // 00033 // 00034 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00035 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/TCPcommunicator.C $ 00036 // $Id: TCPcommunicator.C 11538 2009-07-30 06:23:37Z itti $ 00037 // 00038 00039 #include "Beowulf/TCPcommunicator.H" 00040 00041 #include "Beowulf/BeowulfOpts.H" 00042 #include "Beowulf/SockServ.H" 00043 #include "Beowulf/TCPdefs.H" 00044 #include "Component/OptionManager.H" 00045 #include "Util/Assert.H" 00046 #include "Util/log.H" 00047 00048 #include <arpa/inet.h> 00049 #include <fcntl.h> 00050 #include <netdb.h> 00051 #include <netinet/in.h> 00052 #include <signal.h> 00053 #include <sys/socket.h> 00054 #include <unistd.h> 00055 00056 void* TCPcommunicator_run(void *c0); // will live in a separate thread 00057 00058 //! 
for id-logging; see log.H: 00059 #define MYLOGID fd 00060 00061 namespace 00062 { 00063 bool operator==(const in_addr& addr1, const in_addr& addr2) 00064 { 00065 return addr1.s_addr == addr2.s_addr; 00066 } 00067 } 00068 00069 // ###################################################################### 00070 void* TCPcommunicator_run(void *c0) 00071 { 00072 // this is just a wrapper so that we call the member function run() on 00073 // the TCPcommunicator object passed as argument: 00074 TCPcommunicator *c = (TCPcommunicator *)c0; 00075 c->run(); return NULL; 00076 } 00077 00078 // ###################################################################### 00079 TCPcommunicator::TCPcommunicator(OptionManager& mgr, 00080 const std::string& descrName, 00081 const std::string& tagName) : 00082 ModelComponent(mgr, descrName, tagName), 00083 myIP(&OPT_TCPcommunicatorIPaddr, this), 00084 inqlen(&OPT_TCPcommunicatorInQlen, this), 00085 ouqlen(&OPT_TCPcommunicatorOuQlen, this), 00086 indroplast(&OPT_TCPcommunicatorInDropLast, this), 00087 oudroplast(&OPT_TCPcommunicatorOuDropLast, this), 00088 disableShm(&OPT_TCPcommunicatorDisableShm, this), 00089 server(new SockServ(mgr)) 00090 { 00091 // make our server a subcomponent of us: 00092 addSubComponent(server); 00093 threadRunning.atomic_set(0); 00094 } 00095 00096 // ###################################################################### 00097 void TCPcommunicator::start1() 00098 { 00099 // determine my IP if not given: 00100 if (myIP.getVal().s_addr == 0) 00101 { 00102 char hostname[32]; gethostname(hostname, 32); 00103 struct hostent *he = gethostbyname(hostname); 00104 if (he == NULL) LFATAL("Cannot determine my IP address"); 00105 00106 in_addr myip; 00107 myip.s_addr = ntohl( ((in_addr *)(he->h_addr_list[0]))->s_addr ); 00108 myIP.setVal(myip); 00109 LINFO("Using IP address %s", myIP.getValString().c_str()); 00110 } 00111 00112 // block signals: 00113 sigset_t sset; sigemptyset(&sset); 00114 sigaddset(&sset, SIGHUP); 
sigaddset(&sset, SIGPIPE); 00115 int s = sigprocmask(SIG_BLOCK, &sset, NULL); 00116 if (s != 0) PLERROR("Sigprocmask failed"); 00117 00118 // let now our SockServ start, and we'll be back in start2() 00119 } 00120 00121 // ###################################################################### 00122 void TCPcommunicator::start2() 00123 { 00124 // setup client array: 00125 typedef TCPcliServ* TCPcliServPtr; 00126 cli = new TCPcliServPtr[FD_SETSIZE]; 00127 for (int i = 0; i < FD_SETSIZE; i ++) cli[i] = NULL; // no client 00128 to_add = false; to_stop = false; 00129 00130 // start thread for run(): 00131 threadRunning.atomic_set(1); 00132 pthread_create(&runner, NULL, &TCPcommunicator_run, (void *)this); 00133 } 00134 00135 // ###################################################################### 00136 void TCPcommunicator::stop1() 00137 { 00138 // we are going to signal to the worker thread that we want it to 00139 // stop by setting to_stop=true, then we wait for it to respond by 00140 // setting threadRunning=0; however this will also work if the 00141 // worker thread has already quit before we even begin stop1(), in 00142 // which case threadRunning will be already 0 00143 00144 struct timespec ts, ts2; 00145 while (threadRunning.atomic_get() != 0) 00146 { 00147 to_stop = true; 00148 ts.tv_sec = 0; ts.tv_nsec = 1000 * POLL_SLEEP; 00149 nanosleep(&ts, &ts2); 00150 } 00151 00152 ASSERT(threadRunning.atomic_get() == 0); 00153 00154 delete [] cli; cli = 0; 00155 } 00156 00157 // ###################################################################### 00158 TCPcommunicator::~TCPcommunicator() 00159 { } 00160 00161 // ###################################################################### 00162 int TCPcommunicator::contact(const char *hostname, const bool blocking) 00163 { 00164 // extract port information from name:port format in hostname: 00165 char hn2[strlen(hostname) + 1]; strcpy(hn2, hostname); 00166 int port = DEFPORT; 00167 for (int i = strlen(hn2) - 1; i > 0; i 
--) 00168 if (hn2[i] == ':') // ok, we have port information 00169 { hn2[i] = '\0'; port = atoi(&(hn2[i + 1])); } 00170 00171 // get IP address: 00172 struct hostent *he = gethostbyname(hn2); 00173 if (he == NULL) 00174 { LERROR("Cannot gethostbyname(%s): %d", hn2, h_errno); return -1; } 00175 00176 // ready to contact: 00177 return contact(ntohl( ((in_addr *)(he->h_addr_list[0]))->s_addr ), 00178 port, blocking); 00179 } 00180 00181 // ###################################################################### 00182 int TCPcommunicator::contact(const in_addr_t ip, const short int port, 00183 const bool blocking) 00184 { 00185 ASSERT(started()); 00186 00187 int fd = socket(AF_INET, SOCK_STREAM, 0); 00188 if (fd == -1) { PLERROR("Cannot create socket"); return -1; } 00189 00190 // attempt a connect to server: 00191 struct sockaddr_in addr; 00192 addr.sin_family = AF_INET; 00193 addr.sin_addr.s_addr = htonl(ip); 00194 addr.sin_port = htons(port); 00195 00196 struct in_addr him; him.s_addr = htonl(ip); 00197 if (connect(fd, (struct sockaddr*)(&addr), sizeof(addr)) == -1) { 00198 IDPLERROR("Error connecting to %s:%hu", inet_ntoa(him), port); 00199 return -1; 00200 } 00201 IDLDEBUG("Connected to %s:%hu", inet_ntoa(him), port); 00202 00203 // switch to non-blocking mode: 00204 int flags = fcntl(fd, F_GETFL, 0); 00205 if (flags == -1) 00206 { IDPLERROR("Cannot get socket flags"); close(fd); return -1; } 00207 if (fcntl(fd, F_SETFL, O_NONBLOCK | flags) == -1) 00208 { IDPLERROR("Cannot set socket flags"); close(fd); return -1; } 00209 00210 // get ready to add new TCPcliServ (will be done in run thread): 00211 addfd = fd; addip = ip; addport = port; 00212 pollvar(&to_add); to_add = true; if (blocking) pollvar(&to_add); 00213 00214 return fd; 00215 } 00216 00217 // ###################################################################### 00218 void TCPcommunicator::run() 00219 { 00220 bool running = true; 00221 const short myPort = server->getModelParamVal<short>("SockServPort"); 
00222 00223 while(running) { 00224 // were we instructed to stop? 00225 if (to_stop) { running = false; break; } 00226 00227 // check if we have a new client added through contact(): 00228 if (to_add) { 00229 if (cli[addfd]) { 00230 LERROR("Already have client %d! Killing.", addfd); 00231 terminateCli(addfd); 00232 } 00233 cli[addfd] = new 00234 TCPcliServ(addfd, myIP.getVal().s_addr, myPort, addip, addport, 00235 inqlen.getVal(), ouqlen.getVal(), 00236 indroplast.getVal(), oudroplast.getVal(), 00237 disableShm.getVal()); 00238 server->addClient(addfd); server->monitorRead(addfd); 00239 server->monitorError(addfd); to_add = false; 00240 } 00241 00242 // check what's going on with all our clients: 00243 int client; 00244 int result = server->check(TCP_TIMEOUT_SEC, TCP_TIMEOUT_USEC); 00245 00246 switch(result) { 00247 00248 case SOCKSERV_BUG: // server got bogus. The end... 00249 LERROR("Server messed-up. ABORT."); running = false; 00250 break; 00251 case SOCKSERV_IDLE: // nothing happened... 00252 break; 00253 case SOCKSERV_ACTIV: // some activity! let's check it out 00254 00255 // ########## first, check for new clients and configure them: 00256 while((client = server->getNewClient())) { 00257 if (cli[client]) { 00258 LDEBUG("Already have TCPcliServ %d? Killing", client); 00259 terminateCli(client); 00260 } 00261 cli[client] = new TCPcliServ(client, myIP.getVal().s_addr, myPort, 00262 server->getClientIP(client), 00263 server->getClientPort(client), 00264 inqlen.getVal(), ouqlen.getVal(), 00265 indroplast.getVal(), oudroplast.getVal(), 00266 disableShm.getVal()); 00267 server->monitorRead(client); server->monitorWrite(client, false); 00268 server->monitorError(client); 00269 // wait until this client has some data for us to read 00270 } 00271 00272 // ########## check for clients that got errored: 00273 while((client = server->getErrorClient())) { 00274 if (cli[client] == NULL) continue; // already gone 00275 LDEBUG("Connection to client %d messed-up. 
Terminating.", client); 00276 terminateCli(client); 00277 } 00278 00279 // ########## check for clients ready to be read from / written to: 00280 while((client = server->getRWClient())) 00281 { 00282 if (cli[client] == NULL) continue; // already gone 00283 switch(cli[client]->check()) 00284 { 00285 case TCPDONE: // transaction done -> wait for next one 00286 case TCPWAITREAD: // this guy needs more data 00287 server->monitorRead(client); 00288 server->monitorWrite(client, false); 00289 break; 00290 case TCPWAITWRITE: // this guy wants to send more data 00291 server->monitorWrite(client); 00292 server->monitorRead(client); // always be open to incoming stuff 00293 break; 00294 case TCPWAITRW: // this guy wants to send & read more data 00295 server->monitorWrite(client); 00296 server->monitorRead(client); 00297 break; 00298 case TCPBUG: // this guy is messed-up 00299 LDEBUG("TCPcliServ %d messed-up. Terminating.", client); 00300 terminateCli(client); 00301 break; 00302 case TCPFINISH: // this guy is finished 00303 terminateCli(client); 00304 break; 00305 } 00306 } 00307 break; 00308 default: 00309 LERROR("Server gave unknown result %d. 
Ignored.", result); 00310 } 00311 } 00312 threadRunning.atomic_set(0); 00313 pthread_exit(0); 00314 } 00315 00316 // ###################################################################### 00317 void TCPcommunicator::terminateAll() 00318 { for (int i = 0; i < FD_SETSIZE; i ++) if (cli[i]) terminateCli(i); } 00319 00320 // ###################################################################### 00321 void TCPcommunicator::terminateAllButOne(const int fd) 00322 { for (int i = 0; i < FD_SETSIZE; i ++) if (cli[i] && i!=fd) terminateCli(i); } 00323 00324 // ###################################################################### 00325 void TCPcommunicator::terminateCli(const int fd) 00326 { 00327 ASSERT(fd >= 0 && fd < FD_SETSIZE); 00328 if (cli[fd]) 00329 { 00330 server->disconnect(fd); 00331 if (cli[fd]) delete cli[fd]; 00332 cli[fd] = NULL; 00333 } 00334 } 00335 00336 // ###################################################################### 00337 void TCPcommunicator::send(const int sfd, TCPmessage& smsg) 00338 { 00339 ASSERT(sfd >= 0 && sfd < FD_SETSIZE); 00340 if (cli[sfd] == NULL) 00341 LERROR("No client %d to send to!", sfd); 00342 else 00343 { 00344 cli[sfd]->send(smsg); // thread-safe 00345 server->monitorWrite(sfd); server->monitorRead(sfd); 00346 server->monitorError(sfd); 00347 } 00348 } 00349 00350 // ###################################################################### 00351 bool TCPcommunicator::receive(int& rfd, TCPmessage& rmsg, 00352 const int timeout, int* err) 00353 { 00354 // is there any incoming message already? 00355 bool got_one = receiveIt(rfd, rmsg, err); 00356 if (got_one) return true; // got a message! 00357 00358 // if we have a non-zero timeout, let's do a select() and try again: 00359 if (timeout) { 00360 // wait for the fds to have something ready 00361 waitFor(rfd, timeout); 00362 00363 // if something came in, it should have also been picked up by the 00364 // receiver thread, and it may be available now. 
Try once more 00365 // and return status: 00366 return receiveIt(rfd, rmsg, err); 00367 } 00368 00369 // nothing received: 00370 return false; 00371 } 00372 00373 // ###################################################################### 00374 int TCPcommunicator::nbReceived(const int rfd) 00375 { 00376 int nb = 0; 00377 00378 if (rfd != -1) // only check specific fd 00379 { 00380 if (cli[rfd] == NULL) { LERROR("No client %d!", rfd); nb = 0; } 00381 else nb = cli[rfd]->nbReceived(); 00382 } 00383 else // rfd = -1, then check all fds 00384 for (int i = 0; i < FD_SETSIZE; i ++) 00385 if (cli[i] != NULL) 00386 nb += cli[i]->nbReceived(); 00387 return nb; 00388 } 00389 00390 // ###################################################################### 00391 void TCPcommunicator::waitFor(const int rfd, const int timeout) 00392 { 00393 // we'll do a blocking select on those fds we are interested in: 00394 fd_set rfds, wfds, efds; FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); 00395 if (rfd == -1) { // monitor all our fds 00396 for (int i = 0; i < FD_SETSIZE; i ++) 00397 if (cli[i] != NULL) FD_SET(i, &rfds); 00398 } else FD_SET(rfd, &rfds); 00399 struct timeval to; to.tv_sec = 0; to.tv_usec = timeout * 1000; 00400 00401 // don't care about the result; we just want to wait: 00402 select(FD_SETSIZE, &rfds, &wfds, &efds, &to); 00403 } 00404 00405 // ###################################################################### 00406 void TCPcommunicator::pollvar(volatile bool *var) 00407 { 00408 struct timespec ts, ts2; 00409 while (*var == true) 00410 { 00411 ts.tv_sec = 0; ts.tv_nsec = 1000 * POLL_SLEEP; 00412 nanosleep(&ts, &ts2); 00413 } 00414 } 00415 00416 // ###################################################################### 00417 bool TCPcommunicator::receiveIt(int& rfd, TCPmessage& rmsg, int* err) 00418 { 00419 if (threadRunning.atomic_get() != 1) 00420 LFATAL("Oops! 
The communicator is not running anymore"); 00421 00422 ASSERT(rfd >= -1 && rfd < FD_SETSIZE); bool got_one = false; 00423 00424 if (rfd != -1) // receive from specific fd: 00425 { 00426 if (cli[rfd] == NULL) 00427 { 00428 LERROR("No client %d to receive from!", rfd); 00429 if (err != 0) *err = 1; 00430 got_one = false; 00431 } 00432 else 00433 got_one = cli[rfd]->receive(rmsg); 00434 } 00435 else // rfd = -1, then receive from any fd: 00436 { 00437 for (int i = 0; i < FD_SETSIZE; i ++) 00438 if (cli[i] != NULL) 00439 { 00440 got_one = cli[i]->receive(rmsg); 00441 if (got_one) { rfd = i; break; } 00442 } 00443 } 00444 return got_one; 00445 } 00446 00447 // ###################################################################### 00448 /* So things look consistent in everyone's emacs... */ 00449 /* Local Variables: */ 00450 /* indent-tabs-mode: nil */ 00451 /* End: */