summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-01-20 15:20:11 +0800
committerZheng Chao <[email protected]>2023-01-20 15:20:11 +0800
commit136e57eff9e87fd84de8b7b25f6691972716bacd (patch)
tree043b825a12d5ed8db2b4de2cee8202d1607f0e01
parentc283dfdac2ea674fdd52fc61f00814caf8f6f5c6 (diff)
Bugfix: illegall http header on consul session creation.
-rw-r--r--src/inc_internal/swarmkv_common.h2
-rw-r--r--src/swarmkv.c2
-rw-r--r--src/swarmkv_api.c4
-rw-r--r--src/swarmkv_keyspace.c65
-rw-r--r--src/swarmkv_monitor.c2
-rw-r--r--src/swarmkv_net.c2
6 files changed, 36 insertions, 41 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index d039c67..0660d93 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -63,7 +63,7 @@ struct swarmkv_options
int dryrun;
int run_for_leader_flag;
unsigned int consul_port;
- unsigned int p2p_timeout_us;
+ unsigned int cluster_timeout_us;
unsigned int sync_interval_us;
struct log_handle *logger;
int loglevel;
diff --git a/src/swarmkv.c b/src/swarmkv.c
index ab8473f..12cbb46 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -273,7 +273,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
"instantaneous_input_cps: %.2f\r\n"
"instantaneous_output_cps: %.2f\r\n"
,
- (double)db->opts->p2p_timeout_us/1000,
+ (double)db->opts->cluster_timeout_us/1000,
net_info.connections,
net_info.pending_rpcs,
net_info.timed_out_rpcs,
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index bd7e24e..d2bc51b 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -27,7 +27,7 @@ struct swarmkv_options* swarmkv_options_new(void)
opts->cluster_port=5210;
opts->health_check_port=0;
opts->loglevel=0;
- opts->p2p_timeout_us=500*1000;//Default 500ms
+ opts->cluster_timeout_us=500*1000;//Default 500ms
opts->sync_interval_us=10*1000; //Default 10ms
strcpy(opts->bind_address, "127.0.0.1");
strcpy(opts->consul_agent_host, "127.0.0.1");
@@ -61,7 +61,7 @@ int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned
}
int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms)
{
- opts->p2p_timeout_us=timeout_ms;
+ opts->cluster_timeout_us=timeout_ms;
return 0;
}
int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us)
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index ae00534..1a85033 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -32,7 +32,8 @@
#include <event2/http.h>
-#define CONSUL_HOST "127.0.0.1"
+//#define CONSUL_HOST "127.0.0.1"
+#define CONSUL_HOST "localhost"
#define CONSUL_DEFAULT_PORT 8500
#define LOCAL_HOST "127.0.0.1"
@@ -275,7 +276,11 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch
req->ref_client=client;
req->request_id=client->req_id_generator++;
- HASH_ADD(hh, client->request_table, request_id, sizeof(req->request_id), req);
+ struct consul_request *tmp=NULL;
+ HASH_FIND(hh, client->request_table, symbol, strlen(symbol), tmp);
+ assert(tmp==NULL);
+
+ HASH_ADD(hh, client->request_table, symbol, strlen(symbol), req);
return req;
}
@@ -284,38 +289,27 @@ void consul_request_free(struct consul_request *req)
evhttp_connection_free(req->evhttpconn);
//if(req->evhttpreq) evhttp_request_free(req->evhttpreq);
future_destroy(req->f);
- HASH_DEL(req->ref_client->request_table, req);
+
free(req);
}
void consul_request_callback(struct evhttp_request *evhttpreq, void *arg)
{
struct consul_request *req=(struct consul_request *)arg;
- int resp_code=0;
struct promise *p=future_to_promise(req->f);
+ HASH_DEL(req->ref_client->request_table, req);
if(!evhttpreq || !evhttp_request_get_response_code(evhttpreq))
{
int errcode = EVUTIL_SOCKET_ERROR();
log_fatal(req->ref_client->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s",
req->symbol,
evutil_socket_error_to_string(errcode));
- promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error");
- }
- else
- {
- resp_code=evhttp_request_get_response_code(evhttpreq);
+ sleep(1000);
+ promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error");
}
- if(resp_code!=200)
- {
- char what[256];
- snprintf(what, sizeof(what), "resp_code=%d", resp_code);
- promise_failed(p, FUTURE_ERROR_EXCEPTION, what);
- }
- else
- {
- promise_success(p, evhttpreq);
- }
+ promise_success(p, evhttpreq);
+
//The request should be freed automatically after the http_request_done has finished executing.
req->evhttpreq=NULL;
consul_request_free(req);
@@ -346,6 +340,7 @@ void consul_client_free(struct consul_client *client)
struct consul_request *req=NULL, *tmp=NULL;
HASH_ITER(hh, client->request_table, req, tmp)
{
+ HASH_DEL(client->request_table, req);
consul_request_free(req);
}
free(client);
@@ -517,33 +512,37 @@ void consul_session_create_on_success(void *result, void *arg)
int is_running_for_leader=0;
+ struct evbuffer *buf=NULL;
+ buf=evhttp_request_get_input_buffer(req);
+ size_t len = evbuffer_get_length(buf);
+ sds out=sdsnewlen(SDS_NOINIT, len);
+ evbuffer_copyout(buf, out, len);
+
resp_code=evhttp_request_get_response_code(req);
if(resp_code==200)
{
- struct evbuffer *buf=NULL;
- buf=evhttp_request_get_input_buffer(req);
- size_t len = evbuffer_get_length(buf);
- sds out=sdsnewlen(SDS_NOINIT, len);
- evbuffer_copyout(buf, out, len);
+
session_create_response=cJSON_Parse(out);
session_id=cJSON_GetObjectItem(session_create_response, "ID");
strncpy(ks->consul_session_id, session_id->valuestring, sizeof(ks->consul_session_id));
cJSON_Delete(session_create_response);
consul_acquire_session_lock_async(ks);
- sdsfree(out);
+
is_running_for_leader=1;
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "session %s is created.", ks->consul_session_id);
}
else
{
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session creation failed: HTTP code %d.",
- resp_code);
+ log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session creation failed: HTTP code %d, %s.",
+ resp_code,
+ out);
}
if(!is_running_for_leader)
{
consul_watch_leadership_changes_async(ks);
}
+ sdsfree(out);
return;
}
void consul_session_create_on_on_fail(enum e_future_error err, const char * what, void * arg)
@@ -552,9 +551,9 @@ void consul_session_create_on_on_fail(enum e_future_error err, const char * what
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session create failed: %s.", what);
return;
}
-void consul_create_session_async(struct swarmkv_keyspace* ks)
+void consul_create_session_async(struct swarmkv_keyspace *ks)
{
- cJSON* session=NULL, *service_check_array=NULL, *service_check=NULL;
+ cJSON *session=NULL, *service_check_array=NULL, *service_check=NULL;
session=cJSON_CreateObject();
cJSON_AddStringToObject(session, "LockDelay", "5s");
cJSON_AddStringToObject(session, "Name", "swarmkv-leader-election-lock");
@@ -566,10 +565,9 @@ void consul_create_session_async(struct swarmkv_keyspace* ks)
cJSON_AddItemToArray(service_check_array, service_check);
cJSON_AddItemToObject(session, "ServiceChecks", service_check_array);
- char* payload=NULL;
+ char *payload=NULL;
payload=cJSON_Print(session);
cJSON_Delete(session);
-// printf("%s\n", req_body);
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/session/create");
@@ -581,9 +579,6 @@ void consul_create_session_async(struct swarmkv_keyspace* ks)
evhttp_connection_set_timeout(req->evhttpconn, 2);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
-
- evhttp_add_header(output_headers, "Host", ks->consul_agent_host);
- evhttp_add_header(output_headers, "Connection", "close");
struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, payload, strlen(payload));
@@ -1194,7 +1189,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag
struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err)
{
- struct swarmkv_keyspace* ks=ALLOC(struct swarmkv_keyspace, 1);
+ struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1);
strncpy(ks->module.name, "keyspace", sizeof(ks->module.name));
ks->module.mod_ctx=ks;
ks->module.lock=keyspace_lock;
diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c
index c00a081..39ea9d7 100644
--- a/src/swarmkv_monitor.c
+++ b/src/swarmkv_monitor.c
@@ -210,7 +210,7 @@ struct swarmkv_monitor *module2monitor(struct swarmkv_module *module)
struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts)
{
struct swarmkv_monitor *monitor=ALLOC(struct swarmkv_monitor, 1);
- monitor->max_latency_usec=opts->p2p_timeout_us;
+ monitor->max_latency_usec=opts->cluster_timeout_us;
monitor->nr_worker_threads=opts->nr_worker_threads;
monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads);
pthread_mutex_init(&monitor->lock_event_recorder, NULL);
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index e2ad95e..c30d713 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -773,7 +773,7 @@ struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t*
thr=net->threads+i;
thr->evbase=event_base_new();
thr->thread_id=i;
- thr->peer_timeout_us=opts->p2p_timeout_us;
+ thr->peer_timeout_us=opts->cluster_timeout_us;
thr->ref_logger=logger;
thr->ref_net=net;
thr->conn_table=NULL;