diff options
| author | yangwei <[email protected]> | 2024-02-28 17:14:51 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-02-28 19:14:21 +0800 |
| commit | 41d9fa5e80726a00c9ca3b66b7619f834dc66377 (patch) | |
| tree | c5a024517f87e49ed7966cd41f766b7fc8b87336 | |
| parent | dee44c13354d1c3d7c0e5e1828f5175e55790f3a (diff) | |
✨ feat(session message defer free): free delivered messags in egress stage
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 74 | ||||
| -rw-r--r-- | include/stellar/stellar.h | 2 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 56 | ||||
| -rw-r--r-- | src/stellar_on_sapp/start_loader.inf | 14 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_internal.h | 2 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp.h | 2 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 48 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_loader.c | 13 |
8 files changed, 158 insertions, 53 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index b4b9cc3..990aee9 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -9,14 +9,18 @@ struct simple_stellar_plugin_env { - int plugin_id; - int exdata_idx; struct stellar *st; + int session_plugin_id; + int exdata_idx; int stat_topic_id; int egress_topic_id; int tcp_topic_id; int udp_topic_id; int tcp_stream_topic_id; + long long tcp_packet_count; + long long udp_packet_count; + long long icmp_packet_count; + long long icmp6_packet_count; }; struct mq_message_stat @@ -43,7 +47,7 @@ static void print_session_stat(struct session *sess, struct mq_message_stat *sta printf("total-count=%u\n", stat->c2s_bytes + stat->s2c_bytes); if(stat->c2s_stream_pkts+stat->s2c_stream_pkts > 0) { - printf("server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u, ", stat->c2s_stream_pkts, stat->c2s_stream_bytes, + printf("----------------server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u\n", stat->c2s_stream_pkts, stat->c2s_stream_bytes, stat->s2c_stream_pkts, stat->s2c_stream_bytes); } } @@ -66,7 +70,7 @@ static void simple_session_plugin_ctx_free(struct session *sess, void *session_c { struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env; struct mq_message_stat *stat = (struct mq_message_stat *)session_ctx; - print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); + print_session_stat(sess, stat, env->session_plugin_id, __FUNCTION__); session_exdata_set(sess, env->exdata_idx, NULL); if(session_ctx)FREE(session_ctx); return; @@ -120,6 +124,30 @@ static void simple_event_plugin_entry(struct session *sess, int topic_id, const return; } +void simple_stellar_event_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + switch (ip_protocol) + { + case IPPROTO_TCP: + env->tcp_packet_count++; + break; + case IPPROTO_UDP: + env->udp_packet_count++; + break; + case IPPROTO_ICMP: + env->icmp_packet_count++; + break; + case IPPROTO_ICMPV6: + env->icmp6_packet_count++; + break; + default: + perror("invalid ip_protocol\n"); + exit(-1); + break; + } + return; +} void *simple_stellar_event_plugin_init(struct stellar *st) @@ -127,11 +155,21 @@ void *simple_stellar_event_plugin_init(struct stellar *st) struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1); env->st = st; env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL); - env->plugin_id = stellar_session_plugin_register(st, + env->session_plugin_id = stellar_session_plugin_register(st, simple_session_plugin_ctx_new, simple_session_plugin_ctx_free, env); - + int tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_stellar_event_on_packet_func, env); + int udp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_UDP, simple_stellar_event_on_packet_func, env); + int icmp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMP, simple_stellar_event_on_packet_func, env); + int icmp6_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMPV6, simple_stellar_event_on_packet_func, env); + + if(tcp_plugin_id <= 0x10000 || udp_plugin_id <= 0x10000 || icmp_plugin_id <= 0x10000 || icmp6_plugin_id <= 0x10000) + { + perror("register packet plugin failed\n"); + exit(-1); + } + env->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); env->tcp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP); env->udp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_UDP); @@ -141,9 +179,9 @@ void *simple_stellar_event_plugin_init(struct stellar *st) exit(-1); } - stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->plugin_id); - stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->plugin_id); - stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->plugin_id); + stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->session_plugin_id); + stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->session_plugin_id); + stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->session_plugin_id); int stat_topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT"); if(stat_topic_id < 0) @@ -156,11 +194,15 @@ void *simple_stellar_event_plugin_init(struct stellar *st) void simple_stellar_event_plugin_exit(void *plugin_env) { - if(plugin_env)FREE(plugin_env); + if(plugin_env) + { + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + printf("(%s):tcp_packet_num:%lld, udp_packet_num:%lld, icmp_packet_num:%lld, icmp6_packet_num:%lld \n", __FUNCTION__, env->tcp_packet_count, env->udp_packet_count, env->icmp_packet_count, env->icmp6_packet_count); + FREE(plugin_env); + } return; } -// TODO: add packet entry // TODO: add polling entry /******************************* @@ -173,7 +215,7 @@ static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const v struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; if(topic_id == env->egress_topic_id) { - session_mq_ignore_message(sess, topic_id, env->plugin_id); + session_mq_ignore_message(sess, topic_id, env->session_plugin_id); } if (topic_id == env->stat_topic_id) { @@ -185,7 +227,7 @@ static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const v exit(-1); } // print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); - session_mq_unignore_message(sess, env->egress_topic_id, env->plugin_id); + session_mq_unignore_message(sess, env->egress_topic_id, env->session_plugin_id); } return; } @@ -201,11 +243,11 @@ void *simple_stellar_mq_plugin_init(struct stellar *st) topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL); } env->stat_topic_id = topic_id; - env->plugin_id = stellar_session_plugin_register(st, + env->session_plugin_id = stellar_session_plugin_register(st, NULL, NULL, env); - stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->plugin_id); + stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->session_plugin_id); // TEST: subscirbe egress message then ignore env->egress_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_EGRESS); @@ -214,7 +256,7 @@ void *simple_stellar_mq_plugin_init(struct stellar *st) perror("get egress topic id failed\n"); exit(-1); } - stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->plugin_id); + stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->session_plugin_id); return env; } diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index a41a159..96b5394 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -28,7 +28,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess); struct packet; -typedef void *plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env); +typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env); //return packet plugin_id int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env); diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index eb4873a..13e0f2a 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -74,12 +74,12 @@ struct plugin_exdata void *exdata; }; -// TODO: add delivered mq, free message in egress stage struct plugin_manager_runtime { struct plugin_manager_schema *plug_mgr; struct session *sess; - struct session_message *mq;// message list + struct session_message *pending_mq;// message list + struct session_message *delivered_mq;// message list char *session_mq_status; //N * M bits, N topic, M subscriber struct plugin_exdata *plugin_exdata_array; struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free @@ -456,10 +456,10 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1; unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; - struct session_message *node= CALLOC(struct session_message,1); - node->topic_id = topic_id; - node->msg_data = data; - DL_APPEND(plug_mgr_rt->mq, node); + struct session_message *msg= CALLOC(struct session_message,1); + msg->topic_id = topic_id; + msg->msg_data = data; + DL_APPEND(plug_mgr_rt->pending_mq, msg); return 0; } @@ -526,7 +526,13 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_fun utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd); } - // TODO: check if the plugin already subscribe current topic + // if plugin already subscribe current topic, return 0 + struct session_mq_subscriber_info *p=NULL; + while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + return 0; + }; struct session_mq_subscriber *new_subscriber = CALLOC(struct session_mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; @@ -554,9 +560,9 @@ static void plugin_manager_session_message_dispatch(struct session *sess) struct session_mq_topic_schema *topic; struct registered_session_plugin_schema *session_plugin_schema; struct session_plugin_ctx_runtime *plugin_ctx_rt; - while (plug_mgr_rt->mq != NULL) + while (plug_mgr_rt->pending_mq != NULL) { - DL_FOREACH_SAFE(plug_mgr_rt->mq, mq_elt, mq_tmp) + DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) { topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)(mq_elt->topic_id)); @@ -587,8 +593,8 @@ static void plugin_manager_session_message_dispatch(struct session *sess) topic->free_cb(mq_elt->msg_data, topic->free_cb_arg); } } - DL_DELETE(plug_mgr_rt->mq, mq_elt); - FREE(mq_elt); + DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); + DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list } } return; @@ -634,7 +640,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1); rt->plug_mgr = plug_mgr; rt->sess = sess; - rt->mq = NULL; + rt->pending_mq = NULL; + rt->delivered_mq = NULL; rt->session_mq_status=CALLOC(char, plug_mgr->topic_num*plug_mgr->subscriber_num); memset(rt->session_mq_status, 1, plug_mgr->topic_num*plug_mgr->subscriber_num); rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr); @@ -646,10 +653,16 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; - if(rt->mq != NULL) + if(rt->pending_mq != NULL) { - session_mq_free(rt->mq); + session_mq_free(rt->pending_mq); + rt->pending_mq=NULL; } + if(rt->delivered_mq != NULL) + { + session_mq_free(rt->delivered_mq); + rt->delivered_mq=NULL; + } if(rt->session_mq_status != NULL) { FREE(rt->session_mq_status); @@ -685,12 +698,13 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol { utarray_new(plug_mgr->registered_packet_plugin_array, ®istered_packet_plugin_array_icd); } - struct registered_packet_plugin_schema* packet_plugin_schema = CALLOC(struct registered_packet_plugin_schema, 1); - packet_plugin_schema->ip_protocol = ip_protocol; - packet_plugin_schema->on_packet = on_packet; - packet_plugin_schema->plugin_env = plugin_env; - utarray_push_back(plug_mgr->registered_packet_plugin_array, packet_plugin_schema); - return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id + struct registered_packet_plugin_schema packet_plugin_schema; + memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); + packet_plugin_schema.ip_protocol = ip_protocol; + packet_plugin_schema.on_packet = on_packet; + packet_plugin_schema.plugin_env = plugin_env; + utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); + return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id } void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt) @@ -770,6 +784,8 @@ void plugin_manager_on_session_egress(struct session *sess,const struct packet * if(plug_mgr_rt==NULL)return; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); + session_mq_free(plug_mgr_rt->delivered_mq); + plug_mgr_rt->delivered_mq=NULL; return; } diff --git a/src/stellar_on_sapp/start_loader.inf b/src/stellar_on_sapp/start_loader.inf index 3cfe7c6..0327912 100644 --- a/src/stellar_on_sapp/start_loader.inf +++ b/src/stellar_on_sapp/start_loader.inf @@ -18,4 +18,16 @@ FUNC_NAME=stellar_on_sapp_tcpall_entry [UDP] FUNC_FLAG=ALL -FUNC_NAME=stellar_on_sapp_udp_entry
\ No newline at end of file +FUNC_NAME=stellar_on_sapp_udp_entry + +[IP] +FUNC_FLAG=ALL +FUNC_NAME=stellar_on_sapp_ip4_entry + +[IPV6] +FUNC_FLAG=ALL +FUNC_NAME=stellar_on_sapp_ip6_entry + +#[POLLING] +#FUNC_FLAG=ALL +#FUNC_NAME=stellar_on_sapp_polling_entry
\ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h index 652685c..f831ee2 100644 --- a/src/stellar_on_sapp/stellar_internal.h +++ b/src/stellar_on_sapp/stellar_internal.h @@ -12,6 +12,8 @@ struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct sessio enum packet_type { UNKNOWN, + IPv4, + IPv6, UDP, TCP, TCP_STREAM, diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h index 9b509d7..593f40b 100644 --- a/src/stellar_on_sapp/stellar_on_sapp.h +++ b/src/stellar_on_sapp/stellar_on_sapp.h @@ -15,3 +15,5 @@ void session_free_on_sapp(struct session *sess); unsigned char session_state_update_on_sapp(struct streaminfo *stream, struct session *sess, void *a_packet, enum packet_type type); void session_poll_on_sapp(struct session *sess); + +void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index 5320b5d..54651b1 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -69,6 +69,8 @@ void stellar_exit_on_sapp(struct stellar *st) struct packet { enum packet_type type; + unsigned char ip_proto; + unsigned char pad[3]; void *raw_pkt; struct streaminfo *a_stream; }; @@ -147,6 +149,36 @@ void session_poll_on_sapp(struct session *sess) } /********************************************* + * PACKET STATE UPDATE ON SAPP * + *********************************************/ +#include <netinet/in.h> +void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *pkt_hdr, enum packet_type type) +{ + struct packet pkt; + pkt.type=type; + pkt.a_stream=pstream; + pkt.raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream); + + switch (type) + { + case IPv4: + { + struct iphdr *ip4_hdr=(struct iphdr *)pkt_hdr; + pkt.ip_proto=ip4_hdr->protocol; + break; + } + case IPv6: + { + struct ip6_hdr *ip6_hdr=(struct ip6_hdr *)pkt_hdr; + pkt.ip_proto=ip6_hdr->ip6_nxt; + break; + } + default: + return; + } + plugin_manager_on_packet(st->plug_mgr, &pkt); +} +/********************************************* * STELLAR INFO INTERFACE WRAPPER ON SAPP* *********************************************/ @@ -212,22 +244,10 @@ int packet_arrive_time(const struct packet *pkt, struct timeval *ts) return 0; } -unsigned char packet_get_ip_protocol(struct packet *pkt) +inline unsigned char packet_get_ip_protocol(struct packet *pkt) { if(pkt == NULL || pkt->a_stream == NULL)return 0; - unsigned char ip_protocol=0; - switch(pkt->a_stream->type) - { - case STREAM_TYPE_TCP: - ip_protocol=IPPROTO_TCP; - break; - case STREAM_TYPE_UDP: - ip_protocol=IPPROTO_UDP; - break; - default: - break; - } - return ip_protocol; // FIXME: to be improved to support ICMP, etc. + return pkt->ip_proto; } /********************************************* diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c index fbed5e4..df864f2 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_loader.c +++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c @@ -211,5 +211,16 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet, ENTRY_TYPE_TCP); } -// TODO: add packet entry and call plugin_manager_on_packet +char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) +{ + packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); + return APP_STATE_GIVEME; +} + +char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) +{ + packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); + return APP_STATE_GIVEME; +} + // TODO: add polling entry and call plugin_manager_on_polling
\ No newline at end of file |
