00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #ifndef NEOVISIONII_NV2_LABEL_READER_C_DEFINED
00039 #define NEOVISIONII_NV2_LABEL_READER_C_DEFINED
00040
00041 #include "NeovisionII/nv2_label_reader.h"
00042
00043 #include "NeovisionII/nv2_common.h"
00044
00045
00046 #include <sys/types.h>
00047 #include <sys/socket.h>
00048 #include <netinet/in.h>
00049 #include <arpa/inet.h>
00050
00051 #include <errno.h>
00052 #include <fcntl.h>
00053 #include <pthread.h>
00054 #include <stdio.h>
00055 #include <stdlib.h>
00056 #include <string.h>
00057 #include <unistd.h>
00058
00059 struct nv2_label_reader
00060 {
00061 pthread_mutex_t m_label_lock;
00062 struct nv2_patch_label m_label;
00063
00064 pthread_mutex_t m_patch_lock;
00065 struct nv2_image_patch m_patch;
00066 enum nv2_patch_send_result m_latest_patch_send_result;
00067
00068 int m_label_reader_port;
00069 struct in_addr m_remote_patch_reader_in_addr;
00070 int m_remote_patch_reader_port;
00071
00072 int m_reader_socket_fd;
00073
00074 pthread_t m_label_reader_thread;
00075 pthread_t m_patch_sender_thread;
00076 };
00077
00078 static void cleanup_fd(void* p)
00079 {
00080 int* const fd = (int*) p;
00081 if (*fd >= 0)
00082 {
00083 close(*fd);
00084 *fd = -1;
00085 }
00086 }
00087
00088 static void* run_label_reader(void* q)
00089 {
00090 struct nv2_label_reader* p = (struct nv2_label_reader*) q;
00091
00092 pthread_cleanup_push(&cleanup_fd, &p->m_reader_socket_fd);
00093
00094 p->m_reader_socket_fd = socket(PF_INET, SOCK_STREAM, 0);
00095 if (p->m_reader_socket_fd == -1)
00096 nv2_fatal("socket() failed");
00097
00098 {
00099 const int set_on = 1;
00100 if (setsockopt(p->m_reader_socket_fd,
00101 SOL_SOCKET, SO_REUSEADDR,
00102 &set_on, sizeof(set_on)) == -1)
00103 nv2_warn("setsockopt() failed");
00104 }
00105
00106 struct sockaddr_in name;
00107 name.sin_family = AF_INET;
00108 name.sin_addr.s_addr = htonl(INADDR_ANY);
00109 name.sin_port = htons(p->m_label_reader_port);
00110 if (bind(p->m_reader_socket_fd,
00111 (struct sockaddr*)(&name), sizeof(name)) == -1)
00112 nv2_fatal("bind() failed");
00113
00114 listen(p->m_reader_socket_fd, 5);
00115
00116 int client_socket_fd = -1;
00117 pthread_cleanup_push(&cleanup_fd, &client_socket_fd);
00118
00119 while (1)
00120 {
00121 struct sockaddr_in client_name;
00122 socklen_t client_name_len = sizeof(client_name);
00123
00124 client_socket_fd =
00125 accept(p->m_reader_socket_fd,
00126 (struct sockaddr*)(&client_name),
00127 &client_name_len);
00128
00129 if (client_socket_fd < 0)
00130 {
00131
00132
00133
00134
00135
00136
00137 if (p->m_reader_socket_fd < 0)
00138 break;
00139
00140 nv2_warn("accept() failed");
00141 continue;
00142 }
00143
00144 struct nv2_patch_label l;
00145
00146 errno = 0;
00147 if (nv2_robust_read(client_socket_fd, &l, sizeof(l), NULL)
00148 != sizeof(l))
00149 {
00150 nv2_warn("read(nv2_patch_label) failed");
00151 close(client_socket_fd);
00152 client_socket_fd = -1;
00153 continue;
00154 }
00155
00156 l.protocol_version = ntohl(l.protocol_version);
00157 l.patch_id = ntohl(l.patch_id);
00158 l.confidence = ntohl(l.confidence);
00159 l.source[sizeof(l.source) - 1] = '\0';
00160 l.name[sizeof(l.name) - 1] = '\0';
00161 l.extra_info[sizeof(l.extra_info) - 1] = '\0';
00162
00163 if (l.protocol_version != NV2_LABEL_PROTOCOL_VERSION)
00164 {
00165 fprintf(stderr,
00166 "wrong label protocol version "
00167 "(got %u, expected %u)",
00168 (unsigned int) l.protocol_version,
00169 (unsigned int) NV2_LABEL_PROTOCOL_VERSION);
00170 exit(-1);
00171 }
00172
00173
00174
00175 pthread_mutex_lock(&p->m_label_lock);
00176 p->m_label = l;
00177 pthread_mutex_unlock(&p->m_label_lock);
00178
00179 close(client_socket_fd);
00180 client_socket_fd = -1;
00181 }
00182
00183 pthread_cleanup_pop(1);
00184 pthread_cleanup_pop(1);
00185
00186 return (void*) 0;
00187 }
00188
00189 static void* run_patch_sender(void* q)
00190 {
00191 struct nv2_label_reader* r = (struct nv2_label_reader*) q;
00192
00193 int show_warnings = 1;
00194 int nfail = 0;
00195
00196 while (1)
00197 {
00198 struct nv2_image_patch p;
00199
00200 pthread_mutex_lock(&r->m_patch_lock);
00201 p = r->m_patch;
00202 nv2_image_patch_init_empty(&r->m_patch);
00203 pthread_mutex_unlock(&r->m_patch_lock);
00204
00205 if (p.data == 0 || p.width == 0 || p.height == 0)
00206 {
00207
00208
00209 nv2_image_patch_destroy(&p);
00210 usleep(1000);
00211
00212 continue;
00213 }
00214
00215 r->m_latest_patch_send_result =
00216 nv2_label_reader_send_patch_sync
00217 (r, &p, show_warnings);
00218
00219 if (NV2_PATCH_SEND_FAIL
00220 == r->m_latest_patch_send_result)
00221 {
00222 sleep(1);
00223
00224
00225
00226
00227 show_warnings = 0;
00228
00229
00230
00231
00232
00233 ++nfail;
00234 }
00235 else
00236 {
00237 show_warnings = 1;
00238
00239 if (nfail > 0)
00240 {
00241 char buf[64];
00242 snprintf(&buf[0], sizeof(buf),
00243 "%d consecutive patch "
00244 "send attempts failed",
00245 nfail);
00246 errno = 0;
00247 nv2_warn(&buf[0]);
00248 }
00249 nfail = 0;
00250 }
00251 }
00252
00253 return (void*) 0;
00254 }
00255
00256 struct nv2_label_reader* nv2_label_reader_create(
00257 const int label_reader_port,
00258 const char* remote_patch_reader_addr,
00259 const int remote_patch_reader_port)
00260 {
00261 struct nv2_label_reader* p =
00262 malloc(sizeof(struct nv2_label_reader));
00263
00264 if (p == 0)
00265 nv2_fatal("malloc() failed");
00266
00267 pthread_mutex_init(&p->m_label_lock, 0);
00268 nv2_patch_label_init_empty(&p->m_label);
00269
00270 pthread_mutex_init(&p->m_patch_lock, 0);
00271 nv2_image_patch_init_empty(&p->m_patch);
00272 p->m_latest_patch_send_result = NV2_PATCH_SEND_OK;
00273
00274 p->m_label_reader_port = label_reader_port;
00275
00276 if (inet_aton(remote_patch_reader_addr,
00277 &p->m_remote_patch_reader_in_addr) == 0)
00278 nv2_fatal("inet_aton() failed");
00279
00280 p->m_remote_patch_reader_port = remote_patch_reader_port;
00281
00282 p->m_reader_socket_fd = -1;
00283
00284 if (0 != pthread_create(&p->m_label_reader_thread, 0,
00285 &run_label_reader,
00286 (void*) p))
00287 nv2_fatal("pthread_create() failed (label reader thread)");
00288
00289 if (0 != pthread_create(&p->m_patch_sender_thread, 0,
00290 &run_patch_sender,
00291 (void*) p))
00292 nv2_fatal("pthread_create() failed (patch sender thread)");
00293
00294 return p;
00295 }
00296
00297 void nv2_label_reader_destroy(struct nv2_label_reader* p)
00298 {
00299 pthread_cancel(p->m_label_reader_thread);
00300 pthread_cancel(p->m_patch_sender_thread);
00301
00302 if (p->m_reader_socket_fd > 0)
00303 {
00304 const int close_me = p->m_reader_socket_fd;
00305 p->m_reader_socket_fd = -1;
00306 close(close_me);
00307 }
00308
00309 if (0 != pthread_join(p->m_label_reader_thread, 0))
00310 nv2_fatal("pthread_join() failed");
00311
00312 if (0 != pthread_join(p->m_patch_sender_thread, 0))
00313 nv2_fatal("pthread_join() failed");
00314
00315 pthread_mutex_destroy(&p->m_label_lock);
00316 pthread_mutex_destroy(&p->m_patch_lock);
00317 nv2_image_patch_destroy(&p->m_patch);
00318 }
00319
00320 int nv2_label_reader_get_current_label(struct nv2_label_reader* p,
00321 struct nv2_patch_label* l)
00322 {
00323 pthread_mutex_lock(&p->m_label_lock);
00324 *l = p->m_label;
00325 nv2_patch_label_init_empty(&p->m_label);
00326 pthread_mutex_unlock(&p->m_label_lock);
00327
00328 if (l->patch_id == 0)
00329 return 0;
00330 else
00331 return 1;
00332 }
00333
00334 void
00335 nv2_label_reader_send_patch(struct nv2_label_reader* r,
00336 struct nv2_image_patch* p)
00337 {
00338 pthread_mutex_lock(&r->m_patch_lock);
00339 nv2_image_patch_destroy(&r->m_patch);
00340 r->m_patch = *p;
00341 nv2_image_patch_init_empty(p);
00342 pthread_mutex_unlock(&r->m_patch_lock);
00343 }
00344
00345 enum nv2_patch_send_result
00346 nv2_label_reader_send_patch_sync(struct nv2_label_reader* r,
00347 struct nv2_image_patch* p,
00348 const int show_warnings)
00349 {
00350 const int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
00351
00352 if (socket_fd == -1)
00353 nv2_fatal("socket() failed");
00354
00355 struct sockaddr_in name;
00356 name.sin_family = AF_INET;
00357 name.sin_addr = r->m_remote_patch_reader_in_addr;
00358 name.sin_port = htons(r->m_remote_patch_reader_port);
00359
00360 struct nv2_image_patch np;
00361
00362 if (p) np = *p;
00363 else nv2_image_patch_init_empty(&np);
00364
00365 const uint32_t npixbytes =
00366 np.width * np.height * nv2_pixel_type_bytes_per_pixel(np.type);
00367
00368 np.protocol_version = htonl(np.protocol_version);
00369 np.width = htonl(np.width);
00370 np.height = htonl(np.height);
00371 np.id = htonl(np.id);
00372 np.is_training_image = htonl(np.is_training_image);
00373 np.type = htonl(np.type);
00374 np.training_label[sizeof(np.training_label)-1] = '\0';
00375 np.remote_command[sizeof(np.remote_command)-1] = '\0';
00376
00377 enum nv2_patch_send_result retval = NV2_PATCH_SEND_OK;
00378
00379 if (connect(socket_fd, (struct sockaddr*)(&name), sizeof(name))
00380 == -1)
00381 {
00382 if (show_warnings)
00383 nv2_warn("while attempting to send patch: "
00384 "connect() failed");
00385 retval = NV2_PATCH_SEND_FAIL;
00386 }
00387 else if (send(socket_fd, &np, NV2_IMAGE_PATCH_HEADER_SIZE, 0)
00388 != NV2_IMAGE_PATCH_HEADER_SIZE)
00389 {
00390 if (show_warnings)
00391 nv2_warn("while attempting to send patch: "
00392 "send(header) failed");
00393 retval = NV2_PATCH_SEND_FAIL;
00394 }
00395 else if (npixbytes > 0)
00396 {
00397 const ssize_t nwritten =
00398 nv2_robust_write(socket_fd, np.data, npixbytes);
00399
00400 if (nwritten != npixbytes)
00401 {
00402 if (show_warnings)
00403 nv2_warn("while attempting to send patch: "
00404 "send(pixels) failed");
00405 retval = NV2_PATCH_SEND_FAIL;
00406 }
00407 }
00408
00409 close(socket_fd);
00410
00411 nv2_image_patch_destroy(p);
00412
00413 return retval;
00414 }
00415
00416
00417
00418
00419
00420
00421
00422
00423 #endif // NEOVISIONII_NV2_LABEL_READER_C_DEFINED