From 01cb1f06df65fefad36143fa3a55d69aecf4a1c0 Mon Sep 17 00:00:00 2001 From: David Woodhouse Date: Tue, 14 Aug 2007 00:50:39 +0800 Subject: Import FEC code from Luigi Rizzo's RMDP Paper: http://info.iet.unipi.it/~luigi/mccr6.ps.gz Code: http://info.iet.unipi.it/~luigi/rmdp980703.tgz Signed-off-by: David Woodhouse --- serve_image.c | 184 +++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 118 insertions(+), 66 deletions(-) (limited to 'serve_image.c') diff --git a/serve_image.c b/serve_image.c index dc434f0..0c7eab8 100644 --- a/serve_image.c +++ b/serve_image.c @@ -1,3 +1,7 @@ +#define _POSIX_C_SOURCE 199309 + +#include + #include #include #include @@ -16,7 +20,9 @@ #include "mcast_image.h" int tx_rate = 80000; -int pkt_delay = 12500 * PKT_SIZE / 1024; +int pkt_delay; + +#undef RANDOMDROP int main(int argc, char **argv) { @@ -30,26 +36,36 @@ int main(int argc, char **argv) struct stat st; int writeerrors = 0; uint32_t erasesize; - unsigned char parbuf[PKT_SIZE]; unsigned char *image, *blockptr; uint32_t block_nr; - uint32_t block_ofs; int nr_blocks; - uint32_t droppoint = -1; struct timeval then, now, nextpkt; long time_msecs; + unsigned char **src_pkts; + unsigned char last_src_pkt[PKT_SIZE]; + int pkts_extra = 6; + int pkts_per_block; + struct fec_parms *fec; - if (argc == 6) { - tx_rate = atol(argv[5]) * 1024; + if (argc == 7) { + tx_rate = atol(argv[6]) * 1024; if (tx_rate < PKT_SIZE || tx_rate > 20000000) { fprintf(stderr, "Bogus TX rate %d KiB/s\n", tx_rate); exit(1); } + argc = 6; + } + if (argc == 6) { + pkts_extra = atol(argv[5]); + if (pkts_extra < 0 || pkts_extra > 200) { + fprintf(stderr, "Bogus redundancy %d packets\n", pkts_extra); + exit(1); + } argc = 5; } if (argc != 5) { - fprintf(stderr, "usage: %s []\n", + fprintf(stderr, "usage: %s [] []\n", (strrchr(argv[0], '/')?:argv[0]-1)+1); exit(1); } @@ -62,6 +78,22 @@ int main(int argc, char **argv) fprintf(stderr, "erasesize cannot be zero\n"); exit(1); } + + pkts_per_block = (erasesize + PKT_SIZE - 1) / PKT_SIZE; + src_pkts = malloc(pkts_per_block * sizeof(unsigned char *)); + if (!src_pkts) { + fprintf(stderr, "Failed to allocate memory for packet pointers\n"); + exit(1); + } + /* We have to pad it with zeroes, so can't use it in-place */ + src_pkts[pkts_per_block-1] = last_src_pkt; + + fec = fec_new(pkts_per_block, pkts_per_block + pkts_extra); + if (!fec) { + fprintf(stderr, "Error initialising FEC\n"); + exit(1); + } + memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_ADDRCONFIG; hints.ai_socktype = SOCK_DGRAM; @@ -118,85 +150,91 @@ int main(int argc, char **argv) pktbuf.hdr.totcrc = htonl(crc32(-1, image, st.st_size)); pktbuf.hdr.nr_blocks = htonl(nr_blocks); pktbuf.hdr.blocksize = htonl(erasesize); + pktbuf.hdr.thislen = htonl(PKT_SIZE); + pktbuf.hdr.nr_pkts = htons(pkts_per_block + pkts_extra); printf("%08x\n", ntohl(pktbuf.hdr.totcrc)); + again: + printf("Image size %ld KiB (%08lx). %d redundant packets per block (%d total)\n" + "Data to send %d KiB. Estimated transmit time: %ds\n", + (long)st.st_size / 1024, (long) st.st_size, pkts_extra, pkts_extra+pkts_per_block, + nr_blocks * PKT_SIZE * (pkts_per_block+pkts_extra) / 1024, + nr_blocks * (pkts_per_block+pkts_extra) * pkt_delay / 1000000); gettimeofday(&then, NULL); nextpkt = then; +#ifdef RANDOMDROP + srand((unsigned)then.tv_usec); + printf("Random seed %u\n", (unsigned)then.tv_usec); +#endif blockptr = image; for (block_nr = 0; block_nr < nr_blocks; block_nr++) { - int len; - int dropped = 0; + int i; + long tosleep; + + blockptr = image + (erasesize * block_nr); + + pktbuf.hdr.block_crc = htonl(crc32(-1, blockptr, erasesize)); + + for (i=0; i < pkts_per_block-1; i++) + src_pkts[i] = blockptr + (i*PKT_SIZE); + + memcpy(last_src_pkt, blockptr + (i*PKT_SIZE), + erasesize - (i * PKT_SIZE)); pktbuf.hdr.block_nr = htonl(block_nr); - for (block_ofs = 0; block_ofs <= erasesize; block_ofs += len) { - int i; + for (i=0; i < pkts_per_block + pkts_extra; i++) { + + fec_encode(fec, src_pkts, pktbuf.data, i, PKT_SIZE); - if (block_ofs + PKT_SIZE > erasesize) - len = erasesize - block_ofs; - else - len = PKT_SIZE; + printf("\rSending data block %08x packet %3d/%d", + block_nr * erasesize, i, pkts_per_block + pkts_extra); - if (block_ofs == erasesize) { + if (block_nr && !i) { gettimeofday(&now, NULL); time_msecs = (now.tv_sec - then.tv_sec) * 1000; time_msecs += ((int)(now.tv_usec - then.tv_usec)) / 1000; - - printf("\rSending parity block: %08x (%ld KiB/s) ", - block_nr * erasesize, - (block_ofs + (block_nr * (erasesize+sizeof(pktbuf)))) / 1024 * 1000 / time_msecs); - - len = PKT_SIZE; - memcpy(pktbuf.data, parbuf, PKT_SIZE); - } else { - if (!block_ofs) - memcpy(parbuf, blockptr, PKT_SIZE); - else for (i=0; i < len; i++) - parbuf[i] ^= blockptr[i]; - - memcpy(pktbuf.data, blockptr, len); - printf("\rSending data block at %08x", - block_nr * erasesize + block_ofs); - blockptr += len; + printf(" (%ld KiB/s) ", + (block_nr * sizeof(pktbuf) * (pkts_per_block+pkts_extra)) + / 1024 * 1000 / time_msecs); } fflush(stdout); - pktbuf.hdr.thislen = htonl(len); - pktbuf.hdr.block_ofs = htonl(block_ofs); - pktbuf.hdr.thiscrc = htonl(crc32(-1, pktbuf.data, len)); - - if (droppoint == block_ofs && !dropped) { - dropped = 1; - if (droppoint == 0) - droppoint = erasesize; - else if (droppoint == erasesize) - droppoint = ((erasesize - 1) / PKT_SIZE) * PKT_SIZE; - else droppoint -= PKT_SIZE; - printf("\nDropping data block at %08x\n", block_ofs); + pktbuf.hdr.pkt_nr = htons(i); + pktbuf.hdr.thiscrc = htonl(crc32(-1, pktbuf.data, PKT_SIZE)); + +#ifdef RANDOMDROP + if ((rand() % 1000) < 20) { + printf("\nDropping packet %d\n", i+1); continue; } +#endif + gettimeofday(&now, NULL); +#if 1 + tosleep = nextpkt.tv_usec - now.tv_usec + + (1000000 * (nextpkt.tv_sec - now.tv_sec)); - if (write(sock, &pktbuf, sizeof(pktbuf.hdr)+len) < 0) { - perror("write"); - writeerrors++; - if (writeerrors > 10) { - fprintf(stderr, "Too many consecutive write errors\n"); - exit(1); - } - } else - writeerrors = 0; + /* We need hrtimers for this to actually work */ + if (tosleep > 0) { + struct timespec req; - do { - gettimeofday(&now, NULL); - } while (now.tv_sec < nextpkt.tv_sec || - (now.tv_sec == nextpkt.tv_sec && - now.tv_usec < nextpkt.tv_usec)); + req.tv_nsec = (tosleep % 1000000) * 1000; + req.tv_sec = tosleep / 1000000; - nextpkt.tv_usec = now.tv_usec + pkt_delay; + nanosleep(&req, NULL); + } +#else + while (now.tv_sec < nextpkt.tv_sec || + (now.tv_sec == nextpkt.tv_sec && + now.tv_usec < nextpkt.tv_usec)) { + gettimeofday(&now, NULL); + } +#endif + nextpkt.tv_usec += pkt_delay; if (nextpkt.tv_usec >= 1000000) { nextpkt.tv_sec += nextpkt.tv_usec / 1000000; nextpkt.tv_usec %= 1000000; @@ -206,20 +244,34 @@ int main(int argc, char **argv) passed, then we've lost time. Adjust our expected timings accordingly. */ if (now.tv_usec > (now.tv_usec + - 1000000 * (nextpkt.tv_sec - now.tv_sec))) + 1000000 * (nextpkt.tv_sec - now.tv_sec))) { nextpkt = now; + } + + if (write(sock, &pktbuf, sizeof(pktbuf)) < 0) { + perror("write"); + writeerrors++; + if (writeerrors > 10) { + fprintf(stderr, "Too many consecutive write errors\n"); + exit(1); + } + } else + writeerrors = 0; + + } } - munmap(image, st.st_size); - close(rfd); - close(sock); gettimeofday(&now, NULL); time_msecs = (now.tv_sec - then.tv_sec) * 1000; time_msecs += ((int)(now.tv_usec - then.tv_usec)) / 1000; printf("\n%d KiB sent in %ldms (%ld KiB/s)\n", - nr_blocks * (erasesize+sizeof(pktbuf)) / 1024, time_msecs, - nr_blocks * (erasesize+sizeof(pktbuf)) / 1024 * 1000 / time_msecs); + nr_blocks * sizeof(pktbuf) * (pkts_per_block+pkts_extra) / 1024, time_msecs, + nr_blocks * sizeof(pktbuf) * (pkts_per_block+pkts_extra) / 1024 * 1000 / time_msecs); + + munmap(image, st.st_size); + close(rfd); + close(sock); return 0; } -- cgit v1.2.3