diff options
Diffstat (limited to 'client/doris_client_fetch.cpp')
| -rw-r--r-- | client/doris_client_fetch.cpp | 175 |
1 files changed, 166 insertions, 9 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index ae7cc8e..fee1648 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -385,7 +385,7 @@ out_error: doris_confile_ctx_destry(&instance->ctx); if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ�� { - instance->cbs.version_updated(instance, instance->cbs.userdata); + instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata); } if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ� { @@ -482,7 +482,154 @@ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp) doris_confile_ctx_destry(&instance->ctx); doris_http_fetch_meta(instance); - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); +} + +void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + const char *pos_colon; + char buffer[64]; + int datalen; + + if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) + { + return ; + } + datalen = pos_colon - start; + switch(datalen) + { + case 16: + if(!strncasecmp(start, "X-Latest-Version:", 17)) + { + memcpy(buffer, start+17, bytes-17); + buffer[bytes-17] = '\0'; + instance->head_version = atol(buffer); + } + break; + default: break; + } + + //check code only once + if(instance->ctx.res_code != 0) + { + return; + } + instance->ctx.res_code = res_code; + assert(res_code != 0); + + if(res_code != 200) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d", + instance->args.bizname, instance->ctx.server, code); + } +} + +void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + evtimer_del(&instance->ctx.timer_expires); + if(res != CURLE_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s", + instance->args.bizname, instance->ctx.server, res_code, err); + goto out_herror; + } + + if(instance->ctx.res_code != 200 || res_code!=200) + { + goto out_herror; + } + + instance->cur_version = instance->head_version + instance->cur_version; + if(instance->cur_version < 0) + { + instance->cur_version = 0; + } + instance->req_version = instance->cur_version + 1; //TODO + evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance); + evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance); + doris_http_fetch_meta(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld", + instance->args.bizname, instance->ctx.server, instance->req_version); + return; + +out_herror: + instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + doris_confile_ctx_destry(&instance->ctx); +} + +static void doris_http_head_version(struct doris_csum_instance *instance) +{ + u_int64_t balance_seed; + struct doris_http_callback curlcbs; + char metauri[128]; + struct timeval tv={10, 0}; + + balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | + (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); + + memset(&curlcbs, 0, sizeof(struct doris_http_callback)); + curlcbs.header_cb = doris_http_head_version_header_cb; + curlcbs.write_cb = NULL; + curlcbs.transfer_done_cb = doris_http_head_version_done_cb; + curlcbs.userp = instance; + + instance->array_index = 0; + instance->cur_httpins = instance->httpins_master; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) + { + instance->cur_httpins = instance->httpins_backup1; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) + { + instance->cur_httpins = instance->httpins_backup2; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + + if(instance->ctx.httpctx != NULL) + { + snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname); + if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri)) + { + instance->status = FETCH_STATUS_META; + instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; + evtimer_add(&instance->ctx.timer_expires, &tv); + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_confile_ctx_destry(&instance->ctx); + doris_request_restart_timer(instance, instance->param->retry_interval); + } + if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1; + else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1; + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found"); + } +} + +static void instance_head_version_timer_cb(int fd, short kind, void *userp) +{ + doris_http_head_version((struct doris_csum_instance *)userp); +} + +/*httpsģʽ�£�ʹ��valgrind���У����ַ���GET�����done_cb����ʼ�����õ�����*/ +static void instance_version_expire_timer_cb(int fd, short kind, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + doris_confile_ctx_destry(&instance->ctx); + doris_http_head_version(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", instance->args.bizname); } static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) @@ -701,21 +848,31 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } - evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); - pthread_mutex_lock(¶m->mutex_lock); param->references++; pthread_mutex_unlock(¶m->mutex_lock); + if(instance->cur_version >= 0) + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + else + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(&instance->timer_statistic, &tv); - - evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); - tv.tv_sec = 3; - tv.tv_usec = 0; - evtimer_add(&instance->timer_fetch, &tv); return instance; } |
