summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/CMakeLists.txt2
-rw-r--r--server/bin/conf/doris_client.conf35
-rw-r--r--server/bin/conf/doris_client_csum.conf30
-rw-r--r--server/bin/conf/doris_client_sync_t1.conf28
-rw-r--r--server/bin/conf/doris_client_sync_voip.conf28
-rw-r--r--server/bin/conf/doris_main.conf27
-rw-r--r--server/doris_server_http.cpp171
-rw-r--r--server/doris_server_http.h1
-rw-r--r--server/doris_server_main.cpp212
-rw-r--r--server/doris_server_main.h58
-rw-r--r--server/doris_server_receive.cpp1845
-rw-r--r--server/doris_server_receive.h86
-rw-r--r--server/doris_server_scandir.cpp14
-rw-r--r--server/doris_server_scandir.h4
14 files changed, 2067 insertions, 474 deletions
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
index 3f609ed..92077dc 100644
--- a/server/CMakeLists.txt
+++ b/server/CMakeLists.txt
@@ -4,7 +4,7 @@ add_definitions(-fPIC -Wall -g)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D__FILENAME__='\"$(subst ${CMAKE_CURRENT_SOURCE_DIR}/,,$(abspath $<))\"'")
add_executable(doris ${NIRVANA_PLATFORM_SRC})
-target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson libLevelDB)
+target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson)
target_link_libraries(doris MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat2 pthread z dl)
set_target_properties(doris PROPERTIES CLEAN_DIRECT_OUTPUT 1)
target_include_directories(doris PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
diff --git a/server/bin/conf/doris_client.conf b/server/bin/conf/doris_client.conf
deleted file mode 100644
index 062aeeb..0000000
--- a/server/bin/conf/doris_client.conf
+++ /dev/null
@@ -1,35 +0,0 @@
-[DORIS_CLIENT]
-fetch_fail_retry_interval=5
-fetch_fragmet_size=5242880
-fetch_confile_max_tries=3
-
-fsstat_log_appname=DorisClient
-fsstat_log_filepath=./log/doris_client.fs
-fsstat_log_interval=2
-fsstat_log_print_mode=1
-fsstat_log_dst_ip=192.168.10.90
-fsstat_log_dst_port=8125
-
-[DORIS_CLIENT.master_server]
-max_connection_per_host=1
-max_cnnt_pipeline_num=10
-https_connection_on=0
-max_curl_session_num=10
-
-http_server_listen_port=9897
-http_server_manage_port=9897
-http_server_ip_list=192.168.10.8
-
-[DORIS_CLIENT.backup1_server]
-max_connection_per_host=1
-max_cnnt_pipeline_num=10
-https_connection_on=0
-max_curl_session_num=10
-
-http_server_listen_port=9897
-http_server_manage_port=9897
-http_server_ip_list=192.168.11.241
-
-[DORIS_CLIENT.backup2_server]
-
-
diff --git a/server/bin/conf/doris_client_csum.conf b/server/bin/conf/doris_client_csum.conf
new file mode 100644
index 0000000..7055fc3
--- /dev/null
+++ b/server/bin/conf/doris_client_csum.conf
@@ -0,0 +1,30 @@
+[DORIS_CLIENT]
+fetch_fail_retry_interval=5
+fetch_fragmet_size=5242880
+fetch_confile_max_tries=3
+
+fsstat_log_appname=DrsCsmClient
+fsstat_log_filepath=./log/doris_client_csm.fs
+fsstat_log_interval=2
+fsstat_log_print_mode=1
+fsstat_log_dst_ip=192.168.10.90
+fsstat_log_dst_port=8125
+
+[DORIS_CLIENT.master_server]
+https_connection_on=1
+http_server_listen_port=9898
+http_server_manage_port=2233
+http_server_ip_list=192.168.10.8
+
+[DORIS_CLIENT.backup1_server]
+https_connection_on=1
+http_server_listen_port=9898
+http_server_manage_port=2233
+http_server_ip_list=192.168.11.241
+
+[DORIS_CLIENT.backup2_server]
+https_connection_on=1
+http_server_listen_port=9898
+http_server_manage_port=2233
+http_server_ip_list=192.168.11.242
+
diff --git a/server/bin/conf/doris_client_sync_t1.conf b/server/bin/conf/doris_client_sync_t1.conf
new file mode 100644
index 0000000..717ce9a
--- /dev/null
+++ b/server/bin/conf/doris_client_sync_t1.conf
@@ -0,0 +1,28 @@
+[DORIS_CLIENT]
+fetch_fail_retry_interval=5
+fetch_fragmet_size=5242880
+fetch_confile_max_tries=3
+
+upload_fragmet_size=5242880
+master_slave_sync_on=1
+
+fsstat_log_appname=DrsPrdClient
+fsstat_log_filepath=./log/doris_sync_t1c.fs
+fsstat_log_filepath_p=./log/doris_sync_t1p.fs
+fsstat_log_interval=2
+fsstat_log_print_mode=1
+fsstat_log_dst_ip=192.168.10.90
+fsstat_log_dst_port=8125
+
+[DORIS_CLIENT.master_server]
+https_connection_on=1
+http_server_listen_port=9898
+http_server_manage_port=2233
+http_server_ip_list=192.168.10.9
+
+[DORIS_CLIENT.produce]
+https_connection_on=1
+http_server_listen_port=9800
+http_server_manage_port=2233
+http_server_ip_list=192.168.10.9
+
diff --git a/server/bin/conf/doris_client_sync_voip.conf b/server/bin/conf/doris_client_sync_voip.conf
new file mode 100644
index 0000000..5600721
--- /dev/null
+++ b/server/bin/conf/doris_client_sync_voip.conf
@@ -0,0 +1,28 @@
+[DORIS_CLIENT]
+fetch_fail_retry_interval=5
+fetch_fragmet_size=5242880
+fetch_confile_max_tries=3
+
+upload_fragmet_size=5242880
+master_slave_sync_on=1
+
+fsstat_log_appname=DrsPrdClient
+fsstat_log_filepath=./log/doris_sync_voipc.fs
+fsstat_log_filepath_p=./log/doris_sync_voipp.fs
+fsstat_log_interval=2
+fsstat_log_print_mode=1
+fsstat_log_dst_ip=192.168.10.90
+fsstat_log_dst_port=8125
+
+[DORIS_CLIENT.master_server]
+https_connection_on=1
+http_server_listen_port=9898
+http_server_manage_port=2233
+http_server_ip_list=192.168.10.9
+
+[DORIS_CLIENT.produce]
+https_connection_on=1
+http_server_listen_port=9801
+http_server_manage_port=2233
+http_server_ip_list=192.168.10.9
+
diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf
index 1220cc5..fcf8a6c 100644
--- a/server/bin/conf/doris_main.conf
+++ b/server/bin/conf/doris_main.conf
@@ -1,13 +1,15 @@
[DORIS_SERVER]
worker_thread_num=2
-server_listen_port=9898
-manage_listen_port=2233
+consumer_listen_port=9898
+manager_listen_port=2233
https_connection_on=1
-cache_file_frag_size=100
-#doris_server_role_on=1
+#cache_file_frag_size=67108864
#index_file_format_maat=0
+local_net_name=em1
+http_post_cluster_on=1
+#http_post_max_concurrence=100000
-business_system_list=T1_1;VoIP
+business_system_list=VoIP;T1_1
run_log_dir=./log
run_log_lv=20
@@ -19,20 +21,25 @@ fsstat_log_dst_ip=192.168.10.90
fsstat_log_dst_port=8125
[T1_1]
-#1-Doris client; 2-local file
-receive_config_way=2
+#1-Doris client; 2-local file; 3-HTTP post server
+receive_config_way=1
grafana_monitor_status_id=3
+#producer_listen_port=9800
+#producer_concurrence_allowed=0
store_config_path=./doris_store_t1
receive_config_path_full=./doris_receive_t1/full/index
receive_config_path_inc=./doris_receive_t1/inc/index
-#doris_client_confile=./conf/doris_client.conf
+doris_client_confile=./conf/doris_client_csum.conf
+#doris_client_confile=./conf/doris_client_sync_t1.conf
[VoIP]
-receive_config_way=2
+receive_config_way=3
+producer_listen_port=9801
+producer_concurrence_allowed=1
mem_cache_max_versions=2
grafana_monitor_status_id=4
store_config_path=./doris_store_voip
receive_config_path_full=./doris_receive_voip/full/index
receive_config_path_inc=./doris_receive_voip/inc/index
-#doris_client_confile=./conf/doris_client.conf
+doris_client_confile=./conf/doris_client_sync_voip.conf
diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp
index 6c2dad1..f8bdb4e 100644
--- a/server/doris_server_http.cpp
+++ b/server/doris_server_http.cpp
@@ -16,64 +16,11 @@
#include <sys/stat.h>
#include <fcntl.h>
-#include <event2/bufferevent_ssl.h>
-
#include "doris_server_main.h"
#include "doris_server_http.h"
extern struct doris_global_info g_doris_server_info;
-static inline void set_sockopt_keepalive(int sd, int keepidle, int keepintvl, int keepcnt)
-{
- int keepalive = 1;
- setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepalive, sizeof(keepalive));
- setsockopt(sd, SOL_TCP, TCP_KEEPIDLE, (void*)&keepidle, sizeof(keepidle));
- setsockopt(sd, SOL_TCP, TCP_KEEPINTVL, (void*)&keepintvl, sizeof(keepintvl));
- setsockopt(sd, SOL_TCP, TCP_KEEPCNT, (void*)&keepcnt, sizeof(keepcnt));
-}
-
-static inline void set_listen_sockopt(int sd)
-{
- if(g_doris_server_info.sock_recv_bufsize > 0)
- {
- setsockopt(sd, SOL_SOCKET, SO_RCVBUF, &g_doris_server_info.sock_recv_bufsize, sizeof(u_int32_t));
- }
-}
-
-int doris_create_listen_socket(int bind_port)
-{
- evutil_socket_t listener;
- struct sockaddr_in sin;
-
- listener = socket(AF_INET, SOCK_STREAM, 0);
- if(listener < 0)
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "create socket error!\n");
- return -1;
- }
- set_sockopt_keepalive(listener, 300, 10, 2);
- set_listen_sockopt(listener);
- evutil_make_listen_socket_reuseable(listener);
-
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr=htonl(INADDR_ANY);
- sin.sin_port = htons(bind_port);
-
- if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0)
- {
- printf("bind socket to port: %d error: %s!\n", bind_port, strerror(errno));
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "bind socket to port: %d error: %s!\n", bind_port, strerror(errno));
- assert(0);return -2;
- }
- if (listen(listener, 1024)<0)
- {
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "listen socket 1024 error!\n");
- return -3;
- }
- evutil_make_socket_nonblocking(listener);
- return listener;
-}
-
struct version_list_node *lookup_vernode_accord_version(struct version_list_handle *handle, int64_t version)
{
struct version_list_node *vernode;
@@ -96,16 +43,49 @@ struct version_list_node *lookup_vernode_accord_version(struct version_list_hand
return NULL;
}
+/*����ֵ:
+ *304-�ͻ����Ѵﵽ���°汾��������ͬ������������consumer��׼����������
+ *300-��δ����ͬ��������Client���ð汾����*/
+static int32_t check_producer_ready_sync(struct doris_business *business, struct evhttp_request *req, int64_t cur_version)
+{
+ const char *client_version;
+ int64_t clientversion;
+
+ if(NULL == (client_version=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Sync-Current-Version")))
+ {
+ return HTTP_NOTMODIFIED;
+ }
+
+ /*request from sync client, check http posts-on-the-way first*/
+ if(business->posts_on_the_way)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "HttpProducer, posts-on-the-way: %d, meta response 300", business->posts_on_the_way);
+ return 300;
+ }
+
+ /*Client�汾����? �����˴ӻ����ӻ������������������£�����Client����ʱ���Ӷ������ƣ��ڻ�����ȡ*/
+ /*����ȡʱ���õ�300��Ӧ������ȡʱ��ֱ��ͬ�������İ汾�������������������汾���һ�£����304*/
+ if((clientversion=atol(client_version)) > cur_version)
+ {
+ business_set_sync_peer_abnormal(business);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "HttpProducer, client version(%lu) is newer than server(%lu)", clientversion, cur_version);
+ return 300;
+ }
+
+ business_resume_sync_peer_normal(business);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris client is OK to sync for business: %s", business->bizname);
+ return HTTP_NOTMODIFIED;
+}
+
void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
- const char *version, *bizname;
+ const char *version;
int64_t verlong;
char *endptr=NULL, length[64];
struct version_list_node *vernode;
struct evbuffer *evbuf;
struct doris_business *business;
- map<string, struct doris_business*>::iterator iter;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_META_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
@@ -128,29 +108,21 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return;
}
- if(NULL == (bizname = evhttp_find_header(&params, "business")))
- {
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
- evhttp_clear_headers(&params);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
- return;
- }
- if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
+ if(NULL == (business = lookup_bizstruct_from_name(&params)))
{
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
return;
}
evhttp_clear_headers(&params);
- business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
{
+ int code = check_producer_ready_sync(business, req, business->cfgver_head->latest_version);
pthread_rwlock_unlock(&business->rwlock);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1);
- evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found");
+ evhttp_send_error(req, code, "No new configs found");
return;
}
evbuf = evbuffer_new();
@@ -210,24 +182,14 @@ struct evbuffer *evbuf_content_from_disk(struct table_list_node *tablenode, size
return evbuf;
}
-void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename,
- int64_t verlong, size_t start, size_t end, bool range)
+void doris_response_file_range(struct evhttp_request *req, struct doris_business *business,
+ const char *tablename, int64_t verlong, size_t start, size_t end, bool range)
{
struct version_list_node *vernode;
struct table_list_node *tablenode;
struct evbuffer *evbuf;
char length[128];
size_t filesize, res_length;
- struct doris_business *business;
- map<string, struct doris_business*>::iterator iter;
-
- if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
- {
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
- return;
- }
- business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
@@ -266,24 +228,29 @@ void doris_response_file_range(struct evhttp_request *req, const char *bizname,
assert(res_length == end + 1 - start);
sprintf(length, "%lu", res_length);
- FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_FILE_RES], FS_OP_ADD, 1);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILES], 0, FS_OP_ADD, 1);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_BYTES], 0, FS_OP_ADD, res_length);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
+ evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/stream");
+ evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
if(range)
{
sprintf(length, "bytes %lu-%lu/%lu", start, end, filesize);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Range", length);
+ evhttp_send_reply(req, 206, "Partial Content", evbuf);
+ }
+ else
+ {
+ evhttp_send_reply(req, HTTP_OK, "OK", evbuf);
}
- evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/stream");
- evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
- evhttp_send_reply(req, HTTP_OK, "OK", evbuf);
evbuffer_free(evbuf);
}
void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
- const char *version, *tablename, *content_range, *bizname;
+ struct doris_business *business;
+ const char *version, *tablename, *content_range;
int64_t verlong;
char *endptr=NULL;
size_t req_start=0, req_end=0;
@@ -316,25 +283,22 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid");
return;
- }
- if(NULL == (bizname = evhttp_find_header(&params, "business")))
+ }
+ if(NULL == (business = lookup_bizstruct_from_name(&params)))
{
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
return;
}
- doris_response_file_range(req, bizname, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true);
+ doris_response_file_range(req, business, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true);
evhttp_clear_headers(&params);
}
void doris_http_server_version_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
- const char *bizname;
struct doris_business *business;
- map<string, struct doris_business*>::iterator iter;
char verbuf[32];
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
@@ -343,23 +307,13 @@ void doris_http_server_version_cb(struct evhttp_request *req, void *arg)
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return;
}
-
- if(NULL == (bizname = evhttp_find_header(&params, "business")))
- {
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
- evhttp_clear_headers(&params);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
- return;
- }
- if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
+ if(NULL == (business = lookup_bizstruct_from_name(&params)))
{
- FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
- evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
return;
}
evhttp_clear_headers(&params);
- business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
sprintf(verbuf, "%lu", business->cfgver_head->latest_version);
@@ -464,13 +418,6 @@ SSL_CTX *doris_connections_create_ssl_ctx(void)
return ssl_ctx;
}
-struct bufferevent *doris_https_bufferevent_cb(struct event_base *evabse, void *arg)
-{
- SSL_CTX *ssl_instance = (SSL_CTX *)arg;
-
- return bufferevent_openssl_socket_new(evabse, -1, SSL_new(ssl_instance), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
-}
-
void* thread_doris_http_server(void *arg)
{
struct event_base *worker_evbase;
@@ -492,9 +439,9 @@ void* thread_doris_http_server(void *arg)
evhttp_set_gencb(worker_http, doris_http_server_generic_cb, NULL);
evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD);
- if(evhttp_accept_socket(worker_http, g_doris_server_info.listener))
+ if(evhttp_accept_socket(worker_http, g_doris_server_info.listener_csum))
{
- printf("evhttp_accept_socket %d error!\n", g_doris_server_info.listener);
+ printf("evhttp_accept_socket %d error!\n", g_doris_server_info.listener_csum);
assert(0); return NULL;
}
diff --git a/server/doris_server_http.h b/server/doris_server_http.h
index 3092a55..934cb56 100644
--- a/server/doris_server_http.h
+++ b/server/doris_server_http.h
@@ -3,7 +3,6 @@
#include <openssl/ssl.h>
-int doris_create_listen_socket(int bind_port);
SSL_CTX *doris_connections_create_ssl_ctx(void);
void* thread_doris_http_server(void *arg);
diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp
index c7703cf..2bc10f4 100644
--- a/server/doris_server_main.cpp
+++ b/server/doris_server_main.cpp
@@ -7,17 +7,24 @@
#include <errno.h>
#include <pthread.h>
#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <net/if.h>
#include <event2/thread.h>
#include <MESA/MESA_prof_load.h>
-#include "doris_client.h"
+#include "doris_consumer_client.h"
+#include "doris_producer_client.h"
#include "doris_server_main.h"
#include "doris_server_http.h"
struct doris_global_info g_doris_server_info;
-static unsigned long doris_vesion_20210804=20210804L;
+static unsigned long doris_version_20210825=20210825L;
int doris_mkdir_according_path(const char * path)
{
@@ -54,6 +61,79 @@ int doris_mkdir_according_path(const char * path)
return 0;
}
+int32_t get_ip_by_ifname(const char *ifname)
+{
+ int sockfd;
+ struct ifreq ifr;
+ int32_t ip;
+
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (-1 == sockfd)
+ return INADDR_NONE;
+
+ strcpy(ifr.ifr_name,ifname);
+ if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0)
+ {
+ close(sockfd);
+ return INADDR_NONE;
+ }
+
+ ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr;
+ close(sockfd);
+ return ip;
+}
+
+static inline void set_sockopt_keepalive(int sd, int keepidle, int keepintvl, int keepcnt)
+{
+ int keepalive = 1;
+ setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepalive, sizeof(keepalive));
+ setsockopt(sd, SOL_TCP, TCP_KEEPIDLE, (void*)&keepidle, sizeof(keepidle));
+ setsockopt(sd, SOL_TCP, TCP_KEEPINTVL, (void*)&keepintvl, sizeof(keepintvl));
+ setsockopt(sd, SOL_TCP, TCP_KEEPCNT, (void*)&keepcnt, sizeof(keepcnt));
+}
+
+static inline void set_listen_sockopt(int sd)
+{
+ if(g_doris_server_info.sock_recv_bufsize > 0)
+ {
+ setsockopt(sd, SOL_SOCKET, SO_RCVBUF, &g_doris_server_info.sock_recv_bufsize, sizeof(u_int32_t));
+ }
+}
+
+int doris_create_listen_socket(int bind_port)
+{
+ evutil_socket_t listener;
+ struct sockaddr_in sin;
+
+ listener = socket(AF_INET, SOCK_STREAM, 0);
+ if(listener < 0)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "create socket error!\n");
+ return -1;
+ }
+ set_sockopt_keepalive(listener, 300, 10, 2);
+ set_listen_sockopt(listener);
+ evutil_make_listen_socket_reuseable(listener);
+
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr=htonl(INADDR_ANY);
+ sin.sin_port = htons(bind_port);
+
+ if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0)
+ {
+ printf("bind socket to port: %d error: %s!\n", bind_port, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "bind socket to port: %d error: %s!\n", bind_port, strerror(errno));
+ assert(0);return -2;
+ }
+ if (listen(listener, 1024)<0)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "listen socket 1024 error!\n");
+ return -3;
+ }
+ evutil_make_socket_nonblocking(listener);
+ return listener;
+}
+
static int doris_chech_name_valid(const char *name)
{
size_t i, namelen=strlen(name);
@@ -100,8 +180,11 @@ int32_t doris_read_profile_configs(const char *config_file)
assert(0);return -1;
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
- MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
+ MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "http_post_max_frag_size", &g_doris_server_info.max_http_body_size, 67108864);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "index_file_format_maat", &g_doris_server_info.idx_file_maat, 0);
+ MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "http_post_max_concurrence", &g_doris_server_info.max_concurrent_reqs, 100000);
+ MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "http_post_cluster_on", &g_doris_server_info.cluster_sync_mode, 0);
+ MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "http_post_op_expires", &g_doris_server_info.post_vernode_ttl, 1200);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
@@ -122,18 +205,19 @@ int32_t doris_read_profile_configs(const char *config_file)
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "fsstat_log_dst_port", &g_doris_server_info.fsstat_dst_port, 8125);
//LIBEVENT
- MESA_load_profile_int_def(config_file, "DORIS_SERVER", "server_listen_port", &g_doris_server_info.server_port, 9898);
- MESA_load_profile_int_def(config_file, "DORIS_SERVER", "manage_listen_port", &g_doris_server_info.manager_port, 2233);
+ MESA_load_profile_int_def(config_file, "DORIS_SERVER", "consumer_listen_port", &g_doris_server_info.consumer_port, 0);
+ MESA_load_profile_int_def(config_file, "DORIS_SERVER", "manager_listen_port", &g_doris_server_info.manager_port, 2233);
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "socket_recv_bufsize", &g_doris_server_info.sock_recv_bufsize, 524288);
+ pthread_mutex_init(&g_doris_server_info.mutex_lock, NULL);
return 0;
}
static int doris_server_register_field_stat(struct doris_global_info *param)
{
const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq",
- "ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendBytes", "SendFile404"};
+ "ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendFiles", "SendBytes", "SendFile404", "VerExpire"};
const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed"};
- const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "SendResMeta", "SendFiles",
+ const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "PostOnWay", "SendResMeta",
"CurFullVer", "CurIncVer", "TotalCfgNum"};
int value;
@@ -188,7 +272,8 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
{
char tmpbuffer[4096], tmp_dir[256], tmp_dir2[256], *bizname, *save=NULL;
struct doris_business *business;
- map<string, struct doris_parameter *>::iterator iter;
+ map<string, struct doris_csum_param *>::iterator iter_csm;
+ map<string, struct doris_prod_param *>::iterator iter_prd;
if(0>=MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "business_system_list", tmpbuffer, sizeof(tmpbuffer)))
{
@@ -214,7 +299,7 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
}
info->name2business->insert(make_pair(string(business->bizname), business));
- MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3);
+ MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mmval_status_codeid, 3);
MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0);
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
{
@@ -230,46 +315,66 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
}
MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT);
- if(business->recv_way == RECV_WAY_DRS_CLIENT)
+ assert(business->recv_way==RECV_WAY_IDX_FILE || business->recv_way==RECV_WAY_DRS_CLIENT || business->recv_way==RECV_WAY_HTTP_POST);
+ if(business->recv_way == RECV_WAY_IDX_FILE)
{
- if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir)))
+ if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full)))
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file);
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
assert(0);return -1;
}
- if((iter=info->confile2param->find(string(tmp_dir))) != info->confile2param->end())
- {
- business->param = iter->second;
- }
- else
+ if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc)))
{
- business->param = doris_parameter_new(tmp_dir, manage_evbase, info->log_runtime);
- if(business->param == NULL)
- {
- assert(0);return -2;
- }
- info->confile2param->insert(make_pair(string(tmp_dir), business->param));
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
+ assert(0);return -1;
}
}
- else
+ else //������ͬ��Ҳ��Ҫ
{
- if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full)))
+ if(business->recv_way==RECV_WAY_DRS_CLIENT || g_doris_server_info.cluster_sync_mode)
{
- MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
- assert(0);return -1;
+ if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir)))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file);
+ assert(0);return -1;
+ }
+ if((iter_csm=info->confile2csmparam->find(string(tmp_dir))) != info->confile2csmparam->end())
+ {
+ business->param_csum = iter_csm->second;
+ }
+ else
+ {
+ business->param_csum = doris_csum_parameter_new(tmp_dir, manage_evbase, info->log_runtime);
+ if(business->param_csum == NULL)
+ {
+ assert(0);return -2;
+ }
+ info->confile2csmparam->insert(make_pair(string(tmp_dir), business->param_csum));
+ }
}
- if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc)))
+ if(business->recv_way == RECV_WAY_HTTP_POST) //��Ҫ��������ͬ��
{
- MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
- assert(0);return -1;
+ MESA_load_profile_uint_def(config_file, business->bizname, "producer_concurrence_allowed", &business->concurrency_allowed, 0);
+ if(MESA_load_profile_int_nodef(config_file, business->bizname, "producer_listen_port", &business->producer_port)<0)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [%s]producer_listen_port not found!", config_file, business->bizname);
+ assert(0);return -1;
+ }
+ if(g_doris_server_info.cluster_sync_mode &&
+ NULL==(business->param_prod = doris_prod_parameter_new(tmp_dir, manage_evbase, info->log_runtime)))
+ {
+ assert(0);return -2;
+ }
+ g_doris_server_info.business_post_num++;
+ business->token2node = new map<string, struct version_list_node *>;
}
}
business->fs_lineid = FS_register(info->fsstat_handle, FS_STYLE_LINE, FS_CALC_CURRENT, business->bizname);;
snprintf(tmp_dir, 512, "latest_cfg_version_%s", business->bizname);
- business->mm_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version.");
+ business->mmid_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version.");
snprintf(tmp_dir, 512, "total_config_num_%s", business->bizname);
- business->mm_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
+ business->mmid_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
}
return 0;
}
@@ -302,14 +407,16 @@ int main(int argc, char **argv)
struct timeval tv;
struct event timer_statistic_fsstat;
struct evhttp *manager_http;
+ char netname[32];
+ memset(&g_doris_server_info, 0, sizeof(struct doris_global_info));
if(doris_read_profile_configs(NIRVANA_CONFIG_FILE) || doris_server_register_field_stat(&g_doris_server_info))
{
return -1;
}
evthread_use_pthreads();
g_doris_server_info.name2business = new map<string, struct doris_business*>;
- g_doris_server_info.confile2param = new map<string, struct doris_parameter *>;
+ g_doris_server_info.confile2csmparam = new map<string, struct doris_csum_param *>;
manage_evbase = event_base_new();
/*Doris manager server*/
@@ -327,7 +434,7 @@ int main(int argc, char **argv)
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
- g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210804);
+ g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_version_20210825);
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
{
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
@@ -340,6 +447,24 @@ int main(int argc, char **argv)
{
return -4;
}
+ if(g_doris_server_info.business_post_num > 0)
+ {
+ if((MESA_load_profile_string_nodef(NIRVANA_CONFIG_FILE, "DORIS_SERVER", "local_net_name", netname, sizeof(netname))<0) ||
+ (g_doris_server_info.local_ip = get_ip_by_ifname(netname))<0)
+ {
+ printf("%s: [DORIS_SERVER]local_net_name not valid", NIRVANA_CONFIG_FILE);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]local_net_name not valid", NIRVANA_CONFIG_FILE);
+ assert(0);return -11;
+ }
+ g_doris_server_info.mmid_post_server = MESA_Monitor_register(g_doris_server_info.monitor,
+ "http_post_server_status", MONITOR_METRICS_GAUGE, "Running status of doris http post server.");
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_DOWN);
+ }
+
+ if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx()))
+ {
+ assert(0);return -8;
+ }
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
@@ -353,6 +478,14 @@ int main(int argc, char **argv)
assert(0);return -5;
}
}
+ else if(g_doris_server_info.business[i].recv_way == RECV_WAY_HTTP_POST)
+ {
+ if(pthread_create(&thread_desc, &attr, thread_http_post_recv_cfg, &g_doris_server_info.business[i]))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -6;
+ }
+ }
else
{
if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i]))
@@ -364,17 +497,13 @@ int main(int argc, char **argv)
}
/*Doris http server*/
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
- g_doris_server_info.listener = doris_create_listen_socket(g_doris_server_info.server_port);
- if(g_doris_server_info.listener < 0)
+ g_doris_server_info.listener_csum = doris_create_listen_socket(g_doris_server_info.consumer_port);
+ if(g_doris_server_info.listener_csum < 0)
{
return -7;
}
- if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx()))
- {
- assert(0);return -8;
- }
for(u_int32_t i=0; i<g_doris_server_info.iothreads; i++)
{
if(pthread_create(&thread_desc, &attr, thread_doris_http_server, NULL))
@@ -396,4 +525,3 @@ int main(int argc, char **argv)
return 0;
}
-
diff --git a/server/doris_server_main.h b/server/doris_server_main.h
index eca9286..ab5a4c6 100644
--- a/server/doris_server_main.h
+++ b/server/doris_server_main.h
@@ -18,7 +18,7 @@
#include "MESA_Monitor.h"
-#include "doris_client.h"
+#include "doris_consumer_client.h"
#include "doris_server_receive.h"
#include <map>
@@ -39,35 +39,65 @@ using namespace std;
#define RECV_WAY_IDX_FILE 2
#define RECV_WAY_HTTP_POST 3
+#define PROMETHUES_POST_SERVER_OK 0
+#define PROMETHUES_POST_SERVER_UPING 1
+#define PROMETHUES_POST_SERVER_DOWN 2
+
struct doris_business
{
+ /*first for configuration*/
char bizname[32];
u_int32_t recv_way;
u_int32_t cache_max_versions;
+ u_int32_t concurrency_allowed;
char recv_path_full[256];
char recv_path_inc[256];
char store_path_root[256];
struct version_list_handle *cfgver_head;
- struct doris_parameter *param;
-
+ struct doris_csum_param *param_csum;
+ struct doris_prod_param *param_prod;
+ u_int32_t ready_to_sync;
+ u_int32_t posts_on_the_way;
+ int32_t producer_port; //��֤ÿ��biz����server�߳�ֻ��һ�������⽻�����instance
+ evutil_socket_t listener_prod;
int64_t total_cfgnum;
- int32_t mm_latest_ver;
- int32_t mm_total_cfgnum;
- u_int32_t mm_status_codeid; //MM�ڲ��쳣״̬id
+ int32_t mmid_latest_ver;
+ int32_t mmid_total_cfgnum;
+ u_int32_t mmval_status_codeid; //MM�ڲ��쳣״̬id��Grafana��value
u_int32_t fs_lineid;
pthread_rwlock_t rwlock;
+
+ /*next for updating*/
+ struct event_base *worker_evbase;
+ struct doris_prod_instance *instance;
+ map<string, struct version_list_node *> *token2node;
+ int64_t version;
+ int32_t source_from;
+ int32_t type;
+ int64_t version_cfgnum;
+ char inc_index_path[256]; //incĿ¼�µ�����ȫ��
+ char tmp_index_path[256]; //incĿ¼�µ�����ȫ��
+ char full_index_path[256]; //fullĿ¼�µ�ȫ��
+ char cfg_file_path[256];
+ FILE *fp_cfg_file; //�����ڶ��ļ���DRS Client�ӿ�
+ FILE *fp_idx_file; //�����ڶ��ļ���DRS Client�ӿ�
+ struct version_list_node *cur_vernode;
};
struct doris_global_info
{
u_int32_t iothreads;
- int32_t server_port;
+ int32_t consumer_port;
int32_t manager_port;
int32_t sock_recv_bufsize;
u_int32_t ssl_conn_on;
u_int32_t scan_idx_interval;
u_int32_t cache_frag_size;
- u_int32_t server_role_sw;
+ u_int32_t max_http_body_size;
+ u_int32_t idx_file_maat;
+ u_int32_t max_concurrent_reqs;
+ u_int32_t cluster_sync_mode;
+ u_int32_t post_vernode_ttl;
char ssl_CA_path[256];
char ssl_cert_file[256];
@@ -76,16 +106,21 @@ struct doris_global_info
pthread_mutex_t *lock_cs;
SSL_CTX *ssl_instance;
- evutil_socket_t listener;
+ evutil_socket_t listener_csum;
evutil_socket_t manager;
+ u_int32_t token_seq;
+ int32_t local_ip;
struct doris_business business[MAX_BUSINESS_NUM];
u_int32_t business_num;
- u_int32_t idx_file_maat;
+ u_int32_t business_post_num; //postģʽ�м���
+ int32_t business_post_ups; //�����˼���
+ int32_t mmid_post_server; //value=PROMETHUES_POST_*
map<string, struct doris_business*> *name2business;
- map<string, struct doris_parameter *> *confile2param;
+ map<string, struct doris_csum_param *> *confile2csmparam;
struct MESA_MonitorHandler *monitor;
+ pthread_mutex_t mutex_lock;
/*logs*/
u_int32_t log_level;
@@ -106,6 +141,7 @@ struct doris_global_info
};
int doris_mkdir_according_path(const char * path);
+int doris_create_listen_socket(int bind_port);
#endif
diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp
index 1eecc55..6d75fe4 100644
--- a/server/doris_server_receive.cpp
+++ b/server/doris_server_receive.cpp
@@ -10,6 +10,8 @@
#include <sys/prctl.h>
#include <time.h>
+#include <event2/bufferevent_ssl.h>
+
#include "doris_server_main.h"
#include "doris_server_scandir.h"
#include "doris_server_receive.h"
@@ -45,6 +47,8 @@ void config_table_node_cleanup(struct table_list_node *table_node)
TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node);
config_frag_node_cleanup(fragnode);
}
+ config_frag_node_cleanup(table_node->cur_frag);
+ cJSON_Delete(table_node->table_meta);
free(table_node);
}
@@ -59,10 +63,15 @@ void config_version_node_cleanup(struct version_list_node *vernode)
TAILQ_REMOVE(&vernode->table_head, tablenode, table_node);
config_table_node_cleanup(tablenode);
}
+ config_table_node_cleanup(vernode->cur_table);
free(vernode->metacont);
cJSON_Delete(vernode->metajson);
cJSON_Delete(vernode->arrayjson);
cJSON_Delete(vernode->table_meta);
+ if(vernode->business!=NULL && vernode->business->recv_way==RECV_WAY_HTTP_POST)
+ {
+ vernode->business->token2node->erase(string(vernode->token));
+ }
free(vernode);
}
@@ -128,7 +137,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
free(delay_event);
}
-static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_base *evbase, struct version_list_handle *version)
+static void cfgver_handle_delay_destroy(struct event_base *evbase, struct version_list_handle *version)
{
struct common_timer_event *delay_event;
@@ -139,186 +148,188 @@ static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_
}
/*fileϵ�к�����д�����ļ�*/
-void doris_config_file_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
+void doris_config_file_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- if(save->type == CFG_UPDATE_TYPE_FULL)
+ if(business->type == CFG_UPDATE_TYPE_FULL)
{
- snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", save->business->store_path_root, save->version);
- snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
- snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", save->business->store_path_root, save->version);
+ snprintf(business->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", business->store_path_root, business->version);
+ snprintf(business->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version);
+ snprintf(business->full_index_path, 512, "%s/full/index/full_config_index.%010lu", business->store_path_root, business->version);
}
else
{
- snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", save->business->store_path_root, save->version);
- snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
+ snprintf(business->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, business->version);
+ snprintf(business->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version);
}
- if(NULL==(save->fp_idx_file = fopen(save->tmp_index_path, "w+")))
+ if(NULL==(business->fp_idx_file = fopen(business->tmp_index_path, "w+")))
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->tmp_index_path, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, business->tmp_index_path, strerror(errno));
assert(0);
}
}
-void doris_config_file_version_finish(struct doris_instance *instance, void *userdata)
+void doris_config_file_version_finish(struct doris_csum_instance *instance, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- fclose(save->fp_idx_file);
- if(rename(save->tmp_index_path, save->inc_index_path))
+ fclose(business->fp_idx_file);
+ if(rename(business->tmp_index_path, business->inc_index_path))
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s",
+ business->bizname, business->tmp_index_path, business->inc_index_path, strerror(errno));
assert(0);
}
- if(save->type == CFG_UPDATE_TYPE_FULL)
+ if(business->type == CFG_UPDATE_TYPE_FULL)
{
- if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST) //����Ӳ����
+ if(link(business->inc_index_path, business->full_index_path) && errno!=EEXIST) //����Ӳ����
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s",
+ business->bizname, business->tmp_index_path, business->inc_index_path, strerror(errno));
assert(0);
}
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", save->business->bizname, save->version, save->inc_index_path);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s",
+ business->bizname, business->version, business->inc_index_path);
}
-void doris_config_file_version_error(struct doris_instance *instance, void *userdata)
+void doris_config_file_version_error(struct doris_csum_instance *instance, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- if(save->fp_idx_file != NULL)
+ if(business->fp_idx_file != NULL)
{
- fclose(save->fp_idx_file);
- remove(save->tmp_index_path);
+ fclose(business->fp_idx_file);
+ remove(business->tmp_index_path);
}
- if(save->fp_cfg_file != NULL)
+ if(business->fp_cfg_file != NULL)
{
- fclose(save->fp_cfg_file);
- remove(save->cfg_file_path);
+ fclose(business->fp_cfg_file);
+ remove(business->cfg_file_path);
}
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version);
}
-void doris_config_file_cfgfile_start(struct doris_instance *instance,
+void doris_config_file_cfgfile_start(struct doris_csum_instance *instance,
const struct tablemeta *meta, const char *localpath, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
struct tm *localtm, savetime;
time_t now;
const char *type;
char dir[256];
- type = (save->type == CFG_UPDATE_TYPE_FULL)?"full":"inc";
+ type = (business->type == CFG_UPDATE_TYPE_FULL)?"full":"inc";
now = time(NULL);
localtm = localtime_r(&now, &savetime);
- snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", save->business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday);
+ snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday);
if(access(dir, F_OK))
{
doris_mkdir_according_path(dir);
}
- snprintf(save->cfg_file_path, 256, "%s/%s", dir, meta->filename);
- if(g_doris_server_info.idx_file_maat) //MAAT��ʽ��֪ͨ�ļ�
+ snprintf(business->cfg_file_path, 256, "%s/%s", dir, meta->filename);
+ if(g_doris_server_info.idx_file_maat || meta->userregion==NULL) //MAAT��ʽ��֪ͨ�ļ�
{
- fprintf(save->fp_idx_file, "%s\t%u\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path);
+ fprintf(business->fp_idx_file, "%s\t%u\t%s\n", meta->tablename, meta->cfgnum, business->cfg_file_path);
}
else //ת����ɫ�����û��Զ�����Ϣ
{
- fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path, meta->userregion);
+ fprintf(business->fp_idx_file, "%s\t%u\t%s\t%s\n", meta->tablename, meta->cfgnum, business->cfg_file_path, meta->userregion);
}
- if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+")))
+ if(NULL == (business->fp_cfg_file = fopen(business->cfg_file_path, "w+")))
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, business->cfg_file_path, strerror(errno));
assert(0);
}
else
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", save->business->bizname, save->cfg_file_path);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", business->bizname, business->cfg_file_path);
}
}
-void doris_config_file_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
+void doris_config_file_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
size_t writen_len;
- writen_len = fwrite(data, 1, len, save->fp_cfg_file);
+ writen_len = fwrite(data, 1, len, business->fp_cfg_file);
if(writen_len != len)
{
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", business->bizname, business->cfg_file_path, strerror(errno));
assert(0);
}
}
-void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata)
+void doris_config_file_cfgfile_finish(struct doris_csum_instance *instance, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
- fclose(save->fp_cfg_file);
+ struct doris_business *business=(struct doris_business *)userdata;
+ fclose(business->fp_cfg_file);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", save->business->bizname, save->cfg_file_path);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", business->bizname, business->cfg_file_path);
}
/*memϵ�к������������ڴ�*/
-void doris_config_mem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
+void doris_config_mem_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- save->cur_vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node));
- TAILQ_INIT(&save->cur_vernode->table_head);
- save->cur_vernode->metajson = cJSON_CreateObject();
- save->cur_vernode->arrayjson= cJSON_CreateArray();
+ business->cur_vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node));
+ TAILQ_INIT(&business->cur_vernode->table_head);
+ business->cur_vernode->metajson = cJSON_CreateObject();
+ business->cur_vernode->arrayjson= cJSON_CreateArray();
- save->cur_vernode->version = save->version;
- cJSON_AddNumberToObject(save->cur_vernode->metajson, "version", save->cur_vernode->version);
+ business->cur_vernode->version = business->version;
+ cJSON_AddNumberToObject(business->cur_vernode->metajson, "version", business->cur_vernode->version);
- save->cur_vernode->cfg_type = save->type;
- cJSON_AddNumberToObject(save->cur_vernode->metajson, "type", save->cur_vernode->cfg_type);
+ business->cur_vernode->cfg_type = business->type;
+ cJSON_AddNumberToObject(business->cur_vernode->metajson, "type", business->cur_vernode->cfg_type);
}
-void doris_config_mem_version_finish(struct doris_instance *instance, void *userdata)
+void doris_config_mem_version_finish(struct doris_csum_instance *instance, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
- struct version_list_handle *cur_version;
+ struct doris_business *business=(struct doris_business *)userdata;
struct version_list_handle *tmplist;
struct version_list_handle *cfgver_handle;
- cJSON_AddItemToObject(save->cur_vernode->metajson, "configs", save->cur_vernode->arrayjson);
- save->cur_vernode->arrayjson = NULL;
- save->cur_vernode->metacont = cJSON_PrintUnformatted(save->cur_vernode->metajson);
- save->cur_vernode->metalen = strlen(save->cur_vernode->metacont);
- cJSON_Delete(save->cur_vernode->metajson);
- save->cur_vernode->metajson = NULL;
+ cJSON_AddItemToObject(business->cur_vernode->metajson, "configs", business->cur_vernode->arrayjson);
+ business->cur_vernode->arrayjson = NULL;
+ business->cur_vernode->metacont = cJSON_PrintUnformatted(business->cur_vernode->metajson);
+ business->cur_vernode->metalen = strlen(business->cur_vernode->metacont);
+ cJSON_Delete(business->cur_vernode->metajson);
+ business->cur_vernode->metajson = NULL;
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", save->business->bizname, save->version, save->cur_vernode->metacont);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", business->bizname, business->version, business->cur_vernode->metacont);
- if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && save->business->cfgver_head->latest_version!=0)
+ if(business->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && business->cfgver_head->latest_version!=0)
{
- cur_version = config_version_handle_new();
- cur_version->latest_version = save->cur_vernode->version;
- cur_version->version_num = 1;
- TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
- cur_version->oldest_vernode = TAILQ_FIRST(&cur_version->version_head);
- cur_version->version2node->insert(make_pair(cur_version->latest_version, save->cur_vernode));
-
- pthread_rwlock_wrlock(&save->business->rwlock);
- tmplist = save->business->cfgver_head;
- save->business->cfgver_head = cur_version;
- pthread_rwlock_unlock(&save->business->rwlock);
- cfgver_handle_delay_destroy(save, save->evbase, tmplist);
+ cfgver_handle = config_version_handle_new();
+ cfgver_handle->latest_version = business->cur_vernode->version;
+ cfgver_handle->version_num = 1;
+ TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node);
+ cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
+ cfgver_handle->version2node->insert(make_pair(cfgver_handle->latest_version, business->cur_vernode));
+
+ pthread_rwlock_wrlock(&business->rwlock);
+ tmplist = business->cfgver_head;
+ business->cfgver_head = cfgver_handle;
+ pthread_rwlock_unlock(&business->rwlock);
+ cfgver_handle_delay_destroy(business->worker_evbase, tmplist);
}
else
{
- pthread_rwlock_wrlock(&save->business->rwlock);
- cfgver_handle = save->business->cfgver_head;
- TAILQ_INSERT_TAIL(&cfgver_handle->version_head, save->cur_vernode, version_node);
- cfgver_handle->latest_version = save->cur_vernode->version;
- cfgver_handle->version2node->insert(make_pair(save->cur_vernode->version, save->cur_vernode));
+ pthread_rwlock_wrlock(&business->rwlock);
+ cfgver_handle = business->cfgver_head;
+ TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node);
+ cfgver_handle->latest_version = business->cur_vernode->version;
+ cfgver_handle->version2node->insert(make_pair(business->cur_vernode->version, business->cur_vernode));
if(cfgver_handle->oldest_vernode == NULL)
{
cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
}
/*�����ļ�������໺��N���汾��Ԫ��Ϣȫ����*/
- if(save->business->cache_max_versions!=0 && cfgver_handle->version_num>=save->business->cache_max_versions)
+ if(business->cache_max_versions!=0 && cfgver_handle->version_num>=business->cache_max_versions)
{
config_version_node_free_content(cfgver_handle->oldest_vernode);
cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node);
@@ -327,281 +338,287 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
{
cfgver_handle->version_num += 1;
}
- pthread_rwlock_unlock(&save->business->rwlock);
+ pthread_rwlock_unlock(&business->rwlock);
}
- save->cur_vernode = NULL;
+ business->cur_vernode = NULL;
}
-void doris_config_mem_version_error(struct doris_instance *instance, void *userdata)
+void doris_config_mem_version_error(struct doris_csum_instance *instance, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- config_frag_node_cleanup(save->cur_frag);
- config_table_node_cleanup(save->cur_table);
- config_version_node_cleanup(save->cur_vernode);
- save->cur_frag = NULL;
- save->cur_table = NULL;
- save->cur_vernode = NULL;
+ if(business->cur_vernode != NULL)
+ {
+ config_version_node_cleanup(business->cur_vernode);
+ }
+ business->cur_vernode = NULL;
}
-void doris_config_mem_cfgfile_start(struct doris_instance *instance,
+void doris_config_mem_cfgfile_start(struct doris_csum_instance *instance,
const struct tablemeta *meta, const char *localpath, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
-
- save->cur_vernode->table_meta = cJSON_CreateObject();
- cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", meta->tablename);
- cJSON_AddStringToObject(save->cur_vernode->table_meta, "filename", meta->filename);
- cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", meta->cfgnum);
- cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", meta->size);
+ struct doris_business *business=(struct doris_business *)userdata;
+ struct table_list_node *cur_table;
+
+ business->cur_vernode->table_meta = cJSON_CreateObject();
+ cJSON_AddStringToObject(business->cur_vernode->table_meta, "tablename", meta->tablename);
+ cJSON_AddStringToObject(business->cur_vernode->table_meta, "filename", meta->filename);
+ cJSON_AddNumberToObject(business->cur_vernode->table_meta, "cfg_num", meta->cfgnum);
+ cJSON_AddNumberToObject(business->cur_vernode->table_meta, "size", meta->size);
if(meta->userregion != NULL)
{
- cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", meta->userregion);
+ cJSON_AddStringToObject(business->cur_vernode->table_meta, "user_region", meta->userregion);
}
- save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
- save->cur_table->filesize = meta->size;
- snprintf(save->cur_table->tablename, 64, "%s", meta->tablename);
- snprintf(save->cur_table->localpath, 256, "%s", localpath);
- TAILQ_INIT(&save->cur_table->frag_head);
+ cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
+ cur_table->filesize = meta->size;
+ snprintf(cur_table->tablename, 64, "%s", meta->tablename);
+ snprintf(cur_table->localpath, 256, "%s", localpath);
+ TAILQ_INIT(&cur_table->frag_head);
+ business->cur_vernode->cur_table = cur_table;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...",
- save->business->bizname, meta->tablename, save->version);
+ business->bizname, meta->tablename, business->version);
}
-void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
+void doris_config_mem_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
+ struct table_list_node *cur_table;
size_t cache_len, offset=0;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len);
+
+ cur_table = business->cur_vernode->cur_table;
while(len > 0)
{
- if(save->cur_frag == NULL)
+ if(cur_table->cur_frag == NULL)
{
- save->cur_frag = (struct cont_frag_node *)calloc(1, sizeof(struct cont_frag_node));
- save->cur_frag->start = save->cur_table->cur_totallen;
- save->cur_frag->totalsize = save->cur_table->filesize - save->cur_table->cur_totallen;
- if(save->cur_frag->totalsize > g_doris_server_info.cache_frag_size)
+ cur_table->cur_frag = (struct cont_frag_node *)calloc(1, sizeof(struct cont_frag_node));
+ cur_table->cur_frag->start = cur_table->cur_totallen;
+ cur_table->cur_frag->totalsize = cur_table->filesize - cur_table->cur_totallen;
+ if(cur_table->filesize==0 || cur_table->cur_frag->totalsize > g_doris_server_info.cache_frag_size)
{
- save->cur_frag->totalsize = g_doris_server_info.cache_frag_size;
+ cur_table->cur_frag->totalsize = g_doris_server_info.cache_frag_size;
}
- save->cur_frag->end = save->cur_frag->start + save->cur_frag->totalsize - 1;
- save->cur_frag->content = (char *)malloc(save->cur_frag->totalsize);
+ cur_table->cur_frag->end = cur_table->cur_frag->start + cur_table->cur_frag->totalsize - 1;
+ cur_table->cur_frag->content = (char *)malloc(cur_table->cur_frag->totalsize);
}
- if(save->cur_frag->totalsize > save->cur_frag->cur_fraglen + len)
+ if(cur_table->cur_frag->totalsize > cur_table->cur_frag->cur_fraglen + len)
{
- memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, len);
- save->cur_frag->cur_fraglen += len;
- save->cur_table->cur_totallen += len;
+ memcpy(cur_table->cur_frag->content+cur_table->cur_frag->cur_fraglen, data+offset, len);
+ cur_table->cur_frag->cur_fraglen += len;
+ cur_table->cur_totallen += len;
offset += len;
len = 0;
}
else
{
- cache_len = save->cur_frag->totalsize - save->cur_frag->cur_fraglen;
- memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, cache_len);
- save->cur_frag->cur_fraglen += cache_len;
- save->cur_table->cur_totallen += cache_len;
+ cache_len = cur_table->cur_frag->totalsize - cur_table->cur_frag->cur_fraglen;
+ memcpy(cur_table->cur_frag->content+cur_table->cur_frag->cur_fraglen, data+offset, cache_len);
+ cur_table->cur_frag->cur_fraglen += cache_len;
+ cur_table->cur_totallen += cache_len;
offset += cache_len;
len -= cache_len;
- TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
- assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
- save->cur_frag = NULL;
+ TAILQ_INSERT_TAIL(&cur_table->frag_head, cur_table->cur_frag, frag_node);
+ assert(cur_table->cur_frag->cur_fraglen == cur_table->cur_frag->end - cur_table->cur_frag->start + 1);
+ cur_table->cur_frag = NULL;
}
}
- assert(save->cur_table->cur_totallen <= save->cur_table->filesize);
+ assert(cur_table->cur_totallen <= cur_table->filesize || cur_table->filesize==0);
}
-void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
+void doris_config_mem_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
+ struct table_list_node *cur_table;
- cJSON_AddStringToObject(save->cur_vernode->table_meta, "md5", md5);
- cJSON_AddItemToArray(save->cur_vernode->arrayjson, save->cur_vernode->table_meta);
- save->cur_vernode->table_meta = NULL;
+ cJSON_AddStringToObject(business->cur_vernode->table_meta, "md5", md5);
+ cJSON_AddItemToArray(business->cur_vernode->arrayjson, business->cur_vernode->table_meta);
+ business->cur_vernode->table_meta = NULL;
- if(save->cur_frag != NULL)
+ cur_table = business->cur_vernode->cur_table;
+ if(cur_table->cur_frag != NULL)
{
- TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
- assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
- save->cur_frag = NULL;
+ TAILQ_INSERT_TAIL(&cur_table->frag_head, cur_table->cur_frag, frag_node);
+ assert(cur_table->cur_frag->cur_fraglen == cur_table->cur_frag->end - cur_table->cur_frag->start + 1);
+ cur_table->cur_frag = NULL;
}
- assert(save->cur_table->cur_totallen == save->cur_table->filesize);
- TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", save->business->bizname, save->cur_table->tablename, save->version);
- save->cur_table = NULL;
+ assert(cur_table->cur_totallen == cur_table->filesize);
+ TAILQ_INSERT_TAIL(&business->cur_vernode->table_head, cur_table, table_node);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", business->bizname, cur_table->tablename, business->version);
+ business->cur_vernode->cur_table = NULL;
}
/*commonϵ�к�������������ʱ��������*/
-void doris_config_common_version_start(struct confile_save *save, cJSON *meta)
+void doris_config_common_version_start(struct doris_business *business, cJSON *meta)
{
cJSON *sub;
sub = cJSON_GetObjectItem(meta, "version");
- save->version = sub->valuedouble;
+ business->version = sub->valuedouble;
sub = cJSON_GetObjectItem(meta, "type");
- save->type = sub->valueint;
- assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC);
- save->version_cfgnum = 0;
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", save->business->bizname, save->version);
+ business->type = sub->valueint;
+ assert(business->type==CFG_UPDATE_TYPE_FULL || business->type==CFG_UPDATE_TYPE_INC);
+ business->version_cfgnum = 0;
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", business->bizname, business->version);
}
-void doris_config_common_version_finish(struct confile_save *save)
+void doris_config_common_version_finish(struct doris_business *business)
{
- if(save->type == CFG_UPDATE_TYPE_FULL)
+ if(business->type == CFG_UPDATE_TYPE_FULL)
{
- save->business->total_cfgnum = save->version_cfgnum;
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, save->version);
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, save->version_cfgnum);
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1);
+ business->total_cfgnum = business->version_cfgnum;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, business->version);
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, business->version_cfgnum);
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1);
}
else
{
- save->business->total_cfgnum += save->version_cfgnum;
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, save->version);
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, save->version_cfgnum);
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1);
+ business->total_cfgnum += business->version_cfgnum;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, business->version);
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, business->version_cfgnum);
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1);
}
- MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_latest_ver, MONITOR_VALUE_SET, save->version);
- MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_total_cfgnum, MONITOR_VALUE_SET, save->business->total_cfgnum);
- MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, save->business->mm_status_codeid, NULL, NULL);
- MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", save->business->bizname, save->version);
- }
+ MESA_Monitor_operation(g_doris_server_info.monitor, business->mmid_latest_ver, MONITOR_VALUE_SET, business->version);
+ MESA_Monitor_operation(g_doris_server_info.monitor, business->mmid_total_cfgnum, MONITOR_VALUE_SET, business->total_cfgnum);
+ MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, business->mmval_status_codeid, NULL, NULL);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", business->bizname, business->version);
+}
-void doris_config_common_version_error(struct confile_save *save)
+void doris_config_common_version_error(struct doris_business *business)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1);
//Grafana+Promethues��չʾ�ڲ��쳣״̬
- MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, save->business->mm_status_codeid,
+ MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, business->mmval_status_codeid,
"Version receive error", "Receive config file error, please check producer");
}
-void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum)
+void doris_config_common_cfgfile_start(struct doris_business *business, u_int32_t cfgnum)
{
- save->version_cfgnum += cfgnum;
+ business->version_cfgnum += cfgnum;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1);
}
-void doris_config_common_cfgfile_finish(struct confile_save *save)
+void doris_config_common_cfgfile_finish(struct doris_business *business)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES], 0, FS_OP_ADD, 1);
- FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1);
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1);
}
/*localmemϵ�к���������ʱ�ӱ��ػ�����Ļص�*/
-void doris_config_localmem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
+void doris_config_localmem_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata)
{
- doris_config_common_version_start((struct confile_save *)userdata, meta);
- if(g_doris_server_info.server_role_sw)
+ doris_config_common_version_start((struct doris_business *)userdata, meta);
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_start(instance, meta, userdata);
}
}
-void doris_config_localmem_version_finish(struct doris_instance *instance, void *userdata)
+void doris_config_localmem_version_finish(struct doris_csum_instance *instance, void *userdata)
{
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_finish(instance, userdata);
}
- doris_config_common_version_finish((struct confile_save *)userdata);
+ doris_config_common_version_finish((struct doris_business *)userdata);
}
-void doris_config_localmem_version_error(struct doris_instance *instance, void *userdata)
+void doris_config_localmem_version_error(struct doris_csum_instance *instance, void *userdata)
{
- doris_config_common_version_error((struct confile_save *)userdata);
- if(g_doris_server_info.server_role_sw)
+ doris_config_common_version_error((struct doris_business *)userdata);
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_error(instance, userdata);
}
}
-void doris_config_localmem_cfgfile_start(struct doris_instance *instance,
+void doris_config_localmem_cfgfile_start(struct doris_csum_instance *instance,
const struct tablemeta *meta, const char *localpath, void *userdata)
{
- doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum);
- if(g_doris_server_info.server_role_sw)
+ doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum);
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_start(instance, meta, localpath, userdata);
}
}
-void doris_config_localmem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
+void doris_config_localmem_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata)
{
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_update(instance, data, len, userdata);
}
}
-void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
+void doris_config_localmem_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata)
{
- doris_config_common_cfgfile_finish((struct confile_save *)userdata);
- if(g_doris_server_info.server_role_sw)
+ doris_config_common_cfgfile_finish((struct doris_business *)userdata);
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_finish(instance, md5, userdata);
}
}
/*�ޱ��ϵ�к�������������ʱ�ص�*/
-void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
+void doris_config_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata)
{
- doris_config_common_version_start((struct confile_save *)userdata, meta);
+ doris_config_common_version_start((struct doris_business *)userdata, meta);
doris_config_file_version_start(instance, meta, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_start(instance, meta, userdata);
}
}
-void doris_config_version_finish(struct doris_instance *instance, void *userdata)
+void doris_config_version_finish(struct doris_csum_instance *instance, void *userdata)
{
doris_config_file_version_finish(instance, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_finish(instance, userdata);
}
- doris_config_common_version_finish((struct confile_save *)userdata);
+ doris_config_common_version_finish((struct doris_business *)userdata);
}
-void doris_config_version_error(struct doris_instance *instance, void *userdata)
+void doris_config_version_error(struct doris_csum_instance *instance, void *userdata)
{
- doris_config_common_version_error((struct confile_save *)userdata);
+ doris_config_common_version_error((struct doris_business *)userdata);
doris_config_file_version_error(instance, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_version_error(instance, userdata);
}
}
-void doris_config_cfgfile_start(struct doris_instance *instance,
+void doris_config_cfgfile_start(struct doris_csum_instance *instance,
const struct tablemeta *meta, const char *localpath, void *userdata)
{
- struct confile_save *save=(struct confile_save *)userdata;
+ struct doris_business *business=(struct doris_business *)userdata;
- doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum);
+ doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum);
doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
- doris_config_mem_cfgfile_start(instance, meta, save->cfg_file_path, userdata);
+ doris_config_mem_cfgfile_start(instance, meta, business->cfg_file_path, userdata);
}
}
-void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
+void doris_config_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata)
{
doris_config_file_cfgfile_update(instance, data, len, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_update(instance, data, len, userdata);
}
}
-void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
+void doris_config_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata)
{
- doris_config_common_cfgfile_finish((struct confile_save *)userdata);
+ doris_config_common_cfgfile_finish((struct doris_business *)userdata);
doris_config_file_cfgfile_finish(instance, userdata);
- if(g_doris_server_info.server_role_sw)
+ if(g_doris_server_info.consumer_port)
{
doris_config_mem_cfgfile_finish(instance, md5, userdata);
}
@@ -611,22 +628,19 @@ void* thread_doris_client_recv_cfg(void *arg)
{
struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase;
- struct doris_instance *instance;
+ struct doris_csum_instance *instance;
struct doris_callbacks doris_cbs;
struct doris_arguments doris_args;
struct doris_idxfile_scanner *scanner;
enum DORIS_UPDATE_TYPE update_type;
- struct confile_save save;
char stored_path[512];
prctl(PR_SET_NAME, "client_recv");
client_evbase = event_base_new();
- memset(&save, 0, sizeof(struct confile_save));
- save.source_from = RECV_WAY_IDX_FILE;
- save.evbase = client_evbase;
- save.business = business;
+ business->source_from = RECV_WAY_IDX_FILE;
+ business->worker_evbase = client_evbase;
scanner = doris_index_file_scanner(0);
@@ -637,15 +651,14 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
- doris_cbs.userdata = &save;
+ doris_cbs.version_updated= NULL;
+ doris_cbs.userdata = business;
snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
- assert(update_type!=CFG_UPDATE_TYPE_ERR);
snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do {
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
- assert(update_type!=CFG_UPDATE_TYPE_ERR);
}while(update_type != CFG_UPDATE_TYPE_NONE);
@@ -657,11 +670,11 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_update = doris_config_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
- save.source_from = RECV_WAY_DRS_CLIENT;
+ business->source_from = RECV_WAY_DRS_CLIENT;
memset(&doris_args, 0, sizeof(struct doris_arguments));
doris_args.current_version = scanner->cur_version;
sprintf(doris_args.bizname, "%s", business->bizname);
- instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
+ instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
if(instance == NULL)
{
assert(0);return NULL;
@@ -693,7 +706,6 @@ void* thread_index_file_recv_cfg(void *arg)
{
struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase;
- struct confile_save save;
struct timeval tv;
struct scanner_timer_priv timer_priv;
enum DORIS_UPDATE_TYPE update_type;
@@ -701,14 +713,12 @@ void* thread_index_file_recv_cfg(void *arg)
prctl(PR_SET_NAME, "index_file");
- memset(&save, 0, sizeof(struct confile_save));
memset(&timer_priv, 0, sizeof(struct scanner_timer_priv));
client_evbase = event_base_new();
- save.source_from = RECV_WAY_IDX_FILE;
- save.evbase = client_evbase;
- save.business = business;
+ business->source_from = RECV_WAY_IDX_FILE;
+ business->worker_evbase = client_evbase;
timer_priv.scanner = doris_index_file_scanner(0);
timer_priv.business = business;
@@ -720,16 +730,15 @@ void* thread_index_file_recv_cfg(void *arg)
timer_priv.doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
- timer_priv.doris_cbs.userdata = &save;
+ timer_priv.doris_cbs.version_updated= NULL;
+ timer_priv.doris_cbs.userdata = business;
snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
- assert(update_type!=CFG_UPDATE_TYPE_ERR);
snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do{
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
- assert(update_type!=CFG_UPDATE_TYPE_ERR);
- }while(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR);
+ }while(update_type!=CFG_UPDATE_TYPE_NONE);
/*Check new configs*/
@@ -742,8 +751,7 @@ void* thread_index_file_recv_cfg(void *arg)
update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full,
&timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
- assert(update_type!=CFG_UPDATE_TYPE_ERR);
- if(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR)
+ if(update_type!=CFG_UPDATE_TYPE_NONE)
{
tv.tv_sec = 0;
}
@@ -761,3 +769,1376 @@ void* thread_index_file_recv_cfg(void *arg)
return NULL;
}
+struct bufferevent *doris_https_bufferevent_cb(struct event_base *evabse, void *arg)
+{
+ SSL_CTX *ssl_instance = (SSL_CTX *)arg;
+
+ return bufferevent_openssl_socket_new(evabse, -1, SSL_new(ssl_instance), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
+}
+
+struct doris_business *lookup_bizstruct_from_name(const struct evkeyvalq *params)
+{
+ map<string, struct doris_business*>::iterator iter;
+ const char *bizname;
+
+ if(NULL == (bizname = evhttp_find_header(params, "business")))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ return NULL;
+ }
+ if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ return NULL;
+ }
+ return iter->second;
+}
+
+struct version_list_node *lookup_vernode_struct_from_name(struct doris_business *business, const struct evkeyvalq *params)
+{
+ map<string, struct version_list_node *>::iterator iter;
+ const char *token;
+
+ if(NULL == (token = evhttp_find_header(params, "token")))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ return NULL;
+ }
+ if((iter = business->token2node->find(string(token)))==business->token2node->end())
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ return NULL;
+ }
+ return iter->second;
+}
+
+struct version_list_node *lookup_vernode_struct_from_name_renew(struct doris_business *business, const struct evkeyvalq *params)
+{
+ struct version_list_node *vernode;
+ struct timeval tv;
+
+ if(NULL == (vernode = lookup_vernode_struct_from_name(business, params)))
+ {
+ return NULL;
+ }
+ if(vernode->business->concurrency_allowed)
+ {
+ tv.tv_sec = g_doris_server_info.post_vernode_ttl;
+ tv.tv_usec = 0;
+ evtimer_add(&vernode->timer_expire, &tv);
+ }
+ return vernode;
+}
+
+/*��֤business֮�����ɵ�token����ͻ*/
+void prod_server_generate_token(struct doris_business *business, char *token/*OUT*/, size_t size)
+{
+ pthread_mutex_lock(&g_doris_server_info.mutex_lock);
+ snprintf(token, size, "%u-%lu-%u-%u", g_doris_server_info.local_ip, time(NULL), rand(), ++g_doris_server_info.token_seq);
+ pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
+}
+
+void business_resume_sync_peer_normal(struct doris_business *business)
+{
+ u_int32_t business_post_ups;
+
+ if(!g_doris_server_info.cluster_sync_mode)
+ {
+ return;
+ }
+
+ if(1 == atomic_set(&business->ready_to_sync, 1) || business->listener_prod==0)
+ {
+ return;
+ }
+
+ pthread_mutex_lock(&g_doris_server_info.mutex_lock);
+ business_post_ups = ++g_doris_server_info.business_post_ups;
+ pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
+ if(business_post_ups == g_doris_server_info.business_post_num)
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_OK);
+ }
+ else
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING);
+ }
+ assert(business_post_ups <= g_doris_server_info.business_post_num);
+}
+
+void business_set_sync_peer_abnormal(struct doris_business *business)
+{
+ u_int32_t business_post_ups;
+
+ if(!g_doris_server_info.cluster_sync_mode)
+ {
+ return;
+ }
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mcluster sync error, please check slave status!!!\033[0m\n");
+
+ if(0 == atomic_set(&business->ready_to_sync, 0) || business->listener_prod==0)
+ {
+ return;
+ }
+ pthread_mutex_lock(&g_doris_server_info.mutex_lock);
+ business_post_ups = --g_doris_server_info.business_post_ups;
+ pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
+ if(business_post_ups == 0)
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_DOWN);
+ }
+ else
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING);
+ }
+ assert(business_post_ups < g_doris_server_info.business_post_num);
+}
+
+char *vernode_print_json_meta(struct version_list_node *vernode)
+{
+ struct table_list_node *tablenode;
+ cJSON *root, *array=NULL, *item;
+ char *p;
+
+ root = cJSON_CreateObject();
+ cJSON_AddStringToObject(root, "token", vernode->token);
+ cJSON_AddNumberToObject(root, "type", vernode->cfg_type);
+ TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
+ {
+ if(array == NULL)
+ {
+ array = cJSON_CreateArray();
+ }
+ item = cJSON_CreateObject();
+ cJSON_AddStringToObject(item, "tablename", tablenode->tablename);
+ cJSON_AddNumberToObject(item, "size", tablenode->cur_totallen);
+ cJSON_AddItemToArray(array, item);
+ assert(tablenode->finished); //�ϴ���ϵIJ��ܼ�������
+ }
+ if(vernode->cur_table != NULL)
+ {
+ if(array == NULL)
+ {
+ array = cJSON_CreateArray();
+ }
+ item = cJSON_CreateObject();
+ cJSON_AddStringToObject(item, "tablename", vernode->cur_table->tablename);
+ cJSON_AddNumberToObject(item, "offset", vernode->cur_table->cur_totallen);
+ cJSON_AddItemToArray(array, item);
+ }
+ cJSON_AddItemToObject(root, "configs", array);
+ p = cJSON_PrintUnformatted(root);
+ cJSON_Delete(root);
+ return p;
+}
+
+void http_prod_server_verion_check_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct version_list_node *vernode;
+ struct evkeyvalq params;
+ struct evbuffer *evbuf;
+ char *p;
+
+ if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
+ return;
+ }
+ if(NULL == (vernode = lookup_vernode_struct_from_name(business, &params)))
+ {
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token not found");
+ return;
+ }
+ evhttp_clear_headers(&params);
+
+ if(vernode->syncing)
+ {
+ evhttp_send_error(req, 310, "table syncing now, retry later");
+ return;
+ }
+
+ p = vernode_print_json_meta(vernode);
+
+ evbuf = evbuffer_new();
+ evbuffer_add(evbuf, p, strlen(p));
+ if(vernode->version_finished)
+ {
+ evhttp_send_reply(req, HTTP_OK, "OK", evbuf);
+ }
+ else
+ {
+ evhttp_send_reply(req, 300, "version is posting", evbuf);
+ }
+ evbuffer_free(evbuf);
+ free(p);
+}
+
+void http_config_direct_version_cancel(struct version_list_node *vernode, struct evhttp_request *req)
+{
+ struct doris_business *business=vernode->business;
+ struct table_list_node *tablenode;
+ char token[64];
+
+ sprintf(token, "%s", vernode->token);
+ if(vernode->synctx != NULL)
+ {
+ doris_prod_upload_ctx_destroy(vernode->synctx);
+ }
+ if(vernode->fp_idx_file != NULL)
+ {
+ fclose(vernode->fp_idx_file);
+ remove(vernode->tmp_index_path);
+ }
+ if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL)
+ {
+ fclose(vernode->cur_table->fp_cfg_file);
+ remove(vernode->cur_table->localpath);
+ }
+ TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
+ {
+ remove(tablenode->localpath);
+ }
+ config_version_node_cleanup(vernode);
+ if(evtimer_pending(&vernode->timer_expire, NULL))
+ {
+ evtimer_del(&vernode->timer_expire);
+ }
+ business->cur_vernode = NULL; //�������
+ business->posts_on_the_way--;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way);
+
+ evhttp_send_reply(req, 200, "OK", NULL);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post server version cancel, token: %s", business->bizname, token);
+}
+
+void prod_sync_vercancel_result_cb(enum PROD_VEROP_RES result, void *userdata)
+{
+ struct version_list_node *vernode=(struct version_list_node *)userdata;
+
+ vernode->syncing = 0;
+ vernode->retry_times++;
+ switch(result)
+ {
+ case VERSIONOP_RES_OK:
+ http_config_direct_version_cancel(vernode, vernode->req);
+ break;
+
+ case VERSIONOP_RES_ERROR:
+ evhttp_send_error(vernode->req, 500, "version cancel sync error res_code");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ break;
+
+ case VERSIONOP_CURL_ERROR:
+ if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3))
+ {
+ vernode->syncing = 1;
+ doris_prod_version_cancel(vernode->synctx, prod_sync_vercancel_result_cb, vernode);
+ }
+ else
+ {
+ http_config_direct_version_cancel(vernode, vernode->req);
+ business_set_sync_peer_abnormal(vernode->business);
+ }
+ break;
+ default: assert(0);break;
+ }
+}
+
+void http_prod_server_verion_cancel_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct version_list_node *vernode;
+ struct evkeyvalq params;
+
+ if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
+ return;
+ }
+ if(NULL == (vernode = lookup_vernode_struct_from_name_renew(business, &params)))
+ {
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_OK, "Parameter token not found"); //����һ����
+ return;
+ }
+ evhttp_clear_headers(&params);
+
+ if(vernode->version_finished)
+ {
+ evhttp_send_error(req, HTTP_BADREQUEST, "version already finished");
+ return;
+ }
+ if(vernode->syncing)
+ {
+ evhttp_send_error(req, 300, "table syncing now, retry later");
+ return;
+ }
+
+ if(!atomic_read(&business->ready_to_sync) ||
+ NULL!=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync"))
+ {
+ return http_config_direct_version_cancel(vernode, req);
+ }
+ vernode->retry_times = 0;
+ vernode->syncing = 1;
+ vernode->req = req;
+ doris_prod_version_cancel(vernode->synctx, prod_sync_vercancel_result_cb, vernode);
+}
+
+void doris_config_post_version_finish(struct doris_business *business, struct version_list_node *vernode, int64_t newversion)
+{
+ assert(newversion > vernode->version);
+ vernode->version = newversion;
+
+ if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
+ {
+ snprintf(business->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version);
+ snprintf(business->full_index_path, 512, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version);
+ }
+ else
+ {
+ snprintf(business->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version);
+ }
+ /*HTTP postʱ����汾����ÿ�������Լ�����ʱ֪ͨ�ļ��������ñ����ļ��Ĺرպ���*/
+ sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path);
+ business->version = vernode->version;
+ business->type = vernode->cfg_type;
+ business->fp_idx_file = vernode->fp_idx_file;
+ doris_config_file_version_finish(NULL, business);
+ vernode->fp_idx_file = NULL;
+
+ if(g_doris_server_info.consumer_port)
+ {
+ business->cur_vernode = vernode;
+ cJSON_AddNumberToObject(vernode->metajson, "version", vernode->version);
+ doris_config_mem_version_finish(NULL, business);
+ }
+
+ business->version_cfgnum = vernode->total_cfgs;
+ doris_config_common_version_finish(business);
+ business->cfgver_head->latest_version = vernode->version;
+
+ if(vernode->synctx != NULL)
+ {
+ doris_prod_upload_ctx_destroy(vernode->synctx);
+ vernode->synctx = NULL;
+ }
+ vernode->version_finished = 1;
+ business->posts_on_the_way--;
+ business->cur_vernode = NULL; //�������
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way);
+}
+
+void http_config_direct_version_finish(struct version_list_node *vernode, struct evhttp_request *req, int64_t set_version)
+{
+ struct doris_business *business=vernode->business;
+ char version[32], token[64];
+ int64_t new_version;
+
+ if(evtimer_pending(&vernode->timer_expire, NULL))
+ {
+ evtimer_del(&vernode->timer_expire);
+ }
+
+ if(set_version == 0)
+ {
+ new_version = business->cfgver_head->latest_version + 1;
+ }
+ else
+ {
+ new_version = set_version;
+ }
+ sprintf(token, "%s", vernode->token);
+ doris_config_post_version_finish(business, vernode, new_version);
+
+ sprintf(version, "%lu", new_version);
+ evhttp_add_header(evhttp_request_get_output_headers(req), "X-Set-Version", version);
+ evhttp_send_reply(req, 200, "OK", NULL);
+
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post server version finish, token: %s, version: %lu", business->bizname, token, new_version);
+}
+
+void prod_sync_verend_result_cb(enum PROD_VEROP_RES result, int64_t version, void *userdata)
+{
+ struct version_list_node *vernode=(struct version_list_node *)userdata;
+
+ vernode->retry_times++;
+ vernode->syncing = 0;
+ switch(result)
+ {
+ case VERSIONOP_RES_OK:
+ http_config_direct_version_finish(vernode, vernode->req, version);
+ break;
+
+ case VERSIONOP_RES_ERROR:
+ evhttp_send_error(vernode->req, 500, "version end sync error res_code");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ break;
+
+ case VERSIONOP_CURL_ERROR:
+ if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3))
+ {
+ vernode->syncing = 1;
+ doris_prod_version_end(vernode->synctx, prod_sync_verend_result_cb, vernode);
+ }
+ else
+ {
+ http_config_direct_version_finish(vernode, vernode->req, 0);
+ business_set_sync_peer_abnormal(vernode->business);
+ }
+ break;
+ default: assert(0);break;
+ }
+}
+
+void http_prod_server_verion_end_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct version_list_node *vernode;
+ struct evkeyvalq params;
+ char version[32];
+
+ if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
+ return;
+ }
+ if(NULL == (vernode = lookup_vernode_struct_from_name_renew(business, &params)))
+ {
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token invalid");
+ return;
+ }
+ evhttp_clear_headers(&params);
+
+ if(vernode->version_finished)
+ {
+ sprintf(version, "%lu", vernode->version);
+ evhttp_add_header(evhttp_request_get_output_headers(req), "X-Set-Version", version);
+ evhttp_send_error(req, HTTP_OK, "version already finished"); //��֤����һ����
+ return;
+ }
+ if(vernode->cur_table != NULL || vernode->syncing)
+ {
+ evhttp_send_error(req, 300, "table not finished yet");
+ return;
+ }
+
+ if(!atomic_read(&business->ready_to_sync) ||
+ NULL!=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync"))
+ {
+ return http_config_direct_version_finish(vernode, req, 0);
+ }
+
+ if(vernode->synctx == NULL)
+ {
+ evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle");
+ return;
+ }
+ vernode->retry_times = 0;
+ vernode->syncing = 1;
+ vernode->req = req;
+ doris_prod_version_end(vernode->synctx, prod_sync_verend_result_cb, vernode);
+}
+
+static void post_vernode_expire_destroy_cb(int fd, short kind, void *userp)
+{
+ struct version_list_node *vernode=(struct version_list_node *)userp;
+ struct table_list_node *tablenode;
+ struct timeval tv;
+
+ if(vernode->syncing)
+ {
+ tv.tv_sec = g_doris_server_info.post_vernode_ttl;
+ tv.tv_usec = 0;
+ evtimer_add(&vernode->timer_expire, &tv);
+ return;
+ }
+ if(vernode->synctx != NULL)
+ {
+ doris_prod_upload_ctx_destroy(vernode->synctx);
+ vernode->synctx = NULL;
+ }
+
+ if(vernode->fp_idx_file != NULL)
+ {
+ fclose(vernode->fp_idx_file);
+ remove(vernode->tmp_index_path);
+ }
+ if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL)
+ {
+ fclose(vernode->cur_table->fp_cfg_file);
+ remove(vernode->cur_table->localpath);
+ }
+ TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
+ {
+ remove(tablenode->localpath);
+ }
+ vernode->business->posts_on_the_way--;
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_VERSION_EXPIRES], 0, FS_OP_ADD, 1);
+ FS_operate(g_doris_server_info.fsstat_handle, vernode->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, vernode->business->posts_on_the_way);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, token %s expires", vernode->business->bizname, vernode->token);
+ config_version_node_cleanup(vernode);
+}
+
+struct version_list_node *doris_config_post_version_prepare(struct doris_business *business, int32_t cfgtype)
+{
+ struct version_list_node *vernode;
+ struct timeval tv;
+
+ vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node));
+ vernode->business = business;
+ vernode->cfg_type = cfgtype;
+
+ if(business->concurrency_allowed)
+ {
+ tv.tv_sec = g_doris_server_info.post_vernode_ttl;
+ tv.tv_usec = 0;
+ evtimer_assign(&vernode->timer_expire, business->worker_evbase, post_vernode_expire_destroy_cb, vernode);
+ evtimer_add(&vernode->timer_expire, &tv);
+ }
+ return vernode;
+}
+
+void doris_config_post_version_start(struct version_list_node *cur_vernode, const char *token)
+{
+ struct doris_business *business=cur_vernode->business;
+
+ snprintf(cur_vernode->token, 64, "%s", token);
+ if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL)
+ {
+ snprintf(cur_vernode->tmp_index_path, 512, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
+ }
+ else
+ {
+ snprintf(cur_vernode->tmp_index_path, 512, "%s/inc/full_config_index.%s.ing", business->store_path_root, token);
+ }
+ if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+")))
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno));
+ assert(0);
+ }
+
+ if(g_doris_server_info.consumer_port)
+ {
+ TAILQ_INIT(&cur_vernode->table_head);
+ cur_vernode->metajson = cJSON_CreateObject();
+ cur_vernode->arrayjson= cJSON_CreateArray();
+ cJSON_AddNumberToObject(cur_vernode->metajson, "type", cur_vernode->cfg_type);
+ }
+ business->token2node->insert(make_pair(string(token), cur_vernode));
+}
+
+void http_post_direct_version_start(struct version_list_node *cur_vernode, struct evhttp_request *req, const char *role)
+{
+ struct doris_business *business=cur_vernode->business;
+ char token[64], *p;
+ struct evbuffer *evbuf;
+ cJSON *meta;
+
+ prod_server_generate_token(business, token, 64);
+ doris_config_post_version_start(cur_vernode, token);
+
+ meta = cJSON_CreateObject();
+ cJSON_AddStringToObject(meta, "token", token);
+ p = cJSON_PrintUnformatted(meta);
+ cJSON_Delete(meta);
+
+ evbuf = evbuffer_new();
+ evbuffer_add(evbuf, p, strlen(p));
+ evhttp_send_reply(req, 200, "OK", evbuf);
+ evbuffer_free(evbuf);
+ cur_vernode->req = NULL;
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post %s server send response version start: %s", business->bizname, role, p);
+ free(p);
+}
+
+void try_restore_from_busy_peer(struct version_list_node *cur_vernode, const char *body, bool busy)
+{
+ struct doris_business *business=cur_vernode->business;
+ struct evbuffer *evbuf;
+ cJSON *meta, *token;
+
+ /*�Զ˼�Ȼbusy��˵����һ�����������token������token�IJ����ǶԷ�����*/
+ if((NULL==(meta=cJSON_Parse(body))) || NULL==(token=(cJSON_GetObjectItem(meta, "token"))))
+ {
+ assert(0);
+ }
+ /*����һ��û���ϴ������ã���Ϊ���Է�����;������post server����������������curlʧ�ܵ����*/
+ assert(NULL == cJSON_GetObjectItem(meta, "configs"));
+
+ doris_config_post_version_start(cur_vernode, token->valuestring);
+ cJSON_Delete(meta);
+
+ evbuf = evbuffer_new();
+ evbuffer_add(evbuf, body, strlen(body));
+ evhttp_send_reply(cur_vernode->req, 200, "OK", evbuf);
+ evbuffer_free(evbuf);
+ cur_vernode->req = NULL;
+ if(busy)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33mbusiness: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body);
+ }
+ else
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post master server send response version start: %s", business->bizname, body);
+ }
+}
+
+void prod_sync_verstart_result_cb(enum PROD_VERSTART_RES result, const char *body, void *userdata)
+{
+ struct version_list_node *vernode=(struct version_list_node *)userdata;
+ struct doris_business *business=vernode->business;
+
+ vernode->retry_times++;
+ vernode->syncing = 0;
+ switch(result)
+ {
+ case VERSTART_RES_OK:
+ try_restore_from_busy_peer(vernode, body, false);
+ break;
+
+ case VERSTART_RES_BUSY: //һ��������ǰ����CURLE��������ģ���rate limit
+ try_restore_from_busy_peer(vernode, body, true);
+ break;
+
+ case VERSTART_RES_ERROR: //�Ƿ�����ֱ�ӷ��ظ�Client
+ evhttp_send_error(vernode->req, 500, "version start sync error res_code");
+ doris_prod_upload_ctx_destroy(vernode->synctx);
+ free(vernode);
+ business->cur_vernode = NULL;
+ business->posts_on_the_way--;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname);
+ break;
+
+ case VERSTART_CURL_ERROR:
+ if(atomic_read(&business->ready_to_sync) && (vernode->retry_times < 3))
+ {
+ vernode->syncing = 1;
+ doris_prod_version_start_with_cb(vernode->synctx, prod_sync_verstart_result_cb, vernode);
+ }
+ else
+ {
+ http_post_direct_version_start(vernode, vernode->req, "master");
+ business_set_sync_peer_abnormal(vernode->business);
+ }
+ break;
+ default: assert(0);break;
+ }
+}
+
+void concurrency_send_busy_reply(struct doris_business *business, struct evhttp_request *req)
+{
+ char *p;
+ struct evbuffer *evbuf;
+
+ /*����������version start���̣���ͬ��δ���token����;�汾����ʱ���ܽ�����������*/
+ if(business->cur_vernode==NULL || business->cur_vernode->token[0]=='\0')
+ {
+ evhttp_send_error(req, 400, "another empty uploading busy");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "business: %s busy starting, posts-on-the-way: %d", business->bizname, business->posts_on_the_way);
+ return;
+ }
+
+ /*�����������version start��������;�汾����ͬ���õ��Է���token*/
+ p = vernode_print_json_meta(business->cur_vernode);
+ evbuf = evbuffer_new();
+ evbuffer_add(evbuf, p, strlen(p));
+ evhttp_send_reply(req, 300, "another uploading busy", evbuf);
+ evbuffer_free(evbuf);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s busy, posts-on-the-way: %d, reply: %s", business->bizname, business->posts_on_the_way, p);
+ free(p);
+}
+
+void http_prod_server_verion_start_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *argbiz=(struct doris_business *)arg, *business;
+ struct evkeyvalq params;
+ const char *type;
+ int cfgtype;
+
+ if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
+ return;
+ }
+ if(NULL == (business = lookup_bizstruct_from_name(&params)) || business!=argbiz)
+ {
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
+ return;
+ }
+ if(NULL == (type = evhttp_find_header(&params, "type")) || ((cfgtype=atoi(type))!=1 && cfgtype!=2))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_clear_headers(&params);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter type invalid");
+ return ;
+ }
+ evhttp_clear_headers(&params);
+ if(!business->concurrency_allowed && business->posts_on_the_way>0)
+ {
+ return concurrency_send_busy_reply(business, req);
+ }
+ if(business->posts_on_the_way > g_doris_server_info.max_concurrent_reqs)
+ {
+ evhttp_send_error(req, HTTP_SERVUNAVAIL, "Too many concurrent requests, service unavailable");
+ return ;
+ }
+
+ /*���ڲ����������������business->cur_vernodeʼ�ղ��䣻���������������*/
+ business->cur_vernode = doris_config_post_version_prepare(business, cfgtype);
+ business->posts_on_the_way++;
+ business->type = cfgtype;
+ FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s receives a version start request, posts-on-the-way: %d", business->bizname, business->posts_on_the_way);
+
+ if(NULL != evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) //�ڲ��������ͬ������
+ {
+ return http_post_direct_version_start(business->cur_vernode, req, "slave");
+ }
+
+ if(atomic_read(&business->ready_to_sync) &&
+ (NULL!=(business->cur_vernode->synctx=doris_prod_upload_ctx_new(business->instance, business->bizname, cfgtype))))
+ {
+ business->cur_vernode->retry_times = 0;
+ business->cur_vernode->req = req;
+ business->cur_vernode->syncing = 1;
+ doris_prod_version_start_with_cb(business->cur_vernode->synctx, prod_sync_verstart_result_cb, business->cur_vernode);
+ }
+ else
+ {
+ http_post_direct_version_start(business->cur_vernode, req, "master");
+ business_set_sync_peer_abnormal(business);
+ }
+}
+
+bool upload_frag_argument_check_offset(struct evhttp_request *req, struct evkeyvalq *params,
+ struct version_list_node *vernode, struct internal_tablemeta *tablemeta)
+{
+ const char *tmparg;
+ char *endptr=NULL, curoffset[32];
+ size_t length;
+
+ tablemeta->islast = 0;
+ if(NULL!=(tmparg=evhttp_find_header(params, "last")) && !strcasecmp(tmparg, "true"))
+ {
+ tablemeta->islast = 1;
+ }
+ if((length=evbuffer_get_length(evhttp_request_get_input_buffer(req))) > 0)
+ {
+ if(NULL == (tmparg = evhttp_find_header(params, "offset")))
+ {
+ evhttp_send_error(req, 401, "Parameter offset not found");
+ return false;
+ }
+ tablemeta->offset = strtol(tmparg, &endptr, 10);
+ if(*endptr != '\0')
+ {
+ evhttp_send_reply(req, 401, "Parameter offset invalid", NULL);
+ return false;
+ }
+ if(vernode->cur_table == NULL)
+ {
+ if(tablemeta->offset != 0)
+ {
+ evhttp_send_reply(req, 401, "Parameter offset is not starting from 0", NULL);
+ return false;
+ }
+ }
+ else if(tablemeta->offset+length <= vernode->cur_table->cur_totallen)
+ {
+ evhttp_send_reply(req, 201, "Parameter offset already uploaded", NULL);
+ return false;
+ }
+ else if(tablemeta->offset != vernode->cur_table->cur_totallen)
+ {
+ sprintf(curoffset, "%lu", vernode->cur_table->cur_totallen);
+ evhttp_add_header(evhttp_request_get_output_headers(req), "X-Current-Offset", curoffset);
+ evhttp_send_reply(req, 401, "Parameter offset invalid", NULL);
+ return false;
+ }
+ }
+ else if(!tablemeta->islast || vernode->cur_table==NULL) //�����last������δ��������˵��δ�ϴ�����
+ {
+ evhttp_send_error(req, 400, "Content length is zero, but parameter last!=true; or total length is zero, but parameter last=true");
+ return false;
+ }
+ return true;
+}
+
+struct version_list_node *upload_file_arguments_valid_check(struct evhttp_request *req,
+ struct doris_business *business, struct internal_tablemeta *tablemeta, bool fragcheck)
+{
+ struct evkeyvalq params;
+ struct version_list_node *vernode;
+ struct table_list_node *tablenode;
+ const char *tablename, *tmparg;
+
+ if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
+ return NULL;
+ }
+ if(NULL==(vernode = lookup_vernode_struct_from_name_renew(business, &params)))
+ {
+ evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token invalid");
+ evhttp_clear_headers(&params);
+ return NULL;
+ }
+ if(NULL == (tablename = evhttp_find_header(&params, "tablename")))
+ {
+ evhttp_send_error(req, HTTP_BADREQUEST, "Parameter tablename invalid");
+ evhttp_clear_headers(&params);
+ return NULL;
+ }
+
+ /*�ϸ���δ���������������ű������ϴ�*/
+ if(vernode->cur_table!=NULL && (vernode->syncing || strcmp(vernode->cur_table->tablename, tablename)))
+ {
+ evhttp_send_error(req, 300, "tablename busy");
+ evhttp_clear_headers(&params);
+ return NULL;
+ }
+ /*finished���Ż�����������鿴�Ƿ����иñ���������֧���ظ��ı���*/
+ tablenode = TAILQ_FIRST(&vernode->table_head);
+ while(tablenode!=NULL && strcmp(tablename, tablenode->tablename))
+ {
+ tablenode = TAILQ_NEXT(tablenode, table_node);
+ }
+ if(tablenode != NULL)
+ {
+ evhttp_send_error(req, HTTP_BADREQUEST, "tablename already finished");
+ evhttp_clear_headers(&params);
+ return NULL;
+ }
+
+ if(fragcheck && !upload_frag_argument_check_offset(req, &params, vernode, tablemeta))
+ {
+ evhttp_clear_headers(&params);
+ return NULL;
+ }
+
+ snprintf(tablemeta->tablename, 64, "%s", tablename);
+ if(NULL == (tmparg = evhttp_find_header(&params, "filename")))
+ {
+ tablemeta->filename[0] = '\0';
+ }
+ else
+ {
+ snprintf(tablemeta->filename, 64, "%s", tmparg);
+ }
+ evhttp_clear_headers(&params);
+ return vernode;
+}
+
+bool upload_frag_check_content_md5(struct evhttp_request *req, const char *content, size_t len, char *md5str, int md5size)
+{
+ const char *md5;
+ MD5_CTX ctx;
+
+ if(NULL == (md5=evhttp_find_header(evhttp_request_get_input_headers(req), "Content-MD5")))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, 402, "Content-MD5 header not found");
+ return false;
+ }
+ MD5_Init(&ctx);
+ MD5_Update(&ctx, content, len);
+ scandir_md5_final_string(&ctx, md5str, md5size);
+ if(strcasecmp(md5, md5str))
+ {
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
+ evhttp_send_error(req, 402, "Content-MD5 not match");
+ return false;
+ }
+ return true;
+}
+
+void doris_config_post_cfgfile_prepare(struct version_list_node *cur_vernode,
+ struct internal_tablemeta *tablemeta, const char *md5, u_int32_t cfgnum, char *content, size_t size)
+{
+ if(cur_vernode->cur_table == NULL)
+ {
+ cur_vernode->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
+ cur_vernode->cur_table->cfgnum = cfgnum;
+ cur_vernode->total_cfgs += cfgnum;
+ sprintf(cur_vernode->cur_table->tablename, "%s", tablemeta->tablename);
+ if(tablemeta->filename[0] != '\0') //Clientָ���ļ���
+ {
+ sprintf(cur_vernode->cur_table->filename, "%s", tablemeta->filename);
+ }
+ else
+ {
+ snprintf(cur_vernode->cur_table->filename, 128, "%s.%s", tablemeta->tablename, cur_vernode->token);
+ }
+ MD5_Init(&cur_vernode->cur_table->md5ctx);
+ TAILQ_INIT(&cur_vernode->cur_table->frag_head);
+ }
+ cur_vernode->cur_table->fragcontent = content;
+ cur_vernode->cur_table->fragsize = size;
+ cur_vernode->cur_table->finished = tablemeta->islast;
+ sprintf(cur_vernode->cur_table->fragmd5, "%s", md5);
+
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "business: %s, table %s receives a file part, offset: %lu, size: %lu",
+ cur_vernode->business->bizname, tablemeta->tablename, tablemeta->offset, size);
+}
+
+void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct evhttp_request *req)
+{
+ struct tablemeta meta;
+
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s start...", vernode->business->bizname, vernode->cur_table->filename);
+ FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1);
+
+ meta.tablename = vernode->cur_table->tablename;
+ meta.filename = vernode->cur_table->filename;
+ meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info");
+ meta.cfgnum = vernode->cur_table->cfgnum;
+ meta.size = 0;
+
+ vernode->business->type = vernode->cfg_type;
+ vernode->business->fp_idx_file = vernode->fp_idx_file;
+ doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business);
+ sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path);
+ vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file;
+
+ if(g_doris_server_info.consumer_port)
+ {
+ vernode->cur_table->table_meta = cJSON_CreateObject();
+ cJSON_AddStringToObject(vernode->cur_table->table_meta, "tablename", meta.tablename);
+ cJSON_AddStringToObject(vernode->cur_table->table_meta, "filename", meta.filename);
+ cJSON_AddNumberToObject(vernode->cur_table->table_meta, "cfg_num", meta.cfgnum);
+ if(meta.userregion != NULL)
+ {
+ cJSON_AddStringToObject(vernode->cur_table->table_meta, "user_region", meta.userregion);
+ }
+ }
+}
+
+void doris_config_post_cfgfile_finish(struct version_list_node *vernode, const char *md5str)
+{
+ doris_config_common_cfgfile_finish(vernode->business);
+ fclose(vernode->cur_table->fp_cfg_file);
+
+ assert(vernode->cur_table->filesize == 0);
+ vernode->cur_table->filesize = vernode->cur_table->cur_totallen;
+ if(g_doris_server_info.consumer_port)
+ {
+ cJSON_AddNumberToObject(vernode->cur_table->table_meta, "size", vernode->cur_table->filesize);
+ cJSON_AddStringToObject(vernode->cur_table->table_meta, "md5", md5str);
+ cJSON_AddItemToArray(vernode->arrayjson, vernode->cur_table->table_meta);
+ vernode->cur_table->table_meta = NULL;
+ if(vernode->cur_table->cur_frag != NULL)
+ {
+ if(vernode->cur_table->cur_frag->totalsize > vernode->cur_table->cur_frag->cur_fraglen)
+ {
+ char *content = (char *)malloc(vernode->cur_table->cur_frag->cur_fraglen);
+ memcpy(content, vernode->cur_table->cur_frag->content, vernode->cur_table->cur_frag->cur_fraglen);
+ free(vernode->cur_table->cur_frag->content);
+ vernode->cur_table->cur_frag->content = content;
+ vernode->cur_table->cur_frag->totalsize = vernode->cur_table->cur_frag->cur_fraglen;
+ vernode->cur_table->cur_frag->end = vernode->cur_table->filesize - 1;
+ }
+ TAILQ_INSERT_TAIL(&vernode->cur_table->frag_head, vernode->cur_table->cur_frag, frag_node);
+ assert(vernode->cur_table->cur_frag->cur_fraglen == vernode->cur_table->cur_frag->end - vernode->cur_table->cur_frag->start + 1);
+ vernode->cur_table->cur_frag = NULL;
+ }
+ }
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s finished", vernode->business->bizname, vernode->cur_table->filename);
+ TAILQ_INSERT_TAIL(&vernode->table_head, vernode->cur_table, table_node);
+ vernode->cur_table = NULL; //��գ�׼����һ�ű�
+}
+
+void http_config_direct_cfgfile_update(struct version_list_node *vernode, struct evhttp_request *req)
+{
+ size_t writen_len;
+ char md5str[40];
+
+ if(vernode->cur_table->cur_totallen == 0) //start
+ {
+ doris_config_post_cfgfile_start(vernode, req);
+ }
+ if(vernode->cur_table->fragsize > 0)
+ {
+ writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file);
+ if(writen_len != vernode->cur_table->fragsize)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno));
+ assert(0);
+ }
+ if(g_doris_server_info.consumer_port)
+ {
+ vernode->business->cur_vernode = vernode;
+ doris_config_mem_cfgfile_update(NULL, vernode->cur_table->fragcontent, vernode->cur_table->fragsize, vernode->business);
+ }
+ else
+ {
+ vernode->cur_table->cur_totallen += vernode->cur_table->fragsize;
+ }
+ if(!vernode->cur_table->onceupload)
+ {
+ MD5_Update(&vernode->cur_table->md5ctx, vernode->cur_table->fragcontent, vernode->cur_table->fragsize);
+ }
+ free(vernode->cur_table->fragcontent);
+ }
+ if(vernode->cur_table->finished) //end
+ {
+ if(!vernode->cur_table->onceupload)
+ {
+ scandir_md5_final_string(&vernode->cur_table->md5ctx, md5str, 40);
+ doris_config_post_cfgfile_finish(vernode, md5str);
+ evhttp_add_header(evhttp_request_get_output_headers(req), "X-Content-MD5", md5str);
+ }
+ else
+ {
+ doris_config_post_cfgfile_finish(vernode, vernode->cur_table->fragmd5);
+ }
+ }
+ evhttp_send_reply(req, HTTP_OK, "OK", NULL);
+}
+
+void prod_sync_upload_frag_cb(enum PROD_VEROP_RES result,void * userdata)
+{
+ struct version_list_node *vernode=(struct version_list_node *)userdata;
+ struct table_meta meta;
+
+ vernode->retry_times++;
+ vernode->syncing = 0;
+ switch(result)
+ {
+ case VERSIONOP_RES_OK:
+ http_config_direct_cfgfile_update(vernode, vernode->req);
+ break;
+
+ case VERSIONOP_RES_ERROR:
+ evhttp_send_error(vernode->req, 500, "frag sync error res_code");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname);
+ break;
+
+ case VERSIONOP_CURL_ERROR:
+ if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3))
+ {
+ vernode->syncing = 1;
+ meta.md5 = vernode->cur_table->fragmd5;
+ meta.cfgnum = vernode->cur_table->cfgnum;
+ meta.tablename = vernode->cur_table->tablename;
+ meta.filename = vernode->cur_table->filename;
+ meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(vernode->req), "X-User-Info");
+ if(vernode->cur_table->onceupload)
+ {
+ doris_prod_upload_once_with_cb(vernode->synctx, vernode->cur_table->fragcontent,
+ vernode->cur_table->fragsize, &meta, prod_sync_upload_frag_cb, vernode);
+ }
+ else
+ {
+ doris_prod_upload_frag_with_cb(vernode->synctx, vernode->cur_table->fragcontent, vernode->cur_table->fragsize, vernode->cur_table->cur_totallen,
+ vernode->cur_table->finished?true:false, &meta, prod_sync_upload_frag_cb, vernode);
+ }
+ }
+ else
+ {
+ http_config_direct_cfgfile_update(vernode, vernode->req);
+ business_set_sync_peer_abnormal(vernode->business);
+ }
+ break;
+ default: assert(0);break;
+ }
+}
+
+void http_prod_server_file_once_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct version_list_node *vernode;
+ char *content, md5str[64];
+ const char *tmp;
+ struct internal_tablemeta tablemeta;
+ size_t size;
+ struct table_meta meta;
+ int32_t cfgnum=0, need_sync=0;
+
+ if(NULL == (vernode=upload_file_arguments_valid_check(req, business, &tablemeta, false)))
+ {
+ return;
+ }
+ tablemeta.islast = 1;
+ tablemeta.offset = 0;
+
+ /*ԭʼClient����X-Doris-Master-Slave-Sync����ͷ��ͬһ�汾�������������*/
+ if(atomic_read(&business->ready_to_sync) &&
+ NULL==evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync"))
+ {
+ need_sync = 1;
+ }
+ if(need_sync && vernode->synctx==NULL)
+ {
+ evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle");
+ return ;
+ }
+
+ if((size=evbuffer_get_length(evhttp_request_get_input_buffer(req))) == 0)
+ {
+ evhttp_send_error(req, 400, "no content");
+ return ;
+ }
+ content = (char*)malloc(size);
+ if(size != (size_t)evbuffer_copyout(evhttp_request_get_input_buffer(req), content, size))
+ {
+ assert(0);
+ }
+ if(!upload_frag_check_content_md5(req, content, size, md5str, 64))
+ {
+ free(content);
+ return ;
+ }
+
+ if(NULL != (tmp=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Config-Num")))
+ {
+ cfgnum = atoi(tmp);;
+ }
+ doris_config_post_cfgfile_prepare(vernode, &tablemeta, md5str, cfgnum, content, size);
+
+ meta.md5 = md5str;
+ meta.cfgnum = cfgnum;
+ meta.tablename = tablemeta.tablename;
+ meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info");
+ meta.filename = vernode->cur_table->filename;
+ vernode->cur_table->onceupload = true;
+
+ if(!need_sync)
+ {
+ return http_config_direct_cfgfile_update(vernode, req);
+ }
+ vernode->retry_times = 0;
+ vernode->req = req;
+ vernode->syncing = 1;
+ doris_prod_upload_once_with_cb(vernode->synctx, content, size, &meta, prod_sync_upload_frag_cb, vernode);
+}
+
+void http_prod_server_file_frag_cb(struct evhttp_request *req, void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct version_list_node *vernode;
+ char *content=NULL, md5str[64];
+ const char *tmp;
+ struct internal_tablemeta tablemeta;
+ size_t size=0;
+ struct table_meta meta;
+ int32_t cfgnum=0, need_sync=0;
+
+ if(NULL == (vernode=upload_file_arguments_valid_check(req, business, &tablemeta, true)))
+ {
+ return;
+ }
+ if((size=evbuffer_get_length(evhttp_request_get_input_buffer(req)))==0 && !tablemeta.islast)
+ {
+ evhttp_send_error(req, 400, "no content");
+ return ;
+ }
+
+ if(atomic_read(&business->ready_to_sync) &&
+ NULL==evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync"))
+ {
+ need_sync = 1;
+ }
+ if(need_sync && vernode->synctx==NULL)
+ {
+ evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle");
+ return ;
+ }
+
+ if(size > 0)
+ {
+ content = (char *)malloc(size);
+ if(size != (size_t)evbuffer_copyout(evhttp_request_get_input_buffer(req), content, size))
+ {
+ assert(0);
+ }
+ if(!upload_frag_check_content_md5(req, content, size, md5str, 64))
+ {
+ free(content);
+ return ;
+ }
+ }
+
+ if(NULL != (tmp=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Config-Num")))
+ {
+ cfgnum = atoi(tmp);;
+ }
+ doris_config_post_cfgfile_prepare(vernode, &tablemeta, md5str, cfgnum, content, size);
+
+ meta.md5 = md5str;
+ meta.cfgnum = cfgnum;
+ meta.tablename = tablemeta.tablename;
+ meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info");
+ meta.filename = vernode->cur_table->filename;
+ if(tablemeta.islast && tablemeta.offset==0)
+ {
+ vernode->cur_table->onceupload = true;
+ }
+
+ if(!need_sync)
+ {
+ return http_config_direct_cfgfile_update(vernode, req);
+ }
+ vernode->retry_times = 0;
+ vernode->req = req;
+ vernode->syncing = 1;
+ doris_prod_upload_frag_with_cb(vernode->synctx, content, size, vernode->cur_table->cur_totallen,
+ tablemeta.islast?true:false, &meta, prod_sync_upload_frag_cb, vernode);
+}
+
+void start_business_http_post_server(struct doris_business *business)
+{
+ struct evhttp *worker_http;
+
+ if((business->listener_prod = doris_create_listen_socket(business->producer_port)) < 0)
+ {
+ assert(0);return;
+ }
+ business->source_from = RECV_WAY_HTTP_POST;
+ worker_http = evhttp_new(business->worker_evbase);
+ if(g_doris_server_info.ssl_conn_on)
+ {
+ evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance);
+ }
+
+ evhttp_set_cb(worker_http, "/version/start", http_prod_server_verion_start_cb, business);
+ evhttp_set_cb(worker_http, "/version/finish", http_prod_server_verion_end_cb, business);
+ evhttp_set_cb(worker_http, "/version/cancel", http_prod_server_verion_cancel_cb, business);
+ evhttp_set_cb(worker_http, "/version/check", http_prod_server_verion_check_cb, business);
+ evhttp_set_cb(worker_http, "/fileonce/upload", http_prod_server_file_once_cb, business);
+ evhttp_set_cb(worker_http, "/filefrag/upload", http_prod_server_file_frag_cb, business);
+ evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_POST|EVHTTP_REQ_PUT|EVHTTP_REQ_HEAD);
+ evhttp_set_max_body_size(worker_http, g_doris_server_info.max_http_body_size);
+
+ if(evhttp_accept_socket(worker_http, business->listener_prod))
+ {
+ printf("evhttp_accept_socket %d error!\n", business->listener_prod);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "evhttp_accept_socket %d error!\n", business->listener_prod);
+ assert(0);
+ }
+}
+
+void doris_config_version_sync_updated(struct doris_csum_instance *instance, void *userdata)
+{
+ struct doris_business *business=(struct doris_business *)userdata;
+ struct doris_csum_param *param;
+ u_int32_t references, business_post_ups;
+
+ /*����consuemer��ͬʱȷ��������ִֻ��һ��*/
+ param = doris_csum_instance_get_param(instance);
+ doris_csum_instance_destroy(instance);
+ references = doris_csum_param_get_refernces(param);
+ if(references == 0)
+ {
+ doris_csum_parameter_destroy(param);
+ }
+
+ /*init sync instance*/
+ business->instance = doris_prod_instance_new(business->param_prod, business->worker_evbase, g_doris_server_info.log_runtime);
+ if(business->instance == NULL)
+ {
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "doris_prod_instance_new for %s failed", business->bizname);
+ assert(0);return;
+ }
+
+ /*start worker*/
+ start_business_http_post_server(business);
+
+ /*ͬ����ɣ���ʾ�����汾��server��һ��(ע����һ�£������DZ�������, ��������ʱserver���᷵��304)*/
+ atomic_set(&business->ready_to_sync, 1);
+
+ pthread_mutex_lock(&g_doris_server_info.mutex_lock);
+ business_post_ups = ++g_doris_server_info.business_post_ups;
+ pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
+ if(business_post_ups == g_doris_server_info.business_post_num)
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_OK);
+ }
+ else
+ {
+ MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING);
+ }
+ assert(business_post_ups <= g_doris_server_info.business_post_num);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m******Doris Producer worker for %s starts******\033[0m", business->bizname);
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris ready to sync for business: %s\n", business->bizname);
+}
+
+/*��thread_doris_client_recv_cfg��������version_updated����*/
+void* thread_http_post_recv_cfg(void *arg)
+{
+ struct doris_business *business=(struct doris_business *)arg;
+ struct event_base *client_evbase;
+ struct doris_csum_instance *instance;
+ struct doris_callbacks doris_cbs;
+ struct doris_arguments doris_args;
+ struct doris_idxfile_scanner *scanner;
+ enum DORIS_UPDATE_TYPE update_type;
+ char stored_path[512];
+
+ prctl(PR_SET_NAME, "http_post");
+
+ client_evbase = event_base_new();
+
+ business->source_from = RECV_WAY_IDX_FILE;
+ business->worker_evbase = client_evbase;
+
+ scanner = doris_index_file_scanner(0);
+
+ /*Retaive latest config to memory from Stored configs*/
+ doris_cbs.version_start = doris_config_localmem_version_start;
+ doris_cbs.version_finish = doris_config_localmem_version_finish;
+ doris_cbs.version_error = doris_config_localmem_version_error;
+ doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
+ doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
+ doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
+ doris_cbs.version_updated= NULL;
+ doris_cbs.userdata = business;
+
+ snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
+ update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
+ snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
+ do {
+ update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
+ }while(update_type != CFG_UPDATE_TYPE_NONE);
+
+ if(g_doris_server_info.cluster_sync_mode) /*Check new configs*/
+ {
+ doris_cbs.version_start = doris_config_version_start;
+ doris_cbs.version_finish = doris_config_version_finish;
+ doris_cbs.version_error = doris_config_version_error;
+ doris_cbs.cfgfile_start = doris_config_cfgfile_start;
+ doris_cbs.cfgfile_update = doris_config_cfgfile_update;
+ doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
+ doris_cbs.version_updated= doris_config_version_sync_updated;
+
+ business->source_from = RECV_WAY_DRS_CLIENT;
+ memset(&doris_args, 0, sizeof(struct doris_arguments));
+ doris_args.current_version = scanner->cur_version;
+ sprintf(doris_args.bizname, "%s", business->bizname);
+ instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
+ if(instance == NULL)
+ {
+ assert(0);return NULL;
+ }
+ }
+ else
+ {
+ start_business_http_post_server(business);
+ }
+
+ event_base_dispatch(client_evbase);
+ printf("Libevent dispath error, should not run here.\n");
+ MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
+ assert(0);return NULL;
+}
+
diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h
index 846ae78..70b4ea6 100644
--- a/server/doris_server_receive.h
+++ b/server/doris_server_receive.h
@@ -4,8 +4,23 @@
#include <stdio.h>
#include <sys/queue.h>
#include <event.h>
+#include <openssl/md5.h>
#include <cjson/cJSON.h>
+#include <evhttp.h>
+
+#include "doris_producer_client.h"
+
+#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410)
+#define atomic_inc(x) __sync_add_and_fetch((x),1)
+#define atomic_dec(x) __sync_sub_and_fetch((x),1)
+#define atomic_add(x,y) __sync_add_and_fetch((x),(y))
+#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y))
+#define atomic_read(x) __sync_add_and_fetch((x),0)
+#define atomic_set(x,y) __sync_lock_test_and_set((x),y)
+#else
+#error "GCC version should be 4.1.2 later"
+#endif
#include <map>
using namespace std;
@@ -20,8 +35,10 @@ enum DORIS_SERVER_FS_FILED
DRS_FSSTAT_CLIENT_META_REQ,
DRS_FSSTAT_SEND_META_NONEW,
DRS_FSSTAT_CLIENT_FILE_REQ,
+ DRS_FSSTAT_SEND_FILES,
DRS_FSSTAT_SEND_FILE_BYTES,
DRS_FSSTAT_SEND_FILE_RES_404,
+ DRS_FSSTAT_VERSION_EXPIRES,
DRS_FSSTAT_FIELD_NUM,
};
@@ -31,8 +48,8 @@ enum DORIS_SERVER_FS_COLUMN
DRS_FSCLM_RECV_FULL_VER=0,
DRS_FSCLM_RECV_INC_VER,
DRS_FSCLM_RECV_FILES,
+ DRS_FSCLM_POST_ON_THE_WAY,
DRS_FSCLM_SEND_META_RES,
- DRS_FSCLM_SEND_FILE_RES,
DRS_FSCLM_CUR_FULL_VERSION,
DRS_FSCLM_CUR_INC_VERSION,
DRS_FSCLM_CONFIG_TOTAL_NUM,
@@ -58,6 +75,14 @@ struct cont_frag_node
TAILQ_ENTRY(cont_frag_node) frag_node;
};
+struct internal_tablemeta
+{
+ char tablename[64];
+ char filename[64];
+ size_t offset;
+ int32_t islast;
+};
+
struct table_list_node
{
char tablename[64];
@@ -65,20 +90,49 @@ struct table_list_node
size_t filesize;
size_t cur_totallen;
+ /*this part for http post server*/
+ bool onceupload;
+ int32_t finished;
+ MD5_CTX md5ctx;
+ char tmppath[256];
+ char filename[128];
+ char fragmd5[36];
+ u_int32_t cfgnum;
+ char *fragcontent;
+ size_t fragsize;
+ cJSON *table_meta;
+
+ FILE *fp_cfg_file;
+ struct cont_frag_node *cur_frag;
TAILQ_HEAD(__table_cont_node, cont_frag_node) frag_head;
TAILQ_ENTRY(table_list_node) table_node;
};
+struct doris_business;
struct version_list_node
{
int64_t version;
char *metacont;
int32_t metalen;
int16_t cfg_type; //1-full, 2-inc
- int16_t cont_in_disk;
+ int8_t cont_in_disk;
+ int8_t version_finished;
cJSON *metajson, *arrayjson;
cJSON *table_meta;
+ /*this part for http post server*/
+ FILE *fp_idx_file;
+ char token[64];
+ char tmp_index_path[256]; //incĿ¼�µ�����ȫ��������ʱÿ���汾��һ��
+ int16_t retry_times;
+ int16_t syncing;
+ int32_t total_cfgs;
+ struct doris_business *business;
+ struct evhttp_request *req;
+ struct doris_upload_ctx *synctx;
+ struct table_list_node *cur_table;
+ struct event timer_expire;
+
TAILQ_HEAD(__table_list_node, table_list_node) table_head;
TAILQ_ENTRY(version_list_node) version_node;
};
@@ -95,35 +149,21 @@ struct version_list_handle
struct version_list_handle *config_version_handle_new(void);
-struct doris_business;
-struct confile_save
-{
- struct event_base *evbase;
- struct doris_business *business;
- int64_t version;
- int32_t source_from;
- int32_t type;
- int64_t version_cfgnum;
- char inc_index_path[256]; //incĿ¼�µ�����ȫ��
- char tmp_index_path[256]; //incĿ¼�µ�����ȫ��
- char full_index_path[256]; //fullĿ¼�µ�ȫ��
- char cfg_file_path[256];
- FILE *fp_cfg_file;
- FILE *fp_idx_file;
-
- struct version_list_node *cur_vernode;
- struct table_list_node *cur_table;
- struct cont_frag_node *cur_frag;
-};
-
struct common_timer_event
{
struct event timer_event;
void *data;
};
+struct doris_business;
+struct bufferevent *doris_https_bufferevent_cb(struct event_base *evabse, void *arg);
+struct doris_business *lookup_bizstruct_from_name(const struct evkeyvalq *params);
+void business_set_sync_peer_abnormal(struct doris_business *business);
+void business_resume_sync_peer_normal(struct doris_business *business);
+
void* thread_doris_client_recv_cfg(void *arg);
void* thread_index_file_recv_cfg(void *arg);
+void* thread_http_post_recv_cfg(void *arg);
#endif
diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp
index 8840b64..a9e4404 100644
--- a/server/doris_server_scandir.cpp
+++ b/server/doris_server_scandir.cpp
@@ -17,7 +17,7 @@
#define MESA_RUNTIME_LOGV4(handle, lv, fmt, args...) \
MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args)
-static int scandir_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
+int scandir_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
{
unsigned char md5[17]={0};
int i;
@@ -202,12 +202,12 @@ int cm_read_cfg_index_file(const char* path, struct cfg_table_info* idx/*OUT*/,
memset(line, 0, sizeof(line));
fgets(line, sizeof(line), fp);
ret=sscanf(line,"%[^ \t]%*[ \t]%d%*[ \t]%s%*[ \t]%s", idx[i].table_name, &(idx[i].cfg_num), idx[i].cfg_path, idx[i].user_region);
- if((ret!=3 && ret!=4) || idx[i].cfg_num==0)//jump over empty line
+ if((ret!=3 && ret!=4))//jump over empty line
{
continue;
}
ret=stat(idx[i].cfg_path, &file_info);
- if(ret!=0)
+ if(ret!=0 || file_info.st_size==0) //���ļ����·�
{
MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "%s of %s not exisit", idx[i].cfg_path, path);
fclose(fp);
@@ -315,12 +315,14 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s
{
MESA_RUNTIME_LOGV4(logger, RLOG_LV_INFO, "load %s", idx_path_array[i].path);
table_num=cm_read_cfg_index_file(idx_path_array[i].path, table_array, CM_MAX_TABLE_NUM, logger);
- if(table_num<0)
+ if(table_num<=0)
{
- MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "load %s faild, abandon udpate.", idx_path_array[i].path);
+ MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
update_type = CFG_UPDATE_TYPE_ERR;
+ scanner->cur_version = idx_path_array[i].version; //����İ汾����
break;
}
+ scanner->cur_version = idx_path_array[i].version;
cJSON *meta = doris_index_version_start(idx_path_array[i].version, table_array, table_num, update_type, doris_cbs);
for(j=0; j<table_num && update_rslt; j++)
@@ -329,7 +331,6 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s
}
if(update_rslt)
{
- scanner->cur_version = idx_path_array[i].version;
doris_cbs->version_finish(NULL, doris_cbs->userdata);
}
else
@@ -337,6 +338,7 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s
update_type = CFG_UPDATE_TYPE_ERR;
doris_cbs->version_error(NULL, doris_cbs->userdata);
cJSON_Delete(meta);
+ MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path);
break;
}
cJSON_Delete(meta);
diff --git a/server/doris_server_scandir.h b/server/doris_server_scandir.h
index ab8d993..c9e68eb 100644
--- a/server/doris_server_scandir.h
+++ b/server/doris_server_scandir.h
@@ -1,7 +1,7 @@
#ifndef __DORIS_SERVER_SCANDIR_H__
#define __DORIS_SERVER_SCANDIR_H__
-#include "doris_client.h"
+#include "doris_consumer_client.h"
#define CM_MAX_TABLE_NUM 256
#define MAX_CONFIG_FN_LEN 256
@@ -45,6 +45,8 @@ struct doris_idxfile_scanner
char oncebuf[ONCE_BUF_SIZE];
};
+int scandir_md5_final_string(MD5_CTX *c, char *result, unsigned int size);
+
struct doris_idxfile_scanner *doris_index_file_scanner(int64_t start_version);
enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *scanner, const char*idx_dir,
struct doris_callbacks *doris_cbs, const char* dec_key, void* logger);