summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-03 21:30:39 +0800
committerZheng Chao <[email protected]>2023-08-03 21:30:39 +0800
commit3d97b9505d8e164ba6b1217aea4f3483c3a26f2a (patch)
tree7686a1fed61c242088e82626efc85bb13dc2ccb5 /src
parent523621088abc7f3a701646a7360ff015f9e5c025 (diff)
WIP: multi-thread test
Diffstat (limited to 'src')
-rw-r--r--src/inc_internal/swarmkv_common.h2
-rw-r--r--src/swarmkv.c77
-rw-r--r--src/swarmkv_api.c6
-rw-r--r--src/swarmkv_keyspace.c33
-rw-r--r--src/swarmkv_mesh.c56
-rw-r--r--src/swarmkv_message.c4
-rw-r--r--src/swarmkv_net.c3
-rw-r--r--src/swarmkv_rpc.c26
-rw-r--r--src/swarmkv_store.c53
9 files changed, 151 insertions, 109 deletions
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index 063f0c1..48459f5 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -77,6 +77,7 @@ struct swarmkv_options
int nr_caller_threads;
int total_threads;
int is_assigned_to_db;
+ int sanity_check;
};
struct swarmkv_cmd
@@ -101,7 +102,6 @@ struct swarmkv_reply *swarmkv_reply_new_node(node_t *node, int is_ask);
struct swarmkv_reply *swarmkv_reply_new_nil(void);
struct swarmkv_reply *swarmkv_reply_new_status(const char *format, ...);
struct swarmkv_reply *swarmkv_reply_new_error(const char *format, ...);
-struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin);
void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply *src);
void swarmkv_reply_append_string(struct swarmkv_reply **dst, struct swarmkv_reply *src);
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 947f542..120e358 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -66,7 +66,8 @@ struct swarmkv
int thread_counter;
struct swarmkv_thread_ctx *threads;
-
+ pthread_barrier_t barrier;
+
struct event_base **ref_evbases;
node_t self;
@@ -100,6 +101,7 @@ void swarmkv_register_thread(struct swarmkv *db)
db->threads[thread_id].thread_id=thread_id;
return;
}
+
int __gettid(struct swarmkv *db)
{
int sys_tid=syscall(SYS_gettid);
@@ -113,7 +115,11 @@ int __gettid(struct swarmkv *db)
assert(0);
return -1;
}
-
+struct swarmkv_thread_ctx *swarmkv_get_thread_ctx(struct swarmkv *db)
+{
+ int tid=__gettid(db);
+ return &db->threads[tid];
+}
void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller);
struct local_caller_ctx
{
@@ -148,7 +154,7 @@ void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const nod
local_caller_on_success,
local_caller_on_fail,
ctx);
- __exec_cmd(db, NULL, target_node, cmd, ctx->my_future);
+ __exec_cmd(db, &db->self, target_node, cmd, ctx->my_future);
return;
}
struct remote_caller_ctx
@@ -531,10 +537,6 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char*
fclose(logfile);
sleep(1000);
}
-static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
-{
-
-}
void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg;
@@ -543,9 +545,10 @@ void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
}
void *swarmkv_worker_thread(void *arg)
{
- struct swarmkv_thread_ctx* ctx = (struct swarmkv_thread_ctx *) arg;
- struct swarmkv *db=ctx->db;
- swarmkv_register_thread(ctx->db);
+ struct swarmkv *db = (struct swarmkv *)arg;
+ swarmkv_register_thread(db);
+ int tid=__gettid(db);
+ struct swarmkv_thread_ctx *ctx = db->threads+tid;
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id);
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
@@ -555,6 +558,7 @@ void *swarmkv_worker_thread(void *arg)
evtimer_add(periodic_ev, &sync_interval);
ctx->is_dispatching=1;
+ pthread_barrier_wait(&db->barrier);
int ret=event_base_dispatch(ctx->evbase);
@@ -574,25 +578,17 @@ void *swarmkv_worker_thread(void *arg)
void swarmkv_threads_run(struct swarmkv *db)
{
+ pthread_barrier_init(&db->barrier, NULL, db->opts->nr_worker_threads+1);
int i = 0, ret=0;
for (i = 0; i < db->opts->nr_worker_threads; i++)
{
- ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->threads+i));
+ ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, db);
if(ret !=0 )//error
{
log_fatal(db->logger, MODULE_SWAMRKV_CORE, "pthread_create() error %d", ret);
}
}
- int running_threads=0;
- while(running_threads < db->opts->nr_worker_threads)
- {
- running_threads=0;
- for (i = 0; i < db->opts->nr_worker_threads; i++)
- {
- running_threads += db->threads[i].is_dispatching;
- }
- usleep(10);
- }
+ pthread_barrier_wait(&db->barrier);
return;
}
@@ -781,7 +777,7 @@ static void crdt_add_on_success(void *result, void *user)
assert(reply->type==SWARMKV_REPLY_STATUS);
if(ctx->future_of_caller)
{
- __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller);
+ __exec_cmd(ctx->db, &ctx->db->self, NULL, ctx->cmd, ctx->future_of_caller);
}
cmd_ctx_free(ctx);
}
@@ -813,7 +809,7 @@ static void key_route_on_success(void *result, void *user)
}
if(n_replica_node>0)
{
- __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
+ __exec_cmd(ctx->db, &ctx->db->self, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
}
if(self_is_a_replica)
{
@@ -821,7 +817,7 @@ static void key_route_on_success(void *result, void *user)
struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node);
crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
- __exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
+ __exec_cmd(ctx->db, &ctx->db->self, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
swarmkv_cmd_free(crdt_add_cmd);
}
free(replica_nodes);
@@ -846,7 +842,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *
}
return tid;
}
-void on_msg_callback(struct swarmkv_msg *msg, void *arg)
+void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
int cur_tid=__gettid(db);
@@ -854,6 +850,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
assert(cur_tid < db->opts->nr_worker_threads);
exec_for_remote(db, msg);
+ swarmkv_msg_free(msg);
}
else
{
@@ -864,6 +861,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg)
else
{
swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply);
+ swarmkv_msg_free(msg);
}
}
}
@@ -873,7 +871,12 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
struct swarmkv_reply *reply=NULL;
struct promise *p=NULL;
int cur_tid=__gettid(db);
- node_t peer;
+
+ int is_called_by_self=1;
+ if(accessing_node && 0!=node_compare(&db->self, accessing_node))
+ {
+ is_called_by_self=0;
+ }
spec=get_spec_by_argv(db, cmd->argc, cmd->argv);
if(!spec)
{
@@ -891,8 +894,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_reply_free(reply);
return;
}
- /*Initiating a non-auto-route command from local is NOT allowed.*/
- if(!spec->auto_route && !accessing_node && !target_node)
+ /*Initiating a non-auto-route command without target from local is NOT allowed.*/
+ if(!spec->auto_route && is_called_by_self && !target_node)
{
reply=swarmkv_reply_new_error(error_no_target_node, spec->name);
p=future_to_promise(future_of_caller);
@@ -913,7 +916,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
}
if(cur_tid != target_tid)
{
- //cmd is executed in target thread's on_msg_callback
+ //cmd will be executed in the target thread's __on_msg_callback
long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence);
swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg);
@@ -928,13 +931,14 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
- if(accessing_node && node_compare(&db->self, accessing_node))//Remote call, non-recursive exec
+ if(!is_called_by_self)//Remote call, non-recursive exec
{
struct promise *p=future_to_promise(future_of_caller);
promise_success(p, reply);
swarmkv_reply_free(reply);
return;
}
+
switch(exec_ret)
{
case FINISHED:
@@ -946,10 +950,12 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
}
case REDIRECT:
{
+ node_t peer;
struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller);
node_init_from_reply(&peer, reply);
+ swarmkv_reply_free(reply);
ctx->future_of_mine=future_create("peer_exec", peer_exec_on_success, generic_on_fail, ctx);
- __exec_cmd(db, NULL, &peer, cmd, ctx->future_of_mine);
+ __exec_cmd(db, &db->self, &peer, cmd, ctx->future_of_mine);
break;
}
case NEED_KEY_ROUTE:
@@ -957,9 +963,10 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self));
struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller);
ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx);
- __exec_cmd(db, NULL, NULL, keyspace_cmd, ctx->future_of_mine);
+ __exec_cmd(db, &db->self, NULL, keyspace_cmd, ctx->future_of_mine);
swarmkv_cmd_free(keyspace_cmd);
keyspace_cmd=NULL;
+ assert(reply==NULL);
break;
}
case ASYNC_WAIT:
@@ -1373,7 +1380,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
}
db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
- swarmkv_mesh_set_on_msg_cb(db->mesh, on_msg_callback, db);
+ swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db);
db->mod_monitor=swarmkv_monitor_new(db->opts);
//Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port.
@@ -1382,7 +1389,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
{
goto error_out;
}
- swarmkv_net_set_on_msg_callback(db->net, on_msg_callback, db);
+ swarmkv_net_set_on_msg_callback(db->net, __on_msg_callback, db);
swarmkv_net_set_monitor_handle(db->net, db->mod_monitor);
@@ -1487,7 +1494,7 @@ void swarmkv_close(struct swarmkv * db)
db->opts->is_assigned_to_db=0;
swarmkv_options_free(db->opts);
db->opts=NULL;
-
+ pthread_barrier_destroy(&db->barrier);
free(db);
return;
}
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index af8d6b7..3ae2ee3 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -34,6 +34,7 @@ struct swarmkv_options* swarmkv_options_new(void)
strcpy(opts->consul_agent_host, "127.0.0.1");
uuid_generate(opts->bin_uuid);
opts->nr_worker_threads=1;
+ opts->nr_caller_threads=1;
opts->is_assigned_to_db=0;
opts->run_for_leader_flag=1;
return opts;
@@ -373,6 +374,7 @@ struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target,
int argc=0;
sds *argv=NULL;
+
argv=sdssplitargs(cmd_str, &argc);
struct swarmkv_reply *reply=NULL;
reply=swarmkv_command_on_argv(db, target, argc, argv);
@@ -385,14 +387,14 @@ struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target,
return reply;
}
-struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...)
+struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)
{
char *cmd_str=NULL;
va_list ap;
va_start(ap, format);
vasprintf(&cmd_str, format, ap);
va_end(ap);
-
+
int argc=0;
sds *argv=NULL;
argv=sdssplitargs(cmd_str, &argc);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 7c00c18..e18b15c 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -163,7 +163,7 @@ static void crdt_del_on_fail(enum e_future_error err, const char * what, void *
return;
}
-void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle)
+void key_entry_deletion_notification(struct key_route_entry *key_entry, const node_t *self, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle)
{
struct replica_node *replica=NULL, *tmp=NULL;
struct swarmkv_cmd *crdt_del_cmd=swarmkv_cmd_new(3);
@@ -175,8 +175,8 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd
struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1);
node_copy(&ctx->peer, &replica->node);
ctx->key=sdsdup(key_entry->key);
- ctx->f=future_create("key_del", crdt_del_on_succ, crdt_del_on_fail, ctx);
- exec_cmd(exec_cmd_handle, NULL, &replica->node, crdt_del_cmd, ctx->f);
+ ctx->f=future_create("_del", crdt_del_on_succ, crdt_del_on_fail, ctx);
+ exec_cmd(exec_cmd_handle, self, &replica->node, crdt_del_cmd, ctx->f);
}
swarmkv_cmd_free(crdt_del_cmd);
return;
@@ -1689,7 +1689,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC
if(key_entry)
{
HASH_DELETE(hh, slot_rt->keyroute_table, key_entry);
- key_entry_deletion_notification(key_entry, ks->exec_cmd_func, ks->exec_cmd_handle);
+ key_entry_deletion_notification(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle);
key_entry_free(key_entry);
*reply=swarmkv_reply_new_integer(1);
}
@@ -1978,29 +1978,31 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i
int real_tid=__gettid(ks->exec_cmd_handle);
assert(real_tid==thread_id);
- struct timespec now;
int i=0, j=0;
struct slot_runtime *slot_rt=NULL;
struct timeout *t=NULL;
struct key_route_entry *key_entry=NULL;
- struct timespec start, end;
+ struct timespec start, end, now;
int has_key_expired=0;
clock_gettime(CLOCK_MONOTONIC_COARSE, &start);
+ clock_gettime(CLOCK_REALTIME, &now);
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
if(i%ks->opts->nr_worker_threads!=thread_id) continue;
slot_rt=ks->slot_rts+i;
if(!slot_rt->I_am_owner) continue;
- if(0==pthread_mutex_trylock(&slot_rt->sanity_lock))
+ if(ks->opts->sanity_check)
{
- pthread_mutex_unlock(&slot_rt->sanity_lock);
- }
- else
- {
- assert(0);
+ if(0==pthread_mutex_trylock(&slot_rt->sanity_lock))
+ {
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
+ }
+ else
+ {
+ assert(0);
+ }
}
-
- clock_gettime(CLOCK_REALTIME, &now);
+ //if(0==timeouts_count(slot_rt->expires)) continue;
timeouts_update(slot_rt->expires, now.tv_sec);
t=timeouts_get(slot_rt->expires);
while(t && j<ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP)
@@ -2008,13 +2010,12 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i
key_entry=container_of(t, struct key_route_entry, timeout_handle);
assert(key_entry->is_expiring);
HASH_DELETE(hh, slot_rt->keyroute_table, key_entry);
- key_entry_deletion_notification(key_entry, ks->exec_cmd_func, ks->exec_cmd_handle);
+ key_entry_deletion_notification(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle);
key_entry_free(key_entry);
t=timeouts_get(slot_rt->expires);
j++;
has_key_expired=1;
}
- pthread_mutex_unlock(&slot_rt->sanity_lock);
}
clock_gettime(CLOCK_MONOTONIC_COARSE, &end);
if(has_key_expired)
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index c98c629..3612e25 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -41,6 +41,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id;
struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id;
ringbuf_t *dest_ring=dest_thr->ring;
+ assert(msg->magic == SWARMKV_MSG_MAGIC);
if(curr_thr->workers[dest_thread_id]==NULL)
{
curr_thr->workers[dest_thread_id]=ringbuf_register(dest_ring, current_thread_id);
@@ -73,21 +74,7 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_
mesh->msg_recv_arg=cb_arg;
return;
}
-void swarmkv_mesh_free(struct swarmkv_mesh *mesh)
-{
- for(int i=0; i<mesh->nr_thread; i++)
- {
- close(mesh->threads[i].efd);
- ringbuf_release(mesh->threads[i].ring, RINGBUF_SIZE);
- free(mesh->threads[i].ring);
- free(mesh->threads[i].workers);
- free(mesh->threads[i].buff);
- event_del(mesh->threads[i].ev);
- event_free(mesh->threads[i].ev);
- }
- free(mesh->threads);
- free(mesh);
-}
+
static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_mesh_thread *thread=(struct swarmkv_mesh_thread*)arg;
@@ -99,22 +86,19 @@ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
{
assert(0);
}
- size_t offset=0, ret=0;
+ size_t offset=0, len=0;
struct swarmkv_msg *msg=NULL;
+ len=ringbuf_consume(ring, &offset);
+ assert(n_msg <= len/sizeof(struct swarmkv_msg*));
for(uint64_t i=0; i<n_msg; i++)
{
- ret=ringbuf_consume(ring, &offset);
- if(ret==0)
- {
- assert(0);
- break;
- }
- ringbuf_release(ring, sizeof(struct swarmkv_msg*));
msg=*(struct swarmkv_msg**)(thread->buff+offset);
+ assert(msg->magic==SWARMKV_MSG_MAGIC);
+ //ownership of msg is transferred to the callback function.
mesh->on_msg_recv(msg, mesh->msg_recv_arg);
- swarmkv_msg_free(msg);
+ offset+=sizeof(struct swarmkv_msg*);
}
-
+ ringbuf_release(ring, n_msg*sizeof(struct swarmkv_msg*));
return;
}
struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger)
@@ -142,5 +126,27 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
event_add(mesh->threads[i].ev, NULL);
mesh->threads[i].ref_mesh=mesh;
}
+ for(int i=0; i<mesh->nr_thread; i++)
+ {
+ for(int j=0; j<mesh->nr_thread; j++)
+ {
+ mesh->threads[i].workers[j]=ringbuf_register(mesh->threads[j].ring, i);
+ }
+ }
return mesh;
+}
+void swarmkv_mesh_free(struct swarmkv_mesh *mesh)
+{
+ for(int i=0; i<mesh->nr_thread; i++)
+ {
+ close(mesh->threads[i].efd);
+ ringbuf_release(mesh->threads[i].ring, RINGBUF_SIZE);
+ free(mesh->threads[i].ring);
+ free(mesh->threads[i].workers);
+ free(mesh->threads[i].buff);
+ event_del(mesh->threads[i].ev);
+ event_free(mesh->threads[i].ev);
+ }
+ free(mesh->threads);
+ free(mesh);
} \ No newline at end of file
diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c
index a1bad9d..9ce5517 100644
--- a/src/swarmkv_message.c
+++ b/src/swarmkv_message.c
@@ -210,10 +210,12 @@ void swarmkv_msg_free(struct swarmkv_msg *msg)
if(msg->type==MSG_TYPE_CMD)
{
swarmkv_cmd_free(msg->cmd);
+ msg->cmd=NULL;
}
else
{
swarmkv_reply_free(msg->reply);
+ msg->reply=NULL;
}
free(msg);
return;
@@ -225,10 +227,12 @@ void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *b
size_t payload_len=0;
if(msg->type==MSG_TYPE_CMD)
{
+ assert(msg->cmd->argc>0);
swarmkv_cmd_serialize(msg->cmd,&payload, &payload_len);
}
else
{
+ assert(msg->reply!=NULL);
swarmkv_reply_serialize(msg->reply, &payload, &payload_len);
}
*blob=ALLOC(char, payload_len+SWARMKV_MSG_HDR_SIZE);
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 74cf7db..f32b2b0 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -335,8 +335,9 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
break;
}
msg=swarmkv_msg_deserialize(recv_buff, msg_sz);
+ //ownership of msg is transferred to on_msg_cb
thr->ref_net->on_msg_cb(msg, thr->ref_net->on_msg_cb_arg);
- swarmkv_msg_free(msg);
+ //swarmkv_msg_free(msg);
msg=NULL;
evbuffer_drain(input, msg_sz);
thr->stat.input_bytes += msg_sz;
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index 9390aff..1065b99 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -30,6 +30,13 @@ struct swarmkv_rpc_mgr
long long timed_out_rpcs;
long long unknown_sequence;
};
+void swarmkv_rpc_free(struct swarmkv_rpc *rpc)
+{
+ event_del(rpc->timeout_ev);
+ event_free(rpc->timeout_ev);
+ HASH_DELETE(hh, rpc->ref_mgr->rpc_table[rpc->thread_id], rpc);
+ free(rpc);
+}
struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads)
{
struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1);
@@ -42,16 +49,21 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts,
}
void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr)
{
+ struct swarmkv_rpc *rpc=NULL, *tmp=NULL;
+ for(int i=0; i<mgr->nr_worker_threads; i++)
+ {
+ HASH_ITER(hh, mgr->rpc_table[i], rpc, tmp)
+ {
+ struct promise *p=future_to_promise(rpc->f);
+ promise_failed(p, FUTURE_ERROR_CANCEL, NULL);
+ swarmkv_rpc_free(rpc);
+ }
+ }
+
free(mgr->rpc_table);
free(mgr);
}
-void swarmkv_rpc_free(struct swarmkv_rpc *rpc)
-{
- event_del(rpc->timeout_ev);
- event_free(rpc->timeout_ev);
- HASH_DELETE(hh, rpc->ref_mgr->rpc_table[rpc->thread_id], rpc);
- free(rpc);
-}
+
static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
{
struct swarmkv_rpc *rpc=(struct swarmkv_rpc *)arg;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 00ae03c..047db36 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -144,6 +144,7 @@ struct swarmkv_store_thread
pthread_mutex_t sanity_lock;
long long keys_to_sync;
long long n_keys;
+ long long calls;
};
struct swarmkv_store
{
@@ -539,13 +540,14 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc
{
const char *symbol[]={"crdt_get", "crdt_merge", "crdt_join"};
struct crdt_generic_ctx *ctx=NULL;
+ assert(peer);
ctx=ALLOC(struct crdt_generic_ctx, 1);
ctx->op=op;
ctx->store=store;
ctx->key=sdsdup(cmd->argv[2]);
node_copy(&ctx->peer, peer);
ctx->f=future_create(symbol[op], crdt_generic_on_succ, crdt_generic_on_fail, ctx);
- store->exec_cmd(store->exec_cmd_handle, NULL, peer, cmd, ctx->f);
+ store->exec_cmd(store->exec_cmd_handle, &store->self, peer, cmd, ctx->f);
return;
}
#define MAX_SYNC_PER_PERIOD 100000
@@ -558,9 +560,9 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
int real_tid=__gettid(store->exec_cmd_handle);
assert(real_tid==thread_id);
clock_gettime(CLOCK_MONOTONIC, &start);
- //Ease the lock contention among work threads
+
struct swarmkv_store_thread *thr=&store->threads[real_tid];
-
+ thr->calls++;
struct sync_master *sync_master=sync_master_new();
DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
{
@@ -573,6 +575,11 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
ctr->is_in_sync_q=0;
store->synced++;
n_synced++;
+ for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
+ {
+ printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr);
+ }
+
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
node_t peer;
@@ -692,11 +699,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
crdt_get_cmd->argv[1]=sdsnew("get");
crdt_get_cmd->argv[2]=sdsdup(key);
size_t n_replica_node=cmd->argc-3;
- node_t *replica_nodes=ALLOC(node_t, n_replica_node);
- for(size_t i=0; i<n_replica_node; i++)
- {
- node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]);
- }
+
ctr=store_lookup_scontainer(store, key);
if(!ctr)
{
@@ -711,28 +714,27 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
}
store_add_scontainer(store, ctr);
}
+ node_t *replica_nodes=ALLOC(node_t, n_replica_node);
for(size_t i=0; i<n_replica_node; i++)
{
+ node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]);
+ assert(node_compare(replica_nodes+i, &store->self)!=0);
scontainer_add_replica_node(ctr, replica_nodes+i);
}
-
+ struct swarmkv_cmd *crdt_join_cmd=NULL;
for(size_t i=0; i<n_replica_node; i++)
{
if(i<max_pull_node_num)
{
crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i);
}
- else
- {
- struct swarmkv_cmd *crdt_join_cmd=NULL;
- crdt_join_cmd=swarmkv_cmd_new(4);
- crdt_join_cmd->argv[0]=sdsnew("crdt");
- crdt_join_cmd->argv[1]=sdsnew("merge");
- crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key);
- crdt_join_cmd->argv[3]=node_addr2sds(replica_nodes+i);
- crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i);
- swarmkv_cmd_free(crdt_join_cmd);
- }
+ crdt_join_cmd=swarmkv_cmd_new(4);
+ crdt_join_cmd->argv[0]=sdsnew("crdt");
+ crdt_join_cmd->argv[1]=sdsnew("join");
+ crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key);
+ crdt_join_cmd->argv[3]=node_addr2sds(&store->self);
+ crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i);
+ swarmkv_cmd_free(crdt_join_cmd);
}
swarmkv_cmd_free(crdt_get_cmd);
crdt_get_cmd=NULL;
@@ -753,7 +755,12 @@ enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const st
ctr=store_lookup_scontainer(store, key);
if(ctr)
{
- scontainer_add_replica_node(ctr, accessing_node);
+/*
+ if(node_compare(accessing_node, &store->self))
+ {
+ scontainer_add_replica_node(ctr, accessing_node);
+ }
+*/
if(ctr->obj.type!=OBJ_TYPE_UNDEFINED)
{
scontainer_serialize(ctr, &blob, &blob_sz);
@@ -827,10 +834,12 @@ enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const s
struct scontainer *ctr=NULL;
const sds key=cmd->argv[2];
assert(accessing_node!=NULL);//should never invoked by local
-
+
node_t replica_node;
node_init_from_sds(&replica_node, cmd->argv[3]);
struct swarmkv_store *store=module2store(mod_store);
+ assert(node_compare(&replica_node, &store->self)!=0);
+ printf("CRDT JOIN %s %s <- %s\n", key, store->self.addr, cmd->argv[3]);
ctr=store_lookup_scontainer(store, key);
int added=0;
if(ctr)