app-TestSIFT_Master.C

00001 #include "Media/FrameSeries.H"
00002 #include "Component/ModelManager.H"
00003 #include "Image/Image.H"
00004 #include "Image/PixelsTypes.H"
00005 #include "Robots/Beobot2/Testing/TestSIFT.ice.H"
00006 #include "Util/WorkThreadServer.H"
00007 #include "Util/Timer.H"
00008 #include "SIFT/VisualObject.H"
00009 
00010 
00011 #include <Ice/Ice.h>
00012 #include <fstream>
00013 
00014 ////////////////////////////////////////////////////////////////////////
00015 // Definition for a matching job which spawns a request to a remote 
00016 // worker and waits for the reply
00017 ////////////////////////////////////////////////////////////////////////
00018 class MatchJob : public JobServer::Job
00019 {
00020   public:
00021     MatchJob(TestSIFT::SIFTMatcherPrx matcherProxy,
00022         std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_beginning,
00023         std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_ending
00024         )
00025       : Job()
00026     {
00027       itsMatcherPrx = matcherProxy;
00028 
00029       std::vector<rutz::shared_ptr<Keypoint> >::iterator kpIt;
00030       for(kpIt = kp_beginning; kpIt < kp_ending; ++kpIt)
00031       {
00032         rutz::shared_ptr<Keypoint> key = *kpIt;
00033 
00034         //Copy all of the keypoint data into the Ice structure
00035         TestSIFT::keypoint newKey;
00036         newKey.x = key->getX();
00037         newKey.y = key->getY();
00038         newKey.s = key->getS();
00039         newKey.o = key->getO();
00040         newKey.m = key->getM();
00041         newKey.oriFV = key->getOriFV();
00042         itsKeypoints.push_back(newKey);
00043       }
00044     }
00045 
00046     void run()
00047     {
00048       //Send the list of keypoints to the remote worker, and wait until 
00049       //it returns with some results
00050 
00051                         try
00052                         {
00053                                 TestSIFT::idSequence ids = itsMatcherPrx->matchKeypoints(itsKeypoints);
00054                                 LDEBUG("Got Result of size: %Zu", ids.size());
00055                         }
00056                         catch(const Ice::Exception& ex)
00057                         {  std::cerr << "Failed: " << ex << std::endl;   }
00058                         catch(const char* msg)
00059                         {   std::cerr << "Failed: " << msg << std::endl; }
00060 
00061 
00062     }
00063 
00064     char const* jobType() const { return "MatchJob"; }
00065 
00066     TestSIFT::SIFTMatcherPrx itsMatcherPrx;
00067     TestSIFT::keypointSequence itsKeypoints;
00068 };
00069 
00070 int main(int argc, char const* argv[])
00071 {
00072   ////////////////////////////////////////////////////////////////////////
00073   // Start up our model components 
00074   ////////////////////////////////////////////////////////////////////////
00075   ModelManager mgr;
00076   nub::ref<InputFrameSeries> ifs(new InputFrameSeries(mgr));
00077   mgr.addSubComponent(ifs);
00078 
00079 //  nub::ref<OutputFrameSeries> ofs(new OutputFrameSeries(mgr));
00080 //  mgr.addSubComponent(ofs);
00081 
00082   mgr.parseCommandLine(argc, argv, "logfile <list of workers specified as IP[:PORT]>", 2,-1);
00083   mgr.start();
00084 
00085   ////////////////////////////////////////////////////////////////////////
00086   // Open the log file for writing
00087   ////////////////////////////////////////////////////////////////////////
00088   //Get the log file name as the first extra argument
00089   std::string logfilename = mgr.getExtraArg(0);
00090   std::ofstream logfile(logfilename.c_str());
00091 
00092 
00093   ////////////////////////////////////////////////////////////////////////
00094   // Create a list of worker proxies
00095   ////////////////////////////////////////////////////////////////////////
00096   std::vector<TestSIFT::SIFTMatcherPrx> workers;
00097 
00098   Ice::CommunicatorPtr ic;
00099   try
00100   {
00101     int dummy_argc=1;
00102     const char* dummy_argv[1] = {"dummy"};
00103 
00104     std::cout << "Initializing Ice Runtime...";
00105 
00106                 Ice::PropertiesPtr props = Ice::createProperties(dummy_argc,(char**)dummy_argv);
00107                 props->setProperty("Ice.MessageSizeMax", "1048576");
00108                 props->setProperty("Ice.Warn.Connections", "1");
00109                 Ice::InitializationData id;
00110                 id.properties = props;
00111 
00112     ic = Ice::initialize(id);
00113 
00114     std::cout << "Success!" << std::endl;
00115 
00116     int numWorkers = mgr.numExtraArgs()-1;
00117     for(int workerID=0; workerID<numWorkers; ++workerID)
00118     {
00119       //Get the worker IP and port separately, using the default port if not specified
00120       std::string workerString = mgr.getExtraArg(workerID+1);
00121       size_t colonIdx = workerString.find(":");
00122       std::string workerName   = workerString.substr(0, colonIdx);
00123       uint workerPort = TestSIFT::DEFAULTWORKERPORT;
00124       if(colonIdx != std::string::npos) 
00125         workerPort = atoi(workerString.substr(colonIdx+1, std::string::npos).c_str());
00126 
00127       //Try opening proxy connections to the worker services
00128       std::cout << "Opening Connection to " << workerName << ":" << workerPort << "...";
00129       char connectionBuffer[256];
00130       sprintf(connectionBuffer,
00131           "SIFTMatcher:default -h %s -p %d", workerName.c_str(), workerPort);
00132       Ice::ObjectPrx base = ic->stringToProxy(connectionBuffer);
00133       workers.push_back(TestSIFT::SIFTMatcherPrx::checkedCast(base));
00134       std::cout << "Opened" << std::endl;
00135     }
00136   }
00137   catch(const Ice::Exception& ex)
00138   {  std::cerr << "Failed: " << ex << std::endl;   }
00139   catch(const char* msg)
00140   {   std::cerr << "Failed: " << msg << std::endl; }
00141 
00142   ////////////////////////////////////////////////////////////////////////
00143   // Read input frames, extract the SIFT keypoints, and send them out to be
00144   // matched by the workers
00145   ////////////////////////////////////////////////////////////////////////
00146   LINFO("Beginning Processing With %Zu Nodes", workers.size());
00147 
00148   //Create a jobserver to handle the workers - one thread for each worker
00149   WorkThreadServer threadServer("SIFT Match Job Server", workers.size());
00150 
00151   Timer timer(1000000);
00152   FrameState state = FRAME_NEXT;
00153   while(state == FRAME_NEXT)
00154   {
00155     //Read in the new input image
00156     state = ifs->updateNext();
00157     Image<PixRGB<byte> > img = ifs->readRGB();
00158     if(img.initialized())
00159     {
00160       //ofs->writeRGB(img, "Input");
00161     }
00162     else 
00163       break;
00164 
00165     //Compute SIFT keypoints from the image
00166     VisualObject siftMatch("img", "img", img);
00167     std::vector<rutz::shared_ptr<Keypoint> > keypoints =
00168       siftMatch.getKeypoints();
00169 
00170     int kp_per_worker = keypoints.size() / workers.size();
00171 
00172     //Assign the computed keypoints to the workers
00173     rutz::shared_ptr<JobServer::Job> jobs[workers.size()];
00174     for(size_t workerIdx=0; workerIdx<workers.size(); ++workerIdx)
00175     {
00176       int keyIdx_start = kp_per_worker*workerIdx;
00177       int keyIdx_end   = kp_per_worker*(workerIdx+1);
00178       if(workerIdx == workers.size()-1)
00179         keyIdx_end = keypoints.size();
00180 
00181       LDEBUG("Worker %Zu gets %d - %d / %Zu",
00182           workerIdx,
00183           keyIdx_start,
00184           keyIdx_end,
00185           keypoints.size());
00186 
00187       jobs[workerIdx] = rutz::shared_ptr<JobServer::Job>(
00188           new MatchJob(
00189             workers[workerIdx],
00190             keypoints.begin() + keyIdx_start,
00191             keypoints.begin() + keyIdx_end
00192             )
00193           );
00194     }
00195 
00196     //Enqueue the jobs all at once, and start the timer
00197     timer.reset();
00198     threadServer.enqueueJobs(jobs, workers.size());
00199 
00200     //Wait for all jobs to finish, and stop the timer
00201     threadServer.flushQueue(10000, false);
00202     double time = timer.getSecs();
00203 
00204     logfile << time << std::endl;
00205     logfile.flush();
00206     LINFO("Done Frame in %fs", time);
00207   }
00208   LINFO("FINISHED");
00209 }
Generated on Sun May 8 08:41:19 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3