00001 #include "Media/FrameSeries.H" 00002 #include "Component/ModelManager.H" 00003 #include "Image/Image.H" 00004 #include "Image/PixelsTypes.H" 00005 #include "Robots/Beobot2/Testing/TestSIFT.ice.H" 00006 #include "Util/WorkThreadServer.H" 00007 #include "Util/Timer.H" 00008 #include "SIFT/VisualObject.H" 00009 00010 00011 #include <Ice/Ice.h> 00012 #include <fstream> 00013 00014 //////////////////////////////////////////////////////////////////////// 00015 // Definition for a matching job which spawns a request to a remote 00016 // worker and waits for the reply 00017 //////////////////////////////////////////////////////////////////////// 00018 class MatchJob : public JobServer::Job 00019 { 00020 public: 00021 MatchJob(TestSIFT::SIFTMatcherPrx matcherProxy, 00022 std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_beginning, 00023 std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_ending 00024 ) 00025 : Job() 00026 { 00027 itsMatcherPrx = matcherProxy; 00028 00029 std::vector<rutz::shared_ptr<Keypoint> >::iterator kpIt; 00030 for(kpIt = kp_beginning; kpIt < kp_ending; ++kpIt) 00031 { 00032 rutz::shared_ptr<Keypoint> key = *kpIt; 00033 00034 //Copy all of the keypoint data into the Ice structure 00035 TestSIFT::keypoint newKey; 00036 newKey.x = key->getX(); 00037 newKey.y = key->getY(); 00038 newKey.s = key->getS(); 00039 newKey.o = key->getO(); 00040 newKey.m = key->getM(); 00041 newKey.oriFV = key->getOriFV(); 00042 itsKeypoints.push_back(newKey); 00043 } 00044 } 00045 00046 void run() 00047 { 00048 //Send the list of keypoints to the remote worker, and wait until 00049 //it returns with some results 00050 00051 try 00052 { 00053 TestSIFT::idSequence ids = itsMatcherPrx->matchKeypoints(itsKeypoints); 00054 LDEBUG("Got Result of size: %Zu", ids.size()); 00055 } 00056 catch(const Ice::Exception& ex) 00057 { std::cerr << "Failed: " << ex << std::endl; } 00058 catch(const char* msg) 00059 { std::cerr << "Failed: " << msg << std::endl; } 00060 00061 00062 } 00063 00064 char const* jobType() const { return "MatchJob"; } 00065 00066 TestSIFT::SIFTMatcherPrx itsMatcherPrx; 00067 TestSIFT::keypointSequence itsKeypoints; 00068 }; 00069 00070 int main(int argc, char const* argv[]) 00071 { 00072 //////////////////////////////////////////////////////////////////////// 00073 // Start up our model components 00074 //////////////////////////////////////////////////////////////////////// 00075 ModelManager mgr; 00076 nub::ref<InputFrameSeries> ifs(new InputFrameSeries(mgr)); 00077 mgr.addSubComponent(ifs); 00078 00079 // nub::ref<OutputFrameSeries> ofs(new OutputFrameSeries(mgr)); 00080 // mgr.addSubComponent(ofs); 00081 00082 mgr.parseCommandLine(argc, argv, "logfile <list of workers specified as IP[:PORT]>", 2,-1); 00083 mgr.start(); 00084 00085 //////////////////////////////////////////////////////////////////////// 00086 // Open the log file for writing 00087 //////////////////////////////////////////////////////////////////////// 00088 //Get the log file name as the first extra argument 00089 std::string logfilename = mgr.getExtraArg(0); 00090 std::ofstream logfile(logfilename.c_str()); 00091 00092 00093 //////////////////////////////////////////////////////////////////////// 00094 // Create a list of worker proxies 00095 //////////////////////////////////////////////////////////////////////// 00096 std::vector<TestSIFT::SIFTMatcherPrx> workers; 00097 00098 Ice::CommunicatorPtr ic; 00099 try 00100 { 00101 int dummy_argc=1; 00102 const char* dummy_argv[1] = {"dummy"}; 00103 00104 std::cout << "Initializing Ice Runtime..."; 00105 00106 Ice::PropertiesPtr props = Ice::createProperties(dummy_argc,(char**)dummy_argv); 00107 props->setProperty("Ice.MessageSizeMax", "1048576"); 00108 props->setProperty("Ice.Warn.Connections", "1"); 00109 Ice::InitializationData id; 00110 id.properties = props; 00111 00112 ic = Ice::initialize(id); 00113 00114 std::cout << "Success!" << std::endl; 00115 00116 int numWorkers = mgr.numExtraArgs()-1; 00117 for(int workerID=0; workerID<numWorkers; ++workerID) 00118 { 00119 //Get the worker IP and port separately, using the default port if not specified 00120 std::string workerString = mgr.getExtraArg(workerID+1); 00121 size_t colonIdx = workerString.find(":"); 00122 std::string workerName = workerString.substr(0, colonIdx); 00123 uint workerPort = TestSIFT::DEFAULTWORKERPORT; 00124 if(colonIdx != std::string::npos) 00125 workerPort = atoi(workerString.substr(colonIdx+1, std::string::npos).c_str()); 00126 00127 //Try opening proxy connections to the worker services 00128 std::cout << "Opening Connection to " << workerName << ":" << workerPort << "..."; 00129 char connectionBuffer[256]; 00130 sprintf(connectionBuffer, 00131 "SIFTMatcher:default -h %s -p %d", workerName.c_str(), workerPort); 00132 Ice::ObjectPrx base = ic->stringToProxy(connectionBuffer); 00133 workers.push_back(TestSIFT::SIFTMatcherPrx::checkedCast(base)); 00134 std::cout << "Opened" << std::endl; 00135 } 00136 } 00137 catch(const Ice::Exception& ex) 00138 { std::cerr << "Failed: " << ex << std::endl; } 00139 catch(const char* msg) 00140 { std::cerr << "Failed: " << msg << std::endl; } 00141 00142 //////////////////////////////////////////////////////////////////////// 00143 // Read input frames, extract the SIFT keypoints, and send them out to be 00144 // matched by the workers 00145 //////////////////////////////////////////////////////////////////////// 00146 LINFO("Beginning Processing With %Zu Nodes", workers.size()); 00147 00148 //Create a jobserver to handle the workers - one thread for each worker 00149 WorkThreadServer threadServer("SIFT Match Job Server", workers.size()); 00150 00151 Timer timer(1000000); 00152 FrameState state = FRAME_NEXT; 00153 while(state == FRAME_NEXT) 00154 { 00155 //Read in the new input image 00156 state = ifs->updateNext(); 00157 Image<PixRGB<byte> > img = ifs->readRGB(); 00158 if(img.initialized()) 00159 { 00160 //ofs->writeRGB(img, "Input"); 00161 } 00162 else 00163 break; 00164 00165 //Compute SIFT keypoints from the image 00166 VisualObject siftMatch("img", "img", img); 00167 std::vector<rutz::shared_ptr<Keypoint> > keypoints = 00168 siftMatch.getKeypoints(); 00169 00170 int kp_per_worker = keypoints.size() / workers.size(); 00171 00172 //Assign the computed keypoints to the workers 00173 rutz::shared_ptr<JobServer::Job> jobs[workers.size()]; 00174 for(size_t workerIdx=0; workerIdx<workers.size(); ++workerIdx) 00175 { 00176 int keyIdx_start = kp_per_worker*workerIdx; 00177 int keyIdx_end = kp_per_worker*(workerIdx+1); 00178 if(workerIdx == workers.size()-1) 00179 keyIdx_end = keypoints.size(); 00180 00181 LDEBUG("Worker %Zu gets %d - %d / %Zu", 00182 workerIdx, 00183 keyIdx_start, 00184 keyIdx_end, 00185 keypoints.size()); 00186 00187 jobs[workerIdx] = rutz::shared_ptr<JobServer::Job>( 00188 new MatchJob( 00189 workers[workerIdx], 00190 keypoints.begin() + keyIdx_start, 00191 keypoints.begin() + keyIdx_end 00192 ) 00193 ); 00194 } 00195 00196 //Enqueue the jobs all at once, and start the timer 00197 timer.reset(); 00198 threadServer.enqueueJobs(jobs, workers.size()); 00199 00200 //Wait for all jobs to finish, and stop the timer 00201 threadServer.flushQueue(10000, false); 00202 double time = timer.getSecs(); 00203 00204 logfile << time << std::endl; 00205 logfile.flush(); 00206 LINFO("Done Frame in %fs", time); 00207 } 00208 LINFO("FINISHED"); 00209 }