diff options
Diffstat (limited to 'src/expire.c')
| -rw-r--r-- | src/expire.c | 502 |
1 files changed, 502 insertions, 0 deletions
diff --git a/src/expire.c b/src/expire.c new file mode 100644 index 0000000..637139f --- /dev/null +++ b/src/expire.c @@ -0,0 +1,502 @@ +/* Implementation of EXPIRE (keys with fixed time to live). + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" + +/*----------------------------------------------------------------------------- + * Incremental collection of expired keys. + * + * When keys are accessed they are expired on-access. However we need a + * mechanism in order to ensure keys are eventually removed when expired even + * if no access is performed on them. + *----------------------------------------------------------------------------*/ + +/* Helper function for the activeExpireCycle() function. + * This function will try to expire the key that is stored in the hash table + * entry 'de' of the 'expires' hash table of a Redis database. + * + * If the key is found to be expired, it is removed from the database and + * 1 is returned. Otherwise no operation is performed and 0 is returned. + * + * When a key is expired, server.stat_expiredkeys is incremented. + * + * The parameter 'now' is the current time in milliseconds as is passed + * to the function to avoid too many gettimeofday() syscalls. */ +int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { + long long t = dictGetSignedIntegerVal(de); + if (now > t) { + sds key = dictGetKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj,server.lazyfree_lazy_expire); + if (server.lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + decrRefCount(keyobj); + server.stat_expiredkeys++; + return 1; + } else { + return 0; + } +} + +/* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. + * + * No more than CRON_DBS_PER_CALL databases are tested at every + * iteration. + * + * This kind of call is used when Redis detects that timelimit_exit is + * true, so there is more work to do, and we do it more incrementally from + * the beforeSleep() function of the event loop. + * + * Expire cycle type: + * + * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a + * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION + * microseconds, and is not repeated again before the same amount of time. + * + * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is + * executed, where the time limit is a percentage of the REDIS_HZ period + * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */ + +void activeExpireCycle(int type) { + /* This function has some global state in order to continue the work + * incrementally across calls. */ + static unsigned int current_db = 0; /* Last DB tested. */ + static int timelimit_exit = 0; /* Time limit hit in previous call? */ + static long long last_fast_cycle = 0; /* When last fast cycle ran. */ + + int j, iteration = 0; + int dbs_per_call = CRON_DBS_PER_CALL; + long long start = ustime(), timelimit; + + if (type == ACTIVE_EXPIRE_CYCLE_FAST) { + /* Don't start a fast cycle if the previous cycle did not exited + * for time limt. Also don't repeat a fast cycle for the same period + * as the fast cycle total duration itself. */ + if (!timelimit_exit) return; + if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; + last_fast_cycle = start; + } + + /* We usually should test CRON_DBS_PER_CALL per iteration, with + * two exceptions: + * + * 1) Don't test more DBs than we have. + * 2) If last time we hit the time limit, we want to scan all DBs + * in this iteration, as there is work to do in some DB and we don't want + * expired keys to use memory for too much time. */ + if (dbs_per_call > server.dbnum || timelimit_exit) + dbs_per_call = server.dbnum; + + /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time + * per iteration. Since this function gets called with a frequency of + * server.hz times per second, the following is the max amount of + * microseconds we can spend in this function. */ + timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; + timelimit_exit = 0; + if (timelimit <= 0) timelimit = 1; + + if (type == ACTIVE_EXPIRE_CYCLE_FAST) + timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ + + for (j = 0; j < dbs_per_call; j++) { + int expired; + redisDb *db = server.db+(current_db % server.dbnum); + + /* Increment the DB now so we are sure if we run out of time + * in the current DB we'll restart from the next. This allows to + * distribute the time evenly across DBs. */ + current_db++; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + unsigned long num, slots; + long long now, ttl_sum; + int ttl_samples; + + /* If there is nothing to expire try next DB ASAP. */ + if ((num = dictSize(db->expires)) == 0) { + db->avg_ttl = 0; + break; + } + slots = dictSlots(db->expires); + now = mstime(); + + /* When there are less than 1% filled slots getting random + * keys is expensive, so stop here waiting for better times... + * The dictionary will be resized asap. */ + if (num && slots > DICT_HT_INITIAL_SIZE && + (num*100/slots < 1)) break; + + /* The main collection cycle. Sample random keys among keys + * with an expire set, checking for expired ones. */ + expired = 0; + ttl_sum = 0; + ttl_samples = 0; + + if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) + num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; + + while (num--) { + dictEntry *de; + long long ttl; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + ttl = dictGetSignedIntegerVal(de)-now; + if (activeExpireCycleTryExpire(db,de,now)) expired++; + if (ttl > 0) { + /* We want the average TTL of keys yet not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + } + + /* Update the average TTL stats for this database. */ + if (ttl_samples) { + long long avg_ttl = ttl_sum/ttl_samples; + + /* Do a simple running average with a few samples. + * We just use the current estimate with a weight of 2% + * and the previous estimate with a weight of 98%. */ + if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; + db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); + } + + /* We can't block forever here even if there are many keys to + * expire. So after a given amount of milliseconds return to the + * caller waiting for the other active expire cycle. */ + iteration++; + if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ + long long elapsed = ustime()-start; + + latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); + if (elapsed > timelimit) timelimit_exit = 1; + } + if (timelimit_exit) return; + /* We don't repeat the cycle if there are less than 25% of keys + * found expired in the current DB. */ + } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + } +} + +/*----------------------------------------------------------------------------- + * Expires of keys created in writable slaves + * + * Normally slaves do not process expires: they wait the masters to synthesize + * DEL operations in order to retain consistency. However writable slaves are + * an exception: if a key is created in the slave and an expire is assigned + * to it, we need a way to expire such a key, since the master does not know + * anything about such a key. + * + * In order to do so, we track keys created in the slave side with an expire + * set, and call the expireSlaveKeys() function from time to time in order to + * reclaim the keys if they already expired. + * + * Note that the use case we are trying to cover here, is a popular one where + * slaves are put in writable mode in order to compute slow operations in + * the slave side that are mostly useful to actually read data in a more + * processed way. Think at sets intersections in a tmp key, with an expire so + * that it is also used as a cache to avoid intersecting every time. + * + * This implementation is currently not perfect but a lot better than leaking + * the keys as implemented in 3.2. + *----------------------------------------------------------------------------*/ + +/* The dictionary where we remember key names and database ID of keys we may + * want to expire from the slave. Since this function is not often used we + * don't even care to initialize the database at startup. We'll do it once + * the feature is used the first time, that is, when rememberSlaveKeyWithExpire() + * is called. + * + * The dictionary has an SDS string representing the key as the hash table + * key, while the value is a 64 bit unsigned integer with the bits corresponding + * to the DB where the keys may exist set to 1. Currently the keys created + * with a DB id > 63 are not expired, but a trivial fix is to set the bitmap + * to the max 64 bit unsigned value when we know there is a key with a DB + * ID greater than 63, and check all the configured DBs in such a case. */ +dict *slaveKeysWithExpire = NULL; + +/* Check the set of keys created by the master with an expire set in order to + * check if they should be evicted. */ +void expireSlaveKeys(void) { + if (slaveKeysWithExpire == NULL || + dictSize(slaveKeysWithExpire) == 0) return; + + int cycles = 0, noexpire = 0; + mstime_t start = mstime(); + while(1) { + dictEntry *de = dictGetRandomKey(slaveKeysWithExpire); + sds keyname = dictGetKey(de); + uint64_t dbids = dictGetUnsignedIntegerVal(de); + uint64_t new_dbids = 0; + + /* Check the key against every database corresponding to the + * bits set in the value bitmap. */ + int dbid = 0; + while(dbids && dbid < server.dbnum) { + if ((dbids & 1) != 0) { + redisDb *db = server.db+dbid; + dictEntry *expire = dictFind(db->expires,keyname); + int expired = 0; + + if (expire && + activeExpireCycleTryExpire(server.db+dbid,expire,start)) + { + expired = 1; + } + + /* If the key was not expired in this DB, we need to set the + * corresponding bit in the new bitmap we set as value. + * At the end of the loop if the bitmap is zero, it means we + * no longer need to keep track of this key. */ + if (expire && !expired) { + noexpire++; + new_dbids |= (uint64_t)1 << dbid; + } + } + dbid++; + dbids >>= 1; + } + + /* Set the new bitmap as value of the key, in the dictionary + * of keys with an expire set directly in the writable slave. Otherwise + * if the bitmap is zero, we no longer need to keep track of it. */ + if (new_dbids) + dictSetUnsignedIntegerVal(de,new_dbids); + else + dictDelete(slaveKeysWithExpire,keyname); + + /* Stop conditions: found 3 keys we cna't expire in a row or + * time limit was reached. */ + cycles++; + if (noexpire > 3) break; + if ((cycles % 64) == 0 && mstime()-start > 1) break; + if (dictSize(slaveKeysWithExpire) == 0) break; + } +} + +/* Track keys that received an EXPIRE or similar command in the context + * of a writable slave. */ +void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { + if (slaveKeysWithExpire == NULL) { + static dictType dt = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ + }; + slaveKeysWithExpire = dictCreate(&dt,NULL); + } + if (db->id > 63) return; + + dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr); + /* If the entry was just created, set it to a copy of the SDS string + * representing the key: we don't want to need to take those keys + * in sync with the main DB. The keys will be removed by expireSlaveKeys() + * as it scans to find keys to remove. */ + if (de->key == key->ptr) { + de->key = sdsdup(key->ptr); + dictSetUnsignedIntegerVal(de,0); + } + + uint64_t dbids = dictGetUnsignedIntegerVal(de); + dbids |= (uint64_t)1 << db->id; + dictSetUnsignedIntegerVal(de,dbids); +} + +/* Return the number of keys we are tracking. */ +size_t getSlaveKeyWithExpireCount(void) { + if (slaveKeysWithExpire == NULL) return 0; + return dictSize(slaveKeysWithExpire); +} + +/* Remove the keys in the hash table. We need to do that when data is + * flushed from the server. We may receive new keys from the master with + * the same name/db and it is no longer a good idea to expire them. + * + * Note: technically we should handle the case of a single DB being flushed + * but it is not worth it since anyway race conditions using the same set + * of key names in a wriatable slave and in its master will lead to + * inconsistencies. This is just a best-effort thing we do. */ +void flushSlaveKeysWithExpireList(void) { + if (slaveKeysWithExpire) { + dictRelease(slaveKeysWithExpire); + slaveKeysWithExpire = NULL; + } +} + +/*----------------------------------------------------------------------------- + * Expires Commands + *----------------------------------------------------------------------------*/ + +/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT + * and PEXPIREAT. Because the commad second argument may be relative or absolute + * the "basetime" argument is used to signal what the base time is (either 0 + * for *AT variants of the command, or the current time for relative expires). + * + * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for + * the argv[2] parameter. The basetime is always specified in milliseconds. */ +void expireGenericCommand(client *c, long long basetime, int unit) { + robj *key = c->argv[1], *param = c->argv[2]; + long long when; /* unix time in milliseconds when the key will expire. */ + + if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK) + return; + + if (unit == UNIT_SECONDS) when *= 1000; + when += basetime; + + /* No key, return zero. */ + if (lookupKeyWrite(c->db,key) == NULL) { + addReply(c,shared.czero); + return; + } + + /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past + * should never be executed as a DEL when load the AOF or in the context + * of a slave instance. + * + * Instead we take the other branch of the IF statement setting an expire + * (possibly in the past) and wait for an explicit DEL from the master. */ + if (when <= mstime() && !server.loading && !server.masterhost) { + robj *aux; + + int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : + dbSyncDelete(c->db,key); + serverAssertWithInfo(c,key,deleted); + server.dirty++; + + /* Replicate/AOF this as an explicit DEL or UNLINK. */ + aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; + rewriteClientCommandVector(c,2,aux,key); + signalModifiedKey(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + addReply(c, shared.cone); + return; + } else { + setExpire(c,c->db,key,when); + addReply(c,shared.cone); + signalModifiedKey(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); + server.dirty++; + return; + } +} + +/* EXPIRE key seconds */ +void expireCommand(client *c) { + expireGenericCommand(c,mstime(),UNIT_SECONDS); +} + +/* EXPIREAT key time */ +void expireatCommand(client *c) { + expireGenericCommand(c,0,UNIT_SECONDS); +} + +/* PEXPIRE key milliseconds */ +void pexpireCommand(client *c) { + expireGenericCommand(c,mstime(),UNIT_MILLISECONDS); +} + +/* PEXPIREAT key ms_time */ +void pexpireatCommand(client *c) { + expireGenericCommand(c,0,UNIT_MILLISECONDS); +} + +/* Implements TTL and PTTL */ +void ttlGenericCommand(client *c, int output_ms) { + long long expire, ttl = -1; + + /* If the key does not exist at all, return -2 */ + if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == NULL) { + addReplyLongLong(c,-2); + return; + } + /* The key exists. Return -1 if it has no expire, or the actual + * TTL value otherwise. */ + expire = getExpire(c->db,c->argv[1]); + if (expire != -1) { + ttl = expire-mstime(); + if (ttl < 0) ttl = 0; + } + if (ttl == -1) { + addReplyLongLong(c,-1); + } else { + addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000)); + } +} + +/* TTL key */ +void ttlCommand(client *c) { + ttlGenericCommand(c, 0); +} + +/* PTTL key */ +void pttlCommand(client *c) { + ttlGenericCommand(c, 1); +} + +/* PERSIST key */ +void persistCommand(client *c) { + dictEntry *de; + + de = dictFind(c->db->dict,c->argv[1]->ptr); + if (de == NULL) { + addReply(c,shared.czero); + } else { + if (removeExpire(c->db,c->argv[1])) { + addReply(c,shared.cone); + server.dirty++; + } else { + addReply(c,shared.czero); + } + } +} + +/* TOUCH key1 [key2 key3 ... keyN] */ +void touchCommand(client *c) { + int touched = 0; + for (int j = 1; j < c->argc; j++) + if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++; + addReplyLongLong(c,touched); +} + |
