summaryrefslogtreecommitdiff
path: root/cache/src/tango_cache_transfer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cache/src/tango_cache_transfer.cpp')
-rw-r--r--cache/src/tango_cache_transfer.cpp986
1 files changed, 0 insertions, 986 deletions
diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp
deleted file mode 100644
index e8b4cfc..0000000
--- a/cache/src/tango_cache_transfer.cpp
+++ /dev/null
@@ -1,986 +0,0 @@
-#include <sys/types.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <errno.h>
-#include <sys/time.h>
-#include <time.h>
-#include <string.h>
-#include <curl/curl.h>
-
-#include "tango_cache_transfer.h"
-#include "tango_cache_xml.h"
-#include "tango_cache_tools.h"
-#include "tango_cache_redis.h"
-
-static inline void curl_set_common_options(CURL *curl, long transfer_timeout, char *errorbuf)
-{
- curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errorbuf);
- curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
- curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout); //���Է��ֶ�������ij���ӽ��տ�ס�����
- //ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds"
- curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L);
- curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
- curl_easy_setopt(curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
-}
-
-//response body�̻ܶ򲻹���ʱ
-size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- return size*count;
-}
-
-static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
- size_t totallen = size*count;
- char *start = (char *)ptr, *end = start + totallen;
- struct multipart_etag_list *etag;
-
- if(!strncmp(start, "Etag:", totallen>5?5:totallen))
- {
- start += 5; end -= 1; totallen -= 5;
- while(totallen>0 && (*start==' ')) {start++; totallen--;}
- while(totallen>0 && (*end=='\r'||*end=='\n')) {end--; totallen--;}
- if(totallen > 0)
- {
- etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list));
- totallen = end - start + 1;
- etag->etag = (char *)malloc(totallen + 1);
- etag->part_number = ctx->put.part_index;
- memcpy(etag->etag, start, totallen);
- *(etag->etag + totallen) = '\0';
- TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node);
- }
- }
-
- return size*count;
-}
-
-static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- size_t len;
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
-
- if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size)
- {
- return 0; //��һ������
- }
-
- len = ctx->put.once_request.size - ctx->put.once_request.len; //ʣ����ϴ��ij���
- if(len > size * count)
- {
- len = size * count;
- }
-
- memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len);
- ctx->put.once_request.len += len;
-
- if(ctx->put.once_request.len >= ctx->put.once_request.size)
- {
- ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //δʹ��cache buffer���Լ������ڴ�����
- easy_string_destroy(&ctx->put.once_request);
- }
- return len;
-}
-
-static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- size_t len, space=size*count, send_len;
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
-
- if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length)
- {
- return 0;
- }
-
- len = ctx->put.upload_length - ctx->put.upload_offset;
- if(len > space)
- {
- len = space;
- }
- send_len = evbuffer_remove(ctx->put.evbuf, ptr, len);
- assert(send_len>0);
- ctx->put.upload_offset += send_len;
- ctx->instance->statistic.memory_used -= send_len;
-
- return send_len;
-}
-
-//return value: <0:fail; =0: not exec; >0: OK
-static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full)
-{
- UNUSED CURLMcode rc;
- char minio_url[256]={0}, buffer[256]={0};
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- return -1;
- }
-
- ctx->put.upload_offset = 0;
- if(full)
- {
- snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key);
- }
- else
- {
- snprintf(minio_url, 256, "http://%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
- curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
- }
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- //token�ֶΣ�����hos�洢��֤
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length);
- curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- DBG_CACHE("state: %d, length: %lu, key: %s\n", ctx->put.state, ctx->put.upload_length, ctx->object_key);
- return 1;
-}
-
-static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
- struct easy_string *estr = &ctx->response;
- CURLcode code;
-
- if(ctx->fail_state)
- {
- return size*count;
- }
-
- if(ctx->res_code == 0)
- {
- code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
- if(code != CURLE_OK || ctx->res_code!=200L)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- return size*count;
- }
- }
-
- easy_string_savedata(estr, (const char*)ptr, size*count);
- return size*count;
-}
-
-int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
-{
- UNUSED CURLMcode rc;
- char minio_url[256]={0}, buffer[256];
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- return -1;
- }
-
- snprintf(minio_url, 256, "http://%s/%s?uploads", ctx->hostaddr, ctx->object_key);
- curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ��ʹ�ûص���������fread�����Է��ֹر�Expectʱ�ᵼ�¿���curl_multi_socket_action
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
-
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
- return 1;
-}
-
-int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
-{
- UNUSED CURLMcode rc;
- char minio_url[256], buffer[256];
-
- ctx->instance->statistic.del_recv_num += 1;
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- tango_cache_ctx_destroy(ctx, call_back); //�ս���
- return -1;
- }
-
- snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key);
- curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- return 1;
-}
-
-//return value: true-�ɹ������¼���false-δ�����¼�
-bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
-{
- UNUSED CURLMcode rc;
- char minio_url[256];
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- return false;
- }
-
- snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID);
- curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- return true;
-}
-
-//return value: true-�ɹ������¼���false-δ�����¼�
-bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
-{
- int len=0;
- UNUSED CURLMcode rc;
- char minio_url[256], buffer[256];
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- return false;
- }
- construct_complete_xml(ctx, &ctx->put.combine_xml, &len);
-
- snprintf(minio_url, 256, "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID);
- curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
-
- curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //���Content-Length
- curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- if(ctx->headers != NULL)
- {
- curl_slist_free_all(ctx->headers);
- ctx->headers = NULL;
- }
- ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
- return true;
-}
-
-//return value: true-�ɹ������¼���false-δ�����¼�
-bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len)
-{
- int ret = 1;
-
- switch(ctx->put.state)
- {
- case PUT_STATE_START:
- if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS))
- {
- tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION);
- return false;
- }
- ctx->put.state = PUT_STATE_WAIT_START;
- ret = curl_get_minio_uploadID(ctx);
- break;
-
- case PUT_STATE_PART:
- if(ctx->curl == NULL)
- {
- ctx->put.upload_length = block_len;
- ret = http_put_bodypart_request_evbuf(ctx, false);
- }
- break;
-
- default: break;//nothing to do
- }
-
- if(ret <= 0)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- return false;
- }
- return true;
-}
-
-//callbackֱ��ʧ���Ƿ���ûص���������ʽ��Ҫ������һ���Բ���Ҫ
-static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback)
-{
- int ret=-1;
-
- ctx->put.state = PUT_STATE_END;
- ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(ctx->put.upload_length > 0)
- {
- ret = http_put_bodypart_request_evbuf(ctx, true);
- if(ret <= 0)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- tango_cache_ctx_destroy(ctx, callback);
- }
- }
- else
- {
- tango_cache_ctx_destroy(ctx, callback);
- }
- return ret;
-}
-
-int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback)
-{
- DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not");
- ctx->put.close_state = true;//������״̬�����������رգ��ڲ�״̬����ת�������ٹر�
- if(ctx->fail_state)
- {
- tango_cache_ctx_destroy(ctx, callback);
- return -1;
- }
-
- switch(ctx->put.state)
- {
- case PUT_STATE_START: //��ʱ��ͬ����һ�����ϴ�
- if(sessions_exceeds_limit(ctx->instance, ctx->locate))
- {
- tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION);
- tango_cache_ctx_destroy(ctx, callback);
- return -1;
- }
- if(ctx->locate == OBJECT_IN_HOS)
- {
- return http_put_complete_part_evbuf(ctx, callback);
- }
- else
- {
- return redis_put_complete_part_evbuf(ctx, ctx->put.object_size, callback);
- }
- break;
-
- case PUT_STATE_PART:
- if(ctx->curl == NULL)
- {
- ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(ctx->put.upload_length == 0)
- {
- if(cache_kick_combine_minio(ctx))
- {
- ctx->put.state = PUT_STATE_END;
- }
- else
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- tango_cache_ctx_destroy(ctx);
- return -1;
- }
- }
- else if(http_put_bodypart_request_evbuf(ctx, false) <= 0)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- if(cache_cancel_upload_minio(ctx))
- {
- ctx->put.state = PUT_STATE_CANCEL;
- }
- else
- {
- tango_cache_ctx_destroy(ctx);
- return -1;
- }
- }
- }
- break;
-
- case PUT_STATE_END: assert(0); //�û���������endʱ�����ܴ��ڴ�״̬
- case PUT_STATE_WAIT_START: //��ʱδ��ȡ��uploadId�������޷������ϴ�
- default: break;
- }
- return 0;
-}
-
-void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
-{
- DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
- switch(ctx->put.state)
- {
- case PUT_STATE_WAIT_START:
- if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID))
- {
- easy_string_destroy(&ctx->response);
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- if(ctx->put.close_state)
- {
- tango_cache_ctx_destroy(ctx);
- }
- }
- else
- {
- easy_string_destroy(&ctx->response);
- ctx->put.state = PUT_STATE_PART;
- if(ctx->put.close_state)
- {
- do_tango_cache_update_end(ctx, true);
- }
- else
- {
- size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(upload_length >= ctx->instance->param->upload_block_size)
- {
- cache_kick_upload_minio_multipart(ctx, upload_length);
- }
- }
- }
- break;
-
- case PUT_STATE_PART:
- if(res != CURLE_OK || res_code!=200L)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- }
- if(ctx->fail_state)
- {
- if(cache_cancel_upload_minio(ctx))
- {
- ctx->put.state = PUT_STATE_CANCEL;
- }
- else if(ctx->put.close_state)
- {
- tango_cache_ctx_destroy(ctx);
- }
- }
- else if(ctx->put.close_state)
- {
- do_tango_cache_update_end(ctx, true);
- }
- else
- {
- size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
- if(upload_length >= ctx->instance->param->upload_block_size)
- {
- cache_kick_upload_minio_multipart(ctx, upload_length);
- }
- }
- break;
-
- case PUT_STATE_CANCEL: //�ȴ��ر�
- if(ctx->put.close_state)
- {
- tango_cache_ctx_destroy(ctx);
- }
- break;
-
- case PUT_STATE_END:
- if(res != CURLE_OK || res_code!=200L)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- }
- if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state)
- {
- redis_put_minio_object_meta(ctx, true);
- }
- else
- {
- tango_cache_ctx_destroy(ctx);
- }
- break;
- default: break;
- }
-}
-
-int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
-{
- UNUSED CURLMcode rc;
- char minio_url[256], buffer[256];
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- tango_cache_ctx_destroy(ctx, callback);
- if(way == PUT_MEM_FREE) free((void *)data);
- ctx->instance->statistic.memory_used -= size;
- return -1;
- }
- ctx->put.state = PUT_STATE_END;
-
- snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key);
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- if(way == PUT_MEM_COPY)
- {
- ctx->put.once_request.buff = (char *)malloc(size);
- memcpy(ctx->put.once_request.buff, data, size);
- }
- else
- {
- ctx->put.once_request.buff = (char *)data;
- }
- ctx->put.once_request.size = size;
- ctx->put.once_request.len = 0;
- curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size);
- curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- return 0;
-}
-
-int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
-{
- ctx->instance->statistic.put_recv_num += 1;
- ctx->instance->statistic.memory_used += size;
- ctx->instance->error_code = CACHE_OK;
-
- if(ctx->locate == OBJECT_IN_HOS)
- {
- return http_put_complete_part_data(ctx, way, data, size, false);
- }
- else
- {
- return redis_put_complete_part_data(ctx, way, data, size, false);
- }
-}
-
-int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback)
-{
- size_t size;
-
- ctx->instance->statistic.put_recv_num += 1;
- ctx->instance->error_code = CACHE_OK;
-
- if(way == EVBUFFER_MOVE)
- {
- if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
- tango_cache_ctx_destroy(ctx, callback);
- return -1;
- }
- }
- else
- {
- if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
- tango_cache_ctx_destroy(ctx, callback);
- return -1;
- }
- }
- size = evbuffer_get_length(ctx->put.evbuf);
- ctx->instance->statistic.memory_used += size;
-
- if(ctx->locate == OBJECT_IN_HOS)
- {
- return http_put_complete_part_evbuf(ctx, callback);
- }
- else
- {
- return redis_put_complete_part_evbuf(ctx, size, callback);
- }
-}
-
-void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
-{
- if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- }
- tango_cache_ctx_destroy(ctx);
-}
-
-void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
-{
- u_int32_t errnum=0;
-
- if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- ctx->del.fail_num = ctx->del.succ_num;
- ctx->del.succ_num = 0;
- }
- else
- {
- if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE))
- {
- ctx->del.fail_num = ctx->del.succ_num;
- ctx->del.succ_num = 0;
- }
- else
- {
- ctx->del.fail_num = errnum;
- ctx->del.succ_num -= errnum;
- }
- if(ctx->del.fail_num > 0)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- }
- }
- tango_cache_ctx_destroy(ctx);
-}
-
-int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback)
-{
- UNUSED CURLMcode rc;
- char minio_url[256], buffer[256];
-
- ctx->instance->statistic.del_recv_num += ctx->del.succ_num;
- ctx->instance->error_code = CACHE_OK;
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
- tango_cache_ctx_destroy(ctx, callback);
- return -1;
- }
-
- snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname);
- curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
- curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //���Content-Length����CURLOPT_COPYPOSTFIELDS֮ǰ����
- curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- easy_string_destroy(&ctx->response);
- return 0;
-}
-
-bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
-{
- if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //��Expiresʱ
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
- ctx->get.state = GET_STATE_DELETE;
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- return false;
- }
-
- if(ctx->get.response_tag.len > 0)
- {
- ctx->get.result.data_frag = ctx->get.response_tag.buff;
- ctx->get.result.size = ctx->get.response_tag.len;
- ctx->get.result.type = RESULT_TYPE_USERTAG;
- promise_success(ctx->promise, &ctx->get.result);
- easy_string_destroy(&ctx->get.response_tag);
- }
- if(ctx->response.len > 0)
- {
- ctx->get.result.data_frag = ctx->response.buff;
- ctx->get.result.size = ctx->response.len;
- ctx->get.result.type = RESULT_TYPE_HEADER;
- promise_success(ctx->promise, &ctx->get.result);
- easy_string_destroy(&ctx->response);
- }
- return true;
-}
-
-static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
-
- if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE)
- {
- return size*count;
- }
-
- if(!fetch_header_over_biz(ctx))
- {
- return size*count;
- }
-
- ctx->get.result.data_frag = (const char *)ptr;
- ctx->get.result.size = size * count;
- ctx->get.result.type = RESULT_TYPE_BODY;
- promise_success(ctx->promise, &ctx->get.result);
- return size*count;
-}
-
-bool check_expires_fresh_header(struct tango_cache_ctx *ctx)
-{
- time_t now_gmt;
-
- if(ctx->get.need_hdrs != RESPONSE_HDR_ALL)
- return true;
-
- now_gmt = get_gmtime_timestamp(time(NULL));
-
- if(now_gmt > ctx->get.expires)
- {
- tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
- ctx->get.state = GET_STATE_DELETE; //����ʧЧʱ���������ʱ����ɾ������
- ctx->get.result.type = RESULT_TYPE_MISS;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- easy_string_destroy(&ctx->response);
- return false;
- }
-
- if(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires)
- {
- tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
- ctx->get.result.type = RESULT_TYPE_MISS;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- easy_string_destroy(&ctx->response);
- return false;
- }
- return true;
-}
-
-static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, long res_code)
-{
- if(code != CURLE_OK)
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- return false;
- }
-
- if(res_code != 200L)
- {
- if(res_code == 404L)
- {
- tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
- ctx->get.result.type = RESULT_TYPE_MISS;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- }
- else
- {
- tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
- promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- }
- return false;
- }
- return true;
-}
-
-static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp)
-{
- struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
- char *start=(char *)ptr, *pos_colon;
- size_t raw_len = size*count, hdrlen=size*count;
- char usertag[2048];
- size_t datalen;
-
- if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE)
- {
- return raw_len;
- }
- if(ctx->res_code == 0) //�״�Ӧ��ʱ�ȿ�Ӧ�����Ƿ���200
- {
- UNUSED CURLcode code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
- if(!check_get_result_code(ctx, code, ctx->res_code))
- {
- return raw_len;
- }
- ctx->get.result.location = OBJECT_IN_HOS;
- }
- pos_colon = (char*)memchr(start, ':', raw_len);
- if(pos_colon == NULL)
- {
- return raw_len;
- }
-
- datalen = pos_colon - start;
- switch(datalen)
- {
- case 7:
- if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7))
- {
- ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES;
- ctx->get.expires = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1);
- if(!check_expires_fresh_header(ctx))
- {
- return raw_len;
- }
- }
- break;
- case 13:
- if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13))
- {
- ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD;
- sscanf(pos_colon+1, "%lu", &ctx->get.last_modify);
- if(!check_expires_fresh_header(ctx))
- {
- return raw_len;
- }
- }
- break;
- case 15:
- if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15))
- {
- if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0)
- {
- easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen);
- }
- }
- break;
- case 14:
- if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14))
- {
- sscanf(pos_colon+1, "%lu", &ctx->get.result.tlength);
- }
- break;
- case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
- case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
- case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
- case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
- default: break;
- }
- return raw_len;
-}
-
-void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
-{
- switch(ctx->get.state)
- {
- case GET_STATE_START:
- if(!ctx->fail_state && check_get_result_code(ctx, res, res_code))
- {
- if(ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)) //HEAD���ֵ��ֶβ�ȫ�Ȳ�ɾ������������ޣ�
- {
- ctx->get.result.type = RESULT_TYPE_END;
- promise_success(ctx->promise, &ctx->get.result);
- promise_finish(ctx->promise);
- }
- }
- tango_cache_ctx_destroy(ctx);
- break;
-
- case GET_STATE_DELETE:
- ctx->get.state = GET_STATE_END;
- cache_delete_minio_object(ctx);
- break;
-
- case GET_STATE_END:
- tango_cache_ctx_destroy(ctx);
- break;
- default: assert(0);break;
- }
-}
-
-static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx)
-{
- UNUSED CURLMcode rc;
- char minio_url[256], buffer[256];
-
- if(NULL == (ctx->curl=curl_easy_init()))
- {
- tango_cache_ctx_destroy(ctx);
- return -1;
- }
-
- snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key);
- curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
- if(ctx->method == CACHE_REQUEST_HEAD)
- {
- curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L);
- }
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
- curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
- curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
- sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
- ctx->headers = curl_slist_append(ctx->headers, buffer);
- curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
- curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
-
- rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
- assert(rc==CURLM_OK);
- return 1;
-}
-
-static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx)
-{
- struct promise *p = ctx->promise;
-
- ctx->get.state = GET_STATE_START;
- ctx->locate = OBJECT_IN_HOS;
- if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions)
- {
- tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
- promise_failed(p, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
- tango_cache_ctx_destroy(ctx);
- }
- else if(tango_cache_fetch_minio(ctx) != 1)
- {
- promise_failed(p, FUTURE_ERROR_CANCEL, "tango_cache_fetch_minio failed");
- }
-}
-
-int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get)
-{
- ctx->instance->statistic.get_recv_num += 1;
- switch(where_to_get)
- {
- case OBJECT_IN_HOS:
- ctx->locate = OBJECT_IN_HOS;
- return (tango_cache_fetch_minio(ctx)==1)?0:-2;
- case OBJECT_IN_REDIS:
- ctx->locate = OBJECT_IN_REDIS;
- return tango_cache_fetch_redis(ctx);
- default:
- ctx->get.redis_redirect_minio_cb = redis_redirect_object2minio_cb;
- return tango_cache_try_fetch_redis(ctx);
- }
- return 0;
-}
-
-int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head)
-{
- ctx->instance->statistic.get_recv_num += 1;
- if(where_to_head == OBJECT_IN_REDIS)
- {
- return tango_cache_head_redis(ctx);
- }
- else
- {
- return (tango_cache_fetch_minio(ctx)==1)?0:-1;
- }
-}
-