diff options
| author | Zheng Chao <[email protected]> | 2023-08-03 21:30:39 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-03 21:30:39 +0800 |
| commit | 3d97b9505d8e164ba6b1217aea4f3483c3a26f2a (patch) | |
| tree | 7686a1fed61c242088e82626efc85bb13dc2ccb5 /src | |
| parent | 523621088abc7f3a701646a7360ff015f9e5c025 (diff) | |
WIP: multi-thread test
Diffstat (limited to 'src')
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 2 | ||||
| -rw-r--r-- | src/swarmkv.c | 77 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 6 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 33 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 56 | ||||
| -rw-r--r-- | src/swarmkv_message.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 3 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 26 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 53 |
9 files changed, 151 insertions, 109 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index 063f0c1..48459f5 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -77,6 +77,7 @@ struct swarmkv_options int nr_caller_threads; int total_threads; int is_assigned_to_db; + int sanity_check; }; struct swarmkv_cmd @@ -101,7 +102,6 @@ struct swarmkv_reply *swarmkv_reply_new_node(node_t *node, int is_ask); struct swarmkv_reply *swarmkv_reply_new_nil(void); struct swarmkv_reply *swarmkv_reply_new_status(const char *format, ...); struct swarmkv_reply *swarmkv_reply_new_error(const char *format, ...); -struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin); void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply *src); void swarmkv_reply_append_string(struct swarmkv_reply **dst, struct swarmkv_reply *src); diff --git a/src/swarmkv.c b/src/swarmkv.c index 947f542..120e358 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -66,7 +66,8 @@ struct swarmkv int thread_counter; struct swarmkv_thread_ctx *threads; - + pthread_barrier_t barrier; + struct event_base **ref_evbases; node_t self; @@ -100,6 +101,7 @@ void swarmkv_register_thread(struct swarmkv *db) db->threads[thread_id].thread_id=thread_id; return; } + int __gettid(struct swarmkv *db) { int sys_tid=syscall(SYS_gettid); @@ -113,7 +115,11 @@ int __gettid(struct swarmkv *db) assert(0); return -1; } - +struct swarmkv_thread_ctx *swarmkv_get_thread_ctx(struct swarmkv *db) +{ + int tid=__gettid(db); + return &db->threads[tid]; +} void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller); struct local_caller_ctx { @@ -148,7 +154,7 @@ void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const nod local_caller_on_success, local_caller_on_fail, ctx); - __exec_cmd(db, NULL, target_node, cmd, ctx->my_future); + __exec_cmd(db, &db->self, target_node, cmd, ctx->my_future); return; } struct remote_caller_ctx @@ -531,10 +537,6 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char* fclose(logfile); sleep(1000); } -static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) -{ - -} void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) { struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg; @@ -543,9 +545,10 @@ void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) } void *swarmkv_worker_thread(void *arg) { - struct swarmkv_thread_ctx* ctx = (struct swarmkv_thread_ctx *) arg; - struct swarmkv *db=ctx->db; - swarmkv_register_thread(ctx->db); + struct swarmkv *db = (struct swarmkv *)arg; + swarmkv_register_thread(db); + int tid=__gettid(db); + struct swarmkv_thread_ctx *ctx = db->threads+tid; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); @@ -555,6 +558,7 @@ void *swarmkv_worker_thread(void *arg) evtimer_add(periodic_ev, &sync_interval); ctx->is_dispatching=1; + pthread_barrier_wait(&db->barrier); int ret=event_base_dispatch(ctx->evbase); @@ -574,25 +578,17 @@ void *swarmkv_worker_thread(void *arg) void swarmkv_threads_run(struct swarmkv *db) { + pthread_barrier_init(&db->barrier, NULL, db->opts->nr_worker_threads+1); int i = 0, ret=0; for (i = 0; i < db->opts->nr_worker_threads; i++) { - ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->threads+i)); + ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, db); if(ret !=0 )//error { log_fatal(db->logger, MODULE_SWAMRKV_CORE, "pthread_create() error %d", ret); } } - int running_threads=0; - while(running_threads < db->opts->nr_worker_threads) - { - running_threads=0; - for (i = 0; i < db->opts->nr_worker_threads; i++) - { - running_threads += db->threads[i].is_dispatching; - } - usleep(10); - } + pthread_barrier_wait(&db->barrier); return; } @@ -781,7 +777,7 @@ static void crdt_add_on_success(void *result, void *user) assert(reply->type==SWARMKV_REPLY_STATUS); if(ctx->future_of_caller) { - __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller); + __exec_cmd(ctx->db, &ctx->db->self, NULL, ctx->cmd, ctx->future_of_caller); } cmd_ctx_free(ctx); } @@ -813,7 +809,7 @@ static void key_route_on_success(void *result, void *user) } if(n_replica_node>0) { - __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller); + __exec_cmd(ctx->db, &ctx->db->self, replica_nodes+0, ctx->cmd, ctx->future_of_caller); } if(self_is_a_replica) { @@ -821,7 +817,7 @@ static void key_route_on_success(void *result, void *user) struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node); crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); - __exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); + __exec_cmd(ctx->db, &ctx->db->self, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); swarmkv_cmd_free(crdt_add_cmd); } free(replica_nodes); @@ -846,7 +842,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * } return tid; } -void on_msg_callback(struct swarmkv_msg *msg, void *arg) +void __on_msg_callback(struct swarmkv_msg *msg, void *arg) { struct swarmkv *db = (struct swarmkv *)arg; int cur_tid=__gettid(db); @@ -854,6 +850,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg) { assert(cur_tid < db->opts->nr_worker_threads); exec_for_remote(db, msg); + swarmkv_msg_free(msg); } else { @@ -864,6 +861,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg) else { swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply); + swarmkv_msg_free(msg); } } } @@ -873,7 +871,12 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * struct swarmkv_reply *reply=NULL; struct promise *p=NULL; int cur_tid=__gettid(db); - node_t peer; + + int is_called_by_self=1; + if(accessing_node && 0!=node_compare(&db->self, accessing_node)) + { + is_called_by_self=0; + } spec=get_spec_by_argv(db, cmd->argc, cmd->argv); if(!spec) { @@ -891,8 +894,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_reply_free(reply); return; } - /*Initiating a non-auto-route command from local is NOT allowed.*/ - if(!spec->auto_route && !accessing_node && !target_node) + /*Initiating a non-auto-route command without target from local is NOT allowed.*/ + if(!spec->auto_route && is_called_by_self && !target_node) { reply=swarmkv_reply_new_error(error_no_target_node, spec->name); p=future_to_promise(future_of_caller); @@ -913,7 +916,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * } if(cur_tid != target_tid) { - //cmd is executed in target thread's on_msg_callback + //cmd will be executed in target thread's on_msg_callback long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence); swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg); @@ -928,13 +931,14 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); - if(accessing_node && node_compare(&db->self, accessing_node))//Remote call, non-recursive exec + if(!is_called_by_self)//Remote call, non-recursive exec { struct promise *p=future_to_promise(future_of_caller); promise_success(p, reply); swarmkv_reply_free(reply); return; } + switch(exec_ret) { case FINISHED: @@ -946,10 +950,12 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * } case REDIRECT: { + node_t peer; struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); node_init_from_reply(&peer, reply); + swarmkv_reply_free(reply); ctx->future_of_mine=future_create("peer_exec", peer_exec_on_success, generic_on_fail, ctx); - __exec_cmd(db, NULL, &peer, cmd, ctx->future_of_mine); + __exec_cmd(db, &db->self, &peer, cmd, ctx->future_of_mine); break; } case NEED_KEY_ROUTE: @@ -957,9 +963,10 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self)); struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx); - __exec_cmd(db, NULL, NULL, keyspace_cmd, ctx->future_of_mine); + __exec_cmd(db, &db->self, NULL, keyspace_cmd, ctx->future_of_mine); swarmkv_cmd_free(keyspace_cmd); keyspace_cmd=NULL; + assert(reply==NULL); break; } case ASYNC_WAIT: @@ -1373,7 +1380,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, } db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); - swarmkv_mesh_set_on_msg_cb(db->mesh, on_msg_callback, db); + swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db); db->mod_monitor=swarmkv_monitor_new(db->opts); //Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port. @@ -1382,7 +1389,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, { goto error_out; } - swarmkv_net_set_on_msg_callback(db->net, on_msg_callback, db); + swarmkv_net_set_on_msg_callback(db->net, __on_msg_callback, db); swarmkv_net_set_monitor_handle(db->net, db->mod_monitor); @@ -1487,7 +1494,7 @@ void swarmkv_close(struct swarmkv * db) db->opts->is_assigned_to_db=0; swarmkv_options_free(db->opts); db->opts=NULL; - + pthread_barrier_destroy(&db->barrier); free(db); return; } diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index af8d6b7..3ae2ee3 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -34,6 +34,7 @@ struct swarmkv_options* swarmkv_options_new(void) strcpy(opts->consul_agent_host, "127.0.0.1"); uuid_generate(opts->bin_uuid); opts->nr_worker_threads=1; + opts->nr_caller_threads=1; opts->is_assigned_to_db=0; opts->run_for_leader_flag=1; return opts; @@ -373,6 +374,7 @@ struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, int argc=0; sds *argv=NULL; + argv=sdssplitargs(cmd_str, &argc); struct swarmkv_reply *reply=NULL; reply=swarmkv_command_on_argv(db, target, argc, argv); @@ -385,14 +387,14 @@ struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, return reply; } -struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...) +struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...) { char *cmd_str=NULL; va_list ap; va_start(ap, format); vasprintf(&cmd_str, format, ap); va_end(ap); - + int argc=0; sds *argv=NULL; argv=sdssplitargs(cmd_str, &argc); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 7c00c18..e18b15c 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -163,7 +163,7 @@ static void crdt_del_on_fail(enum e_future_error err, const char * what, void * return; } -void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle) +void key_entry_deletion_notification(struct key_route_entry *key_entry, const node_t *self, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle) { struct replica_node *replica=NULL, *tmp=NULL; struct swarmkv_cmd *crdt_del_cmd=swarmkv_cmd_new(3); @@ -175,8 +175,8 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); - ctx->f=future_create("key_del", crdt_del_on_succ, crdt_del_on_fail, ctx); - exec_cmd(exec_cmd_handle, NULL, &replica->node, crdt_del_cmd, ctx->f); + ctx->f=future_create("_del", crdt_del_on_succ, crdt_del_on_fail, ctx); + exec_cmd(exec_cmd_handle, self, &replica->node, crdt_del_cmd, ctx->f); } swarmkv_cmd_free(crdt_del_cmd); return; @@ -1689,7 +1689,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC if(key_entry) { HASH_DELETE(hh, slot_rt->keyroute_table, key_entry); - key_entry_deletion_notification(key_entry, ks->exec_cmd_func, ks->exec_cmd_handle); + key_entry_deletion_notification(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle); key_entry_free(key_entry); *reply=swarmkv_reply_new_integer(1); } @@ -1978,29 +1978,31 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i int real_tid=__gettid(ks->exec_cmd_handle); assert(real_tid==thread_id); - struct timespec now; int i=0, j=0; struct slot_runtime *slot_rt=NULL; struct timeout *t=NULL; struct key_route_entry *key_entry=NULL; - struct timespec start, end; + struct timespec start, end, now; int has_key_expired=0; clock_gettime(CLOCK_MONOTONIC_COARSE, &start); + clock_gettime(CLOCK_REALTIME, &now); for(i=0; i<KEYSPACE_SLOT_NUM; i++) { if(i%ks->opts->nr_worker_threads!=thread_id) continue; slot_rt=ks->slot_rts+i; if(!slot_rt->I_am_owner) continue; - if(0==pthread_mutex_trylock(&slot_rt->sanity_lock)) + if(ks->opts->sanity_check) { - pthread_mutex_unlock(&slot_rt->sanity_lock); - } - else - { - assert(0); + if(0==pthread_mutex_trylock(&slot_rt->sanity_lock)) + { + pthread_mutex_unlock(&slot_rt->sanity_lock); + } + else + { + assert(0); + } } - - clock_gettime(CLOCK_REALTIME, &now); + //if(0==timeouts_count(slot_rt->expires)) continue; timeouts_update(slot_rt->expires, now.tv_sec); t=timeouts_get(slot_rt->expires); while(t && j<ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP) @@ -2008,13 +2010,12 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i key_entry=container_of(t, struct key_route_entry, timeout_handle); assert(key_entry->is_expiring); HASH_DELETE(hh, slot_rt->keyroute_table, key_entry); - key_entry_deletion_notification(key_entry, ks->exec_cmd_func, ks->exec_cmd_handle); + key_entry_deletion_notification(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle); key_entry_free(key_entry); t=timeouts_get(slot_rt->expires); j++; has_key_expired=1; } - pthread_mutex_unlock(&slot_rt->sanity_lock); } clock_gettime(CLOCK_MONOTONIC_COARSE, &end); if(has_key_expired) diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index c98c629..3612e25 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -41,6 +41,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id; struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id; ringbuf_t *dest_ring=dest_thr->ring; + assert(msg->magic == SWARMKV_MSG_MAGIC); if(curr_thr->workers[dest_thread_id]==NULL) { curr_thr->workers[dest_thread_id]=ringbuf_register(dest_ring, current_thread_id); @@ -73,21 +74,7 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_ mesh->msg_recv_arg=cb_arg; return; } -void swarmkv_mesh_free(struct swarmkv_mesh *mesh) -{ - for(int i=0; i<mesh->nr_thread; i++) - { - close(mesh->threads[i].efd); - ringbuf_release(mesh->threads[i].ring, RINGBUF_SIZE); - free(mesh->threads[i].ring); - free(mesh->threads[i].workers); - free(mesh->threads[i].buff); - event_del(mesh->threads[i].ev); - event_free(mesh->threads[i].ev); - } - free(mesh->threads); - free(mesh); -} + static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg) { struct swarmkv_mesh_thread *thread=(struct swarmkv_mesh_thread*)arg; @@ -99,22 +86,19 @@ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg) { assert(0); } - size_t offset=0, ret=0; + size_t offset=0, len=0; struct swarmkv_msg *msg=NULL; + len=ringbuf_consume(ring, &offset); + assert(n_msg <= len/sizeof(struct swarmkv_msg*)); for(uint64_t i=0; i<n_msg; i++) { - ret=ringbuf_consume(ring, &offset); - if(ret==0) - { - assert(0); - break; - } - ringbuf_release(ring, sizeof(struct swarmkv_msg*)); msg=*(struct swarmkv_msg**)(thread->buff+offset); + assert(msg->magic==SWARMKV_MSG_MAGIC); + //ownership of msg is transferred to the callback function. mesh->on_msg_recv(msg, mesh->msg_recv_arg); - swarmkv_msg_free(msg); + offset+=sizeof(struct swarmkv_msg*); } - + ringbuf_release(ring, n_msg*sizeof(struct swarmkv_msg*)); return; } struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger) @@ -142,5 +126,27 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, event_add(mesh->threads[i].ev, NULL); mesh->threads[i].ref_mesh=mesh; } + for(int i=0; i<mesh->nr_thread; i++) + { + for(int j=0; j<mesh->nr_thread; j++) + { + mesh->threads[i].workers[j]=ringbuf_register(mesh->threads[j].ring, i); + } + } return mesh; +} +void swarmkv_mesh_free(struct swarmkv_mesh *mesh) +{ + for(int i=0; i<mesh->nr_thread; i++) + { + close(mesh->threads[i].efd); + ringbuf_release(mesh->threads[i].ring, RINGBUF_SIZE); + free(mesh->threads[i].ring); + free(mesh->threads[i].workers); + free(mesh->threads[i].buff); + event_del(mesh->threads[i].ev); + event_free(mesh->threads[i].ev); + } + free(mesh->threads); + free(mesh); }
\ No newline at end of file diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c index a1bad9d..9ce5517 100644 --- a/src/swarmkv_message.c +++ b/src/swarmkv_message.c @@ -210,10 +210,12 @@ void swarmkv_msg_free(struct swarmkv_msg *msg) if(msg->type==MSG_TYPE_CMD) { swarmkv_cmd_free(msg->cmd); + msg->cmd=NULL; } else { swarmkv_reply_free(msg->reply); + msg->reply=NULL; } free(msg); return; @@ -225,10 +227,12 @@ void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *b size_t payload_len=0; if(msg->type==MSG_TYPE_CMD) { + assert(msg->cmd->argc>0); swarmkv_cmd_serialize(msg->cmd,&payload, &payload_len); } else { + assert(msg->reply!=NULL); swarmkv_reply_serialize(msg->reply, &payload, &payload_len); } *blob=ALLOC(char, payload_len+SWARMKV_MSG_HDR_SIZE); diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 74cf7db..f32b2b0 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -335,8 +335,9 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) break; } msg=swarmkv_msg_deserialize(recv_buff, msg_sz); + //onwership of msg is transfered to on_msg_cb thr->ref_net->on_msg_cb(msg, thr->ref_net->on_msg_cb_arg); - swarmkv_msg_free(msg); + //swarmkv_msg_free(msg); msg=NULL; evbuffer_drain(input, msg_sz); thr->stat.input_bytes += msg_sz; diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index 9390aff..1065b99 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -30,6 +30,13 @@ struct swarmkv_rpc_mgr long long timed_out_rpcs; long long unknown_sequence; }; +void swarmkv_rpc_free(struct swarmkv_rpc *rpc) +{ + event_del(rpc->timeout_ev); + event_free(rpc->timeout_ev); + HASH_DELETE(hh, rpc->ref_mgr->rpc_table[rpc->thread_id], rpc); + free(rpc); +} struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads) { struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1); @@ -42,16 +49,21 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, } void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr) { + struct swarmkv_rpc *rpc=NULL, *tmp=NULL; + for(int i=0; i<mgr->nr_worker_threads; i++) + { + HASH_ITER(hh, mgr->rpc_table[i], rpc, tmp) + { + struct promise *p=future_to_promise(rpc->f); + promise_failed(p, FUTURE_ERROR_CANCEL, NULL); + swarmkv_rpc_free(rpc); + } + } + free(mgr->rpc_table); free(mgr); } -void swarmkv_rpc_free(struct swarmkv_rpc *rpc) -{ - event_del(rpc->timeout_ev); - event_free(rpc->timeout_ev); - HASH_DELETE(hh, rpc->ref_mgr->rpc_table[rpc->thread_id], rpc); - free(rpc); -} + static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) { struct swarmkv_rpc *rpc=(struct swarmkv_rpc *)arg; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 00ae03c..047db36 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -144,6 +144,7 @@ struct swarmkv_store_thread pthread_mutex_t sanity_lock; long long keys_to_sync; long long n_keys; + long long calls; }; struct swarmkv_store { @@ -539,13 +540,14 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc { const char *symbol[]={"crdt_get", "crdt_merge", "crdt_join"}; struct crdt_generic_ctx *ctx=NULL; + assert(peer); ctx=ALLOC(struct crdt_generic_ctx, 1); ctx->op=op; ctx->store=store; ctx->key=sdsdup(cmd->argv[2]); node_copy(&ctx->peer, peer); ctx->f=future_create(symbol[op], crdt_generic_on_succ, crdt_generic_on_fail, ctx); - store->exec_cmd(store->exec_cmd_handle, NULL, peer, cmd, ctx->f); + store->exec_cmd(store->exec_cmd_handle, &store->self, peer, cmd, ctx->f); return; } #define MAX_SYNC_PER_PERIOD 100000 @@ -558,9 +560,9 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) int real_tid=__gettid(store->exec_cmd_handle); assert(real_tid==thread_id); clock_gettime(CLOCK_MONOTONIC, &start); - //Ease the lock contention among work threads + struct swarmkv_store_thread *thr=&store->threads[real_tid]; - + thr->calls++; struct sync_master *sync_master=sync_master_new(); DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) { @@ -573,6 +575,11 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) ctr->is_in_sync_q=0; store->synced++; n_synced++; + for(int i=0; i<utarray_len(ctr->replica_node_list); i++) + { + printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr); + } + if(n_synced>=MAX_SYNC_PER_PERIOD) break; } node_t peer; @@ -692,11 +699,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st crdt_get_cmd->argv[1]=sdsnew("get"); crdt_get_cmd->argv[2]=sdsdup(key); size_t n_replica_node=cmd->argc-3; - node_t *replica_nodes=ALLOC(node_t, n_replica_node); - for(size_t i=0; i<n_replica_node; i++) - { - node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]); - } + ctr=store_lookup_scontainer(store, key); if(!ctr) { @@ -711,28 +714,27 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st } store_add_scontainer(store, ctr); } + node_t *replica_nodes=ALLOC(node_t, n_replica_node); for(size_t i=0; i<n_replica_node; i++) { + node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]); + assert(node_compare(replica_nodes+i, &store->self)!=0); scontainer_add_replica_node(ctr, replica_nodes+i); } - + struct swarmkv_cmd *crdt_join_cmd=NULL; for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); } - else - { - struct swarmkv_cmd *crdt_join_cmd=NULL; - crdt_join_cmd=swarmkv_cmd_new(4); - crdt_join_cmd->argv[0]=sdsnew("crdt"); - crdt_join_cmd->argv[1]=sdsnew("merge"); - crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key); - crdt_join_cmd->argv[3]=node_addr2sds(replica_nodes+i); - crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i); - swarmkv_cmd_free(crdt_join_cmd); - } + crdt_join_cmd=swarmkv_cmd_new(4); + crdt_join_cmd->argv[0]=sdsnew("crdt"); + crdt_join_cmd->argv[1]=sdsnew("join"); + crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key); + crdt_join_cmd->argv[3]=node_addr2sds(&store->self); + crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i); + swarmkv_cmd_free(crdt_join_cmd); } swarmkv_cmd_free(crdt_get_cmd); crdt_get_cmd=NULL; @@ -753,7 +755,12 @@ enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const st ctr=store_lookup_scontainer(store, key); if(ctr) { - scontainer_add_replica_node(ctr, accessing_node); +/* + if(node_compare(accessing_node, &store->self)) + { + scontainer_add_replica_node(ctr, accessing_node); + } +*/ if(ctr->obj.type!=OBJ_TYPE_UNDEFINED) { scontainer_serialize(ctr, &blob, &blob_sz); @@ -827,10 +834,12 @@ enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const s struct scontainer *ctr=NULL; const sds key=cmd->argv[2]; assert(accessing_node!=NULL);//should never invoked by local - + node_t replica_node; node_init_from_sds(&replica_node, cmd->argv[3]); struct swarmkv_store *store=module2store(mod_store); + assert(node_compare(&replica_node, &store->self)!=0); + printf("CRDT JOIN %s %s <- %s\n", key, store->self.addr, cmd->argv[3]); ctr=store_lookup_scontainer(store, key); int added=0; if(ctr) |
