diff options
| author | Zheng Chao <[email protected]> | 2023-08-25 22:16:17 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-25 22:16:17 +0800 |
| commit | 2768d7affe693557ed75de54777862c4aee7c37c (patch) | |
| tree | 3a90a2d53da5f32a65935d20353879152efaef62 /src | |
| parent | 9ae4145fa7ee5a59c8920bcf40fa322f6617f577 (diff) | |
The `LATENCY` command works again. The `INFO` command now returns RPC information.
Diffstat (limited to 'src')
| -rw-r--r-- | src/inc_internal/swarmkv_message.h | 4 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_monitor.h | 3 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_rpc.h | 12 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 1 | ||||
| -rw-r--r-- | src/swarmkv.c | 59 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_message.c | 8 | ||||
| -rw-r--r-- | src/swarmkv_monitor.c | 158 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 17 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 14 |
10 files changed, 190 insertions, 88 deletions
diff --git a/src/inc_internal/swarmkv_message.h b/src/inc_internal/swarmkv_message.h index f1ad7e8..830a323 100644 --- a/src/inc_internal/swarmkv_message.h +++ b/src/inc_internal/swarmkv_message.h @@ -26,8 +26,8 @@ struct swarmkv_msg }; #define SWARMKV_MSG_HDR_SIZE offsetof(struct swarmkv_msg, payload) -struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executer, long long sequence); -struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executer, long long sequence); +struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executor, long long sequence); +struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executor, long long sequence); void swarmkv_msg_free(struct swarmkv_msg *msg); //on_msg_callback_t transfers ownership of msg to the callback function diff --git a/src/inc_internal/swarmkv_monitor.h b/src/inc_internal/swarmkv_monitor.h index ac7b387..67273ea 100644 --- a/src/inc_internal/swarmkv_monitor.h +++ b/src/inc_internal/swarmkv_monitor.h @@ -7,9 +7,10 @@ #include <string.h> struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts); -void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const char *command); +void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command); void swarmkv_monitor_free(struct swarmkv_module *mod_monitor); void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec); +void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event); void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id); void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec); diff --git a/src/inc_internal/swarmkv_rpc.h b/src/inc_internal/swarmkv_rpc.h index bd4a0a7..7ad95ad 100644 --- a/src/inc_internal/swarmkv_rpc.h +++ b/src/inc_internal/swarmkv_rpc.h @@ -1,6 +1,7 @@ #pragma once #include "swarmkv/swarmkv.h" #include "swarmkv_common.h" +#include "swarmkv_cmd_spec.h" #include "future_promise.h" struct swarmkv_rpc_mgr; @@ -8,5 +9,12 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr); //Return a sequence number, which can be used to complete the request long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f); -void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response); -size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id);
\ No newline at end of file +//Return latency in microseconds, or -1 if failed to complete the rpc. +long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response); +size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id); +struct swarmkv_rpc_mgr_info +{ + long long timed_out_rpcs; + long long unknown_sequence; +}; +void swarmkv_rpc_mgr_info(struct swarmkv_rpc_mgr *mgr, struct swarmkv_rpc_mgr_info *info);
\ No newline at end of file diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index 224e225..29962e1 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -52,7 +52,6 @@ void swarmkv_store_set_exec_cmd_handle(struct swarmkv_module *mod_store, struct struct store_info { long long keys; - long long shards; long long keys_to_sync; long long sync_ok; long long sync_err; diff --git a/src/swarmkv.c b/src/swarmkv.c index 05746d6..314f036 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -39,7 +39,7 @@ #include <ctype.h> #include <stdatomic.h> -const char* SWARMKV_VERSION="3.0.0"; +const char* SWARMKV_VERSION="4.0.0"; #define MODULE_SWAMRKV_CORE module_name_str("swarmkv.core") @@ -247,12 +247,36 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw clock_gettime(CLOCK_MONOTONIC, &now_monotonic); char uuid_str[37]; uuid_unparse(db->opts->bin_uuid, uuid_str); + sds pending_cmds=sdsnew("["); + size_t rpc_cnt=0; + for(int i=0; i<db->opts->total_threads; i++) + { + rpc_cnt=swarmkv_rpc_mgr_count(db->rpc_mgr, i); + pending_cmds=sdscatprintf(pending_cmds, "%zu", rpc_cnt); + if(i==db->opts->nr_worker_threads-1) + { + pending_cmds=sdscatprintf(pending_cmds, "], ["); + } + else if(i==db->opts->total_threads-1) + { + pending_cmds=sdscatprintf(pending_cmds, "]"); + } + else + { + pending_cmds=sdscatprintf(pending_cmds, ", "); + } + } + struct swarmkv_rpc_mgr_info rpc_info; + swarmkv_rpc_mgr_info(db->rpc_mgr, &rpc_info); snprintf(node_info_buff, sizeof(node_info_buff), "# Node\r\n" "swarmkv_version: %s\r\n" "address: %s\r\n" "uuid: %s\r\n" "worker_threads: %d\r\n" "caller_threads: %d\r\n" + "pending_cmds: %s\r\n" + "timed_out_cmds: %lld\r\n" + "unknown_sequence: %lld\r\n" "server_time_usec: %lld\r\n" "up_time_in_seconds: %ld\r\n" "up_time_in_days: %ld\r\n" @@ -262,14 +286,17 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw uuid_str, db->opts->nr_worker_threads, db->opts->nr_caller_threads, + pending_cmds, + rpc_info.timed_out_rpcs, + rpc_info.unknown_sequence, server_time_us, now_monotonic.tv_sec-db->boot_time.tv_sec, (now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24) ); + sdsfree(pending_cmds); struct store_info sto_info; swarmkv_store_info(db->mod_store, &sto_info); snprintf(store_info_buff, sizeof(store_info_buff), "# Store\r\n" - "shards: %lld\r\n" "keys: %lld\r\n" "to_sync: %lld\r\n" "synced: %lld\r\n" @@ -277,7 +304,6 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw "sync_err: %lld\r\n" "sync_interval_in_msec: %.3f\r\n" , - sto_info.shards, sto_info.keys, sto_info.keys_to_sync, sto_info.synced, @@ -488,6 +514,7 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char* void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) { struct swarmkv_thread *thr=(struct swarmkv_thread *)arg; + swarmkv_store_periodic(thr->db->mod_store, thr->thread_id); swarmkv_keyspace_periodic(thr->db->mod_keyspace, thr->thread_id); } @@ -791,6 +818,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * } return tid; } +#define MONITOR_INTER_THREAD_RPC "inter-thread-rpc" void __on_msg_callback(struct swarmkv_msg *msg, void *arg) { struct swarmkv *db = (struct swarmkv *)arg; @@ -802,7 +830,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) exec_for_remote(db, msg); swarmkv_msg_free(msg); } - else + else//MSG_TYPE_REPLY { if(msg->caller_tid!=cur_tid) { @@ -811,7 +839,19 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) } else { - swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply); + long long latency_us=-1; + latency_us=swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply); + if(latency_us>=0) + { + if(node_compare(&db->self, &msg->executor)) + { + swarmkv_monitor_record_peer(db->mod_monitor, &msg->executor, latency_us, cur_tid); + } + else + { + swarmkv_monitor_record_event(db->mod_monitor, MONITOR_INTER_THREAD_RPC, latency_us); + } + } swarmkv_msg_free(msg); } } @@ -1330,10 +1370,13 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, db->ref_evbases[i]=db->threads[i].evbase; } event_config_free(ev_cfg); + + db->mod_monitor=swarmkv_monitor_new(db->opts); + db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db); - db->mod_monitor=swarmkv_monitor_new(db->opts); + //Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port. db->net=swarmkv_net_new(db->ref_evbases, db->opts->nr_worker_threads, opts, db->logger, err); @@ -1365,9 +1408,9 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, struct swarmkv_cmd_spec *spec=NULL, *tmp_spec=NULL; HASH_ITER(hh, db->command_table, spec, tmp_spec) { - swarmkv_monitor_register_commands(db->mod_monitor, spec->name); + swarmkv_monitor_register_command(db->mod_monitor, spec->name); } - + swarmkv_monitor_register_event(db->mod_monitor, MONITOR_INTER_THREAD_RPC); swarmkv_threads_run(db); swarmkv_keyspace_start(db->mod_keyspace); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 6647f75..6876863 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -1069,10 +1069,12 @@ void swarmkv_keyspace_set_exec_cmd_handle(struct swarmkv_module *mod_keyspace, s ks->exec_cmd_handle=db; return; } +#define MONITOR_KEY_EXPIRE_EVENT_EXPIRE "keyspace-expire-cycle" void swarmkv_keyspace_set_monitor_handle(struct swarmkv_module *mod_keyspace, struct swarmkv_module *mod_monitor) { struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); ks->mod_monitor=mod_monitor; + swarmkv_monitor_register_event(mod_monitor, MONITOR_KEY_EXPIRE_EVENT_EXPIRE); return; } diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c index 663dd1c..8db7ac9 100644 --- a/src/swarmkv_message.c +++ b/src/swarmkv_message.c @@ -183,7 +183,7 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si *blob_sz=root_mpack_sz; return; } -struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executer, long long sequence) +struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executor, long long sequence) { struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1); msg->magic=SWARMKV_MSG_MAGIC; @@ -191,11 +191,11 @@ struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const msg->caller_tid=caller_tid; msg->sequence=sequence; node_copy(&msg->caller, caller); - if(executer) node_copy(&msg->executor, executer); + if(executor) node_copy(&msg->executor, executor); msg->cmd=swarmkv_cmd_dup(cmd); return msg; } -struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executer, long long sequence) +struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executor, long long sequence) { struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1); msg->magic=SWARMKV_MSG_MAGIC; @@ -203,7 +203,7 @@ struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, msg->caller_tid=caller_tid; msg->sequence=sequence; node_copy(&msg->caller, caller); - node_copy(&msg->executor, executer); + node_copy(&msg->executor, executor); msg->reply=swarmkv_reply_dup(reply); return msg; } diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c index f21786c..6b10e7f 100644 --- a/src/swarmkv_monitor.c +++ b/src/swarmkv_monitor.c @@ -51,7 +51,6 @@ struct recorder_metric long long min; double stddev, mean; long long p50, p80, p90, p95, p99; - double rate; }; size_t metric2string(const struct recorder_metric *metric, char *buff, size_t buff_sz) { @@ -64,8 +63,7 @@ size_t metric2string(const struct recorder_metric *metric, char *buff, size_t bu "min_usec=%lld " "mean_usec=%.2f " "stddev=%.2f " - "count=%lld " - "rate=%.2f" + "count=%lld" , metric->p50, metric->p80, @@ -76,44 +74,57 @@ size_t metric2string(const struct recorder_metric *metric, char *buff, size_t bu metric->min, metric->mean, metric->stddev, - metric->total_count, - metric->rate + metric->total_count ); } -void recorder_sample(struct recorder *recorder, struct recorder_metric *metric) +void hdr_to_metric(const struct hdr_histogram *hdr, const char *key, struct recorder_metric *metric) { memset(metric, 0, sizeof(struct recorder_metric)); + strncpy(metric->key, key, sizeof(metric->key)); + metric->total_count=hdr->total_count; + if(metric->total_count==0) return; + metric->p50=hdr_value_at_percentile(hdr, 0.5); + metric->p80=hdr_value_at_percentile(hdr, 0.8); + metric->p90=hdr_value_at_percentile(hdr, 0.9); + metric->p95=hdr_value_at_percentile(hdr, 0.95); + metric->p99=hdr_value_at_percentile(hdr, 0.99); + metric->total_count=hdr->total_count; + metric->max=hdr_max(hdr); + metric->mean=hdr_mean(hdr); + metric->min=hdr_min(hdr); + metric->stddev=hdr_stddev(hdr); + return; +} +void recorder_commit(struct recorder *recorder) +{ recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous); - long long recent_total_count=recorder->hdr_previous->total_count; long long __attribute__((__unused__))dropped=hdr_add(recorder->hdr_all_time, recorder->hdr_previous); assert(dropped==0); - strncpy(metric->key, recorder->key, sizeof(metric->key)); - metric->total_count=recorder->hdr_all_time->total_count; - if(metric->total_count==0) return; - metric->p50=hdr_value_at_percentile(recorder->hdr_all_time, 0.5); - metric->p80=hdr_value_at_percentile(recorder->hdr_all_time, 0.8); - metric->p90=hdr_value_at_percentile(recorder->hdr_all_time, 0.9); - metric->p95=hdr_value_at_percentile(recorder->hdr_all_time, 0.95); - metric->p99=hdr_value_at_percentile(recorder->hdr_all_time, 0.99); - metric->total_count=recorder->hdr_all_time->total_count; - metric->max=hdr_max(recorder->hdr_all_time); - metric->mean=hdr_mean(recorder->hdr_all_time); - metric->min=hdr_min(recorder->hdr_all_time); - metric->stddev=hdr_stddev(recorder->hdr_all_time); - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - double diff_time_ms=(now.tv_sec-recorder->last_sampled.tv_sec)*1000+(now.tv_nsec-recorder->last_sampled.tv_nsec)/1000/1000; - diff_time_ms=MAX(diff_time_ms, 1); - metric->rate=(double)(recent_total_count)*1000/diff_time_ms; - clock_gettime(CLOCK_MONOTONIC, &(recorder->last_sampled)); return; } +void recorder_sample(struct recorder *recorder, struct recorder_metric *metric) +{ + recorder_commit(recorder); + hdr_to_metric(recorder->hdr_all_time, recorder->key, metric); + return; +} +void recorder_sample_n(struct recorder **recorder, size_t n_recorder, long long max_latency_usec, struct recorder_metric *metric) +{ + struct hdr_histogram *hdr_merged; + hdr_init(1, max_latency_usec, 2, &hdr_merged); + for(size_t i=0; i<n_recorder; i++) + { + recorder_commit(recorder[i]); + hdr_add(hdr_merged, recorder[i]->hdr_all_time); + } + hdr_to_metric(hdr_merged, recorder[0]->key, metric); + hdr_close(hdr_merged); +} void recorder_reset(struct recorder *recorder) { recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous); hdr_reset(recorder->hdr_previous); hdr_reset(recorder->hdr_all_time); - clock_gettime(CLOCK_MONOTONIC, &(recorder->last_sampled)); return; } void recorder_record(struct recorder *recorder, long long latency_usec, long long corrected) @@ -149,6 +160,29 @@ void recorder_table_reset(struct recorder **table) } return; } +void recorder_table_merge(struct recorder **src_tables, size_t n_table, struct recorder **dst_table) +{ + struct recorder *dst_recorder=NULL; + struct recorder *src_recorder=NULL, *tmp=NULL; + for(size_t i=0; i<n_table; i++) + { + src_recorder=NULL; + tmp=NULL; + HASH_ITER(hh, src_tables[i], src_recorder, tmp) + { + + HASH_FIND(hh, *dst_table, src_recorder->key, strlen(src_recorder->key), dst_recorder); + if(!dst_recorder) + { + dst_recorder=recorder_new(src_recorder->key, src_recorder->hdr_all_time->highest_trackable_value); + HASH_ADD_KEYPTR(hh, *dst_table, dst_recorder->key, strlen(dst_recorder->key), dst_recorder); + } + recorder_commit(src_recorder); + hdr_add(dst_recorder->hdr_all_time, src_recorder->hdr_all_time); + } + } + return; +} size_t recorder_table_count(struct recorder **table) { return HASH_COUNT(*table); @@ -179,6 +213,15 @@ void recorder_table_sample_all(struct recorder **table, struct recorder_metric * } return; } +void recorder_table_free(struct recorder **table) +{ + struct recorder *recorder=NULL, *tmp=NULL; + HASH_ITER(hh, *table, recorder, tmp) + { + HASH_DEL(*table, recorder); + recorder_free(recorder); + } +} struct swarmkv_reply *recorder_metric_to_reply(const struct recorder_metric *metric) { char metric_buff[1024]; @@ -192,12 +235,12 @@ struct swarmkv_monitor { struct swarmkv_module module; struct recorder *commands; - pthread_mutex_t lock_event_recorder; struct recorder *events; struct recorder **peers; long long max_latency_usec; int significant_figures; int nr_worker_threads; + pthread_mutex_t mutex;//only one latency command can be executed at a time }; struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) @@ -213,12 +256,12 @@ struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) monitor->max_latency_usec=opts->cluster_timeout_us; monitor->nr_worker_threads=opts->nr_worker_threads; monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads); - pthread_mutex_init(&monitor->lock_event_recorder, NULL); strncpy(monitor->module.name, "monitor", sizeof(monitor->module.name)); monitor->module.mod_ctx=monitor; + pthread_mutex_init(&monitor->mutex, NULL); return &monitor->module; } -void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const char *command) +void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); struct recorder *recorder=NULL; @@ -226,27 +269,22 @@ void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const HASH_ADD_KEYPTR(hh, monitor->commands, recorder->key, strlen(recorder->key), recorder); return; } +void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event) +{ + struct swarmkv_monitor *monitor=module2monitor(mod_monitor); + struct recorder *recorder=NULL; + recorder=recorder_new(event, monitor->max_latency_usec); + HASH_ADD_KEYPTR(hh, monitor->events, recorder->key, strlen(recorder->key), recorder); + return; +} void swarmkv_monitor_free(struct swarmkv_module *mod_monitor) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - struct recorder *recorder=NULL, *tmp=NULL; - HASH_ITER(hh, monitor->commands, recorder, tmp) - { - HASH_DEL(monitor->commands, recorder); - recorder_free(recorder); - } - HASH_ITER(hh, monitor->events, recorder, tmp) - { - HASH_DEL(monitor->events, recorder); - recorder_free(recorder); - } + recorder_table_free(&monitor->commands); + recorder_table_free(&monitor->events); for(size_t i=0; i<monitor->nr_worker_threads; i++) { - HASH_ITER(hh, monitor->peers[i], recorder, tmp) - { - HASH_DEL(monitor->peers[i], recorder); - recorder_free(recorder); - } + recorder_table_free(monitor->peers+i); } free(monitor->peers); monitor->peers=NULL; @@ -261,19 +299,17 @@ void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const ch void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); + assert(thread_id<monitor->nr_worker_threads); recorder_table_record_latency(monitor->peers+thread_id, peer->addr, strlen(peer->addr), latency_usec, monitor->max_latency_usec, 1); return; } void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - - pthread_mutex_lock(&monitor->lock_event_recorder); - recorder_table_record_latency(&monitor->events, event_name, strlen(event_name), latency_usec, monitor->max_latency_usec, 1); - pthread_mutex_unlock(&monitor->lock_event_recorder); + recorder_table_record_latency(&monitor->events, event_name, strlen(event_name), latency_usec, monitor->max_latency_usec, 0); return; } -struct swarmkv_reply *lantency_generic(struct recorder **table, const char *key) +struct swarmkv_reply *latency_generic(struct recorder **table, const char *key) { struct swarmkv_reply *reply=NULL; struct recorder_metric metric; @@ -308,6 +344,7 @@ struct swarmkv_reply *lantency_generic(struct recorder **table, const char *key) enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); + pthread_mutex_lock(&monitor->mutex); if(cmd->argc==2 && !strcasecmp(cmd->argv[1], "help") ) { const char *help = { @@ -325,22 +362,19 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s } else if(!strcasecmp(cmd->argv[1], "command")) { - *reply=lantency_generic(&monitor->commands, cmd->argc>2?cmd->argv[2]:NULL); + *reply=latency_generic(&monitor->commands, cmd->argc>2?cmd->argv[2]:NULL); } else if(!strcasecmp(cmd->argv[1], "event")) { - pthread_mutex_lock(&monitor->lock_event_recorder); - *reply=lantency_generic(&monitor->events, cmd->argc>2?cmd->argv[2]:NULL); - pthread_mutex_unlock(&monitor->lock_event_recorder); + *reply=latency_generic(&monitor->events, cmd->argc>2?cmd->argv[2]:NULL); } else if(!strcasecmp(cmd->argv[1], "peer")) { - struct swarmkv_reply *tmp=NULL; - for(size_t i=0; i<monitor->nr_worker_threads; i++) - { - tmp=lantency_generic(monitor->peers+i, cmd->argc>2?cmd->argv[2]:NULL); - swarmkv_reply_merge_array(reply, tmp); - } + struct recorder *merged_table=NULL; + recorder_table_merge(monitor->peers, monitor->nr_worker_threads, &merged_table); + *reply=latency_generic(&merged_table, cmd->argc>2?cmd->argv[2]:NULL); + recorder_table_free(&merged_table); + } else if(!strcasecmp(cmd->argv[1], "reset")) { @@ -384,6 +418,6 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s { *reply=swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]); } - + pthread_mutex_unlock(&monitor->mutex); return FINISHED; }
\ No newline at end of file diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index c34ec9a..41a24c2 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -95,6 +95,7 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct struct swarmkv_rpc_thr *thr=mgr->threads+thread_id; struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1); rpc->sequence=__atomic_add_fetch(&mgr->seq_generator, 1, __ATOMIC_SEQ_CST); + clock_gettime(CLOCK_REALTIME, &rpc->start); //struct timeval timeout={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)}; rpc->timeout_ev=event_new(thr->evbase, -1, 0, rpc_timeout_callback, rpc); event_priority_set(rpc->timeout_ev, 1); @@ -105,21 +106,33 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc); return rpc->sequence; } -void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply) +long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply) { struct swarmkv_rpc *rpc=NULL; + long long latency=-1; HASH_FIND(hh, mgr->threads[thread_id].rpc_table, &sequence, sizeof(sequence), rpc); if(rpc==NULL) { mgr->unknown_sequence++; - return; + return -1; } + struct timespec end; + clock_gettime(CLOCK_REALTIME, &end); + latency=timespec_diff_usec(&rpc->start, &end); + struct promise *p=future_to_promise(rpc->f); promise_success(p, reply); swarmkv_rpc_free(rpc); + return latency; } size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id) { assert(thread_id>=0 && thread_id<mgr->nr_worker_threads); return HASH_COUNT(mgr->threads[thread_id].rpc_table); +} +void swarmkv_rpc_mgr_info(struct swarmkv_rpc_mgr *mgr, struct swarmkv_rpc_mgr_info *info) +{ + info->timed_out_rpcs=mgr->timed_out_rpcs; + info->unknown_sequence=mgr->unknown_sequence; + return; }
\ No newline at end of file diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 055135a..72551c1 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -484,7 +484,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) store_get_uuid(&(ctx->store->module), uuid); - long long error_before=ctx->store->sync_err; + __attribute__ ((unused)) long long error_before=ctx->store->sync_err; if(reply->type==SWARMKV_REPLY_ERROR) { if(strcasestr(reply->str, "timed out")) @@ -554,8 +554,9 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv); return; } +#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" #define MAX_SYNC_PER_PERIOD 100000 -void store_batch_sync(struct swarmkv_store *store, int tid) +int store_batch_sync(struct swarmkv_store *store, int tid) { int n_synced=0; struct swarmkv_store_thread *thr=&store->threads[tid]; @@ -605,6 +606,7 @@ void store_batch_sync(struct swarmkv_store *store, int tid) cmd=NULL; } sync_master_free(sync_master); + return n_synced; } void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) { @@ -620,7 +622,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) thr->calls++; if(store->opts->batch_sync_enabled) { - store_batch_sync(store, real_tid); + n_synced=store_batch_sync(store, real_tid); } else { @@ -656,7 +658,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) if(n_synced) { - swarmkv_monitor_record_event(store->mod_monitor, "crdt-sync-cycle", timespec_diff_usec(&start, &end)); + swarmkv_monitor_record_event(store->mod_monitor, MONITOR_SYNC_EVENT_NAME, timespec_diff_usec(&start, &end)); } } @@ -702,11 +704,11 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s { struct swarmkv_store *store=module2store(mod_store); store->mod_monitor=mod_monitor; + swarmkv_monitor_register_event(mod_monitor, MONITOR_SYNC_EVENT_NAME); } void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info) { struct swarmkv_store *store=module2store(mod_store); - info->shards=STORE_SHARD_NUM; info->keys=0; info->keys_to_sync=0; struct swarmkv_store_thread *thread=NULL; @@ -714,7 +716,7 @@ void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *inf { thread = store->threads+i; info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0); - info->keys_to_sync += __sync_add_and_fetch(&thread->n_keys, 0); + info->keys += __sync_add_and_fetch(&thread->n_keys, 0); } info->sync_ok=__sync_add_and_fetch(&store->sync_ok, 0); info->sync_err=__sync_add_and_fetch(&store->sync_err, 0); |
