author     Zheng Chao <[email protected]>  2023-09-23 12:55:03 +0800
committer  Zheng Chao <[email protected]>  2023-09-23 12:55:03 +0800
commit     603046d4d9a95d3a74b2e880db93c25f9aa02013 (patch)
tree       9393f21f9b9a4a96241e8b6e5b340a8b21f6a265
parent     6ac41d2ce6b4d02a86d7424b6c6ce1e617559c36 (diff)

:memo: update design.md (tag: v4.0.1)
-rw-r--r--   docs/cli.md              8
-rw-r--r--   docs/design.md          14
-rw-r--r--   src/swarmkv.c            3
-rw-r--r--   src/swarmkv_keyspace.c   2
-rw-r--r--   src/swarmkv_net.c       11
-rw-r--r--   src/swarmkv_store.c     19
6 files changed, 26 insertions, 31 deletions
diff --git a/docs/cli.md b/docs/cli.md
index 2f12b38..f31dfa2 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -13,7 +13,7 @@ In interactive mode, you can prefixing the command name by a number to run it N
(integer) 3
```
-Prefixing by two number are repeat times and interval (unit: second, default: 0 second, 0.1 represents 100 milliseconds).
+Prefixing with two numbers specifies the repeat count and the interval (unit: seconds; default: 0; decimals are allowed, e.g., 0.1 represents 100 milliseconds).
```
127.0.0.1:7311@swarmkv-sync> 5 .1 incrby id001 1
@@ -23,8 +23,10 @@ Prefixing by two number are repeat times and interval (unit: second, default: 0
(integer) 7
(integer) 8
```
-
+### Attach/Detach
Low-level commands don't have auto-route ability. If you execute low-level command via `swarmkv-cli`, you should execute `ATTACH IP:port` first. The `DETACH` command exits attaching model.
-### Known Bugs
+Another use case is attaching to a specific node to run commands directly on it, for example connecting to a node and running the `DEBUG` command to inspect it.
+
+### Known Issues
If a swarmkv-cli instance attempts to connect to itself using an address other than 127.0.0.1:port, it will result in a crash.
\ No newline at end of file
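The Attach/Detach section added above describes the workflow only in prose. Below is a hypothetical session sketch: only `ATTACH IP:port` and `DETACH` come from the documentation; the `OK` replies, the prompt shown while attached, and the port 7312 are assumptions for illustration. While attached, low-level commands (such as `DEBUG`) run directly on the attached node instead of being auto-routed.
```
127.0.0.1:7311@swarmkv-sync> ATTACH 127.0.0.1:7312
OK
127.0.0.1:7312> DETACH
OK
127.0.0.1:7311@swarmkv-sync>
```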
diff --git a/docs/design.md b/docs/design.md
index 90ebaf3..611ae1f 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -9,7 +9,7 @@ Following are terminologies used in Swarm KV.
| --------- | ------------------------------------------------------------ |
| KV | Key-value pair |
| Node | Instances in a Swarm KV cluster. A swarmkv node is identified with its IP, P2P port and UUID. |
-| Slot | KVs are partitioned in different slots, and keys of one table are scattered in many slots. Each slot is maintained by one node. |
+| Slot | The keyspace is partitioned into 16384 slots. Each slot is maintained by one node. |
| Key Owner | A key ONLY has one owner. |
| Replica | A CRDT object is replicated in many nodes, which are replicas. |
| RPC | Remote Procedure Call, used for sending command and receiving reply. |
@@ -232,18 +232,18 @@ All worker threads are responsible for:
- Run periodic tasks.
- Send command message, which is from swarmkv API caller to destination peers.
- Receive KV command message, and then process it based on command type:
- - Key routing request (from the requester, object reader/writer): lookup the value owner, and then send key routing response.
- - Object operation request (from the requester): lookup the value, perform the operation and then send value operation response.
- - Key routing response (from the key owner): send the request to the decoded value owner.
+ - Key routing request (from the requester, an object reader/writer): look up the object owner, then send a key routing response.
+ - Object operation request (from the requester): look up the object, perform the operation, then send an object operation response.
+ - Key routing response (from the key owner): send the request to the object owner.
- Object operation response (from object owner): update local cache, and invoke the user-specified callback.
- For multi-thread scalability, each worker thread maintains connections to all active peers. It's possible that there are one or more connections between two nodes.
The worker thread 0 has extra tasks including:
-- Accept TCP connections from new peers, and then load balance new peers to worker threads (by peer IP).
-- Watch leadership changes, run for leader if allowed.
+- Accept TCP connections from other peers. Thread 0 peeks at the message header to determine the caller thread ID, then assigns the connection to the worker thread given by caller thread ID mod the number of worker threads. This removes unnecessary inter-thread hand-offs when both nodes run the same number of worker threads.
+- Watch leadership changes, and run for leader if the cluster loses its leader.
- For leader node:
- - Watch cluster health check, remove failed node from cluster by assigning its node to other nodes.
+ - Watch cluster health checks, and remove a failed node from the cluster by reassigning its slots to other nodes.
## Source code layout
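The revised Slot entry above states that the keyspace is split into 16384 slots and that each slot has exactly one owning node. A minimal sketch of that mapping follows, reusing the KEYSPACE_SLOT_NUM name that appears in src/swarmkv_keyspace.c; the CRC16-style hash and the slot_owner table are assumptions for illustration, not swarmkv's actual implementation.
```c
#include <stddef.h>
#include <stdint.h>

#define KEYSPACE_SLOT_NUM 16384          /* 16384 slots, per docs/design.md */

/* Hypothetical hash; the real key->slot function is not shown in this commit. */
extern uint16_t key_hash_crc16(const void *key, size_t len);

/* Hypothetical table mapping each slot to its single owning node id. */
extern int slot_owner[KEYSPACE_SLOT_NUM];

static int node_owning_key(const void *key, size_t len)
{
    uint16_t slot = key_hash_crc16(key, len) % KEYSPACE_SLOT_NUM;
    return slot_owner[slot];             /* each slot is maintained by one node */
}
```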
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 9c60721..bd73915 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1373,8 +1373,9 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
opts->total_threads=opts->nr_caller_threads+opts->nr_worker_threads;
db->threads=ALLOC(struct swarmkv_thread, opts->total_threads);
/* adds locking, only required if accessed from separate threads */
+	//When accepting a new connection, worker thread 0 may create a bufferevent with the evbase of another worker thread.
evthread_use_pthreads();
-
+ //evthread_enable_lock_debugging();
strncpy(db->module.name, "db", sizeof(db->module.name));
db->module.mod_ctx=db;
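The comment added in this hunk explains why locking is needed: worker thread 0 may hand a newly accepted connection to another worker thread's event_base, so libevent must be told to use pthread locks before any base is created. A minimal, self-contained sketch of that setup is below; the error handling and the BEV_OPT flags chosen here are illustrative, not swarmkv's exact configuration.
```c
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>
#include <stdio.h>

int main(void)
{
    /* Must run before any event_base is created, otherwise bases are built
     * without locking support. Requires linking against libevent_pthreads. */
    if (evthread_use_pthreads() != 0) {
        fprintf(stderr, "libevent built without pthreads support\n");
        return 1;
    }
#ifdef DEBUG_LOCKS
    /* Catches lock misuse (e.g. unlocking a lock that is not held); debug builds only. */
    evthread_enable_lock_debugging();
#endif

    struct event_base *base = event_base_new();

    /* THREADSAFE gives the bufferevent its own lock so another thread can
     * safely feed it; DEFER_CALLBACKS runs callbacks from the event loop. */
    struct bufferevent *bev =
        bufferevent_socket_new(base, -1, BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);

    bufferevent_free(bev);
    event_base_free(base);
    return 0;
}
```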
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 6c83e4b..b86d5c9 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -739,7 +739,7 @@ void watch_slots_changes_on_success(void *result, void *arg)
struct key_slot *slot=NULL;
struct slot_runtime *slot_rt=NULL;
node_t *owner=NULL;
- log_info(thr->ref_ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots update started.");
+ log_info(thr->ref_ks->logger, MODULE_SWARMKV_KEYSPACE, "thread %d key slots update started.", thr->thread_id);
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
if(!slot_is_my_thread(i, thr->thread_id, thr->ref_ks->opts->nr_worker_threads)) continue;
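The loop above walks all KEYSPACE_SLOT_NUM slots and skips those that do not belong to the current worker thread via slot_is_my_thread(). Its body is not part of this diff; the sketch below shows one plausible round-robin scheme (slot index mod worker-thread count), purely as an assumption.
```c
#include <stdbool.h>

/* Hypothetical: slot i belongs to worker thread (i % nr_worker_threads).
 * The real slot_is_my_thread() may use a different assignment. */
static bool slot_is_my_thread_sketch(int slot, int thread_id, int nr_worker_threads)
{
    return (slot % nr_worker_threads) == thread_id;
}
```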
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index c6301ab..4607d1d 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -103,14 +103,12 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest)
{
- struct event_base *base=thr->evbase;
-
struct snet_conn *conn=ALLOC(struct snet_conn, 1);
conn->buff_for_sending=evbuffer_new();
conn->fd=-1;
//http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
- conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
+ conn->bev=bufferevent_socket_new(thr->evbase, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS);//BEV_OPT_UNLOCK_CALLBACKS|BEV_OPT_THREADSAFE
bufferevent_setcb(conn->bev, NULL, NULL, peer_connected_event_cb, conn);
node_copy(&conn->peer_listen_addr, dest);
conn->thread_id=thr->thread_id;
@@ -466,9 +464,10 @@ void peek_msg_hdr_cb(evutil_socket_t fd, short what, void * arg)
struct snet_thread *thr=net->threads+tid;
struct snet_conn *conn=NULL;
conn=snet_conn_new_by_fd(thr, fd, &ctx->peer);
-
- log_debug(net->logger, MODULE_SWAMRKV_NET, "accept connection from %s (tid=%d).",
- conn->connected_from.addr, hdr->caller_tid);
+	//Assign the connection to this thread, but for thread safety do not add it to the connection table here;
+	//it is added later, inside peer_conn_read_cb().
+	log_debug(net->logger, MODULE_SWAMRKV_NET, "thread %d accepted connection from %s (tid=%d).",
+		conn->thread_id, conn->connected_from.addr, hdr->caller_tid);
failed:
event_del(ctx->ev);
event_free(ctx->ev);
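peek_msg_hdr_cb() above inspects the first message before the connection is handed to its final worker thread, and the new comment notes that the connection is only registered in the table later, in peer_conn_read_cb(). The diff does not show how the header is peeked; the sketch below uses recv() with MSG_PEEK and a hypothetical header layout (msg_hdr, caller_tid are assumed names) to illustrate the "peek, then pick thread by caller thread ID mod worker-thread count" step described in docs/design.md.
```c
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical wire header; the field names are assumptions. */
struct msg_hdr {
    uint32_t magic;
    uint32_t caller_tid;   /* worker-thread id of the sending peer */
    uint32_t body_len;
};

/* Peek at the header without consuming it, so the worker thread that
 * ends up owning the connection still reads the complete message. */
static int peek_caller_tid(int fd, uint32_t *tid_out)
{
    struct msg_hdr hdr;
    ssize_t n = recv(fd, &hdr, sizeof(hdr), MSG_PEEK);
    if (n != (ssize_t)sizeof(hdr))
        return -1;                      /* header not fully arrived yet */
    *tid_out = hdr.caller_tid;
    return 0;
}

/* Pick the worker thread: caller thread ID mod the number of worker threads. */
static int pick_worker_thread(uint32_t caller_tid, int nr_worker_threads)
{
    return (int)(caller_tid % (uint32_t)nr_worker_threads);
}
```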
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 4a18e1d..987cfd4 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -484,17 +484,6 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
store_get_uuid(&(ctx->store->module), uuid);
__attribute__ ((unused)) long long error_before=ctx->store->sync_err;
- if(reply->type==SWARMKV_REPLY_ERROR)
- {
- if(strcasestr(reply->str, "timed out"))
- {
- __attribute__ ((unused)) int tid=__gettid(ctx->store->exec_cmd_handle);
- atomic_inc(&ctx->store->sync_err);
- //Time consuming operation, block the event loop.
- //store_remove_failed_peer(ctx->store, tid, &ctx->peer);
- }
- goto error_out;
- }
switch(ctx->op)
{
@@ -516,13 +505,18 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
}
case CRDT_MERGE:
{
-
if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer>0)
{
atomic_add(&ctx->store->sync_ok, reply->integer);
}
else
{
+ if(0 && reply->type==SWARMKV_REPLY_ERROR && strcasestr(reply->str, "timed out"))
+ {
+ struct scontainer *ctr=NULL;
+ ctr=store_lookup_scontainer(ctx->store, ctx->key);
+ scontainer_remove_replica_node(ctr, &ctx->peer);
+ }
atomic_inc(&ctx->store->sync_err);
}
break;
@@ -546,7 +540,6 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
}
}
//assert(ctx->store->sync_err==error_before);
-error_out:
crdt_generic_ctx_free(ctx);
return;
}