00001 /*!@file Devices/DiskDataStream.C Multi-threaded data streamer to disk */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2003 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Laurent Itti <itti@usc.edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/Devices/DiskDataStream.C $ 00035 // $Id: DiskDataStream.C 12118 2009-12-04 22:47:43Z sychic $ 00036 // 00037 00038 #include "Devices/DiskDataStream.H" 00039 00040 #include "Devices/DeviceOpts.H" 00041 #include "Util/StringUtil.H" // for split() and join() 00042 #include "Util/WorkThreadServer.H" 00043 #include "Util/sformat.H" 00044 #include "Raster/GenericFrame.H" 00045 00046 #include <iterator> // for back_inserter() 00047 00048 namespace dummy_namespace_to_avoid_gcc411_bug_DiskDataStream_C 00049 { 00050 class DiskDumpJob : public JobServer::Job 00051 { 00052 public: 00053 DiskDumpJob(const GenericFrame& f, const std::string& nm, 00054 bool usemmap) 00055 : 00056 itsFrame(f), 00057 itsFname(nm), 00058 itsUseMmap(usemmap) 00059 {} 00060 00061 virtual void run() 00062 { 00063 if (itsUseMmap) 00064 // 1.01s cpu for 1000 frames (4.60s including framegrabbing) 00065 itsFrame.asVideo().diskDumpMmap(itsFname.c_str(), true); 00066 else 00067 // 1.74s cpu for 1000 frames (5.60s including framegrabbing) 00068 itsFrame.asVideo().diskDumpStdio(itsFname.c_str(), true); 00069 } 00070 00071 virtual const char* jobType() const { return "DiskDumpJob"; } 00072 00073 const GenericFrame itsFrame; 00074 const std::string itsFname; 00075 const bool itsUseMmap; 00076 }; 00077 00078 struct DiskDumpStreamData 00079 { 00080 DiskDumpStreamData() : frameNumberIn(0), frameNumberOut(0) {} 00081 00082 uint frameNumberIn;//!< How many input frames have we received 00083 uint frameNumberOut;//!< How many output frames have we saved 00084 }; 00085 } 00086 00087 using namespace dummy_namespace_to_avoid_gcc411_bug_DiskDataStream_C; 00088 00089 typedef std::map<std::string, DiskDumpStreamData> map_type; 00090 00091 // ###################################################################### 00092 struct DiskDataStream::Impl 00093 { 00094 map_type streams; 00095 }; 00096 00097 // ###################################################################### 00098 DiskDataStream::DiskDataStream(OptionManager& mgr, 00099 const std::string& descrName, 00100 const std::string& tagName) : 00101 FrameOstream(mgr, descrName, tagName), 00102 itsUseMmap(&OPT_DiskDataStreamUseMmap, this), 00103 itsNumThreads(&OPT_DiskDataStreamNumThreads, this), 00104 itsSleepUsecs(&OPT_DiskDataStreamSleepUsecs, this), 00105 itsSavePeriod(&OPT_DiskDataStreamSavePeriod, this), 00106 itsSavePath(&OPT_DiskDataStreamSavePath, this), 00107 itsFileStems(), 00108 itsServer(0), 00109 rep(new Impl) 00110 {} 00111 00112 // ###################################################################### 00113 DiskDataStream::~DiskDataStream() 00114 { 00115 delete rep; 00116 } 00117 00118 // ###################################################################### 00119 void DiskDataStream::setConfigInfo(const std::string& path) 00120 { 00121 // NOTE: if you modify any behavior here, then please update the 00122 // corresponding documentation for the global "--out" option inside 00123 // the OPT_OutputFrameSink definition in Media/MediaOpts.C 00124 00125 this->setSavePath(path); 00126 } 00127 00128 // ###################################################################### 00129 void DiskDataStream::setSavePath(const std::string& path) 00130 { 00131 LINFO("setting save path to '%s'", path.c_str()); 00132 00133 itsFileStems.resize(0); 00134 00135 if (path.size() == 0) 00136 { 00137 itsFileStems.push_back("./"); 00138 } 00139 else 00140 { 00141 split(path, ",", std::back_inserter(itsFileStems)); 00142 } 00143 00144 LINFO("got %"ZU" stems from '%s'", itsFileStems.size(), path.c_str()); 00145 00146 ASSERT(itsFileStems.size() > 0); 00147 } 00148 00149 // ###################################################################### 00150 void DiskDataStream::start2() 00151 { 00152 if (itsUseMmap.getVal()) 00153 { 00154 #ifdef MMAP_IS_NOT_THREADSAFE 00155 itsUseMmap.setVal(false); 00156 LINFO("using stdio instead of mmap " 00157 "because mmap is not threadsafe on this system"); 00158 #else 00159 LINFO("using mmap for writing"); 00160 #endif 00161 } 00162 else 00163 LINFO("using stdio for writing"); 00164 00165 ASSERT(itsServer == 0); 00166 itsServer = new WorkThreadServer("DiskDataStream", 00167 itsNumThreads.getVal()); 00168 00169 itsServer->setCheckpointPeriod(100); 00170 itsServer->setSleepUsecs(itsSleepUsecs.getVal()); 00171 setSavePath(itsSavePath.getVal()); 00172 } 00173 00174 // ###################################################################### 00175 void DiskDataStream::stop1() 00176 { 00177 LINFO("Flushing data queue to disk"); 00178 00179 // wait until queue has been fully written to disk: 00180 while (true) 00181 { 00182 const uint sz = itsServer->size(); 00183 00184 if (sz == 0) 00185 break; 00186 00187 LINFO("Flushing data queue to disk - %u remaining", sz); 00188 usleep(250000); 00189 } 00190 00191 delete itsServer; 00192 itsServer = 0; 00193 } 00194 00195 // ###################################################################### 00196 void DiskDataStream::writeFrame(const GenericFrame& frame, 00197 const std::string& shortname, 00198 const FrameInfo& auxinfo) 00199 { 00200 // NOTE: if you modify any behavior here, then please update the 00201 // corresponding documentation for the global "--out" option inside 00202 // the OPT_OutputFrameSink definition in Media/MediaOpts.C 00203 00204 DiskDumpStreamData& s = rep->streams[shortname]; 00205 00206 const uint nout = s.frameNumberIn++; 00207 if ((nout % itsSavePeriod.getVal()) != 0) 00208 // skip this frame if it doesn't match our save period 00209 return; 00210 00211 // push that video frame into our queue: 00212 00213 // compute the filename: 00214 ASSERT(itsFileStems.size() > 0); 00215 const uint n = (s.frameNumberOut % itsFileStems.size()); 00216 const std::string fname = 00217 sformat("%s%s%06d", 00218 itsFileStems[n].c_str(), shortname.c_str(), 00219 s.frameNumberOut); 00220 00221 // ready for next file: 00222 ++s.frameNumberOut; 00223 00224 // we make a deep copy since the buffer held by frame may be 00225 // "short-lived", i.e., likely to be overwritten when the next frame 00226 // is read from the original data source (mpeg file, framegrabber, 00227 // etc.) 00228 rutz::shared_ptr<DiskDumpJob> j 00229 (new DiskDumpJob(GenericFrame::deepCopyOf(frame), 00230 fname, itsUseMmap.getVal())); 00231 00232 itsServer->enqueueJob(j); 00233 } 00234 00235 // ###################################################################### 00236 void DiskDataStream::closeStream(const std::string& shortname) 00237 { 00238 map_type::iterator itr = rep->streams.find(shortname); 00239 00240 if (itr != rep->streams.end()) 00241 { 00242 (*itr).second.frameNumberIn = 0; 00243 (*itr).second.frameNumberOut = 0; 00244 } 00245 } 00246 00247 // ###################################################################### 00248 /* So things look consistent in everyone's emacs... */ 00249 /* Local Variables: */ 00250 /* indent-tabs-mode: nil */ 00251 /* End: */