diff options
| author | Zheng Chao <[email protected]> | 2023-01-20 15:20:11 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-01-20 15:20:11 +0800 |
| commit | 136e57eff9e87fd84de8b7b25f6691972716bacd (patch) | |
| tree | 043b825a12d5ed8db2b4de2cee8202d1607f0e01 | |
| parent | c283dfdac2ea674fdd52fc61f00814caf8f6f5c6 (diff) | |
Bugfix: illegall http header on consul session creation.
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 2 | ||||
| -rw-r--r-- | src/swarmkv.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 65 | ||||
| -rw-r--r-- | src/swarmkv_monitor.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 2 |
6 files changed, 36 insertions, 41 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index d039c67..0660d93 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -63,7 +63,7 @@ struct swarmkv_options int dryrun; int run_for_leader_flag; unsigned int consul_port; - unsigned int p2p_timeout_us; + unsigned int cluster_timeout_us; unsigned int sync_interval_us; struct log_handle *logger; int loglevel; diff --git a/src/swarmkv.c b/src/swarmkv.c index ab8473f..12cbb46 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -273,7 +273,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw "instantaneous_input_cps: %.2f\r\n" "instantaneous_output_cps: %.2f\r\n" , - (double)db->opts->p2p_timeout_us/1000, + (double)db->opts->cluster_timeout_us/1000, net_info.connections, net_info.pending_rpcs, net_info.timed_out_rpcs, diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index bd7e24e..d2bc51b 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -27,7 +27,7 @@ struct swarmkv_options* swarmkv_options_new(void) opts->cluster_port=5210; opts->health_check_port=0; opts->loglevel=0; - opts->p2p_timeout_us=500*1000;//Default 500ms + opts->cluster_timeout_us=500*1000;//Default 500ms opts->sync_interval_us=10*1000; //Default 10ms strcpy(opts->bind_address, "127.0.0.1"); strcpy(opts->consul_agent_host, "127.0.0.1"); @@ -61,7 +61,7 @@ int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned } int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms) { - opts->p2p_timeout_us=timeout_ms; + opts->cluster_timeout_us=timeout_ms; return 0; } int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us) diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index ae00534..1a85033 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -32,7 +32,8 @@ #include <event2/http.h> -#define CONSUL_HOST "127.0.0.1" +//#define CONSUL_HOST "127.0.0.1" +#define CONSUL_HOST "localhost" #define CONSUL_DEFAULT_PORT 8500 #define LOCAL_HOST "127.0.0.1" @@ -275,7 +276,11 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch req->ref_client=client; req->request_id=client->req_id_generator++; - HASH_ADD(hh, client->request_table, request_id, sizeof(req->request_id), req); + struct consul_request *tmp=NULL; + HASH_FIND(hh, client->request_table, symbol, strlen(symbol), tmp); + assert(tmp==NULL); + + HASH_ADD(hh, client->request_table, symbol, strlen(symbol), req); return req; } @@ -284,38 +289,27 @@ void consul_request_free(struct consul_request *req) evhttp_connection_free(req->evhttpconn); //if(req->evhttpreq) evhttp_request_free(req->evhttpreq); future_destroy(req->f); - HASH_DEL(req->ref_client->request_table, req); + free(req); } void consul_request_callback(struct evhttp_request *evhttpreq, void *arg) { struct consul_request *req=(struct consul_request *)arg; - int resp_code=0; struct promise *p=future_to_promise(req->f); + HASH_DEL(req->ref_client->request_table, req); if(!evhttpreq || !evhttp_request_get_response_code(evhttpreq)) { int errcode = EVUTIL_SOCKET_ERROR(); log_fatal(req->ref_client->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s", req->symbol, evutil_socket_error_to_string(errcode)); - promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error"); - } - else - { - resp_code=evhttp_request_get_response_code(evhttpreq); + sleep(1000); + promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error"); } - if(resp_code!=200) - { - char what[256]; - snprintf(what, sizeof(what), "resp_code=%d", resp_code); - promise_failed(p, FUTURE_ERROR_EXCEPTION, what); - } - else - { - promise_success(p, evhttpreq); - } + promise_success(p, evhttpreq); + //The request should be freed automatically after the http_request_done has finished executing. req->evhttpreq=NULL; consul_request_free(req); @@ -346,6 +340,7 @@ void consul_client_free(struct consul_client *client) struct consul_request *req=NULL, *tmp=NULL; HASH_ITER(hh, client->request_table, req, tmp) { + HASH_DEL(client->request_table, req); consul_request_free(req); } free(client); @@ -517,33 +512,37 @@ void consul_session_create_on_success(void *result, void *arg) int is_running_for_leader=0; + struct evbuffer *buf=NULL; + buf=evhttp_request_get_input_buffer(req); + size_t len = evbuffer_get_length(buf); + sds out=sdsnewlen(SDS_NOINIT, len); + evbuffer_copyout(buf, out, len); + resp_code=evhttp_request_get_response_code(req); if(resp_code==200) { - struct evbuffer *buf=NULL; - buf=evhttp_request_get_input_buffer(req); - size_t len = evbuffer_get_length(buf); - sds out=sdsnewlen(SDS_NOINIT, len); - evbuffer_copyout(buf, out, len); + session_create_response=cJSON_Parse(out); session_id=cJSON_GetObjectItem(session_create_response, "ID"); strncpy(ks->consul_session_id, session_id->valuestring, sizeof(ks->consul_session_id)); cJSON_Delete(session_create_response); consul_acquire_session_lock_async(ks); - sdsfree(out); + is_running_for_leader=1; log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "session %s is created.", ks->consul_session_id); } else { - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session creation failed: HTTP code %d.", - resp_code); + log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session creation failed: HTTP code %d, %s.", + resp_code, + out); } if(!is_running_for_leader) { consul_watch_leadership_changes_async(ks); } + sdsfree(out); return; } void consul_session_create_on_on_fail(enum e_future_error err, const char * what, void * arg) @@ -552,9 +551,9 @@ void consul_session_create_on_on_fail(enum e_future_error err, const char * what log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session create failed: %s.", what); return; } -void consul_create_session_async(struct swarmkv_keyspace* ks) +void consul_create_session_async(struct swarmkv_keyspace *ks) { - cJSON* session=NULL, *service_check_array=NULL, *service_check=NULL; + cJSON *session=NULL, *service_check_array=NULL, *service_check=NULL; session=cJSON_CreateObject(); cJSON_AddStringToObject(session, "LockDelay", "5s"); cJSON_AddStringToObject(session, "Name", "swarmkv-leader-election-lock"); @@ -566,10 +565,9 @@ void consul_create_session_async(struct swarmkv_keyspace* ks) cJSON_AddItemToArray(service_check_array, service_check); cJSON_AddItemToObject(session, "ServiceChecks", service_check_array); - char* payload=NULL; + char *payload=NULL; payload=cJSON_Print(session); cJSON_Delete(session); -// printf("%s\n", req_body); char url[SWARMKV_URL_MAX]=""; snprintf(url, sizeof(url), "/v1/session/create"); @@ -581,9 +579,6 @@ void consul_create_session_async(struct swarmkv_keyspace* ks) evhttp_connection_set_timeout(req->evhttpconn, 2); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - - evhttp_add_header(output_headers, "Host", ks->consul_agent_host); - evhttp_add_header(output_headers, "Connection", "close"); struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, payload, strlen(payload)); @@ -1194,7 +1189,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err) { - struct swarmkv_keyspace* ks=ALLOC(struct swarmkv_keyspace, 1); + struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1); strncpy(ks->module.name, "keyspace", sizeof(ks->module.name)); ks->module.mod_ctx=ks; ks->module.lock=keyspace_lock; diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c index c00a081..39ea9d7 100644 --- a/src/swarmkv_monitor.c +++ b/src/swarmkv_monitor.c @@ -210,7 +210,7 @@ struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) { struct swarmkv_monitor *monitor=ALLOC(struct swarmkv_monitor, 1); - monitor->max_latency_usec=opts->p2p_timeout_us; + monitor->max_latency_usec=opts->cluster_timeout_us; monitor->nr_worker_threads=opts->nr_worker_threads; monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads); pthread_mutex_init(&monitor->lock_event_recorder, NULL); diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index e2ad95e..c30d713 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -773,7 +773,7 @@ struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t* thr=net->threads+i; thr->evbase=event_base_new(); thr->thread_id=i; - thr->peer_timeout_us=opts->p2p_timeout_us; + thr->peer_timeout_us=opts->cluster_timeout_us; thr->ref_logger=logger; thr->ref_net=net; thr->conn_table=NULL; |
