diff options
| author | Zheng Chao <[email protected]> | 2023-01-25 19:09:52 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-01-25 19:09:52 +0800 |
| commit | 99d7df4c68d3968e9afcfd00b68ca92ef10c4d9c (patch) | |
| tree | c7999b50b89f09287bbd3b9a20bffaadf79aa29d /src | |
| parent | 136e57eff9e87fd84de8b7b25f6691972716bacd (diff) | |
:bug: The test case of Resilience.AddSlotOwner fails when run all tests. To fix this, we MUST start watching slots and nodes changes in different thread than evbase_dispatch(), if not, evhttp_make_request() maybe fail due to socket fd error.
Diffstat (limited to 'src')
| -rw-r--r-- | src/swarmkv.c | 6 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 42 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 3 |
3 files changed, 27 insertions, 24 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 12cbb46..2e99e7f 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -1147,13 +1147,15 @@ static void libevent_log_cb(int severity, const char *msg) fclose(logfile); } */ -struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, char **err) +struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err) { struct swarmkv *db = NULL; // event_set_log_callback(libevent_log_cb); db=ALLOC(struct swarmkv, 1); strncpy(db->db_name, db_name, sizeof(db->db_name)); - + /* adds locking, only required if accessed from separate threads */ + evthread_use_pthreads(); + strncpy(db->module.name, "db", sizeof(db->module.name)); db->module.mod_ctx=db; db->module.lock=NULL; diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 1a85033..83d579c 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -236,7 +236,8 @@ struct swarmkv_keyspace exec_cmd_func *exec_cmd_func; struct swarmkv *exec_cmd_handle; struct swarmkv_module *mod_monitor; - void *logger; + struct log_handle *logger; + }; void http_request_on_close(struct evhttp_connection *conn, void *arg) { @@ -248,7 +249,6 @@ struct consul_request { char symbol[SWARMKV_SYMBOL_MAX]; long long request_id; - struct evhttp_connection *evhttpconn; //tcp connection for the request struct evhttp_request *evhttpreq; struct future *f; struct consul_client *ref_client; @@ -261,6 +261,7 @@ struct consul_client long long req_id_generator; struct event_base *evbase; //reference of keyspace evbase struct log_handle *logger; //reference of logger + struct evhttp_connection *evhttpconn; //tcp connection for the request struct consul_request *request_table; }; void consul_request_callback(struct evhttp_request *evhttpreq, void *arg); @@ -269,8 +270,6 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch struct consul_request *req=ALLOC(struct consul_request, 1); strncpy(req->symbol, symbol, sizeof(req->symbol)); - req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port); - evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query. req->evhttpreq=evhttp_request_new(consul_request_callback, req); req->f=f; req->ref_client=client; @@ -286,7 +285,7 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch } void consul_request_free(struct consul_request *req) { - evhttp_connection_free(req->evhttpconn); + //evhttp_connection_free(req->evhttpconn); //if(req->evhttpreq) evhttp_request_free(req->evhttpreq); future_destroy(req->f); @@ -318,12 +317,10 @@ void consul_request_callback(struct evhttp_request *evhttpreq, void *arg) void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url) { struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - - evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req); evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host); - evhttp_add_header(output_headers, "Connection", "close"); +// evhttp_add_header(output_headers, "Connection", "close"); - evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url); + evhttp_make_request(req->ref_client->evhttpconn, req->evhttpreq, cmd, url); } struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger) @@ -333,6 +330,9 @@ struct consul_client *consul_client_new(const char *host, unsigned short port, s strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host)); client->logger=logger; client->evbase=evbase; + client->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port); + evhttp_connection_set_timeout(client->evhttpconn, 1800);//set to 30min for blocking query. + evhttp_connection_set_closecb(client->evhttpconn, http_request_on_close, client); return client; } void consul_client_free(struct consul_client *client) @@ -343,6 +343,7 @@ void consul_client_free(struct consul_client *client) HASH_DEL(client->request_table, req); consul_request_free(req); } + evhttp_connection_free(client->evhttpconn); free(client); } void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks); @@ -489,14 +490,9 @@ void *swarmkv_keyspace_thread(void *arg) char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); - struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks); struct timeval timer_delay = {2, 0}; evtimer_add(ev, &timer_delay); - - consul_watch_slots_changes_async(ks); - consul_watch_nodes_changes_async(ks); - event_base_dispatch(ks->evbase); event_del(ev); event_free(ev); @@ -576,7 +572,7 @@ void consul_create_session_async(struct swarmkv_keyspace *ks) struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 2); +// evhttp_connection_set_timeout(req->evhttpconn, 2); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); @@ -705,7 +701,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks) struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks); struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 2); +// evhttp_connection_set_timeout(req->evhttpconn, 2); struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, req_body, sdslen(req_body)); @@ -916,11 +912,11 @@ void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new struct future *f=future_create(__func__, propagate_slot_table_on_success, propagate_slot_table_on_fail, ks); struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 20); +// evhttp_connection_set_timeout(req->evhttpconn, 20); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); + struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json)); char number[64]; snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json)); @@ -1187,7 +1183,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag return; } -struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err) +struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char *db_name, void *logger, char **err) { struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1); strncpy(ks->module.name, "keyspace", sizeof(ks->module.name)); @@ -1216,7 +1212,13 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots init failed."); goto error_out; } -// consul_kv_init_if_nonexist(ks); + //We MUST start watching in different thread than evbase_dispatch(), if not, evhttp_make_request() maybe fail due to socket fd error. + //This happens when calling swarmkv-cli for adding more slot owners. + //I spent two days on this issue, but still don't known what magic design of libevent2 causing this problem. + + consul_watch_slots_changes_async(ks); + consul_watch_nodes_changes_async(ks); + pthread_create(&(ks->thr), NULL, swarmkv_keyspace_thread, (void *) ks); if(ks->dryrun) diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index c30d713..9ad1563 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -760,14 +760,13 @@ static void __accept_error_cb(struct evconnlistener *listener, void *arg) struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t* self, void* logger, char **err) { struct swarmkv_net* net=NULL; - /* adds locking, only required if accessed from separate threads */ - evthread_use_pthreads(); net=ALLOC(struct swarmkv_net, 1); net->logger=logger; net->n_thread=opts->nr_worker_threads; net->sequence_generator=1; net->threads=ALLOC(struct snet_thread, net->n_thread); struct snet_thread *thr=NULL; + for(size_t i=0; i<net->n_thread; i++) { thr=net->threads+i; |
