diff options
| author | Zheng Chao <[email protected]> | 2023-08-10 00:27:47 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-10 00:27:47 +0800 |
| commit | 546b91069cac4dabc0b2751f57b3cce2385f1291 (patch) | |
| tree | 5fe709bbbb6af32f41ad525f1a58b0f795945e6c /src | |
| parent | e8214a0677509e332fabfa2832428102ffd0cda0 (diff) | |
bugfix `cluster keys` command
Diffstat (limited to 'src')
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 1 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_keyspace.h | 1 | ||||
| -rw-r--r-- | src/swarmkv.c | 20 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 11 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 23 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 27 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 33 |
8 files changed, 69 insertions, 49 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index 231f6a6..db1d85d 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -52,6 +52,7 @@ sds keyslots2json(void *slot_container_base, size_t sz_slot_container, size_t of void health_response2active_nodes(const char *resp_buff, node_t *nodes, uuid_t *uuids, size_t *n_node); void leader_response2leader_node(const char *resp_body, node_t *leader); int key_hash_slot(const char* key, size_t keylen); +int key2tid(const sds key, int nr_worker_threads); int str2integer(const char *str, long long *integer); char *str_replace(char *orig, char *rep, char *with); struct swarmkv_options diff --git a/src/inc_internal/swarmkv_keyspace.h b/src/inc_internal/swarmkv_keyspace.h index 6a36da6..20dd9f4 100644 --- a/src/inc_internal/swarmkv_keyspace.h +++ b/src/inc_internal/swarmkv_keyspace.h @@ -11,6 +11,7 @@ struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err); void swarmkv_keyspace_free(struct swarmkv_module* mod_keyspace); void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id); +int swarmkv_keyspace_get_tid(struct swarmkv_module *mod_keyspace, int slot_id); struct keyspace_info { unsigned int health_check_port; diff --git a/src/swarmkv.c b/src/swarmkv.c index 4b1c44b..900b2c0 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -237,6 +237,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw "address: %s\r\n" "uuid: %s\r\n" "worker_threads: %d\r\n" + "caller_threads: %d\r\n" "server_time_usec: %lld\r\n" "up_time_in_seconds: %ld\r\n" "up_time_in_days: %ld\r\n" @@ -245,6 +246,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw db->self.addr, uuid_str, db->opts->nr_worker_threads, + db->opts->nr_caller_threads, server_time_us, now_monotonic.tv_sec-db->boot_time.tv_sec, (now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24) @@ -314,23 +316,23 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw { if(0==strcasecmp("Node", cmd->argv[1])) { - *reply=swarmkv_reply_new_status(node_info_buff); + *reply=swarmkv_reply_new_string_fmt(node_info_buff); } else if(0==strcasecmp("Store", cmd->argv[1])) { - *reply=swarmkv_reply_new_status(store_info_buff); + *reply=swarmkv_reply_new_string_fmt(store_info_buff); } else if(0==strcasecmp("Keyspace", cmd->argv[1])) { - *reply=swarmkv_reply_new_status(ks_info_buff); + *reply=swarmkv_reply_new_string_fmt(ks_info_buff); } else if(0==strcasecmp("Network", cmd->argv[1])) { - *reply=swarmkv_reply_new_status(net_info_buff); + *reply=swarmkv_reply_new_string_fmt(net_info_buff); } - else if(0==strcasecmp("Thread", cmd->argv[1])) + else if(0==strcasecmp("Threads", cmd->argv[1])) { - *reply=swarmkv_reply_new_integer(db->opts->nr_worker_threads); + *reply=swarmkv_reply_new_string_from_integer(db->opts->nr_worker_threads); } else { @@ -791,10 +793,10 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * tid=atoi(cmd->argv[2]); break; case KEY_OFFSET_SLOTID: - tid=atoi(cmd->argv[2])%nr_worker_threads; + tid=swarmkv_keyspace_get_tid(spec->module, atoi(cmd->argv[2])); break; default: - tid=key_hash_slot(cmd->argv[spec->key_offset], sdslen(cmd->argv[spec->key_offset]))%nr_worker_threads; + tid=key2tid(cmd->argv[spec->key_offset], nr_worker_threads); break; } return tid; @@ -852,7 +854,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads); struct swarmkv_msg *msg=NULL; - //Because only worker thread has network thread, so command should be executed at worker thread first. + //Only worker threads can do network communication, so command should be executed at worker thread first. if(cur_tid != target_tid) { //cmd will be executed in target thread's on_msg_callback diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 01ace95..1b55caf 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -354,7 +354,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta } if(ctx.reply==NULL) { - swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); } assert(ctx.reply!=NULL); reply=ctx.reply; diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index aa855a4..55406bc 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -11,7 +11,6 @@ #include <stdio.h> #include <arpa/inet.h> - int str2integer(const char *str, long long *integer) { char *endptr=NULL; @@ -479,7 +478,11 @@ struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin) } void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply *src) { - if(src->type==SWARMKV_REPLY_NIL) return; + if(src->type==SWARMKV_REPLY_NIL) + { + swarmkv_reply_free(src); + return; + } if(*dst==NULL) { *dst=swarmkv_reply_new_array(0); @@ -736,6 +739,10 @@ int key_hash_slot(const char *key, size_t keylen) } return XXH32(key+s+1, e-s-1, KEYSPACE_XXHASH_SEED)%KEYSPACE_SLOT_NUM; } +int key2tid(const sds key, int nr_worker_threads) +{ + return key_hash_slot(key, sdslen(key))%nr_worker_threads; +} struct http_client { int response_code; diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 13fbbe7..ba7c2b9 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -129,6 +129,10 @@ struct swarmkv_reply *key_entry_list_replica_nodes(struct key_route_entry *key_e } return reply; } +static int slot_is_my_thread(int slot_id, int thread_id, int nr_threads) +{ + return (slot_id%nr_threads)==thread_id; +} struct crdt_op_ctx { node_t peer; @@ -1742,17 +1746,19 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC UT_icd ut_swarmkv_reply_pointer_icd = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL}; enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/* KEYSPACE KEYS key-pattern */ +/* KEYSPACE KEYS tid pattern */ struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); size_t i=0, n_matched=0; struct slot_runtime *slot_rt=NULL; struct key_route_entry *key_entry=NULL, *tmp=NULL; int is_matched=0; - UT_array* matched_replies=NULL; + UT_array *matched_replies=NULL; struct swarmkv_reply *r=NULL; - - const sds pattern=cmd->argv[2]; + int real_tid=__gettid(ks->exec_cmd_handle); + int thread_id=atoll(cmd->argv[2]); + assert(real_tid==thread_id); + const sds pattern=cmd->argv[3]; utarray_new(matched_replies, &ut_swarmkv_reply_pointer_icd); for(i=0; i<KEYSPACE_SLOT_NUM; i++) { @@ -1763,7 +1769,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, pthread_mutex_unlock(&slot_rt->sanity_lock); continue; } - + slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads); HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp) { is_matched=stringmatchlen(pattern, sdslen(pattern), key_entry->key, sdslen(key_entry->key), 0); @@ -1987,7 +1993,11 @@ enum cmd_exec_result persist_command(struct swarmkv_module *mod_keyspace, const return ret; } - +int swarmkv_keyspace_get_tid(struct swarmkv_module *mod_keyspace, int slot_id) +{ + struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); + return slot_id%ks->opts->nr_worker_threads; +} #define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 1000 void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id) { @@ -2006,6 +2016,7 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i for(i=0; i<KEYSPACE_SLOT_NUM; i++) { if(i%ks->opts->nr_worker_threads!=thread_id) continue; + if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads))continue; slot_rt=ks->slot_rts+i; if(!slot_rt->I_am_owner) continue; if(ks->opts->sanity_check) diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 3612e25..49203d4 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -11,6 +11,7 @@ #include <stdint.h> /* Definition of uint64_t */ #include <assert.h> #include <string.h> +#include <pthread.h> //sanity check #define MODULE_SWAMRKV_MESH module_name_str("swarmkv.mesh") #define RINGBUF_SIZE 1024 @@ -24,6 +25,7 @@ struct swarmkv_mesh_thread char *buff; ringbuf_worker_t **workers; struct swarmkv_mesh *ref_mesh; + int n_write, n_read; }; struct swarmkv_mesh { @@ -42,11 +44,6 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id; ringbuf_t *dest_ring=dest_thr->ring; assert(msg->magic == SWARMKV_MSG_MAGIC); - if(curr_thr->workers[dest_thread_id]==NULL) - { - curr_thr->workers[dest_thread_id]=ringbuf_register(dest_ring, current_thread_id); - assert(curr_thr->workers[dest_thread_id]); - } ssize_t offset=0; offset=ringbuf_acquire(dest_ring, curr_thr->workers[dest_thread_id], sizeof(struct swarmkv_msg*)); if(offset == -1) @@ -63,6 +60,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest { assert(0); } + dest_thr->n_write++; return 0; error_out: swarmkv_msg_free(msg); @@ -77,28 +75,29 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg) { - struct swarmkv_mesh_thread *thread=(struct swarmkv_mesh_thread*)arg; - struct swarmkv_mesh *mesh=thread->ref_mesh; - ringbuf_t *ring=thread->ring; + struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg; + struct swarmkv_mesh *mesh=thr->ref_mesh; + ringbuf_t *ring=thr->ring; uint64_t n_msg=0; - ssize_t s = read(thread->efd, &n_msg, sizeof(uint64_t)); + ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t)); if(s!=sizeof(uint64_t)) { assert(0); } size_t offset=0, len=0; struct swarmkv_msg *msg=NULL; - len=ringbuf_consume(ring, &offset); - assert(n_msg <= len/sizeof(struct swarmkv_msg*)); for(uint64_t i=0; i<n_msg; i++) { - msg=*(struct swarmkv_msg**)(thread->buff+offset); + len=ringbuf_consume(ring, &offset); + assert(len>0); + msg=*(struct swarmkv_msg**)(thr->buff+offset); assert(msg->magic==SWARMKV_MSG_MAGIC); + ringbuf_release(ring, sizeof(struct swarmkv_msg*)); //ownership of msg is transferred to the callback function. mesh->on_msg_recv(msg, mesh->msg_recv_arg); - offset+=sizeof(struct swarmkv_msg*); + offset=0; + thr->n_read++; } - ringbuf_release(ring, n_msg*sizeof(struct swarmkv_msg*)); return; } struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger) diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index de1620e..c93ac3d 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -150,15 +150,14 @@ struct swarmkv_store { struct swarmkv_module module; node_t self; - size_t nr_worker_threads; struct swarmkv_store_thread *threads; struct swarmkv *exec_cmd_handle; - uuid_t my_uuid; struct swarmkv_module *mod_monitor; long long sync_err; long long sync_ok; long long synced; + const struct swarmkv_options *opts; }; @@ -179,7 +178,7 @@ int __store_gettid(sds key, int nr_worker_threads) } int store_gettid(struct swarmkv_module * mod_store, sds key) { - return __store_gettid(key, module2store(mod_store)->nr_worker_threads); + return __store_gettid(key, module2store(mod_store)->opts->nr_worker_threads); } struct scontainer { @@ -300,13 +299,13 @@ void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) { struct scontainer *ctr=NULL; - int tid=__store_gettid(key, store->nr_worker_threads); + int designated_tid=key2tid(key, store->opts->nr_worker_threads); int real_tid=__gettid(store->exec_cmd_handle); - assert(tid==real_tid); - ctr=scontainer_find(&(store->threads[tid].obj_table), key); - if(0==pthread_mutex_trylock(&store->threads[tid].sanity_lock)) + assert(designated_tid==real_tid); + ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key); + if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) { - pthread_mutex_unlock(&store->threads[tid].sanity_lock); + pthread_mutex_unlock(&store->threads[designated_tid].sanity_lock); } else { @@ -338,7 +337,7 @@ struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid) { struct swarmkv_store *store=module2store(mod_store); - uuid_copy(uuid, store->my_uuid); + uuid_copy(uuid, store->opts->bin_uuid); return; } void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node) @@ -600,9 +599,9 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) struct swarmkv_store *store=ALLOC(struct swarmkv_store, 1); strncpy(store->module.name, "store", sizeof(store->module.name)); store->module.mod_ctx=store; - store->nr_worker_threads=opts->nr_worker_threads; + store->opts=opts; store->threads=ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads); - uuid_copy(store->my_uuid, opts->bin_uuid); + node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); return &(store->module); @@ -618,7 +617,7 @@ void swarmkv_store_free(struct swarmkv_module *mod_store) struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=NULL, *tmp=NULL; - for(size_t i=0; i<store->nr_worker_threads; i++) + for(size_t i=0; i<store->opts->nr_worker_threads; i++) { HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp) @@ -641,11 +640,11 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info) { struct swarmkv_store *store=module2store(mod_store); - info->shards=store->nr_worker_threads; + info->shards=store->opts->nr_worker_threads; info->keys=0; info->keys_to_sync=0; struct swarmkv_store_thread *thread=NULL; - for(size_t i=0; i<store->nr_worker_threads; i++) + for(size_t i=0; i<store->opts->nr_worker_threads; i++) { thread = store->threads+i; info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0); @@ -870,11 +869,11 @@ enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const s //CRDT KEYS tid pattern int i=0, n_matched=0; struct pattern_match_arg cb_arg; - int thread_id=atoll(cmd->argv[3]); - cb_arg.pattern=cmd->argv[4]; + int thread_id=atoll(cmd->argv[2]); + cb_arg.pattern=cmd->argv[3]; utarray_new(cb_arg.matched_replies, &ut_array_matched_reply); struct swarmkv_store *store=module2store(mod_store); - if(thread_id>store->nr_worker_threads) + if(thread_id>store->opts->nr_worker_threads) { *reply=swarmkv_reply_new_error("Invalid thread id"); return FINISHED; |
