author    Zheng Chao <[email protected]>  2023-01-13 14:48:22 +0800
committer Zheng Chao <[email protected]>  2023-01-13 06:51:02 +0000
commit    67124d30cd0f12548a1f1ed612edcd5b56b18d17 (patch)
tree      ece5c36af5147945f3f88c676f8b5a3d6cc79c31
parent    ffc741af38aec50a15ebfd9ee07024dc40f515c5 (diff)
:recycle: The keyspace uses future-promise to do HTTP GET.
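
Each watcher previously built its own evhttp request and duplicated the
socket-error handling in its callback. That boilerplate now lives in a
shared http_get_async() helper that reports through a future. A sketch of
the resulting call pattern (callback names here are placeholders; the real
handlers appear in the diff below):

    struct future *f=future_create(__func__, on_success_cb, on_fail_cb, ks);
    http_get_async(__func__, url, LOCAL_HOST, ks->consul_agent_port, ks->evbase, f, ks->logger);

on_success_cb receives the evhttp_request on HTTP 200; on_fail_cb receives
an error string otherwise. The helper's completion callback frees the
connection, the future, and its own context, so callers keep nothing.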
-rw-r--r--  docs/design.md                      2
-rw-r--r--  src/inc_internal/swarmkv_error.h    4
-rw-r--r--  src/swarmkv_keyspace.c            308
-rw-r--r--  test/swarmkv_perf_test.cpp          4
-rw-r--r--  test/test_utils.c                   4
-rw-r--r--  test/test_utils.h                   2
-rw-r--r--  tools/swarmkv_cli.c                10
7 files changed, 163 insertions, 171 deletions
diff --git a/docs/design.md b/docs/design.md
index d4ce2f4..7e2584c 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -98,7 +98,7 @@ Examples:
The keyspace is sharded into slots, and each slot is owned by one node. This information is recorded in the global shared slot table. If a node's health check fails, the cluster leader chooses a random healthy node as the new slot owner.
-You can use `CLUSTER ADDKEYOWNER` in swarmkv-cli for adding nodes.
+You can use `CLUSTER ADDSLOTOWNER` to add nodes to the slot table.
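+For example: `swarmkv-cli -n <cluster-name> --exec cluster addslotowner 127.0.0.1:7001` (the address is illustrative; this invocation matches `test/test_utils.c`).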
When adding a new node, the operation is **Assign slot_id from `original_node` to `new_node`**; the `swarmkv-cli` executes the following commands:
diff --git a/src/inc_internal/swarmkv_error.h b/src/inc_internal/swarmkv_error.h
index 8d3fa88..9d637e4 100644
--- a/src/inc_internal/swarmkv_error.h
+++ b/src/inc_internal/swarmkv_error.h
@@ -19,6 +19,6 @@
#define error_read_slot_table_failed "ERR read slot table failed"
#define error_keyspace_obj_owner_not_found "ERR can't find object owner from keyspace"
-#define error_cluster_addkeyowner_node_is_not_active "ERR candinate node %s is not active"
-#define error_cluster_addkeyowner_node_has_slot "ERR candinate node %s already has some slots"
+#define error_cluster_addslotowner_node_is_not_active "ERR candidate node %s is not active"
+#define error_cluster_addslotowner_node_has_slot "ERR candidate node %s already has some slots"
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index ad4f6fc..0ff575a 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -193,17 +193,88 @@ struct slot_runtime
int I_am_owner;
pthread_mutex_t mutex;
};
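+/* Per-request state for http_get_async(): the connection, the future and
+ * this context are all released in http_request_callback(). */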
+struct http_req_context
+{
+ char symbol[128];
+ struct evhttp_connection *conn;
+ struct log_handle *logger;
+ struct future *f;
+};
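+/* Connection errors are also delivered to http_request_callback() (NULL req
+ * or a zero response code), where the future is settled, so this callback
+ * is intentionally a no-op. */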
+void http_req_error_cb(enum evhttp_request_error error, void *arg)
+{
+    (void)error;
+    (void)arg;
+}
+void http_request_callback(struct evhttp_request *req, void *arg)
+{
+    struct http_req_context *ctx=(struct http_req_context *)arg;
+    struct promise *p=future_to_promise(ctx->f);
+
+    if(!req || !evhttp_request_get_response_code(req))
+    {
+        /* connection-level failure: no usable response */
+        int errcode = EVUTIL_SOCKET_ERROR();
+        log_fatal(ctx->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s",
+                  ctx->symbol,
+                  evutil_socket_error_to_string(errcode));
+        promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error");
+    }
+    else
+    {
+        int resp_code=evhttp_request_get_response_code(req);
+        if(resp_code!=200)
+        {
+            char what[256];
+            snprintf(what, sizeof(what), "resp_code=%d", resp_code);
+            promise_failed(p, FUTURE_ERROR_EXCEPTION, what);
+        }
+        else
+        {
+            promise_success(p, req);
+        }
+    }
+    evhttp_connection_free(ctx->conn);
+    future_destroy(ctx->f);
+    ctx->f=NULL;
+    free(ctx);
+}
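+/* Perform an HTTP GET against host:port and report the outcome through the
+ * future f; symbol tags log messages. The long timeout below accommodates
+ * Consul blocking queries. */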
+void http_get_async(const char *symbol, const char *url, const char *host, unsigned short port, struct event_base *evbase, struct future *f, struct log_handle *logger)
+{
+ struct http_req_context *ctx=ALLOC(struct http_req_context, 1);
+    strncpy(ctx->symbol, symbol, sizeof(ctx->symbol)-1);
+    ctx->symbol[sizeof(ctx->symbol)-1]='\0';
+ ctx->logger=logger;
+ ctx->f=f;
+ ctx->conn=evhttp_connection_base_new(evbase, NULL, host, port);
+ struct evhttp_request *request=evhttp_request_new(http_request_callback, ctx);
+ evhttp_request_set_error_cb(request, http_req_error_cb);
+
+ struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
+ evhttp_connection_set_timeout(ctx->conn, 1800);//set to 30min for blocking query.
+
+ evhttp_add_header(output_headers, "Host", LOCAL_HOST);
+ evhttp_add_header(output_headers, "Connection", "close");
+
+ evhttp_make_request(ctx->conn, request, EVHTTP_REQ_GET, url);
+}
struct consul_task
{
char task_name[SWARMKV_SYMBOL_MAX];
struct evhttp_connection* evhttpconn; //tcp connection for the request
UT_hash_handle hh;
};
-struct consul_task *consul_task_new(const char* task_name, struct event_base *evbase, const char* host_name, unsigned short port)
+void http_connection_close_callback(struct evhttp_connection *conn, void *arg)
+{
+    /* no-op: kept as a hook for tracing connection teardown */
+    (void)conn;
+    (void)arg;
+}
+struct consul_task *consul_task_new(const char *task_name, struct event_base *evbase, const char *host_name, unsigned short port)
{
struct consul_task *task=ALLOC(struct consul_task, 1);
strncpy(task->task_name, task_name, sizeof(task->task_name));
task->evhttpconn=evhttp_connection_base_new(evbase, NULL, host_name, port);
+ evhttp_connection_set_closecb(task->evhttpconn, http_connection_close_callback, task);
return task;
}
void consul_task_free(struct consul_task *task)
@@ -221,12 +292,11 @@ struct swarmkv_keyspace
node_t self;
uuid_t uuid;
pthread_rwlock_t rwlock;
-// struct key_slot slots[KEYSPACE_SLOT_NUM];
struct slot_runtime slot_rts[KEYSPACE_SLOT_NUM];
node_t nodes[SWARMKV_MAX_NODE_NUM];
unsigned int consul_agent_port;
- unsigned int health_check_port; //HTTP
+ unsigned int health_check_port; //Port for internal HTTP server
int dryrun;
int init_success;
@@ -238,7 +308,7 @@ struct swarmkv_keyspace
char consul_check_id[SWARMKV_SYMBOL_MAX];
char consul_session_id[SWARMKV_SYMBOL_MAX];
- struct evhttp* http_server;
+ struct evhttp *http_server;
struct event_base * evbase;
struct consul_task *consul_task_hash;
@@ -248,7 +318,7 @@ struct swarmkv_keyspace
struct swarmkv_module *mod_monitor;
void *logger;
};
-struct consul_task *consul_task_getnxset(struct swarmkv_keyspace* ks, const char* task_name)
+struct consul_task *consul_task_getnxset(struct swarmkv_keyspace *ks, const char *task_name)
{
struct consul_task *task=NULL;
HASH_FIND(hh, ks->consul_task_hash, task_name, strlen(task_name), task);
@@ -664,115 +734,90 @@ void consul_run_for_leader_async(struct swarmkv_keyspace* ks)
}
return;
}
-void __watch_leadership_changes_cb(struct evhttp_request *req, void *arg)
+void watch_leadership_changes_on_success(void *result, void *arg)
{
- struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg;
+ struct evhttp_request *req=(struct evhttp_request*)result;
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
cJSON *metadata_array=NULL, *metadata=NULL, *modify_idx=NULL, *session=NULL;
int resp_code=0;
int is_running_for_leader=0;
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch leadership changes failed: %s",
- evutil_socket_error_to_string(errcode));
- assert(0);
- }
- else
+ resp_code=evhttp_request_get_response_code(req);
+ if(resp_code==200)
{
- resp_code=evhttp_request_get_response_code(req);
- if(resp_code==200)
- {
- struct evbuffer *buf=NULL;
- buf=evhttp_request_get_input_buffer(req);
- size_t len = evbuffer_get_length(buf);
- sds resp_body_buff=sdsnewlen(SDS_NOINIT, len);
- evbuffer_copyout(buf, resp_body_buff, len);
- metadata_array=cJSON_Parse(resp_body_buff);
- metadata=cJSON_GetArrayItem(metadata_array, 0);
- modify_idx=cJSON_GetObjectItem(metadata, "ModifyIndex");
- ks->consul_lead_key_modify_idx=modify_idx->valueint;
- session=cJSON_GetObjectItem(metadata, "Session");
- if(!session || 0==strlen(session->valuestring))
- {
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "cluster losts leader, start to run for leader.");
- consul_run_for_leader_async(ks);
- is_running_for_leader=1;
- }
- if(!session || 0!=strcmp(session->valuestring, ks->consul_session_id))
- {
- if(ks->is_leader)
- {
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "node is no longer a leader.");
- ks->is_leader=0;
- }
- }
- cJSON_Delete(metadata_array);
- sdsfree(resp_body_buff);
- }
- else if(resp_code==404)
+ struct evbuffer *buf=NULL;
+ buf=evhttp_request_get_input_buffer(req);
+ size_t len = evbuffer_get_length(buf);
+ sds resp_body_buff=sdsnewlen(SDS_NOINIT, len);
+ evbuffer_copyout(buf, resp_body_buff, len);
+ metadata_array=cJSON_Parse(resp_body_buff);
+ metadata=cJSON_GetArrayItem(metadata_array, 0);
+ modify_idx=cJSON_GetObjectItem(metadata, "ModifyIndex");
+ ks->consul_lead_key_modify_idx=modify_idx->valueint;
+ session=cJSON_GetObjectItem(metadata, "Session");
+ if(!session || 0==strlen(session->valuestring))
{
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "no leader key, start to run for leader.");
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "cluster lost its leader, start to run for leader.");
consul_run_for_leader_async(ks);
is_running_for_leader=1;
}
- else
+ if(!session || 0!=strcmp(session->valuestring, ks->consul_session_id))
{
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch leadership changes failed: HTTP code %d.", resp_code);
+ if(ks->is_leader)
+ {
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "node is no longer a leader.");
+ ks->is_leader=0;
+ }
}
+ cJSON_Delete(metadata_array);
+ sdsfree(resp_body_buff);
+ }
+ else if(resp_code==404)
+ {
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "no leader key, start to run for leader.");
+ consul_run_for_leader_async(ks);
+ is_running_for_leader=1;
}
+ else
+ {
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch leadership changes failed: HTTP code %d.", resp_code);
+ }
+
if(!is_running_for_leader)
{
consul_watch_leadership_changes_async(ks);
}
}
+void watch_leadership_changes_on_fail(enum e_future_error err, const char *what, void *arg)
+{
+    struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+    log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch leadership changes failed: %s.", what);
+    /* keep watching; otherwise the leadership watch ends after one failure */
+    consul_watch_leadership_changes_async(ks);
+}
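+/* Consul blocking query: sending the last seen ModifyIndex with a wait time
+ * makes the GET return only when the key changes or the wait elapses. */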
void consul_watch_leadership_changes_async(struct swarmkv_keyspace* ks)
{
- char url[SWARMKV_URL_MAX]="";
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
- struct evhttp_request* request=evhttp_request_new(__watch_leadership_changes_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
- evhttp_connection_set_timeout(conn, 1800);
+ char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/lead?index=%d&wait=10s", ks->db_name, ks->consul_lead_key_modify_idx);
- evhttp_add_header(output_headers, "Host", LOCAL_HOST);
- evhttp_add_header(output_headers, "Connection", "close");
- evhttp_make_request(conn, request, EVHTTP_REQ_GET, url);
+ struct future *f=future_create(__func__, watch_leadership_changes_on_success, watch_leadership_changes_on_fail, ks);
+ http_get_async(__func__, url, LOCAL_HOST, ks->consul_agent_port, ks->evbase, f, ks->logger);
return;
}
-void __watch_slots_changes_cb(struct evhttp_request *req, void *arg)
+void watch_slots_changes_on_success(void *result, void *arg)
{
struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
cJSON *metadata_array=NULL, *metadata=NULL, *value=NULL, *modify_idx=NULL;
- int resp_code=0;
int i=0;
struct key_slot *new_slots=NULL;
-
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch slots changes failed: %s",
- evutil_socket_error_to_string(errcode));
-
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
- }
- if(resp_code!=200)
- {
- consul_watch_slots_changes_async(ks);
- return;
- }
-
+ struct evhttp_request *req=(struct evhttp_request*)result;
struct evbuffer *buf=NULL;
buf=evhttp_request_get_input_buffer(req);
size_t len = evbuffer_get_length(buf);
- sds resp_body_buff=sdsnewlen(NULL, len);
+ sds resp_body_buff=sdsnewlen(SDS_NOINIT, len);
evbuffer_copyout(buf, resp_body_buff, len);
+
metadata_array=cJSON_Parse(resp_body_buff);
metadata=cJSON_GetArrayItem(metadata_array, 0);
modify_idx=cJSON_GetObjectItem(metadata, "ModifyIndex");
@@ -783,7 +828,7 @@ void __watch_slots_changes_cb(struct evhttp_request *req, void *arg)
new_slots=ALLOC(struct key_slot, KEYSPACE_SLOT_NUM);
json2keyslots(decode_buffer, new_slots, sizeof(struct key_slot), 0, KEYSPACE_SLOT_NUM);
-
+
sdsfree(decode_buffer);
sdsfree(resp_body_buff);
@@ -820,63 +865,21 @@ void __watch_slots_changes_cb(struct evhttp_request *req, void *arg)
}
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots update finished.");
cJSON_Delete(metadata_array);
- consul_watch_slots_changes_async(ks);
free(new_slots);
+ consul_watch_slots_changes_async(ks);
+ return;
}
-
-/*
-void keyslots_print_reply(struct slot_runtime slots[], size_t n_slot, struct swarmkv_reply **pp_reply)
-{
- size_t i=0, n_reply=0;
- int start=0, end=0;
- char range[256]="", uuid_str[38]="";
- struct swarmkv_reply **reply_array[KEYSPACE_SLOT_NUM];
- const char *slot_state[3]={"STABLE", "MIGRATING", "IMPORTING"};
- start=end=slots[0].slot_id;
- for(i=0; i<n_slot+1; i++)
- {
- if(i<n_slot
- && 0==swarmkv_node_compare(&(slots[i].owner), &(slots[end].owner))
- && slots[i].owner==slots[end].state)
- {
- end=i;
- continue;
- }
- if(start==end)
- {
- snprintf(range, sizeof(range), "%d", start);
- }
- else
- {
- snprintf(range, sizeof(range), "%d-%d", start, end);
- }
- uuid_unparse_lower(slots[end].owner.bin_uuid, uuid_str);
- reply_array[n_reply]=swarmkv_reply_new_string_fmt("%s %s %s:%d [%s]", range,
- uuid_str,
- slots[end].owner.addr.ip_addr,
- slots[end].owner.addr.p2p_port,
- slot_state[slots[end].state]
- );
- start=end=i;
- }
-
+void watch_slots_changes_on_fail(enum e_future_error err, const char * what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
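+    /* re-arm so a transient agent error does not stop the slots watch */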
+ consul_watch_slots_changes_async(ks);
}
-*/
-void consul_watch_slots_changes_async(struct swarmkv_keyspace* ks)
+void consul_watch_slots_changes_async(struct swarmkv_keyspace *ks)
{
- char url[SWARMKV_URL_MAX]="";
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
- struct evhttp_request* request=evhttp_request_new(__watch_slots_changes_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
- evhttp_connection_set_timeout(conn, 1800);//set to 30min for blocking query.
-
+ char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots?index=%d", ks->db_name, ks->consul_slots_modify_idx);
- evhttp_add_header(output_headers, "Host", LOCAL_HOST);
- evhttp_add_header(output_headers, "Connection", "close");
-
- evhttp_make_request(conn, request, EVHTTP_REQ_GET, url);
+ struct future *f=future_create(__func__, watch_slots_changes_on_success, watch_slots_changes_on_fail, ks);
+ http_get_async(__func__, url, LOCAL_HOST, ks->consul_agent_port, ks->evbase, f, ks->logger);
return;
}
void __propagate_slot_table_cb(struct evhttp_request *req, void *arg)
@@ -912,8 +915,8 @@ void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new
new_slots_json=keyslots2json(new_slot, sizeof(struct key_slot), 0, KEYSPACE_SLOT_NUM);
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots", ks->db_name);
- struct evhttp_request* request=evhttp_request_new(__propagate_slot_table_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
+ struct evhttp_request *request=evhttp_request_new(__propagate_slot_table_cb, ks);
+ struct evhttp_connection *conn=task->evhttpconn;
evhttp_connection_set_timeout(conn, 20);
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
@@ -1073,23 +1076,17 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_
n_modified_key, n_removed_replica);
}
}
-void __watch_nodes_changes_cb(struct evhttp_request *req, void *arg)
+void watch_nodes_changes_on_success(void *result, void *arg)
{
+    struct evhttp_request *req=(struct evhttp_request *)result;
struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
int resp_code=0;
const char* tmp=NULL;
node_t health_nodes[SWARMKV_NODE_MAX];
memset(health_nodes, 0, sizeof(health_nodes));
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "watch node changes failed: %s",
- evutil_socket_error_to_string(errcode));
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
- }
+
+ resp_code=evhttp_request_get_response_code(req);
+
if(resp_code==200)
{
struct evbuffer *buf=NULL;
@@ -1112,23 +1109,18 @@ void __watch_nodes_changes_cb(struct evhttp_request *req, void *arg)
}
consul_watch_nodes_changes_async(ks);
}
+void watch_nodes_changes_on_fail(enum e_future_error err, const char * what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+ consul_watch_nodes_changes_async(ks);
+}
void consul_watch_nodes_changes_async(struct swarmkv_keyspace* ks)
{
char url[SWARMKV_URL_MAX]="";
- char tmp[64];
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
- struct evhttp_request* request=evhttp_request_new(__watch_nodes_changes_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- evhttp_connection_set_timeout(conn, 1800);
-
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
- snprintf(tmp, sizeof(tmp), "%u", ks->consul_nodes_modify_idx);
- evhttp_add_header(output_headers, "Host", LOCAL_HOST);
- evhttp_add_header(output_headers, "Connection", "close");
snprintf(url, sizeof(url), "/v1/health/service/%s?passing=1&index=%d&wait=10s", ks->db_name, ks->consul_nodes_modify_idx);
- evhttp_make_request(conn, request, EVHTTP_REQ_GET, url);
+ struct future *f=future_create(__func__, watch_nodes_changes_on_success, watch_nodes_changes_on_fail, ks);
+ http_get_async(__func__, url, LOCAL_HOST, ks->consul_agent_port, ks->evbase, f, ks->logger);
}
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 17b872a..8879172 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -261,9 +261,9 @@ TEST(Resilience, AddSlotOwner)
int offset=0;
for(i=0; i<CANDINATE_NUMBER; i++)
{
- offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ",p2p_port_start+NODE_NUMBER+i);
+ offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ", p2p_port_start+NODE_NUMBER+i);
}
- swarmkv_cli_add_key_owner(cluster_name, candinate_string);
+ swarmkv_cli_add_slot_owner(cluster_name, candinate_string);
sleep(30);
g_running_flag=0;
int *successful_backgroud_running_thread=NULL;
diff --git a/test/test_utils.c b/test/test_utils.c
index 7291310..3611a1a 100644
--- a/test/test_utils.c
+++ b/test/test_utils.c
@@ -14,10 +14,10 @@ int swarmkv_cli_create_cluster(const char *cluster_name, const char *node_string
snprintf(cmd_string, sizeof(cmd_string), "../tools/swarmkv-cli --cluster-create %s %s", cluster_name, node_string);
return system(cmd_string);
}
-int swarmkv_cli_add_key_owner(const char *cluster_name, const char *node_string)
+int swarmkv_cli_add_slot_owner(const char *cluster_name, const char *node_string)
{
char cmd_string[1024];
- snprintf(cmd_string, sizeof(cmd_string), "../tools/swarmkv-cli -n %s --exec cluster addkeyowner %s", cluster_name, node_string);
+ snprintf(cmd_string, sizeof(cmd_string), "../tools/swarmkv-cli -n %s --exec cluster addslotowner %s", cluster_name, node_string);
return system(cmd_string);
}
diff --git a/test/test_utils.h b/test/test_utils.h
index ef2eed1..4e79e78 100644
--- a/test/test_utils.h
+++ b/test/test_utils.h
@@ -13,7 +13,7 @@ extern "C"
#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
int swarmkv_cli_create_cluster(const char* cluster_name, const char *node_string);
-int swarmkv_cli_add_key_owner(const char *cluster_name, const char *node_string);
+int swarmkv_cli_add_slot_owner(const char *cluster_name, const char *node_string);
struct cmd_exec_arg;
typedef void proc_result_callback_t(struct cmd_exec_arg* exec_arg, void *uarg);
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index 91d1cf5..3419b49 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -267,7 +267,7 @@ int is_node_active(struct swarmkv_reply *nodes_reply, node_t *node)
}
return node_is_active;
}
-struct swarmkv_reply *cluster_addkeyowner_command(struct swarmkv *db, char *argv[], size_t argc)
+struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *argv[], size_t argc)
{
size_t n_new_node=argc-2;
node_t new_nodes[n_new_node];
@@ -276,7 +276,7 @@ struct swarmkv_reply *cluster_addkeyowner_command(struct swarmkv *db, char *argv
if(argc <= 2)
{
- reply = swarmkv_reply_new_error(error_wrong_number_of_arg, "CLUSTER ADDKEYOWNER");
+ reply = swarmkv_reply_new_error(error_wrong_number_of_arg, "CLUSTER ADDSLOTOWNER");
return reply;
}
char *cmd_cluster_nodes[]={"cluster", "nodes", "brief"};
@@ -294,7 +294,7 @@ struct swarmkv_reply *cluster_addkeyowner_command(struct swarmkv *db, char *argv
}
if(!is_node_active(nodes_reply, new_nodes+i))
{
- reply=swarmkv_reply_new_error(error_cluster_addkeyowner_node_is_not_active, (new_nodes+i)->addr);
+ reply=swarmkv_reply_new_error(error_cluster_addslotowner_node_is_not_active, (new_nodes+i)->addr);
goto error_out;
}
}
@@ -316,7 +316,7 @@ struct swarmkv_reply *cluster_addkeyowner_command(struct swarmkv *db, char *argv
{
if(0==node_compare(&(old_slots[j].owner), new_nodes+i))
{
- reply=swarmkv_reply_new_error(error_cluster_addkeyowner_node_has_slot, (new_nodes+i)->addr);
+ reply=swarmkv_reply_new_error(error_cluster_addslotowner_node_has_slot, (new_nodes+i)->addr);
goto error_out;
}
}
@@ -955,7 +955,7 @@ struct cluster_cmd_spec cluster_cmds[]={
{"CLUSTER INFO", "[section]", cluster_info_command},
{"CLUSTER NODES", "[breif | verbose]", cluster_nodes_command},
{"CLUSTER SLOTS", "", cluster_slots_command},
- {"CLUSTER ADDKEYOWNER", "IP:port [IP:port ...]", cluster_addkeyowner_command},
+ {"CLUSTER ADDSLOTOWNER", "IP:port [IP:port ...]", cluster_addslotowner_command},
{"CLUSTER CREATE", "cluster-name IP:port [IP:port ...]", cluster_create_command},
{"CLUSTER SANITY", "check | heal", cluster_sanity_command},
{"ATTACH", "IP:port", attach_command},