#include #include #include #include #include #include #include #include #include #include /* Forward ZMQ messages from one subscriber to a * publisher. * * Each time a SIGUSR1 is received, one message gets dropped. */ #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 static int drop_frames = 0; void signalHandler(int signum) { if (signum == SIGUSR1) { drop_frames++; signal(SIGUSR1, signalHandler); } } void barf() { fprintf(stderr, "Error: %s\n", zmq_strerror(errno)); exit(1); } void usage(char** argv) { fprintf(stderr, "usage: connect to publish from\n"); fprintf(stderr, " zmq-dropper host port port\n"); exit(1); } int main(int argc, char** argv) { int rc; char endpoint[256]; #ifdef GIT_VERSION fprintf(stderr, "zmq-dropper message dropper. version %s\n", GIT_VERSION); #else fprintf(stderr, "zmq-dropper message dropper. version ?\n"); #endif fprintf(stderr, "Send me a SIGUSR1 then I will drop a frame\n\n"); if (argc < 4) { usage(argv); } char* sub_host = argv[1]; int sub_port = atoi(argv[2]); int pub_port = atoi(argv[3]); fprintf(stderr, "connecting to tcp://%s:%d\n", sub_host, sub_port); void* ctx = zmq_ctx_new(); /*********** SUBSCRIBER **********/ void* sub_sock = zmq_socket(ctx, ZMQ_SUB); snprintf(endpoint, 256, "tcp://%s:%d", sub_host, sub_port); rc = zmq_connect(sub_sock, endpoint); fprintf(stderr, "sub connect %d\n", rc); if (rc) barf(); rc = zmq_setsockopt(sub_sock, ZMQ_SUBSCRIBE, NULL, 0); fprintf(stderr, "subscribe %d\n", rc); if (rc) barf(); /*********** PUBLISHER **********/ void* pub_sock = zmq_socket(ctx, ZMQ_PUB); snprintf(endpoint, 256, "tcp://*:%d", pub_port); rc = zmq_bind(pub_sock, endpoint); fprintf(stderr, "pub bind %d\n", rc); if (rc) barf(); /*********** MAIN LOOP *************/ signal(SIGUSR1, signalHandler); const int framelen = NUM_FRAMES_PER_ZMQ_MESSAGE * 6144; uint8_t eti[framelen]; while (1) { rc = zmq_recv(sub_sock, eti, framelen, 0); if (rc > 0) { if (drop_frames) { drop_frames--; fprintf(stderr, "Dropped one frame\n"); } else { rc = zmq_send(pub_sock, eti, framelen, 0); if (rc < 0) { fprintf(stderr, "zmq_send rc=%d\n", rc); barf(); } } } else if (rc == -1) { if (errno == EINTR) { fprintf(stderr, "zmq_recv interrupted\n"); } else { fprintf(stderr, "zmq_recv rc=%d\n", rc); barf(); } } else { fprintf(stderr, "zmq_recv rc=%d\n", rc); } } zmq_close(sub_sock); zmq_close(pub_sock); zmq_ctx_destroy(ctx); return 0; }