summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
author张成伟 <[email protected]>2021-08-25 10:41:31 +0000
committer张成伟 <[email protected]>2021-08-25 10:41:31 +0000
commit39a4172bd7208f271d73d755b3ac410e4db062f6 (patch)
tree699c72e52f5a4f0a4251d79c2d23df8bef8ea58d /client
parent67bafbefc972158f9ddd4fb9e45dc76ff2c8a540 (diff)
parent1aca701f127e94a7fbf60a996328083fed665f56 (diff)
Merge branch 'develop-http-producer' into 'master'
增加HTTP Post上传配置接口,支持主从双机备份与同步(冷备) See merge request doris/doris_dispatch!1
Diffstat (limited to 'client')
-rw-r--r--client/CMakeLists.txt2
-rw-r--r--client/doris_client_fetch.cpp169
-rw-r--r--client/doris_client_fetch.h21
-rw-r--r--client/doris_client_http.cpp83
-rw-r--r--client/doris_client_http.h12
-rw-r--r--client/doris_client_produce.cpp518
-rw-r--r--client/doris_client_produce.h61
-rw-r--r--client/doris_client_transfer.cpp183
-rw-r--r--client/doris_client_transfer.h4
9 files changed, 967 insertions, 86 deletions
diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt
index 9c13e61..1a8eb2a 100644
--- a/client/CMakeLists.txt
+++ b/client/CMakeLists.txt
@@ -1,4 +1,4 @@
-set (DORIS_CLIENT_SRC doris_client_fetch.cpp doris_client_http.cpp doris_client_transfer.cpp nirvana_conhash.cpp nirvana_murmurhash.cpp)
+set (DORIS_CLIENT_SRC doris_client_fetch.cpp doris_client_produce.cpp doris_client_http.cpp doris_client_transfer.cpp nirvana_conhash.cpp nirvana_murmurhash.cpp)
add_definitions(-fPIC -Wall -g)
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp
index 9e1d9ef..8827463 100644
--- a/client/doris_client_fetch.cpp
+++ b/client/doris_client_fetch.cpp
@@ -35,29 +35,6 @@ static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
return 0;
}
-void easy_string_destroy(struct easy_string *estr)
-{
- if(estr->buff != NULL)
- {
- free(estr->buff);
- estr->buff = NULL;
- estr->len = estr->size = 0;
- }
-}
-
-void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
-{
- if(estr->size-estr->len < len+1)
- {
- estr->size += len*4+1;
- estr->buff = (char*)realloc(estr->buff, estr->size);
- }
-
- memcpy(estr->buff+estr->len, data, len);
- estr->len += len;
- estr->buff[estr->len]='\0';
-}
-
void doris_confile_ctx_reset(struct doris_confile_ctx *ctx)
{
//�������ֲ���
@@ -75,12 +52,12 @@ void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
ctx->httpctx = NULL;
}
-void doris_update_new_version(struct doris_instance *instance)
+void doris_update_new_version(struct doris_csum_instance *instance)
{
instance->cur_version = instance->new_version;
}
-void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s)
+void doris_request_restart_timer(struct doris_csum_instance *instance, time_t wait_s)
{
struct timeval tv;
@@ -89,7 +66,7 @@ void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s)
event_add(&instance->timer_fetch, &tv);
}
-void doris_fetch_next_confile_meta(struct doris_instance *instance)
+void doris_fetch_next_confile_meta(struct doris_csum_instance *instance)
{
cJSON *cur_a_item, *sub;
@@ -125,7 +102,7 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance)
void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
const char *pos_colon;
size_t datalen;
char buffer[64];
@@ -180,7 +157,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
if(code!=CURLE_OK || (instance->ctx.res_code!=200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206))
{
@@ -193,10 +170,10 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo
instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
}
-void doris_http_fetch_confile(struct doris_instance *instance);
+void doris_http_fetch_confile(struct doris_csum_instance *instance);
void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
char md5buffer[64];
bool direct_fail=false;
@@ -284,7 +261,7 @@ out_md5:
doris_request_restart_timer(instance, instance->param->retry_interval);
}
-void doris_http_fetch_confile(struct doris_instance *instance)
+void doris_http_fetch_confile(struct doris_csum_instance *instance)
{
struct doris_http_callback curlcbs;
char metauri[128], range[64]={0};
@@ -319,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
//check code only once
if(instance->ctx.res_code != 0)
@@ -331,14 +308,14 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon
if(res_code != 200)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d",
- instance->ctx.server, instance->cur_version, instance->req_version, code);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "business: %s, No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d",
+ instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, code);
}
}
void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
if(code!=CURLE_OK || res_code!=200)
{
@@ -350,14 +327,14 @@ void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long
void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
cJSON *sub;
int64_t new_version;
if(res!=CURLE_OK)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
- instance->ctx.server, instance->cur_version, instance->req_version, res_code, err);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
+ instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, res_code, err);
goto out_error;
}
@@ -369,16 +346,16 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
instance->meta = cJSON_Parse(instance->estr.buff);
if(instance->meta == NULL)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, server: %s, req_version=%lu, invalid json: %s",
- instance->ctx.server, instance->req_version, instance->estr.buff);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Parse meta failed, server: %s, req_version=%lu, invalid json: %s",
+ instance->args.bizname, instance->ctx.server, instance->req_version, instance->estr.buff);
goto out_error;
}
sub = cJSON_GetObjectItem(instance->meta, "version");
new_version = sub->valuedouble;
if(new_version <= instance->cur_version)
{
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s",
- instance->ctx.server, instance->cur_version, instance->estr.buff);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s",
+ instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff);
cJSON_Delete(instance->meta);
instance->meta = NULL;
goto out_error;
@@ -386,8 +363,8 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
instance->new_version = new_version;
instance->req_version = instance->new_version;
instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, server: %s, cur_version=%lu, newjson: %s",
- instance->ctx.server, instance->cur_version, instance->estr.buff);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "business: %s, NEW_META found, server: %s, cur_version=%lu, newjson: %s",
+ instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff);
instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
instance->array = cJSON_GetObjectItem(instance->meta, "configs");
@@ -405,13 +382,21 @@ out_error:
doris_request_restart_timer(instance, instance->param->retry_interval);
easy_string_destroy(&instance->estr);
doris_confile_ctx_destry(&instance->ctx);
+ if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ��
+ {
+ instance->cbs.version_updated(instance, instance->cbs.userdata);
+ }
+ if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ�
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Server is busy processing version requests, waiting it done...", instance->args.bizname);
+ }
}
-static void doris_http_fetch_meta(struct doris_instance *instance)
+static void doris_http_fetch_meta(struct doris_csum_instance *instance)
{
u_int64_t balance_seed;
struct doris_http_callback curlcbs;
- char metauri[128];
+ char metauri[128], cur_version[128];
balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
(((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
@@ -439,6 +424,13 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
instance->req_version = instance->cur_version + 1; //ֻ�а汾���³ɹ���cur_version�Ż����
if(instance->ctx.httpctx != NULL)
{
+ if(instance->param->client_sync_on)
+ {
+ //����������ͬ�����ѣ����ڱ���Ǵ�Client����
+ sprintf(cur_version, "X-Doris-Sync-Current-Version: %lu", instance->cur_version);
+ doris_http_ctx_add_header(instance->ctx.httpctx, cur_version);
+ }
+
snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname);
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
@@ -464,7 +456,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
switch(instance->status)
{
@@ -482,7 +474,7 @@ static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp)
static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
{
- struct doris_parameter *param=(struct doris_parameter *)userp;
+ struct doris_csum_param *param=(struct doris_csum_param *)userp;
struct timeval tv;
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, param->param_master->connected_hosts);
@@ -503,7 +495,7 @@ static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
evtimer_add(&param->fs_timer_output, &tv);
}
-static int doris_client_register_field_stat(struct doris_parameter *param, void *runtime_log, struct event_base *evbase)
+static int doris_client_register_field_stat(struct doris_csum_param *param, void *runtime_log, struct event_base *evbase)
{
const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles",
"ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"};
@@ -549,19 +541,44 @@ static int doris_client_register_field_stat(struct doris_parameter *param, void
return 0;
}
-struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
+u_int32_t doris_csum_param_get_refernces(struct doris_csum_param *param)
{
- struct doris_parameter *param;
+ pthread_mutex_lock(&param->mutex_lock);
+ u_int32_t references = param->references;
+ pthread_mutex_unlock(&param->mutex_lock);
+ return references;
+}
- param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter));
+void doris_csum_parameter_destroy(struct doris_csum_param *param)
+{
+ evtimer_del(&param->fs_timer_output);
+ FS_stop(&param->fsstat_handle);
+ doris_http_parameter_destroy(param->param_master);
+ if(param->param_backup1 != NULL)
+ {
+ doris_http_parameter_destroy(param->param_backup1);
+ }
+ if(param->param_backup2 != NULL)
+ {
+ doris_http_parameter_destroy(param->param_backup2);
+ }
+ free(param);
+}
+
+struct doris_csum_param *doris_csum_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
+{
+ struct doris_csum_param *param;
+
+ param = (struct doris_csum_param *)calloc(1, sizeof(struct doris_csum_param));
param->manage_evbase = manage_evbase;
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", &param->retry_interval, 10);
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", &param->fetch_frag_size, 5242880);
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", &param->fetch_max_tries, 3);
+ MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", &param->client_sync_on, 0);
- MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DorisClient");
- MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client.fs");
+ MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsCsmClient");
+ MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client_csm.fs");
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", &param->fsstat_period, 10);
MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", &param->fsstat_print_mode, 1);
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1");
@@ -578,14 +595,15 @@ struct doris_parameter *doris_parameter_new(const char *confile, struct event_ba
}
param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog);
param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog);
+ pthread_mutex_init(&param->mutex_lock, NULL);
return param;
}
static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
{
- struct doris_instance *instance = (struct doris_instance *)userp;
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
struct timeval tv;
- struct doris_statistics incr_statistic;
+ struct doris_csum_statistics incr_statistic;
long long *plast_statistic = (long long*)&instance->statistic_last;
long long *pnow_statistic = (long long*)&instance->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
@@ -594,9 +612,9 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
http_sessions += caculate_http_sessions_sum(instance->httpins_master);
http_sessions += caculate_http_sessions_sum(instance->httpins_backup1);
http_sessions += caculate_http_sessions_sum(instance->httpins_backup2);
- instance->statistic.field[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions;
+ instance->statistic.status[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions;
- for(u_int32_t i=0; i<sizeof(struct doris_statistics)/sizeof(long long); i++)
+ for(u_int32_t i=0; i<sizeof(struct doris_csum_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
@@ -615,13 +633,38 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
event_add(&instance->timer_statistic, &tv);
}
-struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase,
+struct doris_csum_param *doris_csum_instance_get_param(struct doris_csum_instance *instance)
+{
+ return instance->param;
+}
+
+void doris_csum_instance_destroy(struct doris_csum_instance *instance)
+{
+ pthread_mutex_lock(&instance->param->mutex_lock);
+ instance->param->references--;
+ pthread_mutex_unlock(&instance->param->mutex_lock);
+
+ evtimer_del(&instance->timer_fetch);
+ evtimer_del(&instance->timer_statistic);
+ /*doris_http_instance_destroy(instance->httpins_master);
+ if(instance->httpins_backup1 != NULL)
+ {
+ doris_http_instance_destroy(instance->httpins_backup1);
+ }
+ if(instance->httpins_backup2 != NULL)
+ {
+ doris_http_instance_destroy(instance->httpins_backup2);
+ }*/
+ free(instance);
+}
+
+struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *param, struct event_base *worker_evbase,
struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog)
{
- struct doris_instance *instance;
+ struct doris_csum_instance *instance;
struct timeval tv;
- instance = (struct doris_instance *)calloc(1, sizeof(struct doris_instance));
+ instance = (struct doris_csum_instance *)calloc(1, sizeof(struct doris_csum_instance));
instance->param = param;
instance->worker_evbase = worker_evbase;
instance->runtime_log = runtimelog;
@@ -646,6 +689,10 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct
instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog);
}
+ pthread_mutex_lock(&param->mutex_lock);
+ param->references++;
+ pthread_mutex_unlock(&param->mutex_lock);
+
evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h
index 58376ba..a236135 100644
--- a/client/doris_client_fetch.h
+++ b/client/doris_client_fetch.h
@@ -1,19 +1,13 @@
#ifndef __DORIS_CLIENT_FETCH_IN_H__
#define __DORIS_CLIENT_FETCH_IN_H__
+#include <pthread.h>
#include <openssl/md5.h>
#include <MESA/field_stat2.h>
-#include "doris_client.h"
+#include "doris_consumer_client.h"
#include "doris_client_http.h"
-struct easy_string
-{
- char* buff;
- size_t len;
- size_t size;
-};
-
enum FETCH_CFG_STATUS
{
FETCH_STATUS_IDLE=0,
@@ -21,11 +15,14 @@ enum FETCH_CFG_STATUS
FETCH_STATUS_FILE,
};
-struct doris_parameter
+struct doris_csum_param
{
u_int32_t retry_interval;
u_int32_t fetch_frag_size;
u_int32_t fetch_max_tries;
+ u_int32_t client_sync_on;
+ pthread_mutex_t mutex_lock;
+ u_int32_t references;
struct doris_http_parameter *param_master;
struct doris_http_parameter *param_backup1;
@@ -76,7 +73,7 @@ struct doris_confile_ctx
size_t contl_total;
};
-struct doris_instance
+struct doris_csum_instance
{
struct doris_callbacks cbs;
struct doris_arguments args;
@@ -102,9 +99,9 @@ struct doris_instance
struct event_base *worker_evbase;
struct event timer_fetch;
- struct doris_parameter *param;
+ struct doris_csum_param *param;
struct event timer_statistic;
- struct doris_statistics statistic, statistic_last;
+ struct doris_csum_statistics statistic, statistic_last;
void *runtime_log;
};
diff --git a/client/doris_client_http.cpp b/client/doris_client_http.cpp
index fbd9181..a63ac91 100644
--- a/client/doris_client_http.cpp
+++ b/client/doris_client_http.cpp
@@ -21,6 +21,38 @@
#include "doris_client_http.h"
+void easy_string_destroy(struct easy_string *estr)
+{
+ if(estr->buff != NULL)
+ {
+ free(estr->buff);
+ estr->buff = NULL;
+ estr->len = estr->size = 0;
+ }
+}
+
+void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
+{
+ if(estr->size-estr->len < len+1)
+ {
+ estr->size += len*4+1;
+ estr->buff = (char*)realloc(estr->buff, estr->size);
+ }
+
+ memcpy(estr->buff+estr->len, data, len);
+ estr->len += len;
+ estr->buff[estr->len]='\0';
+}
+
+static inline void drsclient_set_sockopt_keepalive(int sd, int keepidle, int keepintvl, int keepcnt)
+{
+ int keepalive = 1;
+ setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepalive, sizeof(keepalive));
+ setsockopt(sd, SOL_TCP, TCP_KEEPIDLE, (void*)&keepidle, sizeof(keepidle));
+ setsockopt(sd, SOL_TCP, TCP_KEEPINTVL, (void*)&keepintvl, sizeof(keepintvl));
+ setsockopt(sd, SOL_TCP, TCP_KEEPCNT, (void*)&keepcnt, sizeof(keepcnt));
+}
+
int32_t param_get_connected_hosts(struct doris_http_parameter *param)
{
return param->connected_hosts;
@@ -199,6 +231,7 @@ static void client_bufferevent_error_cb(struct bufferevent *bev, short event, vo
conhash_insert_dest_host(balance);
assert(balance->param->connected_hosts > 0);
assert(balance->param->failed_hosts >= 0);
+ drsclient_set_sockopt_keepalive(bufferevent_getfd(bev), 10, 5, 2);
}
else
{
@@ -303,6 +336,24 @@ static int32_t doris_launch_group_connection(struct doris_http_parameter *param,
return 0;
}
+void doris_http_parameter_destroy(struct doris_http_parameter *param)
+{
+ for(u_int32_t i=0; i<param->ipgroup.dstaddr_num; i++) //����
+ {
+ if(evtimer_pending(&param->balance[i].timer_detect, NULL))
+ {
+ evtimer_del(&param->balance[i].timer_detect);
+ }
+ if(param->balance[i].bev != NULL)
+ {
+ bufferevent_free(param->balance[i].bev);
+ }
+ }
+ conhash_instance_free(param->conhash);
+ free(param->ipgroup.dstaddrs);
+ free(param);
+}
+
struct doris_http_parameter *doris_http_parameter_new(const char* profile_path, const char* section, struct event_base* evbase, void *runtime_log)
{
struct doris_http_parameter *param;
@@ -352,6 +403,38 @@ struct doris_http_parameter *doris_http_parameter_new(const char* profile_path,
return param;
}
+void doris_http_instance_destroy(struct doris_http_instance *instance)
+{
+ map<u_int32_t, doris_curl_multihd*>::iterator iter;
+ struct doris_curl_multihd *multihd;
+ CURLMsg *msg;
+ int msgs_left;
+ struct doris_http_ctx *ctx;
+ CURL *easy;
+
+ for(iter=instance->server_hosts->begin(); iter!=instance->server_hosts->end(); )
+ {
+ multihd = iter->second;
+
+ while((msg = curl_multi_info_read(multihd->multi_hd, &msgs_left)))
+ {
+ easy = msg->easy_handle;
+ curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
+ curl_multi_remove_handle(multihd->multi_hd, easy);
+ curl_easy_cleanup(easy);
+ ctx->curl = NULL;
+ ctx->transfering = 0;
+ ctx->res = CURLE_ABORTED_BY_CALLBACK;
+ ctx->res_code = 0;
+ ctx->cb.transfer_done_cb(ctx->res, 0, ctx->error, ctx->cb.userp);
+ }
+ curl_multi_cleanup(multihd->multi_hd);
+
+ instance->server_hosts->erase(iter++);
+ }
+ free(instance);
+}
+
struct doris_http_instance *doris_http_instance_new(struct doris_http_parameter *param, struct event_base* evbase, void *runtimelog)
{
struct doris_http_instance *instance;
diff --git a/client/doris_client_http.h b/client/doris_client_http.h
index 6f829ab..4dd3550 100644
--- a/client/doris_client_http.h
+++ b/client/doris_client_http.h
@@ -28,6 +28,15 @@ using namespace std;
#define DEFAULT_HOST_CAPACITY 4
#define LOAD_BALANC_VIRT_TIMES 16
+struct easy_string
+{
+ char* buff;
+ size_t len;
+ size_t size;
+};
+void easy_string_destroy(struct easy_string *estr);
+void easy_string_savedata(struct easy_string *estr, const char *data, size_t len);
+
enum TCP_CONNECTION_STATUS
{
TCP_STATUS_IDLE=0,
@@ -83,7 +92,6 @@ struct doris_http_parameter
struct doris_http_instance
{
struct event_base* evbase;
- SSL_CTX *ssl_instance;
void *privdata;
map<u_int32_t, doris_curl_multihd*> *server_hosts;
@@ -102,7 +110,9 @@ int32_t param_get_connected_hosts(struct doris_http_parameter *param);
int32_t param_get_failed_hosts(struct doris_http_parameter *param);
struct doris_http_parameter *doris_http_parameter_new(const char* profile_path, const char* section, struct event_base* evbase, void *runtime_log);
+void doris_http_parameter_destroy(struct doris_http_parameter *param);
struct doris_http_instance *doris_http_instance_new(struct doris_http_parameter *param, struct event_base* evbase, void *runtimelog);
+void doris_http_instance_destroy(struct doris_http_instance *instance);
#endif
diff --git a/client/doris_client_produce.cpp b/client/doris_client_produce.cpp
new file mode 100644
index 0000000..d8fcedc
--- /dev/null
+++ b/client/doris_client_produce.cpp
@@ -0,0 +1,518 @@
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string.h>
+#include <openssl/md5.h>
+#include <cjson/cJSON.h>
+
+#include <MESA/MESA_prof_load.h>
+
+#include "doris_client_produce.h"
+
+void doris_prod_upload_ctx_destroy(struct doris_upload_ctx *ctx)
+{
+ doris_http_ctx_destroy(ctx->httpctx);
+ free(ctx);
+}
+
+static enum PROD_VEROP_RES version_common_result_assign_val(CURLcode res, long res_code)
+{
+ if(res != CURLE_OK)
+ {
+ return VERSIONOP_CURL_ERROR;
+ }
+
+ switch(res_code)
+ {
+ case 200: return VERSIONOP_RES_OK;
+ case 201: return VERSIONOP_RES_OK; //�ļ��ֶ��ظ��ϴ��Ż᷵��201
+ default: return VERSIONOP_RES_ERROR;
+ }
+}
+
+void version_cancel_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+ enum PROD_VEROP_RES result;
+
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
+
+ result = version_common_result_assign_val(res, res_code);
+
+ if(result != VERSIONOP_RES_OK)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version cancel sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
+ }
+ else
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version cancel sync succ, res_code: %ld", ctx->business, res_code);
+ }
+
+ if(ctx->vercancel_cb != NULL)
+ {
+ ctx->vercancel_cb(result, ctx->userdata);
+ }
+}
+
+int32_t doris_prod_version_cancel(struct doris_upload_ctx *ctx, void (*vercancel_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
+{
+ struct doris_http_callback cb;
+ char uri[256];
+
+ ctx->instance->statistic.field[DRS_FSPRD_FILED_VERCANCEL] += 1;
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
+
+ ctx->vercancel_cb = vercancel_cb;
+ ctx->userdata = userdata;
+
+ cb.userp = ctx;
+ cb.header_cb = NULL;
+ cb.write_cb = NULL;
+ cb.read_process_cb = NULL;
+ cb.transfer_done_cb = version_cancel_transfer_done_cb;
+ doris_http_ctx_reset(ctx->httpctx, &cb);
+
+ if(ctx->instance->param->client_sync_on)
+ {
+ doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
+ }
+ snprintf(uri, sizeof(uri), "version/cancel?token=%s", ctx->token);
+ return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0);
+}
+
+void version_end_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+ const char *pos_colon;
+ char buffer[64];
+ int datalen;
+
+ if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
+ {
+ return ;
+ }
+ datalen = pos_colon - start;
+ switch(datalen)
+ {
+ case 13:
+ if(!strncasecmp(start, "X-Set-Version:", 14))
+ {
+ memcpy(buffer, start+14, bytes-14);
+ buffer[bytes-14] = '\0';
+ ctx->res_version = atol(buffer);
+ }
+ break;
+ default: break;
+ }
+}
+
+void version_end_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+ enum PROD_VEROP_RES result;
+
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
+
+ result = version_common_result_assign_val(res, res_code);
+
+ if(result != VERSIONOP_RES_OK)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version end sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
+ }
+ else
+ {
+ assert(ctx->res_version != 0);
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_INFO, "business: %s, version end sync succ, res_code: %ld, new version: %lu", ctx->business, res_code, ctx->res_version);
+ }
+
+ if(ctx->verend_cb != NULL)
+ {
+ ctx->verend_cb(result, ctx->res_version, ctx->userdata);
+ }
+}
+
+int32_t doris_prod_version_end(struct doris_upload_ctx *ctx,
+ void (*verend_cb)(enum PROD_VEROP_RES result, int64_t version, void *userdata), void *userdata)
+{
+ struct doris_http_callback cb;
+ char uri[256];
+
+ ctx->instance->statistic.field[DRS_FSPRD_FILED_VEREND] += 1;
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
+
+ ctx->verend_cb = verend_cb;
+ ctx->userdata = userdata;
+
+ cb.userp = ctx;
+ cb.header_cb = version_end_header_cb;
+ cb.write_cb = NULL;
+ cb.read_process_cb = NULL;
+ cb.transfer_done_cb = version_end_transfer_done_cb;
+ doris_http_ctx_reset(ctx->httpctx, &cb);
+
+ if(ctx->instance->param->client_sync_on)
+ {
+ doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
+ }
+ snprintf(uri, sizeof(uri), "version/finish?token=%s", ctx->token);
+ return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0);
+}
+
+void upload_frag_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+ enum PROD_VEROP_RES result;
+
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
+
+ result = version_common_result_assign_val(res, res_code);
+
+ if(result != VERSIONOP_RES_OK)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, upload frag sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
+ }
+ else
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, upload frag sync succ, filename: %s, offset: %lu", ctx->business, ctx->filename, ctx->req_offset);
+ }
+
+ if(ctx->upfrag_cb != NULL)
+ {
+ ctx->upfrag_cb(result, ctx->userdata);
+ }
+}
+
+int32_t do_doris_prod_upload_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size,
+ struct table_meta *meta, const char *uri)
+{
+ struct doris_http_callback cb;
+
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
+
+ cb.userp = ctx;
+ cb.header_cb = NULL;
+ cb.write_cb = NULL;
+ cb.read_process_cb = NULL;
+ cb.transfer_done_cb = upload_frag_transfer_done_cb;
+ doris_http_ctx_reset(ctx->httpctx, &cb);
+
+ if(ctx->instance->param->client_sync_on)
+ {
+ doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
+ }
+ if(meta->userregion != NULL)
+ {
+ doris_http_ctx_add_header_kvstr(ctx->httpctx, "X-User-Info", meta->userregion);
+ }
+ doris_http_ctx_add_header_kvint(ctx->httpctx, "X-Config-Num", meta->cfgnum);
+ doris_http_ctx_add_header_kvstr(ctx->httpctx, "Content-MD5", meta->md5);
+
+ return doris_http_launch_put_request_data(ctx->httpctx, uri, data, size);
+}
+
+int32_t doris_prod_upload_frag_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size, size_t offset,
+ bool last, struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
+{
+ char uri[256];
+
+ ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEFRAG] += 1;
+
+ ctx->upfrag_cb = upfrag_cb;
+ ctx->userdata = userdata;
+ ctx->req_offset = offset;
+ snprintf(ctx->filename, 256, "%s", ctx->filename);
+
+ if(last)
+ {
+ snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu&last=true",
+ ctx->token, meta->tablename, meta->filename, offset);
+ }
+ else
+ {
+ snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu",
+ ctx->token, meta->tablename, meta->filename, offset);
+ }
+ return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri);
+}
+
+int32_t doris_prod_upload_once_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size,
+ struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
+{
+ char uri[256];
+
+ ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEONCE] += 1;
+
+ ctx->upfrag_cb = upfrag_cb;
+ ctx->userdata = userdata;
+ ctx->req_offset = 0;
+ snprintf(ctx->filename, 256, "%s", ctx->filename);
+
+ snprintf(uri, sizeof(uri), "fileonce/upload?token=%s&tablename=%s&filename=%s",
+ ctx->token, meta->tablename, meta->filename);
+ return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri);
+}
+
+void verstart_body_write_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+
+ easy_string_savedata(&ctx->estr, (const char*)ptr, bytes);
+}
+
+static enum PROD_VERSTART_RES version_start_result_assign_val(CURLcode res, long res_code)
+{
+ if(res != CURLE_OK)
+ {
+ return VERSTART_CURL_ERROR;
+ }
+
+ switch(res_code)
+ {
+ case 200: return VERSTART_RES_OK;
+ case 300: return VERSTART_RES_BUSY;
+ default: return VERSTART_RES_ERROR;
+ }
+}
+
+void verstart_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
+ cJSON *meta, *token;
+ enum PROD_VERSTART_RES result;
+
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
+
+ result = version_start_result_assign_val(res, res_code);
+ switch(result)
+ {
+ case VERSTART_RES_OK:
+ case VERSTART_RES_BUSY: //server����300����token������ֻ���Լ�һ��������Ч��(�Լ�֮ǰ��˻ָ�)
+ meta = cJSON_Parse(ctx->estr.buff);
+ token = cJSON_GetObjectItem(meta, "token");
+ assert(token->valuestring != NULL);
+ snprintf(ctx->token, 64, "%s", token->valuestring);
+ cJSON_Delete(meta);
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version start sync %s, res_code: %ld, body: %s",
+ (result==VERSTART_RES_OK)?"succ":"busy", ctx->business, res_code, ctx->estr.buff);
+ break;
+
+ case VERSTART_RES_ERROR:
+ case VERSTART_CURL_ERROR:
+ MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version start sync failed, res_code: %ld, err: %s",
+ ctx->business, res_code, err);
+ break;
+ default: assert(0);break;
+ }
+
+ if(ctx->verstart_cb != NULL)
+ {
+ ctx->verstart_cb(result, ctx->estr.buff, ctx->userdata);
+ }
+ easy_string_destroy(&ctx->estr);
+}
+
+struct doris_upload_ctx *doris_prod_upload_ctx_new(struct doris_prod_instance *instance,const char *business, int32_t cfgtype)
+{
+ struct doris_upload_ctx *ctx;
+ struct doris_http_callback cb;
+
+ if(cfgtype!=1 && cfgtype!=2)
+ {
+ return NULL;
+ }
+ ctx = (struct doris_upload_ctx *)calloc(1, sizeof(struct doris_upload_ctx));
+ snprintf(ctx->business, 32, "%s", business);
+ ctx->instance = instance;
+ ctx->cfg_type = cfgtype;
+
+ cb.userp = ctx;
+ cb.header_cb = NULL;
+ cb.write_cb = verstart_body_write_cb;
+ cb.read_process_cb = NULL;
+ cb.transfer_done_cb = verstart_transfer_done_cb;
+ if(NULL == (ctx->httpctx = doris_http_ctx_new(instance->http_instance, &cb, rand(), NULL, 0)))
+ {
+ free(ctx);
+ return NULL;
+ }
+ return ctx;
+}
+
+int32_t doris_prod_version_start_with_cb(struct doris_upload_ctx *ctx,
+ void (*verstart_cb)(enum PROD_VERSTART_RES result, const char *body, void *userdata), void *userdata)
+{
+ char uri[256];
+
+ ctx->userdata = userdata;
+ ctx->verstart_cb = verstart_cb;
+
+ if(ctx->instance->param->client_sync_on)
+ {
+ doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
+ }
+
+ snprintf(uri, sizeof(uri), "version/start?business=%s&type=%d", ctx->business, ctx->cfg_type);
+ if(doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0))
+ {
+ free(ctx);
+ return -1;
+ }
+ ctx->instance->statistic.field[DRS_FSPRD_FILED_VERSTART] += 1;
+ ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
+ return 0;
+}
+
+int32_t doris_prod_version_start(struct doris_upload_ctx *ctx)
+{
+ return doris_prod_version_start_with_cb(ctx, NULL, NULL);
+}
+
+static void doris_prod_fsoutput_timer_cb(int fd, short kind, void *userp)
+{
+ struct doris_prod_param *param=(struct doris_prod_param *)userp;
+ struct timeval tv;
+
+ FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_CNNED_SERVERS], 0, FS_OP_SET, param->param->connected_hosts);
+ FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_FAILED_SERVERS], 0, FS_OP_SET, param->param->failed_hosts);
+ FS_passive_output(param->fsstat_handle);
+
+ tv.tv_sec = param->fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&param->fs_timer_output, &tv);
+}
+
+static int doris_prod_register_fsstat(struct doris_prod_param *param, void *runtime_log, struct event_base *evbase)
+{
+ const char *field_names[FSSTAT_DORIS_PRD_FILED_NUM]={"VerStart", "VerEnd", "VerCancel", "FileFrag", "FileOnce"};
+ const char *status_names[FSSTAT_DORIS_PRD_STATUS_NUM]={"ServerCnned", "ServerFail", "MemoryUsed", "HttpSession", "ReqSession"};
+ struct timeval tv;
+ int value;
+
+ param->fsstat_handle = FS_create_handle();
+ FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
+ if(param->fsstat_print_mode == 1)
+ {
+ FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
+ }
+ else
+ {
+ FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
+ value = 1;
+ FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ }
+ value = param->fsstat_period;
+ FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
+ value = 0;
+ FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
+ FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
+ FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
+
+ for(int i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++)
+ {
+ param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
+ }
+ for(int i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++)
+ {
+ param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
+ }
+ FS_start(param->fsstat_handle);
+
+ evtimer_assign(&param->fs_timer_output, evbase, doris_prod_fsoutput_timer_cb, param);
+ tv.tv_sec = param->fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&param->fs_timer_output, &tv);
+ return 0;
+}
+
+struct doris_prod_param *doris_prod_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
+{
+ struct doris_prod_param *param;
+
+ param = (struct doris_prod_param *)calloc(1, sizeof(struct doris_prod_param));
+ param->manage_evbase = manage_evbase;
+
+ MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "upload_fragmet_size", &param->upload_frag_size, 5242880);
+ MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", &param->client_sync_on, 0);
+
+ MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsPrdClient");
+ MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath_p", param->fsstat_filepath, 256, "./log/doris_client_prd.fs");
+ MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", &param->fsstat_period, 10);
+ MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", &param->fsstat_print_mode, 1);
+ MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1");
+ MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", &param->fsstat_dst_port, 8125);
+
+ /*ͬ��ʱֻ��˫������*/
+ param->param = doris_http_parameter_new(confile, "DORIS_CLIENT.produce", manage_evbase, runtimelog);
+ if(param->param == NULL)
+ {
+ return NULL;
+ }
+ assert(param->param->ipgroup.dstaddr_num == 1);
+ if(doris_prod_register_fsstat(param, runtimelog, manage_evbase))
+ {
+ return NULL;
+ }
+ return param;
+}
+
+static void doris_prod_statistic_timer_cb(int fd, short kind, void *userp)
+{
+ struct doris_prod_instance *instance = (struct doris_prod_instance *)userp;
+ struct timeval tv;
+ struct doris_prod_statistics incr_statistic;
+ long long *plast_statistic = (long long*)&instance->statistic_last;
+ long long *pnow_statistic = (long long*)&instance->statistic;
+ long long *pinc_statistic = (long long*)&incr_statistic;
+
+ instance->statistic.status[DRS_FSPRD_STAT_HTTP_SESSIONS] = caculate_http_sessions_sum(instance->http_instance);
+
+ for(u_int32_t i=0; i<sizeof(struct doris_prod_statistics)/sizeof(long long); i++)
+ {
+ pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
+ }
+ instance->statistic_last = instance->statistic;
+
+ for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++)
+ {
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
+ }
+ for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++)
+ {
+ FS_operate(instance->param->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
+ }
+ tv.tv_sec = instance->param->fsstat_period;
+ tv.tv_usec = 0;
+ event_add(&instance->timer_statistic, &tv);
+}
+
+struct doris_prod_instance *doris_prod_instance_new(struct doris_prod_param *param, struct event_base *worker_evbase, void *runtimelog)
+{
+ struct doris_prod_instance *instance;
+ struct timeval tv;
+
+ instance = (struct doris_prod_instance *)calloc(1, sizeof(struct doris_prod_instance));
+ instance->param = param;
+ instance->worker_evbase = worker_evbase;
+ instance->runtime_log = runtimelog;
+
+ instance->http_instance = doris_http_instance_new(param->param, worker_evbase, runtimelog);
+ if(instance->http_instance == NULL)
+ {
+ return NULL;
+ }
+ srand((int64_t)param);
+
+ evtimer_assign(&instance->timer_statistic, worker_evbase, doris_prod_statistic_timer_cb, instance);
+ tv.tv_sec = param->fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_statistic, &tv);
+ return instance;
+}
+
diff --git a/client/doris_client_produce.h b/client/doris_client_produce.h
new file mode 100644
index 0000000..26cf87e
--- /dev/null
+++ b/client/doris_client_produce.h
@@ -0,0 +1,61 @@
+#ifndef __DORIS_CLIENT_PRODUCE_H__
+#define __DORIS_CLIENT_PRODUCE_H__
+
+#include <openssl/md5.h>
+#include <MESA/field_stat2.h>
+
+#include "doris_producer_client.h"
+#include "doris_client_http.h"
+
+struct doris_prod_param
+{
+ u_int32_t upload_frag_size;
+ u_int32_t client_sync_on;
+
+ struct doris_http_parameter *param;
+ struct event_base *manage_evbase;
+
+ screen_stat_handle_t fsstat_handle;
+ struct event fs_timer_output;
+ char fsstat_dst_ip[64];
+ char fsstat_appname[16];
+ char fsstat_filepath[256];
+ u_int32_t fsstat_period;
+ int32_t fsstat_print_mode;
+ int32_t fsstat_dst_port;
+ int32_t fsstat_field[FSSTAT_DORIS_PRD_FILED_NUM];
+ int32_t fsstat_status[FSSTAT_DORIS_PRD_STATUS_NUM];
+};
+
+struct doris_prod_instance
+{
+ struct doris_prod_param *param;
+ struct doris_http_instance *http_instance;
+
+ struct event_base *worker_evbase;
+ struct event timer_statistic;
+ struct doris_prod_statistics statistic, statistic_last;
+ void *runtime_log;
+};
+
+struct doris_upload_ctx
+{
+ struct doris_http_ctx *httpctx;
+ char token[64];
+ char business[32];
+ char filename[256];
+ struct easy_string estr;
+ int32_t cfg_type;
+ size_t req_offset;
+ int64_t res_version;
+
+ void *userdata;
+ void (*verstart_cb)(enum PROD_VERSTART_RES result, const char *body, void *userdata);
+ void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata);
+ void (*verend_cb)(enum PROD_VEROP_RES result, int64_t version, void *userdata);
+ void (*vercancel_cb)(enum PROD_VEROP_RES result, void *userdata);
+ struct doris_prod_instance *instance;
+};
+
+#endif
+
diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp
index d26bba2..b0edcf2 100644
--- a/client/doris_client_transfer.cpp
+++ b/client/doris_client_transfer.cpp
@@ -50,7 +50,7 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx)
}
struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance,
- struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size)
+ struct doris_http_callback *cb, u_int64_t balance_seed, char *host/*OUT*/, int32_t size)
{
struct doris_http_ctx *ctx;
struct doris_curl_multihd *multidata;
@@ -62,8 +62,10 @@ struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance,
}
assert(instance->server_hosts->find(result.bucket_id) != instance->server_hosts->end());
multidata = instance->server_hosts->find(result.bucket_id)->second;
- snprintf(host, size, multidata->host->srvaddr);
-
+ if(host != NULL)
+ {
+ snprintf(host, size, multidata->host->srvaddr);
+ }
ctx = (struct doris_http_ctx *)calloc(1, sizeof(struct doris_http_ctx));
ctx->instance = instance;
ctx->multidata = multidata;
@@ -92,8 +94,8 @@ static inline void curl_set_common_options(CURL *curl, long transfer_timeout, ch
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 1000L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout); //���Է��ֶ�������ij���ӽ��տ�ס�����
- //ctx->error="Operation too slow. Less than 100 bytes/sec transferred the last 10 seconds"
- curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 10L);
+ //ctx->error="Operation too slow. Less than 100 bytes/sec transferred the last 30 seconds"
+ curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 30L);
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
curl_easy_setopt(curl, CURLOPT_USERAGENT, "Doris Client Linux X64");
}
@@ -136,6 +138,21 @@ void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header)
ctx->headers = curl_slist_append(ctx->headers, header);
}
+/*maximum length 1024*/
+void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value)
+{
+ char header[1024];
+ snprintf(header, 1024, "%s: %s", headername, value);
+ ctx->headers = curl_slist_append(ctx->headers, header);
+}
+
+void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value)
+{
+ char header[1024];
+ snprintf(header, 1024, "%s: %lu", headername, value);
+ ctx->headers = curl_slist_append(ctx->headers, header);
+}
+
int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri)
{
char minio_url[2048];
@@ -143,7 +160,7 @@ int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri)
assert(ctx->curl == NULL);
if(NULL == (ctx->curl=curl_easy_init()))
{
- return -1;
+ assert(0);return -1;
}
if(ctx->instance->param->ssl_connection)
@@ -171,8 +188,7 @@ int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri)
if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl))
{
- assert(0);
- return -2;
+ assert(0);return -2;
}
ctx->transfering = 1;
return 0;
@@ -185,7 +201,7 @@ int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri,
assert(ctx->curl == NULL);
if(NULL == (ctx->curl=curl_easy_init()))
{
- return -1;
+ assert(0);return -1;
}
doris_http_ctx_add_header(ctx, "Expect:"); //ע��POST������Expect��ϵ��Ҫ��ȷ����CURLOPT_POSTFIELDSIZE
@@ -219,8 +235,153 @@ int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri,
if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl))
{
- assert(0);
- return -2;
+ assert(0);return -2;
+ }
+ ctx->transfering = 1;
+ return 0;
+}
+
+static size_t curl_put_data_request_send_cb(void *ptr, size_t size, size_t count, void *userp)
+{
+ size_t len;
+ struct doris_http_ctx *ctx = (struct doris_http_ctx *)userp;
+
+ if(size==0 || count==0 || ctx->put_offset>=ctx->put_length)
+ {
+ return 0; //��һ������
+ }
+
+ len = ctx->put_length - ctx->put_offset; //ʣ����ϴ��ij���
+ if(len > size * count)
+ {
+ len = size * count;
+ }
+
+ memcpy(ptr, ctx->put_data + ctx->put_offset, len);
+ ctx->put_offset += len;
+
+ if(ctx->cb.read_process_cb != NULL)
+ {
+ ctx->cb.read_process_cb(ctx->put_data, ctx->put_offset, ctx->cb.userp);
+ }
+ return len;
+}
+
+int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len)
+{
+ char minio_url[2048];
+
+ assert(ctx->curl == NULL);
+ if(NULL == (ctx->curl=curl_easy_init()))
+ {
+ assert(0);return -1;
+ }
+ ctx->put_data = data;
+ ctx->put_length = data_len;
+ ctx->put_offset = 0;
+
+ if(ctx->instance->param->ssl_connection)
+ {
+ snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ else
+ {
+ snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
+ curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_write_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
+ if(ctx->cb.header_cb != NULL)
+ {
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put_length);
+ curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_data_request_send_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
+
+ curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
+ curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
+
+ if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl))
+ {
+ assert(0);return -2;
+ }
+ ctx->transfering = 1;
+ return 0;
+}
+
+static size_t curl_put_evbuf_request_send_cb(void *ptr, size_t size, size_t count, void *userp)
+{
+ size_t len, space=size*count, send_len;
+ struct doris_http_ctx *ctx = (struct doris_http_ctx *)userp;
+
+ if(size==0 || count==0 || ctx->put_offset>=ctx->put_length)
+ {
+ return 0;
+ }
+
+ len = ctx->put_length - ctx->put_offset;
+ if(len > space)
+ {
+ len = space;
+ }
+
+ send_len = evbuffer_remove(ctx->put_evbuf, ptr, len);
+ assert(send_len>0);
+ ctx->put_offset += send_len;
+
+ ctx->cb.read_process_cb(ctx->put_evbuf, ctx->put_offset, ctx->cb.userp);
+ return send_len;
+}
+
+int doris_http_launch_put_request_evbuf(struct doris_http_ctx *ctx, const char *uri, struct evbuffer *evbuf, size_t data_len)
+{
+ char minio_url[2048];
+
+ assert(ctx->curl == NULL);
+ if(NULL == (ctx->curl=curl_easy_init()))
+ {
+ assert(0);return -1;
+ }
+ ctx->put_evbuf = evbuf;
+ ctx->put_length = data_len;
+ ctx->put_offset = 0;
+
+ if(ctx->instance->param->ssl_connection)
+ {
+ snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ else
+ {
+ snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
+ curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_write_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
+ if(ctx->cb.header_cb != NULL)
+ {
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put_length);
+ curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_evbuf_request_send_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
+
+ curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
+ curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
+
+ if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl))
+ {
+ assert(0);return -2;
}
ctx->transfering = 1;
return 0;
diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h
index 8cfa4d2..324c9aa 100644
--- a/client/doris_client_transfer.h
+++ b/client/doris_client_transfer.h
@@ -49,9 +49,13 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx);
void doris_http_ctx_reset(struct doris_http_ctx *ctx, struct doris_http_callback *cb);
void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header);
+void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value);
+void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value);
int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri);
int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, const char *data, size_t data_len);
+int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len);
+int doris_http_launch_put_request_evbuf(struct doris_http_ctx *ctx, const char *uri, struct evbuffer *evbuf, size_t data_len);
long long caculate_http_sessions_sum(const struct doris_http_instance *instance);