diff options
| author | yangwei <[email protected]> | 2024-02-02 10:02:11 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-02-28 19:14:21 +0800 |
| commit | dee44c13354d1c3d7c0e5e1828f5175e55790f3a (patch) | |
| tree | bc0adf5b95904a7ded7da7a1a5134b32feadc4df /examples | |
| parent | 9649bafbd6eef60541ab1a1e8e50e6d4312981c4 (diff) | |
🦄 refactor(API implementation): Refactor API to use new headers
Diffstat (limited to 'examples')
| -rw-r--r-- | examples/sapp_plugin/CMakeLists.txt | 7 | ||||
| -rw-r--r-- | examples/sapp_plugin/publisher_loader.c | 148 | ||||
| -rw-r--r-- | examples/sapp_plugin/publisher_loader.inf | 13 | ||||
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 258 | ||||
| -rw-r--r-- | examples/stellar_plugin/spec.toml (renamed from examples/stellar_plugin/simple_plugin.toml) | 0 |
5 files changed, 158 insertions, 268 deletions
diff --git a/examples/sapp_plugin/CMakeLists.txt b/examples/sapp_plugin/CMakeLists.txt deleted file mode 100644 index d6dc162..0000000 --- a/examples/sapp_plugin/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -add_definitions(-fPIC) - -include_directories(${CMAKE_SOURCE_DIR}/src) - -add_library(publisher_loader SHARED publisher_loader.c) -target_link_libraries(publisher_loader adapter) -set_target_properties(publisher_loader PROPERTIES PREFIX "") diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c deleted file mode 100644 index 4334f9a..0000000 --- a/examples/sapp_plugin/publisher_loader.c +++ /dev/null @@ -1,148 +0,0 @@ -#include "stellar/session.h" -#include "stellar/session_mq.h" -#include "stellar/utils.h" - -#include "adapter/adapter.h" -#include "adapter/session_manager.h" - -#include <MESA/stream.h> -#include <stdio.h> - - -struct simple_stream_ctx -{ - uint32_t c2s_pkts; - uint32_t c2s_bytes; - uint32_t s2c_pkts; - uint32_t s2c_bytes; - struct session *sess; -}; - -static void session_mq_topic_free(void *data, void *cb_arg) -{ - FREE(data); - return; -} - -struct stellar *g_stellar=NULL; -int g_topic_id=-1; -int g_sub_id=-1; - -static int session_mq_loader_read(struct session *sess, int topic_id, const void *data, void *cb_arg) -{ - struct simple_stream_ctx *ctx =(struct simple_stream_ctx *)data; - printf("loader_read_message(topic:%d)-----------%20s", topic_id, session_get0_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - session_mq_ignore_message(sess, topic_id, g_sub_id); - return 0; -} - -int PUBLISHER_LOADER_INIT() -{ - g_stellar = stellar_init("./stellar_plugin/simple_plugin.toml"); - if(g_stellar==NULL)return -1; - int t_topic_id=session_mq_get_topic_id(g_stellar, "SIMPLE_MQ_TOPIC"); - if(t_topic_id >= 0) - { - session_mq_update_topic(g_stellar, t_topic_id, session_mq_topic_free, NULL); - g_topic_id=t_topic_id; - } - else - { - g_topic_id=session_mq_create_topic(g_stellar, "SIMPLE_MQ_TOPIC", session_mq_topic_free, NULL); - } - g_sub_id=session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL); - return 0; -} - -void PUBLISHER_LOADER_EXIT(void) -{ - - session_mq_destroy_topic(g_stellar, g_topic_id); - stellar_exit(g_stellar); - return; -} - - -static void print_stream_ctx(struct streaminfo *pstream, struct simple_stream_ctx *ctx) -{ - printf("stream-----------%20s: ", printaddr(&(pstream->addr), pstream->threadnum)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return; -} - -static struct simple_stream_ctx *stream_ctx_dup(const struct simple_stream_ctx *origin) -{ - struct simple_stream_ctx *ctx=CALLOC(struct simple_stream_ctx,1); - memcpy(ctx, origin, sizeof(struct simple_stream_ctx)); - return ctx; -} - -static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) -{ - struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme; - struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail; - unsigned char entry_ret=APP_STATE_GIVEME; - if(*pme==NULL) - { - *pme=CALLOC(struct simple_stream_ctx,1); - ctx=(struct simple_stream_ctx *)*pme; - } - - if(a_packet!=NULL) - { - if(DIR_C2S == pstream->curdir){ - ctx->c2s_bytes += pdetail->datalen; - ctx->c2s_pkts++; - }else{ - ctx->s2c_bytes += pdetail->datalen; - ctx->s2c_pkts++; - } - } - struct simple_stream_ctx *msg; - switch (state) - { - case OP_STATE_PENDING: - ctx->sess=adapter_session_new(g_stellar, pstream); - entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); - adapter_session_poll(ctx->sess); - break; - case OP_STATE_DATA: - msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme); - if(session_mq_publish_message(ctx->sess, g_topic_id, msg) < 0) - { - FREE(msg); - } - entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); - adapter_session_poll(ctx->sess); - break; - case OP_STATE_CLOSE: - entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING); - adapter_session_poll(ctx->sess); - adapter_session_free(ctx->sess); - print_stream_ctx(pstream, ctx); - FREE(*pme); - break; - default: - break; - } - return entry_ret; -} - -char publisher_loader_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) -{ - return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); -} - -char publisher_loader_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) -{ - return loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); -} diff --git a/examples/sapp_plugin/publisher_loader.inf b/examples/sapp_plugin/publisher_loader.inf deleted file mode 100644 index 2e505d6..0000000 --- a/examples/sapp_plugin/publisher_loader.inf +++ /dev/null @@ -1,13 +0,0 @@ -[PLUGINFO] -PLUGNAME=publisher_loader -SO_PATH=./plug/business/publisher_loader/publisher_loader.so -INIT_FUNC=PUBLISHER_LOADER_INIT -DESTROY_FUNC=PUBLISHER_LOADER_EXIT - -[TCP_ALL] -FUNC_FLAG=ALL -FUNC_NAME=publisher_loader_tcpall_stream_entry - -[UDP] -FUNC_FLAG=ALL -FUNC_NAME=publisher_loader_udp_stream_entry
\ No newline at end of file diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index 50f38f0..b4b9cc3 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -1,15 +1,22 @@ #include "stellar/stellar.h" +#include "stellar/session.h" #include "stellar/utils.h" #include "stellar/session_exdata.h" #include "stellar/session_mq.h" #include <stdio.h> +#include <string.h> -struct simple_stellar_plugin_ctx +struct simple_stellar_plugin_env { int plugin_id; int exdata_idx; struct stellar *st; + int stat_topic_id; + int egress_topic_id; + int tcp_topic_id; + int udp_topic_id; + int tcp_stream_topic_id; }; struct mq_message_stat @@ -18,150 +25,201 @@ struct mq_message_stat uint32_t c2s_bytes; uint32_t s2c_pkts; uint32_t s2c_bytes; - struct session_event* ud_ev; + uint32_t c2s_stream_pkts; + uint32_t c2s_stream_bytes; + uint32_t s2c_stream_pkts; + uint32_t s2c_stream_bytes; }; -extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg); -static int session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *cb_arg) +static void print_session_stat(struct session *sess, struct mq_message_stat *stat, int plugin_id, const char *banner) { - struct mq_message_stat *ctx =(struct mq_message_stat *)data; - struct simple_stellar_plugin_ctx *plugin_ctx = (struct simple_stellar_plugin_ctx *)cb_arg; - printf("%s(topic:%d->plug:%d)-----------%20s: ", __FUNCTION__, topic_id, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - - struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); - session_event_assign(i_ev, plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_mq_plugin_entry, plugin_ctx); - printf("%s(plug:%d)session_event_assign-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - - return 0; + if (stat) + { + printf("%s(plug:%d)-----------%20s: ", banner, plugin_id, session_get0_readable_addr(sess)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", stat->c2s_pkts, stat->c2s_bytes, + stat->s2c_pkts, stat->s2c_bytes); + printf("total-pkt=%u, ", stat->c2s_pkts + stat->s2c_pkts); + printf("total-count=%u\n", stat->c2s_bytes + stat->s2c_bytes); + if(stat->c2s_stream_pkts+stat->s2c_stream_pkts > 0) + { + printf("server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u, ", stat->c2s_stream_pkts, stat->c2s_stream_bytes, + stat->s2c_stream_pkts, stat->s2c_stream_bytes); + } + } + return; } -static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, int plugin_id) +/******************************* + * session event plugin * + *******************************/ + +static void *simple_session_plugin_ctx_new(struct session *sess, void *plugin_env) { - printf("%s(plug:%d)-----------%20s: ", __FUNCTION__, plugin_id, session_get0_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return; + struct mq_message_stat * stat= CALLOC(struct mq_message_stat, 1); + struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env; + session_exdata_set(sess, env->exdata_idx, stat); + return stat; } -int simple_event_plugin_user_defined_ev(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +static void simple_session_plugin_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) { - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); - printf("%s(plug:%d)trigger-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); - return 0; + struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env; + struct mq_message_stat *stat = (struct mq_message_stat *)session_ctx; + print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); + session_exdata_set(sess, env->exdata_idx, NULL); + if(session_ctx)FREE(session_ctx); + return; } -int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +static void simple_event_plugin_entry(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env) { - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); - if (mg_stat == NULL) - { - mg_stat = CALLOC(struct mq_message_stat, 1); - session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat); - } - - if ((events & SESS_EV_OPENING)) - { - mg_stat->ud_ev=session_event_new(plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_user_defined_ev, cb_arg); - session_event_add(mg_stat->ud_ev, NULL); - } - + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + struct mq_message_stat *stat = (struct mq_message_stat *)plugin_ctx; + struct packet * pkt=(struct packet *)data; if (pkt) { + // TEST: try stellar_session_plugin_dettach_current_session when pkt > 3 + if(stat->c2s_pkts+stat->s2c_pkts >= 3 && session_get_type(sess)== SESSION_TYPE_UDP) + { + stellar_session_plugin_dettach_current_session(sess); + return; + } + size_t payload_len = 0; session_get0_current_payload(sess, &payload_len); - int dir = session_get_direction(sess); - if (dir==SESSION_DIRECTION_IN) + int dir = packet_get_direction(pkt); + if (dir==PACKET_DIRECTION_C2S) { - mg_stat->c2s_bytes += payload_len; - mg_stat->c2s_pkts += 1; + if(topic_id==env->tcp_stream_topic_id) + { + stat->c2s_stream_bytes += payload_len; + stat->c2s_stream_pkts += 1; + } + else + { + stat->c2s_bytes += payload_len; + stat->c2s_pkts += 1; + } } - if (dir==SESSION_DIRECTION_OUT) + if (dir==PACKET_DIRECTION_S2C) { - mg_stat->s2c_bytes += payload_len; - mg_stat->s2c_pkts += 1; + if(topic_id==env->tcp_stream_topic_id) + { + stat->s2c_stream_bytes += payload_len; + stat->s2c_stream_pkts += 1; + } + else + { + stat->s2c_bytes += payload_len; + stat->s2c_pkts += 1; + } } + session_mq_publish_message(sess, ((struct simple_stellar_plugin_env *)plugin_env)->stat_topic_id, stat); } - if (mg_stat != NULL && (events & SESS_EV_CLOSING)) - { - print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); - session_event_free(mg_stat->ud_ev); - } - return 0; + return; } -int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) -{ - if(cb_arg == NULL)return -1; - struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; - struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); - session_event_assign(i_ev, plugin_ctx->st, sess, 0, simple_mq_plugin_entry, plugin_ctx); - printf("%s(plug:%d)session_event_assign-----------%20s: \n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); - return 0; -} -static void simple_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg) -{ - if(ex_ptr) - { - FREE(ex_ptr); - } - return; -} void *simple_stellar_event_plugin_init(struct stellar *st) { - struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); - ctx->st = st; - ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_EVENT_PLUGIN", simple_exdata_free, NULL); - int plugin_id=stellar_plugin_register(st, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_entry, ctx); - if(plugin_id >= 0) + struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1); + env->st = st; + env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL); + env->plugin_id = stellar_session_plugin_register(st, + simple_session_plugin_ctx_new, + simple_session_plugin_ctx_free, + env); + + env->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); + env->tcp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP); + env->udp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_UDP); + if(env->tcp_topic_id < 0 || env->udp_topic_id < 0 || env->tcp_stream_topic_id < 0) { - ctx->plugin_id=plugin_id; + perror("get tcp or udp topic id failed\n"); + exit(-1); } - return ctx; + + stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->plugin_id); + stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->plugin_id); + stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->plugin_id); + + int stat_topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT"); + if(stat_topic_id < 0) + { + stat_topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL); + } + env->stat_topic_id = stat_topic_id; + return env; } -void simple_stellar_event_plugin_exit(void *ctx) +void simple_stellar_event_plugin_exit(void *plugin_env) { - if(ctx)FREE(ctx); + if(plugin_env)FREE(plugin_env); return; } +// TODO: add packet entry +// TODO: add polling entry + +/******************************* + * mq plugin * + *******************************/ + +static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env) +{ + + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + if(topic_id == env->egress_topic_id) + { + session_mq_ignore_message(sess, topic_id, env->plugin_id); + } + if (topic_id == env->stat_topic_id) + { + struct mq_message_stat *stat = (struct mq_message_stat *)data; + struct mq_message_stat *exdata_stat = (struct mq_message_stat *)session_exdata_get(sess, env->exdata_idx); + if (memcmp(exdata_stat, stat, sizeof(struct mq_message_stat)) != 0) + { + perror("exdata and mq data not equal\n"); + exit(-1); + } + // print_session_stat(sess, stat, env->plugin_id, __FUNCTION__); + session_mq_unignore_message(sess, env->egress_topic_id, env->plugin_id); + } + return; +} + void *simple_stellar_mq_plugin_init(struct stellar *st) { - struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); - ctx->st = st; - ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_MQ_PLUGIN", simple_exdata_free, NULL); - int topic_id=session_mq_get_topic_id(st, "SIMPLE_MQ_TOPIC"); + struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1); + env->st = st; + env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL); + int topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT"); if(topic_id < 0) { - topic_id=session_mq_create_topic(st, "SIMPLE_MQ_TOPIC", NULL, NULL); + topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL); } - session_mq_subscribe_topic(st, topic_id, session_mq_plugin_sub_fn, ctx); - int plugin_id=stellar_plugin_register(st, 0, simple_mq_plugin_entry, ctx); - if(plugin_id >= 0) + env->stat_topic_id = topic_id; + env->plugin_id = stellar_session_plugin_register(st, + NULL, + NULL, + env); + stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->plugin_id); + + // TEST: subscirbe egress message then ignore + env->egress_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_EGRESS); + if(env->egress_topic_id < 0) { - ctx->plugin_id=plugin_id; + perror("get egress topic id failed\n"); + exit(-1); } - return ctx; + stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->plugin_id); + return env; } -void simple_stellar_mq_plugin_exit(void *ctx) +void simple_stellar_mq_plugin_exit(void *plugin_env) { - if(ctx)FREE(ctx); + if(plugin_env)FREE(plugin_env); return; }
\ No newline at end of file diff --git a/examples/stellar_plugin/simple_plugin.toml b/examples/stellar_plugin/spec.toml index ce08b65..ce08b65 100644 --- a/examples/stellar_plugin/simple_plugin.toml +++ b/examples/stellar_plugin/spec.toml |
