00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
#include "Beowulf/Beowulf.H"

#include "Beowulf/BeowulfOpts.H"
#include "Component/OptionManager.H"
#include "Util/Assert.H"
#include "Util/MathFunctions.H"
#include "Util/sformat.H"

#include <cmath>
#include <cstdio>
#include <limits>
#include <sstream>
#include <string>
#include <vector>
00049
00050
00051 Beowulf::Beowulf(OptionManager& mgr, const std::string& descrName,
00052 const std::string& tagName, const bool ismaster) :
00053 ModelComponent(mgr, descrName, tagName),
00054 itsSlaveNames(&OPT_BeowulfSlaveNames, this),
00055 isMaster(&OPT_BeowulfMaster, this, ismaster, USE_MY_VAL),
00056 selfqlen(&OPT_BeowulfSelfQlen, this),
00057 selfdroplast(&OPT_BeowulfSelfDropLast, this),
00058 initTimeout(&OPT_BeowulfInitTimeout, this),
00059 com(new TCPcommunicator(mgr)), initialized(false),
00060 itsNodes(), fd2node(NULL), master(0), me(0),
00061 tim(1000000),
00062 selfmsg()
00063 {
00064
00065 pthread_mutex_init(&mutselfmsg, NULL);
00066
00067
00068 addSubComponent(com);
00069
00070 if (isMaster.getVal() == false)
00071 this->unregisterParam(&itsSlaveNames);
00072 }
00073
00074
00075 Beowulf::~Beowulf()
00076 { pthread_mutex_destroy(&mutselfmsg); }
00077
00078
00079 void Beowulf::paramChanged(ModelParamBase* const param,
00080 const bool valueChanged,
00081 ParamClient::ChangeStatus* status)
00082 {
00083 ModelComponent::paramChanged(param, valueChanged, status);
00084
00085
00086 if (param == &isMaster)
00087 {
00088 if (isMaster.getVal())
00089 { me = -1; this->registerOptionedParam(&itsSlaveNames, 0); }
00090 else
00091 { me = 0; this->unregisterParam(&itsSlaveNames); }
00092 }
00093 }
00094
00095
00096 void Beowulf::start2()
00097 {
00098 if (isMaster.getVal()) masterInit(itsSlaveNames.getVal().c_str());
00099 else slaveInit();
00100 }
00101
00102
00103 void Beowulf::stop1()
00104 {
00105 resetConnections(-1);
00106 }
00107
00108
00109 int Beowulf::getNbSlaves() const
00110 {
00111
00112
00113 ASSERT(itsNodes.size() <= size_t(std::numeric_limits<int>::max()));
00114
00115 return int(itsNodes.size());
00116 }
00117
00118
00119 int Beowulf::getNodeNumber() const
00120 { return me; }
00121
00122
00123 const char* Beowulf::nodeName(const int nb) const
00124 {
00125
00126 if (nb == -1) return "BeoMaster";
00127 if (nb < 0 || size_t(nb) >= itsNodes.size())
00128 LFATAL("Node number %d exceeds number of nodes: %"ZU,
00129 nb, itsNodes.size());
00130 return itsNodes[nb].name.c_str();
00131 }
00132
00133
00134 int Beowulf::requestNode()
00135 {
00136 if (started() == false) LFATAL("I am not started");
00137
00138 for (size_t i = 0; i < itsNodes.size(); ++i)
00139 {
00140 if (itsNodes[i].isAvailable)
00141 {
00142 itsNodes[i].isAvailable = false;
00143 return int(i);
00144 }
00145 }
00146
00147 LERROR("I am out of nodes -- RETURNING -2");
00148 return -2;
00149 }
00150
00151
00152 void Beowulf::releaseNode(int nodenum)
00153 {
00154 if (nodenum < 0 || size_t(nodenum) >= itsNodes.size())
00155 LERROR("Request to release an invalid node number (%d) -- IGNORING",
00156 nodenum);
00157 else if (itsNodes[nodenum].isAvailable)
00158 LERROR("Request to release an unallocated node (%d) -- IGNORING",
00159 nodenum);
00160 else
00161 itsNodes[nodenum].isAvailable = true;
00162 }
00163
00164
00165 void Beowulf::resetConnections(const int masterfd)
00166 {
00167 itsNodes.resize(0);
00168 if (fd2node) { delete [] fd2node; fd2node = NULL; }
00169 if (masterfd == -1) com->terminateAll();
00170 else com->terminateAllButOne(masterfd);
00171 initialized = false; tim.reset(); selfmsg.clear();
00172 }
00173
00174
00175 void Beowulf::masterInit(const int nb_nodes, char** node_names)
00176 {
00177 resetConnections(-1);
00178 master = -1;
00179 me = -1;
00180
00181
00182 itsNodes.resize(nb_nodes);
00183
00184
00185 fd2node = new int[FD_SETSIZE];
00186 for (int i = 0; i < FD_SETSIZE; i ++) fd2node[i] = -1;
00187
00188 for (size_t i = 0; i < itsNodes.size(); ++i)
00189 {
00190 itsNodes[i].name = node_names[i];
00191 itsNodes[i].fd = com->contact(itsNodes[i].name.c_str());
00192 LDEBUG("Linked node %"ZU" to fd %d", i, itsNodes[i].fd);
00193 if (itsNodes[i].fd == -1)
00194 LFATAL("Failed contacting %s", itsNodes[i].name.c_str());
00195 fd2node[itsNodes[i].fd] = i;
00196
00197
00198 itsNodes[i].ETI = 0.0F;
00199 itsNodes[i].ETIreceived = 0.0F;
00200 }
00201
00202 tim.reset();
00203
00204
00205
00206
00207 for (size_t i = 0; i < itsNodes.size(); ++i)
00208 {
00209
00210
00211 TCPmessage msg(0, BEO_INIT);
00212 std::string buf = sformat("%"ZU" %"ZU" ", itsNodes.size(), i);
00213 for (size_t j = 0; j < itsNodes.size(); ++j)
00214 { buf += itsNodes[j].name; buf += " "; }
00215 msg.addString(buf.c_str());
00216 com->send(itsNodes[i].fd, msg);
00217 }
00218
00219
00220
00221 size_t nodeok = 0;
00222 LDEBUG("Waiting for INIT from each node");
00223 Timer initTimer;
00224 while (nodeok < itsNodes.size() &&
00225 (initTimeout.getVal() <= 0.0 ||
00226 initTimer.getSecs() <= initTimeout.getVal()))
00227 {
00228 TCPmessage msg; int rfd = -1;
00229 if (com->receive(rfd, msg, 5))
00230 {
00231 if (msg.getAction() != BEO_INIT)
00232 LERROR("Bogus message [%d, %d] ignored!",
00233 msg.getID(), msg.getAction());
00234 else
00235 nodeok ++;
00236 }
00237 }
00238 if (nodeok != itsNodes.size())
00239 LFATAL("Timeout while waiting for INIT from each node");
00240
00241
00242 TCPmessage msg(0, BEO_INIT2);
00243 for (size_t i = 0; i < itsNodes.size(); ++i)
00244 com->send(itsNodes[i].fd, msg);
00245
00246
00247
00248 nodeok = 0;
00249 LDEBUG("Waiting for INIT3 from each node");
00250 while (nodeok < itsNodes.size() &&
00251 (initTimeout.getVal() <= 0.0 ||
00252 initTimer.getSecs() <= initTimeout.getVal()))
00253 {
00254 TCPmessage msg; int rfd = -1;
00255 if (com->receive(rfd, msg, 5))
00256 {
00257 if (msg.getAction() != BEO_INIT3)
00258 LERROR("Bogus message [%d, %d] ignored!",
00259 msg.getID(), msg.getAction());
00260 else
00261 {
00262 nodeok ++;
00263 LINFO("Slave node %d [%s] is configured.", msg.getID(),
00264 itsNodes[msg.getID()].name.c_str());
00265 }
00266 }
00267 }
00268
00269 if (nodeok != itsNodes.size())
00270 LFATAL("Timeout while waiting for INIT3 from each node");
00271
00272
00273 initialized = true;
00274 LINFO("Initialization complete -- all slaves ready!");
00275 }
00276
00277
00278 void Beowulf::masterInit(const char* node_names)
00279 {
00280 ASSERT(node_names);
00281 int nbn = 0;
00282 typedef char* charptr; char **nn;
00283
00284 if (node_names[0] == '/')
00285 {
00286
00287 LINFO("Using Beowulf slaves from file %s", node_names);
00288 char line[1024];
00289 FILE *fil = fopen(node_names, "r");
00290 if (fil == NULL) LFATAL("Cannot open slave node file %s", node_names);
00291 while(fgets(line, 1024, fil)) nbn ++;
00292
00293
00294 fseek(fil, 0, SEEK_SET);
00295 int no = 0;
00296 nn = new charptr[nbn];
00297 while(fgets(line, 1024, fil))
00298 {
00299 line[strlen(line) - 1] = '\0';
00300 nn[no] = new char[strlen(line) + 1];
00301 strcpy(nn[no], line);
00302 no ++;
00303 }
00304 fclose(fil);
00305 }
00306 else
00307 {
00308 LINFO("Using Beowulf slaves %s", node_names);
00309
00310 int idx = 0, slen = strlen(node_names);
00311 while(idx < slen) {
00312 if (node_names[idx] == ' ' || node_names[idx] == ',' ||
00313 idx == slen - 1) nbn ++;
00314 idx ++;
00315 }
00316
00317
00318 nn = new charptr[nbn];
00319 idx = 0; int idx2 = 0, no = 0;
00320 while(idx <= slen) {
00321 if (idx == slen || node_names[idx] == ' ' || node_names[idx] == ',') {
00322 nn[no] = new char[idx - idx2 + 1];
00323 strncpy(nn[no], node_names + idx2, idx - idx2);
00324 nn[no][idx - idx2] = '\0';
00325 no ++; idx2 = idx + 1;
00326 }
00327 idx ++;
00328 }
00329 }
00330
00331
00332 masterInit(nbn, nn);
00333
00334
00335 for (int i = 0; i < nbn; i ++) delete [] nn[i];
00336 delete [] nn;
00337 }
00338
00339
00340 void Beowulf::slaveInit()
00341 {
00342 size_t nodeok = 1;
00343 LINFO("waiting for master...");
00344
00345
00346
00347 while(initialized == false)
00348 {
00349 TCPmessage msg; int rfd = -1;
00350 if (com->receive(rfd, msg, 5))
00351 {
00352 int32 id = msg.getID();
00353 int32 action = msg.getAction();
00354
00355 switch(action)
00356 {
00357 case BEO_INIT:
00358 {
00359
00360 master = rfd;
00361
00362
00363 slaveReInit(msg);
00364 }
00365 break;
00366 case BEO_INIT2:
00367 {
00368
00369
00370
00371
00372 TCPmessage msg2(me, BEO_INIT3); nodeok = 1;
00373 for (size_t i = me + 1; i < itsNodes.size(); ++i)
00374 {
00375 itsNodes[i].fd =
00376 com->contact(itsNodes[i].name.c_str());
00377 if (itsNodes[i].fd == -1)
00378 LFATAL("Failed contacting %s",
00379 itsNodes[i].name.c_str());
00380 fd2node[itsNodes[i].fd] = i;
00381 com->send(itsNodes[i].fd, msg2);
00382 nodeok ++;
00383 }
00384
00385 if (nodeok == itsNodes.size()) initialized = true;
00386 }
00387 break;
00388 case BEO_INIT3:
00389 {
00390
00391
00392 ASSERT(id >= 0 && size_t(id) < itsNodes.size());
00393 itsNodes[id].fd = rfd; fd2node[rfd] = id;
00394 nodeok ++;
00395
00396
00397 if (nodeok == itsNodes.size()) initialized = true;
00398 }
00399 break;
00400 default:
00401 LERROR("Bogus action %d -- IGNORING.", action);
00402 break;
00403 }
00404 }
00405 }
00406
00407
00408 TCPmessage msg(me, BEO_INIT3);
00409 com->send(master, msg);
00410 LINFO("Initialization complete -- all connections ready!");
00411
00412 ASSERT(me >= 0);
00413
00414 std::string xx;
00415
00416 for (size_t i = 0; i < itsNodes.size(); ++i)
00417 if (i == size_t(me)) xx += "me ";
00418 else { xx += sformat("%02d ", itsNodes[i].fd); }
00419 LINFO("NODES = [ %s]", xx.c_str());
00420 }
00421
00422
00423 void Beowulf::slaveReInit(TCPmessage& rmsg)
00424 {
00425
00426 resetConnections(master);
00427
00428
00429 char* buf2;
00430 char* buf3;
00431 const std::string buf = rmsg.getElementString();
00432 const long nbnode = strtol(buf.c_str(), &buf2, 10); buf2 ++;
00433 me = strtol(buf2, &buf3, 10); buf3 ++;
00434 itsNodes.resize(nbnode);
00435 fd2node = new int[FD_SETSIZE];
00436 for (int i = 0; i < FD_SETSIZE; i ++) fd2node[i] = -1;
00437
00438 for (size_t i = 0; i < itsNodes.size(); ++i)
00439 {
00440 itsNodes[i].fd = -1;
00441 buf2 = buf3; while(*buf3 != ' ' && *buf3 != '\0') buf3 ++;
00442 *buf3++ = '\0';
00443 itsNodes[i].name = buf2;
00444 }
00445
00446
00447 LINFO("INIT with %"ZU" nodes, me = %d [%s]",
00448 itsNodes.size(), me, itsNodes[me].name.c_str());
00449 TCPmessage msg(me, BEO_INIT);
00450 com->send(master, msg);
00451 }
00452
00453
00454 void Beowulf::send(const int node_nb, TCPmessage& msg)
00455 {
00456 ASSERT(initialized);
00457 ASSERT(node_nb == -1 ||
00458 (node_nb >= 0 && size_t(node_nb) < itsNodes.size()));
00459 if (node_nb == me)
00460 {
00461 LDEBUG("Sending msg [%d, %d] to myself", msg.getID(), msg.getAction());
00462 if (selfqlen.getVal() > 0 && int(selfmsg.size()) >= selfqlen.getVal())
00463 {
00464 if (selfdroplast.getVal())
00465 LERROR("Self-message queue full -- DROPPING MOST RECENT MESSAGE");
00466 else
00467 {
00468 LERROR("Self-message queue full -- DROPPING LEAST RECENT "
00469 "MESSAGE");
00470 pthread_mutex_lock(&mutselfmsg);
00471 selfmsg.pop_front();
00472 selfmsg.push_back(msg);
00473 pthread_mutex_unlock(&mutselfmsg);
00474 }
00475 }
00476 else
00477 {
00478 pthread_mutex_lock(&mutselfmsg);
00479 selfmsg.push_back(msg);
00480 pthread_mutex_unlock(&mutselfmsg);
00481 }
00482 }
00483 else if (node_nb == -1)
00484 {
00485 LDEBUG("Sending msg [%d, %d] to master", msg.getID(), msg.getAction());
00486 com->send(master, msg);
00487 }
00488 else
00489 {
00490 ASSERT(node_nb >= 0 && size_t(node_nb) < itsNodes.size());
00491 LDEBUG("Sending msg [%d, %d] to node %d [%s] [%d]", msg.getID(),
00492 msg.getAction(), node_nb,
00493 itsNodes[node_nb].name.c_str(), itsNodes[node_nb].fd);
00494 com->send(itsNodes[node_nb].fd, msg);
00495 }
00496 }
00497
00498
00499 void Beowulf::send(TCPmessage& msg)
00500 {
00501 ASSERT(initialized);
00502
00503 ASSERT(me == -1);
00504
00505
00506 int minnode[itsNodes.size()];
00507 int nmin = 0;
00508 float mindiff = 1.0e30F;
00509
00510 for (size_t i = 0; i < itsNodes.size(); ++i)
00511 {
00512 if (itsNodes[i].ETIreceived < 0.0F)
00513 { continue; }
00514 float diff = itsNodes[i].ETI + itsNodes[i].ETIreceived;
00515 if (diff < mindiff) { mindiff = diff; nmin = 1; minnode[0] = i; }
00516 else if (diff == mindiff) minnode[nmin++] = i;
00517
00518 }
00519
00520
00521 int node_nb = -1;
00522 if (nmin == 0)
00523 node_nb = int(floor(itsNodes.size() * randomDouble()));
00524 else
00525 node_nb = minnode[int(floor(nmin * randomDouble()))];
00526
00527 ASSERT(node_nb >= 0 && size_t(node_nb) < itsNodes.size());
00528 LDEBUG("Sending msg [%d, %d] to least-loaded node %d [%s] [%d]",
00529 msg.getID(), msg.getAction(), node_nb,
00530 itsNodes[node_nb].name.c_str(), itsNodes[node_nb].fd);
00531 com->send(itsNodes[node_nb].fd, msg);
00532 itsNodes[node_nb].ETIreceived = -1.0F;
00533 }
00534
00535
00536 bool Beowulf::receive(int& node_nb, TCPmessage& msg, int32& frame,
00537 int32& action, const int timeout, int* err)
00538 {
00539 ASSERT(initialized);
00540
00541
00542 if (node_nb == me || node_nb == -1)
00543 {
00544 if (selfmsg.size() > 0)
00545 {
00546 pthread_mutex_lock(&mutselfmsg);
00547 msg = selfmsg.front(); selfmsg.pop_front();
00548 pthread_mutex_unlock(&mutselfmsg);
00549 frame = msg.getID(); action = msg.getAction(); node_nb = me;
00550 LDEBUG("Received msg [%d, %d] from myself", frame, action);
00551 return true;
00552 }
00553 else if (node_nb == me && me != -1)
00554 return false;
00555 }
00556
00557 ASSERT(node_nb == -1 ||
00558 (node_nb >= 0 && size_t(node_nb) < itsNodes.size()));
00559
00560 int rfd;
00561 if (node_nb == -1) rfd = -1;
00562 else rfd = itsNodes[node_nb].fd;
00563 bool rec = com->receive(rfd, msg, timeout, err);
00564 if (rec)
00565 {
00566 frame = msg.getID(); action = msg.getAction(); node_nb = fd2node[rfd];
00567 if (node_nb == -1)
00568 LDEBUG("Received msg [%d, %d] from master", frame, action);
00569 else
00570 LDEBUG("Received msg [%d, %d] from node %d [%s]", frame, action,
00571 node_nb, itsNodes[node_nb].name.c_str());
00572
00573
00574 if (action == BEO_INIT && initialized)
00575 {
00576 LINFO("RE-INIT order received -- Starting re-initialization...");
00577
00578 master = rfd;
00579
00580 slaveReInit(msg);
00581
00582 slaveInit();
00583
00584 }
00585
00586
00587 if (me == -1)
00588 {
00589 if (node_nb < 0 || size_t(node_nb) >= itsNodes.size())
00590 LERROR("Bogus node number %d - IGNORED", node_nb);
00591 else
00592 {
00593 itsNodes[node_nb].ETI = msg.getETI();
00594 itsNodes[node_nb].ETIreceived = tim.getSecs();
00595 }
00596 }
00597 }
00598 else
00599 { frame = -1; action = -1; }
00600
00601 return rec;
00602 }
00603
00604
00605 int Beowulf::nbReceived(const int node_nb)
00606 {
00607 if (node_nb == -2)
00608 return com->nbReceived(-1);
00609 if (node_nb == -1)
00610 {
00611 if (me == -1) LFATAL("Hey, I am the master!");
00612 return com->nbReceived(master);
00613 }
00614 if (node_nb < 0 || size_t(node_nb) >= itsNodes.size())
00615 LFATAL("Node number %d out of range [-2 .. %"ZU"]",
00616 node_nb, itsNodes.size());
00617 return com->nbReceived(itsNodes[node_nb].fd);
00618 }
00619
00620
00621
00622
00623
00624
00625