BufferedFrameIstream.C

Go to the documentation of this file.
00001 /*!@file Transport/BufferedFrameIstream.C Frame-buffering layer on top of other FrameIstream objects */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005   //
00005 // by the University of Southern California (USC) and the iLab at USC.  //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Transport/BufferedFrameIstream.C $
00035 // $Id: BufferedFrameIstream.C 14128 2010-10-12 23:39:22Z rand $
00036 //
00037 
00038 #ifndef TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED
00039 #define TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED
00040 
00041 #include "Transport/BufferedFrameIstream.H"
00042 
00043 #include "Component/ModelOptionDef.H"
00044 #include "Transport/FrameIstreamFactory.H"
00045 #include "Transport/TransportOpts.H"
00046 #include "Util/SimTime.H"
00047 #include "Util/log.H"
00048 #include "rutz/time.h"
00049 
00050 // Used by: BufferedFrameIstream
00051 static const ModelOptionDef OPT_InputBufferSize =
00052   { MODOPT_ARG(size_t), "InputBufferSize", &MOC_INPUT, OPTEXP_CORE,
00053     "Number of frames to keep in input buffer when using --in=buf",
00054     "input-buffer-size", '\0', "<int>", "32" };
00055 
00056 // Used by: BufferedFrameIstream
00057 static const ModelOptionDef OPT_UnderflowStrategy =
00058   { MODOPT_ARG(ErrorStrategy), "UnderflowStrategy", &MOC_INPUT, OPTEXP_CORE,
00059     "What to do if the input buffer is empty when trying to read "
00060     "a new frame when using --in=buf; ABORT=make it a fatal error, "
00061     "RETRY=keep polling until the buffer becomes non-empty, "
00062     "IGNORE=ignore the error and return an empty frame as if "
00063     "end-of-stream had been reached",
00064     "underflow-strategy", '\0', "<ABORT|RETRY|IGNORE>", "IGNORE" };
00065 
00066 struct BufferedFrameIstream::Checkpoint
00067 {
00068   Checkpoint(int f, int q, int mq) : framenum(f), qsize(q), minqsize(mq) {}
00069 
00070   int framenum;
00071   int qsize;
00072   int minqsize;
00073 };
00074 
00075 // ######################################################################
00076 BufferedFrameIstream::BufferedFrameIstream(OptionManager& mgr)
00077   :
00078   FrameIstream(mgr, "Buffered Input", "BufferedFrameIstream"),
00079   itsBufSize(&OPT_InputBufferSize, this),
00080   itsUnderflowStrategy(&OPT_UnderflowStrategy, this),
00081   itsSrc(),
00082   itsQ(0),
00083   itsInputDone(false),
00084   itsStop(false),
00085   itsNumFilled(),
00086   itsMinNumFilled(0),
00087   itsInputFrameNum(-1),
00088   itsOutputFrameNum(0),
00089   itsFrameSpec(),
00090   itsFrameSpecValid(false),
00091   itsCheckpoints()
00092 {
00093   if (0 != pthread_mutex_init(&itsQmut, NULL))
00094     LFATAL("pthread_mutex_init() failed");
00095 }
00096 
00097 // ######################################################################
00098 BufferedFrameIstream::~BufferedFrameIstream()
00099 {
00100   delete itsQ;
00101 }
00102 
00103 // ######################################################################
00104 void BufferedFrameIstream::setConfigInfo(const std::string& cfg)
00105 {
00106   if (itsSrc.is_valid())
00107     this->removeSubComponent(*itsSrc);
00108   itsSrc = makeFrameIstream(cfg);
00109   ASSERT(itsSrc.is_valid());
00110   this->addSubComponent(itsSrc);
00111 }
00112 
00113 // ######################################################################
00114 bool BufferedFrameIstream::setFrameNumber(int n)
00115 {
00116   if (!itsSrc.is_valid())
00117     LFATAL("must have a valid input source before setFrameNumber()");
00118 
00119   itsOutputFrameNum = n;
00120 
00121   return true;
00122 }
00123 
00124 // ######################################################################
00125 GenericFrameSpec BufferedFrameIstream::peekFrameSpec()
00126 {
00127   if (!itsFrameSpecValid)
00128     {
00129       if (!itsSrc.is_valid())
00130         LFATAL("must have a valid input source before peekFrameSpec()");
00131 
00132       if (itsInputFrameNum < 0)
00133         {
00134           itsInputFrameNum = itsOutputFrameNum;
00135           itsSrc->setFrameNumber(itsInputFrameNum);
00136         }
00137 
00138       itsFrameSpec = itsSrc->peekFrameSpec();
00139       itsFrameSpecValid = true;
00140     }
00141 
00142   return itsFrameSpec;
00143 }
00144 
00145 // ######################################################################
00146 SimTime BufferedFrameIstream::getNaturalFrameTime() const
00147 {
00148   if (!this->started())
00149     LFATAL("must be start()-ed before getNaturalFrameTime()");
00150 
00151   ASSERT(itsSrc.is_valid());
00152 
00153   return itsSrc->getNaturalFrameTime();
00154 }
00155 
00156 // ######################################################################
00157 void BufferedFrameIstream::startStream()
00158 {
00159   if (!this->started())
00160     LFATAL("must be start()-ed before startStream()");
00161 
00162   ASSERT(itsQ != 0);
00163 
00164   // wait for the input queue to get filled
00165   rutz::time t = rutz::time::wall_clock_now();
00166   while (itsNumFilled.atomic_get() < int(itsQ->size()) && !itsInputDone)
00167     {
00168       rutz::time t2 = rutz::time::wall_clock_now();
00169       if ((t2 - t).sec() >= 1.0)
00170         {
00171           LINFO("waiting for input queue to be filled (%d/%d)",
00172                 itsNumFilled.atomic_get(), int(itsQ->size()));
00173           t = t2;
00174         }
00175       usleep(20000);
00176     }
00177 
00178   LINFO("%d/%d entries filled in input queue%s",
00179         itsNumFilled.atomic_get(), int(itsQ->size()),
00180         itsInputDone ? " [input read to completion]" : "");
00181 }
00182 
00183 // ######################################################################
00184 GenericFrame BufferedFrameIstream::readFrame()
00185 {
00186   if (!this->started())
00187     LFATAL("must be start()-ed before readFrame()");
00188 
00189   ASSERT(itsQ != 0);
00190 
00191   GenericFrame result;
00192 
00193   while (true)
00194     {
00195       const bool pop_ok = itsQ->pop_front(result);
00196 
00197       if (pop_ok)
00198         {
00199           const int n = itsNumFilled.atomic_decr_return();
00200           if (n < itsMinNumFilled)
00201             itsMinNumFilled = n;
00202         }
00203 
00204       // we have premature underflow if the pop fails (!pop_ok) before
00205       // our underlying input source is finished (!itsInputDone):
00206       const bool did_underflow = (!pop_ok && !itsInputDone);
00207 
00208       if (!did_underflow)
00209         break;
00210       else if (itsUnderflowStrategy.getVal() == ERR_ABORT)
00211         {
00212           LFATAL("input underflow");
00213         }
00214       else if (itsUnderflowStrategy.getVal() == ERR_RETRY)
00215         {
00216           LDEBUG("input underflow");
00217           usleep(20000);
00218           continue;
00219         }
00220       else if (itsUnderflowStrategy.getVal() == ERR_IGNORE)
00221         {
00222           // report the underflow error but ignore it and break out
00223           // the loop, letting the returned image be empty
00224           LINFO("input underflow");
00225           break;
00226         }
00227     }
00228 
00229   if (itsOutputFrameNum % 100 == 0)
00230     itsCheckpoints.push_back
00231       (Checkpoint(itsOutputFrameNum, itsNumFilled.atomic_get(),
00232                   itsMinNumFilled));
00233 
00234   return result;
00235 }
00236 
00237 // ######################################################################
00238 void BufferedFrameIstream::start2()
00239 {
00240   if (this->started())
00241     LFATAL("must be stop()-ed before start()");
00242 
00243   if (!itsSrc.is_valid())
00244     LFATAL("no underlying input source given");
00245 
00246   if (itsBufSize.getVal() <= 0)
00247     LFATAL("--%s must be greater than 0, but got --%s=%"ZU,
00248            itsBufSize.getOptionDef()->longoptname,
00249            itsBufSize.getOptionDef()->longoptname,
00250            itsBufSize.getVal());
00251 
00252   itsInputDone = false;
00253   itsStop = false;
00254   itsMinNumFilled = int(itsBufSize.getVal());
00255   itsCheckpoints.clear();
00256 
00257   // call peekFrameSpec() at least once to ensure that itsFrameSpec
00258   // gets cached, so that we don't need to access
00259   // itsSrc->peekFrameSpec() after we start the worker thread (because
00260   // then we would have possibly reentrant calls into itsSrc, which
00261   // has no guarantee of being thread-safe)
00262   (void) this->peekFrameSpec();
00263 
00264   ASSERT(itsQ == 0);
00265 
00266   itsQ = new rutz::circular_queue<GenericFrame>(itsBufSize.getVal());
00267 
00268   if (0 != pthread_create(&itsFillThread, NULL, &c_fill,
00269                           static_cast<void*>(this)))
00270     LFATAL("pthread_create() failed");
00271 
00272 }
00273 
00274 // ######################################################################
00275 void BufferedFrameIstream::stop1()
00276 {
00277   if (!this->started())
00278     LFATAL("must be start()-ed before stop()");
00279 
00280   ASSERT(itsQ != 0);
00281 
00282   itsStop = true;
00283 
00284   if (0 != pthread_join(itsFillThread, NULL))
00285     LERROR("pthread_join() failed");
00286 
00287   for (std::list<Checkpoint>::const_iterator
00288          itr = itsCheckpoints.begin(), stop = itsCheckpoints.end();
00289        itr != stop; ++itr)
00290     {
00291       LINFO("checkpoint frame %06d - queue fill : %d/%d (min %d)",
00292             (*itr).framenum, (*itr).qsize, int(itsQ->size()),
00293             (*itr).minqsize);
00294     }
00295 
00296   delete itsQ;
00297   itsQ = 0;
00298 
00299   itsFrameSpecValid = false;
00300 }
00301 
00302 // ######################################################################
00303 void* BufferedFrameIstream::c_fill(void* p)
00304 {
00305   try
00306     {
00307       BufferedFrameIstream* const b =
00308         static_cast<BufferedFrameIstream*>(p);
00309 
00310       // convert nub::soft_ref to nub::ref to make sure that we have a
00311       // valid object:
00312       nub::ref<FrameIstream> src = b->itsSrc;
00313 
00314       src->startStream();
00315 
00316       while (true)
00317         {
00318           if (b->itsStop)
00319             break;
00320 
00321           // get the next frame:
00322 
00323           if (!src->setFrameNumber(b->itsInputFrameNum++))
00324             {
00325               b->itsInputDone = true;
00326               return NULL;
00327             }
00328 
00329           GenericFrame f = src->readFrame();
00330           if (!f.initialized())
00331             {
00332               b->itsInputDone = true;
00333               return NULL;
00334             }
00335 
00336           // now try to push it onto the queue (and just keep
00337           // re-trying if the push fails due to the queue being full):
00338 
00339           while (true)
00340             {
00341               if (b->itsStop)
00342                 break;
00343 
00344               ASSERT(b->itsQ != 0);
00345 
00346               if (b->itsQ->push_back(f) == true)
00347                 {
00348                   b->itsNumFilled.atomic_incr();
00349                   break;
00350                 }
00351 
00352               usleep(20000);
00353             }
00354         }
00355     }
00356   catch (...)
00357     {
00358       REPORT_CURRENT_EXCEPTION;
00359       exit(1);
00360     }
00361 
00362   return NULL;
00363 }
00364 
00365 // ######################################################################
00366 /* So things look consistent in everyone's emacs... */
00367 /* Local Variables: */
00368 /* mode: c++ */
00369 /* indent-tabs-mode: nil */
00370 /* End: */
00371 
00372 #endif // TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED
Generated on Sun May 8 08:42:24 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3