00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "Beowulf/TCPcliServ.H"
00039
00040 #include "Beowulf/TCPdefs.H"
00041 #include "Util/log.H"
00042 #include <arpa/inet.h>
00043 #include <errno.h>
00044 #include <fcntl.h>
00045 #ifdef HAVE_NET_ETHERNET_H
00046 #include <net/ethernet.h>
00047 #else
00048
00049 #define ETHERMTU 1500
00050 #endif
00051 #include <netdb.h>
00052 #include <netinet/in.h>
00053 #include <netinet/tcp.h>
00054 #include <stdlib.h>
00055 #include <string.h>
00056 #include <sys/shm.h>
00057 #include <sys/socket.h>
00058 #include <sys/types.h>
00059 #include <unistd.h>
00060
00061
00062 #define MYLOGID fd
00063
00064
00065 #define TCPCS_UNKNOWN 0
00066 #define TCPCS_READING 1
00067 #define TCPCS_WRITING 2
00068 #define TCPCS_BOGUS 4
00069
00070
00071 TCPcliServ::TCPcliServ()
00072 { state = TCPCS_BOGUS; }
00073
00074
00075 void TCPcliServ::init(const int connected_fd,
00076 const in_addr_t my_ipaddr,
00077 const short int my_port,
00078 const in_addr_t cli_ipaddr,
00079 const short int cli_port,
00080 const int inqlen, const int outqlen,
00081 const bool indlast, const bool outdlast,
00082 const bool disableShm)
00083 {
00084 shmcount = (getpid() << 24) + (size_t(this) << 16);
00085 fd = connected_fd; myIP = my_ipaddr; myPort = my_port;
00086 cliIP = cli_ipaddr; cliPort = cli_port;
00087 inmsgqlen = inqlen; outmsgqlen = outqlen;
00088 indroplast = indlast; outdroplast = outdlast;
00089
00090
00091 int flags = fcntl(fd, F_GETFL, 0);
00092 if (flags == -1)
00093 { IDPLERROR("Cannot get socket flags"); state = TCPCS_BOGUS; }
00094 if (fcntl(fd, F_SETFL, O_NONBLOCK | flags) == -1)
00095 { IDPLERROR("Cannot set socket flags"); state = TCPCS_BOGUS; }
00096 int w = 1;
00097 if (setsockopt(fd, 6, TCP_NODELAY, (void *)&w, sizeof(int)))
00098 { IDPLERROR("Cannot set socket TCP_NODELAY"); state = TCPCS_BOGUS; }
00099
00100
00101 pthread_mutex_init(&mutin, NULL);
00102 pthread_mutex_init(&mutou, NULL);
00103
00104
00105 reset();
00106
00107
00108 if (myIP == cliIP && disableShm == false) useShm = true; else useShm = false;
00109
00110
00111 struct in_addr me; me.s_addr = htonl(myIP);
00112 struct in_addr him; him.s_addr = htonl(cliIP);
00113 char buf[32]; strncpy(buf, inet_ntoa(me), 32);
00114 if (useShm)
00115 IDLDEBUG("Ready: %s:%hu <-SHM-> %s:%hu", buf, myPort,
00116 inet_ntoa(him), cliPort);
00117 else
00118 IDLDEBUG("Ready: %s:%hu <-TCP-> %s:%hu", buf, myPort,
00119 inet_ntoa(him), cliPort);
00120 }
00121
00122
00123 TCPcliServ::TCPcliServ(const int connected_fd,
00124 const in_addr_t my_ipaddr,
00125 const short int my_port,
00126 const in_addr_t cli_ipaddr,
00127 const short int cli_port,
00128 const int inqlen, const int outqlen,
00129 const bool indlast, const bool outdlast,
00130 const bool disableShm)
00131 {
00132 state = TCPCS_BOGUS;
00133 init(connected_fd, my_ipaddr, my_port, cli_ipaddr, cli_port,
00134 inqlen, outqlen, indlast, outdlast, disableShm);
00135 }
00136
00137
00138 TCPcliServ::~TCPcliServ()
00139 {
00140 disconnect(); pthread_mutex_destroy(&mutin); pthread_mutex_destroy(&mutou);
00141
00142
00143 if (shmmap.empty() == false)
00144 for (std::map<int, void *>::const_iterator
00145 m = shmmap.begin(); m != shmmap.end(); m ++)
00146 if (shmdt(m->second) == -1) IDPLERROR("Error detaching Shm %d",m->first);
00147 }
00148
00149
00150 void TCPcliServ::send(const TCPmessage& msg)
00151 {
00152 pthread_mutex_lock(&mutou);
00153
00154
00155
00156 int xx;
00157 if (outmsgqlen > 0 && oumsg.size() >= outmsgqlen &&
00158 msg.checkShmDone(xx) == false)
00159 {
00160 if (outdroplast)
00161 {
00162
00163 }
00164 else
00165 {
00166
00167 if (oumsg.front().checkShmDone(xx) == false) oumsg.pop_front();
00168 oumsg.push_back(msg);
00169 }
00170 }
00171 else
00172 oumsg.push_back(msg);
00173
00174 pthread_mutex_unlock(&mutou);
00175
00176 }
00177
00178
00179 bool TCPcliServ::receive(TCPmessage& msg)
00180 {
00181 bool ret = false;
00182 pthread_mutex_lock(&mutin);
00183 if (inmsg.size() && inmsg.front().isBusy() == false)
00184 { msg = inmsg.front(); inmsg.pop_front(); ret = true; }
00185 pthread_mutex_unlock(&mutin);
00186 return ret;
00187 }
00188
00189
00190 int TCPcliServ::nbReceived()
00191 {
00192 pthread_mutex_lock(&mutin);
00193 int ret = inmsg.size();
00194 if (ret > 0 && inmsg.front().isBusy()) ret = 0;
00195 pthread_mutex_unlock(&mutin);
00196 return ret;
00197 }
00198
00199
00200 int TCPcliServ::check()
00201 {
00202 if (state == TCPCS_BOGUS) return TCPBUG;
00203
00204
00205 if (state & TCPCS_READING)
00206 {
00207 int r = im.readFrom(fd);
00208 switch(r)
00209 {
00210 case TCPWAITREAD:
00211 break;
00212 case TCPDONE:
00213 storeReceivedMessage();
00214 state &= ~TCPCS_READING;
00215 break;
00216 case TCPBUG:
00217 state = TCPCS_BOGUS;
00218 break;
00219 default:
00220 LFATAL("Unknown return from TCPmessage::readFrom()");
00221 }
00222 }
00223
00224
00225 if ((state & TCPCS_READING) == 0)
00226 {
00227
00228 int r = im.readHeaderFrom(fd);
00229 switch(r)
00230 {
00231 case TCPDONE:
00232
00233
00234 if (im.getSize() > 0) state |= TCPCS_READING;
00235 else storeReceivedMessage();
00236 break;
00237 case TCPWAITREAD:
00238 break;
00239 default:
00240 state = TCPCS_BOGUS;
00241 }
00242 }
00243
00244
00245 if (state & TCPCS_WRITING)
00246 {
00247 int r = om.writeTo(fd);
00248 switch(r)
00249 {
00250 case TCPWAITWRITE:
00251 break;
00252 case TCPDONE:
00253 state &= ~TCPCS_WRITING;
00254 break;
00255 case TCPBUG:
00256 state = TCPCS_BOGUS;
00257 break;
00258 default:
00259 LFATAL("Unknown return from TCPmessage::writeTo()");
00260 }
00261 }
00262
00263
00264 if ((state & TCPCS_WRITING) == 0)
00265 {
00266
00267 pthread_mutex_lock(&mutou);
00268 if (oumsg.size())
00269 {
00270 int xx; bool shmdone = oumsg.front().checkShmDone(xx);
00271 int siz = oumsg.front().getSize();
00272
00273
00274
00275
00276 if (useShm &&
00277 shmdone == false &&
00278 siz > int(ETHERMTU - sizeof(TCPmessage::TCPmessageHeader)))
00279 {
00280 key_t key = shmcount ++;
00281
00282
00283 int shmid = shmget(key, siz, 0666 | IPC_CREAT | IPC_EXCL);
00284 if (shmid == -1)
00285 IDPLFATAL("Cannot create shared memory segment");
00286
00287
00288 char *shmbuf = (char *)shmat(shmid, NULL, 0);
00289 if (shmbuf == (char *)(-1))
00290 IDPLFATAL("Cannot attach shared memory segment");
00291
00292
00293 shmmap[shmid] = shmbuf;
00294
00295
00296 memcpy(shmbuf, (void *)(oumsg.front().getMsg()), siz);
00297
00298
00299 om.reset(oumsg.front().getID(), oumsg.front().getAction());
00300 om.addShmInfo(shmid, siz);
00301
00302
00303 oumsg.pop_front();
00304 }
00305 else
00306 {
00307
00308 om = oumsg.front();
00309 oumsg.pop_front();
00310 }
00311 if (shmdone == false)
00312 LDEBUG("Sending off [%d, %d] to %d",om.getID(),om.getAction(),fd);
00313
00314
00315 if (om.writeHeaderTo(fd) != TCPDONE)
00316 { state = TCPCS_BOGUS; return TCPBUG; }
00317
00318
00319
00320 if (om.getSize() > 0)
00321 state |= TCPCS_WRITING;
00322 }
00323 pthread_mutex_unlock(&mutou);
00324 }
00325
00326
00327
00328 if (state == TCPCS_BOGUS) return TCPBUG;
00329 else if (state & TCPCS_WRITING) return TCPWAITRW;
00330 else return TCPWAITREAD;
00331 }
00332
00333
00334 int TCPcliServ::disconnect()
00335 { reset(); return TCPFINISH; }
00336
00337
00338 int TCPcliServ::reset()
00339 {
00340 state = TCPCS_UNKNOWN;
00341 return TCPDONE;
00342 }
00343
00344
00345 void TCPcliServ::storeReceivedMessage()
00346 {
00347 int shmid, siz;
00348
00349
00350 if (im.checkShmInfo(shmid, siz))
00351 {
00352
00353 char *msgbuf = (char *)shmat(shmid, NULL, 0);
00354 if (msgbuf == (char *)(-1))
00355 IDPLERROR("Cannot attach to shared memory segment");
00356 else
00357 {
00358
00359
00360
00361 TCPmessage insm(im.getID(), im.getAction(), im.getETI(),
00362 msgbuf, siz);
00363
00364
00365 queueIncomingMessage(insm);
00366
00367
00368 TCPmessage ack(insm.getID(), insm.getAction());
00369 ack.addShmDone(shmid);
00370
00371
00372 if (shmdt(msgbuf) == -1)
00373 IDPLERROR("Error detaching Shm segment %d", shmid);
00374
00375
00376 send(ack);
00377 }
00378 }
00379
00380 else if (im.checkShmDone(shmid))
00381 {
00382 std::map<int, void *>::const_iterator m = shmmap.find(shmid);
00383 if (m != shmmap.end())
00384 {
00385
00386 if (shmdt(m->second) == -1)
00387 IDPLERROR("Error detaching Shm segment %d", m->first);
00388
00389
00390 if (shmctl(shmid, IPC_RMID, 0) == -1)
00391 IDPLERROR("Error deleting Shm segment %d", m->first);
00392
00393
00394 shmmap.erase(shmid);
00395 }
00396 else
00397 IDLERROR("Attempt to release unknown Shm %d -- IGNORED", shmid);
00398 }
00399
00400 else
00401 queueIncomingMessage(im);
00402
00403
00404 im.freeMem();
00405 }
00406
00407
00408 void TCPcliServ::queueIncomingMessage(TCPmessage& imsg)
00409 {
00410 pthread_mutex_lock(&mutin);
00411
00412
00413
00414 int xx;
00415 if (inmsgqlen > 0 && inmsg.size() >= inmsgqlen &&
00416 imsg.checkShmDone(xx) == false)
00417 {
00418 if (indroplast)
00419 {
00420
00421 }
00422 else
00423 {
00424
00425 if (inmsg.front().checkShmDone(xx) == false) inmsg.pop_front();
00426 inmsg.push_back(imsg);
00427 }
00428 }
00429 else
00430 inmsg.push_back(imsg);
00431
00432 pthread_mutex_unlock(&mutin);
00433 }
00434
00435
00436
00437
00438
00439