summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-07-28 11:38:12 +0800
committerZheng Chao <[email protected]>2023-07-28 11:38:12 +0800
commit6b50898b3bebc730979e74982fcf4c6645fb7f21 (patch)
treea7303b0f8a23864e71d0e50362f233b148180c5b /src
parent49c33b4687c065ba2003a512aad67c24388b3d8e (diff)
WIP: fix memory leak
Diffstat (limited to 'src')
-rw-r--r--src/swarmkv.c12
-rw-r--r--src/swarmkv_api.c1
-rw-r--r--src/swarmkv_mesh.c13
-rw-r--r--src/swarmkv_net.c8
-rw-r--r--src/swarmkv_store.c2
5 files changed, 23 insertions, 13 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 70d6e4f..719d8df 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -822,6 +822,7 @@ static void key_route_on_success(void *result, void *user)
crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
__exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
+ swarmkv_cmd_free(crdt_add_cmd);
}
free(replica_nodes);
}
@@ -1284,7 +1285,7 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name)
sdsfreesplitres(argv,argc);
return NULL;
}
-void timeout_cb(evutil_socket_t fd, short event, void *arg)
+static void loop_timeout_cb(evutil_socket_t fd, short event, void *arg)
{
struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg;
event_base_loopbreak(ctx->evbase);
@@ -1298,9 +1299,10 @@ void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv)
struct event *timeout_event = NULL;
if(tv)
{
- timeout_event = event_new(ctx->evbase, -1, 0, timeout_cb, ctx);
+ timeout_event = event_new(ctx->evbase, -1, 0, loop_timeout_cb, ctx);
evtimer_add(timeout_event, tv);
- event_base_loop(ctx->evbase, EVLOOP_ONCE);
+ event_base_loop(ctx->evbase, EVLOOP_NO_EXIT_ON_EMPTY);
+ event_del(timeout_event);
event_free(timeout_event);
}
else
@@ -1479,11 +1481,13 @@ void swarmkv_close(struct swarmkv * db)
}
free(db->threads);
db->threads=NULL;
+ free(db->ref_evbases);
+ db->ref_evbases=NULL;
db->opts->is_assigned_to_db=0;
swarmkv_options_free(db->opts);
db->opts=NULL;
-
+
free(db);
return;
}
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 0d310b1..cc2a21d 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -351,6 +351,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx);
}
swarmkv_caller_loop(db, NULL);
+ assert(ctx.reply!=NULL);
reply=ctx.reply;
ctx.reply=NULL;
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 0a24c3a..c98c629 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -37,6 +37,7 @@ struct swarmkv_mesh
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg)
{
assert(current_thread_id<mesh->nr_thread);
+ assert(current_thread_id != dest_thread_id);
struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id;
struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id;
ringbuf_t *dest_ring=dest_thr->ring;
@@ -92,28 +93,28 @@ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
struct swarmkv_mesh_thread *thread=(struct swarmkv_mesh_thread*)arg;
struct swarmkv_mesh *mesh=thread->ref_mesh;
ringbuf_t *ring=thread->ring;
- uint64_t val=0;
- ssize_t s = read(thread->efd, &val, sizeof(uint64_t));
+ uint64_t n_msg=0;
+ ssize_t s = read(thread->efd, &n_msg, sizeof(uint64_t));
if(s!=sizeof(uint64_t))
{
assert(0);
}
- size_t offset=0, ret=0, n_msg=0;
+ size_t offset=0, ret=0;
struct swarmkv_msg *msg=NULL;
- while(1)
+ for(uint64_t i=0; i<n_msg; i++)
{
ret=ringbuf_consume(ring, &offset);
if(ret==0)
{
+ assert(0);
break;
}
ringbuf_release(ring, sizeof(struct swarmkv_msg*));
msg=*(struct swarmkv_msg**)(thread->buff+offset);
- n_msg++;
mesh->on_msg_recv(msg, mesh->msg_recv_arg);
swarmkv_msg_free(msg);
}
- assert(n_msg==val);
+
return;
}
struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger)
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index fee1fb2..ad70dc9 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -89,7 +89,7 @@ struct swarmkv_net
void* logger;
on_msg_callback_t *on_msg_cb;
void *on_msg_cb_arg;
-
+ struct event * stat_ev;
long long last_input_bytes, last_output_bytes;
long long last_input_cmds, last_output_cmds;
struct timespec last_stats;
@@ -499,9 +499,9 @@ struct swarmkv_net *swarmkv_net_new(struct event_base *evbases[], int nr_worker_
thr->ref_net=net;
thr->conn_table=NULL;
}
- struct event * ev = event_new(evbases[0], -1, EV_PERSIST, snet_stat_periodic, net);
+ net->stat_ev = event_new(evbases[0], -1, EV_PERSIST, snet_stat_periodic, net);
struct timeval timer_delay = {2, 0};
- evtimer_add(ev, &timer_delay);
+ evtimer_add(net->stat_ev, &timer_delay);
//Refer to http://www.wangafu.net/~nickm/libevent-book/Ref8_listener.html
struct sockaddr_in sin;
@@ -582,6 +582,8 @@ void swarmkv_net_free(struct swarmkv_net *net)
}
thr->evbase=NULL;
}
+ event_del(net->stat_ev);
+ event_free(net->stat_ev);
free(net->threads);
free(net);
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index bef45ee..0a2fec1 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -627,6 +627,8 @@ void swarmkv_store_free(struct swarmkv_module *mod_store)
scontainer_free(ctr);
}
}
+ free(store->threads);
+ store->threads=NULL;
free(store);
return;
}