diff options
| author | yangwei <[email protected]> | 2024-02-02 10:02:11 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-02-28 19:14:21 +0800 |
| commit | dee44c13354d1c3d7c0e5e1828f5175e55790f3a (patch) | |
| tree | bc0adf5b95904a7ded7da7a1a5134b32feadc4df /examples/stellar_plugin | |
| parent | 9649bafbd6eef60541ab1a1e8e50e6d4312981c4 (diff) | |
🦄 refactor(API implementation): Refactor API to use new headers
Diffstat (limited to 'examples/stellar_plugin')
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 258 | ||||
| -rw-r--r-- | examples/stellar_plugin/spec.toml (renamed from examples/stellar_plugin/simple_plugin.toml) | 0 |
2 files changed, 158 insertions, 100 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index 50f38f0..b4b9cc3 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -1,15 +1,22 @@ #include "stellar/stellar.h" +#include "stellar/session.h" #include "stellar/utils.h" #include "stellar/session_exdata.h" #include "stellar/session_mq.h" #include <stdio.h> +#include <string.h> -struct simple_stellar_plugin_ctx +struct simple_stellar_plugin_env { int plugin_id; int exdata_idx; struct stellar *st; + int stat_topic_id; + int egress_topic_id; + int tcp_topic_id; + int udp_topic_id; + int tcp_stream_topic_id; }; struct mq_message_stat @@ -18,150 +25,201 @@ struct mq_message_stat uint32_t c2s_bytes; uint32_t s2c_pkts; uint32_t s2c_bytes; - struct session_event* ud_ev; + uint32_t c2s_stream_pkts; + uint32_t c2s_stream_bytes; + uint32_t s2c_stream_pkts; + uint32_t s2c_stream_bytes; }; -extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg); -static int session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *cb_arg) +static void print_session_stat(struct session *sess, struct mq_message_stat *stat, int plugin_id, const char *banner) { - struct mq_message_stat *ctx =(struct mq_message_stat *)data; - struct simple_stellar_plugin_ctx *plugin_ctx = (struct simple_stellar_plugin_ctx *)cb_arg; - printf("%s(topic:%d->plug:%d)-----------%20s: ", __FUNCTION__, topic_id, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - - struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); - session_event_assign(i_ev, plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_mq_plugin_entry, plugin_ctx); - printf("%s(plug:%d)session_event_assign-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - - return 0; + if (stat) + { + printf("%s(plug:%d)-----------%20s: ", banner, plugin_id, session_get0_readable_addr(sess)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", stat->c2s_pkts, stat->c2s_bytes, + stat->s2c_pkts, stat->s2c_bytes); + printf("total-pkt=%u, ", stat->c2s_pkts + stat->s2c_pkts); + printf("total-count=%u\n", stat->c2s_bytes + stat->s2c_bytes); + if(stat->c2s_stream_pkts+stat->s2c_stream_pkts > 0) + { + printf("server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u, ", stat->c2s_stream_pkts, stat->c2s_stream_bytes, + stat->s2c_stream_pkts, stat->s2c_stream_bytes); + } + } + return; } -static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, int plugin_id) +/******************************* + * session event plugin * + *******************************/ + +static void *simple_session_plugin_ctx_new(struct session *sess, void *plugin_env) { - printf("%s(plug:%d)-----------%20s: ", __FUNCTION__, plugin_id, session_get0_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return; + struct mq_message_stat * stat= CALLOC(struct mq_message_stat, 1); + struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env; + session_exdata_set(sess, env->exdata_idx, stat); + return stat; } -int simple_event_plugin_user_defined_ev(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +static void simple_session_plugin_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) { - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); - printf("%s(plug:%d)trigger-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); - return 0; + struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env; + struct mq_message_stat *stat = (struct mq_message_stat *)session_ctx; + print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); + session_exdata_set(sess, env->exdata_idx, NULL); + if(session_ctx)FREE(session_ctx); + return; } -int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +static void simple_event_plugin_entry(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env) { - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); - if (mg_stat == NULL) - { - mg_stat = CALLOC(struct mq_message_stat, 1); - session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat); - } - - if ((events & SESS_EV_OPENING)) - { - mg_stat->ud_ev=session_event_new(plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_user_defined_ev, cb_arg); - session_event_add(mg_stat->ud_ev, NULL); - } - + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + struct mq_message_stat *stat = (struct mq_message_stat *)plugin_ctx; + struct packet * pkt=(struct packet *)data; if (pkt) { + // TEST: try stellar_session_plugin_dettach_current_session when pkt > 3 + if(stat->c2s_pkts+stat->s2c_pkts >= 3 && session_get_type(sess)== SESSION_TYPE_UDP) + { + stellar_session_plugin_dettach_current_session(sess); + return; + } + size_t payload_len = 0; session_get0_current_payload(sess, &payload_len); - int dir = session_get_direction(sess); - if (dir==SESSION_DIRECTION_IN) + int dir = packet_get_direction(pkt); + if (dir==PACKET_DIRECTION_C2S) { - mg_stat->c2s_bytes += payload_len; - mg_stat->c2s_pkts += 1; + if(topic_id==env->tcp_stream_topic_id) + { + stat->c2s_stream_bytes += payload_len; + stat->c2s_stream_pkts += 1; + } + else + { + stat->c2s_bytes += payload_len; + stat->c2s_pkts += 1; + } } - if (dir==SESSION_DIRECTION_OUT) + if (dir==PACKET_DIRECTION_S2C) { - mg_stat->s2c_bytes += payload_len; - mg_stat->s2c_pkts += 1; + if(topic_id==env->tcp_stream_topic_id) + { + stat->s2c_stream_bytes += payload_len; + stat->s2c_stream_pkts += 1; + } + else + { + stat->s2c_bytes += payload_len; + stat->s2c_pkts += 1; + } } + session_mq_publish_message(sess, ((struct simple_stellar_plugin_env *)plugin_env)->stat_topic_id, stat); } - if (mg_stat != NULL && (events & SESS_EV_CLOSING)) - { - print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); - session_event_free(mg_stat->ud_ev); - } - return 0; + return; } -int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) -{ - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); - session_event_assign(i_ev, plugin_ctx->st, sess, 0, simple_mq_plugin_entry, plugin_ctx); - printf("%s(plug:%d)session_event_assign-----------%20s: \n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - return 0; -} -static void simple_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg) -{ - if(ex_ptr) - { - FREE(ex_ptr); - } - return; -} void *simple_stellar_event_plugin_init(struct stellar *st) { - struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); - ctx->st = st; - ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_EVENT_PLUGIN", simple_exdata_free, NULL); - int plugin_id=stellar_plugin_register(st, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_entry, ctx); - if(plugin_id >= 0) + struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1); + env->st = st; + env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL); + env->plugin_id = stellar_session_plugin_register(st, + simple_session_plugin_ctx_new, + simple_session_plugin_ctx_free, + env); + + env->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); + env->tcp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP); + env->udp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_UDP); + if(env->tcp_topic_id < 0 || env->udp_topic_id < 0 || env->tcp_stream_topic_id < 0) { - ctx->plugin_id=plugin_id; + perror("get tcp or udp topic id failed\n"); + exit(-1); } - return ctx; + + stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->plugin_id); + stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->plugin_id); + stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->plugin_id); + + int stat_topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT"); + if(stat_topic_id < 0) + { + stat_topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL); + } + env->stat_topic_id = stat_topic_id; + return env; } -void simple_stellar_event_plugin_exit(void *ctx) +void simple_stellar_event_plugin_exit(void *plugin_env) { - if(ctx)FREE(ctx); + if(plugin_env)FREE(plugin_env); return; } +// TODO: add packet entry +// TODO: add polling entry + +/******************************* + * mq plugin * + *******************************/ + +static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env) +{ + + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + if(topic_id == env->egress_topic_id) + { + session_mq_ignore_message(sess, topic_id, env->plugin_id); + } + if (topic_id == env->stat_topic_id) + { + struct mq_message_stat *stat = (struct mq_message_stat *)data; + struct mq_message_stat *exdata_stat = (struct mq_message_stat *)session_exdata_get(sess, env->exdata_idx); + if (memcmp(exdata_stat, stat, sizeof(struct mq_message_stat)) != 0) + { + perror("exdata and mq data not equal\n"); + exit(-1); + } + // print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); + session_mq_unignore_message(sess, env->egress_topic_id, env->plugin_id); + } + return; +} + void *simple_stellar_mq_plugin_init(struct stellar *st) { - struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); - ctx->st = st; - ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_MQ_PLUGIN", simple_exdata_free, NULL); - int topic_id=session_mq_get_topic_id(st, "SIMPLE_MQ_TOPIC"); + struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1); + env->st = st; + env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL); + int topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT"); if(topic_id < 0) { - topic_id=session_mq_create_topic(st, "SIMPLE_MQ_TOPIC", NULL, NULL); + topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL); } - session_mq_subscribe_topic(st, topic_id, session_mq_plugin_sub_fn, ctx); - int plugin_id=stellar_plugin_register(st, 0, simple_mq_plugin_entry, ctx); - if(plugin_id >= 0) + env->stat_topic_id = topic_id; + env->plugin_id = stellar_session_plugin_register(st, + NULL, + NULL, + env); + stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->plugin_id); + + // TEST: subscirbe egress message then ignore + env->egress_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_EGRESS); + if(env->egress_topic_id < 0) { - ctx->plugin_id=plugin_id; + perror("get egress topic id failed\n"); + exit(-1); } - return ctx; + stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->plugin_id); + return env; } -void simple_stellar_mq_plugin_exit(void *ctx) +void simple_stellar_mq_plugin_exit(void *plugin_env) { - if(ctx)FREE(ctx); + if(plugin_env)FREE(plugin_env); return; }
\ No newline at end of file diff --git a/examples/stellar_plugin/simple_plugin.toml b/examples/stellar_plugin/spec.toml index ce08b65..ce08b65 100644 --- a/examples/stellar_plugin/simple_plugin.toml +++ b/examples/stellar_plugin/spec.toml |
