# Design
This section lists the design choices of SwarmKV.
The following terminology is used in SwarmKV.
| Name | Description |
| --------- | ------------------------------------------------------------ |
| KV | Key-value pair. |
| Node | An instance in a SwarmKV cluster. A node is identified by its IP address, P2P port, and UUID. |
| Slot | KVs are partitioned into slots, and the keys of one table are scattered across many slots. Each slot is maintained by one node. |
| Key Owner | The node that owns a key's routing entry. A key has ONLY one owner. |
| Replica | A CRDT object is replicated on many nodes; each of these nodes is a replica. |
| RPC | Remote Procedure Call, used to send commands and receive replies. |
## DHT-based KV spaces
Key-value pairs are distributed over nodes. As shown in the following picture, the key layer keeps track of the current location of every key-value pair, and the value layer stores the actual key-value binaries. This design is inspired by the NSDI'18 paper [*Elastic scaling of stateful network functions*](https://www.usenix.org/conference/nsdi18/presentation/woo).
<img src="./imgs/nsdi18-s6-dht-based-dso-space.png" alt="nsdi18-s6-dht-based-dso-space" style="zoom: 67%;" />
As shown in the picture above, a KV operation takes the following steps:
1. Node A (the requester) hashes the key to a slot ID, looks up the slot ID in the key routing table to find the key owner (Node B), and then sends a routing request to Node B (see the sketch after this list).
2. Node B (the key owner) looks up the key in the value routing table to find the object owner (Node C), and then sends a response to Node A.
   - If the key does not exist, the object is assigned to Node A and added to the object routing table.
3. Node A sends the KV operation request to Node C.
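A minimal sketch of the slot lookup in step 1, assuming a 16384-slot keyspace (the slot count mentioned in the Lock design section below); the hash function (djb2) and routing-table layout here are stand-ins for illustration, not SwarmKV's actual implementation:
```c
/* Illustrative sketch of step 1: map a key to a slot, then find the key owner.
 * The djb2 hash is a stand-in; the real SwarmKV hash function may differ.
 * NUM_SLOTS matches the 16384 keyspace slots mentioned in "Lock design". */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define NUM_SLOTS 16384

/* key_routing_table[slot] holds the "ip:port" of the key owner for that slot. */
static const char *key_routing_table[NUM_SLOTS];

static uint32_t key_to_slot(const char *key, size_t len)
{
    uint32_t h = 5381;                            /* djb2, illustration only */
    for (size_t i = 0; i < len; i++)
        h = ((h << 5) + h) + (unsigned char)key[i];
    return h % NUM_SLOTS;
}

int main(void)
{
    const char *key = "user:1000";
    uint32_t slot = key_to_slot(key, strlen(key));
    key_routing_table[slot] = "192.168.0.1:5210"; /* normally filled from the global slot table */
    printf("key %s -> slot %u -> key owner %s\n", key, slot, key_routing_table[slot]);
    return 0;
}
```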
Keyspace consistency is based on [consul](https://www.consul.io/) and its [Raft](https://raft.github.io/) implementation. Since [a typical Raft cluster has at most about 7 nodes](https://stackoverflow.com/questions/42422111/raft-nodes-count), only Consul server nodes participate in Raft. Consul and its [Gossip protocol](https://en.wikipedia.org/wiki/Gossip_protocol) implementation are also used to track node health.
### Global key slot table
The global slot table is stored in Consul KV; the key is `swarmkv/<cluster_name>/slot_table`:
```
curl http://localhost:8500/v1/kv/swarmkv/<cluster_name>/slot_table?raw=true
```
The command returns a JSON array.
```json
[
{
"slot": "0-1022",
"owner": "192.168.0.1:5210"
},
{
"slot": "1023",
"owner": "192.168.0.1:5211"
}
]
```
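A minimal sketch of consuming this table with the bundled [cJSON](https://github.com/DaveGamble/cJSON) dependency; `parse_slot_table` is illustrative and not part of the SwarmKV sources:
```c
/* Parse the slot table JSON returned by Consul (format as in the example above). */
#include <stdio.h>
#include "cJSON.h"

static void parse_slot_table(const char *json)
{
    cJSON *table = cJSON_Parse(json);
    if (table == NULL || !cJSON_IsArray(table)) {
        fprintf(stderr, "invalid slot table\n");
        cJSON_Delete(table);
        return;
    }
    cJSON *entry = NULL;
    cJSON_ArrayForEach(entry, table) {
        const cJSON *slot  = cJSON_GetObjectItemCaseSensitive(entry, "slot");
        const cJSON *owner = cJSON_GetObjectItemCaseSensitive(entry, "owner");
        if (cJSON_IsString(slot) && cJSON_IsString(owner))
            printf("slots %s -> owner %s\n", slot->valuestring, owner->valuestring);
    }
    cJSON_Delete(table);
}

int main(void)
{
    const char *example =
        "[{\"slot\":\"0-1022\",\"owner\":\"192.168.0.1:5210\"},"
        "{\"slot\":\"1023\",\"owner\":\"192.168.0.1:5211\"}]";
    parse_slot_table(example);
    return 0;
}
```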
### Cluster leader election
The slot table is maintained by the cluster leader, which is elected from cluster peers. SwarmKV uses [Consul for leader election](https://learn.hashicorp.com/tutorials/consul/application-leader-elections).
```
curl http://localhost:8500/v1/kv/swarmkv/<cluster_name>/lead?raw=true
```
```json
{
"node":"192.168.0.1:5210",
"node_uuid": "0a8ce868-6b92-449a-a44c-7f5f476236c2"
}
```
Except for nodes in dry-run mode, cluster nodes watch leadership changes and [run for leader](https://learn.hashicorp.com/tutorials/consul/application-leader-elections) when there is no cluster leader (a sketch of this loop follows the steps below).
1. Watch `swarmkv/<cluster_name>/lead` with [blocking query](https://www.consul.io/api/features/blocking).
2. If there is no session in the result, run for leader with the following steps:
   - [Create a session](https://www.consul.io/api-docs/session#create-session).
   - [Update](https://www.consul.io/api-docs/kv#acquire) `swarmkv/<cluster_name>/lead` with the `?acquire=<session>` parameter.
   - If the response is `true`, the node is elected as the leader.
3. Go to step 1.
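A rough sketch of the election loop above; the `consul_*` helpers are hypothetical wrappers around the Consul HTTP endpoints referenced in the steps (blocking query, session create, KV acquire) and are not part of the SwarmKV sources:
```c
/* Rough sketch of the election loop above. The consul_* functions are
 * hypothetical wrappers around the Consul HTTP API; they are not SwarmKV code. */
#include <stdbool.h>
#include <stddef.h>

/* Blocks until swarmkv/<cluster_name>/lead changes; returns true if a session currently holds it. */
bool consul_watch_lead(const char *lead_key);
/* Creates a Consul session; returns false on failure. */
bool consul_create_session(char *session_id, size_t len);
/* PUT <lead_key>?acquire=<session_id> with this node's identity; returns the boolean response. */
bool consul_kv_acquire(const char *lead_key, const char *session_id, const char *node_json);

void leader_election_loop(const char *lead_key, const char *node_json)
{
    char session_id[64];
    for (;;) {
        if (consul_watch_lead(lead_key))          /* step 1: someone already leads, keep watching */
            continue;
        if (!consul_create_session(session_id, sizeof(session_id)))
            continue;                             /* step 2: create a session */
        if (consul_kv_acquire(lead_key, session_id, node_json)) {
            /* acquire returned true: this node is now the cluster leader */
        }
        /* step 3: fall through and go back to watching */
    }
}
```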
### Hash tags
Like [Redis Cluster](https://redis.io/docs/reference/cluster-spec/), SwarmKV implements hash tags, which can be used to ensure that multiple keys are allocated to the same hash slot and are therefore managed by the same key owner. Note that hash tags do not guarantee that objects are managed by the same object owner. If the key contains a "{...}" pattern, only the substring between `{` and `}` is hashed in order to obtain the hash slot. However, since it is possible that there are multiple occurrences of `{` or `}`, the algorithm is specified by the following rules:
- IF the key contains a `{` character.
- AND IF there is a `}` character to the right of `{`.
- AND IF there are one or more characters between the first occurrence of `{` and the first occurrence of `}`.
Then instead of hashing the key, only what is between the first occurrence of `{` and the following first occurrence of `}` is hashed.
Examples:
- The two keys `{user1000}.following` and `{user1000}.followers` will hash to the same hash slot since only the substring `user1000` will be hashed in order to compute the hash slot.
- For the key `foo{}{bar}` the whole key will be hashed as usual, since the first occurrence of `{` is followed by `}` on the right with no characters in the middle.
- For the key `foo{{bar}}zap` the substring `{bar` will be hashed, because it is the substring between the first occurrence of `{` and the first occurrence of `}` on its right.
- For the key `foo{bar}{zap}` the substring `bar` will be hashed, since the algorithm stops at the first valid or invalid (without bytes inside) match of `{` and `}`.
- What follows from the algorithm is that if the key starts with `{}`, it is guaranteed to be hashed as a whole. This is useful when using binary data as key names.
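A self-contained sketch of the hash-tag rules above (the real SwarmKV implementation may differ in detail); `hashtag_span` returns the portion of the key that should be hashed:
```c
/* Return the substring of `key` that should be hashed: the content of the
 * first non-empty {...} if present, otherwise the whole key. */
#include <stddef.h>
#include <stdio.h>
#include <string.h>

static void hashtag_span(const char *key, size_t keylen,
                         const char **start, size_t *len)
{
    const char *open = memchr(key, '{', keylen);
    if (open) {
        const char *close = memchr(open + 1, '}', keylen - (size_t)(open - key) - 1);
        if (close && close > open + 1) {          /* at least one char between { and } */
            *start = open + 1;
            *len = (size_t)(close - open - 1);
            return;
        }
    }
    *start = key;                                 /* no valid tag: hash the whole key */
    *len = keylen;
}

int main(void)
{
    const char *keys[] = { "{user1000}.following", "foo{}{bar}", "foo{{bar}}zap", "foo{bar}{zap}" };
    for (size_t i = 0; i < 4; i++) {
        const char *s; size_t n;
        hashtag_span(keys[i], strlen(keys[i]), &s, &n);
        printf("%-22s -> \"%.*s\"\n", keys[i], (int)n, s);
    }
    return 0;
}
```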
### Keyspace reorganization
The keyspace is sharded into slots, and each slot is owned by one node. This information is recorded in the global shared slot table. If a node's health check fails, the cluster leader chooses a random healthy node as the new slot owner.
You can use `CLUSTER ADDSLOTOWNER` to add a node to the slot table.
When adding a new node, the operation is **assign slot_id from `original_node` to `new_node`**; the `swarmkv-cli` executes the following commands:
- `KEYSPACE SETSLOT new_node_ip:port <slot_id> IMPORTING original_node_ip:port`
  - The command sets `<slot_id>` on `new_node` to the IMPORTING state.
  - When a slot is set to IMPORTING, the node accepts all keyspace queries that are about this hash slot.
- `KEYSPACE SETSLOT original_node_ip:port <slot_id> MIGRATING new_node_ip:port`
  - The command sets `<slot_id>` on `original_node` to the MIGRATING state.
  - When a slot is set to MIGRATING, the node accepts all queries that are about this hash slot, but only if the key in question exists; otherwise the query is forwarded with a -ASK redirection to the node that is the target of the migration.
- `KEYSPACE GETKEYSINSLOT original_node_ip:port <slot_id>`
  - The command gets a blob of all key entries of `<slot_id>` from `original_node`.
- `KEYSPACE ADDKEYSTOSLOT new_node_ip:port <slot_id> blob_of_getkeysinslot`
  - The command adds all key entries to `<slot_id>` on `new_node`.
- `KEYSPACE DELSLOTKEYS original_node_ip:port <slot_id>`
  - The command erases all key entries of `<slot_id>` on `original_node`.
After all desired slots are migrated, the `swarmkv-cli` updates the global slot table in Consul.
Known issues:
- Deleting existing keys or adding replicas to existing keys between `GETKEYSINSLOT` and `DELSLOTKEYS` cannot take effect on the new node. It is better to execute `CLUSTER SANITY heal` after the migration.
- What if some nodes fail during the migration? [Todo] `swarmkv-cli` acquires the cluster leadership before the migration.
  - If the new node fails, the original node still owns the slot; after a timeout without receiving a slot table update, it restores the slot to a stable state.
  - If the original node fails, the new node aborts the migration and waits for the cluster leader to update the global slot table.
  - If other nodes fail, the cluster leader receives the destination node's notification and updates the slot table.
## KV Operation
When a requester executes a command, it takes the following steps:
- Local Execution
  - For keys that exist in the local store, commands are executed locally, except for commands that modify the keyspace.
- Key Route
  - The requester hashes the key to a slot ID to find the key owner.
  - The requester sends a `KEYSPACE RADD/RLIST/DEL` command to the key owner.
  - The key owner processes the keyspace command and returns the addresses of all replicas.
- When the requester receives the key owner's reply, it will
  - Create a local replica
  - Send the command to one of the replicas
  - Send a `CRDT PULL key` command to some of the replicas
- The other replica executes the command, registers the requester as one of the replica nodes, and sends a reply to the requester.
- The requester receives the reply and invokes the user callback function.
## Fault tolerance
### High availability
Keyspace replication is not implemented yet.
### Health check
Each node registers an HTTP service with Consul, and the local Consul agent checks node health periodically via HTTP GET.
If a node fails, the cluster leader gets a notification and removes the failed node by reassigning its slots to other nodes.
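A rough sketch of building such a service-registration payload with the bundled cJSON library for Consul's `/v1/agent/service/register` endpoint; the service name, ports, check URL, and interval below are illustrative values, not necessarily what SwarmKV uses:
```c
/* Build the JSON body a node could PUT to the local Consul agent so that
 * Consul periodically GETs an HTTP health endpoint. Values are examples. */
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"

static char *build_registration(const char *addr, int p2p_port, const char *check_url)
{
    cJSON *svc = cJSON_CreateObject();
    cJSON_AddStringToObject(svc, "Name", "swarmkv");
    cJSON_AddStringToObject(svc, "Address", addr);
    cJSON_AddNumberToObject(svc, "Port", p2p_port);

    cJSON *check = cJSON_CreateObject();
    cJSON_AddStringToObject(check, "HTTP", check_url);   /* Consul GETs this URL */
    cJSON_AddStringToObject(check, "Interval", "10s");
    cJSON_AddItemToObject(svc, "Check", check);

    char *out = cJSON_PrintUnformatted(svc);              /* caller frees with free() */
    cJSON_Delete(svc);
    return out;
}

int main(void)
{
    char *json = build_registration("192.168.0.1", 5210, "http://192.168.0.1:5310/health");
    printf("%s\n", json);
    free(json);
    return 0;
}
```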
## Wire protocol
Messages between nodes start with a fixed-length header followed by a variable-length payload. The payload is serialized in the [MessagePack](https://msgpack.org/index.html) format. For readability, we use JSON to describe the request and response here.
Take the following command as an example:
```json
[
"HMGET",
"zhangsan's information",
"phone number",
"address",
"avatar"
]
```
Reply message
```json
{
"reply_type": "elements",
"element": [
{
"reply_type": "string",
"string": "+86 186 0000 0000"
},
{
"reply_type": "string",
"string": "House 01, Haidian District, Beijing, China"
},
{
"reply_type": "string",
"string": "0c 74 65 78 74 20 20 20 20 43 6f 70 79 72 69 67 68 74 20 28 63 29 20 31 39 39 38 20 48 65 77 6c 65 74 74 2d 50 61 63 6b 61 72 64 20 43 6f 6d 70 61 6e 79 20 20 64 65 73 63 20 20 20 20 20 20 20 12 73 52 47 42 20 49 45 43 36 31 39 36 36 2d 32 2e 31 20 20 20 20 20 20 20 20 20 20 20 12 73 52 47 42 20 49 45 43 36 31 39 36 36 2d 32 2e 31 20 20 00"
}
]
}
```
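An illustrative sketch of encoding the `HMGET` request above with the bundled [mpack](https://github.com/ludocode/mpack) library; the fixed-length header layout shown (a magic number plus the payload length) is only a guess for illustration, since the actual header fields are not documented here:
```c
/* Encode the HMGET request above as a MessagePack array with mpack. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "mpack.h"

struct wire_header {            /* hypothetical fixed-length header */
    uint32_t magic;
    uint32_t payload_len;
};

int main(void)
{
    char *payload = NULL;
    size_t payload_len = 0;

    mpack_writer_t writer;
    mpack_writer_init_growable(&writer, &payload, &payload_len);
    mpack_start_array(&writer, 5);
    mpack_write_cstr(&writer, "HMGET");
    mpack_write_cstr(&writer, "zhangsan's information");
    mpack_write_cstr(&writer, "phone number");
    mpack_write_cstr(&writer, "address");
    mpack_write_cstr(&writer, "avatar");
    mpack_finish_array(&writer);
    if (mpack_writer_destroy(&writer) != mpack_ok) {
        fprintf(stderr, "encode error\n");
        return 1;
    }

    struct wire_header hdr = { 0x53574B56 /* "SWKV", illustrative magic */, (uint32_t)payload_len };
    printf("header %zu bytes + payload %u bytes\n", sizeof(hdr), hdr.payload_len);
    free(payload);
    return 0;
}
```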
## Node design
### Thread model
A swarmkv instance has one keyspace thread and several (configurable) worker threads. Swarmkv uses [libevent](https://github.com/libevent/libevent) as its peer-to-peer communication infrastructure.
The keyspace thread communicates with Consul; some of these operations are blocking, so swarmkv uses a separate keyspace thread for them:
- Watch global slot table changes.
- Watch leadership changes, run for leader if allowed.
- For the leader node:
  - Watch the cluster health check, and remove failed nodes from the cluster by reassigning their slots to other nodes.
Worker thread 0 is responsible for:
- Accepting TCP connections from new peers, then load-balancing them across worker threads (by peer IP).
All worker threads are responsible for:
- Send KV command messages from the swarmkv API caller to destination peers.
- Receive KV command messages and process them based on the command type:
  - Key routing request (from the requester, i.e. the object reader/writer): look up the value owner, then send a key routing response.
  - Object operation request (from the requester): look up the value, perform the operation, then send a value operation response.
  - Key routing response (from the key owner): send the request to the decoded value owner.
  - Object operation response (from the object owner): update the local cache and invoke the user-specified callback.
- For multi-thread scalability, each worker thread maintains connections to all active peers, so there may be more than one connection between two nodes.
### Lock design
A swarmkv node has three modules, each with its own internal data structure.
- Store Module stores the actual CRDT objects. It is a key-to-CRDT hash table with 8 shards, and each shard is protected by a read-write lock. Lock contention happens when writing keys in the same shard (see the sketch at the end of this subsection).
- Keyspace Module stores the replica addresses of keys. It is a key-to-route hash table with 16384 slots, and each slot is protected by a mutex lock. Lock contention happens when writing keys in the same slot.
- Monitor Module collects runtime metrics for diagnostics. It is lock-free, taking advantage of [HdrHistogram_c](https://github.com/HdrHistogram/HdrHistogram_c)'s interval recorder.
[Todo] Lock free API powered by completion queue.
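A minimal sketch of the Store Module's sharded locking described above (8 shards, each guarded by a read-write lock); the hash function and hash-table pointer are placeholders, since the real store uses uthash and CRDT objects:
```c
/* Sharded store: readers of one shard proceed concurrently; writers to the
 * same shard contend on that shard's read-write lock only. */
#include <pthread.h>
#include <string.h>

#define STORE_SHARDS 8

struct store_shard {
    pthread_rwlock_t lock;
    void *hash_table;            /* placeholder for the uthash key->CRDT table */
};

static struct store_shard shards[STORE_SHARDS];

static void store_init(void)
{
    for (int i = 0; i < STORE_SHARDS; i++)
        pthread_rwlock_init(&shards[i].lock, NULL);
}

static unsigned shard_of(const char *key)
{
    unsigned h = 5381;           /* stand-in hash, for illustration only */
    while (*key) h = h * 33 + (unsigned char)*key++;
    return h % STORE_SHARDS;
}

static void store_read(const char *key)
{
    struct store_shard *s = &shards[shard_of(key)];
    pthread_rwlock_rdlock(&s->lock);   /* readers of the same shard don't block each other */
    /* ... look up key in s->hash_table ... */
    pthread_rwlock_unlock(&s->lock);
}

static void store_write(const char *key)
{
    struct store_shard *s = &shards[shard_of(key)];
    pthread_rwlock_wrlock(&s->lock);   /* writers to the same shard contend here */
    /* ... insert/update key in s->hash_table ... */
    pthread_rwlock_unlock(&s->lock);
}

int main(void)
{
    store_init();
    store_write("user:1000");
    store_read("user:1000");
    return 0;
}
```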
## Source code layout
The source files are organized as follows:
- src/swarmkv.c: event loop and orchestration.
- src/swarmkv_api.c: API implementations.
- src/swarmkv_store.c: KV operations. The key hash table is implemented with [uthash](https://troydhanson.github.io/uthash/), and [sds](https://github.com/antirez/sds) is used as a dynamic string library.
- src/swarmkv_sync.c: batching CRDT synchronization.
- src/swarmkv_keyspace.c: the control plane of SwarmKV. Interacts with [HashiCorp Consul](https://www.consul.io/) for node discovery, slot assignment, and leader election.
- src/swarmkv_message.c: messages are encoded in [MessagePack](https://msgpack.org/index.html) format with the [mpack](https://github.com/ludocode/mpack) library.
- src/swarmkv_net.c: high-performance P2P communication via [libevent](https://libevent.org/).
- src/swarmkv_message.c: collecting performance metrics.
- src/t_xxx.c: SwarmKV commands for the various data types.
- src/deps/: dependencies in source code form, such as [uthash](https://troydhanson.github.io/uthash/), [sds](https://github.com/antirez/sds), [mpack](https://github.com/ludocode/mpack), [timeout](https://25thandclement.com/~william/projects/timeout.c.html), [cJSON](https://github.com/DaveGamble/cJSON), and [HDRHistogram](https://github.com/HdrHistogram/HdrHistogram_c).
- src/vendor/: dependencies in archive form.
- CRDT/: Conflict-free Replicated Data Type implementations.
- test/: unit tests.
- tools/swarmkv_cli: CLI implementation, facilitated by [linenoise](https://github.com/antirez/linenoise).
# Similar projects
[dqlite](https://dqlite.io/) is a C library that implements an embeddable and replicated SQL database engine with high availability and automatic failover.
[rqlite](https://github.com/rqlite/rqlite) is an easy-to-use, lightweight, distributed relational database, which uses [SQLite](https://www.sqlite.org/) as its storage engine.
[Zeppelin](https://github.com/Qihoo360/zeppelin) is a Distributed Key-Value Platform that aims to provide excellent performance, reliability, and scalability.
[Couchbase](https://www.couchbase.com/) is a distributed, elastic, in-memory database on your cloud and at your edge.
[Apache Geode](http://geode.apache.org/) is a data management platform (JAVA) that provides real-time, consistent access to data-intensive applications throughout widely distributed cloud architectures.
[Apache Ignite](https://ignite.apache.org/) is a distributed database for high-performance computing with in-memory speed.
[Tarantool](https://www.tarantool.io/en/) is an in-memory computing platform consisting of a database and an application server.
[eXtremeDB](https://www.mcobject.com/) is a hybrid persistent and in-memory database management system for edge and cloud. It is commercial, closed-source software.
[vedis](https://vedis.symisc.net/) is an embeddable datastore C library with over 70 commands similar in concept to Redis, but without the networking layer, since Vedis runs in the same process as the host application.
[UnQLite](https://github.com/symisc/unqlite) is an in-process software library which implements a self-contained, serverless, zero-configuration, transactional NoSQL database engine.
[OrbitDB](https://github.com/orbitdb/orbit-db) is a serverless, distributed, peer-to-peer database. OrbitDB uses [IPFS](https://ipfs.io/) as its data storage and [IPFS Pubsub](https://github.com/ipfs/go-ipfs/blob/master/core/commands/pubsub.go#L23) to automatically sync databases with peers. It's implemented in JavaScript.
[Dynomite](https://github.com/Netflix/dynomite), inspired by Dynamo whitepaper, is a thin, distributed dynamo layer for different storage engines and protocols. Currently these include Redis and Memcached. Dynomite supports multi-datacenter replication and is designed for high availability.
[AntidoteDB](https://www.antidotedb.eu/) is a planet scale, highly available, transactional database. It's implemented in Erlang. Research paper: [Cure: Strong semantics meets high availability and low latency](https://hal.inria.fr/hal-01270776v2/document).
[EdgeDB](https://github.com/edgedb/edgedb) is a new kind of database that takes the best parts of relational databases, graph databases, and ORMs. We call it a graph-relational database.
[SwarmDB](https://github.com/gritzko/swarm) is a proof-of-concept key-value RON store.
[Voldemort](https://github.com/voldemort/voldemort) is a distributed key-value storage system.
[Corfu](https://github.com/CorfuDB/CorfuDB) is a consistency platform designed around the abstraction of a shared log.
[Riak KV](https://riak.com/index.html) is a distributed NoSQL key-value database with advanced local and multi-cluster replication that guarantees reads and writes even in the event of hardware failures or network partitions.
# References
Woo, Shinae, et al. "Elastic scaling of stateful network functions." *15th USENIX Symposium on Networked Systems Design and Implementation (NSDI 18)*. 2018. [Source](https://www.usenix.org/conference/nsdi18/presentation/woo)
[Redis Cluster Specification](https://redis.io/topics/cluster-spec)
[C implementation of the Raft consensus protocol](https://github.com/willemt/raft) by Willem-Hendrik Thiart.