summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-01-25 19:09:52 +0800
committerZheng Chao <[email protected]>2023-01-25 19:09:52 +0800
commit99d7df4c68d3968e9afcfd00b68ca92ef10c4d9c (patch)
treec7999b50b89f09287bbd3b9a20bffaadf79aa29d
parent136e57eff9e87fd84de8b7b25f6691972716bacd (diff)
:bug: The test case of Resilience.AddSlotOwner fails when running all tests. To fix this, we MUST start watching slot and node changes in a different thread than evbase_dispatch(); if not, evhttp_make_request() may fail due to a socket fd error.
-rw-r--r--src/swarmkv.c6
-rw-r--r--src/swarmkv_keyspace.c42
-rw-r--r--src/swarmkv_net.c3
-rw-r--r--test/swarmkv_perf_test.cpp245
-rw-r--r--tools/swarmkv_cli.c4
5 files changed, 152 insertions, 148 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 12cbb46..2e99e7f 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1147,13 +1147,15 @@ static void libevent_log_cb(int severity, const char *msg)
fclose(logfile);
}
*/
-struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, char **err)
+struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err)
{
struct swarmkv *db = NULL;
// event_set_log_callback(libevent_log_cb);
db=ALLOC(struct swarmkv, 1);
strncpy(db->db_name, db_name, sizeof(db->db_name));
-
+ /* adds locking, only required if accessed from separate threads */
+ evthread_use_pthreads();
+
strncpy(db->module.name, "db", sizeof(db->module.name));
db->module.mod_ctx=db;
db->module.lock=NULL;
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 1a85033..83d579c 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -236,7 +236,8 @@ struct swarmkv_keyspace
exec_cmd_func *exec_cmd_func;
struct swarmkv *exec_cmd_handle;
struct swarmkv_module *mod_monitor;
- void *logger;
+ struct log_handle *logger;
+
};
void http_request_on_close(struct evhttp_connection *conn, void *arg)
{
@@ -248,7 +249,6 @@ struct consul_request
{
char symbol[SWARMKV_SYMBOL_MAX];
long long request_id;
- struct evhttp_connection *evhttpconn; //tcp connection for the request
struct evhttp_request *evhttpreq;
struct future *f;
struct consul_client *ref_client;
@@ -261,6 +261,7 @@ struct consul_client
long long req_id_generator;
struct event_base *evbase; //reference of keyspace evbase
struct log_handle *logger; //reference of logger
+ struct evhttp_connection *evhttpconn; //tcp connection for the request
struct consul_request *request_table;
};
void consul_request_callback(struct evhttp_request *evhttpreq, void *arg);
@@ -269,8 +270,6 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch
struct consul_request *req=ALLOC(struct consul_request, 1);
strncpy(req->symbol, symbol, sizeof(req->symbol));
- req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port);
- evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query.
req->evhttpreq=evhttp_request_new(consul_request_callback, req);
req->f=f;
req->ref_client=client;
@@ -286,7 +285,7 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch
}
void consul_request_free(struct consul_request *req)
{
- evhttp_connection_free(req->evhttpconn);
+ //evhttp_connection_free(req->evhttpconn);
//if(req->evhttpreq) evhttp_request_free(req->evhttpreq);
future_destroy(req->f);
@@ -318,12 +317,10 @@ void consul_request_callback(struct evhttp_request *evhttpreq, void *arg)
void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url)
{
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
-
- evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req);
evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host);
- evhttp_add_header(output_headers, "Connection", "close");
+// evhttp_add_header(output_headers, "Connection", "close");
- evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url);
+ evhttp_make_request(req->ref_client->evhttpconn, req->evhttpreq, cmd, url);
}
struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger)
@@ -333,6 +330,9 @@ struct consul_client *consul_client_new(const char *host, unsigned short port, s
strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host));
client->logger=logger;
client->evbase=evbase;
+ client->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port);
+ evhttp_connection_set_timeout(client->evhttpconn, 1800);//set to 30min for blocking query.
+ evhttp_connection_set_closecb(client->evhttpconn, http_request_on_close, client);
return client;
}
void consul_client_free(struct consul_client *client)
@@ -343,6 +343,7 @@ void consul_client_free(struct consul_client *client)
HASH_DEL(client->request_table, req);
consul_request_free(req);
}
+ evhttp_connection_free(client->evhttpconn);
free(client);
}
void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks);
@@ -489,14 +490,9 @@ void *swarmkv_keyspace_thread(void *arg)
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
-
struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks);
struct timeval timer_delay = {2, 0};
evtimer_add(ev, &timer_delay);
-
- consul_watch_slots_changes_async(ks);
- consul_watch_nodes_changes_async(ks);
-
event_base_dispatch(ks->evbase);
event_del(ev);
event_free(ev);
@@ -576,7 +572,7 @@ void consul_create_session_async(struct swarmkv_keyspace *ks)
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 2);
+// evhttp_connection_set_timeout(req->evhttpconn, 2);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
@@ -705,7 +701,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks)
struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks);
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 2);
+// evhttp_connection_set_timeout(req->evhttpconn, 2);
struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, req_body, sdslen(req_body));
@@ -916,11 +912,11 @@ void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new
struct future *f=future_create(__func__, propagate_slot_table_on_success, propagate_slot_table_on_fail, ks);
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 20);
+// evhttp_connection_set_timeout(req->evhttpconn, 20);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
- struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
+ struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json));
char number[64];
snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json));
@@ -1187,7 +1183,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag
return;
}
-struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err)
+struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char *db_name, void *logger, char **err)
{
struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1);
strncpy(ks->module.name, "keyspace", sizeof(ks->module.name));
@@ -1216,7 +1212,13 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts,
log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots init failed.");
goto error_out;
}
-// consul_kv_init_if_nonexist(ks);
+ //We MUST start watching in a different thread than evbase_dispatch(); if not, evhttp_make_request() may fail due to a socket fd error.
+ //This happens when calling swarmkv-cli for adding more slot owners.
+ //I spent two days on this issue, but I still don't know what magic design of libevent2 is causing this problem.
+
+ consul_watch_slots_changes_async(ks);
+ consul_watch_nodes_changes_async(ks);
+
pthread_create(&(ks->thr), NULL, swarmkv_keyspace_thread, (void *) ks);
if(ks->dryrun)
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index c30d713..9ad1563 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -760,14 +760,13 @@ static void __accept_error_cb(struct evconnlistener *listener, void *arg)
struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t* self, void* logger, char **err)
{
struct swarmkv_net* net=NULL;
- /* adds locking, only required if accessed from separate threads */
- evthread_use_pthreads();
net=ALLOC(struct swarmkv_net, 1);
net->logger=logger;
net->n_thread=opts->nr_worker_threads;
net->sequence_generator=1;
net->threads=ALLOC(struct snet_thread, net->n_thread);
struct snet_thread *thr=NULL;
+
for(size_t i=0; i<net->n_thread; i++)
{
thr=net->threads+i;
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 136074a..275c4a9 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -141,6 +141,128 @@ TEST(Performance, Nthreads)
}
log_handle_destroy(logger);
}
+
+int g_tconsume_running_flag=0;
+size_t g_token_bucket_number=200*1000;
+
+void *background_tconsume_thread(void *thread_arg)
+{
+ struct swarmkv *db=(struct swarmkv *)thread_arg;
+
+ struct cmd_exec_arg *arg=NULL;
+ arg=cmd_exec_arg_new();
+ cmd_exec_arg_diable_sync_check(arg);
+ char key[256];
+ int ret=0;
+ long long tokens=1;
+ size_t round=0, got_token_cnt=0;
+ int start=random()%16;
+ while(g_tconsume_running_flag)
+ {
+ snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number);
+ cmd_exec_arg_expect_integer(arg, tokens);
+ arg->print_reply_on_fail=1;
+ swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg);
+ ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS);
+ cmd_exec_arg_clear(arg);
+ if(ret==1) got_token_cnt++;
+ else printf("tconsume %s failed\n", key);
+ round++;
+ }
+ cmd_exec_arg_free(arg);
+ int *success=ALLOC(int, 1);
+ EXPECT_EQ(got_token_cnt, round);
+ if(got_token_cnt==round)
+ *success=1;
+ else
+ *success=0;
+ return success;
+}
+TEST(Performance, Sync)
+{
+ size_t NODE_NUMBER=2;
+ size_t CALL_THREAD_PER_NODE=2;
+ size_t WORKER_THREAD_NUMBER=1;
+
+ struct swarmkv *db[NODE_NUMBER];
+ char *err=NULL;
+ const char *log_path="./swarmkv-sync.log";
+ unsigned int p2p_port_start=9310, health_port_start=10310;
+ char node_list_str[1024]={0};
+ for(size_t i=0; i<NODE_NUMBER; i++)
+ {
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i);
+ }
+ const char *cluster_name="swarmkv-sync";
+ swarmkv_cli_create_cluster(cluster_name, node_list_str);
+ struct log_handle * logger=log_handle_create(log_path, 0);
+ struct swarmkv_options *opts[NODE_NUMBER];
+ for(size_t i=0; i<NODE_NUMBER; i++)
+ {
+ opts[i]=swarmkv_options_new();
+ swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
+ swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
+ swarmkv_options_set_logger(opts[i], logger);
+ swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000);
+ swarmkv_options_set_sync_interval_us(opts[i], 100*1000);
+ swarmkv_options_set_disable_run_for_leader(opts[i]);
+ db[i]=swarmkv_open(opts[i], cluster_name, &err);
+ if(err)
+ {
+ printf("swarmkv_open %zu instance failed: %s\n", i, err);
+ free(err);
+ err=NULL;
+ }
+ }
+ char key[256]={0};
+ struct swarmkv_reply *reply=NULL;
+ for(size_t i=0; i<g_token_bucket_number; i++)
+ {
+ snprintf(key, sizeof(key), "tb-%zu", i);
+ reply=swarmkv_command_on(db[i%NODE_NUMBER], NULL, "TCFG %s 2000000 2000000", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ if(reply->type != SWARMKV_REPLY_STATUS)
+ {
+ swarmkv_reply_print(reply, stdout);
+ }
+ swarmkv_reply_free(reply);
+ }
+ srand(171);
+ g_tconsume_running_flag=1;
+ pthread_t threads[NODE_NUMBER][CALL_THREAD_PER_NODE];
+ for(size_t i=0; i<NODE_NUMBER; i++)
+ {
+ for(size_t j=0; j<CALL_THREAD_PER_NODE; j++)
+ {
+ pthread_create(&(threads[i][j]), NULL, background_tconsume_thread, db[i]);
+ }
+ }
+
+ sleep(20);
+ g_tconsume_running_flag=0;
+ int *successful_backgroud_running_thread=NULL;
+ for(size_t i=0; i<NODE_NUMBER; i++)
+ {
+ for(size_t j=0; j<CALL_THREAD_PER_NODE; j++)
+ {
+ pthread_join(threads[i][j], (void**)&successful_backgroud_running_thread);
+
+ EXPECT_EQ(*successful_backgroud_running_thread, 1);
+ *successful_backgroud_running_thread=0;
+ free(successful_backgroud_running_thread);
+ successful_backgroud_running_thread=NULL;
+ }
+ }
+ sleep(20);
+ for(size_t i=0; i<NODE_NUMBER; i++)
+ {
+ swarmkv_close(db[i]);
+ }
+
+ log_handle_destroy(logger);
+
+}
int g_running_flag=0;
void *migration_background_thread(void *thread_arg)
{
@@ -219,7 +341,7 @@ TEST(Resilience, AddSlotOwner)
int i=0, j=0;
struct swarmkv *db[NODE_NUMBER+CANDINATE_NUMBER];
char *err=NULL;
- const char *log_path="./swarmkv-migration.log";
+ const char *log_path="./swarmkv-migration-11.log";
unsigned int p2p_port_start=7310, health_port_start=8310;
char node_list_str[1024]={0};
for(i=0; i<NODE_NUMBER; i++)
@@ -288,127 +410,6 @@ TEST(Resilience, AddSlotOwner)
log_handle_destroy(logger);
}
-int g_tconsume_running_flag=0;
-size_t g_token_bucket_number=200*1000;
-
-void *background_tconsume_thread(void *thread_arg)
-{
- struct swarmkv *db=(struct swarmkv *)thread_arg;
-
- struct cmd_exec_arg *arg=NULL;
- arg=cmd_exec_arg_new();
- cmd_exec_arg_diable_sync_check(arg);
- char key[256];
- int ret=0;
- long long tokens=1;
- size_t round=0, got_token_cnt=0;
- int start=random()%16;
- while(g_tconsume_running_flag)
- {
- snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number);
- cmd_exec_arg_expect_integer(arg, tokens);
- arg->print_reply_on_fail=1;
- swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg);
- ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS);
- cmd_exec_arg_clear(arg);
- if(ret==1) got_token_cnt++;
- else printf("tconsume %s failed\n", key);
- round++;
- }
- cmd_exec_arg_free(arg);
- int *success=ALLOC(int, 1);
- EXPECT_EQ(got_token_cnt, round);
- if(got_token_cnt==round)
- *success=1;
- else
- *success=0;
- return success;
-}
-TEST(Performance, Sync)
-{
- size_t NODE_NUMBER=2;
- size_t CALL_THREAD_PER_NODE=2;
- size_t WORKER_THREAD_NUMBER=1;
-
- struct swarmkv *db[NODE_NUMBER];
- char *err=NULL;
- const char *log_path="./swarmkv-sync.log";
- unsigned int p2p_port_start=9310, health_port_start=10310;
- char node_list_str[1024]={0};
- for(size_t i=0; i<NODE_NUMBER; i++)
- {
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i);
- }
- const char *cluster_name="swarmkv-sync";
- swarmkv_cli_create_cluster(cluster_name, node_list_str);
- struct log_handle * logger=log_handle_create(log_path, 0);
- struct swarmkv_options *opts[NODE_NUMBER];
- for(size_t i=0; i<NODE_NUMBER; i++)
- {
- opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
- swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
- swarmkv_options_set_logger(opts[i], logger);
- swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
- swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000);
- swarmkv_options_set_sync_interval_us(opts[i], 100*1000);
- swarmkv_options_set_disable_run_for_leader(opts[i]);
- db[i]=swarmkv_open(opts[i], cluster_name, &err);
- if(err)
- {
- printf("swarmkv_open %zu instance failed: %s\n", i, err);
- free(err);
- err=NULL;
- }
- }
- char key[256]={0};
- struct swarmkv_reply *reply=NULL;
- for(size_t i=0; i<g_token_bucket_number; i++)
- {
- snprintf(key, sizeof(key), "tb-%zu", i);
- reply=swarmkv_command_on(db[i%NODE_NUMBER], NULL, "TCFG %s 2000000 2000000", key);
- EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
- if(reply->type != SWARMKV_REPLY_STATUS)
- {
- swarmkv_reply_print(reply, stdout);
- }
- swarmkv_reply_free(reply);
- }
- srand(171);
- g_tconsume_running_flag=1;
- pthread_t threads[NODE_NUMBER][CALL_THREAD_PER_NODE];
- for(size_t i=0; i<NODE_NUMBER; i++)
- {
- for(size_t j=0; j<CALL_THREAD_PER_NODE; j++)
- {
- pthread_create(&(threads[i][j]), NULL, background_tconsume_thread, db[i]);
- }
- }
-
- sleep(20);
- g_tconsume_running_flag=0;
- int *successful_backgroud_running_thread=NULL;
- for(size_t i=0; i<NODE_NUMBER; i++)
- {
- for(size_t j=0; j<CALL_THREAD_PER_NODE; j++)
- {
- pthread_join(threads[i][j], (void**)&successful_backgroud_running_thread);
-
- EXPECT_EQ(*successful_backgroud_running_thread, 1);
- *successful_backgroud_running_thread=0;
- free(successful_backgroud_running_thread);
- successful_backgroud_running_thread=NULL;
- }
- }
- sleep(20);
- for(size_t i=0; i<NODE_NUMBER; i++)
- {
- swarmkv_close(db[i]);
- }
-
- log_handle_destroy(logger);
-
-}
TEST(Resilience, Failover)
{
size_t NODE_NUMBER=4;
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index a655008..2b81f4c 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -375,7 +375,7 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg
struct swarmkv_reply *setslot_reply=NULL, *getkeysinslot_reply=NULL, *addkeystoslot_reply=NULL;
sds *migrate_argv=NULL;
- printf("%zu slot to be migrated\n", actual_rebalanced_slot_num);
+ printf("%zu slots to be migrated\n", actual_rebalanced_slot_num);
long long migrated_keys=0;
node_t *new_node=NULL, *old_node=NULL;
int slot_id=0;
@@ -1239,7 +1239,7 @@ int main(int argc, char * argv[])
return 0;
}
snprintf(g_runtime.prompt, sizeof(g_runtime.prompt), "%s> ", g_config.db_name);
- void * logger=log_handle_create("swarmkv-cli.log", 0);
+ struct log_handle * logger=log_handle_create("swarmkv-cli.log", 0);
struct swarmkv_options *opts=swarmkv_options_new();
swarmkv_options_set_dryrun(opts);
swarmkv_options_set_cluster_port(opts, g_config.cluster_port);//listen on random port