summaryrefslogtreecommitdiff
path: root/cache/src/tango_cache_redis.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cache/src/tango_cache_redis.cpp')
-rw-r--r--cache/src/tango_cache_redis.cpp422
1 files changed, 0 insertions, 422 deletions
diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp
deleted file mode 100644
index 54fd8d0..0000000
--- a/cache/src/tango_cache_redis.cpp
+++ /dev/null
@@ -1,422 +0,0 @@
-#include <sys/types.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <errno.h>
-#include <sys/time.h>
-#include <time.h>
-#include <string.h>
-
-#include <hiredis-vip/hircluster.h>
-#include <hiredis-vip/async.h>
-#include <hiredis-vip/adapters/libevent.h>
-#include <cjson/cJSON.h>
-
-#include "tango_cache_transfer.h"
-#include "tango_cache_tools.h"
-#include "tango_cache_redis.h"
-
-#define PARSE_JSON_RET_ERROR -1
-#define PARSE_JSON_RET_TIMEOUT 0
-#define PARSE_JSON_RET_SUCC 1
-
-#define CACHE_REDIS_CONNECT_IDLE 0
-#define CACHE_REDIS_CONNECTING 1
-#define CACHE_REDIS_CONNECTED 2
-#define CACHE_REDIS_DISCONNECTED 3
-
-static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
-
- if(status == REDIS_OK)
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d success.",
- ac->c.tcp.host, ac->c.tcp.port);
- }
- else
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d failed: %s.",
- ac->c.tcp.host, ac->c.tcp.port, ac->errstr);
- }
- instance->redis_connecting = CACHE_REDIS_DISCONNECTED;
-}
-
-static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
-{
- struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
-
- if(status == REDIS_OK)
- {
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d success.",
- ac->c.tcp.host, ac->c.tcp.port);
- instance->redis_connecting = CACHE_REDIS_CONNECTED;
- }
- else
- {
- instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d failed: %s.",
- ac->c.tcp.host, ac->c.tcp.port, ac->errstr);
- }
-}
-
-int redis_asyn_connect_init(struct tango_cache_instance *instance)
-{
- instance->redis_ac = redisClusterAsyncConnect(instance->param->redisaddrs, HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
- if(instance->redis_ac == NULL)
- {
- return -1;
- }
- instance->redis_connecting = CACHE_REDIS_CONNECTING;
- redisClusterLibeventAttach(instance->redis_ac, instance->evbase);
- redisClusterAsyncSetConnectionData(instance->redis_ac, instance);
- redisClusterAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb);
- redisClusterAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb);
- return 0;
-}
-
-static int parse_object_meta_json(struct tango_cache_ctx *ctx, const char *jcontent)
-{
- cJSON *root, *ptarget;
- int ret = PARSE_JSON_RET_ERROR;
- char usertag[2048];
- size_t datalen;
-
- if(NULL == (root=cJSON_Parse(jcontent)))
- {
- goto out_json;
- }
- if(NULL == (ptarget=cJSON_GetObjectItem(root, "Content-Length")))
- {
- goto out_json;
- }
- ctx->get.result.tlength = ptarget->valuedouble;
- if(NULL==(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-Lm")))
- {
- goto out_json;
- }
- ctx->get.last_modify = ptarget->valuedouble;
- if(NULL==(ptarget=cJSON_GetObjectItem(root, "Expires")))
- {
- goto out_json;
- }
- ctx->get.expires = ptarget->valuedouble;
- ctx->get.need_hdrs = RESPONSE_HDR_ALL;
- if(!check_expires_fresh_header(ctx))
- {
- ret = PARSE_JSON_RET_TIMEOUT;
- goto out_json;
- }
-
- if(NULL!=(ptarget=cJSON_GetObjectItem(root, "Headers")))
- {
- easy_string_savedata(&ctx->response, ptarget->valuestring, strlen(ptarget->valuestring));
- }
- if(NULL!=(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-User")))
- {
- if((datalen = Base64_DecodeBlock((unsigned char*)ptarget->valuestring, strlen(ptarget->valuestring), (unsigned char*)usertag, 2048))>0)
- {
- easy_string_savedata(&ctx->get.response_tag, usertag, datalen);
- }
- }
- cJSON_Delete(root);
- return PARSE_JSON_RET_SUCC;
-
-out_json:
- cJSON_Delete(root);
- return ret;
-}
-
-static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata)
-{
- redisReply *reply = (redisReply *)vreply;
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata;
- int ret;
-
- ctx->instance->statistic.session_redis -= 1;
- if(reply == NULL || reply->type!=REDIS_REPLY_ARRAY)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC);
- if(reply!=NULL && reply->type==REDIS_REPLY_ERROR)
- {
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str);
- }
- else
- {
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- }
- tango_cache_ctx_destroy(ctx);
- return;
- }
- else if(reply->element[0]->type == REDIS_REPLY_NIL)
- {
- tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
- ctx->get.result.type = RESULT_TYPE_MISS;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- tango_cache_ctx_destroy(ctx);
- return;
- }
-
- switch(ctx->get.state)
- {
- case GET_STATE_REDIS_META:
- ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS;
- break;
- case GET_STATE_REDIS_ALL:
- ctx->get.result.location = OBJECT_IN_REDIS;
- break;
-
- case GET_STATE_REDIS_TRY:
- ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS;
- if(ctx->get.result.location == OBJECT_IN_HOS)
- {
- ctx->get.redis_redirect_minio_cb(ctx);
- return;
- }
- ctx->locate = OBJECT_IN_REDIS;
- break;
- default: assert(0);break;
- }
-
- ret = parse_object_meta_json(ctx, reply->element[0]->str);
- switch(ret)
- {
- case PARSE_JSON_RET_ERROR:
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON);
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- tango_cache_ctx_destroy(ctx);
- break;
- case PARSE_JSON_RET_TIMEOUT:
- if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_HOS)
- {
- ctx->get.state = GET_STATE_END;
- cache_delete_minio_object(ctx);
- }
- else
- {
- tango_cache_ctx_destroy(ctx);
- }
- break;
- case PARSE_JSON_RET_SUCC:
- fetch_header_over_biz(ctx);
- if(ctx->get.state != GET_STATE_REDIS_META)
- {
- ctx->get.result.data_frag = reply->element[2]->str;
- ctx->get.result.size = reply->element[2]->len;
- ctx->get.result.type = RESULT_TYPE_BODY;
- promise_success(ctx->promise, &ctx->get.result);
- }
- ctx->get.result.type = RESULT_TYPE_END;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- tango_cache_ctx_destroy(ctx);
- break;
- default: assert(0);break;
- }
-}
-
-int tango_cache_head_redis(struct tango_cache_ctx *ctx)
-{
- int ret = -1;
-
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx,
- "HMGET %s OBJECT_META OBJECT_LOCATION", ctx->object_key);
- if(ret != REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- tango_cache_ctx_destroy(ctx);
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->get.state = GET_STATE_REDIS_META;
- }
- return ret;
-}
-
-int tango_cache_fetch_redis(struct tango_cache_ctx *ctx)
-{
- int ret = -1;
-
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx,
- "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key);
- if(ret != REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- tango_cache_ctx_destroy(ctx);
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->get.state = GET_STATE_REDIS_ALL;
- }
- return ret;
-}
-
-int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx)
-{
- int ret = -1;
-
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx,
- "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key);
- if(ret != REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- tango_cache_ctx_destroy(ctx);
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->get.state = GET_STATE_REDIS_TRY;
- }
- return ret;
-}
-
-static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata)
-{
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata;
- redisReply *reply = (redisReply *)vreply;
- int ret;
-
- ctx->instance->statistic.session_redis -= 1;
- if(reply == NULL || reply->type==REDIS_REPLY_ERROR || ac->err)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC);
- }
- if(ctx->fail_state)
- {
- if(ctx->put.state==PUT_STATE_REDIS_META || ctx->put.state==PUT_STATE_REDIS_SETEX) //����һ��������ص�
- {
- ctx->put.state = PUT_STATE_END;
- return;
- }
- tango_cache_ctx_destroy(ctx, true);
- return;
- }
-
- switch(ctx->put.state)
- {
- case PUT_STATE_REDIS_META:
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
- "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl);
- if(ret!=REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- ctx->put.state = PUT_STATE_END;
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->put.state = PUT_STATE_REDIS_SETEX;
- }
- break;
- case PUT_STATE_REDIS_EXPIRE:
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
- "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl);
- if(ret != REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- tango_cache_ctx_destroy(ctx, true);
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->put.state = PUT_STATE_END;
- }
- break;
- case PUT_STATE_REDIS_SETEX:
- ctx->put.state = PUT_STATE_END; //����һ��EXPIRE���������
- break;
- case PUT_STATE_END:
- tango_cache_ctx_destroy(ctx, true);
- break;
- default: assert(0);break;
- }
-}
-
-int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback)
-{
- int ret_mset, ret_set;
- char *meta;
-
- meta = cJSON_PrintUnformatted(ctx->put.object_meta);
-
- ret_mset = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
- "HMSET %s OBJECT_LOCATION minio OBJECT_META %s MINIO_ADDR %s", ctx->object_key, meta, ctx->hostaddr);
- ret_set = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
- "SET http://%s/%s 1 EX %u", ctx->hostaddr, ctx->object_key, ctx->put.object_ttl);
- if(ret_mset==REDIS_OK && ret_set==REDIS_OK)
- {
- ctx->instance->statistic.session_redis += 2;
- ctx->put.state = PUT_STATE_REDIS_META;
- }
- else
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- if(ret_mset==REDIS_OK)
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->put.state = PUT_STATE_REDIS_EXPIRE; //��ʱ��PUT����object�߼�һ��
- }
- else if(ret_set==REDIS_OK)
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->put.state = PUT_STATE_END;
- }
- else
- {
- tango_cache_ctx_destroy(ctx, callback);
- free(meta);
- return -1;
- }
- }
- free(meta);
- return 0;
-}
-
-int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
-{
- int ret;
- char *meta;
-
- ctx->instance->statistic.memory_used -= size;
- meta = cJSON_PrintUnformatted(ctx->put.object_meta);
- ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
- "HMSET %s OBJECT_LOCATION redis OBJECT_META %s OBJECT_BODY %b", ctx->object_key, meta, data, size);
- if(ret != REDIS_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
- tango_cache_ctx_destroy(ctx, callback);
- }
- else
- {
- ctx->instance->statistic.session_redis += 1;
- ctx->put.state = PUT_STATE_REDIS_EXPIRE;
- }
- if(way == PUT_MEM_FREE)
- {
- free((void *)data);
- }
- free(meta);
- return ret;
-}
-
-int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback)
-{
- char *data;
- size_t size;
-
- data = (char *)malloc(object_size);
- size = evbuffer_remove(ctx->put.evbuf, data, object_size);
- if(size != object_size)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
- tango_cache_ctx_destroy(ctx, callback);
- free(data);
- return CACHE_ERR_EVBUFFER;
- }
- return redis_put_complete_part_data(ctx, PUT_MEM_FREE, data, object_size, callback);
-}
-