summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-02-28 17:14:51 +0800
committeryangwei <[email protected]>2024-02-28 19:14:21 +0800
commit41d9fa5e80726a00c9ca3b66b7619f834dc66377 (patch)
treec5a024517f87e49ed7966cd41f766b7fc8b87336
parentdee44c13354d1c3d7c0e5e1828f5175e55790f3a (diff)
✨ feat(session message defer free): free delivered messags in egress stage
-rw-r--r--examples/stellar_plugin/simple_stellar_plugin.c74
-rw-r--r--include/stellar/stellar.h2
-rw-r--r--src/plugin_manager/plugin_manager.c56
-rw-r--r--src/stellar_on_sapp/start_loader.inf14
-rw-r--r--src/stellar_on_sapp/stellar_internal.h2
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp.h2
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c48
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_loader.c13
8 files changed, 158 insertions, 53 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c
index b4b9cc3..990aee9 100644
--- a/examples/stellar_plugin/simple_stellar_plugin.c
+++ b/examples/stellar_plugin/simple_stellar_plugin.c
@@ -9,14 +9,18 @@
struct simple_stellar_plugin_env
{
- int plugin_id;
- int exdata_idx;
struct stellar *st;
+ int session_plugin_id;
+ int exdata_idx;
int stat_topic_id;
int egress_topic_id;
int tcp_topic_id;
int udp_topic_id;
int tcp_stream_topic_id;
+ long long tcp_packet_count;
+ long long udp_packet_count;
+ long long icmp_packet_count;
+ long long icmp6_packet_count;
};
struct mq_message_stat
@@ -43,7 +47,7 @@ static void print_session_stat(struct session *sess, struct mq_message_stat *sta
printf("total-count=%u\n", stat->c2s_bytes + stat->s2c_bytes);
if(stat->c2s_stream_pkts+stat->s2c_stream_pkts > 0)
{
- printf("server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u, ", stat->c2s_stream_pkts, stat->c2s_stream_bytes,
+ printf("----------------server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u\n", stat->c2s_stream_pkts, stat->c2s_stream_bytes,
stat->s2c_stream_pkts, stat->s2c_stream_bytes);
}
}
@@ -66,7 +70,7 @@ static void simple_session_plugin_ctx_free(struct session *sess, void *session_c
{
struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env;
struct mq_message_stat *stat = (struct mq_message_stat *)session_ctx;
- print_session_stat(sess, stat, env->plugin_id, __FUNCTION__);
+ print_session_stat(sess, stat, env->session_plugin_id, __FUNCTION__);
session_exdata_set(sess, env->exdata_idx, NULL);
if(session_ctx)FREE(session_ctx);
return;
@@ -120,6 +124,30 @@ static void simple_event_plugin_entry(struct session *sess, int topic_id, const
return;
}
+void simple_stellar_event_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ switch (ip_protocol)
+ {
+ case IPPROTO_TCP:
+ env->tcp_packet_count++;
+ break;
+ case IPPROTO_UDP:
+ env->udp_packet_count++;
+ break;
+ case IPPROTO_ICMP:
+ env->icmp_packet_count++;
+ break;
+ case IPPROTO_ICMPV6:
+ env->icmp6_packet_count++;
+ break;
+ default:
+ perror("invalid ip_protocol\n");
+ exit(-1);
+ break;
+ }
+ return;
+}
void *simple_stellar_event_plugin_init(struct stellar *st)
@@ -127,11 +155,21 @@ void *simple_stellar_event_plugin_init(struct stellar *st)
struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1);
env->st = st;
env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL);
- env->plugin_id = stellar_session_plugin_register(st,
+ env->session_plugin_id = stellar_session_plugin_register(st,
simple_session_plugin_ctx_new,
simple_session_plugin_ctx_free,
env);
-
+ int tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_stellar_event_on_packet_func, env);
+ int udp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_UDP, simple_stellar_event_on_packet_func, env);
+ int icmp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMP, simple_stellar_event_on_packet_func, env);
+ int icmp6_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMPV6, simple_stellar_event_on_packet_func, env);
+
+ if(tcp_plugin_id <= 0x10000 || udp_plugin_id <= 0x10000 || icmp_plugin_id <= 0x10000 || icmp6_plugin_id <= 0x10000)
+ {
+ perror("register packet plugin failed\n");
+ exit(-1);
+ }
+
env->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
env->tcp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP);
env->udp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_UDP);
@@ -141,9 +179,9 @@ void *simple_stellar_event_plugin_init(struct stellar *st)
exit(-1);
}
- stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->plugin_id);
- stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->plugin_id);
- stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->plugin_id);
+ stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->session_plugin_id);
+ stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->session_plugin_id);
+ stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->session_plugin_id);
int stat_topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT");
if(stat_topic_id < 0)
@@ -156,11 +194,15 @@ void *simple_stellar_event_plugin_init(struct stellar *st)
void simple_stellar_event_plugin_exit(void *plugin_env)
{
- if(plugin_env)FREE(plugin_env);
+ if(plugin_env)
+ {
+ struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ printf("(%s):tcp_packet_num:%lld, udp_packet_num:%lld, icmp_packet_num:%lld, icmp6_packet_num:%lld \n", __FUNCTION__, env->tcp_packet_count, env->udp_packet_count, env->icmp_packet_count, env->icmp6_packet_count);
+ FREE(plugin_env);
+ }
return;
}
-// TODO: add packet entry
// TODO: add polling entry
/*******************************
@@ -173,7 +215,7 @@ static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const v
struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
if(topic_id == env->egress_topic_id)
{
- session_mq_ignore_message(sess, topic_id, env->plugin_id);
+ session_mq_ignore_message(sess, topic_id, env->session_plugin_id);
}
if (topic_id == env->stat_topic_id)
{
@@ -185,7 +227,7 @@ static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const v
exit(-1);
}
// print_session_stat(sess, stat, env->plugin_id, __FUNCTION__);
- session_mq_unignore_message(sess, env->egress_topic_id, env->plugin_id);
+ session_mq_unignore_message(sess, env->egress_topic_id, env->session_plugin_id);
}
return;
}
@@ -201,11 +243,11 @@ void *simple_stellar_mq_plugin_init(struct stellar *st)
topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL);
}
env->stat_topic_id = topic_id;
- env->plugin_id = stellar_session_plugin_register(st,
+ env->session_plugin_id = stellar_session_plugin_register(st,
NULL,
NULL,
env);
- stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->plugin_id);
+ stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->session_plugin_id);
// TEST: subscirbe egress message then ignore
env->egress_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_EGRESS);
@@ -214,7 +256,7 @@ void *simple_stellar_mq_plugin_init(struct stellar *st)
perror("get egress topic id failed\n");
exit(-1);
}
- stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->plugin_id);
+ stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->session_plugin_id);
return env;
}
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index a41a159..96b5394 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -28,7 +28,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess);
struct packet;
-typedef void *plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
+typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
//return packet plugin_id
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env);
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index eb4873a..13e0f2a 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -74,12 +74,12 @@ struct plugin_exdata
void *exdata;
};
-// TODO: add delivered mq, free message in egress stage
struct plugin_manager_runtime
{
struct plugin_manager_schema *plug_mgr;
struct session *sess;
- struct session_message *mq;// message list
+ struct session_message *pending_mq;// message list
+ struct session_message *delivered_mq;// message list
char *session_mq_status; //N * M bits, N topic, M subscriber
struct plugin_exdata *plugin_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
@@ -456,10 +456,10 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
- struct session_message *node= CALLOC(struct session_message,1);
- node->topic_id = topic_id;
- node->msg_data = data;
- DL_APPEND(plug_mgr_rt->mq, node);
+ struct session_message *msg= CALLOC(struct session_message,1);
+ msg->topic_id = topic_id;
+ msg->msg_data = data;
+ DL_APPEND(plug_mgr_rt->pending_mq, msg);
return 0;
}
@@ -526,7 +526,13 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_fun
utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd);
}
- // TODO: check if the plugin already subscribe current topic
+ // if plugin already subscribe current topic, return 0
+ struct session_mq_subscriber_info *p=NULL;
+ while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ return 0;
+ };
struct session_mq_subscriber *new_subscriber = CALLOC(struct session_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
@@ -554,9 +560,9 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
struct session_mq_topic_schema *topic;
struct registered_session_plugin_schema *session_plugin_schema;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
- while (plug_mgr_rt->mq != NULL)
+ while (plug_mgr_rt->pending_mq != NULL)
{
- DL_FOREACH_SAFE(plug_mgr_rt->mq, mq_elt, mq_tmp)
+ DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
{
topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
(unsigned int)(mq_elt->topic_id));
@@ -587,8 +593,8 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
topic->free_cb(mq_elt->msg_data, topic->free_cb_arg);
}
}
- DL_DELETE(plug_mgr_rt->mq, mq_elt);
- FREE(mq_elt);
+ DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
+ DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
}
}
return;
@@ -634,7 +640,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
- rt->mq = NULL;
+ rt->pending_mq = NULL;
+ rt->delivered_mq = NULL;
rt->session_mq_status=CALLOC(char, plug_mgr->topic_num*plug_mgr->subscriber_num);
memset(rt->session_mq_status, 1, plug_mgr->topic_num*plug_mgr->subscriber_num);
rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr);
@@ -646,10 +653,16 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
- if(rt->mq != NULL)
+ if(rt->pending_mq != NULL)
{
- session_mq_free(rt->mq);
+ session_mq_free(rt->pending_mq);
+ rt->pending_mq=NULL;
}
+ if(rt->delivered_mq != NULL)
+ {
+ session_mq_free(rt->delivered_mq);
+ rt->delivered_mq=NULL;
+ }
if(rt->session_mq_status != NULL)
{
FREE(rt->session_mq_status);
@@ -685,12 +698,13 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol
{
utarray_new(plug_mgr->registered_packet_plugin_array, &registered_packet_plugin_array_icd);
}
- struct registered_packet_plugin_schema* packet_plugin_schema = CALLOC(struct registered_packet_plugin_schema, 1);
- packet_plugin_schema->ip_protocol = ip_protocol;
- packet_plugin_schema->on_packet = on_packet;
- packet_plugin_schema->plugin_env = plugin_env;
- utarray_push_back(plug_mgr->registered_packet_plugin_array, packet_plugin_schema);
- return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id
+ struct registered_packet_plugin_schema packet_plugin_schema;
+ memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
+ packet_plugin_schema.ip_protocol = ip_protocol;
+ packet_plugin_schema.on_packet = on_packet;
+ packet_plugin_schema.plugin_env = plugin_env;
+ utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
+ return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id
}
void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
@@ -770,6 +784,8 @@ void plugin_manager_on_session_egress(struct session *sess,const struct packet *
if(plug_mgr_rt==NULL)return;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
+ session_mq_free(plug_mgr_rt->delivered_mq);
+ plug_mgr_rt->delivered_mq=NULL;
return;
}
diff --git a/src/stellar_on_sapp/start_loader.inf b/src/stellar_on_sapp/start_loader.inf
index 3cfe7c6..0327912 100644
--- a/src/stellar_on_sapp/start_loader.inf
+++ b/src/stellar_on_sapp/start_loader.inf
@@ -18,4 +18,16 @@ FUNC_NAME=stellar_on_sapp_tcpall_entry
[UDP]
FUNC_FLAG=ALL
-FUNC_NAME=stellar_on_sapp_udp_entry \ No newline at end of file
+FUNC_NAME=stellar_on_sapp_udp_entry
+
+[IP]
+FUNC_FLAG=ALL
+FUNC_NAME=stellar_on_sapp_ip4_entry
+
+[IPV6]
+FUNC_FLAG=ALL
+FUNC_NAME=stellar_on_sapp_ip6_entry
+
+#[POLLING]
+#FUNC_FLAG=ALL
+#FUNC_NAME=stellar_on_sapp_polling_entry \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h
index 652685c..f831ee2 100644
--- a/src/stellar_on_sapp/stellar_internal.h
+++ b/src/stellar_on_sapp/stellar_internal.h
@@ -12,6 +12,8 @@ struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct sessio
enum packet_type
{
UNKNOWN,
+ IPv4,
+ IPv6,
UDP,
TCP,
TCP_STREAM,
diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h
index 9b509d7..593f40b 100644
--- a/src/stellar_on_sapp/stellar_on_sapp.h
+++ b/src/stellar_on_sapp/stellar_on_sapp.h
@@ -15,3 +15,5 @@ void session_free_on_sapp(struct session *sess);
unsigned char session_state_update_on_sapp(struct streaminfo *stream, struct session *sess, void *a_packet, enum packet_type type);
void session_poll_on_sapp(struct session *sess);
+
+void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type);
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index 5320b5d..54651b1 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -69,6 +69,8 @@ void stellar_exit_on_sapp(struct stellar *st)
struct packet
{
enum packet_type type;
+ unsigned char ip_proto;
+ unsigned char pad[3];
void *raw_pkt;
struct streaminfo *a_stream;
};
@@ -147,6 +149,36 @@ void session_poll_on_sapp(struct session *sess)
}
/*********************************************
+ * PACKET STATE UPDATE ON SAPP *
+ *********************************************/
+#include <netinet/in.h>
+void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *pkt_hdr, enum packet_type type)
+{
+ struct packet pkt;
+ pkt.type=type;
+ pkt.a_stream=pstream;
+ pkt.raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream);
+
+ switch (type)
+ {
+ case IPv4:
+ {
+ struct iphdr *ip4_hdr=(struct iphdr *)pkt_hdr;
+ pkt.ip_proto=ip4_hdr->protocol;
+ break;
+ }
+ case IPv6:
+ {
+ struct ip6_hdr *ip6_hdr=(struct ip6_hdr *)pkt_hdr;
+ pkt.ip_proto=ip6_hdr->ip6_nxt;
+ break;
+ }
+ default:
+ return;
+ }
+ plugin_manager_on_packet(st->plug_mgr, &pkt);
+}
+/*********************************************
* STELLAR INFO INTERFACE WRAPPER ON SAPP*
*********************************************/
@@ -212,22 +244,10 @@ int packet_arrive_time(const struct packet *pkt, struct timeval *ts)
return 0;
}
-unsigned char packet_get_ip_protocol(struct packet *pkt)
+inline unsigned char packet_get_ip_protocol(struct packet *pkt)
{
if(pkt == NULL || pkt->a_stream == NULL)return 0;
- unsigned char ip_protocol=0;
- switch(pkt->a_stream->type)
- {
- case STREAM_TYPE_TCP:
- ip_protocol=IPPROTO_TCP;
- break;
- case STREAM_TYPE_UDP:
- ip_protocol=IPPROTO_UDP;
- break;
- default:
- break;
- }
- return ip_protocol; // FIXME: to be improved to support ICMP, etc.
+ return pkt->ip_proto;
}
/*********************************************
diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c
index fbed5e4..df864f2 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_loader.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c
@@ -211,5 +211,16 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i
return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet, ENTRY_TYPE_TCP);
}
-// TODO: add packet entry and call plugin_manager_on_packet
+char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet)
+{
+ packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4);
+ return APP_STATE_GIVEME;
+}
+
+char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet)
+{
+ packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6);
+ return APP_STATE_GIVEME;
+}
+
// TODO: add polling entry and call plugin_manager_on_polling \ No newline at end of file