summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c195
1 files changed, 195 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
new file mode 100644
index 0000000..54b26b7
--- /dev/null
+++ b/src/blocked.c
@@ -0,0 +1,195 @@
+/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * ---------------------------------------------------------------------------
+ *
+ * API:
+ *
+ * getTimeoutFromObjectOrReply() is just an utility function to parse a
+ * timeout argument since blocking operations usually require a timeout.
+ *
+ * blockClient() set the CLIENT_BLOCKED flag in the client, and set the
+ * specified block type 'btype' filed to one of BLOCKED_* macros.
+ *
+ * unblockClient() unblocks the client doing the following:
+ * 1) It calls the btype-specific function to cleanup the state.
+ * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
+ * 3) It puts the client into a list of just unblocked clients that are
+ * processed ASAP in the beforeSleep() event loop callback, so that
+ * if there is some query buffer to process, we do it. This is also
+ * required because otherwise there is no 'readable' event fired, we
+ * already read the pending commands. We also set the CLIENT_UNBLOCKED
+ * flag to remember the client is in the unblocked_clients list.
+ *
+ * processUnblockedClients() is called inside the beforeSleep() function
+ * to process the query buffer from unblocked clients and remove the clients
+ * from the blocked_clients queue.
+ *
+ * replyToBlockedClientTimedOut() is called by the cron function when
+ * a client blocked reaches the specified timeout (if the timeout is set
+ * to 0, no timeout is processed).
+ * It usually just needs to send a reply to the client.
+ *
+ * When implementing a new type of blocking opeation, the implementation
+ * should modify unblockClient() and replyToBlockedClientTimedOut() in order
+ * to handle the btype-specific behavior of this two functions.
+ * If the blocking operation waits for certain keys to change state, the
+ * clusterRedirectBlockedClientIfNeeded() function should also be updated.
+ */
+
+#include "server.h"
+
+/* Get a timeout value from an object and store it into 'timeout'.
+ * The final timeout is always stored as milliseconds as a time where the
+ * timeout will expire, however the parsing is performed according to
+ * the 'unit' that can be seconds or milliseconds.
+ *
+ * Note that if the timeout is zero (usually from the point of view of
+ * commands API this means no timeout) the value stored into 'timeout'
+ * is zero. */
+int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
+ long long tval;
+
+ if (getLongLongFromObjectOrReply(c,object,&tval,
+ "timeout is not an integer or out of range") != C_OK)
+ return C_ERR;
+
+ if (tval < 0) {
+ addReplyError(c,"timeout is negative");
+ return C_ERR;
+ }
+
+ if (tval > 0) {
+ if (unit == UNIT_SECONDS) tval *= 1000;
+ tval += mstime();
+ }
+ *timeout = tval;
+
+ return C_OK;
+}
+
+/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
+ * flag is set client query buffer is not longer processed, but accumulated,
+ * and will be processed when the client is unblocked. */
+void blockClient(client *c, int btype) {
+ c->flags |= CLIENT_BLOCKED;
+ c->btype = btype;
+ server.bpop_blocked_clients++;
+}
+
+/* This function is called in the beforeSleep() function of the event loop
+ * in order to process the pending input buffer of clients that were
+ * unblocked after a blocking operation. */
+void processUnblockedClients(void) {
+ listNode *ln;
+ client *c;
+
+ while (listLength(server.unblocked_clients)) {
+ ln = listFirst(server.unblocked_clients);
+ serverAssert(ln != NULL);
+ c = ln->value;
+ listDelNode(server.unblocked_clients,ln);
+ c->flags &= ~CLIENT_UNBLOCKED;
+
+ /* Process remaining data in the input buffer, unless the client
+ * is blocked again. Actually processInputBuffer() checks that the
+ * client is not blocked before to proceed, but things may change and
+ * the code is conceptually more correct this way. */
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ processInputBuffer(c);
+ }
+ }
+ }
+}
+
+/* Unblock a client calling the right function depending on the kind
+ * of operation the client is blocking for. */
+void unblockClient(client *c) {
+ if (c->btype == BLOCKED_LIST) {
+ unblockClientWaitingData(c);
+ } else if (c->btype == BLOCKED_WAIT) {
+ unblockClientWaitingReplicas(c);
+ } else if (c->btype == BLOCKED_MODULE) {
+ unblockClientFromModule(c);
+ } else {
+ serverPanic("Unknown btype in unblockClient().");
+ }
+ /* Clear the flags, and put the client in the unblocked list so that
+ * we'll process new commands in its query buffer ASAP. */
+ c->flags &= ~CLIENT_BLOCKED;
+ c->btype = BLOCKED_NONE;
+ server.bpop_blocked_clients--;
+ /* The client may already be into the unblocked list because of a previous
+ * blocking operation, don't add back it into the list multiple times. */
+ if (!(c->flags & CLIENT_UNBLOCKED)) {
+ c->flags |= CLIENT_UNBLOCKED;
+ listAddNodeTail(server.unblocked_clients,c);
+ }
+}
+
+/* This function gets called when a blocked client timed out in order to
+ * send it a reply of some kind. After this function is called,
+ * unblockClient() will be called with the same client as argument. */
+void replyToBlockedClientTimedOut(client *c) {
+ if (c->btype == BLOCKED_LIST) {
+ addReply(c,shared.nullmultibulk);
+ } else if (c->btype == BLOCKED_WAIT) {
+ addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
+ } else if (c->btype == BLOCKED_MODULE) {
+ moduleBlockedClientTimedOut(c);
+ } else {
+ serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
+ }
+}
+
+/* Mass-unblock clients because something changed in the instance that makes
+ * blocking no longer safe. For example clients blocked in list operations
+ * in an instance which turns from master to slave is unsafe, so this function
+ * is called when a master turns into a slave.
+ *
+ * The semantics is to send an -UNBLOCKED error to the client, disconnecting
+ * it at the same time. */
+void disconnectAllBlockedClients(void) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.clients,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+
+ if (c->flags & CLIENT_BLOCKED) {
+ addReplySds(c,sdsnew(
+ "-UNBLOCKED force unblock from blocking operation, "
+ "instance state changed (master -> slave?)\r\n"));
+ unblockClient(c);
+ c->flags |= CLIENT_CLOSE_AFTER_REPLY;
+ }
+ }
+}