diff options
Diffstat (limited to 'server/doris_server_receive.cpp')
| -rw-r--r-- | server/doris_server_receive.cpp | 291 |
1 files changed, 192 insertions, 99 deletions
diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 6d30623..56643a7 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -222,7 +222,6 @@ void doris_config_file_version_error(struct doris_csum_instance *instance, void fclose(business->fp_cfg_file); remove(business->cfg_file_path); } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version); } void doris_config_file_cfgfile_start(struct doris_csum_instance *instance, @@ -320,7 +319,7 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void { cfgver_handle = config_version_handle_new(); cfgver_handle->latest_version = business->cur_vernode->version; - cfgver_handle->version_num = 1; + cfgver_handle->version_mem_num = 1; TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node); cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); cfgver_handle->version2node->insert(make_pair(cfgver_handle->latest_version, business->cur_vernode)); @@ -343,14 +342,23 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); } /*�����ļ���������N���汾��Ԫ��Ϣȫ����*/ - if(business->cache_max_versions!=0 && cfgver_handle->version_num>=business->cache_max_versions) + if(business->cache_max_versions!=0 && cfgver_handle->version_mem_num>=business->cache_max_versions) { - config_version_node_free_content(cfgver_handle->oldest_vernode); - cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); + if(!business->persistence_write_on) + { + TAILQ_REMOVE(&cfgver_handle->version_head, cfgver_handle->oldest_vernode, version_node); + config_version_node_cleanup(cfgver_handle->oldest_vernode); + cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); + } + else + { + config_version_node_free_content(cfgver_handle->oldest_vernode); + cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); + } } else { - cfgver_handle->version_num += 1; + cfgver_handle->version_mem_num += 1; } pthread_rwlock_unlock(&business->rwlock); } @@ -578,8 +586,13 @@ void doris_config_localmem_cfgfile_finish(struct doris_csum_instance *instance, /*�ޱ��ϵ�к�������������ʱ�ص�*/ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { - doris_config_common_version_start((struct doris_business *)userdata, meta); - doris_config_file_version_start(instance, meta, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + doris_config_common_version_start(business, meta); + if(business->persistence_write_on) + { + doris_config_file_version_start(instance, meta, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_start(instance, meta, userdata); @@ -588,7 +601,12 @@ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *met void doris_config_version_finish(struct doris_csum_instance *instance, void *userdata) { - doris_config_file_version_finish(instance, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + if(business->persistence_write_on) + { + doris_config_file_version_finish(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_finish(instance, userdata); @@ -598,12 +616,18 @@ void doris_config_version_finish(struct doris_csum_instance *instance, void *use void doris_config_version_error(struct doris_csum_instance *instance, void *userdata) { + struct doris_business *business=(struct doris_business *)userdata; + doris_config_common_version_error((struct doris_business *)userdata); - doris_config_file_version_error(instance, userdata); + if(business->persistence_write_on) + { + doris_config_file_version_error(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_error(instance, userdata); } + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version); } void doris_config_cfgfile_start(struct doris_csum_instance *instance, @@ -612,7 +636,10 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance, struct doris_business *business=(struct doris_business *)userdata; doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum); - doris_config_file_cfgfile_start(instance, meta, localpath, userdata); + if(business->persistence_write_on) + { + doris_config_file_cfgfile_start(instance, meta, localpath, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_start(instance, meta, business->cfg_file_path, userdata); @@ -621,7 +648,12 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance, void doris_config_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { - doris_config_file_cfgfile_update(instance, data, len, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + if(business->persistence_write_on) + { + doris_config_file_cfgfile_update(instance, data, len, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_update(instance, data, len, userdata); @@ -630,8 +662,13 @@ void doris_config_cfgfile_update(struct doris_csum_instance *instance, const cha void doris_config_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata) { + struct doris_business *business=(struct doris_business *)userdata; + doris_config_common_cfgfile_finish((struct doris_business *)userdata); - doris_config_file_cfgfile_finish(instance, userdata); + if(business->persistence_write_on) + { + doris_config_file_cfgfile_finish(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_finish(instance, md5, userdata); @@ -656,29 +693,30 @@ void* thread_doris_client_recv_cfg(void *arg) business->source_from = RECV_WAY_IDX_FILE; business->worker_evbase = client_evbase; - scanner = doris_index_file_scanner(0); - - /*Retaive latest config to memory from Stored configs*/ - doris_cbs.version_start = doris_config_localmem_version_start; - doris_cbs.version_finish = doris_config_localmem_version_finish; - doris_cbs.version_error = doris_config_localmem_version_error; - doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; - doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; - doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; - doris_cbs.version_updated= NULL; - doris_cbs.userdata = business; - - snprintf(stored_path, 512, "%s/full/index", business->store_path_root); - if(business->saves_when_fulldel > 0) + if(business->persistence_write_on) { - get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); - } - update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); - snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); - do { - update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); - }while(update_type != CFG_UPDATE_TYPE_NONE); + scanner = doris_index_file_scanner(0); + /*Retaive latest config to memory from Stored configs*/ + doris_cbs.version_start = doris_config_localmem_version_start; + doris_cbs.version_finish = doris_config_localmem_version_finish; + doris_cbs.version_error = doris_config_localmem_version_error; + doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; + doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; + doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; + doris_cbs.version_updated= NULL; + doris_cbs.userdata = business; + snprintf(stored_path, 512, "%s/full/index", business->store_path_root); + if(business->saves_when_fulldel > 0) + { + get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); + } + update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); + snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); + do { + update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); + }while(update_type != CFG_UPDATE_TYPE_NONE); + } /*Check new configs*/ doris_cbs.version_start = doris_config_version_start; @@ -687,10 +725,12 @@ void* thread_doris_client_recv_cfg(void *arg) doris_cbs.cfgfile_start = doris_config_cfgfile_start; doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; + doris_cbs.version_updated= NULL; + doris_cbs.userdata = business; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); - doris_args.current_version = scanner->cur_version; + doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions); sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) @@ -860,6 +900,12 @@ void prod_server_generate_token(struct doris_business *business, char *token/*OU pthread_mutex_unlock(&g_doris_server_info.mutex_lock); } +/*TODO: �������ɱȶԷ��汾��С�İ汾*/ +int64_t prod_server_generate_version(struct doris_business *business) +{ + return ++business->genversion_seq; +} + void business_resume_sync_peer_normal(struct doris_business *business) { u_int32_t business_post_ups; @@ -896,7 +942,7 @@ void business_set_sync_peer_abnormal(struct doris_business *business) { return; } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mcluster sync error, please check slave status!!!\033[0m\n"); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]cluster sync error, please check slave status!!!\033[0m\n"); if(0 == atomic_set(&business->ready_to_sync, 0) || business->listener_prod==0) { @@ -1009,19 +1055,22 @@ void http_config_direct_version_cancel(struct version_list_node *vernode, struct { doris_prod_upload_ctx_destroy(vernode->synctx); } - if(vernode->fp_idx_file != NULL) - { - fclose(vernode->fp_idx_file); - remove(vernode->tmp_index_path); - } - if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) + if(business->persistence_write_on) { - fclose(vernode->cur_table->fp_cfg_file); - remove(vernode->cur_table->localpath); - } - TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) - { - remove(tablenode->localpath); + if(vernode->fp_idx_file != NULL) + { + fclose(vernode->fp_idx_file); + remove(vernode->tmp_index_path); + } + if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) + { + fclose(vernode->cur_table->fp_cfg_file); + remove(vernode->cur_table->localpath); + } + TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) + { + remove(tablenode->localpath); + } } config_version_node_cleanup(vernode); if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) @@ -1050,7 +1099,7 @@ void prod_sync_vercancel_result_cb(enum PROD_VEROP_RES result, void *userdata) case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version cancel sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -1115,24 +1164,27 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve { assert(newversion > vernode->version); vernode->version = newversion; - - if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) - { - snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); - snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); - } - else - { - snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); - } - /*HTTP postʱ����汾����ÿ�������Լ�����ʱ֪ͨ�ļ��������ñ����ļ��Ĺرպ���*/ - sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); business->version = vernode->version; business->type = vernode->cfg_type; - business->fp_idx_file = vernode->fp_idx_file; - doris_config_file_version_finish(NULL, business); - vernode->fp_idx_file = NULL; + if(business->persistence_write_on) + { + if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) + { + snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); + snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); + } + else + { + snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); + } + /*HTTP postʱ����汾����ÿ�������Լ�����ʱ֪ͨ�ļ��������ñ����ļ��Ĺرպ���*/ + sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); + business->fp_idx_file = vernode->fp_idx_file; + doris_config_file_version_finish(NULL, business); + vernode->fp_idx_file = NULL; + } + if(g_doris_server_info.consumer_port) { business->cur_vernode = vernode; @@ -1158,7 +1210,7 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve void http_config_direct_version_finish(struct version_list_node *vernode, struct evhttp_request *req, int64_t set_version) { struct doris_business *business=vernode->business; - char version[32], token[64]; + char version[32], token[64], lvdbkey[40]; int64_t new_version; if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) @@ -1168,12 +1220,22 @@ void http_config_direct_version_finish(struct version_list_node *vernode, struct if(set_version == 0) { - new_version = business->cfgver_head->latest_version + 1; + new_version = prod_server_generate_version(business); } else { - new_version = set_version; + new_version = business->genversion_seq = set_version; + } + /*����leveldb���ɰ汾�ŵ�����*/ + if(!business->persistence_write_on) + { + sprintf(lvdbkey, "%s_verseq", business->bizname); + if(!doris_kvdb_update_keystr_valint(g_doris_server_info.kvdbhandle, lvdbkey, new_version)) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] business: %s, update levelDB failed!\033[0m\n", business->bizname); + } } + sprintf(token, "%s", vernode->token); doris_config_post_version_finish(business, vernode, new_version); @@ -1198,7 +1260,7 @@ void prod_sync_verend_result_cb(enum PROD_VEROP_RES result, int64_t version, voi case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version end sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -1332,20 +1394,23 @@ void doris_config_post_version_start(struct version_list_node *cur_vernode, cons struct doris_business *business=cur_vernode->business; snprintf(cur_vernode->token, 64, "%s", token); - if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) - { - snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); - } - else - { - snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); - } - if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) + + if(business->persistence_write_on) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno)); - assert(0); + if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) + { + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + } + else + { + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + } + if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno)); + assert(0); + } } - if(g_doris_server_info.consumer_port) { TAILQ_INIT(&cur_vernode->table_head); @@ -1404,7 +1469,7 @@ void try_restore_from_busy_peer(struct version_list_node *cur_vernode, const cha cur_vernode->req = NULL; if(busy) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33mbusiness: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33m[Warning]business: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body); } else { @@ -1436,7 +1501,7 @@ void prod_sync_verstart_result_cb(enum PROD_VERSTART_RES result, const char *bod business->cur_vernode = NULL; business->posts_on_the_way--; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname); break; case VERSTART_CURL_ERROR: @@ -1727,12 +1792,14 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e meta.cfgnum = vernode->cur_table->cfgnum; meta.size = 0; - vernode->business->type = vernode->cfg_type; - vernode->business->fp_idx_file = vernode->fp_idx_file; - doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business); - sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path); - vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file; - + if(vernode->business->persistence_write_on) + { + vernode->business->type = vernode->cfg_type; + vernode->business->fp_idx_file = vernode->fp_idx_file; + doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business); + sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path); + vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file; + } if(g_doris_server_info.consumer_port) { vernode->cur_table->table_meta = cJSON_CreateObject(); @@ -1749,7 +1816,10 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e void doris_config_post_cfgfile_finish(struct version_list_node *vernode, const char *md5str) { doris_config_common_cfgfile_finish(vernode->business); - fclose(vernode->cur_table->fp_cfg_file); + if(vernode->business->persistence_write_on) + { + fclose(vernode->cur_table->fp_cfg_file); + } assert(vernode->cur_table->filesize == 0); vernode->cur_table->filesize = vernode->cur_table->cur_totallen; @@ -1791,11 +1861,14 @@ void http_config_direct_cfgfile_update(struct version_list_node *vernode, struct } if(vernode->cur_table->fragsize > 0) { - writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file); - if(writen_len != vernode->cur_table->fragsize) + if(vernode->business->persistence_write_on) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno)); - assert(0); + writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file); + if(writen_len != vernode->cur_table->fragsize) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno)); + assert(0); + } } if(g_doris_server_info.consumer_port) { @@ -1843,7 +1916,7 @@ void prod_sync_upload_frag_cb(enum PROD_VEROP_RES result,void * userdata) case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "frag sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -2050,12 +2123,19 @@ void start_business_http_post_server(struct doris_business *business) } } -void doris_config_version_sync_updated(struct doris_csum_instance *instance, void *userdata) +void doris_config_version_sync_updated(struct doris_csum_instance *instance, int64_t latest_version, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct doris_csum_param *param; u_int32_t references, business_post_ups; + if(latest_version) + { + business->genversion_seq = latest_version; + assert(business->cfgver_head->latest_version == latest_version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version change to: %lu\033[0m", business->bizname, latest_version); + } + /*����consuemer��ͬʱȷ��������ִֻ��һ��*/ param = doris_csum_instance_get_param(instance); doris_csum_instance_destroy(instance); @@ -2091,8 +2171,7 @@ void doris_config_version_sync_updated(struct doris_csum_instance *instance, voi MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING); } assert(business_post_ups <= g_doris_server_info.business_post_num); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m******Doris Producer worker for %s starts******\033[0m", business->bizname); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris ready to sync for business: %s\n", business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m[Info]******Doris Producer worker for %s starts******\033[0m", business->bizname); } /*��thread_doris_client_recv_cfg��������version_updated����*/ @@ -2106,6 +2185,7 @@ void* thread_http_post_recv_cfg(void *arg) struct doris_idxfile_scanner *scanner; enum DORIS_UPDATE_TYPE update_type; char stored_path[512]; + int64_t genversion_seq; prctl(PR_SET_NAME, "http_post"); @@ -2116,7 +2196,7 @@ void* thread_http_post_recv_cfg(void *arg) scanner = doris_index_file_scanner(0); - /*Retaive latest config to memory from Stored configs*/ + /*�����Ƿ����־û��������Զ�һ�±������ã�����Ϊ���ð汾��ʼ��ʹ��*/ doris_cbs.version_start = doris_config_localmem_version_start; doris_cbs.version_finish = doris_config_localmem_version_finish; doris_cbs.version_error = doris_config_localmem_version_error; @@ -2137,6 +2217,18 @@ void* thread_http_post_recv_cfg(void *arg) update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); + /*�������ɰ汾����ʼ����*/ + business->genversion_seq = scanner->cur_version; + if(!business->persistence_write_on) + { + sprintf(stored_path, "%s_verseq", business->bizname); + if((genversion_seq = doris_kvdb_get_keystr_valint(g_doris_server_info.kvdbhandle, stored_path)) != 0) + { + business->genversion_seq = genversion_seq; + } + } + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version from: %lu\033[0m\n", business->bizname, business->genversion_seq); + if(g_doris_server_info.cluster_sync_mode) /*Check new configs*/ { doris_cbs.version_start = doris_config_version_start; @@ -2146,10 +2238,11 @@ void* thread_http_post_recv_cfg(void *arg) doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; doris_cbs.version_updated= doris_config_version_sync_updated; + doris_cbs.userdata = business; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); - doris_args.current_version = scanner->cur_version; + doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions); sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) |
