1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
#include "swarmkv_rpc.h"
#include "swarmkv_utils.h"
#include "uthash.h"
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
/* One in-flight RPC. Lives in its thread's hash table (keyed by `sequence`)
 * from launch until it completes, times out, or the manager is freed. */
struct swarmkv_rpc
{
unsigned long long sequence; // unique id; uthash key (see HASH_ADD in swarmkv_rpc_launch)
struct event *timeout_ev; // one-shot timer; fires rpc_timeout_callback
struct future *f; // completed (success/failed) exactly once, then the rpc is freed
struct timespec start; // CLOCK_REALTIME at launch; used to compute reply latency
node_t target_peer; // For inter-node rpc
int thread_id; // index into ref_mgr->threads that owns this rpc
struct swarmkv_rpc_mgr *ref_mgr; // back-pointer so swarmkv_rpc_free can find its table
UT_hash_handle hh; // uthash bookkeeping
};
/* Per-worker-thread RPC state: each worker has its own table and event base,
 * so table access is single-threaded per slot. */
struct swarmkv_rpc_thr
{
struct swarmkv_rpc *rpc_table; // uthash table of in-flight rpcs (NULL when empty)
struct event_base *evbase; // reference to db->evbases[i]
const struct timeval *common_timeout; // libevent common-timeout handle for the default duration
};
/* Top-level RPC manager: owns one swarmkv_rpc_thr per worker thread plus a
 * shared sequence generator (incremented atomically in swarmkv_rpc_launch). */
struct swarmkv_rpc_mgr
{
long long seq_generator; // last issued sequence number; shared across threads
int nr_worker_threads; // length of `threads`
unsigned int default_timeout_us; // NOTE(review): ctor takes long long — values > UINT_MAX truncate here; confirm intended range
struct swarmkv_rpc_thr *threads; // array of nr_worker_threads per-thread slots
// stats
long long timed_out_rpcs; // incremented in rpc_timeout_callback
long long unknown_sequence; // replies whose sequence was not found in the table
};
/* Destroy an rpc: disarm and release its timer, unlink it from the owning
 * thread's table, and free it. Does NOT touch rpc->f — the caller must have
 * completed the future already. */
void swarmkv_rpc_free(struct swarmkv_rpc *rpc)
{
    /* Disarm the timer first so the timeout callback can no longer fire. */
    event_del(rpc->timeout_ev);
    event_free(rpc->timeout_ev);
    struct swarmkv_rpc_thr *owner = &rpc->ref_mgr->threads[rpc->thread_id];
    HASH_DELETE(hh, owner->rpc_table, rpc);
    free(rpc);
}
/* Create an RPC manager with one per-thread slot per event base.
 *
 * @param evbases            one event_base per worker thread (borrowed, not owned)
 * @param nr_threads         number of worker threads / entries in evbases
 * @param default_timeout_us default per-rpc timeout in microseconds
 *                           (NOTE(review): stored in an unsigned int field —
 *                           values > UINT_MAX truncate; confirm intended range)
 * @return newly allocated manager; free with swarmkv_rpc_mgr_free()
 */
struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(struct event_base *evbases[], int nr_threads, long long default_timeout_us)
{
    struct swarmkv_rpc_mgr *mgr = ALLOC(struct swarmkv_rpc_mgr, 1);
    mgr->seq_generator = 0;
    mgr->nr_worker_threads = nr_threads;
    mgr->default_timeout_us = default_timeout_us;
    mgr->threads = ALLOC(struct swarmkv_rpc_thr, nr_threads);
    /* Fix: the stats counters were never initialized. They are incremented in
     * rpc_timeout_callback / swarmkv_rpc_complete and read in
     * swarmkv_rpc_mgr_info; if ALLOC does not zero memory that was UB. */
    mgr->timed_out_rpcs = 0;
    mgr->unknown_sequence = 0;
    struct timeval duration = {mgr->default_timeout_us / (1000 * 1000), mgr->default_timeout_us % (1000 * 1000)};
    struct swarmkv_rpc_thr *thr = NULL;
    for (int i = 0; i < nr_threads; i++)
    {
        thr = mgr->threads + i;
        thr->evbase = evbases[i];
        thr->rpc_table = NULL;
        /* A libevent "common timeout" makes many events sharing the same
         * duration cheap to schedule. */
        thr->common_timeout = event_base_init_common_timeout(evbases[i], &duration);
    }
    return mgr;
}
/* Tear down the manager: cancel every still-in-flight rpc (failing its future
 * with FUTURE_ERROR_CANCEL) and release all per-thread state. */
