diff options
Diffstat (limited to 'cache/src/tango_cache_transfer.cpp')
| -rw-r--r-- | cache/src/tango_cache_transfer.cpp | 986 |
1 files changed, 0 insertions, 986 deletions
diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp deleted file mode 100644 index e8b4cfc..0000000 --- a/cache/src/tango_cache_transfer.cpp +++ /dev/null @@ -1,986 +0,0 @@ -#include <sys/types.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <errno.h> -#include <sys/time.h> -#include <time.h> -#include <string.h> -#include <curl/curl.h> - -#include "tango_cache_transfer.h" -#include "tango_cache_xml.h" -#include "tango_cache_tools.h" -#include "tango_cache_redis.h" - -static inline void curl_set_common_options(CURL *curl, long transfer_timeout, char *errorbuf) -{ - curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errorbuf); - curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout); //���Է��ֶ�������ij���ӽ��տ�ס����� - //ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds" - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L); - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); -} - -//response body�̻ܶ���ʱ -size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp) -{ - return size*count; -} - -static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - size_t totallen = size*count; - char *start = (char *)ptr, *end = start + totallen; - struct multipart_etag_list *etag; - - if(!strncmp(start, "Etag:", totallen>5?5:totallen)) - { - start += 5; end -= 1; totallen -= 5; - while(totallen>0 && (*start==' ')) {start++; totallen--;} - while(totallen>0 && (*end=='\r'||*end=='\n')) {end--; totallen--;} - if(totallen > 0) - { - etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list)); - totallen = end - start + 1; - etag->etag = (char *)malloc(totallen + 1); - etag->part_number = ctx->put.part_index; - memcpy(etag->etag, start, totallen); - *(etag->etag + totallen) = '\0'; - TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node); - } - } - - return size*count; -} - -static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp) -{ - size_t len; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size) - { - return 0; //��һ������ - } - - len = ctx->put.once_request.size - ctx->put.once_request.len; //ʣ����ϴ��ij��� - if(len > size * count) - { - len = size * count; - } - - memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len); - ctx->put.once_request.len += len; - - if(ctx->put.once_request.len >= ctx->put.once_request.size) - { - ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //δʹ��cache buffer���Լ������ڴ����� - easy_string_destroy(&ctx->put.once_request); - } - return len; -} - -static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp) -{ - size_t len, space=size*count, send_len; - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length) - { - return 0; - } - - len = ctx->put.upload_length - ctx->put.upload_offset; - if(len > space) - { - len = space; - } - send_len = evbuffer_remove(ctx->put.evbuf, ptr, len); - assert(send_len>0); - ctx->put.upload_offset += send_len; - ctx->instance->statistic.memory_used -= send_len; - - return send_len; -} - -//return value: <0:fail; =0: not exec; >0: OK -static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full) -{ - UNUSED CURLMcode rc; - char minio_url[256]={0}, buffer[256]={0}; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return -1; - } - - ctx->put.upload_offset = 0; - if(full) - { - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - } - else - { - snprintf(minio_url, 256, "http://%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); - } - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - //token�ֶΣ�����hos�洢��֤ - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length); - curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); - curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, length: %lu, key: %s\n", ctx->put.state, ctx->put.upload_length, ctx->object_key); - return 1; -} - -static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - struct easy_string *estr = &ctx->response; - CURLcode code; - - if(ctx->fail_state) - { - return size*count; - } - - if(ctx->res_code == 0) - { - code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); - if(code != CURLE_OK || ctx->res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - return size*count; - } - } - - easy_string_savedata(estr, (const char*)ptr, size*count); - return size*count; -} - -int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256]={0}, buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s?uploads", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ��ʹ�ûص���������fread�����Է��ֹر�Expectʱ�ᵼ�¿���curl_multi_socket_action - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - return 1; -} - -int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - ctx->instance->statistic.del_recv_num += 1; - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, call_back); //�ս��� - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 1; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return false; - } - - snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return true; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) -{ - int len=0; - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - return false; - } - construct_complete_xml(ctx, &ctx->put.combine_xml, &len); - - snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //���Content-Length - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - if(ctx->headers != NULL) - { - curl_slist_free_all(ctx->headers); - ctx->headers = NULL; - } - ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - return true; -} - -//return value: true-�ɹ������¼���false-δ�����¼� -bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len) -{ - int ret = 1; - - switch(ctx->put.state) - { - case PUT_STATE_START: - if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); - return false; - } - ctx->put.state = PUT_STATE_WAIT_START; - ret = curl_get_minio_uploadID(ctx); - break; - - case PUT_STATE_PART: - if(ctx->curl == NULL) - { - ctx->put.upload_length = block_len; - ret = http_put_bodypart_request_evbuf(ctx, false); - } - break; - - default: break;//nothing to do - } - - if(ret <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - return false; - } - return true; -} - -//callbackֱ��ʧ���Ƿ���ûص���������ʽ��Ҫ������һ���Բ���Ҫ -static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback) -{ - int ret=-1; - - ctx->put.state = PUT_STATE_END; - ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); - if(ctx->put.upload_length > 0) - { - ret = http_put_bodypart_request_evbuf(ctx, true); - if(ret <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, callback); - } - } - else - { - tango_cache_ctx_destroy(ctx, callback); - } - return ret; -} - -int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback) -{ - DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not"); - ctx->put.close_state = true;//������״̬�����������رգ��ڲ�״̬����ת�������ٹر� - if(ctx->fail_state) - { - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - - switch(ctx->put.state) - { - case PUT_STATE_START: //��ʱ��ͬ����һ�����ϴ� - if(sessions_exceeds_limit(ctx->instance, ctx->locate)) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_evbuf(ctx, callback); - } - else - { - return redis_put_complete_part_evbuf(ctx, ctx->put.object_size, callback); - } - break; - - case PUT_STATE_PART: - if(ctx->curl == NULL) - { - ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); - if(ctx->put.upload_length == 0) - { - if(cache_kick_combine_minio(ctx)) - { - ctx->put.state = PUT_STATE_END; - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx); - return -1; - } - } - else if(http_put_bodypart_request_evbuf(ctx, false) <= 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else - { - tango_cache_ctx_destroy(ctx); - return -1; - } - } - } - break; - - case PUT_STATE_END: assert(0); //�û���������endʱ�����ܴ��ڴ�״̬ - case PUT_STATE_WAIT_START: //��ʱδ��ȡ��uploadId�������������ϴ� - default: break; - } - return 0; -} - -void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key); - switch(ctx->put.state) - { - case PUT_STATE_WAIT_START: - if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID)) - { - easy_string_destroy(&ctx->response); - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - } - else - { - easy_string_destroy(&ctx->response); - ctx->put.state = PUT_STATE_PART; - if(ctx->put.close_state) - { - do_tango_cache_update_end(ctx, true); - } - else - { - size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, upload_length); - } - } - } - break; - - case PUT_STATE_PART: - if(res != CURLE_OK || res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - if(ctx->fail_state) - { - if(cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - } - else if(ctx->put.close_state) - { - do_tango_cache_update_end(ctx, true); - } - else - { - size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->param->upload_block_size) - { - cache_kick_upload_minio_multipart(ctx, upload_length); - } - } - break; - - case PUT_STATE_CANCEL: //�ȴ��ر� - if(ctx->put.close_state) - { - tango_cache_ctx_destroy(ctx); - } - break; - - case PUT_STATE_END: - if(res != CURLE_OK || res_code!=200L) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state) - { - redis_put_minio_object_meta(ctx, true); - } - else - { - tango_cache_ctx_destroy(ctx); - } - break; - default: break; - } -} - -int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx, callback); - if(way == PUT_MEM_FREE) free((void *)data); - ctx->instance->statistic.memory_used -= size; - return -1; - } - ctx->put.state = PUT_STATE_END; - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - if(way == PUT_MEM_COPY) - { - ctx->put.once_request.buff = (char *)malloc(size); - memcpy(ctx->put.once_request.buff, data, size); - } - else - { - ctx->put.once_request.buff = (char *)data; - } - ctx->put.once_request.size = size; - ctx->put.once_request.len = 0; - curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size); - curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb); - curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 0; -} - -int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) -{ - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->statistic.memory_used += size; - ctx->instance->error_code = CACHE_OK; - - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_data(ctx, way, data, size, false); - } - else - { - return redis_put_complete_part_data(ctx, way, data, size, false); - } -} - -int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback) -{ - size_t size; - - ctx->instance->statistic.put_recv_num += 1; - ctx->instance->error_code = CACHE_OK; - - if(way == EVBUFFER_MOVE) - { - if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - } - else - { - if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - } - size = evbuffer_get_length(ctx->put.evbuf); - ctx->instance->statistic.memory_used += size; - - if(ctx->locate == OBJECT_IN_HOS) - { - return http_put_complete_part_evbuf(ctx, callback); - } - else - { - return redis_put_complete_part_evbuf(ctx, size, callback); - } -} - -void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - tango_cache_ctx_destroy(ctx); -} - -void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - u_int32_t errnum=0; - - if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - ctx->del.fail_num = ctx->del.succ_num; - ctx->del.succ_num = 0; - } - else - { - if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE)) - { - ctx->del.fail_num = ctx->del.succ_num; - ctx->del.succ_num = 0; - } - else - { - ctx->del.fail_num = errnum; - ctx->del.succ_num -= errnum; - } - if(ctx->del.fail_num > 0) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - } - } - tango_cache_ctx_destroy(ctx); -} - -int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - ctx->instance->statistic.del_recv_num += ctx->del.succ_num; - ctx->instance->error_code = CACHE_OK; - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - tango_cache_ctx_destroy(ctx, callback); - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname); - curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //���Content-Length����CURLOPT_COPYPOSTFIELDS֮ǰ���� - curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - easy_string_destroy(&ctx->response); - return 0; -} - -bool fetch_header_over_biz(struct tango_cache_ctx *ctx) -{ - if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //��Expiresʱ - { - tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); - ctx->get.state = GET_STATE_DELETE; - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - return false; - } - - if(ctx->get.response_tag.len > 0) - { - ctx->get.result.data_frag = ctx->get.response_tag.buff; - ctx->get.result.size = ctx->get.response_tag.len; - ctx->get.result.type = RESULT_TYPE_USERTAG; - promise_success(ctx->promise, &ctx->get.result); - easy_string_destroy(&ctx->get.response_tag); - } - if(ctx->response.len > 0) - { - ctx->get.result.data_frag = ctx->response.buff; - ctx->get.result.size = ctx->response.len; - ctx->get.result.type = RESULT_TYPE_HEADER; - promise_success(ctx->promise, &ctx->get.result); - easy_string_destroy(&ctx->response); - } - return true; -} - -static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - - if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) - { - return size*count; - } - - if(!fetch_header_over_biz(ctx)) - { - return size*count; - } - - ctx->get.result.data_frag = (const char *)ptr; - ctx->get.result.size = size * count; - ctx->get.result.type = RESULT_TYPE_BODY; - promise_success(ctx->promise, &ctx->get.result); - return size*count; -} - -bool check_expires_fresh_header(struct tango_cache_ctx *ctx) -{ - time_t now_gmt; - - if(ctx->get.need_hdrs != RESPONSE_HDR_ALL) - return true; - - now_gmt = get_gmtime_timestamp(time(NULL)); - - if(now_gmt > ctx->get.expires) - { - tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); - ctx->get.state = GET_STATE_DELETE; //����ʧЧʱ���������ʱ����ɾ������ - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - easy_string_destroy(&ctx->response); - return false; - } - - if(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires) - { - tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - easy_string_destroy(&ctx->response); - return false; - } - return true; -} - -static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, long res_code) -{ - if(code != CURLE_OK) - { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - return false; - } - - if(res_code != 200L) - { - if(res_code == 404L) - { - tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); - ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - } - else - { - tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); - promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - } - return false; - } - return true; -} - -static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp) -{ - struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - char *start=(char *)ptr, *pos_colon; - size_t raw_len = size*count, hdrlen=size*count; - char usertag[2048]; - size_t datalen; - - if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) - { - return raw_len; - } - if(ctx->res_code == 0) //�״�Ӧ��ʱ�ȿ�Ӧ�����Ƿ���200 - { - UNUSED CURLcode code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); - if(!check_get_result_code(ctx, code, ctx->res_code)) - { - return raw_len; - } - ctx->get.result.location = OBJECT_IN_HOS; - } - pos_colon = (char*)memchr(start, ':', raw_len); - if(pos_colon == NULL) - { - return raw_len; - } - - datalen = pos_colon - start; - switch(datalen) - { - case 7: - if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7)) - { - ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES; - ctx->get.expires = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1); - if(!check_expires_fresh_header(ctx)) - { - return raw_len; - } - } - break; - case 13: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13)) - { - ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD; - sscanf(pos_colon+1, "%lu", &ctx->get.last_modify); - if(!check_expires_fresh_header(ctx)) - { - return raw_len; - } - } - break; - case 15: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15)) - { - if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen); - } - } - break; - case 14: - if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14)) - { - sscanf(pos_colon+1, "%lu", &ctx->get.result.tlength); - } - break; - case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - default: break; - } - return raw_len; -} - -void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) -{ - switch(ctx->get.state) - { - case GET_STATE_START: - if(!ctx->fail_state && check_get_result_code(ctx, res, res_code)) - { - if(ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)) //HEAD���ֵ��ֶβ�ȫ�Ȳ�ɾ������������ޣ� - { - ctx->get.result.type = RESULT_TYPE_END; - promise_success(ctx->promise, &ctx->get.result); - promise_finish(ctx->promise); - } - } - tango_cache_ctx_destroy(ctx); - break; - - case GET_STATE_DELETE: - ctx->get.state = GET_STATE_END; - cache_delete_minio_object(ctx); - break; - - case GET_STATE_END: - tango_cache_ctx_destroy(ctx); - break; - default: assert(0);break; - } -} - -static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx) -{ - UNUSED CURLMcode rc; - char minio_url[256], buffer[256]; - - if(NULL == (ctx->curl=curl_easy_init())) - { - tango_cache_ctx_destroy(ctx); - return -1; - } - - snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key); - curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); - if(ctx->method == CACHE_REQUEST_HEAD) - { - curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L); - } - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb); - curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); - sprintf(buffer, "token: %s", ctx->instance->param->cache_token); - ctx->headers = curl_slist_append(ctx->headers, buffer); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); - - rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); - assert(rc==CURLM_OK); - return 1; -} - -static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx) -{ - struct promise *p = ctx->promise; - - ctx->get.state = GET_STATE_START; - ctx->locate = OBJECT_IN_HOS; - if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions) - { - tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); - promise_failed(p, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); - tango_cache_ctx_destroy(ctx); - } - else if(tango_cache_fetch_minio(ctx) != 1) - { - promise_failed(p, FUTURE_ERROR_CANCEL, "tango_cache_fetch_minio failed"); - } -} - -int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get) -{ - ctx->instance->statistic.get_recv_num += 1; - switch(where_to_get) - { - case OBJECT_IN_HOS: - ctx->locate = OBJECT_IN_HOS; - return (tango_cache_fetch_minio(ctx)==1)?0:-2; - case OBJECT_IN_REDIS: - ctx->locate = OBJECT_IN_REDIS; - return tango_cache_fetch_redis(ctx); - default: - ctx->get.redis_redirect_minio_cb = redis_redirect_object2minio_cb; - return tango_cache_try_fetch_redis(ctx); - } - return 0; -} - -int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head) -{ - ctx->instance->statistic.get_recv_num += 1; - if(where_to_head == OBJECT_IN_REDIS) - { - return tango_cache_head_redis(ctx); - } - else - { - return (tango_cache_fetch_minio(ctx)==1)?0:-1; - } -} - |
