diff options
Diffstat (limited to 'src/lua_binding_function.c')
| -rw-r--r-- | src/lua_binding_function.c | 863 |
1 files changed, 347 insertions, 516 deletions
diff --git a/src/lua_binding_function.c b/src/lua_binding_function.c index c308c41..f9ef35c 100644 --- a/src/lua_binding_function.c +++ b/src/lua_binding_function.c @@ -1,671 +1,502 @@ +#include "lua_binding_function.h" +#include "lua_binding_cfunc.h" + +#include "stellar/mq.h" +#include "stellar/module_manager.h" +#include "stellar/packet.h" +#include "stellar/packet_manager.h" +#include "stellar/session.h" +#include "stellar/session_manager.h" +#include "stellar/utils.h" + #include <stdlib.h> #include <string.h> - +#include <utlist.h> #include <lua.h> +#include <lualib.h> #include <lauxlib.h> -#include <utlist.h> -#include "lua_binding_function.h" +#define LUA_BINDING_FUNCTION_GET_VALUE_POINTER(func_name, param_type, value_type, value_func) \ + int func_name(struct lua_state *state) \ + { \ + lua_State *L = (lua_State *)state; \ + if (lua_gettop(L) != 1) \ + { \ + lua_settop(L, 0); \ + return 0; \ + } \ + param_type *p = (param_type *)lua_topointer(L, -1); \ + lua_settop(L, 0); \ + value_type *l = (value_type *)value_func(p); \ + lua_pushlightuserdata(L, (void *)l); \ + return 1; \ + } -/* ***** ***** ***** ***** ***** ***** */ -int lua_get_worker_thread_num(struct lua_state *state) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 1) - { - lua_settop(L, 0); - return 0; +#define LUA_BINDING_FUNCTION_GET_VALUE_INT(func_name, param_type, value_func) \ + int func_name(struct lua_state *state) \ + { \ + lua_State *L = (lua_State *)state; \ + if (lua_gettop(L) != 1) \ + { \ + lua_settop(L, 0); \ + return 0; \ + } \ + param_type *p = (param_type *)lua_topointer(L, -1); \ + lua_settop(L, 0); \ + int v = (int)value_func(p); \ + lua_pushinteger(L, v); \ + return 1; \ } - struct stellar *st = (struct stellar *)lua_topointer(L, -1); - lua_settop(L, 0); - int thread_num = stellar_get_worker_thread_num(st); - lua_pushinteger(L, thread_num); - return 1; -} -int lua_get_current_thread_id(struct lua_state *state) +int lua_mq_schema_get_topic_id(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 1) + if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TSTRING || lua_type(L, -2) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - struct stellar *st = (struct stellar *)lua_topointer(L, -1); - lua_settop(L, 0); - int thread_id = stellar_get_current_thread_id(st); - lua_pushinteger(L, thread_id); - return 1; -} -int lua_get_stellar_pointer(struct lua_state *state) -{ - lua_State *L = (lua_State *)state; + char *topic_name = strdup(lua_tostring(L, -1)); + lua_pop(L, 1); + struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1); lua_settop(L, 0); - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - struct stellar *st = plugin_manage->st; - lua_pushlightuserdata(L, (void *)st); - return 1; -} -/* -int lua_get_plugin_manage_pointer(struct lua_state *state) -{ - lua_settop((lua_State *)state, 0); - struct lua_plugin_manage * plugin_manage = lua_state_get_plugin_manage(state); - lua_pushlightuserdata((lua_State *)state, (void *)plugin_manage); + int topic_id = mq_schema_get_topic_id(s, (const char *)topic_name); + + if (topic_name) + FREE(topic_name); + lua_pushinteger(L, topic_id); return 1; } -*/ -/* ***** ***** ***** ***** ***** ***** */ -int lua_session_plugin_regist(struct lua_state *state) +int lua_mq_schema_create_topic(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA) + if (lua_gettop(L) != 6 || lua_type(L, -5) != LUA_TSTRING || lua_type(L, -6) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - /* stack -1 */ - int plugin_env_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - if (plugin_env_ref_id == LUA_REFNIL) - plugin_env_ref_id = 0; - - /* stack -2 */ - int ctx_free_fn_ref_id = 0; + int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int free_cb_fn_ref_id = -1; if (lua_type(L, -1) == LUA_TFUNCTION) - ctx_free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + free_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); else lua_pop(L, 1); - - /* stack -3 */ - int ctx_new_fn_ref_id = 0; + int on_dispatch_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_dispatch_cb_fn_ref_id = -1; if (lua_type(L, -1) == LUA_TFUNCTION) - ctx_new_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + on_dispatch_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); else lua_pop(L, 1); - - /* stack -4 */ - struct stellar *st = (struct stellar *)lua_topointer(L, -1); + char *topic_name = strdup((char *)lua_tostring(L, -1)); + lua_pop(L, 1); + struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1); lua_settop(L, 0); - struct lua_session_plugin_env *new_plugin_env = (struct lua_session_plugin_env *)calloc(1, sizeof(struct lua_session_plugin_env)); - memset(new_plugin_env, 0, sizeof(struct lua_session_plugin_env)); - new_plugin_env->lua_ctx_new_fn_ref_id = ctx_new_fn_ref_id; - new_plugin_env->lua_ctx_free_fn_ref_id = ctx_free_fn_ref_id; - new_plugin_env->lua_plug_env_ref_id = plugin_env_ref_id; - - int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)new_plugin_env); - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - LL_APPEND(plugin_manage->session_plugin_env_list, new_plugin_env); - new_plugin_env->plugin_manage = plugin_manage; - new_plugin_env->session_plugin_id = plugin_id; - - lua_settop(L, 0); - lua_pushinteger(L, plugin_id); + struct lua_fn_arg_pair *new_mq_dispatch_arg = CALLOC(struct lua_fn_arg_pair, 1); + memset(new_mq_dispatch_arg, 0, sizeof(struct lua_fn_arg_pair)); + new_mq_dispatch_arg->lua_fn_ref_id = on_dispatch_cb_fn_ref_id; + new_mq_dispatch_arg->lua_arg_ref_id = on_dispatch_arg_ref_id; + struct lua_fn_arg_pair *new_mq_msg_free_arg = CALLOC(struct lua_fn_arg_pair, 1); + memset(new_mq_msg_free_arg, 0, sizeof(struct lua_fn_arg_pair)); + new_mq_msg_free_arg->lua_fn_ref_id = free_cb_fn_ref_id; + new_mq_msg_free_arg->lua_arg_ref_id = free_arg_ref_id; + int topic_id = mq_schema_create_topic(s, + (const char *)topic_name, + lua_manager_mq_on_msg_dispatch_cb_func, + new_mq_dispatch_arg, + lua_manager_mq_msg_free_cb_func, + new_mq_msg_free_arg); + if (topic_id >= 0) + { + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_mq_dispatch_arg->lua_mod_mgr = lua_mod_mgr; + new_mq_msg_free_arg->lua_mod_mgr = lua_mod_mgr; + LL_APPEND(lua_mod_mgr->mq_dispatch_list, new_mq_dispatch_arg); + LL_APPEND(lua_mod_mgr->mq_msg_free_list, new_mq_msg_free_arg); + } + else + { + if (free_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, free_arg_ref_id); + if (free_cb_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, free_cb_fn_ref_id); + if (on_dispatch_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_arg_ref_id); + if (on_dispatch_cb_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_cb_fn_ref_id); + if (new_mq_dispatch_arg) + FREE(new_mq_dispatch_arg); + if (new_mq_msg_free_arg) + FREE(new_mq_msg_free_arg); + } + lua_pushinteger(L, topic_id); return 1; } -int lua_packet_plugin_regist(struct lua_state *state) +int lua_mq_schema_update_topic(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA) + if (lua_gettop(L) != 6 || lua_type(L, -5) != LUA_TNUMBER || lua_type(L, -6) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - /* stack -1 */ - int plugin_env_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - if (plugin_env_ref_id == LUA_REFNIL) - plugin_env_ref_id = 0; - - /* stack -2 */ - int on_packet_fn_ref_id = 0; + int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int free_cb_fn_ref_id = -1; if (lua_type(L, -1) == LUA_TFUNCTION) - on_packet_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + free_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); else lua_pop(L, 1); - - /* stack -3 */ - int ip_protocol = lua_tointeger(L, -1); + int on_dispatch_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_dispatch_cb_fn_ref_id = -1; + if (lua_type(L, -1) == LUA_TFUNCTION) + on_dispatch_cb_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + else + lua_pop(L, 1); + int topic_id = lua_tointeger(L, -1); lua_pop(L, 1); - - /* stack -4 */ - struct stellar *st = (struct stellar *)lua_topointer(L, -1); + struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1); lua_settop(L, 0); - struct lua_packet_plugin_env *new_plugin_env = (struct lua_packet_plugin_env *)calloc(1, sizeof(struct lua_packet_plugin_env)); - memset(new_plugin_env, 0, sizeof(struct lua_packet_plugin_env)); - new_plugin_env->lua_plug_env_ref_id = plugin_env_ref_id; - new_plugin_env->lua_on_packet_fn_ref_id = on_packet_fn_ref_id; - - int plugin_id = stellar_packet_plugin_register(st, (unsigned char)ip_protocol, lpm_on_packet_func, (void *)new_plugin_env); - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - LL_APPEND(plugin_manage->packet_plugin_env_list, new_plugin_env); - new_plugin_env->plugin_manage = plugin_manage; - new_plugin_env->packet_plugin_id = plugin_id; - - lua_settop(L, 0); - lua_pushinteger(L, plugin_id); + struct lua_fn_arg_pair *new_mq_dispatch_arg = CALLOC(struct lua_fn_arg_pair, 1); + memset(new_mq_dispatch_arg, 0, sizeof(struct lua_fn_arg_pair)); + new_mq_dispatch_arg->lua_fn_ref_id = on_dispatch_cb_fn_ref_id; + new_mq_dispatch_arg->lua_arg_ref_id = on_dispatch_arg_ref_id; + struct lua_fn_arg_pair *new_mq_msg_free_arg = CALLOC(struct lua_fn_arg_pair, 1); + memset(new_mq_msg_free_arg, 0, sizeof(struct lua_fn_arg_pair)); + new_mq_msg_free_arg->lua_fn_ref_id = free_cb_fn_ref_id; + new_mq_msg_free_arg->lua_arg_ref_id = free_arg_ref_id; + int update_ret = mq_schema_update_topic(s, + topic_id, + lua_manager_mq_on_msg_dispatch_cb_func, + new_mq_dispatch_arg, + lua_manager_mq_msg_free_cb_func, + new_mq_msg_free_arg); + if (topic_id) + { + if (free_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, free_arg_ref_id); + if (free_cb_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, free_cb_fn_ref_id); + if (on_dispatch_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_arg_ref_id); + if (on_dispatch_cb_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_dispatch_cb_fn_ref_id); + if (new_mq_dispatch_arg) + FREE(new_mq_dispatch_arg); + if (new_mq_msg_free_arg) + FREE(new_mq_msg_free_arg); + } + else + { + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_mq_dispatch_arg->lua_mod_mgr = lua_mod_mgr; + new_mq_msg_free_arg->lua_mod_mgr = lua_mod_mgr; + LL_APPEND(lua_mod_mgr->mq_dispatch_list, new_mq_dispatch_arg); + LL_APPEND(lua_mod_mgr->mq_msg_free_list, new_mq_msg_free_arg); + } + lua_pushinteger(L, update_ret); return 1; } -/* ***** ***** ***** ***** ***** ***** */ -int lua_session_get_type(struct lua_state *state) +int lua_mq_shcema_destory_topic(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 1 || lua_type(L, -1) != LUA_TLIGHTUSERDATA) + if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TNUMBER || lua_type(L, -2) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - struct session *sess = (struct session *)lua_topointer(L, -1); + int topic_id = lua_tointeger(L, -1); + lua_pop(L, 1); + struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1); lua_settop(L, 0); - if (!sess) - return 0; - lua_pushinteger(L, (int)session_get_type(sess)); + int destory_ret = mq_schema_destroy_topic(s, topic_id); + + lua_pushinteger(L, destory_ret); return 1; } -/* ***** ***** ***** ***** ***** ***** */ -#define MQ_TYPE_PACKET 1 -#define MQ_TYPE_SESSION 2 -static int lua_plugin_manage_msg_mq_create_topic(struct lua_state *state, int type) +int lua_mq_schema_subscribe(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4 || lua_type(L, -3) != LUA_TSTRING || lua_type(L, -4) != LUA_TLIGHTUSERDATA) + if (lua_gettop(L) != 4 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -4) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - /* stack -1 */ - int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - if (free_arg_ref_id == LUA_REFNIL) - free_arg_ref_id = 0; - - /* stack -2 */ - int free_fn_ref_id = 0; - if (lua_type(L, -1) == LUA_TFUNCTION) - free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - else - lua_pop(L, 1); - - /* stack -3 */ - char *topic_name = strdup((char *)lua_tostring(L, -1)); + int on_msg_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_msg_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int topic_id = lua_tointeger(L, -1); lua_pop(L, 1); - - /* stack -4 */ - struct stellar *st = (struct stellar *)lua_topointer(L, -1); + struct mq_schema *s = (struct mq_schema *)lua_topointer(L, -1); lua_settop(L, 0); - struct lua_message_free_arg *new_message_free_arg = (struct lua_message_free_arg *)calloc(1, sizeof(struct lua_message_free_arg)); - memset(new_message_free_arg, 0, sizeof(struct lua_message_free_arg)); - new_message_free_arg->lua_msg_free_fn_ref_id = free_fn_ref_id; - new_message_free_arg->lua_msg_free_arg_ref_id = free_arg_ref_id; - int topic_id = -1; - if (type == MQ_TYPE_PACKET) - topic_id = stellar_packet_mq_create_topic(st, (const char *)topic_name, lpm_packet_message_free_func, new_message_free_arg); - else if (type == MQ_TYPE_SESSION) - topic_id = stellar_session_mq_create_topic(st, (const char *)topic_name, lpm_session_message_free_func, new_message_free_arg); - if (topic_id >= 0) + struct lua_fn_arg_pair *new_mq_on_msg_arg = CALLOC(struct lua_fn_arg_pair, 1); + memset(new_mq_on_msg_arg, 0, sizeof(struct lua_fn_arg_pair)); + new_mq_on_msg_arg->lua_arg_ref_id = on_msg_arg_ref_id; + new_mq_on_msg_arg->lua_fn_ref_id = on_msg_fn_ref_id; + int subscribe_ret = mq_schema_subscribe(s, topic_id, lua_manager_mq_on_msg_cb_func, new_mq_on_msg_arg); + if (subscribe_ret) { - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - LL_APPEND(plugin_manage->message_free_arg_list, new_message_free_arg); - new_message_free_arg->topic_id = topic_id; - new_message_free_arg->plugin_manage = plugin_manage; - lua_pop(L, 1); + if (on_msg_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_msg_arg_ref_id); + if (on_msg_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_msg_fn_ref_id); + if (new_mq_on_msg_arg) + FREE(new_mq_on_msg_arg); } else { - if (new_message_free_arg) - free(new_message_free_arg); - return 0; + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_mq_on_msg_arg->lua_mod_mgr = lua_mod_mgr; + LL_APPEND(lua_mod_mgr->mq_on_msg_list, new_mq_on_msg_arg); } - lua_pushinteger(L, topic_id); + lua_pushinteger(L, subscribe_ret); return 1; } -static int lua_plugin_manage_msg_mq_get_topic_id(struct lua_state *state, int type) +int lua_mq_runtime_publish_message(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 2 || lua_type(L, -1) != LUA_TSTRING || lua_type(L, -2) != LUA_TLIGHTUSERDATA) + if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TNUMBER || lua_type(L, -3) != LUA_TLIGHTUSERDATA) { lua_settop(L, 0); return 0; } - char *topic_name = strdup(lua_tostring(L, -1)); + int msg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int topic_id = lua_tointeger(L, -1); lua_pop(L, 1); - struct stellar *st = (struct stellar *)lua_topointer(L, -1); + struct mq_runtime *rt = (struct mq_runtime *)lua_topointer(L, -1); lua_settop(L, 0); - int topic_id = -1; - if (type == MQ_TYPE_PACKET) - topic_id = stellar_packet_mq_get_topic_id(st, (const char *)topic_name); - else if (type == MQ_TYPE_SESSION) - topic_id = stellar_session_mq_get_topic_id(st, (const char *)topic_name); - if (topic_name) - free(topic_name); + struct lua_context *new_context = lua_context_new_with_ref_id(state, msg_ref_id); - lua_pushinteger(L, topic_id); + int publish_ret = mq_runtime_publish_message(rt, topic_id, new_context); + if (publish_ret) + { + lua_context_free(new_context); + } + lua_pushinteger(L, publish_ret); return 1; } -static int lua_plugin_manage_msg_mq_update_topic(struct lua_state *state, int type) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4 || lua_type(L, -3) != LUA_TNUMBER || lua_type(L, -4) != LUA_TLIGHTUSERDATA) - goto err; - - /* stack -1 */ - int free_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - if (free_arg_ref_id == LUA_REFNIL) - free_arg_ref_id = 0; - - /* stack -2 */ - int free_fn_ref_id = 0; - if (lua_type(L, -1) == LUA_TFUNCTION) - free_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - else - lua_pop(L, 1); +LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_module_manager_get_thread_id, + struct stellar_module_manager, + stellar_module_manager_get_thread_id) - /* stack -3 */ - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); +LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_module_manager_get_max_thread_num, + struct stellar_module_manager, + stellar_module_manager_get_max_thread_num) - /* stack -4 */ - struct stellar *st = (struct stellar *)lua_topointer(L, -1); - lua_settop(L, 0); +LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_mq_runtime, + struct stellar_module_manager, + struct mq_runtime, + stellar_module_manager_get_mq_runtime) - struct lua_message_free_arg *new_message_free_arg = (struct lua_message_free_arg *)calloc(1, sizeof(struct lua_message_free_arg)); - memset(new_message_free_arg, 0, sizeof(struct lua_message_free_arg)); - new_message_free_arg->topic_id = topic_id; - new_message_free_arg->lua_msg_free_fn_ref_id = free_fn_ref_id; - new_message_free_arg->lua_msg_free_arg_ref_id = free_arg_ref_id; - int update_result = -1; - if (type == MQ_TYPE_PACKET) - update_result = stellar_packet_mq_update_topic(st, topic_id, lpm_packet_message_free_func, new_message_free_arg); - else if (type == MQ_TYPE_SESSION) - update_result = stellar_session_mq_update_topic(st, topic_id, lpm_session_message_free_func, new_message_free_arg); - if (update_result) - { - if (new_message_free_arg) - free(new_message_free_arg); - goto err; - } - - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - new_message_free_arg->plugin_manage = plugin_manage; - LL_APPEND(plugin_manage->message_free_arg_list, new_message_free_arg); - lua_settop(L, 0); - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); - return 1; -} +LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_mq_schema, + struct stellar_module_manager, + struct mq_schema, + stellar_module_manager_get_mq_schema) -static int lua_plugin_manage_msg_mq_destory_topic(struct lua_state *state, int type) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 2) - goto err; +LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_module_manager_get_logger, + struct stellar_module_manager, + struct logger, + stellar_module_manager_get_logger) - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); +LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_packet_get_direction, + const struct packet, + packet_get_direction) - struct stellar *st = (struct stellar *)lua_topointer(L, -1); - lua_settop(L, 0); +LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_packet_get_payload, + const struct packet, + const char, + packet_get_payload) - int destory_result = -1; - if (type == MQ_TYPE_PACKET) - destory_result = stellar_packet_mq_destroy_topic(st, topic_id); - else if (type == MQ_TYPE_SESSION) - destory_result = stellar_session_mq_destroy_topic(st, topic_id); - if (destory_result < 0) - goto err; - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); - return 1; -} +LUA_BINDING_FUNCTION_GET_VALUE_INT(lua_packet_get_payload_len, + const struct packet, + packet_get_payload_len) -static int lua_plugin_manage_msg_mq_subscribe_topic(struct lua_state *state, int type) +int lua_packet_manager_subscribe(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4 || lua_type(L, -4) != LUA_TLIGHTUSERDATA) - goto err; - - int plugin_id = lua_tointeger(L, -1); - lua_pop(L, 1); - - int on_message_fn_ref_id = 0; - if (lua_type(L, -1) == LUA_TFUNCTION) - on_message_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - else - lua_pop(L, 1); + if (lua_gettop(L) != 4 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -3) != LUA_TNUMBER || lua_type(L, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(L, 0); + return 0; + } - int topic_id = lua_tointeger(L, -1); + int on_packet_stage_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_packet_stage_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int stage = lua_tointeger(L, -1); lua_pop(L, 1); - struct stellar *st = (struct stellar *)lua_topointer(L, -1); + struct packet_manager *pkt_mgr = (struct packet_manager *)lua_topointer(L, -1); lua_settop(L, 0); - int subscribe_result = -1; - if (type == MQ_TYPE_PACKET) - subscribe_result = stellar_packet_mq_subscribe(st, topic_id, lpm_on_packet_msg_func, plugin_id); - else if (type == MQ_TYPE_SESSION) - subscribe_result = stellar_session_mq_subscribe(st, topic_id, lpm_on_session_msg_func, plugin_id); - if (subscribe_result) - goto err; - - struct lua_on_message_fn *on_message = NULL; - struct lua_plugin_manage *plugin_manage = lua_state_get_plugin_manage(state); - if (type == MQ_TYPE_PACKET) - on_message = hash_find_on_msg_fn(plugin_manage->on_packet_message_hashlist, topic_id, plugin_id); - else if (type == MQ_TYPE_SESSION) - on_message = hash_find_on_msg_fn(plugin_manage->on_session_message_hashlist, topic_id, plugin_id); - if (on_message) + struct lua_fn_arg_pair *new_on_packet_arg = CALLOC(struct lua_fn_arg_pair, 1); + new_on_packet_arg->lua_arg_ref_id = on_packet_stage_arg_ref_id; + new_on_packet_arg->lua_fn_ref_id = on_packet_stage_fn_ref_id; + int subscribe_ret = packet_manager_subscribe(pkt_mgr, (enum packet_stage)stage, lua_manager_packet_on_stage_callback, new_on_packet_arg); + if (subscribe_ret) { - on_message->lua_on_msg_fn_ref_id = on_message_fn_ref_id; + if (on_packet_stage_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_packet_stage_arg_ref_id); + if (on_packet_stage_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_packet_stage_fn_ref_id); + if (new_on_packet_arg) + FREE(new_on_packet_arg); } else { - if (type == MQ_TYPE_PACKET) - on_message = hash_on_msg_fn_insert(plugin_manage->on_packet_message_hashlist, topic_id, plugin_id); - else if (type == MQ_TYPE_SESSION) - on_message = hash_on_msg_fn_insert(plugin_manage->on_session_message_hashlist, topic_id, plugin_id); - if (!on_message) - goto err; - else - on_message->lua_on_msg_fn_ref_id = on_message_fn_ref_id; + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_on_packet_arg->lua_mod_mgr = lua_mod_mgr; + LL_APPEND(lua_mod_mgr->on_packet_stage_list, new_on_packet_arg); } - lua_settop(L, 0); - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); + lua_pushinteger(L, subscribe_ret); return 1; } -static int lua_plugin_manage_msg_mq_publish_message(struct lua_state *state, int type) +LUA_BINDING_FUNCTION_GET_VALUE_POINTER(lua_session_get0_current_packet, + const struct session, + const struct packet, + session_get0_current_packet) + +enum sess_mgr_sub_type +{ + sub_type_begin = 0, + sub_type_tcp, + sub_type_udp, + sub_type_control_pkt, + sub_type_end +}; + +static int lua_session_manager_subscribe(struct lua_state *state, enum sess_mgr_sub_type type) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 3) - goto err; - - int msg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); - if (msg_ref_id == LUA_REFNIL) - msg_ref_id = 0; - - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); + if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -3) != LUA_TLIGHTUSERDATA) + { + lua_settop(L, 0); + return 0; + } - void *p = (void *)lua_topointer(L, -1); + int on_session_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_session_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + struct session_manager *sess_mgr = (struct session_manager *)lua_topointer(L, -1); lua_settop(L, 0); - struct lua_context *new_context = lua_context_new(state); - new_context->lua_context_ref_id = msg_ref_id; - - int publish_result = -1; - if (type == MQ_TYPE_PACKET) - publish_result = packet_mq_publish_message((struct packet *)p, topic_id, new_context); - else if (type == MQ_TYPE_SESSION) - publish_result = session_mq_publish_message((struct session *)p, topic_id, new_context); - if (publish_result) + struct lua_fn_arg_pair *new_on_sess_arg = CALLOC(struct lua_fn_arg_pair, 1); + new_on_sess_arg->lua_arg_ref_id = on_session_arg_ref_id; + new_on_sess_arg->lua_fn_ref_id = on_session_fn_ref_id; + int subscribe_ret = -1; + switch (type) { - luaL_unref(L, LUA_REGISTRYINDEX, new_context->lua_context_ref_id); - free(new_context); - goto err; + case sub_type_tcp: + subscribe_ret = session_manager_subscribe_tcp(sess_mgr, lua_manager_session_callback, new_on_sess_arg); + break; + case sub_type_udp: + subscribe_ret = session_manager_subscribe_udp(sess_mgr, lua_manager_session_callback, new_on_sess_arg); + break; + case sub_type_control_pkt: + subscribe_ret = session_manager_subscribe_control_packet(sess_mgr, lua_manager_session_callback, new_on_sess_arg); + break; + default: + break; + } + if (subscribe_ret) + { + if (on_session_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_session_arg_ref_id); + if (on_session_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_session_fn_ref_id); + if (new_on_sess_arg) + FREE(new_on_sess_arg); + } + else + { + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_on_sess_arg->lua_mod_mgr = lua_mod_mgr; + switch (type) + { + case sub_type_tcp: + LL_APPEND(lua_mod_mgr->on_session_tcp_list, new_on_sess_arg); + break; + case sub_type_udp: + LL_APPEND(lua_mod_mgr->on_session_udp_list, new_on_sess_arg); + break; + case sub_type_control_pkt: + LL_APPEND(lua_mod_mgr->on_session_control_packet_list, new_on_sess_arg); + break; + default: + break; + } } - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); + lua_pushinteger(L, subscribe_ret); return 1; } -/* ***** ***** ***** ***** ***** ***** */ -int lua_packet_mq_create_topic(struct lua_state *state) +int lua_session_manager_subscribe_tcp(struct lua_state *state) { - return lua_plugin_manage_msg_mq_create_topic(state, MQ_TYPE_PACKET); + return lua_session_manager_subscribe(state, sub_type_tcp); } -int lua_packet_mq_get_topic_id(struct lua_state *state) +int lua_session_manager_subscribe_udp(struct lua_state *state) { - return lua_plugin_manage_msg_mq_get_topic_id(state, MQ_TYPE_PACKET); + return lua_session_manager_subscribe(state, sub_type_udp); } -int lua_packet_mq_update_topic(struct lua_state *state) +int lua_session_manager_subscribe_control_packet(struct lua_state *state) { - return lua_plugin_manage_msg_mq_update_topic(state, MQ_TYPE_PACKET); + return lua_session_manager_subscribe(state, sub_type_control_pkt); } -int lua_packet_mq_destory_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_destory_topic(state, MQ_TYPE_PACKET); -} - -int lua_packet_mq_subscribe_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_subscribe_topic(state, MQ_TYPE_PACKET); -} - -int lua_packet_mq_publish_message(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_publish_message(state, MQ_TYPE_PACKET); -} - -/* ***** ***** ***** ***** ***** ***** */ -int lua_session_mq_create_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_create_topic(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_get_topic_id(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_get_topic_id(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_update_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_update_topic(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_destory_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_destory_topic(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_subscribe_topic(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_subscribe_topic(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_publish_message(struct lua_state *state) -{ - return lua_plugin_manage_msg_mq_publish_message(state, MQ_TYPE_SESSION); -} - -int lua_session_mq_ignore_message(struct lua_state *state) +int lua_session_manager_subscribe_tcp_stream(struct lua_state *state) { lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 3) - goto err; - - int plugin_id = lua_tointeger(L, -1); - lua_pop(L, 1); - - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); - - struct session *sess = (struct session *)lua_topointer(L, -1); - lua_settop(L, 0); - - if (session_mq_ignore_message(sess, topic_id, plugin_id)) - goto err; - - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); - return 1; -} - -int lua_session_mq_unignore_message(struct lua_state *state) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 4) - goto err; - - int plugin_id = lua_tointeger(L, -1); - lua_pop(L, 1); - - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); - - struct session *sess = (struct session *)lua_topointer(L, -1); - lua_settop(L, 0); - - if (session_mq_unignore_message(sess, topic_id, plugin_id)) - goto err; - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); - - return 1; -} - -int lua_session_mq_topic_is_active(struct lua_state *state) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 2) - goto err; - - int topic_id = lua_tointeger(L, -1); - lua_pop(L, 1); + if (lua_gettop(L) != 3 || lua_type(L, -2) != LUA_TFUNCTION || lua_type(L, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(L, 0); + return 0; + } - struct session *sess = (struct session *)lua_topointer(L, -1); + int on_tcp_stream_arg_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + int on_tcp_stream_fn_ref_id = luaL_ref(L, LUA_REGISTRYINDEX); + struct session_manager *sess_mgr = (struct session_manager *)lua_topointer(L, -1); lua_settop(L, 0); - /* 1 means active */ - if (session_mq_topic_is_active(sess, topic_id) != 1) - goto err; - lua_pushboolean(L, 1); - return 1; -err: - lua_settop(L, 0); - lua_pushboolean(L, 0); - return 1; -} - -static const char lua_base64_encode_table[] = { - 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', - 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', - 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', - 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', - 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', - 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', - '8', '9', '+', '/'}; - -static int lua_base64_encode(const char *indata, int inlen, char *outdata, int *outlen) -{ - if (indata == NULL || inlen <= 0) + struct lua_fn_arg_pair *new_on_tcp_stream_arg = CALLOC(struct lua_fn_arg_pair, 1); + new_on_tcp_stream_arg->lua_arg_ref_id = on_tcp_stream_arg_ref_id; + new_on_tcp_stream_arg->lua_fn_ref_id = on_tcp_stream_fn_ref_id; + int subscribe_ret = session_manager_subscribe_tcp_stream(sess_mgr, lua_manager_tcp_stream_callback, new_on_tcp_stream_arg); + if (subscribe_ret) { - return -1; - } - int i, j; - unsigned char num = inlen % 3; - if (outdata != NULL) - { - // 编码,3个字节一组,若数据总长度不是3的倍数,则跳过最后的 num 个字节数据 - for (i = 0, j = 0; i < inlen - num; i += 3, j += 4) - { - outdata[j] = lua_base64_encode_table[(unsigned char)indata[i] >> 2]; - outdata[j + 1] = lua_base64_encode_table[(((unsigned char)indata[i] & 0x03) << 4) | ((unsigned char)indata[i + 1] >> 4)]; - outdata[j + 2] = lua_base64_encode_table[(((unsigned char)indata[i + 1] & 0x0f) << 2) | ((unsigned char)indata[i + 2] >> 6)]; - outdata[j + 3] = lua_base64_encode_table[(unsigned char)indata[i + 2] & 0x3f]; - } - // 继续处理最后的 num 个字节的数据 - if (num == 1) - { // 余数为1,需补齐两个字节'=' - outdata[j] = lua_base64_encode_table[(unsigned char)indata[inlen - 1] >> 2]; - outdata[j + 1] = lua_base64_encode_table[((unsigned char)indata[inlen - 1] & 0x03) << 4]; - outdata[j + 2] = '='; - outdata[j + 3] = '='; - } - else if (num == 2) - { // 余数为2,需补齐一个字节'=' - outdata[j] = lua_base64_encode_table[(unsigned char)indata[inlen - 2] >> 2]; - outdata[j + 1] = lua_base64_encode_table[(((unsigned char)indata[inlen - 2] & 0x03) << 4) | ((unsigned char)indata[inlen - 1] >> 4)]; - outdata[j + 2] = lua_base64_encode_table[((unsigned char)indata[inlen - 1] & 0x0f) << 2]; - outdata[j + 3] = '='; - } + if (on_tcp_stream_arg_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_tcp_stream_arg_ref_id); + if (on_tcp_stream_fn_ref_id + 1) + luaL_unref(L, LUA_REGISTRYINDEX, on_tcp_stream_fn_ref_id); + if (new_on_tcp_stream_arg) + FREE(new_on_tcp_stream_arg); } - if (outlen != NULL) + else { - *outlen = (inlen + (num == 0 ? 0 : 3 - num)) * 4 / 3; + struct lua_module_manager *lua_mod_mgr = lua_state_get_lua_module_manager(state); + new_on_tcp_stream_arg->lua_mod_mgr = lua_mod_mgr; + LL_APPEND(lua_mod_mgr->on_tcp_stream_list, new_on_tcp_stream_arg); } - return 0; + lua_pushinteger(L, subscribe_ret); + return 1; } - -int lua_session_get0_current_payload(struct lua_state *state) -{ - lua_State *L = (lua_State *)state; - if (lua_gettop(L) != 1) - goto err; - - struct session *sess = (struct session *)lua_topointer(L, -1); - lua_settop(L, 0); - - size_t payload_len = 0; - const char *payload = session_get0_current_payload(sess, &payload_len); - -#if 0 - lua_pushlightuserdata(L, (void *)payload); - lua_pushinteger(L, (lua_Integer)payload_len); - return 2; -err: - lua_settop(L, 0); - lua_pushlightuserdata(L, NULL); - lua_pushboolean(L, 0); - return 2; -#else - char *payload_base64 = (char *)calloc(2, payload_len); - int payload_base64_len = 0; - if (!lua_base64_encode(payload, payload_len, payload_base64, &payload_base64_len)) - { - lua_pushstring(L, payload_base64); - lua_pushinteger(L, (lua_Integer)payload_len); - if (payload_base64) - free(payload_base64); - return 2; - } - if (payload_base64) - free(payload_base64); -err: - lua_settop(L, 0); - lua_pushstring(L, NULL); - lua_pushboolean(L, 0); - return 2; -#endif -}
\ No newline at end of file |
