summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-17 22:29:08 +0800
committerZheng Chao <[email protected]>2023-08-17 22:29:08 +0800
commite4dc5e3f31d97f7ece5348860abfd4468934ceab (patch)
tree80618e69dc2ddc80551d93211446b41e0d9cfaec /src
parentb721c09d86229f9049456c4f8754f94b22a9b652 (diff)
performance test passed.
Diffstat (limited to 'src')
-rw-r--r--src/swarmkv.c32
-rw-r--r--src/swarmkv_store.c74
2 files changed, 72 insertions, 34 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index fdcefac..033c554 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -134,7 +134,15 @@ static void local_caller_on_success(void *result, void *user)
static void local_caller_on_fail(enum e_future_error err, const char * what, void * user)
{
struct local_caller_ctx *ctx=(struct local_caller_ctx*)user;
- struct swarmkv_reply *reply=swarmkv_reply_new_error(what);
+ struct swarmkv_reply *reply=NULL;
+ if(err==FUTURE_ERROR_CANCEL)
+ {
+ reply=swarmkv_reply_new_error("cancelled");
+ }
+ else
+ {
+ reply=swarmkv_reply_new_error(what);
+ }
ctx->cb(reply, ctx->cb_arg);
future_destroy(ctx->my_future);
free(ctx);
@@ -1213,7 +1221,7 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name)
sdsfreesplitres(argv,argc);
return NULL;
}
-static void loop_timeout_cb(evutil_socket_t fd, short event, void *arg)
+static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg)
{
struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg;
event_base_loopbreak(ctx->evbase);
@@ -1227,7 +1235,7 @@ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv)
struct event *timeout_event = NULL;
if(tv)
{
- timeout_event = event_new(ctx->evbase, -1, 0, loop_timeout_cb, ctx);
+ timeout_event = event_new(ctx->evbase, -1, 0, evloop_timeout_cb, ctx);
evtimer_add(timeout_event, tv);
event_base_loop(ctx->evbase, flags);
event_del(timeout_event);
@@ -1247,13 +1255,6 @@ void swarmkv_caller_loop_break(struct swarmkv *db)
assert(tid >= db->opts->nr_worker_threads);
event_base_loopbreak(db->ref_evbases[tid]);
}
-struct event_base *swarmkv_get_event_base(struct swarmkv *db)
-{
- int tid=__gettid(db);
- //must initiate from caller threads, and caller thread ID is larger than worker thread ID
- assert(tid >= db->opts->nr_worker_threads);
- return db->threads[tid].evbase;
-}
struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err)
{
struct swarmkv *db = NULL;
@@ -1311,8 +1312,6 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
goto error_out;
}
swarmkv_net_set_on_msg_callback(db->net, __on_msg_callback, db);
-
-
swarmkv_net_set_monitor_handle(db->net, db->mod_monitor);
node_init(&db->self, opts->cluster_announce_ip, opts->cluster_announce_port);
@@ -1376,17 +1375,20 @@ void swarmkv_close(struct swarmkv * db)
pthread_join(db->threads[i].thr, NULL);
}
+ swarmkv_rpc_mgr_free(db->rpc_mgr);
+ db->rpc_mgr=NULL;
+ swarmkv_monitor_free(db->mod_monitor);
+ db->mod_monitor=NULL;
swarmkv_store_free(db->mod_store);
db->mod_store=NULL;
swarmkv_keyspace_free(db->mod_keyspace);
db->mod_keyspace=NULL;
- swarmkv_monitor_free(db->mod_monitor);
- db->mod_monitor=NULL;
swarmkv_mesh_free(db->mesh);
- swarmkv_rpc_mgr_free(db->rpc_mgr);
+ db->mesh=NULL;
swarmkv_net_free(db->net);
db->net=NULL;
+
struct swarmkv_cmd_spec *spec=NULL, *tmp_spec=NULL;
HASH_ITER(hh, db->command_table, spec, tmp_spec)
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 57f9815..26c688d 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -469,6 +469,7 @@ void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx)
static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer)
{
struct scontainer *ctr=NULL, *tmp=NULL;
+ if(tid>=store->opts->nr_worker_threads) return;//swarmkv_close() is called.
HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
{
scontainer_remove_replica_node(ctr, peer);
@@ -547,19 +548,12 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc
return;
}
#define MAX_SYNC_PER_PERIOD 100000
-void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
+void store_batch_sync(struct swarmkv_store *store, int tid)
{
- struct swarmkv_store *store=module2store(mod_store);
- struct scontainer *ctr=NULL, *tmp=NULL;
- struct timespec start, end;
int n_synced=0;
- int real_tid=__gettid(store->exec_cmd_handle);
- assert(real_tid==thread_id);
- clock_gettime(CLOCK_MONOTONIC, &start);
-
- struct swarmkv_store_thread *thr=&store->threads[real_tid];
- thr->calls++;
+ struct swarmkv_store_thread *thr=&store->threads[tid];
struct sync_master *sync_master=sync_master_new();
+ struct scontainer *ctr=NULL, *tmp=NULL;
DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
{
char *blob=NULL;
@@ -592,20 +586,62 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
n_synced++;
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
+ node_t peer;
+ struct swarmkv_cmd *cmd=NULL;
+ int ret=1;
+ while(1)
+ {
+ ret=sync_master_get_cmd(sync_master, &peer, &cmd);
+ if(!ret) break;
+ crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
+ swarmkv_cmd_free(cmd);
+ cmd=NULL;
+ }
+ sync_master_free(sync_master);
+}
+void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
+{
+ struct swarmkv_store *store=module2store(mod_store);
+ struct scontainer *ctr=NULL, *tmp=NULL;
+ struct timespec start, end;
+ int n_synced=0;
+ int real_tid=__gettid(store->exec_cmd_handle);
+ assert(real_tid==thread_id);
+ clock_gettime(CLOCK_MONOTONIC, &start);
+
+ struct swarmkv_store_thread *thr=&store->threads[real_tid];
+ thr->calls++;
if(store->opts->batch_sync_enabled)
{
- node_t peer;
- struct swarmkv_cmd *cmd=NULL;
- int ret=1;
- while(1)
+ store_batch_sync(store, real_tid);
+ }
+ else
+ {
+ DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
{
- ret=sync_master_get_cmd(sync_master, &peer, &cmd);
- if(!ret) break;
- crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
+ char *blob=NULL;
+ size_t blob_sz=0;
+ scontainer_serialize(ctr, &blob, &blob_sz);
+
+ struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
+ cmd->argv[0]=sdsnew("crdt");
+ cmd->argv[1]=sdsnew("merge");
+ cmd->argv[2]=sdsnew(ctr->obj.key);
+ cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
+ {
+ node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
+ crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ }
swarmkv_cmd_free(cmd);
- cmd=NULL;
+ free(blob);
+
+ DL_DELETE(thr->sync_queue, ctr);
+ ctr->is_in_sync_q=0;
+ store->synced++;
+ n_synced++;
+ if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
- sync_master_free(sync_master);
}
thr->n_keys=HASH_COUNT(thr->obj_table);
DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync);