00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #ifndef TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED
00039 #define TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED
00040
00041 #include "Transport/BufferedFrameIstream.H"
00042
00043 #include "Component/ModelOptionDef.H"
00044 #include "Transport/FrameIstreamFactory.H"
00045 #include "Transport/TransportOpts.H"
00046 #include "Util/SimTime.H"
00047 #include "Util/log.H"
00048 #include "rutz/time.h"
00049
00050
00051 static const ModelOptionDef OPT_InputBufferSize =
00052 { MODOPT_ARG(size_t), "InputBufferSize", &MOC_INPUT, OPTEXP_CORE,
00053 "Number of frames to keep in input buffer when using --in=buf",
00054 "input-buffer-size", '\0', "<int>", "32" };
00055
00056
00057 static const ModelOptionDef OPT_UnderflowStrategy =
00058 { MODOPT_ARG(ErrorStrategy), "UnderflowStrategy", &MOC_INPUT, OPTEXP_CORE,
00059 "What to do if the input buffer is empty when trying to read "
00060 "a new frame when using --in=buf; ABORT=make it a fatal error, "
00061 "RETRY=keep polling until the buffer becomes non-empty, "
00062 "IGNORE=ignore the error and return an empty frame as if "
00063 "end-of-stream had been reached",
00064 "underflow-strategy", '\0', "<ABORT|RETRY|IGNORE>", "IGNORE" };
00065
00066 struct BufferedFrameIstream::Checkpoint
00067 {
00068 Checkpoint(int f, int q, int mq) : framenum(f), qsize(q), minqsize(mq) {}
00069
00070 int framenum;
00071 int qsize;
00072 int minqsize;
00073 };
00074
00075
00076 BufferedFrameIstream::BufferedFrameIstream(OptionManager& mgr)
00077 :
00078 FrameIstream(mgr, "Buffered Input", "BufferedFrameIstream"),
00079 itsBufSize(&OPT_InputBufferSize, this),
00080 itsUnderflowStrategy(&OPT_UnderflowStrategy, this),
00081 itsSrc(),
00082 itsQ(0),
00083 itsInputDone(false),
00084 itsStop(false),
00085 itsNumFilled(),
00086 itsMinNumFilled(0),
00087 itsInputFrameNum(-1),
00088 itsOutputFrameNum(0),
00089 itsFrameSpec(),
00090 itsFrameSpecValid(false),
00091 itsCheckpoints()
00092 {
00093 if (0 != pthread_mutex_init(&itsQmut, NULL))
00094 LFATAL("pthread_mutex_init() failed");
00095 }
00096
00097
00098 BufferedFrameIstream::~BufferedFrameIstream()
00099 {
00100 delete itsQ;
00101 }
00102
00103
00104 void BufferedFrameIstream::setConfigInfo(const std::string& cfg)
00105 {
00106 if (itsSrc.is_valid())
00107 this->removeSubComponent(*itsSrc);
00108 itsSrc = makeFrameIstream(cfg);
00109 ASSERT(itsSrc.is_valid());
00110 this->addSubComponent(itsSrc);
00111 }
00112
00113
00114 bool BufferedFrameIstream::setFrameNumber(int n)
00115 {
00116 if (!itsSrc.is_valid())
00117 LFATAL("must have a valid input source before setFrameNumber()");
00118
00119 itsOutputFrameNum = n;
00120
00121 return true;
00122 }
00123
00124
00125 GenericFrameSpec BufferedFrameIstream::peekFrameSpec()
00126 {
00127 if (!itsFrameSpecValid)
00128 {
00129 if (!itsSrc.is_valid())
00130 LFATAL("must have a valid input source before peekFrameSpec()");
00131
00132 if (itsInputFrameNum < 0)
00133 {
00134 itsInputFrameNum = itsOutputFrameNum;
00135 itsSrc->setFrameNumber(itsInputFrameNum);
00136 }
00137
00138 itsFrameSpec = itsSrc->peekFrameSpec();
00139 itsFrameSpecValid = true;
00140 }
00141
00142 return itsFrameSpec;
00143 }
00144
00145
00146 SimTime BufferedFrameIstream::getNaturalFrameTime() const
00147 {
00148 if (!this->started())
00149 LFATAL("must be start()-ed before getNaturalFrameTime()");
00150
00151 ASSERT(itsSrc.is_valid());
00152
00153 return itsSrc->getNaturalFrameTime();
00154 }
00155
00156
00157 void BufferedFrameIstream::startStream()
00158 {
00159 if (!this->started())
00160 LFATAL("must be start()-ed before startStream()");
00161
00162 ASSERT(itsQ != 0);
00163
00164
00165 rutz::time t = rutz::time::wall_clock_now();
00166 while (itsNumFilled.atomic_get() < int(itsQ->size()) && !itsInputDone)
00167 {
00168 rutz::time t2 = rutz::time::wall_clock_now();
00169 if ((t2 - t).sec() >= 1.0)
00170 {
00171 LINFO("waiting for input queue to be filled (%d/%d)",
00172 itsNumFilled.atomic_get(), int(itsQ->size()));
00173 t = t2;
00174 }
00175 usleep(20000);
00176 }
00177
00178 LINFO("%d/%d entries filled in input queue%s",
00179 itsNumFilled.atomic_get(), int(itsQ->size()),
00180 itsInputDone ? " [input read to completion]" : "");
00181 }
00182
00183
00184 GenericFrame BufferedFrameIstream::readFrame()
00185 {
00186 if (!this->started())
00187 LFATAL("must be start()-ed before readFrame()");
00188
00189 ASSERT(itsQ != 0);
00190
00191 GenericFrame result;
00192
00193 while (true)
00194 {
00195 const bool pop_ok = itsQ->pop_front(result);
00196
00197 if (pop_ok)
00198 {
00199 const int n = itsNumFilled.atomic_decr_return();
00200 if (n < itsMinNumFilled)
00201 itsMinNumFilled = n;
00202 }
00203
00204
00205
00206 const bool did_underflow = (!pop_ok && !itsInputDone);
00207
00208 if (!did_underflow)
00209 break;
00210 else if (itsUnderflowStrategy.getVal() == ERR_ABORT)
00211 {
00212 LFATAL("input underflow");
00213 }
00214 else if (itsUnderflowStrategy.getVal() == ERR_RETRY)
00215 {
00216 LDEBUG("input underflow");
00217 usleep(20000);
00218 continue;
00219 }
00220 else if (itsUnderflowStrategy.getVal() == ERR_IGNORE)
00221 {
00222
00223
00224 LINFO("input underflow");
00225 break;
00226 }
00227 }
00228
00229 if (itsOutputFrameNum % 100 == 0)
00230 itsCheckpoints.push_back
00231 (Checkpoint(itsOutputFrameNum, itsNumFilled.atomic_get(),
00232 itsMinNumFilled));
00233
00234 return result;
00235 }
00236
00237
00238 void BufferedFrameIstream::start2()
00239 {
00240 if (this->started())
00241 LFATAL("must be stop()-ed before start()");
00242
00243 if (!itsSrc.is_valid())
00244 LFATAL("no underlying input source given");
00245
00246 if (itsBufSize.getVal() <= 0)
00247 LFATAL("--%s must be greater than 0, but got --%s=%"ZU,
00248 itsBufSize.getOptionDef()->longoptname,
00249 itsBufSize.getOptionDef()->longoptname,
00250 itsBufSize.getVal());
00251
00252 itsInputDone = false;
00253 itsStop = false;
00254 itsMinNumFilled = int(itsBufSize.getVal());
00255 itsCheckpoints.clear();
00256
00257
00258
00259
00260
00261
00262 (void) this->peekFrameSpec();
00263
00264 ASSERT(itsQ == 0);
00265
00266 itsQ = new rutz::circular_queue<GenericFrame>(itsBufSize.getVal());
00267
00268 if (0 != pthread_create(&itsFillThread, NULL, &c_fill,
00269 static_cast<void*>(this)))
00270 LFATAL("pthread_create() failed");
00271
00272 }
00273
00274
00275 void BufferedFrameIstream::stop1()
00276 {
00277 if (!this->started())
00278 LFATAL("must be start()-ed before stop()");
00279
00280 ASSERT(itsQ != 0);
00281
00282 itsStop = true;
00283
00284 if (0 != pthread_join(itsFillThread, NULL))
00285 LERROR("pthread_join() failed");
00286
00287 for (std::list<Checkpoint>::const_iterator
00288 itr = itsCheckpoints.begin(), stop = itsCheckpoints.end();
00289 itr != stop; ++itr)
00290 {
00291 LINFO("checkpoint frame %06d - queue fill : %d/%d (min %d)",
00292 (*itr).framenum, (*itr).qsize, int(itsQ->size()),
00293 (*itr).minqsize);
00294 }
00295
00296 delete itsQ;
00297 itsQ = 0;
00298
00299 itsFrameSpecValid = false;
00300 }
00301
00302
00303 void* BufferedFrameIstream::c_fill(void* p)
00304 {
00305 try
00306 {
00307 BufferedFrameIstream* const b =
00308 static_cast<BufferedFrameIstream*>(p);
00309
00310
00311
00312 nub::ref<FrameIstream> src = b->itsSrc;
00313
00314 src->startStream();
00315
00316 while (true)
00317 {
00318 if (b->itsStop)
00319 break;
00320
00321
00322
00323 if (!src->setFrameNumber(b->itsInputFrameNum++))
00324 {
00325 b->itsInputDone = true;
00326 return NULL;
00327 }
00328
00329 GenericFrame f = src->readFrame();
00330 if (!f.initialized())
00331 {
00332 b->itsInputDone = true;
00333 return NULL;
00334 }
00335
00336
00337
00338
00339 while (true)
00340 {
00341 if (b->itsStop)
00342 break;
00343
00344 ASSERT(b->itsQ != 0);
00345
00346 if (b->itsQ->push_back(f) == true)
00347 {
00348 b->itsNumFilled.atomic_incr();
00349 break;
00350 }
00351
00352 usleep(20000);
00353 }
00354 }
00355 }
00356 catch (...)
00357 {
00358 REPORT_CURRENT_EXCEPTION;
00359 exit(1);
00360 }
00361
00362 return NULL;
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372 #endif // TRANSPORT_BUFFEREDFRAMEISTREAM_C_DEFINED