summaryrefslogtreecommitdiff
path: root/examples/stellar_plugin
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-02-02 10:02:11 +0800
committeryangwei <[email protected]>2024-02-28 19:14:21 +0800
commitdee44c13354d1c3d7c0e5e1828f5175e55790f3a (patch)
treebc0adf5b95904a7ded7da7a1a5134b32feadc4df /examples/stellar_plugin
parent9649bafbd6eef60541ab1a1e8e50e6d4312981c4 (diff)
🦄 refactor(API implementation): Refactor API to use new headers
Diffstat (limited to 'examples/stellar_plugin')
-rw-r--r--examples/stellar_plugin/simple_stellar_plugin.c258
-rw-r--r--examples/stellar_plugin/spec.toml (renamed from examples/stellar_plugin/simple_plugin.toml)0
2 files changed, 158 insertions, 100 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c
index 50f38f0..b4b9cc3 100644
--- a/examples/stellar_plugin/simple_stellar_plugin.c
+++ b/examples/stellar_plugin/simple_stellar_plugin.c
@@ -1,15 +1,22 @@
#include "stellar/stellar.h"
+#include "stellar/session.h"
#include "stellar/utils.h"
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
#include <stdio.h>
+#include <string.h>
-struct simple_stellar_plugin_ctx
+struct simple_stellar_plugin_env
{
int plugin_id;
int exdata_idx;
struct stellar *st;
+ int stat_topic_id;
+ int egress_topic_id;
+ int tcp_topic_id;
+ int udp_topic_id;
+ int tcp_stream_topic_id;
};
struct mq_message_stat
@@ -18,150 +25,201 @@ struct mq_message_stat
uint32_t c2s_bytes;
uint32_t s2c_pkts;
uint32_t s2c_bytes;
- struct session_event* ud_ev;
+ uint32_t c2s_stream_pkts;
+ uint32_t c2s_stream_bytes;
+ uint32_t s2c_stream_pkts;
+ uint32_t s2c_stream_bytes;
};
-extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg);
-static int session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *cb_arg)
+static void print_session_stat(struct session *sess, struct mq_message_stat *stat, int plugin_id, const char *banner)
{
- struct mq_message_stat *ctx =(struct mq_message_stat *)data;
- struct simple_stellar_plugin_ctx *plugin_ctx = (struct simple_stellar_plugin_ctx *)cb_arg;
- printf("%s(topic:%d->plug:%d)-----------%20s: ", __FUNCTION__, topic_id, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
-
- struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id);
- session_event_assign(i_ev, plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_mq_plugin_entry, plugin_ctx);
- printf("%s(plug:%d)session_event_assign-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
-
- return 0;
+ if (stat)
+ {
+ printf("%s(plug:%d)-----------%20s: ", banner, plugin_id, session_get0_readable_addr(sess));
+ printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", stat->c2s_pkts, stat->c2s_bytes,
+ stat->s2c_pkts, stat->s2c_bytes);
+ printf("total-pkt=%u, ", stat->c2s_pkts + stat->s2c_pkts);
+ printf("total-count=%u\n", stat->c2s_bytes + stat->s2c_bytes);
+ if(stat->c2s_stream_pkts+stat->s2c_stream_pkts > 0)
+ {
+ printf("server-stream_pkt=%u, server-stream_count=%u, client-stream_pkt=%u, client-stream_count=%u, ", stat->c2s_stream_pkts, stat->c2s_stream_bytes,
+ stat->s2c_stream_pkts, stat->s2c_stream_bytes);
+ }
+ }
+ return;
}
-static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, int plugin_id)
+/*******************************
+ * session event plugin *
+ *******************************/
+
+static void *simple_session_plugin_ctx_new(struct session *sess, void *plugin_env)
{
- printf("%s(plug:%d)-----------%20s: ", __FUNCTION__, plugin_id, session_get0_readable_addr(sess));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
- return;
+ struct mq_message_stat * stat= CALLOC(struct mq_message_stat, 1);
+ struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env;
+ session_exdata_set(sess, env->exdata_idx, stat);
+ return stat;
}
-int simple_event_plugin_user_defined_ev(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
+static void simple_session_plugin_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
{
- if(cb_arg == NULL)return -1;
- struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
- struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx);
- printf("%s(plug:%d)trigger-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
- print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id);
- return 0;
+ struct simple_stellar_plugin_env *env=(struct simple_stellar_plugin_env *)plugin_env;
+ struct mq_message_stat *stat = (struct mq_message_stat *)session_ctx;
+ print_session_stat(sess, stat, env->plugin_id, __FUNCTION__);
+ session_exdata_set(sess, env->exdata_idx, NULL);
+ if(session_ctx)FREE(session_ctx);
+ return;
}
-int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
+static void simple_event_plugin_entry(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env)
{
- if(cb_arg == NULL)return -1;
- struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
- struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx);
- if (mg_stat == NULL)
- {
- mg_stat = CALLOC(struct mq_message_stat, 1);
- session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat);
- }
-
- if ((events & SESS_EV_OPENING))
- {
- mg_stat->ud_ev=session_event_new(plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_user_defined_ev, cb_arg);
- session_event_add(mg_stat->ud_ev, NULL);
- }
-
+ struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ struct mq_message_stat *stat = (struct mq_message_stat *)plugin_ctx;
+ struct packet * pkt=(struct packet *)data;
if (pkt)
{
+ // TEST: try stellar_session_plugin_dettach_current_session when pkt > 3
+ if(stat->c2s_pkts+stat->s2c_pkts >= 3 && session_get_type(sess)== SESSION_TYPE_UDP)
+ {
+ stellar_session_plugin_dettach_current_session(sess);
+ return;
+ }
+
size_t payload_len = 0;
session_get0_current_payload(sess, &payload_len);
- int dir = session_get_direction(sess);
- if (dir==SESSION_DIRECTION_IN)
+ int dir = packet_get_direction(pkt);
+ if (dir==PACKET_DIRECTION_C2S)
{
- mg_stat->c2s_bytes += payload_len;
- mg_stat->c2s_pkts += 1;
+ if(topic_id==env->tcp_stream_topic_id)
+ {
+ stat->c2s_stream_bytes += payload_len;
+ stat->c2s_stream_pkts += 1;
+ }
+ else
+ {
+ stat->c2s_bytes += payload_len;
+ stat->c2s_pkts += 1;
+ }
}
- if (dir==SESSION_DIRECTION_OUT)
+ if (dir==PACKET_DIRECTION_S2C)
{
- mg_stat->s2c_bytes += payload_len;
- mg_stat->s2c_pkts += 1;
+ if(topic_id==env->tcp_stream_topic_id)
+ {
+ stat->s2c_stream_bytes += payload_len;
+ stat->s2c_stream_pkts += 1;
+ }
+ else
+ {
+ stat->s2c_bytes += payload_len;
+ stat->s2c_pkts += 1;
+ }
}
+ session_mq_publish_message(sess, ((struct simple_stellar_plugin_env *)plugin_env)->stat_topic_id, stat);
}
- if (mg_stat != NULL && (events & SESS_EV_CLOSING))
- {
- print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id);
- session_event_free(mg_stat->ud_ev);
- }
- return 0;
+ return;
}
-int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
-{
- if(cb_arg == NULL)return -1;
- struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
- struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id);
- session_event_assign(i_ev, plugin_ctx->st, sess, 0, simple_mq_plugin_entry, plugin_ctx);
- printf("%s(plug:%d)session_event_assign-----------%20s: \n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
- return 0;
-}
-static void simple_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg)
-{
- if(ex_ptr)
- {
- FREE(ex_ptr);
- }
- return;
-}
void *simple_stellar_event_plugin_init(struct stellar *st)
{
- struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1);
- ctx->st = st;
- ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_EVENT_PLUGIN", simple_exdata_free, NULL);
- int plugin_id=stellar_plugin_register(st, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_entry, ctx);
- if(plugin_id >= 0)
+ struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1);
+ env->st = st;
+ env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL);
+ env->plugin_id = stellar_session_plugin_register(st,
+ simple_session_plugin_ctx_new,
+ simple_session_plugin_ctx_free,
+ env);
+
+ env->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
+ env->tcp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP);
+ env->udp_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_UDP);
+ if(env->tcp_topic_id < 0 || env->udp_topic_id < 0 || env->tcp_stream_topic_id < 0)
{
- ctx->plugin_id=plugin_id;
+ perror("get tcp or udp topic id failed\n");
+ exit(-1);
}
- return ctx;
+
+ stellar_session_mq_subscribe(st, env->tcp_stream_topic_id, simple_event_plugin_entry, env->plugin_id);
+ stellar_session_mq_subscribe(st, env->tcp_topic_id, simple_event_plugin_entry, env->plugin_id);
+ stellar_session_mq_subscribe(st, env->udp_topic_id, simple_event_plugin_entry, env->plugin_id);
+
+ int stat_topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT");
+ if(stat_topic_id < 0)
+ {
+ stat_topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL);
+ }
+ env->stat_topic_id = stat_topic_id;
+ return env;
}
-void simple_stellar_event_plugin_exit(void *ctx)
+void simple_stellar_event_plugin_exit(void *plugin_env)
{
- if(ctx)FREE(ctx);
+ if(plugin_env)FREE(plugin_env);
return;
}
+// TODO: add packet entry
+// TODO: add polling entry
+
+/*******************************
+ * mq plugin *
+ *******************************/
+
+static void session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *plugin_ctx, void *plugin_env)
+{
+
+ struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ if(topic_id == env->egress_topic_id)
+ {
+ session_mq_ignore_message(sess, topic_id, env->plugin_id);
+ }
+ if (topic_id == env->stat_topic_id)
+ {
+ struct mq_message_stat *stat = (struct mq_message_stat *)data;
+ struct mq_message_stat *exdata_stat = (struct mq_message_stat *)session_exdata_get(sess, env->exdata_idx);
+ if (memcmp(exdata_stat, stat, sizeof(struct mq_message_stat)) != 0)
+ {
+ perror("exdata and mq data not equal\n");
+ exit(-1);
+ }
+ // print_session_stat(sess, stat, env->plugin_id, __FUNCTION__);
+ session_mq_unignore_message(sess, env->egress_topic_id, env->plugin_id);
+ }
+ return;
+}
+
void *simple_stellar_mq_plugin_init(struct stellar *st)
{
- struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1);
- ctx->st = st;
- ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_MQ_PLUGIN", simple_exdata_free, NULL);
- int topic_id=session_mq_get_topic_id(st, "SIMPLE_MQ_TOPIC");
+ struct simple_stellar_plugin_env *env = CALLOC(struct simple_stellar_plugin_env, 1);
+ env->st = st;
+ env->exdata_idx = stellar_session_exdata_new_index(st, "EXDATA_SESSION_STAT", NULL, NULL);
+ int topic_id=stellar_session_mq_get_topic_id(st, "TOPIC_SESSION_STAT");
if(topic_id < 0)
{
- topic_id=session_mq_create_topic(st, "SIMPLE_MQ_TOPIC", NULL, NULL);
+ topic_id=stellar_session_mq_create_topic(st, "TOPIC_SESSION_STAT", NULL, NULL);
}
- session_mq_subscribe_topic(st, topic_id, session_mq_plugin_sub_fn, ctx);
- int plugin_id=stellar_plugin_register(st, 0, simple_mq_plugin_entry, ctx);
- if(plugin_id >= 0)
+ env->stat_topic_id = topic_id;
+ env->plugin_id = stellar_session_plugin_register(st,
+ NULL,
+ NULL,
+ env);
+ stellar_session_mq_subscribe(st, topic_id, session_mq_plugin_sub_fn, env->plugin_id);
+
+ // TEST: subscirbe egress message then ignore
+ env->egress_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_EGRESS);
+ if(env->egress_topic_id < 0)
{
- ctx->plugin_id=plugin_id;
+ perror("get egress topic id failed\n");
+ exit(-1);
}
- return ctx;
+ stellar_session_mq_subscribe(st, env->egress_topic_id, session_mq_plugin_sub_fn, env->plugin_id);
+ return env;
}
-void simple_stellar_mq_plugin_exit(void *ctx)
+void simple_stellar_mq_plugin_exit(void *plugin_env)
{
- if(ctx)FREE(ctx);
+ if(plugin_env)FREE(plugin_env);
return;
} \ No newline at end of file
diff --git a/examples/stellar_plugin/simple_plugin.toml b/examples/stellar_plugin/spec.toml
index ce08b65..ce08b65 100644
--- a/examples/stellar_plugin/simple_plugin.toml
+++ b/examples/stellar_plugin/spec.toml