00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "Psycho/StimListener.H"
00037 #include "Util/sformat.H"
00038
// File-local configuration for the UDP listener, expressed as typed constants
// instead of preprocessor macros so they obey scope and type checking.
namespace
{
  // Size in bytes of the receive buffer used by StimListenerDML::listen().
  const int BUFSIZE = 50;

  // Service (port) string handed to getaddrinfo() for both the local socket
  // and the remote host lookup.
  const char* const PORT = "9999";
}
00041
00042
00043
00044
00045 StimListener::StimListener() :
00046 itsEventLog(), itsBuffer() { };
00047
00048
00049 StimListener::~StimListener() { };
00050
00051
00052 void StimListener::setBuffer(const rutz::shared_ptr<SharedBuffer<StimData> >& buffer)
00053 {
00054 itsBuffer = buffer;
00055 }
00056
00057
00058 void StimListener::setEventLog(const nub::soft_ref<EventLog>& elog)
00059 {
00060 itsEventLog = elog;
00061 }
00062
00063
00064 void StimListener::pushEvent(const std::string& msg, const bool& useLinfo)
00065 {
00066 if (useLinfo)
00067 LINFO(msg.c_str());
00068
00069 if (itsEventLog.isValid())
00070 itsEventLog->pushEvent(msg);
00071 }
00072
00073
00074 void StimListener::addData(const StimData&, const data)
00075 {
00076 itsBuffer.push(data);
00077 pushEvent(sformat("Recieved::%d bytes",d->buflen));
00078 }
00079
00080
00081 void StimListener::start()
00082 {
00083 if ( !itsBuffer.is_valid() )
00084 LFATAL("A call to setBuffer(rutz::shared_ptr<SharedBuffer<StimData> >) is"
00085 " needed to start");
00086 }
00087
00088
00089 void StimListener::stop()
00090 {
00091
00092 }
00093
00094
00095 void StimListener::run()
00096 {
00097 listen();
00098 }
00099
00100
00101 const char* StimListener::jobType() const
00102 {
00103 return "Stimulus Listener";
00104 }
00105
00106
00107 int StimListener::priority() const
00108 {
00109 return 0;
00110 }
00111
00112
00113
00114
00115 StimListenerDML::StimListenerDML(unsigned char exitcode, const unsigned char[] msg) :
00116 StimListener(), sockfd(0), myInfo(NULL),
00117 hostInfo(NULL), keepGoing(true),itsExitCode(exitcode), itsConnected(false)
00118 {
00119
00120 itsMsg = new unsigned char[sizeof(msg)];
00121 strcpy(itsMsg, msg);
00122 };
00123
00124
00125 StimListenerDML::~StimListenerDML()
00126 {
00127 delete itsMsg;
00128 delete myInfo;
00129 delete hostInfo;
00130 };
00131
00132
// Create and bind the local UDP socket on PORT and mark the listener as
// connected.
//
// Runs the base-class check first (a buffer must have been set). Any failure
// to resolve the local address or to bind a socket is logged and then raised
// through LFATAL.
void StimListenerDML::start()
{
  StimListener::start();

  FD_ZERO(&master);

  // stop() clears keepGoing; re-arm it so that a stop()/start() cycle (as
  // suggested by setHostName()) yields a listen() loop that actually runs.
  keepGoing = true;

  // Ask for any address family, datagram sockets, suitable for bind().
  struct addrinfo hints;
  memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_flags = AI_PASSIVE;

  // getaddrinfo() signals failure with any non-zero code (the sign of the
  // EAI_* values differs between platforms), and on failure there is no
  // result list to free.
  struct addrinfo* candidates = NULL;
  const int rv = getaddrinfo(NULL, PORT, &hints, &candidates);
  if (rv != 0)
    {
      pushEvent(sformat("getaddrinfo: %s", gai_strerror(rv)),true);
      LFATAL("Fatal Error");
      return;
    }

  // Walk the list with a separate cursor so the head stays available for
  // freeaddrinfo(); bind to the first candidate that works.
  struct addrinfo* bound = NULL;
  for (struct addrinfo* ai = candidates; ai != NULL; ai = ai->ai_next)
    {
      sockfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
      if (sockfd == -1)
        {
          pushEvent("Error::Socket",true);
          continue;
        }

      if (bind(sockfd, ai->ai_addr, ai->ai_addrlen) < 0)
        {
          close(sockfd);
          pushEvent("Error::Listener bind",true);
          continue;
        }

      bound = ai;
      break;
    }

  if (bound == NULL)
    {
      pushEvent("Error: Failed to bind socket",true);
      if (candidates != NULL)
        freeaddrinfo(candidates);
      myInfo = NULL;
      LFATAL("Fatal Error");
      return;
    }

  // Keep the head of the list; stop() and the destructor release it.
  myInfo = candidates;

  if (hostInfo == NULL)
    pushEvent("No host. Getting host from packets");

  // Register the socket with the descriptor set that listen() selects on.
  FD_SET(sockfd, &master);
  pushEvent("Socket creation succesful",true);
  itsConnected = true;
}
00192
00193
00194 void StimListenerDML::stop()
00195 {
00196 StimListener::stop();
00197 keepGoing = false;
00198 usleep(50000);
00199 FD_CLR(sockfd, &master);
00200 close(sockfd);
00201
00202 if (hostInfo!=myInfo)
00203 freeaddrinfo(hostInfo);
00204 freeaddrinfo(myInfo);
00205 hostInfo = NULL;
00206 myInfo = NULL;
00207 itsConnected = false;
00208 }
00209
00210
00211 bool StimListenerDML::sendMessage(const unsigned char* msg)
00212 {
00213 if (itsConnected)
00214 {
00215
00216 int check = 0;
00217 (hostInfo != NULL)?
00218 check = sendto(sockfd, msg, strlen(msg), 0,
00219 hostInfo->ai_addr, hostInfo->ai_addrlen)
00220 :check = sendto(sockfd, msg, strlen(msg), 0,
00221 &lasthost_addr, lasthost_addrlen);
00222
00223 if (check < 0)
00224 {
00225 pushEvent("Error::Sending",true);
00226 return false;
00227 }
00228 else
00229 {
00230 pushEvent( std::string("Listener::Sent '" + std::string(msg) + "'"),
00231 true);
00232 return true;
00233 }
00234 }
00235 else
00236 {
00237 pushEvent("Must be Connected to send a message",true);
00238 return false;
00239 }
00240 }
00241
00242
00243 bool StimListenerDML::listen()
00244 {
00245 if (itsConnected)
00246 {
00247 int numbytes = 0, retval=0;
00248 unsigned char status_code = '\0';
00249 fd_set read_fds;
00250 FD_ZERO(&read_fds);
00251
00252
00253 pushEvent("Listener::Waiting",true);
00254 while( (status_code != itsExitCode) && keepGoing)
00255 {
00256 do
00257 {
00258 read_fds = master;
00259 retval = select(sockfd+1, &read_fds, NULL, NULL, NULL);
00260 }
00261 while( (retval ==-1) && (errno==EINTR) && (keepGoing) );
00262
00263 if (retval < 0)
00264 {
00265 pushEvent("Error::select",true);
00266 status_code = itsExitCode;
00267 continue;
00268 }
00269
00270 if (FD_ISSET(sockfd, &read_fds))
00271 {
00272 lasthost_addrlen = sizeof lasthost_addr;
00273 unsigned char recvbuffer[BUFSIZE];
00274 if ( (numbytes = recvfrom(sockfd, recvbuffer, BUFSIZE-1 , 0,
00275 &lasthost_addr,
00276 &lasthost_addrlen)) <= 0)
00277 {
00278
00279 if (numbytes == 0)
00280 {
00281
00282 pushEvent("Error::socket closed",true);
00283 }
00284 else
00285 {
00286 pushEvent("Error::recieve",true);
00287 }
00288 status_code = itsExitCode;
00289 }
00290 else
00291 {
00292 addData(StimData(recvbuffer,(uint)numbytes));
00293 sendOK();
00294 }
00295 }
00296 }
00297 if ((retval < 0) || (numbytes==0))
00298 return false;
00299 else
00300 return true;
00301 }
00302 else
00303 {
00304 pushEvent("Must be Conncted to listen",true);
00305 return false;
00306 }
00307 }
00308
00309
// Resolve the remote host that sendMessage() should address, using PORT as
// the service.
//
// hostname  name or address of the remote host
//
// May only be called while disconnected; calling it while connected logs the
// problem and raises a fatal error through LFATAL. When the lookup fails the
// error is logged and no host is configured, so replies go to the sender of
// the last received packet instead.
void StimListenerDML::setHostName(const std::string hostname)
{
  if (itsConnected)
    {
      pushEvent("Cannot setHostName() while connected, please call stop(),"
                "setHostName(), and start()",true);
      LFATAL("Fatal Error");
      return;
    }

  // Drop the result of an earlier call so it is not leaked.
  if (hostInfo != NULL && hostInfo != myInfo)
    freeaddrinfo(hostInfo);
  hostInfo = NULL;

  struct addrinfo hints;
  memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_DGRAM;

  // getaddrinfo() reports failure with any non-zero code, and leaves nothing
  // to free in that case.
  struct addrinfo* resolved = NULL;
  const int rv = getaddrinfo(hostname.c_str(), PORT, &hints, &resolved);
  if (rv != 0)
    {
      pushEvent(sformat("error host connection setup: %s",
                        gai_strerror(rv)),true);
      return;
    }

  hostInfo = resolved;
  pushEvent("Got info for " + hostname,true);
}
00336
00337
00338 const char* StimListenerDML::jobType() const
00339 {
00340 return "Stimulus Listener DML";
00341 }
00342
00343
00344
00345 void StimListener::sendOK()
00346 {
00347 sendMessage(itsMsg);
00348 }
00349
00350
00351
00352
00353
00354
00355