00001 /*!@file NeovisionII/nv2_label_reader.c */ 00002 00003 // //////////////////////////////////////////////////////////////////// // 00004 // The iLab Neuromorphic Vision C++ Toolkit - Copyright (C) 2000-2005 // 00005 // by the University of Southern California (USC) and the iLab at USC. // 00006 // See http://iLab.usc.edu for information about this project. // 00007 // //////////////////////////////////////////////////////////////////// // 00008 // Major portions of the iLab Neuromorphic Vision Toolkit are protected // 00009 // under the U.S. patent ``Computation of Intrinsic Perceptual Saliency // 00010 // in Visual Environments, and Applications'' by Christof Koch and // 00011 // Laurent Itti, California Institute of Technology, 2001 (patent // 00012 // pending; application number 09/912,225 filed July 23, 2001; see // 00013 // http://pair.uspto.gov/cgi-bin/final/home.pl for current status). // 00014 // //////////////////////////////////////////////////////////////////// // 00015 // This file is part of the iLab Neuromorphic Vision C++ Toolkit. // 00016 // // 00017 // The iLab Neuromorphic Vision C++ Toolkit is free software; you can // 00018 // redistribute it and/or modify it under the terms of the GNU General // 00019 // Public License as published by the Free Software Foundation; either // 00020 // version 2 of the License, or (at your option) any later version. // 00021 // // 00022 // The iLab Neuromorphic Vision C++ Toolkit is distributed in the hope // 00023 // that it will be useful, but WITHOUT ANY WARRANTY; without even the // 00024 // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // 00025 // PURPOSE. See the GNU General Public License for more details. // 00026 // // 00027 // You should have received a copy of the GNU General Public License // 00028 // along with the iLab Neuromorphic Vision C++ Toolkit; if not, write // 00029 // to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, // 00030 // Boston, MA 02111-1307 USA. // 00031 // //////////////////////////////////////////////////////////////////// // 00032 // 00033 // Primary maintainer for this file: Rob Peters <rjpeters at usc dot edu> 00034 // $HeadURL: svn://isvn.usc.edu/software/invt/trunk/saliency/src/NeovisionII/nv2_label_reader.c $ 00035 // $Id: nv2_label_reader.c 9497 2008-03-19 04:19:53Z rjpeters $ 00036 // 00037 00038 #ifndef NEOVISIONII_NV2_LABEL_READER_C_DEFINED 00039 #define NEOVISIONII_NV2_LABEL_READER_C_DEFINED 00040 00041 #include "NeovisionII/nv2_label_reader.h" 00042 00043 #include "NeovisionII/nv2_common.h" 00044 00045 // #include order matters here: 00046 #include <sys/types.h> 00047 #include <sys/socket.h> 00048 #include <netinet/in.h> 00049 #include <arpa/inet.h> 00050 00051 #include <errno.h> 00052 #include <fcntl.h> 00053 #include <pthread.h> 00054 #include <stdio.h> 00055 #include <stdlib.h> 00056 #include <string.h> 00057 #include <unistd.h> 00058 00059 struct nv2_label_reader 00060 { 00061 pthread_mutex_t m_label_lock; 00062 struct nv2_patch_label m_label; 00063 00064 pthread_mutex_t m_patch_lock; 00065 struct nv2_image_patch m_patch; 00066 enum nv2_patch_send_result m_latest_patch_send_result; 00067 00068 int m_label_reader_port; 00069 struct in_addr m_remote_patch_reader_in_addr; 00070 int m_remote_patch_reader_port; 00071 00072 int m_reader_socket_fd; 00073 00074 pthread_t m_label_reader_thread; 00075 pthread_t m_patch_sender_thread; 00076 }; 00077 00078 static void cleanup_fd(void* p) 00079 { 00080 int* const fd = (int*) p; 00081 if (*fd >= 0) 00082 { 00083 close(*fd); 00084 *fd = -1; 00085 } 00086 } 00087 00088 static void* run_label_reader(void* q) 00089 { 00090 struct nv2_label_reader* p = (struct nv2_label_reader*) q; 00091 00092 pthread_cleanup_push(&cleanup_fd, &p->m_reader_socket_fd); 00093 00094 p->m_reader_socket_fd = socket(PF_INET, SOCK_STREAM, 0); 00095 if (p->m_reader_socket_fd == -1) 00096 nv2_fatal("socket() failed"); 00097 00098 { 00099 const int set_on = 1; 00100 if (setsockopt(p->m_reader_socket_fd, 00101 SOL_SOCKET, SO_REUSEADDR, 00102 &set_on, sizeof(set_on)) == -1) 00103 nv2_warn("setsockopt() failed"); 00104 } 00105 00106 struct sockaddr_in name; 00107 name.sin_family = AF_INET; 00108 name.sin_addr.s_addr = htonl(INADDR_ANY); // accept from any host 00109 name.sin_port = htons(p->m_label_reader_port); 00110 if (bind(p->m_reader_socket_fd, 00111 (struct sockaddr*)(&name), sizeof(name)) == -1) 00112 nv2_fatal("bind() failed"); 00113 00114 listen(p->m_reader_socket_fd, 5); 00115 00116 int client_socket_fd = -1; 00117 pthread_cleanup_push(&cleanup_fd, &client_socket_fd); 00118 00119 while (1) 00120 { 00121 struct sockaddr_in client_name; 00122 socklen_t client_name_len = sizeof(client_name); 00123 00124 client_socket_fd = 00125 accept(p->m_reader_socket_fd, 00126 (struct sockaddr*)(&client_name), 00127 &client_name_len); 00128 00129 if (client_socket_fd < 0) 00130 { 00131 // if accept() failed because 00132 // p->m_reader_socket_fd was closed() and/or 00133 // is less than 0, then that is because the 00134 // main thread closed the socket to force us 00135 // to shut down, so let's quit without any 00136 // nv2_warn() 00137 if (p->m_reader_socket_fd < 0) 00138 break; 00139 00140 nv2_warn("accept() failed"); 00141 continue; 00142 } 00143 00144 struct nv2_patch_label l; 00145 00146 errno = 0; 00147 if (nv2_robust_read(client_socket_fd, &l, sizeof(l), NULL) 00148 != sizeof(l)) 00149 { 00150 nv2_warn("read(nv2_patch_label) failed"); 00151 close(client_socket_fd); 00152 client_socket_fd = -1; 00153 continue; 00154 } 00155 00156 l.protocol_version = ntohl(l.protocol_version); 00157 l.patch_id = ntohl(l.patch_id); 00158 l.confidence = ntohl(l.confidence); 00159 l.source[sizeof(l.source) - 1] = '\0'; 00160 l.name[sizeof(l.name) - 1] = '\0'; 00161 l.extra_info[sizeof(l.extra_info) - 1] = '\0'; 00162 00163 if (l.protocol_version != NV2_LABEL_PROTOCOL_VERSION) 00164 { 00165 fprintf(stderr, 00166 "wrong label protocol version " 00167 "(got %u, expected %u)", 00168 (unsigned int) l.protocol_version, 00169 (unsigned int) NV2_LABEL_PROTOCOL_VERSION); 00170 exit(-1); 00171 } 00172 00173 // Enter this patch label into the next available 00174 // position in our queue: 00175 pthread_mutex_lock(&p->m_label_lock); 00176 p->m_label = l; 00177 pthread_mutex_unlock(&p->m_label_lock); 00178 00179 close(client_socket_fd); 00180 client_socket_fd = -1; 00181 } 00182 00183 pthread_cleanup_pop(1); 00184 pthread_cleanup_pop(1); 00185 00186 return (void*) 0; 00187 } 00188 00189 static void* run_patch_sender(void* q) 00190 { 00191 struct nv2_label_reader* r = (struct nv2_label_reader*) q; 00192 00193 int show_warnings = 1; 00194 int nfail = 0; 00195 00196 while (1) 00197 { 00198 struct nv2_image_patch p; 00199 00200 pthread_mutex_lock(&r->m_patch_lock); 00201 p = r->m_patch; 00202 nv2_image_patch_init_empty(&r->m_patch); 00203 pthread_mutex_unlock(&r->m_patch_lock); 00204 00205 if (p.data == 0 || p.width == 0 || p.height == 0) 00206 { 00207 // there was no valid pending patch, so let's 00208 // wait very briefly and then check again 00209 nv2_image_patch_destroy(&p); 00210 usleep(1000); 00211 00212 continue; 00213 } 00214 00215 r->m_latest_patch_send_result = 00216 nv2_label_reader_send_patch_sync 00217 (r, &p, show_warnings); 00218 00219 if (NV2_PATCH_SEND_FAIL 00220 == r->m_latest_patch_send_result) 00221 { 00222 sleep(1); 00223 00224 // ok, we've already showed one warning about 00225 // the patch send failing so let's not show 00226 // any more consecutive warnings 00227 show_warnings = 0; 00228 00229 // keep track of how many consecutive failures 00230 // we've had so that we can print a warning 00231 // about that if/when we finally do a 00232 // successful send again 00233 ++nfail; 00234 } 00235 else 00236 { 00237 show_warnings = 1; 00238 00239 if (nfail > 0) 00240 { 00241 char buf[64]; 00242 snprintf(&buf[0], sizeof(buf), 00243 "%d consecutive patch " 00244 "send attempts failed", 00245 nfail); 00246 errno = 0; 00247 nv2_warn(&buf[0]); 00248 } 00249 nfail = 0; 00250 } 00251 } 00252 00253 return (void*) 0; 00254 } 00255 00256 struct nv2_label_reader* nv2_label_reader_create( 00257 const int label_reader_port, 00258 const char* remote_patch_reader_addr, 00259 const int remote_patch_reader_port) 00260 { 00261 struct nv2_label_reader* p = 00262 malloc(sizeof(struct nv2_label_reader)); 00263 00264 if (p == 0) 00265 nv2_fatal("malloc() failed"); 00266 00267 pthread_mutex_init(&p->m_label_lock, 0); 00268 nv2_patch_label_init_empty(&p->m_label); 00269 00270 pthread_mutex_init(&p->m_patch_lock, 0); 00271 nv2_image_patch_init_empty(&p->m_patch); 00272 p->m_latest_patch_send_result = NV2_PATCH_SEND_OK; 00273 00274 p->m_label_reader_port = label_reader_port; 00275 00276 if (inet_aton(remote_patch_reader_addr, 00277 &p->m_remote_patch_reader_in_addr) == 0) 00278 nv2_fatal("inet_aton() failed"); 00279 00280 p->m_remote_patch_reader_port = remote_patch_reader_port; 00281 00282 p->m_reader_socket_fd = -1; 00283 00284 if (0 != pthread_create(&p->m_label_reader_thread, 0, 00285 &run_label_reader, 00286 (void*) p)) 00287 nv2_fatal("pthread_create() failed (label reader thread)"); 00288 00289 if (0 != pthread_create(&p->m_patch_sender_thread, 0, 00290 &run_patch_sender, 00291 (void*) p)) 00292 nv2_fatal("pthread_create() failed (patch sender thread)"); 00293 00294 return p; 00295 } 00296 00297 void nv2_label_reader_destroy(struct nv2_label_reader* p) 00298 { 00299 pthread_cancel(p->m_label_reader_thread); 00300 pthread_cancel(p->m_patch_sender_thread); 00301 00302 if (p->m_reader_socket_fd > 0) 00303 { 00304 const int close_me = p->m_reader_socket_fd; 00305 p->m_reader_socket_fd = -1; 00306 close(close_me); 00307 } 00308 00309 if (0 != pthread_join(p->m_label_reader_thread, 0)) 00310 nv2_fatal("pthread_join() failed"); 00311 00312 if (0 != pthread_join(p->m_patch_sender_thread, 0)) 00313 nv2_fatal("pthread_join() failed"); 00314 00315 pthread_mutex_destroy(&p->m_label_lock); 00316 pthread_mutex_destroy(&p->m_patch_lock); 00317 nv2_image_patch_destroy(&p->m_patch); 00318 } 00319 00320 int nv2_label_reader_get_current_label(struct nv2_label_reader* p, 00321 struct nv2_patch_label* l) 00322 { 00323 pthread_mutex_lock(&p->m_label_lock); 00324 *l = p->m_label; 00325 nv2_patch_label_init_empty(&p->m_label); 00326 pthread_mutex_unlock(&p->m_label_lock); 00327 00328 if (l->patch_id == 0) 00329 return 0; 00330 else 00331 return 1; 00332 } 00333 00334 void 00335 nv2_label_reader_send_patch(struct nv2_label_reader* r, 00336 struct nv2_image_patch* p) 00337 { 00338 pthread_mutex_lock(&r->m_patch_lock); 00339 nv2_image_patch_destroy(&r->m_patch); 00340 r->m_patch = *p; 00341 nv2_image_patch_init_empty(p); 00342 pthread_mutex_unlock(&r->m_patch_lock); 00343 } 00344 00345 enum nv2_patch_send_result 00346 nv2_label_reader_send_patch_sync(struct nv2_label_reader* r, 00347 struct nv2_image_patch* p, 00348 const int show_warnings) 00349 { 00350 const int socket_fd = socket(PF_INET, SOCK_STREAM, 0); 00351 00352 if (socket_fd == -1) 00353 nv2_fatal("socket() failed"); 00354 00355 struct sockaddr_in name; 00356 name.sin_family = AF_INET; 00357 name.sin_addr = r->m_remote_patch_reader_in_addr; 00358 name.sin_port = htons(r->m_remote_patch_reader_port); 00359 00360 struct nv2_image_patch np; // network-byte-order patch 00361 00362 if (p) np = *p; 00363 else nv2_image_patch_init_empty(&np); 00364 00365 const uint32_t npixbytes = 00366 np.width * np.height * nv2_pixel_type_bytes_per_pixel(np.type); 00367 00368 np.protocol_version = htonl(np.protocol_version); 00369 np.width = htonl(np.width); 00370 np.height = htonl(np.height); 00371 np.id = htonl(np.id); 00372 np.is_training_image = htonl(np.is_training_image); 00373 np.type = htonl(np.type); 00374 np.training_label[sizeof(np.training_label)-1] = '\0'; 00375 np.remote_command[sizeof(np.remote_command)-1] = '\0'; 00376 00377 enum nv2_patch_send_result retval = NV2_PATCH_SEND_OK; 00378 00379 if (connect(socket_fd, (struct sockaddr*)(&name), sizeof(name)) 00380 == -1) 00381 { 00382 if (show_warnings) 00383 nv2_warn("while attempting to send patch: " 00384 "connect() failed"); 00385 retval = NV2_PATCH_SEND_FAIL; 00386 } 00387 else if (send(socket_fd, &np, NV2_IMAGE_PATCH_HEADER_SIZE, 0) 00388 != NV2_IMAGE_PATCH_HEADER_SIZE) 00389 { 00390 if (show_warnings) 00391 nv2_warn("while attempting to send patch: " 00392 "send(header) failed"); 00393 retval = NV2_PATCH_SEND_FAIL; 00394 } 00395 else if (npixbytes > 0) 00396 { 00397 const ssize_t nwritten = 00398 nv2_robust_write(socket_fd, np.data, npixbytes); 00399 00400 if (nwritten != npixbytes) 00401 { 00402 if (show_warnings) 00403 nv2_warn("while attempting to send patch: " 00404 "send(pixels) failed"); 00405 retval = NV2_PATCH_SEND_FAIL; 00406 } 00407 } 00408 00409 close(socket_fd); 00410 00411 nv2_image_patch_destroy(p); 00412 00413 return retval; 00414 } 00415 00416 // ###################################################################### 00417 /* So things look consistent in everyone's emacs... */ 00418 /* Local Variables: */ 00419 /* indent-tabs-mode: nil */ 00420 /* c-file-style: "linux" */ 00421 /* End: */ 00422 00423 #endif // NEOVISIONII_NV2_LABEL_READER_C_DEFINED