summaryrefslogtreecommitdiff
path: root/src/swarmkv_store.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/swarmkv_store.c')
-rw-r--r--src/swarmkv_store.c115
1 files changed, 68 insertions, 47 deletions
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 189a561..f1a3ac8 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -294,7 +294,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
{
struct scontainer *ctr=NULL;
int designated_tid=key2tid(key, store->opts->nr_worker_threads);
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(designated_tid==real_tid);
ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key);
if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock))
@@ -541,16 +541,16 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
return;
}
-void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer)
+void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len)
{
struct crdt_generic_ctx *ctx=NULL;
assert(peer);
ctx=ALLOC(struct crdt_generic_ctx, 1);
ctx->op=op;
ctx->store=store;
- ctx->key=sdsdup(cmd->argv[2]);
+ ctx->key=sdsnew(argv[2]);
node_copy(&ctx->peer, peer);
- swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv);
+ swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len);
return;
}
#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle"
@@ -574,17 +574,21 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
}
else
{
- struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
- cmd->argv[0]=sdsnew("crdt");
- cmd->argv[1]=sdsnew("merge");
- cmd->argv[2]=sdsnew(ctr->obj.key);
- cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ const char *argv[4];
+ size_t argv_len[4];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=ctr->obj.key;
+ argv_len[2]=sdslen(ctr->obj.key);
+ argv[3]=blob;
+ argv_len[3]=blob_sz;
for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
{
node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
}
- swarmkv_cmd_free(cmd);
free(blob);
}
DL_DELETE(thr->sync_queue, ctr);
@@ -593,16 +597,22 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
n_synced++;
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
- node_t peer;
- struct swarmkv_cmd *cmd=NULL;
- int ret=1;
- while(1)
+
+ struct sync_task *task=NULL;
+ task=sync_master_get_task(sync_master);
+ while(task)
{
- ret=sync_master_get_cmd(sync_master, &peer, &cmd);
- if(!ret) break;
- crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
- swarmkv_cmd_free(cmd);
- cmd=NULL;
+ size_t n_data=sync_task_key_count(task);
+ const char *argv[n_data*2+2];
+ size_t argv_len[n_data*2+2];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ sync_task_read_key_blob(task, argv+2, argv_len+2, n_data*2);
+ crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data*2+2, argv, argv_len);
+ sync_task_free(task);
+ task=sync_master_get_task(sync_master);
}
sync_master_free(sync_master);
return n_synced;
@@ -613,7 +623,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
struct scontainer *ctr=NULL, *tmp=NULL;
struct timespec start, end;
int n_synced=0;
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(real_tid==thread_id);
clock_gettime(CLOCK_MONOTONIC, &start);
@@ -630,20 +640,22 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
char *blob=NULL;
size_t blob_sz=0;
scontainer_serialize(ctr, &blob, &blob_sz);
-
- struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
- cmd->argv[0]=sdsnew("crdt");
- cmd->argv[1]=sdsnew("merge");
- cmd->argv[2]=sdsnew(ctr->obj.key);
- cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ const char *argv[4];
+ size_t argv_len[4];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=ctr->obj.key;
+ argv_len[2]=sdslen(ctr->obj.key);
+ argv[3]=blob;
+ argv_len[3]=blob_sz;
for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
{
node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
}
- swarmkv_cmd_free(cmd);
free(blob);
-
DL_DELETE(thr->sync_queue, ctr);
ctr->is_in_sync_q=0;
store->synced++;
@@ -752,13 +764,11 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
sds key=cmd->argv[2];
int tid=store_gettid(mod_store, key);
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(tid==real_tid);
- struct swarmkv_cmd *crdt_get_cmd=NULL;
- crdt_get_cmd=swarmkv_cmd_new(3);
- crdt_get_cmd->argv[0]=sdsnew("crdt");
- crdt_get_cmd->argv[1]=sdsnew("get");
- crdt_get_cmd->argv[2]=sdsdup(key);
+
+
+
size_t n_replica_node=cmd->argc-3;
ctr=store_lookup_scontainer(store, key);
@@ -782,23 +792,34 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
assert(node_compare(replica_nodes+i, &store->self)!=0);
scontainer_add_replica_node(ctr, replica_nodes+i);
}
- struct swarmkv_cmd *crdt_meet_cmd=NULL;
+ const char *crdt_get_argv[3];
+ size_t crdt_get_argv_len[3];
+ crdt_get_argv[0]="crdt";
+ crdt_get_argv_len[0]=strlen(crdt_get_argv[0]);
+ crdt_get_argv[1]="get";
+ crdt_get_argv_len[1]=strlen(crdt_get_argv[1]);
+ crdt_get_argv[2]=key;
+ crdt_get_argv_len[2]=sdslen(key);
+
+ const char *crdt_meet_argv[4];
+ size_t crdt_meet_argv_len[4];
for(size_t i=0; i<n_replica_node; i++)
{
if(i<max_pull_node_num)
{
- crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i);
+ crdt_generic_call(store, CRDT_GET, replica_nodes+i, 3, crdt_get_argv, crdt_get_argv_len);
}
- crdt_meet_cmd=swarmkv_cmd_new(4);
- crdt_meet_cmd->argv[0]=sdsnew("crdt");
- crdt_meet_cmd->argv[1]=sdsnew("meet");
- crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key);
- crdt_meet_cmd->argv[3]=node_addr2sds(&store->self);
- crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i);
- swarmkv_cmd_free(crdt_meet_cmd);
+
+ crdt_meet_argv[0]="crdt";
+ crdt_meet_argv_len[0]=strlen(crdt_meet_argv[0]);
+ crdt_meet_argv[1]="meet";
+ crdt_meet_argv_len[1]=strlen(crdt_meet_argv[1]);
+ crdt_meet_argv[2]=key;
+ crdt_meet_argv_len[2]=sdslen(key);
+ crdt_meet_argv[3]=node_addr2cstr(&store->self);
+ crdt_meet_argv_len[3]=strlen(crdt_meet_argv[3]);
+ crdt_generic_call(store, CRDT_MEET, replica_nodes+i, 4, crdt_meet_argv, crdt_meet_argv_len);
}
- swarmkv_cmd_free(crdt_get_cmd);
- crdt_get_cmd=NULL;
free(replica_nodes);
*reply=swarmkv_reply_new_status("OK");
return FINISHED;