summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-10-18 16:15:26 +0800
committeryangwei <[email protected]>2023-10-18 16:15:26 +0800
commit0df496905b6320b44f34cb45ee7ea6e3912e0a4c (patch)
tree913422eb2e6d7ca9f2b3cac4c38f87ca2874abf8 /examples
parent25814c801c0b971799969f016aab73535a64be2a (diff)
✨ feat(session set in bridge): Stored Session in bridge for adatpter
Diffstat (limited to 'examples')
-rw-r--r--examples/sapp_plugin/publisher_loader.c2
-rw-r--r--examples/sapp_plugin/simple_loader.c58
2 files changed, 52 insertions, 8 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c
index 8dc2d34..18fc130 100644
--- a/examples/sapp_plugin/publisher_loader.c
+++ b/examples/sapp_plugin/publisher_loader.c
@@ -36,7 +36,7 @@ static int session_mq_loader_read(struct session *sess, int topic_id, const void
return 0;
}
-void *g_stellar=NULL;
+struct stellar *g_stellar=NULL;
int g_topic_id=-1;
int PUBLISHER_LOADER_INIT()
{
diff --git a/examples/sapp_plugin/simple_loader.c b/examples/sapp_plugin/simple_loader.c
index f5eab17..b84924c 100644
--- a/examples/sapp_plugin/simple_loader.c
+++ b/examples/sapp_plugin/simple_loader.c
@@ -7,6 +7,7 @@
#include <MESA/stream.h>
#include <stdio.h>
+#include <stream_inc/stream_base.h>
struct simple_stream_ctx
@@ -19,23 +20,59 @@ struct simple_stream_ctx
};
#define STELLAR_PLUGIN_PATH "./stellar_plugin/spec.toml"
+#define STELLAR_BRIDEGE_NAME "STELLAR_SESSION"
-void *g_stellar=NULL;
-int g_topic_id=-1;
+
+static void __loader_bridge_free_cb(const struct streaminfo *stream, int bridge_id, void *data)
+{
+ adapter_session_close((struct streaminfo *)stream, (struct session *)data, NULL);
+ return;
+}
+
+
+struct stellar *g_stellar=NULL;
+int g_session_bridge_id=-1;
int SIMPLE_LOADER_INIT()
{
g_stellar = stellar_init(STELLAR_PLUGIN_PATH);
if(g_stellar==NULL)return -1;
+ g_session_bridge_id=stream_bridge_build(STELLAR_BRIDEGE_NAME, "w");
+ if(g_session_bridge_id >= 0)// defer close session when streaminfo close
+ {
+ stream_bridge_register_data_free_cb(g_session_bridge_id, __loader_bridge_free_cb);
+ }
return 0;
}
void SIMPLE_LOADER_EXIT(void)
{
-
stellar_exit(g_stellar);
return;
}
+int DEFER_LOADER_INIT()
+{
+ return 0;
+}
+
+char defer_loader_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+{
+ if(g_session_bridge_id >= 0)
+ {
+ struct session *sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id);
+ adapter_session_poll(sess);
+ return APP_STATE_GIVEME;
+ }
+ else
+ {
+ return APP_STATE_DROPME;
+ }
+}
+
+void DEFER_LOADER_EXIT(void)
+{
+ return;
+}
static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
{
@@ -61,15 +98,22 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
{
case OP_STATE_PENDING:
ctx->sess=adapter_session_open(g_stellar, pstream, a_packet);
+ if(g_session_bridge_id>=0)
+ {
+ stream_bridge_async_data_put(pstream, g_session_bridge_id, ctx->sess);
+ }
break;
case OP_STATE_DATA:
adapter_session_active(pstream, ctx->sess, a_packet);
break;
case OP_STATE_CLOSE:
- adapter_session_close(pstream, ctx->sess, a_packet);
- FREE(*pme);
- break;
- default:
+ if (g_session_bridge_id < 0) // if no bridge, close session immediately
+ {
+ adapter_session_close(pstream, ctx->sess, a_packet);
+ }
+ FREE(*pme);
+ break;
+ default:
break;
}
return;