diff options
Diffstat (limited to 'cache/src/tango_cache_redis.cpp')
| -rw-r--r-- | cache/src/tango_cache_redis.cpp | 422 |
1 files changed, 0 insertions, 422 deletions
diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp deleted file mode 100644 index 54fd8d0..0000000 --- a/cache/src/tango_cache_redis.cpp +++ /dev/null @@ -1,422 +0,0 @@ -#include <sys/types.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> - -#include <hiredis-vip/hircluster.h> -#include <hiredis-vip/async.h> -#include <hiredis-vip/adapters/libevent.h> -#include <cjson/cJSON.h> - -#include "tango_cache_transfer.h" -#include "tango_cache_tools.h" -#include "tango_cache_redis.h" - -#define PARSE_JSON_RET_ERROR -1 -#define PARSE_JSON_RET_TIMEOUT 0 -#define PARSE_JSON_RET_SUCC 1 - -#define CACHE_REDIS_CONNECT_IDLE 0 -#define CACHE_REDIS_CONNECTING 1 -#define CACHE_REDIS_CONNECTED 2 -#define CACHE_REDIS_DISCONNECTED 3 - -static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); - - if(status == REDIS_OK) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d success.", - ac->c.tcp.host, ac->c.tcp.port); - } - else - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Redis disconnect %s:%d failed: %s.", - ac->c.tcp.host, ac->c.tcp.port, ac->errstr); - } - instance->redis_connecting = CACHE_REDIS_DISCONNECTED; -} - -static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) -{ - struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); - - if(status == REDIS_OK) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d success.", - ac->c.tcp.host, ac->c.tcp.port); - instance->redis_connecting = CACHE_REDIS_CONNECTED; - } - else - { - instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "RedisCluster connect %s:%d failed: %s.", - ac->c.tcp.host, ac->c.tcp.port, ac->errstr); - } -} - -int redis_asyn_connect_init(struct tango_cache_instance *instance) -{ - instance->redis_ac = redisClusterAsyncConnect(instance->param->redisaddrs, HIRCLUSTER_FLAG_ROUTE_USE_SLOTS); - if(instance->redis_ac == NULL) - { - return -1; - } - instance->redis_connecting = CACHE_REDIS_CONNECTING; - redisClusterLibeventAttach(instance->redis_ac, instance->evbase); - redisClusterAsyncSetConnectionData(instance->redis_ac, instance); - redisClusterAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb); - redisClusterAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb); - return 0; -} - -static int parse_object_meta_json(struct tango_cache_ctx *ctx, const char *jcontent) -{ - cJSON *root, *ptarget; - int ret = PARSE_JSON_RET_ERROR; - char usertag[2048]; - size_t datalen; - - if(NULL == (root=cJSON_Parse(jcontent))) - { - goto out_json; - } - if(NULL == (ptarget=cJSON_GetObjectItem(root, "Content-Length"))) - { - goto out_json; - } - ctx->get.result.tlength = ptarget->valuedouble; - if(NULL==(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-Lm"))) - { - goto out_json; - } - ctx->get.last_modify = ptarget->valuedouble; - if(NULL==(ptarget=cJSON_GetObjectItem(root, "Expires"))) - { - goto out_json; - } - ctx->get.expires = ptarget->valuedouble; - ctx->get.need_hdrs = RESPONSE_HDR_ALL; - if(!check_expires_fresh_header(ctx)) - { - ret = PARSE_JSON_RET_TIMEOUT; - goto out_json; - } - - if(NULL!=(ptarget=cJSON_GetObjectItem(root, "Headers"))) - { - easy_string_savedata(&ctx->response, ptarget->valuestring, strlen(ptarget->valuestring)); - } - if(NULL!=(ptarget=cJSON_GetObjectItem(root, "X-Amz-Meta-User"))) - { - if((datalen = Base64_DecodeBlock((unsigned char*)ptarget->valuestring, strlen(ptarget->valuestring), (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->get.response_tag, usertag, datalen); - } - } - cJSON_Delete(root); - return PARSE_JSON_RET_SUCC; - -out_json: - cJSON_Delete(root); - return ret; -} - -static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) -{ - redisReply *reply = (redisReply *)vreply; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; - int ret; - - ctx->instance->statistic.session_redis -= 1; - if(reply == NULL || reply->type!=REDIS_REPLY_ARRAY) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); - if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str); - } - else - { - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - tango_cache_ctx_destroy(ctx); - return; - } - else if(reply->element[0]->type == REDIS_REPLY_NIL) - { - tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - tango_cache_ctx_destroy(ctx); - return; - } - - switch(ctx->get.state) - { - case GET_STATE_REDIS_META: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; - break; - case GET_STATE_REDIS_ALL: - ctx->get.result.location = OBJECT_IN_REDIS; - break; - - case GET_STATE_REDIS_TRY: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; - if(ctx->get.result.location == OBJECT_IN_HOS) - { - ctx->get.redis_redirect_minio_cb(ctx); - return; - } - ctx->locate = OBJECT_IN_REDIS; - break; - default: assert(0);break; - } - - ret = parse_object_meta_json(ctx, reply->element[0]->str); - switch(ret) - { - case PARSE_JSON_RET_ERROR: - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - tango_cache_ctx_destroy(ctx); - break; - case PARSE_JSON_RET_TIMEOUT: - if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_HOS) - { - ctx->get.state = GET_STATE_END; - cache_delete_minio_object(ctx); - } - else - { - tango_cache_ctx_destroy(ctx); - } - break; - case PARSE_JSON_RET_SUCC: - fetch_header_over_biz(ctx); - if(ctx->get.state != GET_STATE_REDIS_META) - { - ctx->get.result.data_frag = reply->element[2]->str; - ctx->get.result.size = reply->element[2]->len; - ctx->get.result.type = RESULT_TYPE_BODY; - promise_success(ctx->promise, &ctx->get.result); - } - ctx->get.result.type = RESULT_TYPE_END; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - tango_cache_ctx_destroy(ctx); - break; - default: assert(0);break; - } -} - -int tango_cache_head_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_META; - } - return ret; -} - -int tango_cache_fetch_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_ALL; - } - return ret; -} - -int tango_cache_try_fetch_redis(struct tango_cache_ctx *ctx) -{ - int ret = -1; - - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, - "HMGET %s OBJECT_META OBJECT_LOCATION OBJECT_BODY", ctx->object_key); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->get.state = GET_STATE_REDIS_TRY; - } - return ret; -} - -static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vreply, void *privdata) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; - redisReply *reply = (redisReply *)vreply; - int ret; - - ctx->instance->statistic.session_redis -= 1; - if(reply == NULL || reply->type==REDIS_REPLY_ERROR || ac->err) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); - } - if(ctx->fail_state) - { - if(ctx->put.state==PUT_STATE_REDIS_META || ctx->put.state==PUT_STATE_REDIS_SETEX) //����һ��������ص� - { - ctx->put.state = PUT_STATE_END; - return; - } - tango_cache_ctx_destroy(ctx, true); - return; - } - - switch(ctx->put.state) - { - case PUT_STATE_REDIS_META: - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); - if(ret!=REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - ctx->put.state = PUT_STATE_END; - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_SETEX; - } - break; - case PUT_STATE_REDIS_EXPIRE: - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx, true); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_END; - } - break; - case PUT_STATE_REDIS_SETEX: - ctx->put.state = PUT_STATE_END; //����һ��EXPIRE��������� - break; - case PUT_STATE_END: - tango_cache_ctx_destroy(ctx, true); - break; - default: assert(0);break; - } -} - -int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback) -{ - int ret_mset, ret_set; - char *meta; - - meta = cJSON_PrintUnformatted(ctx->put.object_meta); - - ret_mset = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "HMSET %s OBJECT_LOCATION minio OBJECT_META %s MINIO_ADDR %s", ctx->object_key, meta, ctx->hostaddr); - ret_set = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "SET http://%s/%s 1 EX %u", ctx->hostaddr, ctx->object_key, ctx->put.object_ttl); - if(ret_mset==REDIS_OK && ret_set==REDIS_OK) - { - ctx->instance->statistic.session_redis += 2; - ctx->put.state = PUT_STATE_REDIS_META; - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - if(ret_mset==REDIS_OK) - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_EXPIRE; //��ʱ��PUT����object��һ�� - } - else if(ret_set==REDIS_OK) - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_END; - } - else - { - tango_cache_ctx_destroy(ctx, callback); - free(meta); - return -1; - } - } - free(meta); - return 0; -} - -int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - int ret; - char *meta; - - ctx->instance->statistic.memory_used -= size; - meta = cJSON_PrintUnformatted(ctx->put.object_meta); - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "HMSET %s OBJECT_LOCATION redis OBJECT_META %s OBJECT_BODY %b", ctx->object_key, meta, data, size); - if(ret != REDIS_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx, callback); - } - else - { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_EXPIRE; - } - if(way == PUT_MEM_FREE) - { - free((void *)data); - } - free(meta); - return ret; -} - -int redis_put_complete_part_evbuf(struct tango_cache_ctx *ctx, size_t object_size, bool callback) -{ - char *data; - size_t size; - - data = (char *)malloc(object_size); - size = evbuffer_remove(ctx->put.evbuf, data, object_size); - if(size != object_size) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - free(data); - return CACHE_ERR_EVBUFFER; - } - return redis_put_complete_part_data(ctx, PUT_MEM_FREE, data, object_size, callback); -} - |
