nv2_label_reader.c

Go to the documentation of this file.
00001 /*!@file NeovisionII/nv2_label_reader.c */
00002 
00003 // //////////////////////////////////////////////////////////////////// //
00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005   //
00005 // by the University of Southern California (USC) and the iLab at USC.  //
00006 // See http://iLab.usc.edu for information about this project.          //
00007 // //////////////////////////////////////////////////////////////////// //
00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected //
00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency //
00010 // in Visual Environments, and Applications'' by Christof Koch and      //
00011 // Laurent Itti, California Institute of Technology, 2001 (patent       //
00012 // pending; application number 09/912,225 filed July 23, 2001; see      //
00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status).     //
00014 // //////////////////////////////////////////////////////////////////// //
00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit.       //
00016 //                                                                      //
00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can   //
00018 // redistribute it and/or modify it under the terms of the GNU General  //
00019 // Public License as published by the Free Software Foundation; either  //
00020 // version 2 of the License, or (at your option) any later version.     //
00021 //                                                                      //
00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope  //
00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the   //
00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR      //
00025 // PURPOSE.  See the GNU General Public License for more details.       //
00026 //                                                                      //
00027 // You should have received a copy of the GNU General Public License    //
00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write   //
00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,   //
00030 // Boston, MA 02111-1307 USA.                                           //
00031 // //////////////////////////////////////////////////////////////////// //
00032 //
00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu>
00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/NeovisionII/nv2_label_reader.c $
00035 // $Id: nv2_label_reader.c 9497 2008-03-19 04:19:53Z rjpeters $
00036 //
00037 
00038 #ifndef NEOVISIONII_NV2_LABEL_READER_C_DEFINED
00039 #define NEOVISIONII_NV2_LABEL_READER_C_DEFINED
00040 
00041 #include "NeovisionII/nv2_label_reader.h"
00042 
00043 #include "NeovisionII/nv2_common.h"
00044 
00045 // #include order matters here:
00046 #include <sys/types.h>
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <arpa/inet.h>
00050 
00051 #include <errno.h>
00052 #include <fcntl.h>
00053 #include <pthread.h>
00054 #include <stdio.h>
00055 #include <stdlib.h>
00056 #include <string.h>
00057 #include <unistd.h>
00058 
00059 struct nv2_label_reader
00060 {
00061         pthread_mutex_t m_label_lock;
00062         struct nv2_patch_label m_label;
00063 
00064         pthread_mutex_t m_patch_lock;
00065         struct nv2_image_patch m_patch;
00066         enum nv2_patch_send_result m_latest_patch_send_result;
00067 
00068         int m_label_reader_port;
00069         struct in_addr m_remote_patch_reader_in_addr;
00070         int m_remote_patch_reader_port;
00071 
00072         int m_reader_socket_fd;
00073 
00074         pthread_t m_label_reader_thread;
00075         pthread_t m_patch_sender_thread;
00076 };
00077 
// pthread cleanup handler: p points at an int holding a file
// descriptor. A non-negative descriptor is closed and the int is then
// set to -1; a negative value is left untouched.
static void cleanup_fd(void* p)
{
        int* const fd_slot = p;

        if (*fd_slot < 0)
                return;

        close(*fd_slot);
        *fd_slot = -1;
}
00087 
00088 static void* run_label_reader(void* q)
00089 {
00090         struct nv2_label_reader* p = (struct nv2_label_reader*) q;
00091 
00092         pthread_cleanup_push(&cleanup_fd, &p->m_reader_socket_fd);
00093 
00094         p->m_reader_socket_fd = socket(PF_INET, SOCK_STREAM, 0);
00095         if (p->m_reader_socket_fd == -1)
00096                 nv2_fatal("socket() failed");
00097 
00098         {
00099                 const int set_on = 1;
00100                 if (setsockopt(p->m_reader_socket_fd,
00101                                SOL_SOCKET, SO_REUSEADDR,
00102                                &set_on, sizeof(set_on)) == -1)
00103                         nv2_warn("setsockopt() failed");
00104         }
00105 
00106         struct sockaddr_in name;
00107         name.sin_family = AF_INET;
00108         name.sin_addr.s_addr = htonl(INADDR_ANY); // accept from any host
00109         name.sin_port = htons(p->m_label_reader_port);
00110         if (bind(p->m_reader_socket_fd,
00111                  (struct sockaddr*)(&name), sizeof(name)) == -1)
00112                 nv2_fatal("bind() failed");
00113 
00114         listen(p->m_reader_socket_fd, 5);
00115 
00116         int client_socket_fd = -1;
00117         pthread_cleanup_push(&cleanup_fd, &client_socket_fd);
00118 
00119         while (1)
00120         {
00121                 struct sockaddr_in client_name;
00122                 socklen_t client_name_len = sizeof(client_name);
00123 
00124                 client_socket_fd =
00125                         accept(p->m_reader_socket_fd,
00126                                (struct sockaddr*)(&client_name),
00127                                &client_name_len);
00128 
00129                 if (client_socket_fd < 0)
00130                 {
00131                         // if accept() failed because
00132                         // p->m_reader_socket_fd was closed() and/or
00133                         // is less than 0, then that is because the
00134                         // main thread closed the socket to force us
00135                         // to shut down, so let's quit without any
00136                         // nv2_warn()
00137                         if (p->m_reader_socket_fd < 0)
00138                                 break;
00139 
00140                         nv2_warn("accept() failed");
00141                         continue;
00142                 }
00143 
00144                 struct nv2_patch_label l;
00145 
00146                 errno = 0;
00147                 if (nv2_robust_read(client_socket_fd, &l, sizeof(l), NULL)
00148                     != sizeof(l))
00149                 {
00150                         nv2_warn("read(nv2_patch_label) failed");
00151                         close(client_socket_fd);
00152                         client_socket_fd = -1;
00153                         continue;
00154                 }
00155 
00156                 l.protocol_version = ntohl(l.protocol_version);
00157                 l.patch_id = ntohl(l.patch_id);
00158                 l.confidence = ntohl(l.confidence);
00159                 l.source[sizeof(l.source) - 1] = '\0';
00160                 l.name[sizeof(l.name) - 1] = '\0';
00161                 l.extra_info[sizeof(l.extra_info) - 1] = '\0';
00162 
00163                 if (l.protocol_version != NV2_LABEL_PROTOCOL_VERSION)
00164                 {
00165                         fprintf(stderr,
00166                                 "wrong label protocol version "
00167                                 "(got %u, expected %u)",
00168                                 (unsigned int) l.protocol_version,
00169                                 (unsigned int) NV2_LABEL_PROTOCOL_VERSION);
00170                         exit(-1);
00171                 }
00172 
00173                 // Enter this patch label into the next available
00174                 // position in our queue:
00175                 pthread_mutex_lock(&p->m_label_lock);
00176                 p->m_label = l;
00177                 pthread_mutex_unlock(&p->m_label_lock);
00178 
00179                 close(client_socket_fd);
00180                 client_socket_fd = -1;
00181         }
00182 
00183         pthread_cleanup_pop(1);
00184         pthread_cleanup_pop(1);
00185 
00186         return (void*) 0;
00187 }
00188 
00189 static void* run_patch_sender(void* q)
00190 {
00191         struct nv2_label_reader* r = (struct nv2_label_reader*) q;
00192 
00193         int show_warnings = 1;
00194         int nfail = 0;
00195 
00196         while (1)
00197         {
00198                 struct nv2_image_patch p;
00199 
00200                 pthread_mutex_lock(&r->m_patch_lock);
00201                 p = r->m_patch;
00202                 nv2_image_patch_init_empty(&r->m_patch);
00203                 pthread_mutex_unlock(&r->m_patch_lock);
00204 
00205                 if (p.data == 0 || p.width == 0 || p.height == 0)
00206                 {
00207                         // there was no valid pending patch, so let's
00208                         // wait very briefly and then check again
00209                         nv2_image_patch_destroy(&p);
00210                         usleep(1000);
00211 
00212                         continue;
00213                 }
00214 
00215                 r->m_latest_patch_send_result =
00216                         nv2_label_reader_send_patch_sync
00217                         (r, &p, show_warnings);
00218 
00219                 if (NV2_PATCH_SEND_FAIL
00220                     == r->m_latest_patch_send_result)
00221                 {
00222                         sleep(1);
00223 
00224                         // ok, we've already showed one warning about
00225                         // the patch send failing so let's not show
00226                         // any more consecutive warnings
00227                         show_warnings = 0;
00228 
00229                         // keep track of how many consecutive failures
00230                         // we've had so that we can print a warning
00231                         // about that if/when we finally do a
00232                         // successful send again
00233                         ++nfail;
00234                 }
00235                 else
00236                 {
00237                         show_warnings = 1;
00238 
00239                         if (nfail > 0)
00240                         {
00241                                 char buf[64];
00242                                 snprintf(&buf[0], sizeof(buf),
00243                                          "%d consecutive patch "
00244                                          "send attempts failed",
00245                                          nfail);
00246                                 errno = 0;
00247                                 nv2_warn(&buf[0]);
00248                         }
00249                         nfail = 0;
00250                 }
00251         }
00252 
00253         return (void*) 0;
00254 }
00255 
00256 struct nv2_label_reader* nv2_label_reader_create(
00257         const int label_reader_port,
00258         const char* remote_patch_reader_addr,
00259         const int remote_patch_reader_port)
00260 {
00261         struct nv2_label_reader* p =
00262                 malloc(sizeof(struct nv2_label_reader));
00263 
00264         if (p == 0)
00265                 nv2_fatal("malloc() failed");
00266 
00267         pthread_mutex_init(&p->m_label_lock, 0);
00268         nv2_patch_label_init_empty(&p->m_label);
00269 
00270         pthread_mutex_init(&p->m_patch_lock, 0);
00271         nv2_image_patch_init_empty(&p->m_patch);
00272         p->m_latest_patch_send_result = NV2_PATCH_SEND_OK;
00273 
00274         p->m_label_reader_port = label_reader_port;
00275 
00276         if (inet_aton(remote_patch_reader_addr,
00277                       &p->m_remote_patch_reader_in_addr) == 0)
00278                 nv2_fatal("inet_aton() failed");
00279 
00280         p->m_remote_patch_reader_port = remote_patch_reader_port;
00281 
00282         p->m_reader_socket_fd = -1;
00283 
00284         if (0 != pthread_create(&p->m_label_reader_thread, 0,
00285                                 &run_label_reader,
00286                                 (void*) p))
00287                 nv2_fatal("pthread_create() failed (label reader thread)");
00288 
00289         if (0 != pthread_create(&p->m_patch_sender_thread, 0,
00290                                 &run_patch_sender,
00291                                 (void*) p))
00292                 nv2_fatal("pthread_create() failed (patch sender thread)");
00293 
00294         return p;
00295 }
00296 
00297 void nv2_label_reader_destroy(struct nv2_label_reader* p)
00298 {
00299         pthread_cancel(p->m_label_reader_thread);
00300         pthread_cancel(p->m_patch_sender_thread);
00301 
00302         if (p->m_reader_socket_fd > 0)
00303         {
00304                 const int close_me = p->m_reader_socket_fd;
00305                 p->m_reader_socket_fd = -1;
00306                 close(close_me);
00307         }
00308 
00309         if (0 != pthread_join(p->m_label_reader_thread, 0))
00310                 nv2_fatal("pthread_join() failed");
00311 
00312         if (0 != pthread_join(p->m_patch_sender_thread, 0))
00313                 nv2_fatal("pthread_join() failed");
00314 
00315         pthread_mutex_destroy(&p->m_label_lock);
00316         pthread_mutex_destroy(&p->m_patch_lock);
00317         nv2_image_patch_destroy(&p->m_patch);
00318 }
00319 
00320 int nv2_label_reader_get_current_label(struct nv2_label_reader* p,
00321                                        struct nv2_patch_label* l)
00322 {
00323         pthread_mutex_lock(&p->m_label_lock);
00324         *l = p->m_label;
00325         nv2_patch_label_init_empty(&p->m_label);
00326         pthread_mutex_unlock(&p->m_label_lock);
00327 
00328         if (l->patch_id == 0)
00329                 return 0;
00330         else
00331                 return 1;
00332 }
00333 
00334 void
00335 nv2_label_reader_send_patch(struct nv2_label_reader* r,
00336                             struct nv2_image_patch* p)
00337 {
00338         pthread_mutex_lock(&r->m_patch_lock);
00339         nv2_image_patch_destroy(&r->m_patch);
00340         r->m_patch = *p;
00341         nv2_image_patch_init_empty(p);
00342         pthread_mutex_unlock(&r->m_patch_lock);
00343 }
00344 
00345 enum nv2_patch_send_result
00346 nv2_label_reader_send_patch_sync(struct nv2_label_reader* r,
00347                                  struct nv2_image_patch* p,
00348                                  const int show_warnings)
00349 {
00350         const int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
00351 
00352         if (socket_fd == -1)
00353                 nv2_fatal("socket() failed");
00354 
00355         struct sockaddr_in name;
00356         name.sin_family = AF_INET;
00357         name.sin_addr = r->m_remote_patch_reader_in_addr;
00358         name.sin_port = htons(r->m_remote_patch_reader_port);
00359 
00360         struct nv2_image_patch np; // network-byte-order patch
00361 
00362         if (p) np = *p;
00363         else nv2_image_patch_init_empty(&np);
00364 
00365         const uint32_t npixbytes =
00366                 np.width * np.height * nv2_pixel_type_bytes_per_pixel(np.type);
00367 
00368         np.protocol_version  = htonl(np.protocol_version);
00369         np.width             = htonl(np.width);
00370         np.height            = htonl(np.height);
00371         np.id                = htonl(np.id);
00372         np.is_training_image = htonl(np.is_training_image);
00373         np.type              = htonl(np.type);
00374         np.training_label[sizeof(np.training_label)-1] = '\0';
00375         np.remote_command[sizeof(np.remote_command)-1] = '\0';
00376 
00377         enum nv2_patch_send_result retval = NV2_PATCH_SEND_OK;
00378 
00379         if (connect(socket_fd, (struct sockaddr*)(&name), sizeof(name))
00380             == -1)
00381         {
00382                 if (show_warnings)
00383                         nv2_warn("while attempting to send patch: "
00384                                  "connect() failed");
00385                 retval = NV2_PATCH_SEND_FAIL;
00386         }
00387         else if (send(socket_fd, &np, NV2_IMAGE_PATCH_HEADER_SIZE, 0)
00388                  != NV2_IMAGE_PATCH_HEADER_SIZE)
00389         {
00390                 if (show_warnings)
00391                         nv2_warn("while attempting to send patch: "
00392                                  "send(header) failed");
00393                 retval = NV2_PATCH_SEND_FAIL;
00394         }
00395         else if (npixbytes > 0)
00396         {
00397                 const ssize_t nwritten =
00398                         nv2_robust_write(socket_fd, np.data, npixbytes);
00399 
00400                 if (nwritten != npixbytes)
00401                 {
00402                         if (show_warnings)
00403                                 nv2_warn("while attempting to send patch: "
00404                                          "send(pixels) failed");
00405                         retval = NV2_PATCH_SEND_FAIL;
00406                 }
00407         }
00408 
00409         close(socket_fd);
00410 
00411         nv2_image_patch_destroy(p);
00412 
00413         return retval;
00414 }
00415 
00416 // ######################################################################
00417 /* So things look consistent in everyone's emacs... */
00418 /* Local Variables: */
00419 /* indent-tabs-mode: nil */
00420 /* c-file-style: "linux" */
00421 /* End: */
00422 
00423 #endif // NEOVISIONII_NV2_LABEL_READER_C_DEFINED
Generated on Sun May 8 08:05:23 2011 for iLab Neuromorphic Vision Toolkit by  doxygen 1.6.3