summaryrefslogtreecommitdiff
path: root/src/lua_binding_function.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lua_binding_function.c')
-rw-r--r--src/lua_binding_function.c863
1 files changed, 347 insertions, 516 deletions
diff --git a/src/lua_binding_function.c b/src/lua_binding_function.c
index c308c41..f9ef35c 100644
--- a/src/lua_binding_function.c
+++ b/src/lua_binding_function.c
@@ -1,671 +1,502 @@
+#include "lua_binding_function.h"
+#include "lua_binding_cfunc.h"
+
+#include "stellar/mq.h"
+#include "stellar/module_manager.h"
+#include "stellar/packet.h"
+#include "stellar/packet_manager.h"
+#include "stellar/session.h"
+#include "stellar/session_manager.h"
+#include "stellar/utils.h"
+
#include <stdlib.h>
#include <string.h>
-
+#include <utlist.h>
#include <lua.h>
+#include <lualib.h>
#include <lauxlib.h>
-#include <utlist.h>
-#include "lua_binding_function.h"
+#define LUA_BINDING_FUNCTION_GET_VALUE_POINTER(func_name, param_type, value_type, value_func) \
+ int func_name(struct lua_state *state) \
+ { \
+ lua_State *L = (lua_State *)state; \
+ if (lua_gettop(L) != 1) \
+ { \
+ lua_settop(L, 0); \
+ return 0; \
+ } \
+ param_type *p = (param_type *)lua_topointer(L, -1); \
+ lua_settop(L, 0); \
+ value_type *l = (value_type *)value_func(p); \
+ lua_pushlightuserdata(L, (void *)l); \
+ return 1; \
+ }
-/* ***** ***** ***** ***** ***** ***** */
-int lua_get_worker_thread_num(struct lua_state *state)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 1)
- {
- lua_settop(L, 0);
- return 0;
+#define LUA_BINDING_FUNCTION_GET_VALUE_INT(func_name, param_type, value_func) \
+ int func_name(struct lua_state *state) \
+ { \
+ lua_State *L = (lua_State *)state; \
+ if (lua_gettop(L) != 1) \
+ { \
+ lua_settop(L, 0); \
+ return 0; \
+ } \
+ param_type *p = (param_type *)lua_topointer(L, -1); \
+ lua_settop(L, 0); \
+ int v = (int)value_func(p); \
+ lua_pushinteger(L, v); \
+ return 1; \
}
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
- lua_settop(L, 0);
- int thread_num = stellar_get_worker_thread_num(st);
- lua_pushinteger(L, thread_num);
- return 1;
-}
-int lua_get_current_thread_id(struct lua_state *state)
+int lua_mq_schema_get_topic_id(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 1)
+ if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TSTRING || lua_type(L, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
- lua_settop(L, 0);
- int thread_id = stellar_get_current_thread_id(st);
- lua_pushinteger(L, thread_id);
- return 1;
-}
-int lua_get_stellar_pointer(struct lua_state *state)
-{
- lua_State *L = (lua_State *)state;
+ char *topic_name = strdup(lua_tostring(L, -1));
+ lua_pop(L, 1);
+ struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1);
lua_settop(L, 0);
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- struct stellar *st = plugin_manage->st;
- lua_pushlightuserdata(L, (void *)st);
- return 1;
-}
-/*
-int lua_get_plugin_manage_pointer(struct lua_state *state)
-{
- lua_settop((lua_State *)state, 0);
- struct lua_plugin_manage * plugin_manage = lua_state_get_plugin_manage(state);
- lua_pushlightuserdata((lua_State *)state, (void *)plugin_manage);
+ int topic_id = mq_schema_get_topic_id(s, (const char *)topic_name);
+
+ if (topic_name)
+ FREE(topic_name);
+ lua_pushinteger(L, topic_id);
return 1;
}
-*/
-/* ***** ***** ***** ***** ***** ***** */
-int lua_session_plugin_regist(struct lua_state *state)
+int lua_mq_schema_create_topic(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
+ if (lua_gettop(L) != 6 || lua_type(L, -5) != LUA_TSTRING || lua_type(L, -6) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- /* stack -1 */
- int plugin_env_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- if (plugin_env_ref_id == LUA_REFNIL)
- plugin_env_ref_id = 0;
-
- /* stack -2 */
- int ctx_free_fn_ref_id = 0;
+ int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int free_cb_fn_ref_id = -1;
if (lua_type(L, -1) == LUA_TFUNCTION)
- ctx_free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ free_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
else
lua_pop(L, 1);
-
- /* stack -3 */
- int ctx_new_fn_ref_id = 0;
+ int on_dispatch_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_dispatch_cb_fn_ref_id = -1;
if (lua_type(L, -1) == LUA_TFUNCTION)
- ctx_new_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ on_dispatch_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
else
lua_pop(L, 1);
-
- /* stack -4 */
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
+ char *topic_name = strdup((char *)lua_tostring(L, -1));
+ lua_pop(L, 1);
+ struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1);
lua_settop(L, 0);
- struct lua_session_plugin_env *new_plugin_env = (struct lua_session_plugin_env *)calloc(1, sizeof(struct lua_session_plugin_env));
- memset(new_plugin_env, 0, sizeof(struct lua_session_plugin_env));
- new_plugin_env->lua_ctx_new_fn_ref_id = ctx_new_fn_ref_id;
- new_plugin_env->lua_ctx_free_fn_ref_id = ctx_free_fn_ref_id;
- new_plugin_env->lua_plug_env_ref_id = plugin_env_ref_id;
-
- int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)new_plugin_env);
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- LL_APPEND(plugin_manage->session_plugin_env_list, new_plugin_env);
- new_plugin_env->plugin_manage = plugin_manage;
- new_plugin_env->session_plugin_id = plugin_id;
-
- lua_settop(L, 0);
- lua_pushinteger(L, plugin_id);
+ struct lua_fn_arg_pair *new_mq_dispatch_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ memset(new_mq_dispatch_arg, 0, sizeof(struct lua_fn_arg_pair));
+ new_mq_dispatch_arg->lua_fn_ref_id = on_dispatch_cb_fn_ref_id;
+ new_mq_dispatch_arg->lua_arg_ref_id = on_dispatch_arg_ref_id;
+ struct lua_fn_arg_pair *new_mq_msg_free_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ memset(new_mq_msg_free_arg, 0, sizeof(struct lua_fn_arg_pair));
+ new_mq_msg_free_arg->lua_fn_ref_id = free_cb_fn_ref_id;
+ new_mq_msg_free_arg->lua_arg_ref_id = free_arg_ref_id;
+ int topic_id = mq_schema_create_topic(s,
+ (const char *)topic_name,
+ lua_manager_mq_on_msg_dispatch_cb_func,
+ new_mq_dispatch_arg,
+ lua_manager_mq_msg_free_cb_func,
+ new_mq_msg_free_arg);
+ if (topic_id >= 0)
+ {
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_mq_dispatch_arg->lua_mod_mgr = lua_mod_mgr;
+ new_mq_msg_free_arg->lua_mod_mgr = lua_mod_mgr;
+ LL_APPEND(lua_mod_mgr->mq_dispatch_list, new_mq_dispatch_arg);
+ LL_APPEND(lua_mod_mgr->mq_msg_free_list, new_mq_msg_free_arg);
+ }
+ else
+ {
+ if (free_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, free_arg_ref_id);
+ if (free_cb_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, free_cb_fn_ref_id);
+ if (on_dispatch_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_arg_ref_id);
+ if (on_dispatch_cb_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_cb_fn_ref_id);
+ if (new_mq_dispatch_arg)
+ FREE(new_mq_dispatch_arg);
+ if (new_mq_msg_free_arg)
+ FREE(new_mq_msg_free_arg);
+ }
+ lua_pushinteger(L, topic_id);
return 1;
}
-int lua_packet_plugin_regist(struct lua_state *state)
+int lua_mq_schema_update_topic(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
+ if (lua_gettop(L) != 6 || lua_type(L, -5) != LUA_TNUMBER || lua_type(L, -6) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- /* stack -1 */
- int plugin_env_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- if (plugin_env_ref_id == LUA_REFNIL)
- plugin_env_ref_id = 0;
-
- /* stack -2 */
- int on_packet_fn_ref_id = 0;
+ int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int free_cb_fn_ref_id = -1;
if (lua_type(L, -1) == LUA_TFUNCTION)
- on_packet_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ free_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
else
lua_pop(L, 1);
-
- /* stack -3 */
- int ip_protocol = lua_tointeger(L, -1);
+ int on_dispatch_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_dispatch_cb_fn_ref_id = -1;
+ if (lua_type(L, -1) == LUA_TFUNCTION)
+ on_dispatch_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ else
+ lua_pop(L, 1);
+ int topic_id = lua_tointeger(L, -1);
lua_pop(L, 1);
-
- /* stack -4 */
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
+ struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1);
lua_settop(L, 0);
- struct lua_packet_plugin_env *new_plugin_env = (struct lua_packet_plugin_env *)calloc(1, sizeof(struct lua_packet_plugin_env));
- memset(new_plugin_env, 0, sizeof(struct lua_packet_plugin_env));
- new_plugin_env->lua_plug_env_ref_id = plugin_env_ref_id;
- new_plugin_env->lua_on_packet_fn_ref_id = on_packet_fn_ref_id;
-
- int plugin_id = stellar_packet_plugin_register(st, (unsigned char)ip_protocol, lpm_on_packet_func, (void *)new_plugin_env);
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- LL_APPEND(plugin_manage->packet_plugin_env_list, new_plugin_env);
- new_plugin_env->plugin_manage = plugin_manage;
- new_plugin_env->packet_plugin_id = plugin_id;
-
- lua_settop(L, 0);
- lua_pushinteger(L, plugin_id);
+ struct lua_fn_arg_pair *new_mq_dispatch_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ memset(new_mq_dispatch_arg, 0, sizeof(struct lua_fn_arg_pair));
+ new_mq_dispatch_arg->lua_fn_ref_id = on_dispatch_cb_fn_ref_id;
+ new_mq_dispatch_arg->lua_arg_ref_id = on_dispatch_arg_ref_id;
+ struct lua_fn_arg_pair *new_mq_msg_free_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ memset(new_mq_msg_free_arg, 0, sizeof(struct lua_fn_arg_pair));
+ new_mq_msg_free_arg->lua_fn_ref_id = free_cb_fn_ref_id;
+ new_mq_msg_free_arg->lua_arg_ref_id = free_arg_ref_id;
+ int update_ret = mq_schema_update_topic(s,
+ topic_id,
+ lua_manager_mq_on_msg_dispatch_cb_func,
+ new_mq_dispatch_arg,
+ lua_manager_mq_msg_free_cb_func,
+ new_mq_msg_free_arg);
+ if (topic_id)
+ {
+ if (free_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, free_arg_ref_id);
+ if (free_cb_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, free_cb_fn_ref_id);
+ if (on_dispatch_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_arg_ref_id);
+ if (on_dispatch_cb_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_cb_fn_ref_id);
+ if (new_mq_dispatch_arg)
+ FREE(new_mq_dispatch_arg);
+ if (new_mq_msg_free_arg)
+ FREE(new_mq_msg_free_arg);
+ }
+ else
+ {
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_mq_dispatch_arg->lua_mod_mgr = lua_mod_mgr;
+ new_mq_msg_free_arg->lua_mod_mgr = lua_mod_mgr;
+ LL_APPEND(lua_mod_mgr->mq_dispatch_list, new_mq_dispatch_arg);
+ LL_APPEND(lua_mod_mgr->mq_msg_free_list, new_mq_msg_free_arg);
+ }
+ lua_pushinteger(L, update_ret);
return 1;
}
-/* ***** ***** ***** ***** ***** ***** */
-int lua_session_get_type(struct lua_state *state)
+int lua_mq_shcema_destory_topic(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 1 || lua_type(L, -1) != LUA_TLIGHTUSERDATA)
+ if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TNUMBER || lua_type(L, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- struct session *sess = (struct session *)lua_topointer(L, -1);
+ int topic_id = lua_tointeger(L, -1);
+ lua_pop(L, 1);
+ struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1);
lua_settop(L, 0);
- if (!sess)
- return 0;
- lua_pushinteger(L, (int)session_get_type(sess));
+ int destory_ret = mq_schema_destroy_topic(s, topic_id);
+
+ lua_pushinteger(L, destory_ret);
return 1;
}
-/* ***** ***** ***** ***** ***** ***** */
-#define MQ_TYPE_PACKET 1
-#define MQ_TYPE_SESSION 2
-static int lua_plugin_manage_msg_mq_create_topic(struct lua_state *state, int type)
+int lua_mq_schema_subscribe(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4 || lua_type(L, -3) != LUA_TSTRING || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
+ if (lua_gettop(L) != 4 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- /* stack -1 */
- int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- if (free_arg_ref_id == LUA_REFNIL)
- free_arg_ref_id = 0;
-
- /* stack -2 */
- int free_fn_ref_id = 0;
- if (lua_type(L, -1) == LUA_TFUNCTION)
- free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- else
- lua_pop(L, 1);
-
- /* stack -3 */
- char *topic_name = strdup((char *)lua_tostring(L, -1));
+ int on_msg_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_msg_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int topic_id = lua_tointeger(L, -1);
lua_pop(L, 1);
-
- /* stack -4 */
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
+ struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1);
lua_settop(L, 0);
- struct lua_message_free_arg *new_message_free_arg = (struct lua_message_free_arg *)calloc(1, sizeof(struct lua_message_free_arg));
- memset(new_message_free_arg, 0, sizeof(struct lua_message_free_arg));
- new_message_free_arg->lua_msg_free_fn_ref_id = free_fn_ref_id;
- new_message_free_arg->lua_msg_free_arg_ref_id = free_arg_ref_id;
- int topic_id = -1;
- if (type == MQ_TYPE_PACKET)
- topic_id = stellar_packet_mq_create_topic(st, (const char *)topic_name, lpm_packet_message_free_func, new_message_free_arg);
- else if (type == MQ_TYPE_SESSION)
- topic_id = stellar_session_mq_create_topic(st, (const char *)topic_name, lpm_session_message_free_func, new_message_free_arg);
- if (topic_id >= 0)
+ struct lua_fn_arg_pair *new_mq_on_msg_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ memset(new_mq_on_msg_arg, 0, sizeof(struct lua_fn_arg_pair));
+ new_mq_on_msg_arg->lua_arg_ref_id = on_msg_arg_ref_id;
+ new_mq_on_msg_arg->lua_fn_ref_id = on_msg_fn_ref_id;
+ int subscribe_ret = mq_schema_subscribe(s, topic_id, lua_manager_mq_on_msg_cb_func, new_mq_on_msg_arg);
+ if (subscribe_ret)
{
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- LL_APPEND(plugin_manage->message_free_arg_list, new_message_free_arg);
- new_message_free_arg->topic_id = topic_id;
- new_message_free_arg->plugin_manage = plugin_manage;
- lua_pop(L, 1);
+ if (on_msg_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_msg_arg_ref_id);
+ if (on_msg_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_msg_fn_ref_id);
+ if (new_mq_on_msg_arg)
+ FREE(new_mq_on_msg_arg);
}
else
{
- if (new_message_free_arg)
- free(new_message_free_arg);
- return 0;
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_mq_on_msg_arg->lua_mod_mgr = lua_mod_mgr;
+ LL_APPEND(lua_mod_mgr->mq_on_msg_list, new_mq_on_msg_arg);
}
- lua_pushinteger(L, topic_id);
+ lua_pushinteger(L, subscribe_ret);
return 1;
}
-static int lua_plugin_manage_msg_mq_get_topic_id(struct lua_state *state, int type)
+int lua_mq_runtime_publish_message(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TSTRING || lua_type(L, -2) != LUA_TLIGHTUSERDATA)
+ if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TNUMBER || lua_type(L, -3) != LUA_TLIGHTUSERDATA)
{
lua_settop(L, 0);
return 0;
}
- char *topic_name = strdup(lua_tostring(L, -1));
+ int msg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int topic_id = lua_tointeger(L, -1);
lua_pop(L, 1);
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
+ struct mq_runtime *rt = (struct mq_runtime *)lua_topointer(L, -1);
lua_settop(L, 0);
- int topic_id = -1;
- if (type == MQ_TYPE_PACKET)
- topic_id = stellar_packet_mq_get_topic_id(st, (const char *)topic_name);
- else if (type == MQ_TYPE_SESSION)
- topic_id = stellar_session_mq_get_topic_id(st, (const char *)topic_name);
- if (topic_name)
- free(topic_name);
+ struct lua_context *new_context = lua_context_new_with_ref_id(state, msg_ref_id);
- lua_pushinteger(L, topic_id);
+ int publish_ret = mq_runtime_publish_message(rt, topic_id, new_context);
+ if (publish_ret)
+ {
+ lua_context_free(new_context);
+ }
+ lua_pushinteger(L, publish_ret);
return 1;
}
-static int lua_plugin_manage_msg_mq_update_topic(struct lua_state *state, int type)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4 || lua_type(L, -3) != LUA_TNUMBER || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
- goto err;
-
- /* stack -1 */
- int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- if (free_arg_ref_id == LUA_REFNIL)
- free_arg_ref_id = 0;
-
- /* stack -2 */
- int free_fn_ref_id = 0;
- if (lua_type(L, -1) == LUA_TFUNCTION)
- free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- else
- lua_pop(L, 1);
+LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_module_manager_get_thread_id,
+ struct stellar_module_manager,
+ stellar_module_manager_get_thread_id)
- /* stack -3 */
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
+LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_module_manager_get_max_thread_num,
+ struct stellar_module_manager,
+ stellar_module_manager_get_max_thread_num)
- /* stack -4 */
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
- lua_settop(L, 0);
+LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_mq_runtime,
+ struct stellar_module_manager,
+ struct mq_runtime,
+ stellar_module_manager_get_mq_runtime)
- struct lua_message_free_arg *new_message_free_arg = (struct lua_message_free_arg *)calloc(1, sizeof(struct lua_message_free_arg));
- memset(new_message_free_arg, 0, sizeof(struct lua_message_free_arg));
- new_message_free_arg->topic_id = topic_id;
- new_message_free_arg->lua_msg_free_fn_ref_id = free_fn_ref_id;
- new_message_free_arg->lua_msg_free_arg_ref_id = free_arg_ref_id;
- int update_result = -1;
- if (type == MQ_TYPE_PACKET)
- update_result = stellar_packet_mq_update_topic(st, topic_id, lpm_packet_message_free_func, new_message_free_arg);
- else if (type == MQ_TYPE_SESSION)
- update_result = stellar_session_mq_update_topic(st, topic_id, lpm_session_message_free_func, new_message_free_arg);
- if (update_result)
- {
- if (new_message_free_arg)
- free(new_message_free_arg);
- goto err;
- }
-
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- new_message_free_arg->plugin_manage = plugin_manage;
- LL_APPEND(plugin_manage->message_free_arg_list, new_message_free_arg);
- lua_settop(L, 0);
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
- return 1;
-}
+LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_mq_schema,
+ struct stellar_module_manager,
+ struct mq_schema,
+ stellar_module_manager_get_mq_schema)
-static int lua_plugin_manage_msg_mq_destory_topic(struct lua_state *state, int type)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 2)
- goto err;
+LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_logger,
+ struct stellar_module_manager,
+ struct logger,
+ stellar_module_manager_get_logger)
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
+LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_packet_get_direction,
+ const struct packet,
+ packet_get_direction)
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
- lua_settop(L, 0);
+LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_packet_get_payload,
+ const struct packet,
+ const char,
+ packet_get_payload)
- int destory_result = -1;
- if (type == MQ_TYPE_PACKET)
- destory_result = stellar_packet_mq_destroy_topic(st, topic_id);
- else if (type == MQ_TYPE_SESSION)
- destory_result = stellar_session_mq_destroy_topic(st, topic_id);
- if (destory_result < 0)
- goto err;
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
- return 1;
-}
+LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_packet_get_payload_len,
+ const struct packet,
+ packet_get_payload_len)
-static int lua_plugin_manage_msg_mq_subscribe_topic(struct lua_state *state, int type)
+int lua_packet_manager_subscribe(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
- goto err;
-
- int plugin_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
-
- int on_message_fn_ref_id = 0;
- if (lua_type(L, -1) == LUA_TFUNCTION)
- on_message_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- else
- lua_pop(L, 1);
+ if (lua_gettop(L) != 4 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -3) != LUA_TNUMBER || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
+ {
+ lua_settop(L, 0);
+ return 0;
+ }
- int topic_id = lua_tointeger(L, -1);
+ int on_packet_stage_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_packet_stage_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int stage = lua_tointeger(L, -1);
lua_pop(L, 1);
- struct stellar *st = (struct stellar *)lua_topointer(L, -1);
+ struct packet_manager *pkt_mgr = (struct packet_manager *)lua_topointer(L, -1);
lua_settop(L, 0);
- int subscribe_result = -1;
- if (type == MQ_TYPE_PACKET)
- subscribe_result = stellar_packet_mq_subscribe(st, topic_id, lpm_on_packet_msg_func, plugin_id);
- else if (type == MQ_TYPE_SESSION)
- subscribe_result = stellar_session_mq_subscribe(st, topic_id, lpm_on_session_msg_func, plugin_id);
- if (subscribe_result)
- goto err;
-
- struct lua_on_message_fn *on_message = NULL;
- struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state);
- if (type == MQ_TYPE_PACKET)
- on_message = hash_find_on_msg_fn(plugin_manage->on_packet_message_hashlist, topic_id, plugin_id);
- else if (type == MQ_TYPE_SESSION)
- on_message = hash_find_on_msg_fn(plugin_manage->on_session_message_hashlist, topic_id, plugin_id);
- if (on_message)
+ struct lua_fn_arg_pair *new_on_packet_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ new_on_packet_arg->lua_arg_ref_id = on_packet_stage_arg_ref_id;
+ new_on_packet_arg->lua_fn_ref_id = on_packet_stage_fn_ref_id;
+ int subscribe_ret = packet_manager_subscribe(pkt_mgr, (enum packet_stage)stage, lua_manager_packet_on_stage_callback, new_on_packet_arg);
+ if (subscribe_ret)
{
- on_message->lua_on_msg_fn_ref_id = on_message_fn_ref_id;
+ if (on_packet_stage_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_packet_stage_arg_ref_id);
+ if (on_packet_stage_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_packet_stage_fn_ref_id);
+ if (new_on_packet_arg)
+ FREE(new_on_packet_arg);
}
else
{
- if (type == MQ_TYPE_PACKET)
- on_message = hash_on_msg_fn_insert(plugin_manage->on_packet_message_hashlist, topic_id, plugin_id);
- else if (type == MQ_TYPE_SESSION)
- on_message = hash_on_msg_fn_insert(plugin_manage->on_session_message_hashlist, topic_id, plugin_id);
- if (!on_message)
- goto err;
- else
- on_message->lua_on_msg_fn_ref_id = on_message_fn_ref_id;
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_on_packet_arg->lua_mod_mgr = lua_mod_mgr;
+ LL_APPEND(lua_mod_mgr->on_packet_stage_list, new_on_packet_arg);
}
- lua_settop(L, 0);
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
+ lua_pushinteger(L, subscribe_ret);
return 1;
}
-static int lua_plugin_manage_msg_mq_publish_message(struct lua_state *state, int type)
+LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_session_get0_current_packet,
+ const struct session,
+ const struct packet,
+ session_get0_current_packet)
+
+enum sess_mgr_sub_type
+{
+ sub_type_begin = 0,
+ sub_type_tcp,
+ sub_type_udp,
+ sub_type_control_pkt,
+ sub_type_end
+};
+
+static int lua_session_manager_subscribe(struct lua_state *state, enum sess_mgr_sub_type type)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 3)
- goto err;
-
- int msg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
- if (msg_ref_id == LUA_REFNIL)
- msg_ref_id = 0;
-
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
+ if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -3) != LUA_TLIGHTUSERDATA)
+ {
+ lua_settop(L, 0);
+ return 0;
+ }
- void *p = (void *)lua_topointer(L, -1);
+ int on_session_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_session_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ struct session_manager *sess_mgr = (struct session_manager *)lua_topointer(L, -1);
lua_settop(L, 0);
- struct lua_context *new_context = lua_context_new(state);
- new_context->lua_context_ref_id = msg_ref_id;
-
- int publish_result = -1;
- if (type == MQ_TYPE_PACKET)
- publish_result = packet_mq_publish_message((struct packet *)p, topic_id, new_context);
- else if (type == MQ_TYPE_SESSION)
- publish_result = session_mq_publish_message((struct session *)p, topic_id, new_context);
- if (publish_result)
+ struct lua_fn_arg_pair *new_on_sess_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ new_on_sess_arg->lua_arg_ref_id = on_session_arg_ref_id;
+ new_on_sess_arg->lua_fn_ref_id = on_session_fn_ref_id;
+ int subscribe_ret = -1;
+ switch (type)
{
- luaL_unref(L, LUA_REGISTRYINDEX, new_context->lua_context_ref_id);
- free(new_context);
- goto err;
+ case sub_type_tcp:
+ subscribe_ret = session_manager_subscribe_tcp(sess_mgr, lua_manager_session_callback, new_on_sess_arg);
+ break;
+ case sub_type_udp:
+ subscribe_ret = session_manager_subscribe_udp(sess_mgr, lua_manager_session_callback, new_on_sess_arg);
+ break;
+ case sub_type_control_pkt:
+ subscribe_ret = session_manager_subscribe_control_packet(sess_mgr, lua_manager_session_callback, new_on_sess_arg);
+ break;
+ default:
+ break;
+ }
+ if (subscribe_ret)
+ {
+ if (on_session_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_session_arg_ref_id);
+ if (on_session_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_session_fn_ref_id);
+ if (new_on_sess_arg)
+ FREE(new_on_sess_arg);
+ }
+ else
+ {
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_on_sess_arg->lua_mod_mgr = lua_mod_mgr;
+ switch (type)
+ {
+ case sub_type_tcp:
+ LL_APPEND(lua_mod_mgr->on_session_tcp_list, new_on_sess_arg);
+ break;
+ case sub_type_udp:
+ LL_APPEND(lua_mod_mgr->on_session_udp_list, new_on_sess_arg);
+ break;
+ case sub_type_control_pkt:
+ LL_APPEND(lua_mod_mgr->on_session_control_packet_list, new_on_sess_arg);
+ break;
+ default:
+ break;
+ }
}
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
+ lua_pushinteger(L, subscribe_ret);
return 1;
}
-/* ***** ***** ***** ***** ***** ***** */
-int lua_packet_mq_create_topic(struct lua_state *state)
+int lua_session_manager_subscribe_tcp(struct lua_state *state)
{
- return lua_plugin_manage_msg_mq_create_topic(state, MQ_TYPE_PACKET);
+ return lua_session_manager_subscribe(state, sub_type_tcp);
}
-int lua_packet_mq_get_topic_id(struct lua_state *state)
+int lua_session_manager_subscribe_udp(struct lua_state *state)
{
- return lua_plugin_manage_msg_mq_get_topic_id(state, MQ_TYPE_PACKET);
+ return lua_session_manager_subscribe(state, sub_type_udp);
}
-int lua_packet_mq_update_topic(struct lua_state *state)
+int lua_session_manager_subscribe_control_packet(struct lua_state *state)
{
- return lua_plugin_manage_msg_mq_update_topic(state, MQ_TYPE_PACKET);
+ return lua_session_manager_subscribe(state, sub_type_control_pkt);
}
-int lua_packet_mq_destory_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_destory_topic(state, MQ_TYPE_PACKET);
-}
-
-int lua_packet_mq_subscribe_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_subscribe_topic(state, MQ_TYPE_PACKET);
-}
-
-int lua_packet_mq_publish_message(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_publish_message(state, MQ_TYPE_PACKET);
-}
-
-/* ***** ***** ***** ***** ***** ***** */
-int lua_session_mq_create_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_create_topic(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_get_topic_id(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_get_topic_id(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_update_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_update_topic(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_destory_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_destory_topic(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_subscribe_topic(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_subscribe_topic(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_publish_message(struct lua_state *state)
-{
- return lua_plugin_manage_msg_mq_publish_message(state, MQ_TYPE_SESSION);
-}
-
-int lua_session_mq_ignore_message(struct lua_state *state)
+int lua_session_manager_subscribe_tcp_stream(struct lua_state *state)
{
lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 3)
- goto err;
-
- int plugin_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
-
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
-
- struct session *sess = (struct session *)lua_topointer(L, -1);
- lua_settop(L, 0);
-
- if (session_mq_ignore_message(sess, topic_id, plugin_id))
- goto err;
-
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
- return 1;
-}
-
-int lua_session_mq_unignore_message(struct lua_state *state)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 4)
- goto err;
-
- int plugin_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
-
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
-
- struct session *sess = (struct session *)lua_topointer(L, -1);
- lua_settop(L, 0);
-
- if (session_mq_unignore_message(sess, topic_id, plugin_id))
- goto err;
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
-
- return 1;
-}
-
-int lua_session_mq_topic_is_active(struct lua_state *state)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 2)
- goto err;
-
- int topic_id = lua_tointeger(L, -1);
- lua_pop(L, 1);
+ if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -4) != LUA_TLIGHTUSERDATA)
+ {
+ lua_settop(L, 0);
+ return 0;
+ }
- struct session *sess = (struct session *)lua_topointer(L, -1);
+ int on_tcp_stream_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ int on_tcp_stream_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX);
+ struct session_manager *sess_mgr = (struct session_manager *)lua_topointer(L, -1);
lua_settop(L, 0);
- /* 1 means active */
- if (session_mq_topic_is_active(sess, topic_id) != 1)
- goto err;
- lua_pushboolean(L, 1);
- return 1;
-err:
- lua_settop(L, 0);
- lua_pushboolean(L, 0);
- return 1;
-}
-
-static const char lua_base64_encode_table[] = {
- 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
- 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
- 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd',
- 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
- 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
- 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7',
- '8', '9', '+', '/'};
-
-static int lua_base64_encode(const char *indata, int inlen, char *outdata, int *outlen)
-{
- if (indata == NULL || inlen <= 0)
+ struct lua_fn_arg_pair *new_on_tcp_stream_arg = CALLOC(struct lua_fn_arg_pair, 1);
+ new_on_tcp_stream_arg->lua_arg_ref_id = on_tcp_stream_arg_ref_id;
+ new_on_tcp_stream_arg->lua_fn_ref_id = on_tcp_stream_fn_ref_id;
+ int subscribe_ret = session_manager_subscribe_tcp_stream(sess_mgr, lua_manager_tcp_stream_callback, new_on_tcp_stream_arg);
+ if (subscribe_ret)
{
- return -1;
- }
- int i, j;
- unsigned char num = inlen % 3;
- if (outdata != NULL)
- {
- // 编码,3个字节一组,若数据总长度不是3的倍数,则跳过最后的 num 个字节数据
- for (i = 0, j = 0; i < inlen - num; i += 3, j += 4)
- {
- outdata[j] = lua_base64_encode_table[(unsigned char)indata[i] >> 2];
- outdata[j + 1] = lua_base64_encode_table[(((unsigned char)indata[i] & 0x03) << 4) | ((unsigned char)indata[i + 1] >> 4)];
- outdata[j + 2] = lua_base64_encode_table[(((unsigned char)indata[i + 1] & 0x0f) << 2) | ((unsigned char)indata[i + 2] >> 6)];
- outdata[j + 3] = lua_base64_encode_table[(unsigned char)indata[i + 2] & 0x3f];
- }
- // 继续处理最后的 num 个字节的数据
- if (num == 1)
- { // 余数为1,需补齐两个字节'='
- outdata[j] = lua_base64_encode_table[(unsigned char)indata[inlen - 1] >> 2];
- outdata[j + 1] = lua_base64_encode_table[((unsigned char)indata[inlen - 1] & 0x03) << 4];
- outdata[j + 2] = '=';
- outdata[j + 3] = '=';
- }
- else if (num == 2)
- { // 余数为2,需补齐一个字节'='
- outdata[j] = lua_base64_encode_table[(unsigned char)indata[inlen - 2] >> 2];
- outdata[j + 1] = lua_base64_encode_table[(((unsigned char)indata[inlen - 2] & 0x03) << 4) | ((unsigned char)indata[inlen - 1] >> 4)];
- outdata[j + 2] = lua_base64_encode_table[((unsigned char)indata[inlen - 1] & 0x0f) << 2];
- outdata[j + 3] = '=';
- }
+ if (on_tcp_stream_arg_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_tcp_stream_arg_ref_id);
+ if (on_tcp_stream_fn_ref_id + 1)
+ luaL_unref(L, LUA_REGISTRYINDEX, on_tcp_stream_fn_ref_id);
+ if (new_on_tcp_stream_arg)
+ FREE(new_on_tcp_stream_arg);
}
- if (outlen != NULL)
+ else
{
- *outlen = (inlen + (num == 0 ? 0 : 3 - num)) * 4 / 3;
+ struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state);
+ new_on_tcp_stream_arg->lua_mod_mgr = lua_mod_mgr;
+ LL_APPEND(lua_mod_mgr->on_tcp_stream_list, new_on_tcp_stream_arg);
}
- return 0;
+ lua_pushinteger(L, subscribe_ret);
+ return 1;
}
-
-int lua_session_get0_current_payload(struct lua_state *state)
-{
- lua_State *L = (lua_State *)state;
- if (lua_gettop(L) != 1)
- goto err;
-
- struct session *sess = (struct session *)lua_topointer(L, -1);
- lua_settop(L, 0);
-
- size_t payload_len = 0;
- const char *payload = session_get0_current_payload(sess, &payload_len);
-
-#if 0
- lua_pushlightuserdata(L, (void *)payload);
- lua_pushinteger(L, (lua_Integer)payload_len);
- return 2;
-err:
- lua_settop(L, 0);
- lua_pushlightuserdata(L, NULL);
- lua_pushboolean(L, 0);
- return 2;
-#else
- char *payload_base64 = (char *)calloc(2, payload_len);
- int payload_base64_len = 0;
- if (!lua_base64_encode(payload, payload_len, payload_base64, &payload_base64_len))
- {
- lua_pushstring(L, payload_base64);
- lua_pushinteger(L, (lua_Integer)payload_len);
- if (payload_base64)
- free(payload_base64);
- return 2;
- }
- if (payload_base64)
- free(payload_base64);
-err:
- lua_settop(L, 0);
- lua_pushstring(L, NULL);
- lua_pushboolean(L, 0);
- return 2;
-#endif
-} \ No newline at end of file