void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr)
{
    for (int i = 0; i < mgr->nr_worker_threads; i++)
    {
        struct swarmkv_rpc_thr *thr = &mgr->threads[i];
        struct swarmkv_rpc *cur = NULL, *next = NULL;
        /* HASH_ITER is deletion-safe: `next` is saved before freeing `cur`. */
        HASH_ITER(hh, thr->rpc_table, cur, next)
        {
            promise_failed(future_to_promise(cur->f), FUTURE_ERROR_CANCEL, NULL);
            swarmkv_rpc_free(cur);
        }
    }
    free(mgr->threads);
    free(mgr);
}
/* Timer callback: the rpc's deadline passed with no reply. Fail the future
 * with a descriptive message and destroy the rpc. */
static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
{
    (void)fd;
    (void)events;
    struct swarmkv_rpc *rpc = (struct swarmkv_rpc *)arg;
    rpc->ref_mgr->timed_out_rpcs++;
    /* Name the remote peer when one was set; otherwise this was a local
     * inter-thread rpc. */
    char error_str[128];
    if (node_is_empty(&rpc->target_peer))
    {
        snprintf(error_str, sizeof(error_str), "inter-thread rpc timed out");
    }
    else
    {
        snprintf(error_str, sizeof(error_str), "peer %s timed out", rpc->target_peer.addr);
    }
    promise_failed(future_to_promise(rpc->f), FUTURE_ERROR_TIMEOUT, error_str);
    swarmkv_rpc_free(rpc);
}
struct swarmkv_rpc *swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
{
struct swarmkv_rpc_thr *thr = mgr->threads + thread_id;
struct swarmkv_rpc *rpc = ALLOC(struct swarmkv_rpc, 1);
rpc->sequence = __atomic_add_fetch(&mgr->seq_generator, 1, __ATOMIC_SEQ_CST);
clock_gettime(CLOCK_REALTIME, &rpc->start);
// struct timeval timeout={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)};
rpc->timeout_ev = event_new(thr->evbase, -1, 0, rpc_timeout_callback, rpc);
event_priority_set(rpc->timeout_ev, 1);
event_add(rpc->timeout_ev, thr->common_timeout);
rpc->thread_id = thread_id;
rpc->ref_mgr = mgr;
rpc->f = f;
HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc);
return rpc;
}
/* Return this rpc's sequence id (stored unsigned, exposed as long long to
 * match swarmkv_rpc_complete's lookup parameter). */
long long swarmkv_rpc_get_sequence(const struct swarmkv_rpc *rpc)
{
    return (long long)rpc->sequence;
}
/* Record the remote node this rpc targets, so a timeout can name the peer
 * in its error message. */
void swarmkv_rpc_set_peer(struct swarmkv_rpc *rpc, const node_t *peer)
{
    node_copy(&rpc->target_peer, peer);
}
/* Replace the pending default deadline with a per-rpc timeout of
 * `timeout_us` microseconds (re-arms the existing timer event). */
void swarmkv_rpc_set_timeout(struct swarmkv_rpc *rpc, long long timeout_us)
{
    struct timeval tv;
    tv.tv_sec = timeout_us / (1000 * 1000);
    tv.tv_usec = timeout_us % (1000 * 1000);
    event_del(rpc->timeout_ev);
    event_add(rpc->timeout_ev, &tv);
}
/* Deliver a reply for `sequence` on worker `thread_id`: resolve its future
 * with `reply`, destroy the rpc, and return the observed latency in
 * microseconds. Returns -1 (and counts it) when the sequence is unknown —
 * typically a reply that arrived after the rpc already timed out. */
long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply)
{
    struct swarmkv_rpc *rpc = NULL;
    HASH_FIND(hh, mgr->threads[thread_id].rpc_table, &sequence, sizeof(sequence), rpc);
    if (!rpc)
    {
        mgr->unknown_sequence++;
        return -1;
    }
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    long long latency = timespec_diff_usec(&rpc->start, &now);
    promise_success(future_to_promise(rpc->f), reply);
    swarmkv_rpc_free(rpc);
    return latency;
}
long long swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id)
{
assert(thread_id >= 0 && thread_id < mgr->nr_worker_threads);
return HASH_COUNT(mgr->threads[thread_id].rpc_table);
}
/* Copy the manager's stats counters into `info`. No locking is visible here,
 * so values read while workers run may be slightly stale. */
void swarmkv_rpc_mgr_info(struct swarmkv_rpc_mgr *mgr, struct swarmkv_rpc_mgr_info *info)
{
    info->timed_out_rpcs = mgr->timed_out_rpcs;
    info->unknown_sequence = mgr->unknown_sequence;
}
|