summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-10 00:27:47 +0800
committerZheng Chao <[email protected]>2023-08-10 00:27:47 +0800
commit546b91069cac4dabc0b2751f57b3cce2385f1291 (patch)
tree5fe709bbbb6af32f41ad525f1a58b0f795945e6c /src
parente8214a0677509e332fabfa2832428102ffd0cda0 (diff)
bugfix `cluster keys` command
Diffstat (limited to 'src')
-rw-r--r--src/inc_internal/swarmkv_common.h1
-rw-r--r--src/inc_internal/swarmkv_keyspace.h1
-rw-r--r--src/swarmkv.c20
-rw-r--r--src/swarmkv_api.c2
-rw-r--r--src/swarmkv_common.c11
-rw-r--r--src/swarmkv_keyspace.c23
-rw-r--r--src/swarmkv_mesh.c27
-rw-r--r--src/swarmkv_store.c33
8 files changed, 69 insertions, 49 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index 231f6a6..db1d85d 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -52,6 +52,7 @@ sds keyslots2json(void *slot_container_base, size_t sz_slot_container, size_t of
void health_response2active_nodes(const char *resp_buff, node_t *nodes, uuid_t *uuids, size_t *n_node);
void leader_response2leader_node(const char *resp_body, node_t *leader);
int key_hash_slot(const char* key, size_t keylen);
+int key2tid(const sds key, int nr_worker_threads);
int str2integer(const char *str, long long *integer);
char *str_replace(char *orig, char *rep, char *with);
struct swarmkv_options
diff --git a/src/inc_internal/swarmkv_keyspace.h b/src/inc_internal/swarmkv_keyspace.h
index 6a36da6..20dd9f4 100644
--- a/src/inc_internal/swarmkv_keyspace.h
+++ b/src/inc_internal/swarmkv_keyspace.h
@@ -11,6 +11,7 @@
struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err);
void swarmkv_keyspace_free(struct swarmkv_module* mod_keyspace);
void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id);
+int swarmkv_keyspace_get_tid(struct swarmkv_module *mod_keyspace, int slot_id);
struct keyspace_info
{
unsigned int health_check_port;
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 4b1c44b..900b2c0 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -237,6 +237,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
"address: %s\r\n"
"uuid: %s\r\n"
"worker_threads: %d\r\n"
+ "caller_threads: %d\r\n"
"server_time_usec: %lld\r\n"
"up_time_in_seconds: %ld\r\n"
"up_time_in_days: %ld\r\n"
@@ -245,6 +246,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
db->self.addr,
uuid_str,
db->opts->nr_worker_threads,
+ db->opts->nr_caller_threads,
server_time_us,
now_monotonic.tv_sec-db->boot_time.tv_sec,
(now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24)
@@ -314,23 +316,23 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
{
if(0==strcasecmp("Node", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_status(node_info_buff);
+ *reply=swarmkv_reply_new_string_fmt(node_info_buff);
}
else if(0==strcasecmp("Store", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_status(store_info_buff);
+ *reply=swarmkv_reply_new_string_fmt(store_info_buff);
}
else if(0==strcasecmp("Keyspace", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_status(ks_info_buff);
+ *reply=swarmkv_reply_new_string_fmt(ks_info_buff);
}
else if(0==strcasecmp("Network", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_status(net_info_buff);
+ *reply=swarmkv_reply_new_string_fmt(net_info_buff);
}
- else if(0==strcasecmp("Thread", cmd->argv[1]))
+ else if(0==strcasecmp("Threads", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_integer(db->opts->nr_worker_threads);
+ *reply=swarmkv_reply_new_string_from_integer(db->opts->nr_worker_threads);
}
else
{
@@ -791,10 +793,10 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *
tid=atoi(cmd->argv[2]);
break;
case KEY_OFFSET_SLOTID:
- tid=atoi(cmd->argv[2])%nr_worker_threads;
+ tid=swarmkv_keyspace_get_tid(spec->module, atoi(cmd->argv[2]));
break;
default:
- tid=key_hash_slot(cmd->argv[spec->key_offset], sdslen(cmd->argv[spec->key_offset]))%nr_worker_threads;
+ tid=key2tid(cmd->argv[spec->key_offset], nr_worker_threads);
break;
}
return tid;
@@ -852,7 +854,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads);
struct swarmkv_msg *msg=NULL;
- //Because only worker thread has network thread, so command should be executed at worker thread first.
+ //Only worker threads can do network communication, so command should be executed at worker thread first.
if(cur_tid != target_tid)
{
//cmd will be executed in target thread's on_msg_callback
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 01ace95..1b55caf 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -354,7 +354,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
}
if(ctx.reply==NULL)
{
- swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
}
assert(ctx.reply!=NULL);
reply=ctx.reply;
diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c
index aa855a4..55406bc 100644
--- a/src/swarmkv_common.c
+++ b/src/swarmkv_common.c
@@ -11,7 +11,6 @@
#include <stdio.h>
#include <arpa/inet.h>
-
int str2integer(const char *str, long long *integer)
{
char *endptr=NULL;
@@ -479,7 +478,11 @@ struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin)
}
void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply *src)
{
- if(src->type==SWARMKV_REPLY_NIL) return;
+ if(src->type==SWARMKV_REPLY_NIL)
+ {
+ swarmkv_reply_free(src);
+ return;
+ }
if(*dst==NULL)
{
*dst=swarmkv_reply_new_array(0);
@@ -736,6 +739,10 @@ int key_hash_slot(const char *key, size_t keylen)
}
return XXH32(key+s+1, e-s-1, KEYSPACE_XXHASH_SEED)%KEYSPACE_SLOT_NUM;
}
+int key2tid(const sds key, int nr_worker_threads)
+{
+ return key_hash_slot(key, sdslen(key))%nr_worker_threads;
+}
struct http_client
{
int response_code;
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 13fbbe7..ba7c2b9 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -129,6 +129,10 @@ struct swarmkv_reply *key_entry_list_replica_nodes(struct key_route_entry *key_e
}
return reply;
}
+static int slot_is_my_thread(int slot_id, int thread_id, int nr_threads)
+{
+ return (slot_id%nr_threads)==thread_id;
+}
struct crdt_op_ctx
{
node_t peer;
@@ -1742,17 +1746,19 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC
UT_icd ut_swarmkv_reply_pointer_icd = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL};
enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/* KEYSPACE KEYS key-pattern */
+/* KEYSPACE KEYS tid pattern */
struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
size_t i=0, n_matched=0;
struct slot_runtime *slot_rt=NULL;
struct key_route_entry *key_entry=NULL, *tmp=NULL;
int is_matched=0;
- UT_array* matched_replies=NULL;
+ UT_array *matched_replies=NULL;
struct swarmkv_reply *r=NULL;
-
- const sds pattern=cmd->argv[2];
+ int real_tid=__gettid(ks->exec_cmd_handle);
+ int thread_id=atoll(cmd->argv[2]);
+ assert(real_tid==thread_id);
+ const sds pattern=cmd->argv[3];
utarray_new(matched_replies, &ut_swarmkv_reply_pointer_icd);
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
@@ -1763,7 +1769,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace,
pthread_mutex_unlock(&slot_rt->sanity_lock);
continue;
}
-
+ slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads);
HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp)
{
is_matched=stringmatchlen(pattern, sdslen(pattern), key_entry->key, sdslen(key_entry->key), 0);
@@ -1987,7 +1993,11 @@ enum cmd_exec_result persist_command(struct swarmkv_module *mod_keyspace, const
return ret;
}
-
+int swarmkv_keyspace_get_tid(struct swarmkv_module *mod_keyspace, int slot_id)
+{
+ struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
+ return slot_id%ks->opts->nr_worker_threads;
+}
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 1000
void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id)
{
@@ -2006,6 +2016,7 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
if(i%ks->opts->nr_worker_threads!=thread_id) continue;
+ if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads))continue;
slot_rt=ks->slot_rts+i;
if(!slot_rt->I_am_owner) continue;
if(ks->opts->sanity_check)
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 3612e25..49203d4 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -11,6 +11,7 @@
#include <stdint.h> /* Definition of uint64_t */
#include <assert.h>
#include <string.h>
+#include <pthread.h> //sanity check
#define MODULE_SWAMRKV_MESH module_name_str("swarmkv.mesh")
#define RINGBUF_SIZE 1024
@@ -24,6 +25,7 @@ struct swarmkv_mesh_thread
char *buff;
ringbuf_worker_t **workers;
struct swarmkv_mesh *ref_mesh;
+ int n_write, n_read;
};
struct swarmkv_mesh
{
@@ -42,11 +44,6 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id;
ringbuf_t *dest_ring=dest_thr->ring;
assert(msg->magic == SWARMKV_MSG_MAGIC);
- if(curr_thr->workers[dest_thread_id]==NULL)
- {
- curr_thr->workers[dest_thread_id]=ringbuf_register(dest_ring, current_thread_id);
- assert(curr_thr->workers[dest_thread_id]);
- }
ssize_t offset=0;
offset=ringbuf_acquire(dest_ring, curr_thr->workers[dest_thread_id], sizeof(struct swarmkv_msg*));
if(offset == -1)
@@ -63,6 +60,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
{
assert(0);
}
+ dest_thr->n_write++;
return 0;
error_out:
swarmkv_msg_free(msg);
@@ -77,28 +75,29 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_
static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
{
- struct swarmkv_mesh_thread *thread=(struct swarmkv_mesh_thread*)arg;
- struct swarmkv_mesh *mesh=thread->ref_mesh;
- ringbuf_t *ring=thread->ring;
+ struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg;
+ struct swarmkv_mesh *mesh=thr->ref_mesh;
+ ringbuf_t *ring=thr->ring;
uint64_t n_msg=0;
- ssize_t s = read(thread->efd, &n_msg, sizeof(uint64_t));
+ ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t));
if(s!=sizeof(uint64_t))
{
assert(0);
}
size_t offset=0, len=0;
struct swarmkv_msg *msg=NULL;
- len=ringbuf_consume(ring, &offset);
- assert(n_msg <= len/sizeof(struct swarmkv_msg*));
for(uint64_t i=0; i<n_msg; i++)
{
- msg=*(struct swarmkv_msg**)(thread->buff+offset);
+ len=ringbuf_consume(ring, &offset);
+ assert(len>0);
+ msg=*(struct swarmkv_msg**)(thr->buff+offset);
assert(msg->magic==SWARMKV_MSG_MAGIC);
+ ringbuf_release(ring, sizeof(struct swarmkv_msg*));
//ownership of msg is transferred to the callback function.
mesh->on_msg_recv(msg, mesh->msg_recv_arg);
- offset+=sizeof(struct swarmkv_msg*);
+ offset=0;
+ thr->n_read++;
}
- ringbuf_release(ring, n_msg*sizeof(struct swarmkv_msg*));
return;
}
struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger)
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index de1620e..c93ac3d 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -150,15 +150,14 @@ struct swarmkv_store
{
struct swarmkv_module module;
node_t self;
- size_t nr_worker_threads;
struct swarmkv_store_thread *threads;
struct swarmkv *exec_cmd_handle;
- uuid_t my_uuid;
struct swarmkv_module *mod_monitor;
long long sync_err;
long long sync_ok;
long long synced;
+ const struct swarmkv_options *opts;
};
@@ -179,7 +178,7 @@ int __store_gettid(sds key, int nr_worker_threads)
}
int store_gettid(struct swarmkv_module * mod_store, sds key)
{
- return __store_gettid(key, module2store(mod_store)->nr_worker_threads);
+ return __store_gettid(key, module2store(mod_store)->opts->nr_worker_threads);
}
struct scontainer
{
@@ -300,13 +299,13 @@ void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func
struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
{
struct scontainer *ctr=NULL;
- int tid=__store_gettid(key, store->nr_worker_threads);
+ int designated_tid=key2tid(key, store->opts->nr_worker_threads);
int real_tid=__gettid(store->exec_cmd_handle);
- assert(tid==real_tid);
- ctr=scontainer_find(&(store->threads[tid].obj_table), key);
- if(0==pthread_mutex_trylock(&store->threads[tid].sanity_lock))
+ assert(designated_tid==real_tid);
+ ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key);
+ if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock))
{
- pthread_mutex_unlock(&store->threads[tid].sanity_lock);
+ pthread_mutex_unlock(&store->threads[designated_tid].sanity_lock);
}
else
{
@@ -338,7 +337,7 @@ struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid)
{
struct swarmkv_store *store=module2store(mod_store);
- uuid_copy(uuid, store->my_uuid);
+ uuid_copy(uuid, store->opts->bin_uuid);
return;
}
void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node)
@@ -600,9 +599,9 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts)
struct swarmkv_store *store=ALLOC(struct swarmkv_store, 1);
strncpy(store->module.name, "store", sizeof(store->module.name));
store->module.mod_ctx=store;
- store->nr_worker_threads=opts->nr_worker_threads;
+ store->opts=opts;
store->threads=ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads);
- uuid_copy(store->my_uuid, opts->bin_uuid);
+
node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port);
return &(store->module);
@@ -618,7 +617,7 @@ void swarmkv_store_free(struct swarmkv_module *mod_store)
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=NULL, *tmp=NULL;
- for(size_t i=0; i<store->nr_worker_threads; i++)
+ for(size_t i=0; i<store->opts->nr_worker_threads; i++)
{
HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp)
@@ -641,11 +640,11 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s
void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info)
{
struct swarmkv_store *store=module2store(mod_store);
- info->shards=store->nr_worker_threads;
+ info->shards=store->opts->nr_worker_threads;
info->keys=0;
info->keys_to_sync=0;
struct swarmkv_store_thread *thread=NULL;
- for(size_t i=0; i<store->nr_worker_threads; i++)
+ for(size_t i=0; i<store->opts->nr_worker_threads; i++)
{
thread = store->threads+i;
info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0);
@@ -870,11 +869,11 @@ enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const s
//CRDT KEYS tid pattern
int i=0, n_matched=0;
struct pattern_match_arg cb_arg;
- int thread_id=atoll(cmd->argv[3]);
- cb_arg.pattern=cmd->argv[4];
+ int thread_id=atoll(cmd->argv[2]);
+ cb_arg.pattern=cmd->argv[3];
utarray_new(cb_arg.matched_replies, &ut_array_matched_reply);
struct swarmkv_store *store=module2store(mod_store);
- if(thread_id>store->nr_worker_threads)
+ if(thread_id>store->opts->nr_worker_threads)
{
*reply=swarmkv_reply_new_error("Invalid thread id");
return FINISHED;