summaryrefslogtreecommitdiff
path: root/client/doris_client_fetch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/doris_client_fetch.cpp')
-rw-r--r--client/doris_client_fetch.cpp175
1 files changed, 166 insertions, 9 deletions
diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp
index ae7cc8e..fee1648 100644
--- a/client/doris_client_fetch.cpp
+++ b/client/doris_client_fetch.cpp
@@ -385,7 +385,7 @@ out_error:
doris_confile_ctx_destry(&instance->ctx);
if(res_code==304 && instance->cbs.version_updated!=NULL) //�汾�����ͬ��
{
- instance->cbs.version_updated(instance, instance->cbs.userdata);
+ instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata);
}
if(res_code==300 && instance->param->client_sync_on) //������а�;�еİ汾�ϴ�
{
@@ -482,7 +482,154 @@ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp)
doris_confile_ctx_destry(&instance->ctx);
doris_http_fetch_meta(instance);
- MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname);
+}
+
+void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+ const char *pos_colon;
+ char buffer[64];
+ int datalen;
+
+ if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
+ {
+ return ;
+ }
+ datalen = pos_colon - start;
+ switch(datalen)
+ {
+ case 16:
+ if(!strncasecmp(start, "X-Latest-Version:", 17))
+ {
+ memcpy(buffer, start+17, bytes-17);
+ buffer[bytes-17] = '\0';
+ instance->head_version = atol(buffer);
+ }
+ break;
+ default: break;
+ }
+
+ //check code only once
+ if(instance->ctx.res_code != 0)
+ {
+ return;
+ }
+ instance->ctx.res_code = res_code;
+ assert(res_code != 0);
+
+ if(res_code != 200)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d",
+ instance->args.bizname, instance->ctx.server, code);
+ }
+}
+
+void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+
+ evtimer_del(&instance->ctx.timer_expires);
+ if(res != CURLE_OK)
+ {
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s",
+ instance->args.bizname, instance->ctx.server, res_code, err);
+ goto out_herror;
+ }
+
+ if(instance->ctx.res_code != 200 || res_code!=200)
+ {
+ goto out_herror;
+ }
+
+ instance->cur_version = instance->head_version + instance->cur_version;
+ if(instance->cur_version < 0)
+ {
+ instance->cur_version = 0;
+ }
+ instance->req_version = instance->cur_version + 1; //TODO
+ evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance);
+ evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance);
+ doris_http_fetch_meta(instance);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld",
+ instance->args.bizname, instance->ctx.server, instance->req_version);
+ return;
+
+out_herror:
+ instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1;
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ doris_confile_ctx_destry(&instance->ctx);
+}
+
+static void doris_http_head_version(struct doris_csum_instance *instance)
+{
+ u_int64_t balance_seed;
+ struct doris_http_callback curlcbs;
+ char metauri[128];
+ struct timeval tv={10, 0};
+
+ balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
+ (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
+
+ memset(&curlcbs, 0, sizeof(struct doris_http_callback));
+ curlcbs.header_cb = doris_http_head_version_header_cb;
+ curlcbs.write_cb = NULL;
+ curlcbs.transfer_done_cb = doris_http_head_version_done_cb;
+ curlcbs.userp = instance;
+
+ instance->array_index = 0;
+ instance->cur_httpins = instance->httpins_master;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
+ {
+ instance->cur_httpins = instance->httpins_backup1;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ }
+ if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
+ {
+ instance->cur_httpins = instance->httpins_backup2;
+ instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
+ }
+
+ if(instance->ctx.httpctx != NULL)
+ {
+ snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname);
+ if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri))
+ {
+ instance->status = FETCH_STATUS_META;
+ instance->statistic.field[DRS_FS_FILED_REQ_META] += 1;
+ evtimer_add(&instance->ctx.timer_expires, &tv);
+ }
+ else
+ {
+ instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
+ doris_confile_ctx_destry(&instance->ctx);
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ }
+ if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1;
+ else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1;
+ }
+ else
+ {
+ instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
+ doris_request_restart_timer(instance, instance->param->retry_interval);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found");
+ }
+}
+
+static void instance_head_version_timer_cb(int fd, short kind, void *userp)
+{
+ doris_http_head_version((struct doris_csum_instance *)userp);
+}
+
+/*httpsģʽ�£�ʹ��valgrind���У����ַ���GET�����done_cb����ʼ���޷��õ�����*/
+static void instance_version_expire_timer_cb(int fd, short kind, void *userp)
+{
+ struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
+
+ doris_confile_ctx_destry(&instance->ctx);
+ doris_http_head_version(instance);
+ MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", instance->args.bizname);
}
static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
@@ -701,21 +848,31 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par
{
instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog);
}
- evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance);
-
pthread_mutex_lock(&param->mutex_lock);
param->references++;
pthread_mutex_unlock(&param->mutex_lock);
+ if(instance->cur_version >= 0)
+ {
+ evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance);
+ evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_fetch, &tv);
+ }
+ else
+ {
+ evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance);
+ evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance);
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+ evtimer_add(&instance->timer_fetch, &tv);
+ }
+
evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&instance->timer_statistic, &tv);
-
- evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
- tv.tv_sec = 3;
- tv.tv_usec = 0;
- evtimer_add(&instance->timer_fetch, &tv);
return instance;
}