summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author郑超 <[email protected]>2023-08-11 04:48:52 +0000
committer郑超 <[email protected]>2023-08-11 04:48:52 +0000
commit9bcaeac796561c12e1e55d9897e856e600d553ff (patch)
treeecee8d7262ccabee7c8a8353ee1df7553b1aafcb
parent23c458661fbb90b464406497b75086925c2c9fe3 (diff)
Refactor naming convention
-rw-r--r--sdk/include/plugin_spec.h2
-rw-r--r--sdk/include/session.h47
-rw-r--r--src/adapter/adapter.c6
-rw-r--r--src/adapter/session_manager.c16
-rw-r--r--src/adapter/test/test_loader.c12
-rw-r--r--src/adapter/test/test_plugin.c12
6 files changed, 54 insertions, 41 deletions
diff --git a/sdk/include/plugin_spec.h b/sdk/include/plugin_spec.h
index 0716677..6bf4f7e 100644
--- a/sdk/include/plugin_spec.h
+++ b/sdk/include/plugin_spec.h
@@ -2,7 +2,7 @@
#include "session.h"
-typedef int plugin_init_callback(int plugin_id);
+typedef int plugin_init_callback(plugin_id_t plugin_id);
typedef void plugin_exit_callback(void);
typedef void plugin_entry_callback(struct session *session, enum session_state state, int thread_id);
diff --git a/sdk/include/session.h b/sdk/include/session.h
index af66642..b1888cc 100644
--- a/sdk/include/session.h
+++ b/sdk/include/session.h
@@ -26,27 +26,40 @@ enum session_state
};
struct session;
-uint8_t session_direction_get(struct session *sess); // tcp or udp
-uint8_t session_current_direction_get(struct session *sess); // tcp or udp
-const char* session_printaddr(struct session *sess);
-const char *session_payload_get(struct session *sess, int *payload_len);
+uint8_t session_get_flow_flag(const struct session *sess); // tcp or udp
+uint8_t session_current_direction_get(const struct session *sess); // tcp or udp
+const char *session_get_readable_addr(struct session *sess);
+const char *session_get_payload(struct session *sess, int *payload_len);
struct packet;
-const struct packet *session_packet_get(struct session *sess);
-
+const struct packet *session_get_current_packet(struct session *sess);
+typedef plugin_id_t int;
//session mq
-typedef void session_mq_free_callback(struct session *sess, const char* topic_name, void *data, void *cb_arg);
-typedef int session_mq_read_callback(struct session *sess, const char* topic_name, void *data, void *cb_arg);
+typedef void msg_free_cb_func(void *msg, void *cb_arg);
+typedef int msg_consume_cb_func(struct session *sess, const char *topic_name, void *msg, void *cb_arg);
+
+int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg);
+int session_mq_destroy_topic(const char *topic_name);
+int session_produce_message(struct session *sess, const char *topic_name, void *msg);
+int session_ignore_message(struct session *sess, const char *topic_name, int consumer_id);
+
+//return consumer_id >=0 if success, otherwise return -1.
+int session_mq_subscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg, plugin_id_t self);
+int session_mq_unsubscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg, plugin_id_t self);
+
+
+typedef void sess_ex_free(struct session *sess, int idx, void *ex_ptr, void *arg);
+int session_get_ex_new_index(void *arg, const char *name, sess_ex_free *free_func);
+int session_set_ex_data(struct session *sess, int idx, void *ex_ptr);
+void *session_get_ex_data(struct session *sess, int idx);
+
+void session_dettach_plugin(struct session *sess, plugin_id_t self);
+void session_attach_plugin(struct session *sess, plugin_id_t self);
-int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg);
-int session_mq_topic_destroy(const char *topic_name);
-int session_mq_send_msg(struct session *sess, const char *topic, void *data);
-int session_mq_topic_subscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg);
-int session_mq_topic_unsubscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg);
//session plugin
-void *session_get_plugin_ctx(struct session *session, int plugin_id);
-void session_set_plugin_ctx(struct session *session, int plugin_id, void *ctx);
-void plugin_dettach_session(struct session *session, int plugin_id);
-void plugin_attach_session(struct session *session, int plugin_id);
+void *session_get_plugin_ctx(struct session *sess, int plugin_id);
+void session_set_plugin_ctx(struct session *sess, int plugin_id, void *ctx);
+void plugin_dettach_session(struct session *sess, int plugin_id);
+void plugin_attach_session(struct session *sess, int plugin_id);
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index 9e91e33..1875d82 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -132,18 +132,18 @@ uint8_t session_current_direction_get(struct session *session)
return 0;
}
-const char* session_printaddr(struct session *session)
+const char* session_get_readable_addr(struct session *session)
{
return printaddr(&session->stream->addr, session->stream->threadnum);
}
-const char *session_payload_get(struct session *session, int *len)
+const char *session_get_payload(struct session *session, int *len)
{
*len = session->stream->ptcpdetail->datalen;
return session->stream->ptcpdetail->pdata;
}
-const struct packet *session_packet_get(struct session *sess)
+const struct packet *session_get_current_packet(struct session *sess)
{
return get_rawpkt_from_streaminfo(sess->stream);
} \ No newline at end of file
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
index 051ab1e..d66c63a 100644
--- a/src/adapter/session_manager.c
+++ b/src/adapter/session_manager.c
@@ -20,7 +20,7 @@ struct session_mq
struct subscriber
{
- session_mq_read_callback *read_cb;
+ msg_consume_cb_func *read_cb;
void *cb_arg;
STAILQ_ENTRY(subscriber) entries;
};
@@ -28,7 +28,7 @@ struct subscriber
struct session_mq_topic_schema
{
char *topic_name;
- session_mq_free_callback *free_cb;
+ msg_free_cb_func *free_cb;
void *cb_arg;
STAILQ_HEAD(sub_q_head, subscriber) subscribers;
UT_hash_handle hh;
@@ -111,7 +111,7 @@ void session_mq_loop(struct session_mq *mq, struct session *session)
return;
};
-int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg)
+int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg)
{
struct session_mq_topic_schema *topic;
// 检查topic_name是否已存在
@@ -141,7 +141,7 @@ int session_mq_topic_create(const char *topic_name, session_mq_free_callback *fr
return 0; // 成功
}
-int session_mq_topic_destroy(const char *topic_name) {
+int session_mq_destroy_topic(const char *topic_name) {
struct session_mq_topic_schema *topic;
// 在哈希表中查找指定的主题名
@@ -167,7 +167,7 @@ int session_mq_topic_destroy(const char *topic_name) {
}
-int session_mq_send_msg(struct session *session, const char *topic_name, void *data)
+int session_send_message(struct session *session, const char *topic_name, void *data)
{
struct session_mq_topic_schema *topic;
HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
@@ -183,10 +183,10 @@ int session_mq_send_msg(struct session *session, const char *topic_name, void *d
return 0;
}
-int session_mq_topic_subscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg)
+int session_mq_subscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg)
{
struct session_mq_topic_schema *topic;
- session_mq_topic_create(topic_name, NULL, NULL);
+ session_mq_create_topic(topic_name, NULL, NULL);
HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
if (topic == NULL)
{
@@ -199,7 +199,7 @@ int session_mq_topic_subscribe(const char *topic_name, session_mq_read_callback
return 0;
}
-int session_mq_topic_unsubscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg)
+int session_mq_unsubscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg)
{
struct session_mq_topic_schema *topic;
int unsubscribe_count = 0;
diff --git a/src/adapter/test/test_loader.c b/src/adapter/test/test_loader.c
index 58e0e4b..18f3222 100644
--- a/src/adapter/test/test_loader.c
+++ b/src/adapter/test/test_loader.c
@@ -45,7 +45,7 @@ static void test_mq_topic_free(struct session *sess, const char* topic_name, voi
int test_mq_loader_read(struct session *sess, const char* topic_name, void *data, void *cb_arg)
{
struct test_stream_ctx *ctx =(struct test_stream_ctx *)data;
- printf("loader_read_message(%s)-----------%20s", topic_name, session_printaddr(sess));
+ printf("loader_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess));
printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
ctx->c2s_pkts, ctx->c2s_bytes,
ctx->s2c_pkts, ctx->s2c_bytes);
@@ -60,8 +60,8 @@ int TEST_INIT()
g_adapter = adapter_init(g_plugin_schema, 2);
if(g_adapter == NULL)return -1;
- session_mq_topic_create("test_mq_topic", test_mq_topic_free, NULL);
- session_mq_topic_subscribe("test_mq_topic", test_mq_loader_read, NULL);
+ session_mq_create_topic("test_mq_topic", test_mq_topic_free, NULL);
+ session_mq_subscribe_topic("test_mq_topic", test_mq_loader_read, NULL);
return 0;
}
@@ -69,8 +69,8 @@ void TEST_EXIT(void)
{
if(g_adapter)
{
- session_mq_topic_unsubscribe("test_mq_topic", test_mq_loader_read, NULL);
- session_mq_topic_destroy("test_mq_topic");
+ session_mq_unsubscribe_topic("test_mq_topic", test_mq_loader_read, NULL);
+ session_mq_destroy_topic("test_mq_topic");
adapter_exit(g_adapter);
g_adapter = NULL;
}
@@ -124,7 +124,7 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
break;
case OP_STATE_DATA:
msg = stream_ctx_dup(*pme);
- if(session_mq_send_msg(ctx->session, "test_mq_topic", msg) < 0)
+ if(session_produce_message(ctx->session, "test_mq_topic", msg) < 0)
{
FREE(msg);
}
diff --git a/src/adapter/test/test_plugin.c b/src/adapter/test/test_plugin.c
index 075028f..1da43bc 100644
--- a/src/adapter/test/test_plugin.c
+++ b/src/adapter/test/test_plugin.c
@@ -29,7 +29,7 @@ void test_entry_plugin_exit(void)
int test_session_mq_plugin_read(struct session *sess, const char* topic_name, void *data, void *cb_arg)
{
struct test_session_message *ctx =(struct test_session_message *)data;
- printf("plugin_read_message(%s)-----------%20s", topic_name, session_printaddr(sess));
+ printf("plugin_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess));
printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
ctx->c2s_pkts, ctx->c2s_bytes,
ctx->s2c_pkts, ctx->s2c_bytes);
@@ -41,12 +41,12 @@ int test_session_mq_plugin_read(struct session *sess, const char* topic_name, v
int test_mq_plugin_init(int plugin_id)
{
g_test_mq_plugin_id = plugin_id;
- session_mq_topic_subscribe("test_mq_topic", test_session_mq_plugin_read, NULL);
+ session_mq_subscribe_topic("test_mq_topic", test_session_mq_plugin_read, NULL);
return 0;
}
void test_mq_plugin_exit(void)
{
- session_mq_topic_unsubscribe("test_mq_topic", test_session_mq_plugin_read, NULL);
+ session_mq_unsubscribe_topic("test_mq_topic", test_session_mq_plugin_read, NULL);
return;
}
@@ -54,7 +54,7 @@ void test_mq_plugin_exit(void)
void print_session_ctx(struct session *session, struct test_session_message *ctx, int plugin_id)
{
- printf("session(%d)-----------%20s: ", plugin_id, session_printaddr(session));
+ printf("session(%d)-----------%20s: ", plugin_id, session_get_readable_addr(session));
printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
ctx->c2s_pkts, ctx->c2s_bytes,
ctx->s2c_pkts, ctx->s2c_bytes);
@@ -73,8 +73,8 @@ void test_entry_plugin_entry(struct session *session, enum session_state state,
}
int payload_len = 0;
int curdir = session_current_direction_get(session);
- session_payload_get(session, &payload_len);
- const struct packet *pkt = session_packet_get(session);
+ session_get_payload(session, &payload_len);
+ const struct packet *pkt = session_get_current_packet(session);
if (pkt)
{
if (curdir == SESSION_DIR_C2S)