summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-01-25 19:09:52 +0800
committerZheng Chao <[email protected]>2023-01-25 19:09:52 +0800
commit99d7df4c68d3968e9afcfd00b68ca92ef10c4d9c (patch)
treec7999b50b89f09287bbd3b9a20bffaadf79aa29d /src
parent136e57eff9e87fd84de8b7b25f6691972716bacd (diff)
:bug: The test case of Resilience.AddSlotOwner fails when run all tests. To fix this, we MUST start watching slots and nodes changes in different thread than evbase_dispatch(), if not, evhttp_make_request() maybe fail due to socket fd error.
Diffstat (limited to 'src')
-rw-r--r--src/swarmkv.c6
-rw-r--r--src/swarmkv_keyspace.c42
-rw-r--r--src/swarmkv_net.c3
3 files changed, 27 insertions, 24 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 12cbb46..2e99e7f 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1147,13 +1147,15 @@ static void libevent_log_cb(int severity, const char *msg)
fclose(logfile);
}
*/
-struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, char **err)
+struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err)
{
struct swarmkv *db = NULL;
// event_set_log_callback(libevent_log_cb);
db=ALLOC(struct swarmkv, 1);
strncpy(db->db_name, db_name, sizeof(db->db_name));
-
+ /* adds locking, only required if accessed from separate threads */
+ evthread_use_pthreads();
+
strncpy(db->module.name, "db", sizeof(db->module.name));
db->module.mod_ctx=db;
db->module.lock=NULL;
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 1a85033..83d579c 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -236,7 +236,8 @@ struct swarmkv_keyspace
exec_cmd_func *exec_cmd_func;
struct swarmkv *exec_cmd_handle;
struct swarmkv_module *mod_monitor;
- void *logger;
+ struct log_handle *logger;
+
};
void http_request_on_close(struct evhttp_connection *conn, void *arg)
{
@@ -248,7 +249,6 @@ struct consul_request
{
char symbol[SWARMKV_SYMBOL_MAX];
long long request_id;
- struct evhttp_connection *evhttpconn; //tcp connection for the request
struct evhttp_request *evhttpreq;
struct future *f;
struct consul_client *ref_client;
@@ -261,6 +261,7 @@ struct consul_client
long long req_id_generator;
struct event_base *evbase; //reference of keyspace evbase
struct log_handle *logger; //reference of logger
+ struct evhttp_connection *evhttpconn; //tcp connection for the request
struct consul_request *request_table;
};
void consul_request_callback(struct evhttp_request *evhttpreq, void *arg);
@@ -269,8 +270,6 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch
struct consul_request *req=ALLOC(struct consul_request, 1);
strncpy(req->symbol, symbol, sizeof(req->symbol));
- req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port);
- evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query.
req->evhttpreq=evhttp_request_new(consul_request_callback, req);
req->f=f;
req->ref_client=client;
@@ -286,7 +285,7 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch
}
void consul_request_free(struct consul_request *req)
{
- evhttp_connection_free(req->evhttpconn);
+ //evhttp_connection_free(req->evhttpconn);
//if(req->evhttpreq) evhttp_request_free(req->evhttpreq);
future_destroy(req->f);
@@ -318,12 +317,10 @@ void consul_request_callback(struct evhttp_request *evhttpreq, void *arg)
void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url)
{
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
-
- evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req);
evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host);
- evhttp_add_header(output_headers, "Connection", "close");
+// evhttp_add_header(output_headers, "Connection", "close");
- evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url);
+ evhttp_make_request(req->ref_client->evhttpconn, req->evhttpreq, cmd, url);
}
struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger)
@@ -333,6 +330,9 @@ struct consul_client *consul_client_new(const char *host, unsigned short port, s
strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host));
client->logger=logger;
client->evbase=evbase;
+ client->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port);
+ evhttp_connection_set_timeout(client->evhttpconn, 1800);//set to 30min for blocking query.
+ evhttp_connection_set_closecb(client->evhttpconn, http_request_on_close, client);
return client;
}
void consul_client_free(struct consul_client *client)
@@ -343,6 +343,7 @@ void consul_client_free(struct consul_client *client)
HASH_DEL(client->request_table, req);
consul_request_free(req);
}
+ evhttp_connection_free(client->evhttpconn);
free(client);
}
void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks);
@@ -489,14 +490,9 @@ void *swarmkv_keyspace_thread(void *arg)
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
-
struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks);
struct timeval timer_delay = {2, 0};
evtimer_add(ev, &timer_delay);
-
- consul_watch_slots_changes_async(ks);
- consul_watch_nodes_changes_async(ks);
-
event_base_dispatch(ks->evbase);
event_del(ev);
event_free(ev);
@@ -576,7 +572,7 @@ void consul_create_session_async(struct swarmkv_keyspace *ks)
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 2);
+// evhttp_connection_set_timeout(req->evhttpconn, 2);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
@@ -705,7 +701,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks)
struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks);
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 2);
+// evhttp_connection_set_timeout(req->evhttpconn, 2);
struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, req_body, sdslen(req_body));
@@ -916,11 +912,11 @@ void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new
struct future *f=future_create(__func__, propagate_slot_table_on_success, propagate_slot_table_on_fail, ks);
struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_connection_set_timeout(req->evhttpconn, 20);
+// evhttp_connection_set_timeout(req->evhttpconn, 20);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
- struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
+ struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json));
char number[64];
snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json));
@@ -1187,7 +1183,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag
return;
}
-struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err)
+struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char *db_name, void *logger, char **err)
{
struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1);
strncpy(ks->module.name, "keyspace", sizeof(ks->module.name));
@@ -1216,7 +1212,13 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts,
log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots init failed.");
goto error_out;
}
-// consul_kv_init_if_nonexist(ks);
+ //We MUST start watching in different thread than evbase_dispatch(), if not, evhttp_make_request() maybe fail due to socket fd error.
+ //This happens when calling swarmkv-cli for adding more slot owners.
+ //I spent two days on this issue, but still don't known what magic design of libevent2 causing this problem.
+
+ consul_watch_slots_changes_async(ks);
+ consul_watch_nodes_changes_async(ks);
+
pthread_create(&(ks->thr), NULL, swarmkv_keyspace_thread, (void *) ks);
if(ks->dryrun)
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index c30d713..9ad1563 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -760,14 +760,13 @@ static void __accept_error_cb(struct evconnlistener *listener, void *arg)
struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t* self, void* logger, char **err)
{
struct swarmkv_net* net=NULL;
- /* adds locking, only required if accessed from separate threads */
- evthread_use_pthreads();
net=ALLOC(struct swarmkv_net, 1);
net->logger=logger;
net->n_thread=opts->nr_worker_threads;
net->sequence_generator=1;
net->threads=ALLOC(struct snet_thread, net->n_thread);
struct snet_thread *thr=NULL;
+
for(size_t i=0; i<net->n_thread; i++)
{
thr=net->threads+i;