diff options
| author | 张成伟 <[email protected]> | 2021-08-25 10:41:31 +0000 |
|---|---|---|
| committer | 张成伟 <[email protected]> | 2021-08-25 10:41:31 +0000 |
| commit | 39a4172bd7208f271d73d755b3ac410e4db062f6 (patch) | |
| tree | 699c72e52f5a4f0a4251d79c2d23df8bef8ea58d /client | |
| parent | 67bafbefc972158f9ddd4fb9e45dc76ff2c8a540 (diff) | |
| parent | 1aca701f127e94a7fbf60a996328083fed665f56 (diff) | |
Merge branch 'develop-http-producer' into 'master'
增加HTTP Post上传配置接口,支持主从双机备份与同步(冷备)
See merge request doris/doris_dispatch!1
Diffstat (limited to 'client')
| -rw-r--r-- | client/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | client/doris_client_fetch.cpp | 169 | ||||
| -rw-r--r-- | client/doris_client_fetch.h | 21 | ||||
| -rw-r--r-- | client/doris_client_http.cpp | 83 | ||||
| -rw-r--r-- | client/doris_client_http.h | 12 | ||||
| -rw-r--r-- | client/doris_client_produce.cpp | 518 | ||||
| -rw-r--r-- | client/doris_client_produce.h | 61 | ||||
| -rw-r--r-- | client/doris_client_transfer.cpp | 183 | ||||
| -rw-r--r-- | client/doris_client_transfer.h | 4 |
9 files changed, 967 insertions, 86 deletions
diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index 9c13e61..1a8eb2a 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -1,4 +1,4 @@ -set (DORIS_CLIENT_SRC doris_client_fetch.cpp doris_client_http.cpp doris_client_transfer.cpp nirvana_conhash.cpp nirvana_murmurhash.cpp) +set (DORIS_CLIENT_SRC doris_client_fetch.cpp doris_client_produce.cpp doris_client_http.cpp doris_client_transfer.cpp nirvana_conhash.cpp nirvana_murmurhash.cpp) add_definitions(-fPIC -Wall -g) diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 9e1d9ef..8827463 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -35,29 +35,6 @@ static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size) return 0; } -void easy_string_destroy(struct easy_string *estr) -{ - if(estr->buff != NULL) - { - free(estr->buff); - estr->buff = NULL; - estr->len = estr->size = 0; - } -} - -void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) -{ - if(estr->size-estr->len < len+1) - { - estr->size += len*4+1; - estr->buff = (char*)realloc(estr->buff, estr->size); - } - - memcpy(estr->buff+estr->len, data, len); - estr->len += len; - estr->buff[estr->len]='\0'; -} - void doris_confile_ctx_reset(struct doris_confile_ctx *ctx) { //�������ֲ��� @@ -75,12 +52,12 @@ void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) ctx->httpctx = NULL; } -void doris_update_new_version(struct doris_instance *instance) +void doris_update_new_version(struct doris_csum_instance *instance) { instance->cur_version = instance->new_version; } -void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s) +void doris_request_restart_timer(struct doris_csum_instance *instance, time_t wait_s) { struct timeval tv; @@ -89,7 +66,7 @@ void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s) event_add(&instance->timer_fetch, &tv); } -void doris_fetch_next_confile_meta(struct doris_instance *instance) +void doris_fetch_next_confile_meta(struct doris_csum_instance *instance) { cJSON *cur_a_item, *sub; @@ -125,7 +102,7 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance) void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; const char *pos_colon; size_t datalen; char buffer[64]; @@ -180,7 +157,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; if(code!=CURLE_OK || (instance->ctx.res_code!=200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { @@ -193,10 +170,10 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; } -void doris_http_fetch_confile(struct doris_instance *instance); +void doris_http_fetch_confile(struct doris_csum_instance *instance); void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; char md5buffer[64]; bool direct_fail=false; @@ -284,7 +261,7 @@ out_md5: doris_request_restart_timer(instance, instance->param->retry_interval); } -void doris_http_fetch_confile(struct doris_instance *instance) +void doris_http_fetch_confile(struct doris_csum_instance *instance) { struct doris_http_callback curlcbs; char metauri[128], range[64]={0}; @@ -319,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; //check code only once if(instance->ctx.res_code != 0) @@ -331,14 +308,14 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon if(res_code != 200) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d", - instance->ctx.server, instance->cur_version, instance->req_version, code); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "business: %s, No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d", + instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, code); } } void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; if(code!=CURLE_OK || res_code!=200) { @@ -350,14 +327,14 @@ void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; cJSON *sub; int64_t new_version; if(res!=CURLE_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", - instance->ctx.server, instance->cur_version, instance->req_version, res_code, err); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", + instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, res_code, err); goto out_error; } @@ -369,16 +346,16 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, server: %s, req_version=%lu, invalid json: %s", - instance->ctx.server, instance->req_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Parse meta failed, server: %s, req_version=%lu, invalid json: %s", + instance->args.bizname, instance->ctx.server, instance->req_version, instance->estr.buff); goto out_error; } sub = cJSON_GetObjectItem(instance->meta, "version"); new_version = sub->valuedouble; if(new_version <= instance->cur_version) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s", - instance->ctx.server, instance->cur_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s", + instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff); cJSON_Delete(instance->meta); instance->meta = NULL; goto out_error; @@ -386,8 +363,8 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void instance->new_version = new_version; instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, server: %s, cur_version=%lu, newjson: %s", - instance->ctx.server, instance->cur_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "business: %s, NEW_META found, server: %s, cur_version=%lu, newjson: %s", + instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff); instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata); instance->array = cJSON_GetObjectItem(instance->meta, "configs"); @@ -405,13 +382,21 @@ out_error: doris_request_restart_timer(instance, instance->param->retry_interval); easy_string_destroy(&instance->estr); doris_confile_ctx_destry(&instance->ctx); + if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ�� + { + instance->cbs.version_updated(instance, instance->cbs.userdata); + } + if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ� + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Server is busy processing version requests, waiting it done...", instance->args.bizname); + } } -static void doris_http_fetch_meta(struct doris_instance *instance) +static void doris_http_fetch_meta(struct doris_csum_instance *instance) { u_int64_t balance_seed; struct doris_http_callback curlcbs; - char metauri[128]; + char metauri[128], cur_version[128]; balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); @@ -439,6 +424,13 @@ static void doris_http_fetch_meta(struct doris_instance *instance) instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż���� if(instance->ctx.httpctx != NULL) { + if(instance->param->client_sync_on) + { + //����������ͬ�����ѣ����ڱ���Ǵ�Client���� + sprintf(cur_version, "X-Doris-Sync-Current-Version: %lu", instance->cur_version); + doris_http_ctx_add_header(instance->ctx.httpctx, cur_version); + } + snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { @@ -464,7 +456,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance) static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; switch(instance->status) { @@ -482,7 +474,7 @@ static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp) static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) { - struct doris_parameter *param=(struct doris_parameter *)userp; + struct doris_csum_param *param=(struct doris_csum_param *)userp; struct timeval tv; FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, param->param_master->connected_hosts); @@ -503,7 +495,7 @@ static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) evtimer_add(¶m->fs_timer_output, &tv); } -static int doris_client_register_field_stat(struct doris_parameter *param, void *runtime_log, struct event_base *evbase) +static int doris_client_register_field_stat(struct doris_csum_param *param, void *runtime_log, struct event_base *evbase) { const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles", "ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"}; @@ -549,19 +541,44 @@ static int doris_client_register_field_stat(struct doris_parameter *param, void return 0; } -struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog) +u_int32_t doris_csum_param_get_refernces(struct doris_csum_param *param) { - struct doris_parameter *param; + pthread_mutex_lock(¶m->mutex_lock); + u_int32_t references = param->references; + pthread_mutex_unlock(¶m->mutex_lock); + return references; +} - param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter)); +void doris_csum_parameter_destroy(struct doris_csum_param *param) +{ + evtimer_del(¶m->fs_timer_output); + FS_stop(¶m->fsstat_handle); + doris_http_parameter_destroy(param->param_master); + if(param->param_backup1 != NULL) + { + doris_http_parameter_destroy(param->param_backup1); + } + if(param->param_backup2 != NULL) + { + doris_http_parameter_destroy(param->param_backup2); + } + free(param); +} + +struct doris_csum_param *doris_csum_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog) +{ + struct doris_csum_param *param; + + param = (struct doris_csum_param *)calloc(1, sizeof(struct doris_csum_param)); param->manage_evbase = manage_evbase; MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", ¶m->retry_interval, 10); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", ¶m->fetch_frag_size, 5242880); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", ¶m->fetch_max_tries, 3); + MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", ¶m->client_sync_on, 0); - MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DorisClient"); - MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client.fs"); + MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsCsmClient"); + MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client_csm.fs"); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", ¶m->fsstat_period, 10); MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", ¶m->fsstat_print_mode, 1); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1"); @@ -578,14 +595,15 @@ struct doris_parameter *doris_parameter_new(const char *confile, struct event_ba } param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog); param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog); + pthread_mutex_init(¶m->mutex_lock, NULL); return param; } static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) { - struct doris_instance *instance = (struct doris_instance *)userp; + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; struct timeval tv; - struct doris_statistics incr_statistic; + struct doris_csum_statistics incr_statistic; long long *plast_statistic = (long long*)&instance->statistic_last; long long *pnow_statistic = (long long*)&instance->statistic; long long *pinc_statistic = (long long*)&incr_statistic; @@ -594,9 +612,9 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) http_sessions += caculate_http_sessions_sum(instance->httpins_master); http_sessions += caculate_http_sessions_sum(instance->httpins_backup1); http_sessions += caculate_http_sessions_sum(instance->httpins_backup2); - instance->statistic.field[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions; + instance->statistic.status[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions; - for(u_int32_t i=0; i<sizeof(struct doris_statistics)/sizeof(long long); i++) + for(u_int32_t i=0; i<sizeof(struct doris_csum_statistics)/sizeof(long long); i++) { pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i]; } @@ -615,13 +633,38 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) event_add(&instance->timer_statistic, &tv); } -struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, +struct doris_csum_param *doris_csum_instance_get_param(struct doris_csum_instance *instance) +{ + return instance->param; +} + +void doris_csum_instance_destroy(struct doris_csum_instance *instance) +{ + pthread_mutex_lock(&instance->param->mutex_lock); + instance->param->references--; + pthread_mutex_unlock(&instance->param->mutex_lock); + + evtimer_del(&instance->timer_fetch); + evtimer_del(&instance->timer_statistic); + /*doris_http_instance_destroy(instance->httpins_master); + if(instance->httpins_backup1 != NULL) + { + doris_http_instance_destroy(instance->httpins_backup1); + } + if(instance->httpins_backup2 != NULL) + { + doris_http_instance_destroy(instance->httpins_backup2); + }*/ + free(instance); +} + +struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *param, struct event_base *worker_evbase, struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog) { - struct doris_instance *instance; + struct doris_csum_instance *instance; struct timeval tv; - instance = (struct doris_instance *)calloc(1, sizeof(struct doris_instance)); + instance = (struct doris_csum_instance *)calloc(1, sizeof(struct doris_csum_instance)); instance->param = param; instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; @@ -646,6 +689,10 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } + pthread_mutex_lock(¶m->mutex_lock); + param->references++; + pthread_mutex_unlock(¶m->mutex_lock); + evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 58376ba..a236135 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -1,19 +1,13 @@ #ifndef __DORIS_CLIENT_FETCH_IN_H__ #define __DORIS_CLIENT_FETCH_IN_H__ +#include <pthread.h> #include <openssl/md5.h> #include <MESA/field_stat2.h> -#include "doris_client.h" +#include "doris_consumer_client.h" #include "doris_client_http.h" -struct easy_string -{ - char* buff; - size_t len; - size_t size; -}; - enum FETCH_CFG_STATUS { FETCH_STATUS_IDLE=0, @@ -21,11 +15,14 @@ enum FETCH_CFG_STATUS FETCH_STATUS_FILE, }; -struct doris_parameter +struct doris_csum_param { u_int32_t retry_interval; u_int32_t fetch_frag_size; u_int32_t fetch_max_tries; + u_int32_t client_sync_on; + pthread_mutex_t mutex_lock; + u_int32_t references; struct doris_http_parameter *param_master; struct doris_http_parameter *param_backup1; @@ -76,7 +73,7 @@ struct doris_confile_ctx size_t contl_total; }; -struct doris_instance +struct doris_csum_instance { struct doris_callbacks cbs; struct doris_arguments args; @@ -102,9 +99,9 @@ struct doris_instance struct event_base *worker_evbase; struct event timer_fetch; - struct doris_parameter *param; + struct doris_csum_param *param; struct event timer_statistic; - struct doris_statistics statistic, statistic_last; + struct doris_csum_statistics statistic, statistic_last; void *runtime_log; }; diff --git a/client/doris_client_http.cpp b/client/doris_client_http.cpp index fbd9181..a63ac91 100644 --- a/client/doris_client_http.cpp +++ b/client/doris_client_http.cpp @@ -21,6 +21,38 @@ #include "doris_client_http.h" +void easy_string_destroy(struct easy_string *estr) +{ + if(estr->buff != NULL) + { + free(estr->buff); + estr->buff = NULL; + estr->len = estr->size = 0; + } +} + +void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) +{ + if(estr->size-estr->len < len+1) + { + estr->size += len*4+1; + estr->buff = (char*)realloc(estr->buff, estr->size); + } + + memcpy(estr->buff+estr->len, data, len); + estr->len += len; + estr->buff[estr->len]='\0'; +} + +static inline void drsclient_set_sockopt_keepalive(int sd, int keepidle, int keepintvl, int keepcnt) +{ + int keepalive = 1; + setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepalive, sizeof(keepalive)); + setsockopt(sd, SOL_TCP, TCP_KEEPIDLE, (void*)&keepidle, sizeof(keepidle)); + setsockopt(sd, SOL_TCP, TCP_KEEPINTVL, (void*)&keepintvl, sizeof(keepintvl)); + setsockopt(sd, SOL_TCP, TCP_KEEPCNT, (void*)&keepcnt, sizeof(keepcnt)); +} + int32_t param_get_connected_hosts(struct doris_http_parameter *param) { return param->connected_hosts; @@ -199,6 +231,7 @@ static void client_bufferevent_error_cb(struct bufferevent *bev, short event, vo conhash_insert_dest_host(balance); assert(balance->param->connected_hosts > 0); assert(balance->param->failed_hosts >= 0); + drsclient_set_sockopt_keepalive(bufferevent_getfd(bev), 10, 5, 2); } else { @@ -303,6 +336,24 @@ static int32_t doris_launch_group_connection(struct doris_http_parameter *param, return 0; } +void doris_http_parameter_destroy(struct doris_http_parameter *param) +{ + for(u_int32_t i=0; i<param->ipgroup.dstaddr_num; i++) //���� + { + if(evtimer_pending(¶m->balance[i].timer_detect, NULL)) + { + evtimer_del(¶m->balance[i].timer_detect); + } + if(param->balance[i].bev != NULL) + { + bufferevent_free(param->balance[i].bev); + } + } + conhash_instance_free(param->conhash); + free(param->ipgroup.dstaddrs); + free(param); +} + struct doris_http_parameter *doris_http_parameter_new(const char* profile_path, const char* section, struct event_base* evbase, void *runtime_log) { struct doris_http_parameter *param; @@ -352,6 +403,38 @@ struct doris_http_parameter *doris_http_parameter_new(const char* profile_path, return param; } +void doris_http_instance_destroy(struct doris_http_instance *instance) +{ + map<u_int32_t, doris_curl_multihd*>::iterator iter; + struct doris_curl_multihd *multihd; + CURLMsg *msg; + int msgs_left; + struct doris_http_ctx *ctx; + CURL *easy; + + for(iter=instance->server_hosts->begin(); iter!=instance->server_hosts->end(); ) + { + multihd = iter->second; + + while((msg = curl_multi_info_read(multihd->multi_hd, &msgs_left))) + { + easy = msg->easy_handle; + curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); + curl_multi_remove_handle(multihd->multi_hd, easy); + curl_easy_cleanup(easy); + ctx->curl = NULL; + ctx->transfering = 0; + ctx->res = CURLE_ABORTED_BY_CALLBACK; + ctx->res_code = 0; + ctx->cb.transfer_done_cb(ctx->res, 0, ctx->error, ctx->cb.userp); + } + curl_multi_cleanup(multihd->multi_hd); + + instance->server_hosts->erase(iter++); + } + free(instance); +} + struct doris_http_instance *doris_http_instance_new(struct doris_http_parameter *param, struct event_base* evbase, void *runtimelog) { struct doris_http_instance *instance; diff --git a/client/doris_client_http.h b/client/doris_client_http.h index 6f829ab..4dd3550 100644 --- a/client/doris_client_http.h +++ b/client/doris_client_http.h @@ -28,6 +28,15 @@ using namespace std; #define DEFAULT_HOST_CAPACITY 4 #define LOAD_BALANC_VIRT_TIMES 16 +struct easy_string +{ + char* buff; + size_t len; + size_t size; +}; +void easy_string_destroy(struct easy_string *estr); +void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); + enum TCP_CONNECTION_STATUS { TCP_STATUS_IDLE=0, @@ -83,7 +92,6 @@ struct doris_http_parameter struct doris_http_instance { struct event_base* evbase; - SSL_CTX *ssl_instance; void *privdata; map<u_int32_t, doris_curl_multihd*> *server_hosts; @@ -102,7 +110,9 @@ int32_t param_get_connected_hosts(struct doris_http_parameter *param); int32_t param_get_failed_hosts(struct doris_http_parameter *param); struct doris_http_parameter *doris_http_parameter_new(const char* profile_path, const char* section, struct event_base* evbase, void *runtime_log); +void doris_http_parameter_destroy(struct doris_http_parameter *param); struct doris_http_instance *doris_http_instance_new(struct doris_http_parameter *param, struct event_base* evbase, void *runtimelog); +void doris_http_instance_destroy(struct doris_http_instance *instance); #endif diff --git a/client/doris_client_produce.cpp b/client/doris_client_produce.cpp new file mode 100644 index 0000000..d8fcedc --- /dev/null +++ b/client/doris_client_produce.cpp @@ -0,0 +1,518 @@ +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <sys/time.h> +#include <time.h> +#include <string.h> +#include <openssl/md5.h> +#include <cjson/cJSON.h> + +#include <MESA/MESA_prof_load.h> + +#include "doris_client_produce.h" + +void doris_prod_upload_ctx_destroy(struct doris_upload_ctx *ctx) +{ + doris_http_ctx_destroy(ctx->httpctx); + free(ctx); +} + +static enum PROD_VEROP_RES version_common_result_assign_val(CURLcode res, long res_code) +{ + if(res != CURLE_OK) + { + return VERSIONOP_CURL_ERROR; + } + + switch(res_code) + { + case 200: return VERSIONOP_RES_OK; + case 201: return VERSIONOP_RES_OK; //�ļ��ֶ��ظ��ϴ��Ż᷵��201 + default: return VERSIONOP_RES_ERROR; + } +} + +void version_cancel_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + enum PROD_VEROP_RES result; + + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1; + + result = version_common_result_assign_val(res, res_code); + + if(result != VERSIONOP_RES_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version cancel sync failed, res_code: %ld, err: %s", ctx->business, res_code, err); + } + else + { + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version cancel sync succ, res_code: %ld", ctx->business, res_code); + } + + if(ctx->vercancel_cb != NULL) + { + ctx->vercancel_cb(result, ctx->userdata); + } +} + +int32_t doris_prod_version_cancel(struct doris_upload_ctx *ctx, void (*vercancel_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata) +{ + struct doris_http_callback cb; + char uri[256]; + + ctx->instance->statistic.field[DRS_FSPRD_FILED_VERCANCEL] += 1; + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1; + + ctx->vercancel_cb = vercancel_cb; + ctx->userdata = userdata; + + cb.userp = ctx; + cb.header_cb = NULL; + cb.write_cb = NULL; + cb.read_process_cb = NULL; + cb.transfer_done_cb = version_cancel_transfer_done_cb; + doris_http_ctx_reset(ctx->httpctx, &cb); + + if(ctx->instance->param->client_sync_on) + { + doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1"); + } + snprintf(uri, sizeof(uri), "version/cancel?token=%s", ctx->token); + return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0); +} + +void version_end_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + const char *pos_colon; + char buffer[64]; + int datalen; + + if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) + { + return ; + } + datalen = pos_colon - start; + switch(datalen) + { + case 13: + if(!strncasecmp(start, "X-Set-Version:", 14)) + { + memcpy(buffer, start+14, bytes-14); + buffer[bytes-14] = '\0'; + ctx->res_version = atol(buffer); + } + break; + default: break; + } +} + +void version_end_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + enum PROD_VEROP_RES result; + + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1; + + result = version_common_result_assign_val(res, res_code); + + if(result != VERSIONOP_RES_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version end sync failed, res_code: %ld, err: %s", ctx->business, res_code, err); + } + else + { + assert(ctx->res_version != 0); + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_INFO, "business: %s, version end sync succ, res_code: %ld, new version: %lu", ctx->business, res_code, ctx->res_version); + } + + if(ctx->verend_cb != NULL) + { + ctx->verend_cb(result, ctx->res_version, ctx->userdata); + } +} + +int32_t doris_prod_version_end(struct doris_upload_ctx *ctx, + void (*verend_cb)(enum PROD_VEROP_RES result, int64_t version, void *userdata), void *userdata) +{ + struct doris_http_callback cb; + char uri[256]; + + ctx->instance->statistic.field[DRS_FSPRD_FILED_VEREND] += 1; + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1; + + ctx->verend_cb = verend_cb; + ctx->userdata = userdata; + + cb.userp = ctx; + cb.header_cb = version_end_header_cb; + cb.write_cb = NULL; + cb.read_process_cb = NULL; + cb.transfer_done_cb = version_end_transfer_done_cb; + doris_http_ctx_reset(ctx->httpctx, &cb); + + if(ctx->instance->param->client_sync_on) + { + doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1"); + } + snprintf(uri, sizeof(uri), "version/finish?token=%s", ctx->token); + return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0); +} + +void upload_frag_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + enum PROD_VEROP_RES result; + + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1; + + result = version_common_result_assign_val(res, res_code); + + if(result != VERSIONOP_RES_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, upload frag sync failed, res_code: %ld, err: %s", ctx->business, res_code, err); + } + else + { + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, upload frag sync succ, filename: %s, offset: %lu", ctx->business, ctx->filename, ctx->req_offset); + } + + if(ctx->upfrag_cb != NULL) + { + ctx->upfrag_cb(result, ctx->userdata); + } +} + +int32_t do_doris_prod_upload_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size, + struct table_meta *meta, const char *uri) +{ + struct doris_http_callback cb; + + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1; + + cb.userp = ctx; + cb.header_cb = NULL; + cb.write_cb = NULL; + cb.read_process_cb = NULL; + cb.transfer_done_cb = upload_frag_transfer_done_cb; + doris_http_ctx_reset(ctx->httpctx, &cb); + + if(ctx->instance->param->client_sync_on) + { + doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1"); + } + if(meta->userregion != NULL) + { + doris_http_ctx_add_header_kvstr(ctx->httpctx, "X-User-Info", meta->userregion); + } + doris_http_ctx_add_header_kvint(ctx->httpctx, "X-Config-Num", meta->cfgnum); + doris_http_ctx_add_header_kvstr(ctx->httpctx, "Content-MD5", meta->md5); + + return doris_http_launch_put_request_data(ctx->httpctx, uri, data, size); +} + +int32_t doris_prod_upload_frag_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size, size_t offset, + bool last, struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata) +{ + char uri[256]; + + ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEFRAG] += 1; + + ctx->upfrag_cb = upfrag_cb; + ctx->userdata = userdata; + ctx->req_offset = offset; + snprintf(ctx->filename, 256, "%s", ctx->filename); + + if(last) + { + snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu&last=true", + ctx->token, meta->tablename, meta->filename, offset); + } + else + { + snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu", + ctx->token, meta->tablename, meta->filename, offset); + } + return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri); +} + +int32_t doris_prod_upload_once_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size, + struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata) +{ + char uri[256]; + + ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEONCE] += 1; + + ctx->upfrag_cb = upfrag_cb; + ctx->userdata = userdata; + ctx->req_offset = 0; + snprintf(ctx->filename, 256, "%s", ctx->filename); + + snprintf(uri, sizeof(uri), "fileonce/upload?token=%s&tablename=%s&filename=%s", + ctx->token, meta->tablename, meta->filename); + return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri); +} + +void verstart_body_write_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + + easy_string_savedata(&ctx->estr, (const char*)ptr, bytes); +} + +static enum PROD_VERSTART_RES version_start_result_assign_val(CURLcode res, long res_code) +{ + if(res != CURLE_OK) + { + return VERSTART_CURL_ERROR; + } + + switch(res_code) + { + case 200: return VERSTART_RES_OK; + case 300: return VERSTART_RES_BUSY; + default: return VERSTART_RES_ERROR; + } +} + +void verstart_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp; + cJSON *meta, *token; + enum PROD_VERSTART_RES result; + + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1; + + result = version_start_result_assign_val(res, res_code); + switch(result) + { + case VERSTART_RES_OK: + case VERSTART_RES_BUSY: //server����300����token������ֻ���Լ�һ��������Ч��(�Լ�֮ǰ��˻ָ�) + meta = cJSON_Parse(ctx->estr.buff); + token = cJSON_GetObjectItem(meta, "token"); + assert(token->valuestring != NULL); + snprintf(ctx->token, 64, "%s", token->valuestring); + cJSON_Delete(meta); + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version start sync %s, res_code: %ld, body: %s", + (result==VERSTART_RES_OK)?"succ":"busy", ctx->business, res_code, ctx->estr.buff); + break; + + case VERSTART_RES_ERROR: + case VERSTART_CURL_ERROR: + MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version start sync failed, res_code: %ld, err: %s", + ctx->business, res_code, err); + break; + default: assert(0);break; + } + + if(ctx->verstart_cb != NULL) + { + ctx->verstart_cb(result, ctx->estr.buff, ctx->userdata); + } + easy_string_destroy(&ctx->estr); +} + +struct doris_upload_ctx *doris_prod_upload_ctx_new(struct doris_prod_instance *instance,const char *business, int32_t cfgtype) +{ + struct doris_upload_ctx *ctx; + struct doris_http_callback cb; + + if(cfgtype!=1 && cfgtype!=2) + { + return NULL; + } + ctx = (struct doris_upload_ctx *)calloc(1, sizeof(struct doris_upload_ctx)); + snprintf(ctx->business, 32, "%s", business); + ctx->instance = instance; + ctx->cfg_type = cfgtype; + + cb.userp = ctx; + cb.header_cb = NULL; + cb.write_cb = verstart_body_write_cb; + cb.read_process_cb = NULL; + cb.transfer_done_cb = verstart_transfer_done_cb; + if(NULL == (ctx->httpctx = doris_http_ctx_new(instance->http_instance, &cb, rand(), NULL, 0))) + { + free(ctx); + return NULL; + } + return ctx; +} + +int32_t doris_prod_version_start_with_cb(struct doris_upload_ctx *ctx, + void (*verstart_cb)(enum PROD_VERSTART_RES result, const char *body, void *userdata), void *userdata) +{ + char uri[256]; + + ctx->userdata = userdata; + ctx->verstart_cb = verstart_cb; + + if(ctx->instance->param->client_sync_on) + { + doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1"); + } + + snprintf(uri, sizeof(uri), "version/start?business=%s&type=%d", ctx->business, ctx->cfg_type); + if(doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0)) + { + free(ctx); + return -1; + } + ctx->instance->statistic.field[DRS_FSPRD_FILED_VERSTART] += 1; + ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1; + return 0; +} + +int32_t doris_prod_version_start(struct doris_upload_ctx *ctx) +{ + return doris_prod_version_start_with_cb(ctx, NULL, NULL); +} + +static void doris_prod_fsoutput_timer_cb(int fd, short kind, void *userp) +{ + struct doris_prod_param *param=(struct doris_prod_param *)userp; + struct timeval tv; + + FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_CNNED_SERVERS], 0, FS_OP_SET, param->param->connected_hosts); + FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_FAILED_SERVERS], 0, FS_OP_SET, param->param->failed_hosts); + FS_passive_output(param->fsstat_handle); + + tv.tv_sec = param->fsstat_period; + tv.tv_usec = 0; + evtimer_add(¶m->fs_timer_output, &tv); +} + +static int doris_prod_register_fsstat(struct doris_prod_param *param, void *runtime_log, struct event_base *evbase) +{ + const char *field_names[FSSTAT_DORIS_PRD_FILED_NUM]={"VerStart", "VerEnd", "VerCancel", "FileFrag", "FileOnce"}; + const char *status_names[FSSTAT_DORIS_PRD_STATUS_NUM]={"ServerCnned", "ServerFail", "MemoryUsed", "HttpSession", "ReqSession"}; + struct timeval tv; + int value; + + param->fsstat_handle = FS_create_handle(); + FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); + if(param->fsstat_print_mode == 1) + { + FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); + } + else + { + FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); + value = 1; + FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); + } + value = param->fsstat_period; + FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); + value = 0; + FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); + FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); + FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); + + for(int i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++) + { + param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); + } + for(int i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++) + { + param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); + } + FS_start(param->fsstat_handle); + + evtimer_assign(¶m->fs_timer_output, evbase, doris_prod_fsoutput_timer_cb, param); + tv.tv_sec = param->fsstat_period; + tv.tv_usec = 0; + evtimer_add(¶m->fs_timer_output, &tv); + return 0; +} + +struct doris_prod_param *doris_prod_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog) +{ + struct doris_prod_param *param; + + param = (struct doris_prod_param *)calloc(1, sizeof(struct doris_prod_param)); + param->manage_evbase = manage_evbase; + + MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "upload_fragmet_size", ¶m->upload_frag_size, 5242880); + MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", ¶m->client_sync_on, 0); + + MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsPrdClient"); + MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath_p", param->fsstat_filepath, 256, "./log/doris_client_prd.fs"); + MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", ¶m->fsstat_period, 10); + MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", ¶m->fsstat_print_mode, 1); + MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1"); + MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", ¶m->fsstat_dst_port, 8125); + + /*ͬ��ʱֻ��˫������*/ + param->param = doris_http_parameter_new(confile, "DORIS_CLIENT.produce", manage_evbase, runtimelog); + if(param->param == NULL) + { + return NULL; + } + assert(param->param->ipgroup.dstaddr_num == 1); + if(doris_prod_register_fsstat(param, runtimelog, manage_evbase)) + { + return NULL; + } + return param; +} + +static void doris_prod_statistic_timer_cb(int fd, short kind, void *userp) +{ + struct doris_prod_instance *instance = (struct doris_prod_instance *)userp; + struct timeval tv; + struct doris_prod_statistics incr_statistic; + long long *plast_statistic = (long long*)&instance->statistic_last; + long long *pnow_statistic = (long long*)&instance->statistic; + long long *pinc_statistic = (long long*)&incr_statistic; + + instance->statistic.status[DRS_FSPRD_STAT_HTTP_SESSIONS] = caculate_http_sessions_sum(instance->http_instance); + + for(u_int32_t i=0; i<sizeof(struct doris_prod_statistics)/sizeof(long long); i++) + { + pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i]; + } + instance->statistic_last = instance->statistic; + + for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++) + { + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]); + } + for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++) + { + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]); + } + tv.tv_sec = instance->param->fsstat_period; + tv.tv_usec = 0; + event_add(&instance->timer_statistic, &tv); +} + +struct doris_prod_instance *doris_prod_instance_new(struct doris_prod_param *param, struct event_base *worker_evbase, void *runtimelog) +{ + struct doris_prod_instance *instance; + struct timeval tv; + + instance = (struct doris_prod_instance *)calloc(1, sizeof(struct doris_prod_instance)); + instance->param = param; + instance->worker_evbase = worker_evbase; + instance->runtime_log = runtimelog; + + instance->http_instance = doris_http_instance_new(param->param, worker_evbase, runtimelog); + if(instance->http_instance == NULL) + { + return NULL; + } + srand((int64_t)param); + + evtimer_assign(&instance->timer_statistic, worker_evbase, doris_prod_statistic_timer_cb, instance); + tv.tv_sec = param->fsstat_period; + tv.tv_usec = 0; + evtimer_add(&instance->timer_statistic, &tv); + return instance; +} + diff --git a/client/doris_client_produce.h b/client/doris_client_produce.h new file mode 100644 index 0000000..26cf87e --- /dev/null +++ b/client/doris_client_produce.h @@ -0,0 +1,61 @@ +#ifndef __DORIS_CLIENT_PRODUCE_H__ +#define __DORIS_CLIENT_PRODUCE_H__ + +#include <openssl/md5.h> +#include <MESA/field_stat2.h> + +#include "doris_producer_client.h" +#include "doris_client_http.h" + +struct doris_prod_param +{ + u_int32_t upload_frag_size; + u_int32_t client_sync_on; + + struct doris_http_parameter *param; + struct event_base *manage_evbase; + + screen_stat_handle_t fsstat_handle; + struct event fs_timer_output; + char fsstat_dst_ip[64]; + char fsstat_appname[16]; + char fsstat_filepath[256]; + u_int32_t fsstat_period; + int32_t fsstat_print_mode; + int32_t fsstat_dst_port; + int32_t fsstat_field[FSSTAT_DORIS_PRD_FILED_NUM]; + int32_t fsstat_status[FSSTAT_DORIS_PRD_STATUS_NUM]; +}; + +struct doris_prod_instance +{ + struct doris_prod_param *param; + struct doris_http_instance *http_instance; + + struct event_base *worker_evbase; + struct event timer_statistic; + struct doris_prod_statistics statistic, statistic_last; + void *runtime_log; +}; + +struct doris_upload_ctx +{ + struct doris_http_ctx *httpctx; + char token[64]; + char business[32]; + char filename[256]; + struct easy_string estr; + int32_t cfg_type; + size_t req_offset; + int64_t res_version; + + void *userdata; + void (*verstart_cb)(enum PROD_VERSTART_RES result, const char *body, void *userdata); + void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata); + void (*verend_cb)(enum PROD_VEROP_RES result, int64_t version, void *userdata); + void (*vercancel_cb)(enum PROD_VEROP_RES result, void *userdata); + struct doris_prod_instance *instance; +}; + +#endif + diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp index d26bba2..b0edcf2 100644 --- a/client/doris_client_transfer.cpp +++ b/client/doris_client_transfer.cpp @@ -50,7 +50,7 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx) } struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, - struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size) + struct doris_http_callback *cb, u_int64_t balance_seed, char *host/*OUT*/, int32_t size) { struct doris_http_ctx *ctx; struct doris_curl_multihd *multidata; @@ -62,8 +62,10 @@ struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, } assert(instance->server_hosts->find(result.bucket_id) != instance->server_hosts->end()); multidata = instance->server_hosts->find(result.bucket_id)->second; - snprintf(host, size, multidata->host->srvaddr); - + if(host != NULL) + { + snprintf(host, size, multidata->host->srvaddr); + } ctx = (struct doris_http_ctx *)calloc(1, sizeof(struct doris_http_ctx)); ctx->instance = instance; ctx->multidata = multidata; @@ -92,8 +94,8 @@ static inline void curl_set_common_options(CURL *curl, long transfer_timeout, ch curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 1000L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout); //���Է��ֶ�������ij���ӽ��տ�ס����� - //ctx->error="Operation too slow. Less than 100 bytes/sec transferred the last 10 seconds" - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 10L); + //ctx->error="Operation too slow. Less than 100 bytes/sec transferred the last 30 seconds" + curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 30L); curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L); curl_easy_setopt(curl, CURLOPT_USERAGENT, "Doris Client Linux X64"); } @@ -136,6 +138,21 @@ void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header) ctx->headers = curl_slist_append(ctx->headers, header); } +/*maximum length 1024*/ +void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value) +{ + char header[1024]; + snprintf(header, 1024, "%s: %s", headername, value); + ctx->headers = curl_slist_append(ctx->headers, header); +} + +void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value) +{ + char header[1024]; + snprintf(header, 1024, "%s: %lu", headername, value); + ctx->headers = curl_slist_append(ctx->headers, header); +} + int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri) { char minio_url[2048]; @@ -143,7 +160,7 @@ int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri) assert(ctx->curl == NULL); if(NULL == (ctx->curl=curl_easy_init())) { - return -1; + assert(0);return -1; } if(ctx->instance->param->ssl_connection) @@ -171,8 +188,7 @@ int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri) if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) { - assert(0); - return -2; + assert(0);return -2; } ctx->transfering = 1; return 0; @@ -185,7 +201,7 @@ int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, assert(ctx->curl == NULL); if(NULL == (ctx->curl=curl_easy_init())) { - return -1; + assert(0);return -1; } doris_http_ctx_add_header(ctx, "Expect:"); //ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE @@ -219,8 +235,153 @@ int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) { - assert(0); - return -2; + assert(0);return -2; + } + ctx->transfering = 1; + return 0; +} + +static size_t curl_put_data_request_send_cb(void *ptr, size_t size, size_t count, void *userp) +{ + size_t len; + struct doris_http_ctx *ctx = (struct doris_http_ctx *)userp; + + if(size==0 || count==0 || ctx->put_offset>=ctx->put_length) + { + return 0; //��һ������ + } + + len = ctx->put_length - ctx->put_offset; //ʣ����ϴ��ij��� + if(len > size * count) + { + len = size * count; + } + + memcpy(ptr, ctx->put_data + ctx->put_offset, len); + ctx->put_offset += len; + + if(ctx->cb.read_process_cb != NULL) + { + ctx->cb.read_process_cb(ctx->put_data, ctx->put_offset, ctx->cb.userp); + } + return len; +} + +int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len) +{ + char minio_url[2048]; + + assert(ctx->curl == NULL); + if(NULL == (ctx->curl=curl_easy_init())) + { + assert(0);return -1; + } + ctx->put_data = data; + ctx->put_length = data_len; + ctx->put_offset = 0; + + if(ctx->instance->param->ssl_connection) + { + snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L); + } + else + { + snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri); + } + curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_write_cb); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); + if(ctx->cb.header_cb != NULL) + { + curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); + } + curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put_length); + curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_data_request_send_cb); + curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); + + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); + + if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) + { + assert(0);return -2; + } + ctx->transfering = 1; + return 0; +} + +static size_t curl_put_evbuf_request_send_cb(void *ptr, size_t size, size_t count, void *userp) +{ + size_t len, space=size*count, send_len; + struct doris_http_ctx *ctx = (struct doris_http_ctx *)userp; + + if(size==0 || count==0 || ctx->put_offset>=ctx->put_length) + { + return 0; + } + + len = ctx->put_length - ctx->put_offset; + if(len > space) + { + len = space; + } + + send_len = evbuffer_remove(ctx->put_evbuf, ptr, len); + assert(send_len>0); + ctx->put_offset += send_len; + + ctx->cb.read_process_cb(ctx->put_evbuf, ctx->put_offset, ctx->cb.userp); + return send_len; +} + +int doris_http_launch_put_request_evbuf(struct doris_http_ctx *ctx, const char *uri, struct evbuffer *evbuf, size_t data_len) +{ + char minio_url[2048]; + + assert(ctx->curl == NULL); + if(NULL == (ctx->curl=curl_easy_init())) + { + assert(0);return -1; + } + ctx->put_evbuf = evbuf; + ctx->put_length = data_len; + ctx->put_offset = 0; + + if(ctx->instance->param->ssl_connection) + { + snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L); + } + else + { + snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri); + } + curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_write_cb); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); + if(ctx->cb.header_cb != NULL) + { + curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); + } + curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put_length); + curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_evbuf_request_send_cb); + curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); + + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); + + if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) + { + assert(0);return -2; } ctx->transfering = 1; return 0; diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h index 8cfa4d2..324c9aa 100644 --- a/client/doris_client_transfer.h +++ b/client/doris_client_transfer.h @@ -49,9 +49,13 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx); void doris_http_ctx_reset(struct doris_http_ctx *ctx, struct doris_http_callback *cb); void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header); +void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value); +void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value); int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri); int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, const char *data, size_t data_len); +int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len); +int doris_http_launch_put_request_evbuf(struct doris_http_ctx *ctx, const char *uri, struct evbuffer *evbuf, size_t data_len); long long caculate_http_sessions_sum(const struct doris_http_instance *instance); |
