Beowulf.C

Go to the documentation of this file.
00001 /*!@file Beowulf/Beowulf.C Simple interfacing to a Beowulf cluster */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2001 by the //
00005 // University of Southern California (USC) and the iLab at USC.         //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Beowulf/Beowulf.C $
00035 // $Id: Beowulf.C 8264 2007-04-17 21:43:10Z rjpeters $
00036 //
00037 
#include "Beowulf/Beowulf.H"

#include "Beowulf/BeowulfOpts.H"
#include "Component/OptionManager.H"
#include "Util/Assert.H"
#include "Util/MathFunctions.H"
#include "Util/sformat.H"

#include <cmath>    // for floor()
#include <cstdio>   // for fseek()
#include <cstdlib>  // for strtol()
#include <cstring>  // for strlen()
#include <limits>
#include <string>
#include <vector>
00049 
00050 // ######################################################################
00051 Beowulf::Beowulf(OptionManager& mgr, const std::string& descrName,
00052                  const std::string& tagName, const bool ismaster) :
00053   ModelComponent(mgr, descrName, tagName),
00054   itsSlaveNames(&OPT_BeowulfSlaveNames, this),
00055   isMaster(&OPT_BeowulfMaster, this, ismaster, USE_MY_VAL),
00056   selfqlen(&OPT_BeowulfSelfQlen, this),
00057   selfdroplast(&OPT_BeowulfSelfDropLast, this),
00058   initTimeout(&OPT_BeowulfInitTimeout, this),
00059   com(new TCPcommunicator(mgr)), initialized(false),
00060   itsNodes(), fd2node(NULL), master(0), me(0),
00061   tim(1000000),  // microsecond accuracy
00062   selfmsg()
00063 {
00064   // initialize our self-addressed message queue mutex:
00065   pthread_mutex_init(&mutselfmsg, NULL);
00066 
00067   // make our TCPcommunicator a subcomponent of us:
00068   addSubComponent(com);
00069 
00070   if (isMaster.getVal() == false)
00071     this->unregisterParam(&itsSlaveNames);
00072 }
00073 
00074 // ######################################################################
00075 Beowulf::~Beowulf()
00076 { pthread_mutex_destroy(&mutselfmsg); }
00077 
00078 // ######################################################################
00079 void Beowulf::paramChanged(ModelParamBase* const param,
00080                            const bool valueChanged,
00081                            ParamClient::ChangeStatus* status)
00082 {
00083   ModelComponent::paramChanged(param, valueChanged, status);
00084 
00085   // was that a change of masterhood?
00086   if (param == &isMaster)
00087     {
00088       if (isMaster.getVal())
00089         { me = -1; this->registerOptionedParam(&itsSlaveNames, 0); }
00090       else
00091         { me = 0; this->unregisterParam(&itsSlaveNames); }
00092     }
00093 }
00094 
00095 // ######################################################################
00096 void Beowulf::start2()
00097 {
00098   if (isMaster.getVal()) masterInit(itsSlaveNames.getVal().c_str());
00099   else slaveInit();
00100 }
00101 
00102 // ######################################################################
00103 void Beowulf::stop1()
00104 {
00105   resetConnections(-1);
00106 }
00107 
00108 // ######################################################################
00109 int Beowulf::getNbSlaves() const
00110 {
00111   // we're converting from size_t->int; let's make sure that
00112   // conversion is going to be safe:
00113   ASSERT(itsNodes.size() <= size_t(std::numeric_limits<int>::max()));
00114 
00115   return int(itsNodes.size());
00116 }
00117 
00118 // ######################################################################
00119 int Beowulf::getNodeNumber() const
00120 { return me; }
00121 
00122 // ######################################################################
00123 const char* Beowulf::nodeName(const int nb) const
00124 {
00125   // is it the master?
00126   if (nb == -1) return "BeoMaster";
00127   if (nb < 0 || size_t(nb) >= itsNodes.size())
00128     LFATAL("Node number %d exceeds number of nodes: %"ZU,
00129            nb, itsNodes.size());
00130   return itsNodes[nb].name.c_str();
00131 }
00132 
00133 // ######################################################################
00134 int Beowulf::requestNode()
00135 {
00136   if (started() == false) LFATAL("I am not started");
00137 
00138   for (size_t i = 0; i < itsNodes.size(); ++i)
00139     {
00140       if (itsNodes[i].isAvailable)
00141         {
00142           itsNodes[i].isAvailable = false;
00143           return int(i);
00144         }
00145     }
00146 
00147   LERROR("I am out of nodes -- RETURNING -2");
00148   return -2;
00149 }
00150 
00151 // ######################################################################
00152 void Beowulf::releaseNode(int nodenum)
00153 {
00154   if (nodenum < 0 || size_t(nodenum) >= itsNodes.size())
00155     LERROR("Request to release an invalid node number (%d) -- IGNORING",
00156            nodenum);
00157   else if (itsNodes[nodenum].isAvailable)
00158     LERROR("Request to release an unallocated node (%d) -- IGNORING",
00159            nodenum);
00160   else
00161     itsNodes[nodenum].isAvailable = true;
00162 }
00163 
00164 // ######################################################################
00165 void Beowulf::resetConnections(const int masterfd)
00166 {
00167   itsNodes.resize(0);
00168   if (fd2node) { delete [] fd2node; fd2node = NULL; }
00169   if (masterfd == -1) com->terminateAll();
00170   else com->terminateAllButOne(masterfd);
00171   initialized = false; tim.reset(); selfmsg.clear();
00172 }
00173 
00174 // ######################################################################
00175 void Beowulf::masterInit(const int nb_nodes, char** node_names)
00176 {
00177   resetConnections(-1);  // reset possible existing communications
00178   master = -1;  // I have no master, so I am the master
00179   me = -1;      // yes, I am the master
00180 
00181   // setup table of all node nostnames:
00182   itsNodes.resize(nb_nodes);
00183 
00184   // setup initial communication with all nodes:
00185   fd2node = new int[FD_SETSIZE];
00186   for (int i = 0; i < FD_SETSIZE; i ++) fd2node[i] = -1;
00187 
00188   for (size_t i = 0; i < itsNodes.size(); ++i)
00189     {
00190       itsNodes[i].name = node_names[i];
00191       itsNodes[i].fd = com->contact(itsNodes[i].name.c_str());
00192       LDEBUG("Linked node %"ZU" to fd %d", i, itsNodes[i].fd);
00193       if (itsNodes[i].fd == -1)
00194         LFATAL("Failed contacting %s", itsNodes[i].name.c_str());
00195       fd2node[itsNodes[i].fd] = i;
00196 
00197       // setup initial ETIs. We assume that all nodes are idle:
00198       itsNodes[i].ETI = 0.0F;
00199       itsNodes[i].ETIreceived = 0.0F;
00200     }
00201 
00202   tim.reset();
00203 
00204   // now, send an initial message to each node that tells them what the
00205   // array of slaves is, so that they can allocate memory for it and
00206   // initialize it:
00207   for (size_t i = 0; i < itsNodes.size(); ++i)
00208     {
00209       // send a message with the names of all nodes and the index of the
00210       // node of interest:
00211       TCPmessage msg(0, BEO_INIT);
00212       std::string buf = sformat("%"ZU" %"ZU" ", itsNodes.size(), i);
00213       for (size_t j = 0; j < itsNodes.size(); ++j)
00214         { buf += itsNodes[j].name; buf += " "; }
00215       msg.addString(buf.c_str());
00216       com->send(itsNodes[i].fd, msg);
00217     }
00218 
00219   // wait for a message of type INIT from each node, acknowledging that they
00220   // have initialized their array of peers:
00221   size_t nodeok = 0;
00222   LDEBUG("Waiting for INIT from each node");
00223   Timer initTimer;
00224   while (nodeok < itsNodes.size() &&
00225          (initTimeout.getVal() <= 0.0 ||
00226           initTimer.getSecs() <= initTimeout.getVal()))
00227     {
00228       TCPmessage msg; int rfd = -1;  // receive from any node
00229       if (com->receive(rfd, msg, 5))  // block up to 5ms
00230         {
00231           if (msg.getAction() != BEO_INIT)
00232             LERROR("Bogus message [%d, %d] ignored!",
00233                    msg.getID(), msg.getAction());
00234           else
00235             nodeok ++;
00236         }
00237     }
00238   if (nodeok != itsNodes.size())
00239     LFATAL("Timeout while waiting for INIT from each node");
00240 
00241   // now, send an order to each node to contact its peers:
00242   TCPmessage msg(0, BEO_INIT2);
00243   for (size_t i = 0; i < itsNodes.size(); ++i)
00244     com->send(itsNodes[i].fd, msg);
00245 
00246   // wait for a message of type INIT3 from each node, acknowledging that they
00247   // have contacted their peers:
00248   nodeok = 0;
00249   LDEBUG("Waiting for INIT3 from each node");
00250   while (nodeok < itsNodes.size() &&
00251          (initTimeout.getVal() <= 0.0 ||
00252           initTimer.getSecs() <= initTimeout.getVal()))
00253     {
00254       TCPmessage msg; int rfd = -1;  // receive from any node
00255       if (com->receive(rfd, msg, 5))  // block up to 5ms
00256         {
00257           if (msg.getAction() != BEO_INIT3)
00258             LERROR("Bogus message [%d, %d] ignored!",
00259                    msg.getID(), msg.getAction());
00260           else
00261             {
00262               nodeok ++;
00263               LINFO("Slave node %d [%s] is configured.", msg.getID(),
00264                     itsNodes[msg.getID()].name.c_str());
00265             }
00266         }
00267     }
00268 
00269   if (nodeok != itsNodes.size())
00270     LFATAL("Timeout while waiting for INIT3 from each node");
00271 
00272   // ready to transmit/receive messages:
00273   initialized = true;
00274   LINFO("Initialization complete -- all slaves ready!");
00275 }
00276 
00277 // ######################################################################
00278 void Beowulf::masterInit(const char* node_names)
00279 {
00280   ASSERT(node_names);
00281   int nbn = 0;
00282   typedef char* charptr; char **nn;
00283 
00284   if (node_names[0] == '/') // file that contains the node list
00285     {
00286       // count how many nodes:
00287       LINFO("Using Beowulf slaves from file %s", node_names);
00288       char line[1024];
00289       FILE *fil = fopen(node_names, "r");
00290       if (fil == NULL) LFATAL("Cannot open slave node file %s", node_names);
00291       while(fgets(line, 1024, fil)) nbn ++;
00292 
00293       // create list of node names:
00294       fseek(fil, 0, SEEK_SET); // back to start of file
00295       int no = 0;
00296       nn = new charptr[nbn];
00297       while(fgets(line, 1024, fil))
00298         {
00299           line[strlen(line) - 1] = '\0';  // eliminate LF
00300           nn[no] = new char[strlen(line) + 1];
00301           strcpy(nn[no], line);
00302           no ++;
00303         }
00304       fclose(fil);
00305     }
00306   else
00307     {                       // node list as n01:9567,n02:9567, ...
00308       LINFO("Using Beowulf slaves %s", node_names);
00309       // count how many nodes:
00310       int idx = 0, slen = strlen(node_names);
00311       while(idx < slen) {
00312         if (node_names[idx] == ' ' || node_names[idx] == ',' ||
00313             idx == slen - 1) nbn ++;
00314         idx ++;
00315       }
00316 
00317       // create list of node names:
00318       nn = new charptr[nbn];
00319       idx = 0; int idx2 = 0, no = 0;
00320       while(idx <= slen) {
00321         if (idx == slen || node_names[idx] == ' ' || node_names[idx] == ',') {
00322           nn[no] = new char[idx - idx2 + 1];
00323           strncpy(nn[no], node_names + idx2, idx - idx2);
00324           nn[no][idx - idx2] = '\0';
00325           no ++; idx2 = idx + 1;
00326         }
00327         idx ++;
00328       }
00329     }
00330 
00331   // do the initialization:
00332   masterInit(nbn, nn);
00333 
00334   // delete local vars:
00335   for (int i = 0; i < nbn; i ++) delete [] nn[i];
00336   delete [] nn;
00337 }
00338 
00339 // ######################################################################
00340 void Beowulf::slaveInit()
00341 {
00342   size_t nodeok = 1;
00343   LINFO("waiting for master...");
00344 
00345   // wait for my master to contact me, then contact other nodes and tell
00346   // master when I am ready:
00347   while(initialized == false)
00348     {
00349       TCPmessage msg; int rfd = -1;  // receive from any open socket
00350       if (com->receive(rfd, msg, 5))  // block up to 5ms
00351         {
00352           int32 id = msg.getID();
00353           int32 action = msg.getAction();
00354           //LDEBUG("Received message [%d, %d] from fd=%d", id, action, rfd);
00355           switch(action)
00356             {
00357             case BEO_INIT: // ##############################
00358               {
00359                 // this message comes from our master:
00360                 master = rfd;
00361 
00362                 // start the (re)-initialization:
00363                 slaveReInit(msg);
00364               }
00365               break;
00366             case BEO_INIT2:
00367               {
00368                 // ok, now all the slaves should know what the array
00369                 // of slaves is.  Let's contact our peers: send a
00370                 // message of type INIT3 to all nodes with number
00371                 // above mine:
00372                 TCPmessage msg2(me, BEO_INIT3); nodeok = 1;
00373                 for (size_t i = me + 1; i < itsNodes.size(); ++i)
00374                   {
00375                     itsNodes[i].fd =
00376                       com->contact(itsNodes[i].name.c_str());
00377                     if (itsNodes[i].fd == -1)
00378                       LFATAL("Failed contacting %s",
00379                              itsNodes[i].name.c_str());
00380                     fd2node[itsNodes[i].fd] = i;
00381                     com->send(itsNodes[i].fd, msg2);
00382                     nodeok ++; // node[i] is initialized and ok
00383                   }
00384                 // have we contacted everybody we needed to?
00385                 if (nodeok == itsNodes.size()) initialized = true;
00386               }
00387               break;
00388             case BEO_INIT3: // ##############################
00389               {
00390                 // we received a message that will allow us to fill up
00391                 // blanks in our node[] array:
00392                 ASSERT(id >= 0 && size_t(id) < itsNodes.size());
00393                 itsNodes[id].fd = rfd; fd2node[rfd] = id;
00394                 nodeok ++;
00395 
00396                 // have we contacted everybody we needed to?
00397                 if (nodeok == itsNodes.size()) initialized = true;
00398               }
00399               break;
00400             default: // ##############################
00401               LERROR("Bogus action %d -- IGNORING.", action);
00402               break;
00403             }
00404         }
00405     }
00406 
00407   // ok, we are ready; tell master:
00408   TCPmessage msg(me, BEO_INIT3);
00409   com->send(master, msg);
00410   LINFO("Initialization complete -- all connections ready!");
00411 
00412   ASSERT(me >= 0); // because we are a slave
00413 
00414   std::string xx;
00415 
00416   for (size_t i = 0; i < itsNodes.size(); ++i)
00417     if (i == size_t(me)) xx += "me ";
00418     else { xx += sformat("%02d ", itsNodes[i].fd); }
00419   LINFO("NODES = [ %s]", xx.c_str());
00420 }
00421 
00422 // ######################################################################
00423 void Beowulf::slaveReInit(TCPmessage& rmsg)
00424 {
00425   // kill all existing communications except towards master:
00426   resetConnections(master);
00427 
00428   // decode the message:
00429   char* buf2;
00430   char* buf3;
00431   const std::string buf = rmsg.getElementString();
00432   const long nbnode = strtol(buf.c_str(), &buf2, 10); buf2 ++;
00433   me = strtol(buf2, &buf3, 10); buf3 ++;
00434   itsNodes.resize(nbnode);
00435   fd2node = new int[FD_SETSIZE];
00436   for (int i = 0; i < FD_SETSIZE; i ++) fd2node[i] = -1;
00437 
00438   for (size_t i = 0; i < itsNodes.size(); ++i)
00439     {
00440       itsNodes[i].fd = -1;  // no known node here yet...
00441       buf2 = buf3; while(*buf3 != ' ' && *buf3 != '\0') buf3 ++;
00442       *buf3++ = '\0';
00443       itsNodes[i].name = buf2;
00444     }
00445 
00446   // send an acknowledgment to master:
00447   LINFO("INIT with %"ZU" nodes, me = %d [%s]",
00448         itsNodes.size(), me, itsNodes[me].name.c_str());
00449   TCPmessage msg(me, BEO_INIT);
00450   com->send(master, msg);
00451 }
00452 
00453 // ######################################################################
00454 void Beowulf::send(const int node_nb, TCPmessage& msg)
00455 {
00456   ASSERT(initialized);
00457   ASSERT(node_nb == -1 ||
00458          (node_nb >= 0 && size_t(node_nb) < itsNodes.size()));
00459   if (node_nb == me)       // send to ourselves?
00460     {
00461       LDEBUG("Sending msg [%d, %d] to myself", msg.getID(), msg.getAction());
00462       if (selfqlen.getVal() > 0 && int(selfmsg.size()) >= selfqlen.getVal())
00463         {
00464           if (selfdroplast.getVal())
00465             LERROR("Self-message queue full -- DROPPING MOST RECENT MESSAGE");
00466           else
00467             {
00468               LERROR("Self-message queue full -- DROPPING LEAST RECENT "
00469                      "MESSAGE");
00470               pthread_mutex_lock(&mutselfmsg);
00471               selfmsg.pop_front();
00472               selfmsg.push_back(msg);
00473               pthread_mutex_unlock(&mutselfmsg);
00474             }
00475         }
00476       else
00477         {
00478           pthread_mutex_lock(&mutselfmsg);
00479           selfmsg.push_back(msg);
00480           pthread_mutex_unlock(&mutselfmsg);
00481         }
00482     }
00483   else if (node_nb == -1)  // want to send to my master
00484     {
00485       LDEBUG("Sending msg [%d, %d] to master", msg.getID(), msg.getAction());
00486       com->send(master, msg);
00487     }
00488   else
00489     {
00490       ASSERT(node_nb >= 0 && size_t(node_nb) < itsNodes.size());
00491       LDEBUG("Sending msg [%d, %d] to node %d [%s] [%d]", msg.getID(),
00492              msg.getAction(), node_nb,
00493              itsNodes[node_nb].name.c_str(), itsNodes[node_nb].fd);
00494       com->send(itsNodes[node_nb].fd, msg);
00495     }
00496 }
00497 
00498 // ######################################################################
00499 void Beowulf::send(TCPmessage& msg)
00500 {
00501   ASSERT(initialized);
00502   // this is the load-balanced send. Only works if we are master:
00503   ASSERT(me == -1);
00504 
00505   // find out least ETI delta; what counts here is ETIreceived + ETI:
00506   int minnode[itsNodes.size()]; // nodes that have the smallest ETI
00507   int nmin = 0;  // how many nodes have the smallest ETI?
00508   float mindiff = 1.0e30F;
00509   // std::string load;
00510   for (size_t i = 0; i < itsNodes.size(); ++i)
00511     {
00512       if (itsNodes[i].ETIreceived < 0.0F)
00513         { /* load += "- ";*/ continue; } // don't know about this node
00514       float diff = itsNodes[i].ETI + itsNodes[i].ETIreceived;
00515       if (diff < mindiff) { mindiff = diff; nmin = 1; minnode[0] = i; }
00516       else if (diff == mindiff) minnode[nmin++] = i;
00517       // load += sformat("%.1f ", diff * 1000.0F);
00518     }
00519   // LINFO("LOAD = [ %s]", load.c_str());
00520 
00521   int node_nb = -1;
00522   if (nmin == 0)  // don't know about load of any node -> random pick
00523     node_nb = int(floor(itsNodes.size() * randomDouble()));
00524   else            // pick one of our mindiff nodes and send to it
00525     node_nb = minnode[int(floor(nmin * randomDouble()))];
00526 
00527   ASSERT(node_nb >= 0 && size_t(node_nb) < itsNodes.size());
00528   LDEBUG("Sending msg [%d, %d] to least-loaded node %d [%s] [%d]",
00529          msg.getID(), msg.getAction(), node_nb,
00530          itsNodes[node_nb].name.c_str(), itsNodes[node_nb].fd);
00531   com->send(itsNodes[node_nb].fd, msg);
00532   itsNodes[node_nb].ETIreceived = -1.0F; // don't know about this node anymore
00533 }
00534 
00535 // ######################################################################
00536 bool Beowulf::receive(int& node_nb, TCPmessage& msg, int32& frame,
00537                       int32& action, const int timeout, int* err)
00538 {
00539   ASSERT(initialized);
00540 
00541   // our self-sent messages have priority, let's deal with them first:
00542   if (node_nb == me || node_nb == -1) // receive from myself or anybody
00543     {
00544       if (selfmsg.size() > 0)
00545         {
00546           pthread_mutex_lock(&mutselfmsg);
00547           msg = selfmsg.front(); selfmsg.pop_front();
00548           pthread_mutex_unlock(&mutselfmsg);
00549           frame = msg.getID(); action = msg.getAction(); node_nb = me;
00550           LDEBUG("Received msg [%d, %d] from myself", frame, action);
00551           return true;
00552         }
00553       else if (node_nb == me && me != -1)
00554         return false; // nothing received from myself and I am not master
00555     }
00556 
00557   ASSERT(node_nb == -1 ||
00558          (node_nb >= 0 && size_t(node_nb) < itsNodes.size()));
00559 
00560   int rfd;
00561   if (node_nb == -1) rfd = -1;  // receive from any node
00562   else rfd = itsNodes[node_nb].fd; // receive from given node
00563   bool rec = com->receive(rfd, msg, timeout, err);
00564   if (rec)
00565     {
00566       frame = msg.getID(); action = msg.getAction(); node_nb = fd2node[rfd];
00567       if (node_nb == -1)
00568         LDEBUG("Received msg [%d, %d] from master", frame, action);
00569       else
00570         LDEBUG("Received msg [%d, %d] from node %d [%s]", frame, action,
00571                node_nb, itsNodes[node_nb].name.c_str());
00572 
00573       // is this message an order for re-initialization?
00574       if (action == BEO_INIT && initialized)
00575         {
00576           LINFO("RE-INIT order received -- Starting re-initialization...");
00577           // whoever re-inits me is my new master:
00578           master = rfd;
00579           // start a re-init based on message we just received:
00580           slaveReInit(msg);
00581           // finish up re-init:
00582           slaveInit();
00583           // we still return the message to let our user know we got re-inited
00584         }
00585 
00586       // if we are the master, update that node's ETI:
00587       if (me == -1)
00588         {
00589           if (node_nb < 0 || size_t(node_nb) >= itsNodes.size())
00590             LERROR("Bogus node number %d - IGNORED", node_nb);
00591           else
00592             {
00593               itsNodes[node_nb].ETI = msg.getETI();
00594               itsNodes[node_nb].ETIreceived = tim.getSecs();
00595             }
00596         }
00597     }
00598   else
00599     { frame = -1; action = -1; }
00600 
00601   return rec;
00602 }
00603 
00604 // ######################################################################
00605 int Beowulf::nbReceived(const int node_nb)
00606 {
00607   if (node_nb == -2)  // any node
00608     return com->nbReceived(-1);
00609   if (node_nb == -1)  // only master
00610     {
00611       if (me == -1) LFATAL("Hey, I am the master!");
00612       return com->nbReceived(master);
00613     }
00614   if (node_nb < 0 || size_t(node_nb) >= itsNodes.size())
00615     LFATAL("Node number %d out of range [-2 .. %"ZU"]",
00616            node_nb, itsNodes.size());
00617   return com->nbReceived(itsNodes[node_nb].fd);
00618 }
00619 
00620 
00621 // ######################################################################
00622 /* So things look consistent in everyone's emacs... */
00623 /* Local Variables: */
00624 /* indent-tabs-mode: nil */
00625 /* End: */
Generated on Sun May 8 08:40:20 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3