app-TestSIFT_Master.C
00001 #include "Media/FrameSeries.H"
00002 #include "Component/ModelManager.H"
00003 #include "Image/Image.H"
00004 #include "Image/PixelsTypes.H"
00005 #include "Robots/Beobot2/Testing/TestSIFT.ice.H"
00006 #include "Util/WorkThreadServer.H"
00007 #include "Util/Timer.H"
00008 #include "SIFT/VisualObject.H"
00009
00010
00011 #include <Ice/Ice.h>
00012 #include <fstream>
00013
00014
00015
00016
00017
00018 class MatchJob : public JobServer::Job
00019 {
00020 public:
00021 MatchJob(TestSIFT::SIFTMatcherPrx matcherProxy,
00022 std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_beginning,
00023 std::vector<rutz::shared_ptr<Keypoint> >::iterator kp_ending
00024 )
00025 : Job()
00026 {
00027 itsMatcherPrx = matcherProxy;
00028
00029 std::vector<rutz::shared_ptr<Keypoint> >::iterator kpIt;
00030 for(kpIt = kp_beginning; kpIt < kp_ending; ++kpIt)
00031 {
00032 rutz::shared_ptr<Keypoint> key = *kpIt;
00033
00034
00035 TestSIFT::keypoint newKey;
00036 newKey.x = key->getX();
00037 newKey.y = key->getY();
00038 newKey.s = key->getS();
00039 newKey.o = key->getO();
00040 newKey.m = key->getM();
00041 newKey.oriFV = key->getOriFV();
00042 itsKeypoints.push_back(newKey);
00043 }
00044 }
00045
00046 void run()
00047 {
00048
00049
00050
00051 try
00052 {
00053 TestSIFT::idSequence ids = itsMatcherPrx->matchKeypoints(itsKeypoints);
00054 LDEBUG("Got Result of size: %Zu", ids.size());
00055 }
00056 catch(const Ice::Exception& ex)
00057 { std::cerr << "Failed: " << ex << std::endl; }
00058 catch(const char* msg)
00059 { std::cerr << "Failed: " << msg << std::endl; }
00060
00061
00062 }
00063
00064 char const* jobType() const { return "MatchJob"; }
00065
00066 TestSIFT::SIFTMatcherPrx itsMatcherPrx;
00067 TestSIFT::keypointSequence itsKeypoints;
00068 };
00069
00070 int main(int argc, char const* argv[])
00071 {
00072
00073
00074
00075 ModelManager mgr;
00076 nub::ref<InputFrameSeries> ifs(new InputFrameSeries(mgr));
00077 mgr.addSubComponent(ifs);
00078
00079
00080
00081
00082 mgr.parseCommandLine(argc, argv, "logfile <list of workers specified as IP[:PORT]>", 2,-1);
00083 mgr.start();
00084
00085
00086
00087
00088
00089 std::string logfilename = mgr.getExtraArg(0);
00090 std::ofstream logfile(logfilename.c_str());
00091
00092
00093
00094
00095
00096 std::vector<TestSIFT::SIFTMatcherPrx> workers;
00097
00098 Ice::CommunicatorPtr ic;
00099 try
00100 {
00101 int dummy_argc=1;
00102 const char* dummy_argv[1] = {"dummy"};
00103
00104 std::cout << "Initializing Ice Runtime...";
00105
00106 Ice::PropertiesPtr props = Ice::createProperties(dummy_argc,(char**)dummy_argv);
00107 props->setProperty("Ice.MessageSizeMax", "1048576");
00108 props->setProperty("Ice.Warn.Connections", "1");
00109 Ice::InitializationData id;
00110 id.properties = props;
00111
00112 ic = Ice::initialize(id);
00113
00114 std::cout << "Success!" << std::endl;
00115
00116 int numWorkers = mgr.numExtraArgs()-1;
00117 for(int workerID=0; workerID<numWorkers; ++workerID)
00118 {
00119
00120 std::string workerString = mgr.getExtraArg(workerID+1);
00121 size_t colonIdx = workerString.find(":");
00122 std::string workerName = workerString.substr(0, colonIdx);
00123 uint workerPort = TestSIFT::DEFAULTWORKERPORT;
00124 if(colonIdx != std::string::npos)
00125 workerPort = atoi(workerString.substr(colonIdx+1, std::string::npos).c_str());
00126
00127
00128 std::cout << "Opening Connection to " << workerName << ":" << workerPort << "...";
00129 char connectionBuffer[256];
00130 sprintf(connectionBuffer,
00131 "SIFTMatcher:default -h %s -p %d", workerName.c_str(), workerPort);
00132 Ice::ObjectPrx base = ic->stringToProxy(connectionBuffer);
00133 workers.push_back(TestSIFT::SIFTMatcherPrx::checkedCast(base));
00134 std::cout << "Opened" << std::endl;
00135 }
00136 }
00137 catch(const Ice::Exception& ex)
00138 { std::cerr << "Failed: " << ex << std::endl; }
00139 catch(const char* msg)
00140 { std::cerr << "Failed: " << msg << std::endl; }
00141
00142
00143
00144
00145
00146 LINFO("Beginning Processing With %Zu Nodes", workers.size());
00147
00148
00149 WorkThreadServer threadServer("SIFT Match Job Server", workers.size());
00150
00151 Timer timer(1000000);
00152 FrameState state = FRAME_NEXT;
00153 while(state == FRAME_NEXT)
00154 {
00155
00156 state = ifs->updateNext();
00157 Image<PixRGB<byte> > img = ifs->readRGB();
00158 if(img.initialized())
00159 {
00160
00161 }
00162 else
00163 break;
00164
00165
00166 VisualObject siftMatch("img", "img", img);
00167 std::vector<rutz::shared_ptr<Keypoint> > keypoints =
00168 siftMatch.getKeypoints();
00169
00170 int kp_per_worker = keypoints.size() / workers.size();
00171
00172
00173 rutz::shared_ptr<JobServer::Job> jobs[workers.size()];
00174 for(size_t workerIdx=0; workerIdx<workers.size(); ++workerIdx)
00175 {
00176 int keyIdx_start = kp_per_worker*workerIdx;
00177 int keyIdx_end = kp_per_worker*(workerIdx+1);
00178 if(workerIdx == workers.size()-1)
00179 keyIdx_end = keypoints.size();
00180
00181 LDEBUG("Worker %Zu gets %d - %d / %Zu",
00182 workerIdx,
00183 keyIdx_start,
00184 keyIdx_end,
00185 keypoints.size());
00186
00187 jobs[workerIdx] = rutz::shared_ptr<JobServer::Job>(
00188 new MatchJob(
00189 workers[workerIdx],
00190 keypoints.begin() + keyIdx_start,
00191 keypoints.begin() + keyIdx_end
00192 )
00193 );
00194 }
00195
00196
00197 timer.reset();
00198 threadServer.enqueueJobs(jobs, workers.size());
00199
00200
00201 threadServer.flushQueue(10000, false);
00202 double time = timer.getSecs();
00203
00204 logfile << time << std::endl;
00205 logfile.flush();
00206 LINFO("Done Frame in %fs", time);
00207 }
00208 LINFO("FINISHED");
00209 }