summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Zheng Chao <[email protected]> 2023-08-25 22:16:17 +0800
committer Zheng Chao <[email protected]> 2023-08-25 22:16:17 +0800
commit 2768d7affe693557ed75de54777862c4aee7c37c (patch)
tree 3a90a2d53da5f32a65935d20353879152efaef62 /src
parent 9ae4145fa7ee5a59c8920bcf40fa322f6617f577 (diff)
The `LATENCY` command works again. The `INFO` command now returns RPC information.
Diffstat (limited to 'src')
-rw-r--r-- src/inc_internal/swarmkv_message.h 4
-rw-r--r-- src/inc_internal/swarmkv_monitor.h 3
-rw-r--r-- src/inc_internal/swarmkv_rpc.h 12
-rw-r--r-- src/inc_internal/swarmkv_store.h 1
-rw-r--r-- src/swarmkv.c 59
-rw-r--r-- src/swarmkv_keyspace.c 2
-rw-r--r-- src/swarmkv_message.c 8
-rw-r--r-- src/swarmkv_monitor.c 158
-rw-r--r-- src/swarmkv_rpc.c 17
-rw-r--r-- src/swarmkv_store.c 14
10 files changed, 190 insertions, 88 deletions
diff --git a/src/inc_internal/swarmkv_message.h b/src/inc_internal/swarmkv_message.h
index f1ad7e8..830a323 100644
--- a/src/inc_internal/swarmkv_message.h
+++ b/src/inc_internal/swarmkv_message.h
@@ -26,8 +26,8 @@ struct swarmkv_msg
};
#define SWARMKV_MSG_HDR_SIZE offsetof(struct swarmkv_msg, payload)
-struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executer, long long sequence);
-struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executer, long long sequence);
+struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executor, long long sequence);
+struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executor, long long sequence);
void swarmkv_msg_free(struct swarmkv_msg *msg);
//on_msg_callback_t transfers ownership of msg to the callback function
diff --git a/src/inc_internal/swarmkv_monitor.h b/src/inc_internal/swarmkv_monitor.h
index ac7b387..67273ea 100644
--- a/src/inc_internal/swarmkv_monitor.h
+++ b/src/inc_internal/swarmkv_monitor.h
@@ -7,9 +7,10 @@
#include <string.h>
struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts);
-void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const char *command);
+void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command);
void swarmkv_monitor_free(struct swarmkv_module *mod_monitor);
void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec);
+void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event);
void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id);
void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec);
diff --git a/src/inc_internal/swarmkv_rpc.h b/src/inc_internal/swarmkv_rpc.h
index bd4a0a7..7ad95ad 100644
--- a/src/inc_internal/swarmkv_rpc.h
+++ b/src/inc_internal/swarmkv_rpc.h
@@ -1,6 +1,7 @@
#pragma once
#include "swarmkv/swarmkv.h"
#include "swarmkv_common.h"
+#include "swarmkv_cmd_spec.h"
#include "future_promise.h"
struct swarmkv_rpc_mgr;
@@ -8,5 +9,12 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts,
void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr);
//Return a sequence number, which can be used to complete the request
long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f);
-void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response);
-size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id); \ No newline at end of file
+//Return latency in microseconds, or -1 if failed to complete the rpc.
+long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response);
+size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id);
+struct swarmkv_rpc_mgr_info
+{
+ long long timed_out_rpcs;
+ long long unknown_sequence;
+};
+void swarmkv_rpc_mgr_info(struct swarmkv_rpc_mgr *mgr, struct swarmkv_rpc_mgr_info *info); \ No newline at end of file
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index 224e225..29962e1 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -52,7 +52,6 @@ void swarmkv_store_set_exec_cmd_handle(struct swarmkv_module *mod_store, struct
struct store_info
{
long long keys;
- long long shards;
long long keys_to_sync;
long long sync_ok;
long long sync_err;
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 05746d6..314f036 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -39,7 +39,7 @@
#include <ctype.h>
#include <stdatomic.h>
-const char* SWARMKV_VERSION="3.0.0";
+const char* SWARMKV_VERSION="4.0.0";
#define MODULE_SWAMRKV_CORE module_name_str("swarmkv.core")
@@ -247,12 +247,36 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
clock_gettime(CLOCK_MONOTONIC, &now_monotonic);
char uuid_str[37];
uuid_unparse(db->opts->bin_uuid, uuid_str);
+ sds pending_cmds=sdsnew("[");
+ size_t rpc_cnt=0;
+ for(int i=0; i<db->opts->total_threads; i++)
+ {
+ rpc_cnt=swarmkv_rpc_mgr_count(db->rpc_mgr, i);
+ pending_cmds=sdscatprintf(pending_cmds, "%zu", rpc_cnt);
+ if(i==db->opts->nr_worker_threads-1)
+ {
+ pending_cmds=sdscatprintf(pending_cmds, "], [");
+ }
+ else if(i==db->opts->total_threads-1)
+ {
+ pending_cmds=sdscatprintf(pending_cmds, "]");
+ }
+ else
+ {
+ pending_cmds=sdscatprintf(pending_cmds, ", ");
+ }
+ }
+ struct swarmkv_rpc_mgr_info rpc_info;
+ swarmkv_rpc_mgr_info(db->rpc_mgr, &rpc_info);
snprintf(node_info_buff, sizeof(node_info_buff), "# Node\r\n"
"swarmkv_version: %s\r\n"
"address: %s\r\n"
"uuid: %s\r\n"
"worker_threads: %d\r\n"
"caller_threads: %d\r\n"
+ "pending_cmds: %s\r\n"
+ "timed_out_cmds: %lld\r\n"
+ "unknown_sequence: %lld\r\n"
"server_time_usec: %lld\r\n"
"up_time_in_seconds: %ld\r\n"
"up_time_in_days: %ld\r\n"
@@ -262,14 +286,17 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
uuid_str,
db->opts->nr_worker_threads,
db->opts->nr_caller_threads,
+ pending_cmds,
+ rpc_info.timed_out_rpcs,
+ rpc_info.unknown_sequence,
server_time_us,
now_monotonic.tv_sec-db->boot_time.tv_sec,
(now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24)
);
+ sdsfree(pending_cmds);
struct store_info sto_info;
swarmkv_store_info(db->mod_store, &sto_info);
snprintf(store_info_buff, sizeof(store_info_buff), "# Store\r\n"
- "shards: %lld\r\n"
"keys: %lld\r\n"
"to_sync: %lld\r\n"
"synced: %lld\r\n"
@@ -277,7 +304,6 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
"sync_err: %lld\r\n"
"sync_interval_in_msec: %.3f\r\n"
,
- sto_info.shards,
sto_info.keys,
sto_info.keys_to_sync,
sto_info.synced,
@@ -488,6 +514,7 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char*
void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_thread *thr=(struct swarmkv_thread *)arg;
+
swarmkv_store_periodic(thr->db->mod_store, thr->thread_id);
swarmkv_keyspace_periodic(thr->db->mod_keyspace, thr->thread_id);
}
@@ -791,6 +818,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *
}
return tid;
}
+#define MONITOR_INTER_THREAD_RPC "inter-thread-rpc"
void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
@@ -802,7 +830,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
exec_for_remote(db, msg);
swarmkv_msg_free(msg);
}
- else
+ else//MSG_TYPE_REPLY
{
if(msg->caller_tid!=cur_tid)
{
@@ -811,7 +839,19 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
else
{
- swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply);
+ long long latency_us=-1;
+ latency_us=swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply);
+ if(latency_us>=0)
+ {
+ if(node_compare(&db->self, &msg->executor))
+ {
+ swarmkv_monitor_record_peer(db->mod_monitor, &msg->executor, latency_us, cur_tid);
+ }
+ else
+ {
+ swarmkv_monitor_record_event(db->mod_monitor, MONITOR_INTER_THREAD_RPC, latency_us);
+ }
+ }
swarmkv_msg_free(msg);
}
}
@@ -1330,10 +1370,13 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
db->ref_evbases[i]=db->threads[i].evbase;
}
event_config_free(ev_cfg);
+
+ db->mod_monitor=swarmkv_monitor_new(db->opts);
+
db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db);
- db->mod_monitor=swarmkv_monitor_new(db->opts);
+
//Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port.
db->net=swarmkv_net_new(db->ref_evbases, db->opts->nr_worker_threads, opts, db->logger, err);
@@ -1365,9 +1408,9 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
struct swarmkv_cmd_spec *spec=NULL, *tmp_spec=NULL;
HASH_ITER(hh, db->command_table, spec, tmp_spec)
{
- swarmkv_monitor_register_commands(db->mod_monitor, spec->name);
+ swarmkv_monitor_register_command(db->mod_monitor, spec->name);
}
-
+ swarmkv_monitor_register_event(db->mod_monitor, MONITOR_INTER_THREAD_RPC);
swarmkv_threads_run(db);
swarmkv_keyspace_start(db->mod_keyspace);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 6647f75..6876863 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -1069,10 +1069,12 @@ void swarmkv_keyspace_set_exec_cmd_handle(struct swarmkv_module *mod_keyspace, s
ks->exec_cmd_handle=db;
return;
}
+#define MONITOR_KEY_EXPIRE_EVENT_EXPIRE "keyspace-expire-cycle"
void swarmkv_keyspace_set_monitor_handle(struct swarmkv_module *mod_keyspace, struct swarmkv_module *mod_monitor)
{
struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
ks->mod_monitor=mod_monitor;
+ swarmkv_monitor_register_event(mod_monitor, MONITOR_KEY_EXPIRE_EVENT_EXPIRE);
return;
}
diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c
index 663dd1c..8db7ac9 100644
--- a/src/swarmkv_message.c
+++ b/src/swarmkv_message.c
@@ -183,7 +183,7 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si
*blob_sz=root_mpack_sz;
return;
}
-struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executer, long long sequence)
+struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, const node_t *executor, long long sequence)
{
struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1);
msg->magic=SWARMKV_MSG_MAGIC;
@@ -191,11 +191,11 @@ struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const
msg->caller_tid=caller_tid;
msg->sequence=sequence;
node_copy(&msg->caller, caller);
- if(executer) node_copy(&msg->executor, executer);
+ if(executor) node_copy(&msg->executor, executor);
msg->cmd=swarmkv_cmd_dup(cmd);
return msg;
}
-struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executer, long long sequence)
+struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, const node_t *executor, long long sequence)
{
struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1);
msg->magic=SWARMKV_MSG_MAGIC;
@@ -203,7 +203,7 @@ struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply,
msg->caller_tid=caller_tid;
msg->sequence=sequence;
node_copy(&msg->caller, caller);
- node_copy(&msg->executor, executer);
+ node_copy(&msg->executor, executor);
msg->reply=swarmkv_reply_dup(reply);
return msg;
}
diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c
index f21786c..6b10e7f 100644
--- a/src/swarmkv_monitor.c
+++ b/src/swarmkv_monitor.c
@@ -51,7 +51,6 @@ struct recorder_metric
long long min;
double stddev, mean;
long long p50, p80, p90, p95, p99;
- double rate;
};
size_t metric2string(const struct recorder_metric *metric, char *buff, size_t buff_sz)
{
@@ -64,8 +63,7 @@ size_t metric2string(const struct recorder_metric *metric, char *buff, size_t bu
"min_usec=%lld "
"mean_usec=%.2f "
"stddev=%.2f "
- "count=%lld "
- "rate=%.2f"
+ "count=%lld"
,
metric->p50,
metric->p80,
@@ -76,44 +74,57 @@ size_t metric2string(const struct recorder_metric *metric, char *buff, size_t bu
metric->min,
metric->mean,
metric->stddev,
- metric->total_count,
- metric->rate
+ metric->total_count
);
}
-void recorder_sample(struct recorder *recorder, struct recorder_metric *metric)
+void hdr_to_metric(const struct hdr_histogram *hdr, const char *key, struct recorder_metric *metric)
{
memset(metric, 0, sizeof(struct recorder_metric));
+ strncpy(metric->key, key, sizeof(metric->key));
+ metric->total_count=hdr->total_count;
+ if(metric->total_count==0) return;
+ metric->p50=hdr_value_at_percentile(hdr, 0.5);
+ metric->p80=hdr_value_at_percentile(hdr, 0.8);
+ metric->p90=hdr_value_at_percentile(hdr, 0.9);
+ metric->p95=hdr_value_at_percentile(hdr, 0.95);
+ metric->p99=hdr_value_at_percentile(hdr, 0.99);
+ metric->total_count=hdr->total_count;
+ metric->max=hdr_max(hdr);
+ metric->mean=hdr_mean(hdr);
+ metric->min=hdr_min(hdr);
+ metric->stddev=hdr_stddev(hdr);
+ return;
+}
+void recorder_commit(struct recorder *recorder)
+{
recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous);
- long long recent_total_count=recorder->hdr_previous->total_count;
long long __attribute__((__unused__))dropped=hdr_add(recorder->hdr_all_time, recorder->hdr_previous);
assert(dropped==0);
- strncpy(metric->key, recorder->key, sizeof(metric->key));
- metric->total_count=recorder->hdr_all_time->total_count;
- if(metric->total_count==0) return;
- metric->p50=hdr_value_at_percentile(recorder->hdr_all_time, 0.5);
- metric->p80=hdr_value_at_percentile(recorder->hdr_all_time, 0.8);
- metric->p90=hdr_value_at_percentile(recorder->hdr_all_time, 0.9);
- metric->p95=hdr_value_at_percentile(recorder->hdr_all_time, 0.95);
- metric->p99=hdr_value_at_percentile(recorder->hdr_all_time, 0.99);
- metric->total_count=recorder->hdr_all_time->total_count;
- metric->max=hdr_max(recorder->hdr_all_time);
- metric->mean=hdr_mean(recorder->hdr_all_time);
- metric->min=hdr_min(recorder->hdr_all_time);
- metric->stddev=hdr_stddev(recorder->hdr_all_time);
- struct timespec now;
- clock_gettime(CLOCK_MONOTONIC, &now);
- double diff_time_ms=(now.tv_sec-recorder->last_sampled.tv_sec)*1000+(now.tv_nsec-recorder->last_sampled.tv_nsec)/1000/1000;
- diff_time_ms=MAX(diff_time_ms, 1);
- metric->rate=(double)(recent_total_count)*1000/diff_time_ms;
- clock_gettime(CLOCK_MONOTONIC, &(recorder->last_sampled));
return;
}
+void recorder_sample(struct recorder *recorder, struct recorder_metric *metric)
+{
+ recorder_commit(recorder);
+ hdr_to_metric(recorder->hdr_all_time, recorder->key, metric);
+ return;
+}
+void recorder_sample_n(struct recorder **recorder, size_t n_recorder, long long max_latency_usec, struct recorder_metric *metric)
+{
+ struct hdr_histogram *hdr_merged;
+ hdr_init(1, max_latency_usec, 2, &hdr_merged);
+ for(size_t i=0; i<n_recorder; i++)
+ {
+ recorder_commit(recorder[i]);
+ hdr_add(hdr_merged, recorder[i]->hdr_all_time);
+ }
+ hdr_to_metric(hdr_merged, recorder[0]->key, metric);
+ hdr_close(hdr_merged);
+}
void recorder_reset(struct recorder *recorder)
{
recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous);
hdr_reset(recorder->hdr_previous);
hdr_reset(recorder->hdr_all_time);
- clock_gettime(CLOCK_MONOTONIC, &(recorder->last_sampled));
return;
}
void recorder_record(struct recorder *recorder, long long latency_usec, long long corrected)
@@ -149,6 +160,29 @@ void recorder_table_reset(struct recorder **table)
}
return;
}
+void recorder_table_merge(struct recorder **src_tables, size_t n_table, struct recorder **dst_table)
+{
+ struct recorder *dst_recorder=NULL;
+ struct recorder *src_recorder=NULL, *tmp=NULL;
+ for(size_t i=0; i<n_table; i++)
+ {
+ src_recorder=NULL;
+ tmp=NULL;
+ HASH_ITER(hh, src_tables[i], src_recorder, tmp)
+ {
+
+ HASH_FIND(hh, *dst_table, src_recorder->key, strlen(src_recorder->key), dst_recorder);
+ if(!dst_recorder)
+ {
+ dst_recorder=recorder_new(src_recorder->key, src_recorder->hdr_all_time->highest_trackable_value);
+ HASH_ADD_KEYPTR(hh, *dst_table, dst_recorder->key, strlen(dst_recorder->key), dst_recorder);
+ }
+ recorder_commit(src_recorder);
+ hdr_add(dst_recorder->hdr_all_time, src_recorder->hdr_all_time);
+ }
+ }
+ return;
+}
size_t recorder_table_count(struct recorder **table)
{
return HASH_COUNT(*table);
@@ -179,6 +213,15 @@ void recorder_table_sample_all(struct recorder **table, struct recorder_metric *
}
return;
}
+void recorder_table_free(struct recorder **table)
+{
+ struct recorder *recorder=NULL, *tmp=NULL;
+ HASH_ITER(hh, *table, recorder, tmp)
+ {
+ HASH_DEL(*table, recorder);
+ recorder_free(recorder);
+ }
+}
struct swarmkv_reply *recorder_metric_to_reply(const struct recorder_metric *metric)
{
char metric_buff[1024];
@@ -192,12 +235,12 @@ struct swarmkv_monitor
{
struct swarmkv_module module;
struct recorder *commands;
- pthread_mutex_t lock_event_recorder;
struct recorder *events;
struct recorder **peers;
long long max_latency_usec;
int significant_figures;
int nr_worker_threads;
+ pthread_mutex_t mutex;//only one latency command can be executed at a time
};
struct swarmkv_monitor *module2monitor(struct swarmkv_module *module)
@@ -213,12 +256,12 @@ struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts)
monitor->max_latency_usec=opts->cluster_timeout_us;
monitor->nr_worker_threads=opts->nr_worker_threads;
monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads);
- pthread_mutex_init(&monitor->lock_event_recorder, NULL);
strncpy(monitor->module.name, "monitor", sizeof(monitor->module.name));
monitor->module.mod_ctx=monitor;
+ pthread_mutex_init(&monitor->mutex, NULL);
return &monitor->module;
}
-void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const char *command)
+void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
struct recorder *recorder=NULL;
@@ -226,27 +269,22 @@ void swarmkv_monitor_register_commands(struct swarmkv_module *mod_monitor, const
HASH_ADD_KEYPTR(hh, monitor->commands, recorder->key, strlen(recorder->key), recorder);
return;
}
+void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event)
+{
+ struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
+ struct recorder *recorder=NULL;
+ recorder=recorder_new(event, monitor->max_latency_usec);
+ HASH_ADD_KEYPTR(hh, monitor->events, recorder->key, strlen(recorder->key), recorder);
+ return;
+}
void swarmkv_monitor_free(struct swarmkv_module *mod_monitor)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
- struct recorder *recorder=NULL, *tmp=NULL;
- HASH_ITER(hh, monitor->commands, recorder, tmp)
- {
- HASH_DEL(monitor->commands, recorder);
- recorder_free(recorder);
- }
- HASH_ITER(hh, monitor->events, recorder, tmp)
- {
- HASH_DEL(monitor->events, recorder);
- recorder_free(recorder);
- }
+ recorder_table_free(&monitor->commands);
+ recorder_table_free(&monitor->events);
for(size_t i=0; i<monitor->nr_worker_threads; i++)
{
- HASH_ITER(hh, monitor->peers[i], recorder, tmp)
- {
- HASH_DEL(monitor->peers[i], recorder);
- recorder_free(recorder);
- }
+ recorder_table_free(monitor->peers+i);
}
free(monitor->peers);
monitor->peers=NULL;
@@ -261,19 +299,17 @@ void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const ch
void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
+ assert(thread_id<monitor->nr_worker_threads);
recorder_table_record_latency(monitor->peers+thread_id, peer->addr, strlen(peer->addr), latency_usec, monitor->max_latency_usec, 1);
return;
}
void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
-
- pthread_mutex_lock(&monitor->lock_event_recorder);
- recorder_table_record_latency(&monitor->events, event_name, strlen(event_name), latency_usec, monitor->max_latency_usec, 1);
- pthread_mutex_unlock(&monitor->lock_event_recorder);
+ recorder_table_record_latency(&monitor->events, event_name, strlen(event_name), latency_usec, monitor->max_latency_usec, 0);
return;
}
-struct swarmkv_reply *lantency_generic(struct recorder **table, const char *key)
+struct swarmkv_reply *latency_generic(struct recorder **table, const char *key)
{
struct swarmkv_reply *reply=NULL;
struct recorder_metric metric;
@@ -308,6 +344,7 @@ struct swarmkv_reply *lantency_generic(struct recorder **table, const char *key)
enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
+ pthread_mutex_lock(&monitor->mutex);
if(cmd->argc==2 && !strcasecmp(cmd->argv[1], "help") )
{
const char *help = {
@@ -325,22 +362,19 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s
}
else if(!strcasecmp(cmd->argv[1], "command"))
{
- *reply=lantency_generic(&monitor->commands, cmd->argc>2?cmd->argv[2]:NULL);
+ *reply=latency_generic(&monitor->commands, cmd->argc>2?cmd->argv[2]:NULL);
}
else if(!strcasecmp(cmd->argv[1], "event"))
{
- pthread_mutex_lock(&monitor->lock_event_recorder);
- *reply=lantency_generic(&monitor->events, cmd->argc>2?cmd->argv[2]:NULL);
- pthread_mutex_unlock(&monitor->lock_event_recorder);
+ *reply=latency_generic(&monitor->events, cmd->argc>2?cmd->argv[2]:NULL);
}
else if(!strcasecmp(cmd->argv[1], "peer"))
{
- struct swarmkv_reply *tmp=NULL;
- for(size_t i=0; i<monitor->nr_worker_threads; i++)
- {
- tmp=lantency_generic(monitor->peers+i, cmd->argc>2?cmd->argv[2]:NULL);
- swarmkv_reply_merge_array(reply, tmp);
- }
+ struct recorder *merged_table=NULL;
+ recorder_table_merge(monitor->peers, monitor->nr_worker_threads, &merged_table);
+ *reply=latency_generic(&merged_table, cmd->argc>2?cmd->argv[2]:NULL);
+ recorder_table_free(&merged_table);
+
}
else if(!strcasecmp(cmd->argv[1], "reset"))
{
@@ -384,6 +418,6 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s
{
*reply=swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]);
}
-
+ pthread_mutex_unlock(&monitor->mutex);
return FINISHED;
} \ No newline at end of file
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index c34ec9a..41a24c2 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -95,6 +95,7 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct
struct swarmkv_rpc_thr *thr=mgr->threads+thread_id;
struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1);
rpc->sequence=__atomic_add_fetch(&mgr->seq_generator, 1, __ATOMIC_SEQ_CST);
+ clock_gettime(CLOCK_REALTIME, &rpc->start);
//struct timeval timeout={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)};
rpc->timeout_ev=event_new(thr->evbase, -1, 0, rpc_timeout_callback, rpc);
event_priority_set(rpc->timeout_ev, 1);
@@ -105,21 +106,33 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct
HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc);
return rpc->sequence;
}
-void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply)
+long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply)
{
struct swarmkv_rpc *rpc=NULL;
+ long long latency=-1;
HASH_FIND(hh, mgr->threads[thread_id].rpc_table, &sequence, sizeof(sequence), rpc);
if(rpc==NULL)
{
mgr->unknown_sequence++;
- return;
+ return -1;
}
+ struct timespec end;
+ clock_gettime(CLOCK_REALTIME, &end);
+ latency=timespec_diff_usec(&rpc->start, &end);
+
struct promise *p=future_to_promise(rpc->f);
promise_success(p, reply);
swarmkv_rpc_free(rpc);
+ return latency;
}
size_t swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id)
{
assert(thread_id>=0 && thread_id<mgr->nr_worker_threads);
return HASH_COUNT(mgr->threads[thread_id].rpc_table);
+}
+void swarmkv_rpc_mgr_info(struct swarmkv_rpc_mgr *mgr, struct swarmkv_rpc_mgr_info *info)
+{
+ info->timed_out_rpcs=mgr->timed_out_rpcs;
+ info->unknown_sequence=mgr->unknown_sequence;
+ return;
} \ No newline at end of file
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 055135a..72551c1 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -484,7 +484,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
store_get_uuid(&(ctx->store->module), uuid);
- long long error_before=ctx->store->sync_err;
+ __attribute__ ((unused)) long long error_before=ctx->store->sync_err;
if(reply->type==SWARMKV_REPLY_ERROR)
{
if(strcasestr(reply->str, "timed out"))
@@ -554,8 +554,9 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc
swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv);
return;
}
+#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle"
#define MAX_SYNC_PER_PERIOD 100000
-void store_batch_sync(struct swarmkv_store *store, int tid)
+int store_batch_sync(struct swarmkv_store *store, int tid)
{
int n_synced=0;
struct swarmkv_store_thread *thr=&store->threads[tid];
@@ -605,6 +606,7 @@ void store_batch_sync(struct swarmkv_store *store, int tid)
cmd=NULL;
}
sync_master_free(sync_master);
+ return n_synced;
}
void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
{
@@ -620,7 +622,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
thr->calls++;
if(store->opts->batch_sync_enabled)
{
- store_batch_sync(store, real_tid);
+ n_synced=store_batch_sync(store, real_tid);
}
else
{
@@ -656,7 +658,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
if(n_synced)
{
- swarmkv_monitor_record_event(store->mod_monitor, "crdt-sync-cycle", timespec_diff_usec(&start, &end));
+ swarmkv_monitor_record_event(store->mod_monitor, MONITOR_SYNC_EVENT_NAME, timespec_diff_usec(&start, &end));
}
}
@@ -702,11 +704,11 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s
{
struct swarmkv_store *store=module2store(mod_store);
store->mod_monitor=mod_monitor;
+ swarmkv_monitor_register_event(mod_monitor, MONITOR_SYNC_EVENT_NAME);
}
void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info)
{
struct swarmkv_store *store=module2store(mod_store);
- info->shards=STORE_SHARD_NUM;
info->keys=0;
info->keys_to_sync=0;
struct swarmkv_store_thread *thread=NULL;
@@ -714,7 +716,7 @@ void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *inf
{
thread = store->threads+i;
info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0);
- info->keys_to_sync += __sync_add_and_fetch(&thread->n_keys, 0);
+ info->keys += __sync_add_and_fetch(&thread->n_keys, 0);
}
info->sync_ok=__sync_add_and_fetch(&store->sync_ok, 0);
info->sync_err=__sync_add_and_fetch(&store->sync_err, 0);