summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
Diffstat (limited to 'client')
-rw-r--r--client/doris_client_fetch.cpp175
-rw-r--r--client/doris_client_fetch.h1
-rw-r--r--client/doris_client_transfer.cpp40
-rw-r--r--client/doris_client_transfer.h1
4 files changed, 208 insertions, 9 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp
index ae7cc8e..fee1648 100644
--- a/client/doris_client_fetch.cpp
+++ b/client/doris_client_fetch.cpp
@@ -385,7 +385,7 @@ out_error:
doris_confile_ctx_destry(&instance->ctx);
if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ��
{
- instance->cbs.version_updated(instance, instance->cbs.userdata);
+ instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata);
}
if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ�
{
@@ -482,7 +482,154 @@ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp)
doris_confile_ctx_destry(&instance->ctx);
doris_http_fetch_meta(instance);
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname);
+}
+
+void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+ const char *pos_colon;
+ char buffer[64];
+ int datalen;
+
+ if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
+ {
+ return ;
+ }
+ datalen = pos_colon - start;
+ switch(datalen)
+ {
+ case 16:
+ if(!strncasecmp(start, "X-Latest-Version:", 17))
+ {
+ memcpy(buffer, start+17, bytes-17);
+ buffer[bytes-17] = '\0';
+ instance->head_version = atol(buffer);
+ }
+ break;
+ default: break;
+ }
+
+ //check code only once
+ if(instance->ctx.res_code != 0)
+ {
+ return;
+ }
+ instance->ctx.res_code = res_code;
+ assert(res_code != 0);
+
+ if(res_code != 200)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d",
+ instance->args.bizname, instance->ctx.server, code);
+ }
+}
+
+void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+
+ evtimer_del(&instance->ctx.timer_expires);
+ if(res != CURLE_OK)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s",
+ instance->args.bizname, instance->ctx.server, res_code, err);
+ goto out_herror;
+ }
+
+ if(instance->ctx.res_code != 200 || res_code!=200)
+ {
+ goto out_herror;
+ }
+
+ instance->cur_version = instance->head_version + instance->cur_version;
+ if(instance->cur_version < 0)
+ {
+ instance->cur_version = 0;
+ }
+ instance->req_version = instance->cur_version + 1; //TODO
+ evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance);
+ evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance);
+ doris_http_fetch_meta(instance);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld",
+ instance->args.bizname, instance->ctx.server, instance->req_version);
+ return;
+
+out_herror:
+ instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1;
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ doris_confile_ctx_destry(&instance->ctx);
+}
+
+static void doris_http_head_version(struct doris_csum_instance *instance)
+{
+ u_int64_t balance_seed;
+ struct doris_http_callback curlcbs;
+ char metauri[128];
+ struct timeval tv={10, 0};
+
+ balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
+ (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
+
+ memset(&curlcbs, 0, sizeof(struct doris_http_callback));
+ curlcbs.header_cb = doris_http_head_version_header_cb;
+ curlcbs.write_cb = NULL;
+ curlcbs.transfer_done_cb = doris_http_head_version_done_cb;
+ curlcbs.userp = instance;
+
+ instance->array_index = 0;
+ instance->cur_httpins = instance->httpins_master;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
+ {
+ instance->cur_httpins = instance->httpins_backup1;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ }
+ if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
+ {
+ instance->cur_httpins = instance->httpins_backup2;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ }
+
+ if(instance->ctx.httpctx != NULL)
+ {
+ snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname);
+ if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri))
+ {
+ instance->status = FETCH_STATUS_META;
+ instance->statistic.field[DRS_FS_FILED_REQ_META] += 1;
+ evtimer_add(&instance->ctx.timer_expires, &tv);
+ }
+ else
+ {
+ instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
+ doris_confile_ctx_destry(&instance->ctx);
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ }
+ if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1;
+ else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1;
+ }
+ else
+ {
+ instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found");
+ }
+}
+
+static void instance_head_version_timer_cb(int fd, short kind, void *userp)
+{
+ doris_http_head_version((struct doris_csum_instance *)userp);
+}
+
+/*httpsģʽ�£�ʹ��valgrind���У����ַ���GET�����done_cb����ʼ���޷��õ�����*/
+static void instance_version_expire_timer_cb(int fd, short kind, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+
+ doris_confile_ctx_destry(&instance->ctx);
+ doris_http_head_version(instance);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", instance->args.bizname);
}
static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
@@ -701,21 +848,31 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par
{
instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog);
}
- evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance);
-
pthread_mutex_lock(&param->mutex_lock);
param->references++;
pthread_mutex_unlock(&param->mutex_lock);
+ if(instance->cur_version >= 0)
+ {
+ evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance);
+ evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_fetch, &tv);
+ }
+ else
+ {
+ evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance);
+ evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance);
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_fetch, &tv);
+ }
+
evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&instance->timer_statistic, &tv);
-
- evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
- tv.tv_sec = 3;
- tv.tv_usec = 0;
- evtimer_add(&instance->timer_fetch, &tv);
return instance;
}
diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h
index 4ef51d9..e15bd27 100644
--- a/client/doris_client_fetch.h
+++ b/client/doris_client_fetch.h
@@ -86,6 +86,7 @@ struct doris_csum_instance
int64_t cur_version; //Ԫ��Ϣ
int64_t req_version; //�ļ�
int64_t new_version; //�µ�Ԫ��Ϣ
+ int64_t head_version;
struct easy_string estr;
cJSON *meta, *array;
u_int32_t array_size;
diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp
index b0edcf2..a71313f 100644
--- a/client/doris_client_transfer.cpp
+++ b/client/doris_client_transfer.cpp
@@ -153,6 +153,46 @@ void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *hea
ctx->headers = curl_slist_append(ctx->headers, header);
}
+int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri)
+{
+ char minio_url[2048];
+
+ assert(ctx->curl == NULL);
+ if(NULL == (ctx->curl=curl_easy_init()))
+ {
+ assert(0);return -1;
+ }
+
+ curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L); //HEAD����
+ if(ctx->instance->param->ssl_connection)
+ {
+ snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ else
+ {
+ snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
+
+ if(ctx->headers != NULL)
+ {
+ curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
+ }
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb);
+ curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
+ curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
+ curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
+
+ if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl))
+ {
+ assert(0); return -2;
+ }
+ ctx->transfering = 1;
+ return 0;
+}
+
int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri)
{
char minio_url[2048];
diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h
index 324c9aa..309f3fe 100644
--- a/client/doris_client_transfer.h
+++ b/client/doris_client_transfer.h
@@ -52,6 +52,7 @@ void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header);
void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value);
void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value);
+int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri);
int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri);
int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, const char *data, size_t data_len);
int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len);