diff options
Diffstat (limited to 'client')
| -rw-r--r-- | client/doris_client_fetch.cpp | 175 | ||||
| -rw-r--r-- | client/doris_client_fetch.h | 1 | ||||
| -rw-r--r-- | client/doris_client_transfer.cpp | 40 | ||||
| -rw-r--r-- | client/doris_client_transfer.h | 1 |
4 files changed, 208 insertions, 9 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index ae7cc8e..fee1648 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -385,7 +385,7 @@ out_error: doris_confile_ctx_destry(&instance->ctx); if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ�� { - instance->cbs.version_updated(instance, instance->cbs.userdata); + instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata); } if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ� { @@ -482,7 +482,154 @@ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp) doris_confile_ctx_destry(&instance->ctx); doris_http_fetch_meta(instance); - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); +} + +void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + const char *pos_colon; + char buffer[64]; + int datalen; + + if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) + { + return ; + } + datalen = pos_colon - start; + switch(datalen) + { + case 16: + if(!strncasecmp(start, "X-Latest-Version:", 17)) + { + memcpy(buffer, start+17, bytes-17); + buffer[bytes-17] = '\0'; + instance->head_version = atol(buffer); + } + break; + default: break; + } + + //check code only once + if(instance->ctx.res_code != 0) + { + return; + } + instance->ctx.res_code = res_code; + assert(res_code != 0); + + if(res_code != 200) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d", + instance->args.bizname, instance->ctx.server, code); + } +} + +void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + evtimer_del(&instance->ctx.timer_expires); + if(res != CURLE_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s", + instance->args.bizname, instance->ctx.server, res_code, err); + goto out_herror; + } + + if(instance->ctx.res_code != 200 || res_code!=200) + { + goto out_herror; + } + + instance->cur_version = instance->head_version + instance->cur_version; + if(instance->cur_version < 0) + { + instance->cur_version = 0; + } + instance->req_version = instance->cur_version + 1; //TODO + evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance); + evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance); + doris_http_fetch_meta(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld", + instance->args.bizname, instance->ctx.server, instance->req_version); + return; + +out_herror: + instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + doris_confile_ctx_destry(&instance->ctx); +} + +static void doris_http_head_version(struct doris_csum_instance *instance) +{ + u_int64_t balance_seed; + struct doris_http_callback curlcbs; + char metauri[128]; + struct timeval tv={10, 0}; + + balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | + (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); + + memset(&curlcbs, 0, sizeof(struct doris_http_callback)); + curlcbs.header_cb = doris_http_head_version_header_cb; + curlcbs.write_cb = NULL; + curlcbs.transfer_done_cb = doris_http_head_version_done_cb; + curlcbs.userp = instance; + + instance->array_index = 0; + instance->cur_httpins = instance->httpins_master; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) + { + instance->cur_httpins = instance->httpins_backup1; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) + { + instance->cur_httpins = instance->httpins_backup2; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + + if(instance->ctx.httpctx != NULL) + { + snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname); + if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri)) + { + instance->status = FETCH_STATUS_META; + instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; + evtimer_add(&instance->ctx.timer_expires, &tv); + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_confile_ctx_destry(&instance->ctx); + doris_request_restart_timer(instance, instance->param->retry_interval); + } + if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1; + else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1; + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found"); + } +} + +static void instance_head_version_timer_cb(int fd, short kind, void *userp) +{ + doris_http_head_version((struct doris_csum_instance *)userp); +} + +/*httpsģʽ�£�ʹ��valgrind���У����ַ���GET�����done_cb����ʼ�����õ�����*/ +static void instance_version_expire_timer_cb(int fd, short kind, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + doris_confile_ctx_destry(&instance->ctx); + doris_http_head_version(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", instance->args.bizname); } static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) @@ -701,21 +848,31 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } - evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); - pthread_mutex_lock(¶m->mutex_lock); param->references++; pthread_mutex_unlock(¶m->mutex_lock); + if(instance->cur_version >= 0) + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + else + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(&instance->timer_statistic, &tv); - - evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); - tv.tv_sec = 3; - tv.tv_usec = 0; - evtimer_add(&instance->timer_fetch, &tv); return instance; } diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 4ef51d9..e15bd27 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -86,6 +86,7 @@ struct doris_csum_instance int64_t cur_version; //Ԫ��Ϣ int64_t req_version; //�ļ� int64_t new_version; //�µ�Ԫ��Ϣ + int64_t head_version; struct easy_string estr; cJSON *meta, *array; u_int32_t array_size; diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp index b0edcf2..a71313f 100644 --- a/client/doris_client_transfer.cpp +++ b/client/doris_client_transfer.cpp @@ -153,6 +153,46 @@ void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *hea ctx->headers = curl_slist_append(ctx->headers, header); } +int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri) +{ + char minio_url[2048]; + + assert(ctx->curl == NULL); + if(NULL == (ctx->curl=curl_easy_init())) + { + assert(0);return -1; + } + + curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L); //HEAD���� + if(ctx->instance->param->ssl_connection) + { + snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L); + } + else + { + snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri); + } + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + + if(ctx->headers != NULL) + { + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + } + curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); + + if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) + { + assert(0); return -2; + } + ctx->transfering = 1; + return 0; +} + int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri) { char minio_url[2048]; diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h index 324c9aa..309f3fe 100644 --- a/client/doris_client_transfer.h +++ b/client/doris_client_transfer.h @@ -52,6 +52,7 @@ void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header); void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value); void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value); +int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri); int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri); int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, const char *data, size_t data_len); int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len); |
