diff options
Diffstat (limited to 'src/swarmkv_store.c')
| -rw-r--r-- | src/swarmkv_store.c | 115 |
1 files changed, 68 insertions, 47 deletions
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 189a561..f1a3ac8 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -294,7 +294,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) { struct scontainer *ctr=NULL; int designated_tid=key2tid(key, store->opts->nr_worker_threads); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(designated_tid==real_tid); ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key); if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) @@ -541,16 +541,16 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) return; } -void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer) +void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) { struct crdt_generic_ctx *ctx=NULL; assert(peer); ctx=ALLOC(struct crdt_generic_ctx, 1); ctx->op=op; ctx->store=store; - ctx->key=sdsdup(cmd->argv[2]); + ctx->key=sdsnew(argv[2]); node_copy(&ctx->peer, peer); - swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv); + swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); return; } #define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" @@ -574,17 +574,21 @@ int store_batch_sync(struct swarmkv_store *store, int tid) } else { - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); } DL_DELETE(thr->sync_queue, ctr); @@ -593,16 +597,22 @@ int store_batch_sync(struct swarmkv_store *store, int tid) n_synced++; if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) + + struct sync_task *task=NULL; + task=sync_master_get_task(sync_master); + while(task) { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_MERGE, cmd, &peer); - swarmkv_cmd_free(cmd); - cmd=NULL; + size_t n_data=sync_task_key_count(task); + const char *argv[n_data*2+2]; + size_t argv_len[n_data*2+2]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + sync_task_read_key_blob(task, argv+2, argv_len+2, n_data*2); + crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data*2+2, argv, argv_len); + sync_task_free(task); + task=sync_master_get_task(sync_master); } sync_master_free(sync_master); return n_synced; @@ -613,7 +623,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) struct scontainer *ctr=NULL, *tmp=NULL; struct timespec start, end; int n_synced=0; - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(real_tid==thread_id); clock_gettime(CLOCK_MONOTONIC, &start); @@ -630,20 +640,22 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) char *blob=NULL; size_t blob_sz=0; scontainer_serialize(ctr, &blob, &blob_sz); - - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); - DL_DELETE(thr->sync_queue, ctr); ctr->is_in_sync_q=0; store->synced++; @@ -752,13 +764,11 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st sds key=cmd->argv[2]; int tid=store_gettid(mod_store, key); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(tid==real_tid); - struct swarmkv_cmd *crdt_get_cmd=NULL; - crdt_get_cmd=swarmkv_cmd_new(3); - crdt_get_cmd->argv[0]=sdsnew("crdt"); - crdt_get_cmd->argv[1]=sdsnew("get"); - crdt_get_cmd->argv[2]=sdsdup(key); + + + size_t n_replica_node=cmd->argc-3; ctr=store_lookup_scontainer(store, key); @@ -782,23 +792,34 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st assert(node_compare(replica_nodes+i, &store->self)!=0); scontainer_add_replica_node(ctr, replica_nodes+i); } - struct swarmkv_cmd *crdt_meet_cmd=NULL; + const char *crdt_get_argv[3]; + size_t crdt_get_argv_len[3]; + crdt_get_argv[0]="crdt"; + crdt_get_argv_len[0]=strlen(crdt_get_argv[0]); + crdt_get_argv[1]="get"; + crdt_get_argv_len[1]=strlen(crdt_get_argv[1]); + crdt_get_argv[2]=key; + crdt_get_argv_len[2]=sdslen(key); + + const char *crdt_meet_argv[4]; + size_t crdt_meet_argv_len[4]; for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { - crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); + crdt_generic_call(store, CRDT_GET, replica_nodes+i, 3, crdt_get_argv, crdt_get_argv_len); } - crdt_meet_cmd=swarmkv_cmd_new(4); - crdt_meet_cmd->argv[0]=sdsnew("crdt"); - crdt_meet_cmd->argv[1]=sdsnew("meet"); - crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key); - crdt_meet_cmd->argv[3]=node_addr2sds(&store->self); - crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i); - swarmkv_cmd_free(crdt_meet_cmd); + + crdt_meet_argv[0]="crdt"; + crdt_meet_argv_len[0]=strlen(crdt_meet_argv[0]); + crdt_meet_argv[1]="meet"; + crdt_meet_argv_len[1]=strlen(crdt_meet_argv[1]); + crdt_meet_argv[2]=key; + crdt_meet_argv_len[2]=sdslen(key); + crdt_meet_argv[3]=node_addr2cstr(&store->self); + crdt_meet_argv_len[3]=strlen(crdt_meet_argv[3]); + crdt_generic_call(store, CRDT_MEET, replica_nodes+i, 4, crdt_meet_argv, crdt_meet_argv_len); } - swarmkv_cmd_free(crdt_get_cmd); - crdt_get_cmd=NULL; free(replica_nodes); *reply=swarmkv_reply_new_status("OK"); return FINISHED; |
