diff options
Diffstat (limited to 'src/rio.c')
| -rw-r--r-- | src/rio.c | 351 |
1 files changed, 351 insertions, 0 deletions
diff --git a/src/rio.c b/src/rio.c new file mode 100644 index 0000000..9c7220f --- /dev/null +++ b/src/rio.c @@ -0,0 +1,351 @@ +/* rio.c is a simple stream-oriented I/O abstraction that provides an interface + * to write code that can consume/produce data using different concrete input + * and output devices. For instance the same rdb.c code using the rio + * abstraction can be used to read and write the RDB format using in-memory + * buffers or files. + * + * A rio object provides the following methods: + * read: read from stream. + * write: write to stream. + * tell: get the current offset. + * + * It is also possible to set a 'checksum' method that is used by rio.c in order + * to compute a checksum of the data written or read, or to query the rio object + * for the current checksum. + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "fmacros.h" +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include "rio.h" +#include "util.h" +#include "crc64.h" +#include "config.h" +#include "server.h" + +/* ------------------------- Buffer I/O implementation ----------------------- */ + +/* Returns 1 or 0 for success/failure. */ +static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { + r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); + r->io.buffer.pos += len; + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioBufferRead(rio *r, void *buf, size_t len) { + if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) + return 0; /* not enough buffer to return len bytes. */ + memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); + r->io.buffer.pos += len; + return 1; +} + +/* Returns read/write position in buffer. */ +static off_t rioBufferTell(rio *r) { + return r->io.buffer.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + UNUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. */ +} + +static const rio rioBufferIO = { + rioBufferRead, + rioBufferWrite, + rioBufferTell, + rioBufferFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithBuffer(rio *r, sds s) { + *r = rioBufferIO; + r->io.buffer.ptr = s; + r->io.buffer.pos = 0; +} + +/* --------------------- Stdio file pointer implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFileWrite(rio *r, const void *buf, size_t len) { + size_t retval; + + retval = fwrite(buf,len,1,r->io.file.fp); + r->io.file.buffered += len; + + if (r->io.file.autosync && + r->io.file.buffered >= r->io.file.autosync) + { + fflush(r->io.file.fp); + aof_fsync(fileno(r->io.file.fp)); + r->io.file.buffered = 0; + } + return retval; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFileRead(rio *r, void *buf, size_t len) { + return fread(buf,len,1,r->io.file.fp); +} + +/* Returns read/write position in file. */ +static off_t rioFileTell(rio *r) { + return ftello(r->io.file.fp); +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} + +static const rio rioFileIO = { + rioFileRead, + rioFileWrite, + rioFileTell, + rioFileFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFile(rio *r, FILE *fp) { + *r = rioFileIO; + r->io.file.fp = fp; + r->io.file.buffered = 0; + r->io.file.autosync = 0; +} + +/* ------------------- File descriptors set implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. + * The function returns success as long as we are able to correctly write + * to at least one file descriptor. + * + * When buf is NULL and len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). */ +static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + ssize_t retval; + int j; + unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while below if we don't flush. */ + if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } + + /* Write in little chunchs so that when there are big writes we + * parallelize while the kernel is sending data in background to + * the TCP socket. */ + while(len) { + size_t count = len < 1024 ? len : 1024; + int broken = 0; + for (j = 0; j < r->io.fdset.numfds; j++) { + if (r->io.fdset.state[j] != 0) { + /* Skip FDs alraedy in error. */ + broken++; + continue; + } + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) { + /* With blocking sockets, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + break; + } + nwritten += retval; + } + + if (nwritten != count) { + /* Mark this FD as broken. */ + r->io.fdset.state[j] = errno; + if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; + } + } + if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */ + p += count; + len -= count; + r->io.fdset.pos += count; + } + + if (doflush) sdsclear(r->io.fdset.buf); + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdsetRead(rio *r, void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdsetTell(rio *r) { + return r->io.fdset.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + +static const rio rioFdsetIO = { + rioFdsetRead, + rioFdsetWrite, + rioFdsetTell, + rioFdsetFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFdset(rio *r, int *fds, int numfds) { + int j; + + *r = rioFdsetIO; + r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + r->io.fdset.state = zmalloc(sizeof(int)*numfds); + memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; + r->io.fdset.numfds = numfds; + r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); +} + +/* release the rio stream. */ +void rioFreeFdset(rio *r) { + zfree(r->io.fdset.fds); + zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); +} + +/* ---------------------------- Generic functions ---------------------------- */ + +/* This function can be installed both in memory and file streams when checksum + * computation is needed. */ +void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { + r->cksum = crc64(r->cksum,buf,len); +} + +/* Set the file-based rio object to auto-fsync every 'bytes' file written. + * By default this is set to zero that means no automatic file sync is + * performed. + * + * This feature is useful in a few contexts since when we rely on OS write + * buffers sometimes the OS buffers way too much, resulting in too many + * disk I/O concentrated in very little time. When we fsync in an explicit + * way instead the I/O pressure is more distributed across time. */ +void rioSetAutoSync(rio *r, off_t bytes) { + serverAssert(r->read == rioFileIO.read); + r->io.file.autosync = bytes; +} + +/* --------------------------- Higher level interface -------------------------- + * + * The following higher level functions use lower level rio.c functions to help + * generating the Redis protocol for the Append Only File. */ + +/* Write multi bulk count in the format: "*<count>\r\n". */ +size_t rioWriteBulkCount(rio *r, char prefix, int count) { + char cbuf[128]; + int clen; + + cbuf[0] = prefix; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if (rioWrite(r,cbuf,clen) == 0) return 0; + return clen; +} + +/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */ +size_t rioWriteBulkString(rio *r, const char *buf, size_t len) { + size_t nwritten; + + if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0; + if (len > 0 && rioWrite(r,buf,len) == 0) return 0; + if (rioWrite(r,"\r\n",2) == 0) return 0; + return nwritten+len+2; +} + +/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */ +size_t rioWriteBulkLongLong(rio *r, long long l) { + char lbuf[32]; + unsigned int llen; + + llen = ll2string(lbuf,sizeof(lbuf),l); + return rioWriteBulkString(r,lbuf,llen); +} + +/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */ +size_t rioWriteBulkDouble(rio *r, double d) { + char dbuf[128]; + unsigned int dlen; + + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + return rioWriteBulkString(r,dbuf,dlen); +} |
