summaryrefslogtreecommitdiff
path: root/src/db.c
diff options
context:
space:
mode:
authorbyte2016 <[email protected]>2018-06-12 19:55:38 +0800
committerbyte2016 <[email protected]>2018-06-12 19:55:38 +0800
commit76f2c13d7c27d7419af79ea0bdc7ab7717b6935b (patch)
treeaa2ca741501d40990b892d504a1cc3b7defe57aa /src/db.c
Init commit.HEADmaster
Diffstat (limited to 'src/db.c')
-rw-r--r--src/db.c1390
1 files changed, 1390 insertions, 0 deletions
diff --git a/src/db.c b/src/db.c
new file mode 100644
index 0000000..a21437c
--- /dev/null
+++ b/src/db.c
@@ -0,0 +1,1390 @@
+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "cluster.h"
+#include "atomicvar.h"
+
+#include <signal.h>
+#include <ctype.h>
+
+/*-----------------------------------------------------------------------------
+ * C-level DB API
+ *----------------------------------------------------------------------------*/
+
+/* Low level key lookup API, not actually called directly from commands
+ * implementations that should instead rely on lookupKeyRead(),
+ * lookupKeyWrite() and lookupKeyReadWithFlags(). */
+robj *lookupKey(redisDb *db, robj *key, int flags) {
+ dictEntry *de = dictFind(db->dict,key->ptr);
+ if (de) {
+ robj *val = dictGetVal(de);
+
+ /* Update the access time for the ageing algorithm.
+ * Don't do it if we have a saving child, as this will trigger
+ * a copy on write madness. */
+ if (server.rdb_child_pid == -1 &&
+ server.aof_child_pid == -1 &&
+ !(flags & LOOKUP_NOTOUCH))
+ {
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ unsigned long ldt = val->lru >> 8;
+ unsigned long counter = LFULogIncr(val->lru & 255);
+ val->lru = (ldt << 8) | counter;
+ } else {
+ val->lru = LRU_CLOCK();
+ }
+ }
+ return val;
+ } else {
+ return NULL;
+ }
+}
+
+/* Lookup a key for read operations, or return NULL if the key is not found
+ * in the specified DB.
+ *
+ * As a side effect of calling this function:
+ * 1. A key gets expired if it reached it's TTL.
+ * 2. The key last access time is updated.
+ * 3. The global keys hits/misses stats are updated (reported in INFO).
+ *
+ * This API should not be used when we write to the key after obtaining
+ * the object linked to the key, but only for read only operations.
+ *
+ * Flags change the behavior of this command:
+ *
+ * LOOKUP_NONE (or zero): no special flags are passed.
+ * LOOKUP_NOTOUCH: don't alter the last access time of the key.
+ *
+ * Note: this function also returns NULL is the key is logically expired
+ * but still existing, in case this is a slave, since this API is called only
+ * for read operations. Even if the key expiry is master-driven, we can
+ * correctly report a key is expired on slaves even if the master is lagging
+ * expiring our key via DELs in the replication link. */
+robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
+ robj *val;
+
+ if (expireIfNeeded(db,key) == 1) {
+ /* Key expired. If we are in the context of a master, expireIfNeeded()
+ * returns 0 only when the key does not exist at all, so it's save
+ * to return NULL ASAP. */
+ if (server.masterhost == NULL) return NULL;
+
+ /* However if we are in the context of a slave, expireIfNeeded() will
+ * not really try to expire the key, it only returns information
+ * about the "logical" status of the key: key expiring is up to the
+ * master in order to have a consistent view of master's data set.
+ *
+ * However, if the command caller is not the master, and as additional
+ * safety measure, the command invoked is a read-only command, we can
+ * safely return NULL here, and provide a more consistent behavior
+ * to clients accessign expired values in a read-only fashion, that
+ * will say the key as non exisitng.
+ *
+ * Notably this covers GETs when slaves are used to scale reads. */
+ if (server.current_client &&
+ server.current_client != server.master &&
+ server.current_client->cmd &&
+ server.current_client->cmd->flags & CMD_READONLY)
+ {
+ return NULL;
+ }
+ }
+ val = lookupKey(db,key,flags);
+ if (val == NULL)
+ server.stat_keyspace_misses++;
+ else
+ server.stat_keyspace_hits++;
+ return val;
+}
+
+/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
+ * common case. */
+robj *lookupKeyRead(redisDb *db, robj *key) {
+ return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
+}
+
+/* Lookup a key for write operations, and as a side effect, if needed, expires
+ * the key if its TTL is reached.
+ *
+ * Returns the linked value object if the key exists or NULL if the key
+ * does not exist in the specified DB. */
+robj *lookupKeyWrite(redisDb *db, robj *key) {
+ expireIfNeeded(db,key);
+ return lookupKey(db,key,LOOKUP_NONE);
+}
+
+robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
+ robj *o = lookupKeyRead(c->db, key);
+ if (!o) addReply(c,reply);
+ return o;
+}
+
+robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
+ robj *o = lookupKeyWrite(c->db, key);
+ if (!o) addReply(c,reply);
+ return o;
+}
+
+/* Add the key to the DB. It's up to the caller to increment the reference
+ * counter of the value if needed.
+ *
+ * The program is aborted if the key already exists. */
+void dbAdd(redisDb *db, robj *key, robj *val) {
+ sds copy = sdsdup(key->ptr);
+ int retval = dictAdd(db->dict, copy, val);
+
+ serverAssertWithInfo(NULL,key,retval == DICT_OK);
+ if (val->type == OBJ_LIST) signalListAsReady(db, key);
+ if (server.cluster_enabled) slotToKeyAdd(key);
+ }
+
+/* Overwrite an existing key with a new value. Incrementing the reference
+ * count of the new value is up to the caller.
+ * This function does not modify the expire time of the existing key.
+ *
+ * The program is aborted if the key was not already present. */
+void dbOverwrite(redisDb *db, robj *key, robj *val) {
+ dictEntry *de = dictFind(db->dict,key->ptr);
+
+ serverAssertWithInfo(NULL,key,de != NULL);
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ robj *old = dictGetVal(de);
+ int saved_lru = old->lru;
+ dictReplace(db->dict, key->ptr, val);
+ val->lru = saved_lru;
+ } else {
+ dictReplace(db->dict, key->ptr, val);
+ }
+}
+
+/* High level Set operation. This function can be used in order to set
+ * a key, whatever it was existing or not, to a new object.
+ *
+ * 1) The ref count of the value object is incremented.
+ * 2) clients WATCHing for the destination key notified.
+ * 3) The expire time of the key is reset (the key is made persistent).
+ *
+ * All the new keys in the database should be craeted via this interface. */
+void setKey(redisDb *db, robj *key, robj *val) {
+ if (lookupKeyWrite(db,key) == NULL) {
+ dbAdd(db,key,val);
+ } else {
+ dbOverwrite(db,key,val);
+ }
+ incrRefCount(val);
+ removeExpire(db,key);
+ signalModifiedKey(db,key);
+}
+
+int dbExists(redisDb *db, robj *key) {
+ return dictFind(db->dict,key->ptr) != NULL;
+}
+
+/* Return a random key, in form of a Redis object.
+ * If there are no keys, NULL is returned.
+ *
+ * The function makes sure to return keys not already expired. */
+robj *dbRandomKey(redisDb *db) {
+ dictEntry *de;
+
+ while(1) {
+ sds key;
+ robj *keyobj;
+
+ de = dictGetRandomKey(db->dict);
+ if (de == NULL) return NULL;
+
+ key = dictGetKey(de);
+ keyobj = createStringObject(key,sdslen(key));
+ if (dictFind(db->expires,key)) {
+ if (expireIfNeeded(db,keyobj)) {
+ decrRefCount(keyobj);
+ continue; /* search for another key. This expired. */
+ }
+ }
+ return keyobj;
+ }
+}
+
+/* Delete a key, value, and associated expiration entry if any, from the DB */
+int dbSyncDelete(redisDb *db, robj *key) {
+ /* Deleting an entry from the expires dict will not free the sds of
+ * the key, because it is shared with the main dictionary. */
+ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
+ if (dictDelete(db->dict,key->ptr) == DICT_OK) {
+ if (server.cluster_enabled) slotToKeyDel(key);
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+/* This is a wrapper whose behavior depends on the Redis lazy free
+ * configuration. Deletes the key synchronously or asynchronously. */
+int dbDelete(redisDb *db, robj *key) {
+ return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
+ dbSyncDelete(db,key);
+}
+
+/* Prepare the string object stored at 'key' to be modified destructively
+ * to implement commands like SETBIT or APPEND.
+ *
+ * An object is usually ready to be modified unless one of the two conditions
+ * are true:
+ *
+ * 1) The object 'o' is shared (refcount > 1), we don't want to affect
+ * other users.
+ * 2) The object encoding is not "RAW".
+ *
+ * If the object is found in one of the above conditions (or both) by the
+ * function, an unshared / not-encoded copy of the string object is stored
+ * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
+ * returned.
+ *
+ * USAGE:
+ *
+ * The object 'o' is what the caller already obtained by looking up 'key'
+ * in 'db', the usage pattern looks like this:
+ *
+ * o = lookupKeyWrite(db,key);
+ * if (checkType(c,o,OBJ_STRING)) return;
+ * o = dbUnshareStringValue(db,key,o);
+ *
+ * At this point the caller is ready to modify the object, for example
+ * using an sdscat() call to append some data, or anything else.
+ */
+robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
+ serverAssert(o->type == OBJ_STRING);
+ if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
+ robj *decoded = getDecodedObject(o);
+ o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
+ decrRefCount(decoded);
+ dbOverwrite(db,key,o);
+ }
+ return o;
+}
+
+/* Remove all keys from all the databases in a Redis server.
+ * If callback is given the function is called from time to time to
+ * signal that work is in progress.
+ *
+ * The dbnum can be -1 if all teh DBs should be flushed, or the specified
+ * DB number if we want to flush only a single Redis database number.
+ *
+ * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
+ * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
+ * and the function to return ASAP.
+ *
+ * On success the fuction returns the number of keys removed from the
+ * database(s). Otherwise -1 is returned in the specific case the
+ * DB number is out of range, and errno is set to EINVAL. */
+long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
+ int j, async = (flags & EMPTYDB_ASYNC);
+ long long removed = 0;
+
+ if (dbnum < -1 || dbnum >= server.dbnum) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ for (j = 0; j < server.dbnum; j++) {
+ if (dbnum != -1 && dbnum != j) continue;
+ removed += dictSize(server.db[j].dict);
+ if (async) {
+ emptyDbAsync(&server.db[j]);
+ } else {
+ dictEmpty(server.db[j].dict,callback);
+ dictEmpty(server.db[j].expires,callback);
+ }
+ }
+ if (server.cluster_enabled) {
+ if (async) {
+ slotToKeyFlushAsync();
+ } else {
+ slotToKeyFlush();
+ }
+ }
+ if (dbnum == -1) flushSlaveKeysWithExpireList();
+ return removed;
+}
+
+int selectDb(client *c, int id) {
+ if (id < 0 || id >= server.dbnum)
+ return C_ERR;
+ c->db = &server.db[id];
+ return C_OK;
+}
+
+/*-----------------------------------------------------------------------------
+ * Hooks for key space changes.
+ *
+ * Every time a key in the database is modified the function
+ * signalModifiedKey() is called.
+ *
+ * Every time a DB is flushed the function signalFlushDb() is called.
+ *----------------------------------------------------------------------------*/
+
+void signalModifiedKey(redisDb *db, robj *key) {
+ touchWatchedKey(db,key);
+}
+
+void signalFlushedDb(int dbid) {
+ touchWatchedKeysOnFlush(dbid);
+}
+
+/*-----------------------------------------------------------------------------
+ * Type agnostic commands operating on the key space
+ *----------------------------------------------------------------------------*/
+
+/* Return the set of flags to use for the emptyDb() call for FLUSHALL
+ * and FLUSHDB commands.
+ *
+ * Currently the command just attempts to parse the "ASYNC" option. It
+ * also checks if the command arity is wrong.
+ *
+ * On success C_OK is returned and the flags are stored in *flags, otherwise
+ * C_ERR is returned and the function sends an error to the client. */
+int getFlushCommandFlags(client *c, int *flags) {
+ /* Parse the optional ASYNC option. */
+ if (c->argc > 1) {
+ if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) {
+ addReply(c,shared.syntaxerr);
+ return C_ERR;
+ }
+ *flags = EMPTYDB_ASYNC;
+ } else {
+ *flags = EMPTYDB_NO_FLAGS;
+ }
+ return C_OK;
+}
+
+/* FLUSHDB [ASYNC]
+ *
+ * Flushes the currently SELECTed Redis DB. */
+void flushdbCommand(client *c) {
+ int flags;
+
+ if (getFlushCommandFlags(c,&flags) == C_ERR) return;
+ signalFlushedDb(c->db->id);
+ server.dirty += emptyDb(c->db->id,flags,NULL);
+ addReply(c,shared.ok);
+}
+
+/* FLUSHALL [ASYNC]
+ *
+ * Flushes the whole server data set. */
+void flushallCommand(client *c) {
+ int flags;
+
+ if (getFlushCommandFlags(c,&flags) == C_ERR) return;
+ signalFlushedDb(-1);
+ server.dirty += emptyDb(-1,flags,NULL);
+ addReply(c,shared.ok);
+ if (server.rdb_child_pid != -1) {
+ kill(server.rdb_child_pid,SIGUSR1);
+ rdbRemoveTempFile(server.rdb_child_pid);
+ }
+ if (server.saveparamslen > 0) {
+ /* Normally rdbSave() will reset dirty, but we don't want this here
+ * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
+ int saved_dirty = server.dirty;
+ rdbSave(server.rdb_filename,NULL);
+ server.dirty = saved_dirty;
+ }
+ server.dirty++;
+}
+
+/* This command implements DEL and LAZYDEL. */
+void delGenericCommand(client *c, int lazy) {
+ int numdel = 0, j;
+
+ for (j = 1; j < c->argc; j++) {
+ expireIfNeeded(c->db,c->argv[j]);
+ int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
+ dbSyncDelete(c->db,c->argv[j]);
+ if (deleted) {
+ signalModifiedKey(c->db,c->argv[j]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,
+ "del",c->argv[j],c->db->id);
+ server.dirty++;
+ numdel++;
+ }
+ }
+ addReplyLongLong(c,numdel);
+}
+
+void delCommand(client *c) {
+ delGenericCommand(c,0);
+}
+
+void unlinkCommand(client *c) {
+ delGenericCommand(c,1);
+}
+
+/* EXISTS key1 key2 ... key_N.
+ * Return value is the number of keys existing. */
+void existsCommand(client *c) {
+ long long count = 0;
+ int j;
+
+ for (j = 1; j < c->argc; j++) {
+ expireIfNeeded(c->db,c->argv[j]);
+ if (dbExists(c->db,c->argv[j])) count++;
+ }
+ addReplyLongLong(c,count);
+}
+
+void selectCommand(client *c) {
+ long id;
+
+ if (getLongFromObjectOrReply(c, c->argv[1], &id,
+ "invalid DB index") != C_OK)
+ return;
+
+ if (server.cluster_enabled && id != 0) {
+ addReplyError(c,"SELECT is not allowed in cluster mode");
+ return;
+ }
+ if (selectDb(c,id) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ } else {
+ addReply(c,shared.ok);
+ }
+}
+
+void randomkeyCommand(client *c) {
+ robj *key;
+
+ if ((key = dbRandomKey(c->db)) == NULL) {
+ addReply(c,shared.nullbulk);
+ return;
+ }
+
+ addReplyBulk(c,key);
+ decrRefCount(key);
+}
+
+void keysCommand(client *c) {
+ dictIterator *di;
+ dictEntry *de;
+ sds pattern = c->argv[1]->ptr;
+ int plen = sdslen(pattern), allkeys;
+ unsigned long numkeys = 0;
+ void *replylen = addDeferredMultiBulkLength(c);
+
+ di = dictGetSafeIterator(c->db->dict);
+ allkeys = (pattern[0] == '*' && pattern[1] == '\0');
+ while((de = dictNext(di)) != NULL) {
+ sds key = dictGetKey(de);
+ robj *keyobj;
+
+ if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
+ keyobj = createStringObject(key,sdslen(key));
+ if (expireIfNeeded(c->db,keyobj) == 0) {
+ addReplyBulk(c,keyobj);
+ numkeys++;
+ }
+ decrRefCount(keyobj);
+ }
+ }
+ dictReleaseIterator(di);
+ setDeferredMultiBulkLength(c,replylen,numkeys);
+}
+
+/* This callback is used by scanGenericCommand in order to collect elements
+ * returned by the dictionary iterator into a list. */
+void scanCallback(void *privdata, const dictEntry *de) {
+ void **pd = (void**) privdata;
+ list *keys = pd[0];
+ robj *o = pd[1];
+ robj *key, *val = NULL;
+
+ if (o == NULL) {
+ sds sdskey = dictGetKey(de);
+ key = createStringObject(sdskey, sdslen(sdskey));
+ } else if (o->type == OBJ_SET) {
+ sds keysds = dictGetKey(de);
+ key = createStringObject(keysds,sdslen(keysds));
+ } else if (o->type == OBJ_HASH) {
+ sds sdskey = dictGetKey(de);
+ sds sdsval = dictGetVal(de);
+ key = createStringObject(sdskey,sdslen(sdskey));
+ val = createStringObject(sdsval,sdslen(sdsval));
+ } else if (o->type == OBJ_ZSET) {
+ sds sdskey = dictGetKey(de);
+ key = createStringObject(sdskey,sdslen(sdskey));
+ val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
+ } else {
+ serverPanic("Type not handled in SCAN callback.");
+ }
+
+ listAddNodeTail(keys, key);
+ if (val) listAddNodeTail(keys, val);
+}
+
+/* Try to parse a SCAN cursor stored at object 'o':
+ * if the cursor is valid, store it as unsigned integer into *cursor and
+ * returns C_OK. Otherwise return C_ERR and send an error to the
+ * client. */
+int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
+ char *eptr;
+
+ /* Use strtoul() because we need an *unsigned* long, so
+ * getLongLongFromObject() does not cover the whole cursor space. */
+ errno = 0;
+ *cursor = strtoul(o->ptr, &eptr, 10);
+ if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
+ {
+ addReplyError(c, "invalid cursor");
+ return C_ERR;
+ }
+ return C_OK;
+}
+
+/* This command implements SCAN, HSCAN and SSCAN commands.
+ * If object 'o' is passed, then it must be a Hash or Set object, otherwise
+ * if 'o' is NULL the command will operate on the dictionary associated with
+ * the current database.
+ *
+ * When 'o' is not NULL the function assumes that the first argument in
+ * the client arguments vector is a key so it skips it before iterating
+ * in order to parse options.
+ *
+ * In the case of a Hash object the function returns both the field and value
+ * of every element on the Hash. */
+void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
+ int i, j;
+ list *keys = listCreate();
+ listNode *node, *nextnode;
+ long count = 10;
+ sds pat = NULL;
+ int patlen = 0, use_pattern = 0;
+ dict *ht;
+
+ /* Object must be NULL (to iterate keys names), or the type of the object
+ * must be Set, Sorted Set, or Hash. */
+ serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
+ o->type == OBJ_ZSET);
+
+ /* Set i to the first option argument. The previous one is the cursor. */
+ i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
+
+ /* Step 1: Parse options. */
+ while (i < c->argc) {
+ j = c->argc - i;
+ if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
+ if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
+ != C_OK)
+ {
+ goto cleanup;
+ }
+
+ if (count < 1) {
+ addReply(c,shared.syntaxerr);
+ goto cleanup;
+ }
+
+ i += 2;
+ } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
+ pat = c->argv[i+1]->ptr;
+ patlen = sdslen(pat);
+
+ /* The pattern always matches if it is exactly "*", so it is
+ * equivalent to disabling it. */
+ use_pattern = !(pat[0] == '*' && patlen == 1);
+
+ i += 2;
+ } else {
+ addReply(c,shared.syntaxerr);
+ goto cleanup;
+ }
+ }
+
+ /* Step 2: Iterate the collection.
+ *
+ * Note that if the object is encoded with a ziplist, intset, or any other
+ * representation that is not a hash table, we are sure that it is also
+ * composed of a small number of elements. So to avoid taking state we
+ * just return everything inside the object in a single call, setting the
+ * cursor to zero to signal the end of the iteration. */
+
+ /* Handle the case of a hash table. */
+ ht = NULL;
+ if (o == NULL) {
+ ht = c->db->dict;
+ } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
+ ht = o->ptr;
+ } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
+ ht = o->ptr;
+ count *= 2; /* We return key / value for this type. */
+ } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
+ zset *zs = o->ptr;
+ ht = zs->dict;
+ count *= 2; /* We return key / value for this type. */
+ }
+
+ if (ht) {
+ void *privdata[2];
+ /* We set the max number of iterations to ten times the specified
+ * COUNT, so if the hash table is in a pathological state (very
+ * sparsely populated) we avoid to block too much time at the cost
+ * of returning no or very few elements. */
+ long maxiterations = count*10;
+
+ /* We pass two pointers to the callback: the list to which it will
+ * add new elements, and the object containing the dictionary so that
+ * it is possible to fetch more data in a type-dependent way. */
+ privdata[0] = keys;
+ privdata[1] = o;
+ do {
+ cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
+ } while (cursor &&
+ maxiterations-- &&
+ listLength(keys) < (unsigned long)count);
+ } else if (o->type == OBJ_SET) {
+ int pos = 0;
+ int64_t ll;
+
+ while(intsetGet(o->ptr,pos++,&ll))
+ listAddNodeTail(keys,createStringObjectFromLongLong(ll));
+ cursor = 0;
+ } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
+ unsigned char *p = ziplistIndex(o->ptr,0);
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vll;
+
+ while(p) {
+ ziplistGet(p,&vstr,&vlen,&vll);
+ listAddNodeTail(keys,
+ (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
+ createStringObjectFromLongLong(vll));
+ p = ziplistNext(o->ptr,p);
+ }
+ cursor = 0;
+ } else {
+ serverPanic("Not handled encoding in SCAN.");
+ }
+
+ /* Step 3: Filter elements. */
+ node = listFirst(keys);
+ while (node) {
+ robj *kobj = listNodeValue(node);
+ nextnode = listNextNode(node);
+ int filter = 0;
+
+ /* Filter element if it does not match the pattern. */
+ if (!filter && use_pattern) {
+ if (sdsEncodedObject(kobj)) {
+ if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
+ filter = 1;
+ } else {
+ char buf[LONG_STR_SIZE];
+ int len;
+
+ serverAssert(kobj->encoding == OBJ_ENCODING_INT);
+ len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
+ if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
+ }
+ }
+
+ /* Filter element if it is an expired key. */
+ if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
+
+ /* Remove the element and its associted value if needed. */
+ if (filter) {
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+
+ /* If this is a hash or a sorted set, we have a flat list of
+ * key-value elements, so if this element was filtered, remove the
+ * value, or skip it if it was not filtered: we only match keys. */
+ if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
+ node = nextnode;
+ nextnode = listNextNode(node);
+ if (filter) {
+ kobj = listNodeValue(node);
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+ }
+ node = nextnode;
+ }
+
+ /* Step 4: Reply to the client. */
+ addReplyMultiBulkLen(c, 2);
+ addReplyBulkLongLong(c,cursor);
+
+ addReplyMultiBulkLen(c, listLength(keys));
+ while ((node = listFirst(keys)) != NULL) {
+ robj *kobj = listNodeValue(node);
+ addReplyBulk(c, kobj);
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+
+cleanup:
+ listSetFreeMethod(keys,decrRefCountVoid);
+ listRelease(keys);
+}
+
+/* The SCAN command completely relies on scanGenericCommand. */
+void scanCommand(client *c) {
+ unsigned long cursor;
+ if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
+ scanGenericCommand(c,NULL,cursor);
+}
+
+void dbsizeCommand(client *c) {
+ addReplyLongLong(c,dictSize(c->db->dict));
+}
+
+void lastsaveCommand(client *c) {
+ addReplyLongLong(c,server.lastsave);
+}
+
+void typeCommand(client *c) {
+ robj *o;
+ char *type;
+
+ o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
+ if (o == NULL) {
+ type = "none";
+ } else {
+ switch(o->type) {
+ case OBJ_STRING: type = "string"; break;
+ case OBJ_LIST: type = "list"; break;
+ case OBJ_SET: type = "set"; break;
+ case OBJ_ZSET: type = "zset"; break;
+ case OBJ_HASH: type = "hash"; break;
+ case OBJ_MODULE: {
+ moduleValue *mv = o->ptr;
+ type = mv->type->name;
+ }; break;
+ default: type = "unknown"; break;
+ }
+ }
+ addReplyStatus(c,type);
+}
+
+void shutdownCommand(client *c) {
+ int flags = 0;
+
+ if (c->argc > 2) {
+ addReply(c,shared.syntaxerr);
+ return;
+ } else if (c->argc == 2) {
+ if (!strcasecmp(c->argv[1]->ptr,"nosave")) {
+ flags |= SHUTDOWN_NOSAVE;
+ } else if (!strcasecmp(c->argv[1]->ptr,"save")) {
+ flags |= SHUTDOWN_SAVE;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+ /* When SHUTDOWN is called while the server is loading a dataset in
+ * memory we need to make sure no attempt is performed to save
+ * the dataset on shutdown (otherwise it could overwrite the current DB
+ * with half-read data).
+ *
+ * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
+ if (server.loading || server.sentinel_mode)
+ flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
+ if (prepareForShutdown(flags) == C_OK) exit(0);
+ addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
+}
+
+void renameGenericCommand(client *c, int nx) {
+ robj *o;
+ long long expire;
+ int samekey = 0;
+
+ /* When source and dest key is the same, no operation is performed,
+ * if the key exists, however we still return an error on unexisting key. */
+ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
+
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
+ return;
+
+ if (samekey) {
+ addReply(c,nx ? shared.czero : shared.ok);
+ return;
+ }
+
+ incrRefCount(o);
+ expire = getExpire(c->db,c->argv[1]);
+ if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
+ if (nx) {
+ decrRefCount(o);
+ addReply(c,shared.czero);
+ return;
+ }
+ /* Overwrite: delete the old key before creating the new one
+ * with the same name. */
+ dbDelete(c->db,c->argv[2]);
+ }
+ dbAdd(c->db,c->argv[2],o);
+ if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
+ dbDelete(c->db,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[2]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
+ c->argv[1],c->db->id);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
+ c->argv[2],c->db->id);
+ server.dirty++;
+ addReply(c,nx ? shared.cone : shared.ok);
+}
+
+void renameCommand(client *c) {
+ renameGenericCommand(c,0);
+}
+
+void renamenxCommand(client *c) {
+ renameGenericCommand(c,1);
+}
+
+void moveCommand(client *c) {
+ robj *o;
+ redisDb *src, *dst;
+ int srcid;
+ long long dbid, expire;
+
+ if (server.cluster_enabled) {
+ addReplyError(c,"MOVE is not allowed in cluster mode");
+ return;
+ }
+
+ /* Obtain source and target DB pointers */
+ src = c->db;
+ srcid = c->db->id;
+
+ if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
+ dbid < INT_MIN || dbid > INT_MAX ||
+ selectDb(c,dbid) == C_ERR)
+ {
+ addReply(c,shared.outofrangeerr);
+ return;
+ }
+ dst = c->db;
+ selectDb(c,srcid); /* Back to the source DB */
+
+ /* If the user is moving using as target the same
+ * DB as the source DB it is probably an error. */
+ if (src == dst) {
+ addReply(c,shared.sameobjecterr);
+ return;
+ }
+
+ /* Check if the element exists and get a reference */
+ o = lookupKeyWrite(c->db,c->argv[1]);
+ if (!o) {
+ addReply(c,shared.czero);
+ return;
+ }
+ expire = getExpire(c->db,c->argv[1]);
+
+ /* Return zero if the key already exists in the target DB */
+ if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
+ addReply(c,shared.czero);
+ return;
+ }
+ dbAdd(dst,c->argv[1],o);
+ if (expire != -1) setExpire(c,dst,c->argv[1],expire);
+ incrRefCount(o);
+
+ /* OK! key moved, free the entry in the source DB */
+ dbDelete(src,c->argv[1]);
+ server.dirty++;
+ addReply(c,shared.cone);
+}
+
+/* Helper function for dbSwapDatabases(): scans the list of keys that have
+ * one or more blocked clients for B[LR]POP or other list blocking commands
+ * and signal the keys are ready if they are lists. See the comment where
+ * the function is used for more info. */
+void scanDatabaseForReadyLists(redisDb *db) {
+ dictEntry *de;
+ dictIterator *di = dictGetSafeIterator(db->blocking_keys);
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+ robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
+ if (value && value->type == OBJ_LIST)
+ signalListAsReady(db, key);
+ }
+ dictReleaseIterator(di);
+}
+
+/* Swap two databases at runtime so that all clients will magically see
+ * the new database even if already connected. Note that the client
+ * structure c->db points to a given DB, so we need to be smarter and
+ * swap the underlying referenced structures, otherwise we would need
+ * to fix all the references to the Redis DB structure.
+ *
+ * Returns C_ERR if at least one of the DB ids are out of range, otherwise
+ * C_OK is returned. */
+int dbSwapDatabases(int id1, int id2) {
+ if (id1 < 0 || id1 >= server.dbnum ||
+ id2 < 0 || id2 >= server.dbnum) return C_ERR;
+ if (id1 == id2) return C_OK;
+ redisDb aux = server.db[id1];
+ redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
+
+ /* Swap hash tables. Note that we don't swap blocking_keys,
+ * ready_keys and watched_keys, since we want clients to
+ * remain in the same DB they were. */
+ db1->dict = db2->dict;
+ db1->expires = db2->expires;
+ db1->avg_ttl = db2->avg_ttl;
+
+ db2->dict = aux.dict;
+ db2->expires = aux.expires;
+ db2->avg_ttl = aux.avg_ttl;
+
+ /* Now we need to handle clients blocked on lists: as an effect
+ * of swapping the two DBs, a client that was waiting for list
+ * X in a given DB, may now actually be unblocked if X happens
+ * to exist in the new version of the DB, after the swap.
+ *
+ * However normally we only do this check for efficiency reasons
+ * in dbAdd() when a list is created. So here we need to rescan
+ * the list of clients blocked on lists and signal lists as ready
+ * if needed. */
+ scanDatabaseForReadyLists(db1);
+ scanDatabaseForReadyLists(db2);
+ return C_OK;
+}
+
+/* SWAPDB db1 db2 */
+void swapdbCommand(client *c) {
+ long id1, id2;
+
+ /* Not allowed in cluster mode: we have just DB 0 there. */
+ if (server.cluster_enabled) {
+ addReplyError(c,"SWAPDB is not allowed in cluster mode");
+ return;
+ }
+
+ /* Get the two DBs indexes. */
+ if (getLongFromObjectOrReply(c, c->argv[1], &id1,
+ "invalid first DB index") != C_OK)
+ return;
+
+ if (getLongFromObjectOrReply(c, c->argv[2], &id2,
+ "invalid second DB index") != C_OK)
+ return;
+
+ /* Swap... */
+ if (dbSwapDatabases(id1,id2) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ return;
+ } else {
+ server.dirty++;
+ addReply(c,shared.ok);
+ }
+}
+
+/*-----------------------------------------------------------------------------
+ * Expires API
+ *----------------------------------------------------------------------------*/
+
+int removeExpire(redisDb *db, robj *key) {
+ /* An expire may only be removed if there is a corresponding entry in the
+ * main dict. Otherwise, the key will never be freed. */
+ serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
+ return dictDelete(db->expires,key->ptr) == DICT_OK;
+}
+
+/* Set an expire to the specified key. If the expire is set in the context
+ * of an user calling a command 'c' is the client, otherwise 'c' is set
+ * to NULL. The 'when' parameter is the absolute unix time in milliseconds
+ * after which the key will no longer be considered valid. */
+void setExpire(client *c, redisDb *db, robj *key, long long when) {
+ dictEntry *kde, *de;
+
+ /* Reuse the sds from the main dict in the expire dict */
+ kde = dictFind(db->dict,key->ptr);
+ serverAssertWithInfo(NULL,key,kde != NULL);
+ de = dictAddOrFind(db->expires,dictGetKey(kde));
+ dictSetSignedIntegerVal(de,when);
+
+ int writable_slave = server.masterhost && server.repl_slave_ro == 0;
+ if (c && writable_slave && !(c->flags & CLIENT_MASTER))
+ rememberSlaveKeyWithExpire(db,key);
+}
+
+/* Return the expire time of the specified key, or -1 if no expire
+ * is associated with this key (i.e. the key is non volatile) */
+long long getExpire(redisDb *db, robj *key) {
+ dictEntry *de;
+
+ /* No expire? return ASAP */
+ if (dictSize(db->expires) == 0 ||
+ (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
+
+ /* The entry was found in the expire dict, this means it should also
+ * be present in the main dict (safety check). */
+ serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
+ return dictGetSignedIntegerVal(de);
+}
+
+/* Propagate expires into slaves and the AOF file.
+ * When a key expires in the master, a DEL operation for this key is sent
+ * to all the slaves and the AOF file if enabled.
+ *
+ * This way the key expiry is centralized in one place, and since both
+ * AOF and the master->slave link guarantee operation ordering, everything
+ * will be consistent even if we allow write operations against expiring
+ * keys. */
+void propagateExpire(redisDb *db, robj *key, int lazy) {
+ robj *argv[2];
+
+ argv[0] = lazy ? shared.unlink : shared.del;
+ argv[1] = key;
+ incrRefCount(argv[0]);
+ incrRefCount(argv[1]);
+
+ if (server.aof_state != AOF_OFF)
+ feedAppendOnlyFile(server.delCommand,db->id,argv,2);
+ replicationFeedSlaves(server.slaves,db->id,argv,2);
+
+ decrRefCount(argv[0]);
+ decrRefCount(argv[1]);
+}
+
+int expireIfNeeded(redisDb *db, robj *key) {
+ mstime_t when = getExpire(db,key);
+ mstime_t now;
+
+ if (when < 0) return 0; /* No expire for this key */
+
+ /* Don't expire anything while loading. It will be done later. */
+ if (server.loading) return 0;
+
+ /* If we are in the context of a Lua script, we claim that time is
+ * blocked to when the Lua script started. This way a key can expire
+ * only the first time it is accessed and not in the middle of the
+ * script execution, making propagation to slaves / AOF consistent.
+ * See issue #1525 on Github for more information. */
+ now = server.lua_caller ? server.lua_time_start : mstime();
+
+ /* If we are running in the context of a slave, return ASAP:
+ * the slave key expiration is controlled by the master that will
+ * send us synthesized DEL operations for expired keys.
+ *
+ * Still we try to return the right information to the caller,
+ * that is, 0 if we think the key should be still valid, 1 if
+ * we think the key is expired at this time. */
+ if (server.masterhost != NULL) return now > when;
+
+ /* Return when this key has not expired */
+ if (now <= when) return 0;
+
+ /* Delete the key */
+ server.stat_expiredkeys++;
+ propagateExpire(db,key,server.lazyfree_lazy_expire);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,
+ "expired",key,db->id);
+ return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
+ dbSyncDelete(db,key);
+}
+
+/* -----------------------------------------------------------------------------
+ * API to get key arguments from commands
+ * ---------------------------------------------------------------------------*/
+
+/* The base case is to use the keys position as given in the command table
+ * (firstkey, lastkey, step). */
+int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
+ int j, i = 0, last, *keys;
+ UNUSED(argv);
+
+ if (cmd->firstkey == 0) {
+ *numkeys = 0;
+ return NULL;
+ }
+ last = cmd->lastkey;
+ if (last < 0) last = argc+last;
+ keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
+ for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
+ serverAssert(j < argc);
+ keys[i++] = j;
+ }
+ *numkeys = i;
+ return keys;
+}
+
+/* Return all the arguments that are keys in the command passed via argc / argv.
+ *
+ * The command returns the positions of all the key arguments inside the array,
+ * so the actual return value is an heap allocated array of integers. The
+ * length of the array is returned by reference into *numkeys.
+ *
+ * 'cmd' must be point to the corresponding entry into the redisCommand
+ * table, according to the command name in argv[0].
+ *
+ * This function uses the command table if a command-specific helper function
+ * is not required, otherwise it calls the command-specific function. */
+int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ if (cmd->flags & CMD_MODULE_GETKEYS) {
+ return moduleGetCommandKeysViaAPI(cmd,argv,argc,numkeys);
+ } else if (!(cmd->flags & CMD_MODULE) && cmd->getkeys_proc) {
+ return cmd->getkeys_proc(cmd,argv,argc,numkeys);
+ } else {
+ return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
+ }
+}
+
+/* Free the result of getKeysFromCommand. */
+void getKeysFreeResult(int *result) {
+ zfree(result);
+}
+
+/* Helper function to extract keys from following commands:
+ * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
+ * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
+int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, num, *keys;
+ UNUSED(cmd);
+
+ num = atoi(argv[2]->ptr);
+ /* Sanity check. Don't return any key if the command is going to
+ * reply with syntax error. */
+ if (num > (argc-3)) {
+ *numkeys = 0;
+ return NULL;
+ }
+
+ /* Keys in z{union,inter}store come from two places:
+ * argv[1] = storage key,
+ * argv[3...n] = keys to intersect */
+ keys = zmalloc(sizeof(int)*(num+1));
+
+ /* Add all key positions for argv[3...n] to keys[] */
+ for (i = 0; i < num; i++) keys[i] = 3+i;
+
+ /* Finally add the argv[1] key position (the storage key target). */
+ keys[num] = 1;
+ *numkeys = num+1; /* Total keys = {union,inter} keys + storage key */
+ return keys;
+}
+
+/* Helper function to extract keys from the following commands:
+ * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
+ * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
+int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, num, *keys;
+ UNUSED(cmd);
+
+ num = atoi(argv[2]->ptr);
+ /* Sanity check. Don't return any key if the command is going to
+ * reply with syntax error. */
+ if (num > (argc-3)) {
+ *numkeys = 0;
+ return NULL;
+ }
+
+ keys = zmalloc(sizeof(int)*num);
+ *numkeys = num;
+
+ /* Add all key positions for argv[3...n] to keys[] */
+ for (i = 0; i < num; i++) keys[i] = 3+i;
+
+ return keys;
+}
+
+/* Helper function to extract keys from the SORT command.
+ *
+ * SORT <sort-key> ... STORE <store-key> ...
+ *
+ * The first argument of SORT is always a key, however a list of options
+ * follow in SQL-alike style. Here we parse just the minimum in order to
+ * correctly identify keys in the "STORE" option. */
+int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, j, num, *keys, found_store = 0;
+ UNUSED(cmd);
+
+ num = 0;
+ keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
+
+ keys[num++] = 1; /* <sort-key> is always present. */
+
+ /* Search for STORE option. By default we consider options to don't
+ * have arguments, so if we find an unknown option name we scan the
+ * next. However there are options with 1 or 2 arguments, so we
+ * provide a list here in order to skip the right number of args. */
+ struct {
+ char *name;
+ int skip;
+ } skiplist[] = {
+ {"limit", 2},
+ {"get", 1},
+ {"by", 1},
+ {NULL, 0} /* End of elements. */
+ };
+
+ for (i = 2; i < argc; i++) {
+ for (j = 0; skiplist[j].name != NULL; j++) {
+ if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
+ i += skiplist[j].skip;
+ break;
+ } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
+ /* Note: we don't increment "num" here and continue the loop
+ * to be sure to process the *last* "STORE" option if multiple
+ * ones are provided. This is same behavior as SORT. */
+ found_store = 1;
+ keys[num] = i+1; /* <store-key> */
+ break;
+ }
+ }
+ }
+ *numkeys = num + found_store;
+ return keys;
+}
+
+int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, num, first, *keys;
+ UNUSED(cmd);
+
+ /* Assume the obvious form. */
+ first = 3;
+ num = 1;
+
+ /* But check for the extended one with the KEYS option. */
+ if (argc > 6) {
+ for (i = 6; i < argc; i++) {
+ if (!strcasecmp(argv[i]->ptr,"keys") &&
+ sdslen(argv[3]->ptr) == 0)
+ {
+ first = i+1;
+ num = argc-first;
+ break;
+ }
+ }
+ }
+
+ keys = zmalloc(sizeof(int)*num);
+ for (i = 0; i < num; i++) keys[i] = first+i;
+ *numkeys = num;
+ return keys;
+}
+
+/* Slot to Key API. This is used by Redis Cluster in order to obtain in
+ * a fast way a key that belongs to a specified hash slot. This is useful
+ * while rehashing the cluster. */
+void slotToKeyAdd(robj *key) {
+ unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
+
+ sds sdskey = sdsdup(key->ptr);
+ zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
+}
+
+void slotToKeyDel(robj *key) {
+ unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
+ zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
+}
+
+void slotToKeyFlush(void) {
+ zslFree(server.cluster->slots_to_keys);
+ server.cluster->slots_to_keys = zslCreate();
+}
+
+/* Pupulate the specified array of objects with keys in the specified slot.
+ * New objects are returned to represent keys, it's up to the caller to
+ * decrement the reference count to release the keys names. */
+unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
+ zskiplistNode *n;
+ zrangespec range;
+ int j = 0;
+
+ range.min = range.max = hashslot;
+ range.minex = range.maxex = 0;
+
+ n = zslFirstInRange(server.cluster->slots_to_keys, &range);
+ while(n && n->score == hashslot && count--) {
+ keys[j++] = createStringObject(n->ele,sdslen(n->ele));
+ n = n->level[0].forward;
+ }
+ return j;
+}
+
+/* Remove all the keys in the specified hash slot.
+ * The number of removed items is returned. */
+unsigned int delKeysInSlot(unsigned int hashslot) {
+ zskiplistNode *n;
+ zrangespec range;
+ int j = 0;
+
+ range.min = range.max = hashslot;
+ range.minex = range.maxex = 0;
+
+ n = zslFirstInRange(server.cluster->slots_to_keys, &range);
+ while(n && n->score == hashslot) {
+ sds sdskey = n->ele;
+ robj *key = createStringObject(sdskey,sdslen(sdskey));
+ n = n->level[0].forward; /* Go to the next item before freeing it. */
+ dbDelete(&server.db[0],key);
+ decrRefCount(key);
+ j++;
+ }
+ return j;
+}
+
+unsigned int countKeysInSlot(unsigned int hashslot) {
+ zskiplist *zsl = server.cluster->slots_to_keys;
+ zskiplistNode *zn;
+ zrangespec range;
+ int rank, count = 0;
+
+ range.min = range.max = hashslot;
+ range.minex = range.maxex = 0;
+
+ /* Find first element in range */
+ zn = zslFirstInRange(zsl, &range);
+
+ /* Use rank of first element, if any, to determine preliminary count */
+ if (zn != NULL) {
+ rank = zslGetRank(zsl, zn->score, zn->ele);
+ count = (zsl->length - (rank - 1));
+
+ /* Find last element in range */
+ zn = zslLastInRange(zsl, &range);
+
+ /* Use rank of last element, if any, to determine the actual count */
+ if (zn != NULL) {
+ rank = zslGetRank(zsl, zn->score, zn->ele);
+ count -= (zsl->length - rank);
+ }
+ }
+ return count;
+}