summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
author[email protected] <[email protected]>2021-09-08 10:45:47 +0800
committer[email protected] <[email protected]>2021-09-08 10:45:47 +0800
commit6922e3dd5040b8e1cb88a5f2f96d2a5fb8824e71 (patch)
tree609c2be54a364a810e3d4306643c41fa796f72aa /server
parent6386e5de579c42f2730e0c92b8e4cab5b6c57559 (diff)
支持以业务为单位,配置文件不持久化到本地HEADmaster
Diffstat (limited to 'server')
-rw-r--r--server/CMakeLists.txt4
-rw-r--r--server/bin/conf/doris_main.conf1
-rw-r--r--server/doris_server_kvdb.cpp97
-rw-r--r--server/doris_server_kvdb.h18
-rw-r--r--server/doris_server_main.cpp35
-rw-r--r--server/doris_server_main.h7
-rw-r--r--server/doris_server_receive.cpp291
-rw-r--r--server/doris_server_receive.h2
-rw-r--r--server/doris_server_scandir.cpp6
9 files changed, 346 insertions, 115 deletions
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
index 92077dc..2032131 100644
--- a/server/CMakeLists.txt
+++ b/server/CMakeLists.txt
@@ -1,10 +1,10 @@
-set (NIRVANA_PLATFORM_SRC doris_server_scandir.cpp doris_server_receive.cpp doris_server_http.cpp doris_server_main.cpp)
+set (NIRVANA_PLATFORM_SRC doris_server_kvdb.cpp doris_server_scandir.cpp doris_server_receive.cpp doris_server_http.cpp doris_server_main.cpp)
add_definitions(-fPIC -Wall -g)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D__FILENAME__='\"$(subst ${CMAKE_CURRENT_SOURCE_DIR}/,,$(abspath $<))\"'")
add_executable(doris ${NIRVANA_PLATFORM_SRC})
-target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson)
+target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson libLevelDB)
target_link_libraries(doris MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat2 pthread z dl)
set_target_properties(doris PROPERTIES CLEAN_DIRECT_OUTPUT 1)
target_include_directories(doris PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf
index 7816131..da962da 100644
--- a/server/bin/conf/doris_main.conf
+++ b/server/bin/conf/doris_main.conf
@@ -35,6 +35,7 @@ doris_client_confile=./conf/doris_client_csum.conf
[VoIP]
receive_config_way=3
+#persistence_write_on=1
producer_listen_port=9801
producer_concurrence_allowed=1
mem_cache_max_versions=2
diff --git a/server/doris_server_kvdb.cpp b/server/doris_server_kvdb.cpp
new file mode 100644
index 0000000..19b6638
--- /dev/null
+++ b/server/doris_server_kvdb.cpp
@@ -0,0 +1,97 @@
+#include <string.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+
+#include <string>
+
+#include "leveldb/db.h"
+#include "leveldb/comparator.h"
+#include "leveldb/cache.h"
+
+#include "doris_server_kvdb.h"
+
+struct doris_kvhandle
+{
+ leveldb::DB *kvdb;
+};
+
+struct doris_kvhandle *doris_kvdb_hanlde_new(const char *dir)
+{
+ struct doris_kvhandle *handle;
+ leveldb::Options options;
+
+ handle = (struct doris_kvhandle *)malloc(sizeof(struct doris_kvhandle));
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options, std::string(dir), &(handle->kvdb));
+ if(!status.ok())
+ {
+ free(handle);
+ return NULL;
+ }
+ return handle;
+}
+
+bool doris_kvdb_update_keyint_valint(struct doris_kvhandle *handle, u_int64_t key, int64_t value)
+{
+ leveldb::WriteOptions wop;
+ leveldb::Slice _key((const char *)&key, sizeof(key));
+ leveldb::Slice _value((const char *)&value, sizeof(value));
+ wop.sync = true;
+ leveldb::Status s = handle->kvdb->Put(wop, _key, _value);
+ return s.ok();
+}
+
+bool doris_kvdb_update_keystr_valint(struct doris_kvhandle *handle, const char *key, int64_t value)
+{
+ leveldb::WriteOptions wop;
+ leveldb::Slice _key((const char *)key, strlen(key));
+ leveldb::Slice _value((const char *)&value, sizeof(value));
+ wop.sync = true;
+ leveldb::Status s = handle->kvdb->Put(wop, _key, _value);
+ return s.ok();
+}
+
+int doris_kvdb_delete_keyint(struct doris_kvhandle *handle, u_int64_t key)
+{
+ leveldb::Slice _key((char *)&key, sizeof(key));
+ leveldb::Status s = handle->kvdb->Delete(leveldb::WriteOptions(), _key);
+ return s.ok();
+}
+
+int doris_kvdb_delete_keystr(struct doris_kvhandle *handle, const char *key)
+{
+ leveldb::Slice _key((const char *)key, strlen(key));
+ leveldb::Status s = handle->kvdb->Delete(leveldb::WriteOptions(), _key);
+ return s.ok();
+}
+
+void doris_kvdb_handle_destroy(struct doris_kvhandle *handle)
+{
+ delete handle->kvdb;
+ free(handle);
+}
+
+int64_t doris_kvdb_get_keyint_valint(struct doris_kvhandle *handle, u_int64_t key)
+{
+ std::string value;
+ leveldb::Slice off_key((const char *)&key, sizeof(key));
+ leveldb::Status s = handle->kvdb->Get(leveldb::ReadOptions(), off_key, &value);
+ if(!s.ok())
+ {
+ return 0;
+ }
+ return *(int64_t *)value.data();
+}
+
+int64_t doris_kvdb_get_keystr_valint(struct doris_kvhandle *handle, const char *key)
+{
+ std::string value;
+ leveldb::Slice off_key((const char *)key, strlen(key));
+ leveldb::Status s = handle->kvdb->Get(leveldb::ReadOptions(), off_key, &value);
+ if(!s.ok())
+ {
+ return 0;
+ }
+ return *(int64_t *)value.data();
+}
+
diff --git a/server/doris_server_kvdb.h b/server/doris_server_kvdb.h
new file mode 100644
index 0000000..6608e5c
--- /dev/null
+++ b/server/doris_server_kvdb.h
@@ -0,0 +1,18 @@
+#ifndef __DORIS_KVDB_H__
+#define __DORIS_KVDB_H__
+
+struct doris_kvhandle;
+
+struct doris_kvhandle *doris_kvdb_hanlde_new(const char *dir);
+void doris_kvdb_handle_destroy(struct doris_kvhandle *handle);
+
+bool doris_kvdb_update_keyint_valint(struct doris_kvhandle *handle, u_int64_t key, int64_t value);
+bool doris_kvdb_update_keystr_valint(struct doris_kvhandle *handle, const char *key, int64_t value);
+
+int doris_kvdb_delete_keyint(struct doris_kvhandle *handle, u_int64_t key);
+int doris_kvdb_delete_keystr(struct doris_kvhandle *handle, const char *key);
+
+int64_t doris_kvdb_get_keyint_valint(struct doris_kvhandle *handle, u_int64_t key);
+int64_t doris_kvdb_get_keystr_valint(struct doris_kvhandle *handle, const char *key);
+
+#endif
diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp
index 480e904..4e4e20e 100644
--- a/server/doris_server_main.cpp
+++ b/server/doris_server_main.cpp
@@ -302,24 +302,32 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
MESA_load_profile_uint_def(config_file, business->bizname, "max_store_full_versions", &business->saves_when_fulldel, 0);
if(business->saves_when_fulldel > 16)
{
- MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40mAlert! %s [%s]max_store_full_versions support max 16!!!!\033[0m\n", config_file, business->bizname);
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] %s [%s]max_store_full_versions support max 16!!!!\033[0m\n", config_file, business->bizname);
business->saves_when_fulldel = 16;
}
MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mmval_status_codeid, 3);
- MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0);
+ MESA_load_profile_uint_def(config_file, business->bizname, "persistence_write_on", &business->persistence_write_on, 1);
+ MESA_load_profile_int_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0);
+ if(business->persistence_write_on==0 && business->cache_max_versions==0)
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] %s [%s], you must set mem_cache_max_versions if you disable persistence_write_on!\033[0m\n", config_file, business->bizname);
+ return -2;
+ }
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file);
assert(0);return -1;
}
- snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root);
- snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root);
- if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
+ if(business->persistence_write_on)
{
- MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
- return -1;
+ snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root);
+ snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root);
+ if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
+ return -1;
+ }
}
-
MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT);
assert(business->recv_way==RECV_WAY_IDX_FILE || business->recv_way==RECV_WAY_DRS_CLIENT || business->recv_way==RECV_WAY_HTTP_POST);
if(business->recv_way == RECV_WAY_IDX_FILE)
@@ -371,6 +379,10 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
{
assert(0);return -2;
}
+ if(!business->persistence_write_on)
+ {
+ g_doris_server_info.business_post_nopersists++;
+ }
g_doris_server_info.business_post_num++;
business->token2node = new map<string, struct version_list_node *>;
}
@@ -466,7 +478,12 @@ int main(int argc, char **argv)
"http_post_server_status", MONITOR_METRICS_GAUGE, "Running status of doris http post server.");
MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_DOWN);
}
-
+ if(g_doris_server_info.business_post_nopersists>0 && NULL==(g_doris_server_info.kvdbhandle=doris_kvdb_hanlde_new("./leveldbdata")))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mOpen levelDB ./leveldbdata failed.\033[0m");
+ assert(0);return -11;
+ }
+
if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx()))
{
assert(0);return -8;
diff --git a/server/doris_server_main.h b/server/doris_server_main.h
index b97f760..ea348b9 100644
--- a/server/doris_server_main.h
+++ b/server/doris_server_main.h
@@ -20,6 +20,7 @@
#include "doris_consumer_client.h"
#include "doris_server_receive.h"
+#include "doris_server_kvdb.h"
#include <map>
#include <string>
@@ -48,9 +49,10 @@ struct doris_business
/*first for configuration*/
char bizname[32];
u_int32_t recv_way;
- u_int32_t cache_max_versions;
+ int32_t cache_max_versions;
u_int32_t concurrency_allowed;
u_int32_t saves_when_fulldel; //��ȫ������ʱ����ౣ�漸�����µ�ȫ���汾��0-ȫ����
+ u_int32_t persistence_write_on;
char recv_path_full[256];
char recv_path_inc[256];
char store_path_root[256];
@@ -74,6 +76,7 @@ struct doris_business
struct doris_prod_instance *instance;
map<string, struct version_list_node *> *token2node;
int64_t version;
+ int64_t genversion_seq; //postģʽ���������ɰ汾�ŵ�����
int32_t source_from;
int32_t type;
int64_t version_cfgnum;
@@ -117,12 +120,14 @@ struct doris_global_info
u_int32_t business_num;
u_int32_t business_post_num; //postģʽ�м���
int32_t business_post_ups; //�����˼���
+ int32_t business_post_nopersists;
int32_t mmid_post_server; //value=PROMETHUES_POST_*
map<string, struct doris_business*> *name2business;
map<string, struct doris_csum_param *> *confile2csmparam;
struct MESA_MonitorHandler *monitor;
pthread_mutex_t mutex_lock;
+ struct doris_kvhandle *kvdbhandle;
/*logs*/
u_int32_t log_level;
diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp
index 6d30623..56643a7 100644
--- a/server/doris_server_receive.cpp
+++ b/server/doris_server_receive.cpp
@@ -222,7 +222,6 @@ void doris_config_file_version_error(struct doris_csum_instance *instance, void
fclose(business->fp_cfg_file);
remove(business->cfg_file_path);
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version);
}
void doris_config_file_cfgfile_start(struct doris_csum_instance *instance,
@@ -320,7 +319,7 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void
{
cfgver_handle = config_version_handle_new();
cfgver_handle->latest_version = business->cur_vernode->version;
- cfgver_handle->version_num = 1;
+ cfgver_handle->version_mem_num = 1;
TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node);
cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
cfgver_handle->version2node->insert(make_pair(cfgver_handle->latest_version, business->cur_vernode));
@@ -343,14 +342,23 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void
cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
}
/*�����ļ�������໺��N���汾��Ԫ��Ϣȫ����*/
- if(business->cache_max_versions!=0 && cfgver_handle->version_num>=business->cache_max_versions)
+ if(business->cache_max_versions!=0 && cfgver_handle->version_mem_num>=business->cache_max_versions)
{
- config_version_node_free_content(cfgver_handle->oldest_vernode);
- cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node);
+ if(!business->persistence_write_on)
+ {
+ TAILQ_REMOVE(&cfgver_handle->version_head, cfgver_handle->oldest_vernode, version_node);
+ config_version_node_cleanup(cfgver_handle->oldest_vernode);
+ cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
+ }
+ else
+ {
+ config_version_node_free_content(cfgver_handle->oldest_vernode);
+ cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node);
+ }
}
else
{
- cfgver_handle->version_num += 1;
+ cfgver_handle->version_mem_num += 1;
}
pthread_rwlock_unlock(&business->rwlock);
}
@@ -578,8 +586,13 @@ void doris_config_localmem_cfgfile_finish(struct doris_csum_instance *instance,
/*�ޱ��ϵ�к�������������ʱ�ص�*/
void doris_config_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata)
{
- doris_config_common_version_start((struct doris_business *)userdata, meta);
- doris_config_file_version_start(instance, meta, userdata);
+ struct doris_business *business=(struct doris_business *)userdata;
+
+ doris_config_common_version_start(business, meta);
+ if(business->persistence_write_on)
+ {
+ doris_config_file_version_start(instance, meta, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_start(instance, meta, userdata);
@@ -588,7 +601,12 @@ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *met
void doris_config_version_finish(struct doris_csum_instance *instance, void *userdata)
{
- doris_config_file_version_finish(instance, userdata);
+ struct doris_business *business=(struct doris_business *)userdata;
+
+ if(business->persistence_write_on)
+ {
+ doris_config_file_version_finish(instance, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_finish(instance, userdata);
@@ -598,12 +616,18 @@ void doris_config_version_finish(struct doris_csum_instance *instance, void *use
void doris_config_version_error(struct doris_csum_instance *instance, void *userdata)
{
+ struct doris_business *business=(struct doris_business *)userdata;
+
doris_config_common_version_error((struct doris_business *)userdata);
- doris_config_file_version_error(instance, userdata);
+ if(business->persistence_write_on)
+ {
+ doris_config_file_version_error(instance, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_error(instance, userdata);
}
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version);
}
void doris_config_cfgfile_start(struct doris_csum_instance *instance,
@@ -612,7 +636,10 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance,
struct doris_business *business=(struct doris_business *)userdata;
doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum);
- doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
+ if(business->persistence_write_on)
+ {
+ doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_start(instance, meta, business->cfg_file_path, userdata);
@@ -621,7 +648,12 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance,
void doris_config_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata)
{
- doris_config_file_cfgfile_update(instance, data, len, userdata);
+ struct doris_business *business=(struct doris_business *)userdata;
+
+ if(business->persistence_write_on)
+ {
+ doris_config_file_cfgfile_update(instance, data, len, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_update(instance, data, len, userdata);
@@ -630,8 +662,13 @@ void doris_config_cfgfile_update(struct doris_csum_instance *instance, const cha
void doris_config_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata)
{
+ struct doris_business *business=(struct doris_business *)userdata;
+
doris_config_common_cfgfile_finish((struct doris_business *)userdata);
- doris_config_file_cfgfile_finish(instance, userdata);
+ if(business->persistence_write_on)
+ {
+ doris_config_file_cfgfile_finish(instance, userdata);
+ }
if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_finish(instance, md5, userdata);
@@ -656,29 +693,30 @@ void* thread_doris_client_recv_cfg(void *arg)
business->source_from = RECV_WAY_IDX_FILE;
business->worker_evbase = client_evbase;
- scanner = doris_index_file_scanner(0);
-
- /*Retaive latest config to memory from Stored configs*/
- doris_cbs.version_start = doris_config_localmem_version_start;
- doris_cbs.version_finish = doris_config_localmem_version_finish;
- doris_cbs.version_error = doris_config_localmem_version_error;
- doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
- doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
- doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
- doris_cbs.version_updated= NULL;
- doris_cbs.userdata = business;
-
- snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
- if(business->saves_when_fulldel > 0)
+ if(business->persistence_write_on)
{
- get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel);
- }
- update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
- snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
- do {
- update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
- }while(update_type != CFG_UPDATE_TYPE_NONE);
+ scanner = doris_index_file_scanner(0);
+ /*Retaive latest config to memory from Stored configs*/
+ doris_cbs.version_start = doris_config_localmem_version_start;
+ doris_cbs.version_finish = doris_config_localmem_version_finish;
+ doris_cbs.version_error = doris_config_localmem_version_error;
+ doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
+ doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
+ doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
+ doris_cbs.version_updated= NULL;
+ doris_cbs.userdata = business;
+ snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
+ if(business->saves_when_fulldel > 0)
+ {
+ get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel);
+ }
+ update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
+ snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
+ do {
+ update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
+ }while(update_type != CFG_UPDATE_TYPE_NONE);
+ }
/*Check new configs*/
doris_cbs.version_start = doris_config_version_start;
@@ -687,10 +725,12 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_start = doris_config_cfgfile_start;
doris_cbs.cfgfile_update = doris_config_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
+ doris_cbs.version_updated= NULL;
+ doris_cbs.userdata = business;
business->source_from = RECV_WAY_DRS_CLIENT;
memset(&doris_args, 0, sizeof(struct doris_arguments));
- doris_args.current_version = scanner->cur_version;
+ doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions);
sprintf(doris_args.bizname, "%s", business->bizname);
instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
if(instance == NULL)
@@ -860,6 +900,12 @@ void prod_server_generate_token(struct doris_business *business, char *token/*OU
pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
}
+/*TODO: �������ɱȶԷ��汾��С�İ汾*/
+int64_t prod_server_generate_version(struct doris_business *business)
+{
+ return ++business->genversion_seq;
+}
+
void business_resume_sync_peer_normal(struct doris_business *business)
{
u_int32_t business_post_ups;
@@ -896,7 +942,7 @@ void business_set_sync_peer_abnormal(struct doris_business *business)
{
return;
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mcluster sync error, please check slave status!!!\033[0m\n");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]cluster sync error, please check slave status!!!\033[0m\n");
if(0 == atomic_set(&business->ready_to_sync, 0) || business->listener_prod==0)
{
@@ -1009,19 +1055,22 @@ void http_config_direct_version_cancel(struct version_list_node *vernode, struct
{
doris_prod_upload_ctx_destroy(vernode->synctx);
}
- if(vernode->fp_idx_file != NULL)
- {
- fclose(vernode->fp_idx_file);
- remove(vernode->tmp_index_path);
- }
- if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL)
+ if(business->persistence_write_on)
{
- fclose(vernode->cur_table->fp_cfg_file);
- remove(vernode->cur_table->localpath);
- }
- TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
- {
- remove(tablenode->localpath);
+ if(vernode->fp_idx_file != NULL)
+ {
+ fclose(vernode->fp_idx_file);
+ remove(vernode->tmp_index_path);
+ }
+ if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL)
+ {
+ fclose(vernode->cur_table->fp_cfg_file);
+ remove(vernode->cur_table->localpath);
+ }
+ TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
+ {
+ remove(tablenode->localpath);
+ }
}
config_version_node_cleanup(vernode);
if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL))
@@ -1050,7 +1099,7 @@ void prod_sync_vercancel_result_cb(enum PROD_VEROP_RES result, void *userdata)
case VERSIONOP_RES_ERROR:
evhttp_send_error(vernode->req, 500, "version cancel sync error res_code");
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
break;
case VERSIONOP_CURL_ERROR:
@@ -1115,24 +1164,27 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve
{
assert(newversion > vernode->version);
vernode->version = newversion;
-
- if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
- {
- snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version);
- snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version);
- }
- else
- {
- snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version);
- }
- /*HTTP postʱ����汾����ÿ�������Լ�����ʱ֪ͨ�ļ��������ñ����ļ��Ĺرպ���*/
- sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path);
business->version = vernode->version;
business->type = vernode->cfg_type;
- business->fp_idx_file = vernode->fp_idx_file;
- doris_config_file_version_finish(NULL, business);
- vernode->fp_idx_file = NULL;
+ if(business->persistence_write_on)
+ {
+ if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
+ {
+ snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version);
+ snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version);
+ }
+ else
+ {
+ snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version);
+ }
+ /*HTTP postʱ����汾����ÿ�������Լ�����ʱ֪ͨ�ļ��������ñ����ļ��Ĺرպ���*/
+ sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path);
+ business->fp_idx_file = vernode->fp_idx_file;
+ doris_config_file_version_finish(NULL, business);
+ vernode->fp_idx_file = NULL;
+ }
+
if(g_doris_server_info.consumer_port)
{
business->cur_vernode = vernode;
@@ -1158,7 +1210,7 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve
void http_config_direct_version_finish(struct version_list_node *vernode, struct evhttp_request *req, int64_t set_version)
{
struct doris_business *business=vernode->business;
- char version[32], token[64];
+ char version[32], token[64], lvdbkey[40];
int64_t new_version;
if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL))
@@ -1168,12 +1220,22 @@ void http_config_direct_version_finish(struct version_list_node *vernode, struct
if(set_version == 0)
{
- new_version = business->cfgver_head->latest_version + 1;
+ new_version = prod_server_generate_version(business);
}
else
{
- new_version = set_version;
+ new_version = business->genversion_seq = set_version;
+ }
+ /*����leveldb���ɰ汾�ŵ�����*/
+ if(!business->persistence_write_on)
+ {
+ sprintf(lvdbkey, "%s_verseq", business->bizname);
+ if(!doris_kvdb_update_keystr_valint(g_doris_server_info.kvdbhandle, lvdbkey, new_version))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] business: %s, update levelDB failed!\033[0m\n", business->bizname);
+ }
}
+
sprintf(token, "%s", vernode->token);
doris_config_post_version_finish(business, vernode, new_version);
@@ -1198,7 +1260,7 @@ void prod_sync_verend_result_cb(enum PROD_VEROP_RES result, int64_t version, voi
case VERSIONOP_RES_ERROR:
evhttp_send_error(vernode->req, 500, "version end sync error res_code");
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
break;
case VERSIONOP_CURL_ERROR:
@@ -1332,20 +1394,23 @@ void doris_config_post_version_start(struct version_list_node *cur_vernode, cons
struct doris_business *business=cur_vernode->business;
snprintf(cur_vernode->token, 64, "%s", token);
- if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
- {
- snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
- }
- else
- {
- snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
- }
- if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+")))
+
+ if(business->persistence_write_on)
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno));
- assert(0);
+ if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
+ {
+ snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
+ }
+ else
+ {
+ snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
+ }
+ if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+")))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno));
+ assert(0);
+ }
}
-
if(g_doris_server_info.consumer_port)
{
TAILQ_INIT(&cur_vernode->table_head);
@@ -1404,7 +1469,7 @@ void try_restore_from_busy_peer(struct version_list_node *cur_vernode, const cha
cur_vernode->req = NULL;
if(busy)
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33mbusiness: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33m[Warning]business: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body);
}
else
{
@@ -1436,7 +1501,7 @@ void prod_sync_verstart_result_cb(enum PROD_VERSTART_RES result, const char *bod
business->cur_vernode = NULL;
business->posts_on_the_way--;
FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname);
break;
case VERSTART_CURL_ERROR:
@@ -1727,12 +1792,14 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e
meta.cfgnum = vernode->cur_table->cfgnum;
meta.size = 0;
- vernode->business->type = vernode->cfg_type;
- vernode->business->fp_idx_file = vernode->fp_idx_file;
- doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business);
- sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path);
- vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file;
-
+ if(vernode->business->persistence_write_on)
+ {
+ vernode->business->type = vernode->cfg_type;
+ vernode->business->fp_idx_file = vernode->fp_idx_file;
+ doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business);
+ sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path);
+ vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file;
+ }
if(g_doris_server_info.consumer_port)
{
vernode->cur_table->table_meta = cJSON_CreateObject();
@@ -1749,7 +1816,10 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e
void doris_config_post_cfgfile_finish(struct version_list_node *vernode, const char *md5str)
{
doris_config_common_cfgfile_finish(vernode->business);
- fclose(vernode->cur_table->fp_cfg_file);
+ if(vernode->business->persistence_write_on)
+ {
+ fclose(vernode->cur_table->fp_cfg_file);
+ }
assert(vernode->cur_table->filesize == 0);
vernode->cur_table->filesize = vernode->cur_table->cur_totallen;
@@ -1791,11 +1861,14 @@ void http_config_direct_cfgfile_update(struct version_list_node *vernode, struct
}
if(vernode->cur_table->fragsize > 0)
{
- writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file);
- if(writen_len != vernode->cur_table->fragsize)
+ if(vernode->business->persistence_write_on)
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno));
- assert(0);
+ writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file);
+ if(writen_len != vernode->cur_table->fragsize)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno));
+ assert(0);
+ }
}
if(g_doris_server_info.consumer_port)
{
@@ -1843,7 +1916,7 @@ void prod_sync_upload_frag_cb(enum PROD_VEROP_RES result,void * userdata)
case VERSIONOP_RES_ERROR:
evhttp_send_error(vernode->req, 500, "frag sync error res_code");
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
break;
case VERSIONOP_CURL_ERROR:
@@ -2050,12 +2123,19 @@ void start_business_http_post_server(struct doris_business *business)
}
}
-void doris_config_version_sync_updated(struct doris_csum_instance *instance, void *userdata)
+void doris_config_version_sync_updated(struct doris_csum_instance *instance, int64_t latest_version, void *userdata)
{
struct doris_business *business=(struct doris_business *)userdata;
struct doris_csum_param *param;
u_int32_t references, business_post_ups;
+ if(latest_version)
+ {
+ business->genversion_seq = latest_version;
+ assert(business->cfgver_head->latest_version == latest_version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version change to: %lu\033[0m", business->bizname, latest_version);
+ }
+
/*����consuemer��ͬʱȷ��������ִֻ��һ��*/
param = doris_csum_instance_get_param(instance);
doris_csum_instance_destroy(instance);
@@ -2091,8 +2171,7 @@ void doris_config_version_sync_updated(struct doris_csum_instance *instance, voi
MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING);
}
assert(business_post_ups <= g_doris_server_info.business_post_num);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m******Doris Producer worker for %s starts******\033[0m", business->bizname);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris ready to sync for business: %s\n", business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m[Info]******Doris Producer worker for %s starts******\033[0m", business->bizname);
}
/*��thread_doris_client_recv_cfg��������version_updated����*/
@@ -2106,6 +2185,7 @@ void* thread_http_post_recv_cfg(void *arg)
struct doris_idxfile_scanner *scanner;
enum DORIS_UPDATE_TYPE update_type;
char stored_path[512];
+ int64_t genversion_seq;
prctl(PR_SET_NAME, "http_post");
@@ -2116,7 +2196,7 @@ void* thread_http_post_recv_cfg(void *arg)
scanner = doris_index_file_scanner(0);
- /*Retaive latest config to memory from Stored configs*/
+ /*�����Ƿ����־û��������Զ�һ�±������ã�����Ϊ���ð汾��ʼ��ʹ��*/
doris_cbs.version_start = doris_config_localmem_version_start;
doris_cbs.version_finish = doris_config_localmem_version_finish;
doris_cbs.version_error = doris_config_localmem_version_error;
@@ -2137,6 +2217,18 @@ void* thread_http_post_recv_cfg(void *arg)
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
}while(update_type != CFG_UPDATE_TYPE_NONE);
+ /*�������ɰ汾����ʼ����*/
+ business->genversion_seq = scanner->cur_version;
+ if(!business->persistence_write_on)
+ {
+ sprintf(stored_path, "%s_verseq", business->bizname);
+ if((genversion_seq = doris_kvdb_get_keystr_valint(g_doris_server_info.kvdbhandle, stored_path)) != 0)
+ {
+ business->genversion_seq = genversion_seq;
+ }
+ }
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version from: %lu\033[0m\n", business->bizname, business->genversion_seq);
+
if(g_doris_server_info.cluster_sync_mode) /*Check new configs*/
{
doris_cbs.version_start = doris_config_version_start;
@@ -2146,10 +2238,11 @@ void* thread_http_post_recv_cfg(void *arg)
doris_cbs.cfgfile_update = doris_config_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
doris_cbs.version_updated= doris_config_version_sync_updated;
+ doris_cbs.userdata = business;
business->source_from = RECV_WAY_DRS_CLIENT;
memset(&doris_args, 0, sizeof(struct doris_arguments));
- doris_args.current_version = scanner->cur_version;
+ doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions);
sprintf(doris_args.bizname, "%s", business->bizname);
instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
if(instance == NULL)
diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h
index 70b4ea6..44a1247 100644
--- a/server/doris_server_receive.h
+++ b/server/doris_server_receive.h
@@ -142,7 +142,7 @@ struct version_list_handle
TAILQ_HEAD(__version_list_node, version_list_node) version_head;
int64_t latest_version;
int32_t references;
- u_int32_t version_num;
+ int32_t version_mem_num;
map<int64_t, struct version_list_node*> *version2node;
struct version_list_node *oldest_vernode; //δ�����ڴ���̭�����ϰ汾
};
diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp
index 73d8f2e..5097185 100644
--- a/server/doris_server_scandir.cpp
+++ b/server/doris_server_scandir.cpp
@@ -231,7 +231,7 @@ enum DORIS_UPDATE_TYPE get_new_idx_path(long long current_version, const char *f
if(n < 0)
{
MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "scan dir error");
- return update_type;
+ return CFG_UPDATE_TYPE_NONE;
}
tmpidx_ver_array = (struct index_version_array*)calloc(sizeof(struct index_version_array), n);
@@ -439,7 +439,7 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s
table_num=cm_read_cfg_index_file(idx_path_array[i].path, table_array, CM_MAX_TABLE_NUM, logger);
if(table_num<=0)
{
- MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
+ MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40m[Alert] Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
update_type = CFG_UPDATE_TYPE_ERR;
scanner->cur_version = idx_path_array[i].version; //����İ汾����
break;
@@ -460,7 +460,7 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s
update_type = CFG_UPDATE_TYPE_ERR;
doris_cbs->version_error(NULL, doris_cbs->userdata);
cJSON_Delete(meta);
- MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
+ MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40m[Alert] Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
break;
}
cJSON_Delete(meta);