00001 /*!@file Transport/BufferedFrameIstream.C Frame-buffering layer on top of other FrameIstream objects */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Transport/BufferedFrameIstream.C $ 00035 // $Id: BufferedFrameIstream.C 14128 2010-10-12 23:39:22Z rand $ 00036 // 00037 00038 #ifndef TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED 00039 #define TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED 00040 00041 #include "Transport/BufferedFrameIstream.H" 00042 00043 #include "Component/ModelOptionDef.H" 00044 #include "Transport/FrameIstreamFactory.H" 00045 #include "Transport/TransportOpts.H" 00046 #include "Util/SimTime.H" 00047 #include "Util/log.H" 00048 #include "rutz/time.h" 00049 00050 // Used by: BufferedFrameIstream 00051 static const ModelOptionDef OPT_InputBufferSize = 00052 { MODOPT_ARG(size_t), "InputBufferSize", &MOC_INPUT, OPTEXP_CORE, 00053 "Number of frames to keep in input buffer when using --in=buf", 00054 "input-buffer-size", '\0', "<int>", "32" }; 00055 00056 // Used by: BufferedFrameIstream 00057 static const ModelOptionDef OPT_UnderflowStrategy = 00058 { MODOPT_ARG(ErrorStrategy), "UnderflowStrategy", &MOC_INPUT, OPTEXP_CORE, 00059 "What to do if the input buffer is empty when trying to read " 00060 "a new frame when using --in=buf; ABORT=make it a fatal error, " 00061 "RETRY=keep polling until the buffer becomes non-empty, " 00062 "IGNORE=ignore the error and return an empty frame as if " 00063 "end-of-stream had been reached", 00064 "underflow-strategy", '\0', "<ABORT|RETRY|IGNORE>", "IGNORE" }; 00065 00066 struct BufferedFrameIstream::Checkpoint 00067 { 00068 Checkpoint(int f, int q, int mq) : framenum(f), qsize(q), minqsize(mq) {} 00069 00070 int framenum; 00071 int qsize; 00072 int minqsize; 00073 }; 00074 00075 // ###################################################################### 00076 BufferedFrameIstream::BufferedFrameIstream(OptionManager& mgr) 00077 : 00078 FrameIstream(mgr, "Buffered Input", "BufferedFrameIstream"), 00079 itsBufSize(&OPT_InputBufferSize, this), 00080 itsUnderflowStrategy(&OPT_UnderflowStrategy, this), 00081 itsSrc(), 00082 itsQ(0), 00083 itsInputDone(false), 00084 itsStop(false), 00085 itsNumFilled(), 00086 itsMinNumFilled(0), 00087 itsInputFrameNum(-1), 00088 itsOutputFrameNum(0), 00089 itsFrameSpec(), 00090 itsFrameSpecValid(false), 00091 itsCheckpoints() 00092 { 00093 if (0 != pthread_mutex_init(&itsQmut, NULL)) 00094 LFATAL("pthread_mutex_init() failed"); 00095 } 00096 00097 // ###################################################################### 00098 BufferedFrameIstream::~BufferedFrameIstream() 00099 { 00100 delete itsQ; 00101 } 00102 00103 // ###################################################################### 00104 void BufferedFrameIstream::setConfigInfo(const std::string& cfg) 00105 { 00106 if (itsSrc.is_valid()) 00107 this->removeSubComponent(*itsSrc); 00108 itsSrc = makeFrameIstream(cfg); 00109 ASSERT(itsSrc.is_valid()); 00110 this->addSubComponent(itsSrc); 00111 } 00112 00113 // ###################################################################### 00114 bool BufferedFrameIstream::setFrameNumber(int n) 00115 { 00116 if (!itsSrc.is_valid()) 00117 LFATAL("must have a valid input source before setFrameNumber()"); 00118 00119 itsOutputFrameNum = n; 00120 00121 return true; 00122 } 00123 00124 // ###################################################################### 00125 GenericFrameSpec BufferedFrameIstream::peekFrameSpec() 00126 { 00127 if (!itsFrameSpecValid) 00128 { 00129 if (!itsSrc.is_valid()) 00130 LFATAL("must have a valid input source before peekFrameSpec()"); 00131 00132 if (itsInputFrameNum < 0) 00133 { 00134 itsInputFrameNum = itsOutputFrameNum; 00135 itsSrc->setFrameNumber(itsInputFrameNum); 00136 } 00137 00138 itsFrameSpec = itsSrc->peekFrameSpec(); 00139 itsFrameSpecValid = true; 00140 } 00141 00142 return itsFrameSpec; 00143 } 00144 00145 // ###################################################################### 00146 SimTime BufferedFrameIstream::getNaturalFrameTime() const 00147 { 00148 if (!this->started()) 00149 LFATAL("must be start()-ed before getNaturalFrameTime()"); 00150 00151 ASSERT(itsSrc.is_valid()); 00152 00153 return itsSrc->getNaturalFrameTime(); 00154 } 00155 00156 // ###################################################################### 00157 void BufferedFrameIstream::startStream() 00158 { 00159 if (!this->started()) 00160 LFATAL("must be start()-ed before startStream()"); 00161 00162 ASSERT(itsQ != 0); 00163 00164 // wait for the input queue to get filled 00165 rutz::time t = rutz::time::wall_clock_now(); 00166 while (itsNumFilled.atomic_get() < int(itsQ->size()) && !itsInputDone) 00167 { 00168 rutz::time t2 = rutz::time::wall_clock_now(); 00169 if ((t2 - t).sec() >= 1.0) 00170 { 00171 LINFO("waiting for input queue to be filled (%d/%d)", 00172 itsNumFilled.atomic_get(), int(itsQ->size())); 00173 t = t2; 00174 } 00175 usleep(20000); 00176 } 00177 00178 LINFO("%d/%d entries filled in input queue%s", 00179 itsNumFilled.atomic_get(), int(itsQ->size()), 00180 itsInputDone ? " [input read to completion]" : ""); 00181 } 00182 00183 // ###################################################################### 00184 GenericFrame BufferedFrameIstream::readFrame() 00185 { 00186 if (!this->started()) 00187 LFATAL("must be start()-ed before readFrame()"); 00188 00189 ASSERT(itsQ != 0); 00190 00191 GenericFrame result; 00192 00193 while (true) 00194 { 00195 const bool pop_ok = itsQ->pop_front(result); 00196 00197 if (pop_ok) 00198 { 00199 const int n = itsNumFilled.atomic_decr_return(); 00200 if (n < itsMinNumFilled) 00201 itsMinNumFilled = n; 00202 } 00203 00204 // we have premature underflow if the pop fails (!pop_ok) before 00205 // our underlying input source is finished (!itsInputDone): 00206 const bool did_underflow = (!pop_ok && !itsInputDone); 00207 00208 if (!did_underflow) 00209 break; 00210 else if (itsUnderflowStrategy.getVal() == ERR_ABORT) 00211 { 00212 LFATAL("input underflow"); 00213 } 00214 else if (itsUnderflowStrategy.getVal() == ERR_RETRY) 00215 { 00216 LDEBUG("input underflow"); 00217 usleep(20000); 00218 continue; 00219 } 00220 else if (itsUnderflowStrategy.getVal() == ERR_IGNORE) 00221 { 00222 // report the underflow error but ignore it and break out 00223 // the loop, letting the returned image be empty 00224 LINFO("input underflow"); 00225 break; 00226 } 00227 } 00228 00229 if (itsOutputFrameNum % 100 == 0) 00230 itsCheckpoints.push_back 00231 (Checkpoint(itsOutputFrameNum, itsNumFilled.atomic_get(), 00232 itsMinNumFilled)); 00233 00234 return result; 00235 } 00236 00237 // ###################################################################### 00238 void BufferedFrameIstream::start2() 00239 { 00240 if (this->started()) 00241 LFATAL("must be stop()-ed before start()"); 00242 00243 if (!itsSrc.is_valid()) 00244 LFATAL("no underlying input source given"); 00245 00246 if (itsBufSize.getVal() <= 0) 00247 LFATAL("--%s must be greater than 0, but got --%s=%"ZU, 00248 itsBufSize.getOptionDef()->longoptname, 00249 itsBufSize.getOptionDef()->longoptname, 00250 itsBufSize.getVal()); 00251 00252 itsInputDone = false; 00253 itsStop = false; 00254 itsMinNumFilled = int(itsBufSize.getVal()); 00255 itsCheckpoints.clear(); 00256 00257 // call peekFrameSpec() at least once to ensure that itsFrameSpec 00258 // gets cached, so that we don't need to access 00259 // itsSrc->peekFrameSpec() after we start the worker thread (because 00260 // then we would have possibly reentrant calls into itsSrc, which 00261 // has no guarantee of being thread-safe) 00262 (void) this->peekFrameSpec(); 00263 00264 ASSERT(itsQ == 0); 00265 00266 itsQ = new rutz::circular_queue<GenericFrame>(itsBufSize.getVal()); 00267 00268 if (0 != pthread_create(&itsFillThread, NULL, &c_fill, 00269 static_cast<void*>(this))) 00270 LFATAL("pthread_create() failed"); 00271 00272 } 00273 00274 // ###################################################################### 00275 void BufferedFrameIstream::stop1() 00276 { 00277 if (!this->started()) 00278 LFATAL("must be start()-ed before stop()"); 00279 00280 ASSERT(itsQ != 0); 00281 00282 itsStop = true; 00283 00284 if (0 != pthread_join(itsFillThread, NULL)) 00285 LERROR("pthread_join() failed"); 00286 00287 for (std::list<Checkpoint>::const_iterator 00288 itr = itsCheckpoints.begin(), stop = itsCheckpoints.end(); 00289 itr != stop; ++itr) 00290 { 00291 LINFO("checkpoint frame %06d - queue fill : %d/%d (min %d)", 00292 (*itr).framenum, (*itr).qsize, int(itsQ->size()), 00293 (*itr).minqsize); 00294 } 00295 00296 delete itsQ; 00297 itsQ = 0; 00298 00299 itsFrameSpecValid = false; 00300 } 00301 00302 // ###################################################################### 00303 void* BufferedFrameIstream::c_fill(void* p) 00304 { 00305 try 00306 { 00307 BufferedFrameIstream* const b = 00308 static_cast<BufferedFrameIstream*>(p); 00309 00310 // convert nub::soft_ref to nub::ref to make sure that we have a 00311 // valid object: 00312 nub::ref<FrameIstream> src = b->itsSrc; 00313 00314 src->startStream(); 00315 00316 while (true) 00317 { 00318 if (b->itsStop) 00319 break; 00320 00321 // get the next frame: 00322 00323 if (!src->setFrameNumber(b->itsInputFrameNum++)) 00324 { 00325 b->itsInputDone = true; 00326 return NULL; 00327 } 00328 00329 GenericFrame f = src->readFrame(); 00330 if (!f.initialized()) 00331 { 00332 b->itsInputDone = true; 00333 return NULL; 00334 } 00335 00336 // now try to push it onto the queue (and just keep 00337 // re-trying if the push fails due to the queue being full): 00338 00339 while (true) 00340 { 00341 if (b->itsStop) 00342 break; 00343 00344 ASSERT(b->itsQ != 0); 00345 00346 if (b->itsQ->push_back(f) == true) 00347 { 00348 b->itsNumFilled.atomic_incr(); 00349 break; 00350 } 00351 00352 usleep(20000); 00353 } 00354 } 00355 } 00356 catch (...) 00357 { 00358 REPORT_CURRENT_EXCEPTION; 00359 exit(1); 00360 } 00361 00362 return NULL; 00363 } 00364 00365 // ###################################################################### 00366 /* So things look consistent in everyone's emacs... */ 00367 /* Local Variables: */ 00368 /* mode: c++ */ 00369 /* indent-tabs-mode: nil */ 00370 /* End: */ 00371 00372 #endif // TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED