summaryrefslogtreecommitdiff
path: root/src/ftp_decoder_entry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ftp_decoder_entry.cpp')
-rw-r--r--src/ftp_decoder_entry.cpp227
1 files changed, 227 insertions, 0 deletions
diff --git a/src/ftp_decoder_entry.cpp b/src/ftp_decoder_entry.cpp
new file mode 100644
index 0000000..3dc0df0
--- /dev/null
+++ b/src/ftp_decoder_entry.cpp
@@ -0,0 +1,227 @@
+#include "ftp_decoder_inner.h"
+#include "ftp_decoder.h"
+#include "ftp_decoder_util.h"
+#include <stellar/session.h>
+#include <sys/types.h>
+#include <MESA/MESA_prof_load.h>
+#include <MESA/MESA_handle_logger.h>
+
+void *__ftp_decoder_logger_handle;
+
+static void ftp_msg_free_cb(struct session *sess, void *msg, void *msg_free_arg)
+{
+ free(msg);
+}
+
+static void *ftp_context_ctx_new_cb(struct session *sess, void *plugin_env)
+{
+ struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)plugin_env;
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ int curdir = ftp_session_get_pkt_dir(sess);
+ int thread_idx = session_get_current_thread_id(sess);
+ int is_ctrl_link = ftp_ctrl_identify(sess, payload, payload_len, curdir);
+ if (is_ctrl_link)
+ {
+ struct ftp_context *fctx = (struct ftp_context *)calloc(1, sizeof(struct ftp_context));
+ fctx->link_type = FTP_LINK_CTRL;
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_OPEN, 1);
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp ctrl link new: %s", session_get0_readable_addr(sess));
+ return fctx;
+ }
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp_data_identify: is not ctrl link: '%s' ", session_get0_readable_addr(sess));
+
+ struct ftp_context *data_ctx = ftp_data_identify(sess, fenv);
+ if (data_ctx)
+ {
+ data_ctx->link_type = FTP_LINK_DATA;
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_OPEN, 1);
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp data link new: '%s' ", session_get0_readable_addr(sess));
+ return data_ctx;
+ }
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp_data_identify: is not data link: '%s' ", session_get0_readable_addr(sess));
+ stellar_session_plugin_dettach_current_session(sess);
+ return NULL;
+}
+
+void ftp_context_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct ftp_context *fctx = (struct ftp_context *)session_ctx;
+ if (NULL == fctx)
+ {
+ return;
+ }
+ int thread_idx = session_get_current_thread_id(sess);
+ if (fctx->link_type == FTP_LINK_CTRL)
+ {
+ ftp_decoder_stat_incrby(thread_idx, (struct ftp_decoder_env *)plugin_env, FTPD_STAT_CTRL_LINK_CLOSE, 1);
+ }
+ else
+ {
+ ftp_decoder_stat_incrby(thread_idx, (struct ftp_decoder_env *)plugin_env, FTPD_STAT_DATA_LINK_CLOSE, 1);
+ }
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp_context_ctx_free_cb: %s", session_get0_readable_addr(sess));
+ ftp_parse_result_free(&fctx->parse_result);
+ free(fctx);
+}
+
+static void ftpd_logger_init(struct ftp_decoder_env *fenv)
+{
+ int log_level;
+ char log_path[256];
+ MESA_load_profile_int_def(FTP_DECODER_CFG_FILE, "FTP", "log_level", &log_level, 30);
+ MESA_load_profile_string_def(FTP_DECODER_CFG_FILE, "FTP", "log_path", log_path, sizeof(log_path), "./log/ftp_decoder.log");
+ MESA_handle_runtime_log_creation("./etc/sapp_log.conf");
+ fenv->logger_handle = MESA_create_runtime_log_handle(log_path, log_level);
+ __ftp_decoder_logger_handle = fenv->logger_handle;
+ return;
+}
+
+void ftp_message_reset_iterator(const struct ftp_message *msg)
+{
+ msg->fctx->parse_result.cursor = 0;
+}
+
+static void ftp_ctrl_entry(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
+{
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ int curdir = ftp_session_get_pkt_dir(sess);
+ if (NULL == payload || payload_len == 0)
+ {
+ return;
+ }
+ int thread_idx = session_get_current_thread_id(sess);
+ ftp_cmd_readline(&fctx->cmd_result, payload, payload_len);
+ ftp_cmd_process(sess, fctx, fenv, curdir);
+ if (curdir == PACKET_DIRECTION_C2S)
+ {
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_BYTES_C2S, (long long)payload_len);
+ }
+ else
+ {
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_BYTES_S2C, (long long)payload_len);
+ }
+}
+
+static void ftp_data_entry(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
+{
+ enum session_state sstate = session_get_current_state(sess);
+ if (SESSION_STATE_OPENING == sstate)
+ {
+ memset(fctx->parse_result.push_result_flags, 0, sizeof(fctx->parse_result.push_result_flags));
+ ftp_runtime_log(RLOG_LV_DEBUG, "ftp data link opening: %s", session_get0_readable_addr(sess));
+ }
+
+ size_t payload_len;
+ const char *payload = session_get0_current_payload(sess, &payload_len);
+ if (NULL == payload || payload_len == 0)
+ {
+ return;
+ }
+ enum ftp_msg_type dlink_presentation = fctx->data_link_presentation;
+ if (FTP_INVENTORY != dlink_presentation && FTP_FILE_CONTENT != dlink_presentation)
+ {
+ dlink_presentation = FTP_FILE_CONTENT; // unknown data link type, default as FTP_FILE_CONTENT
+ }
+ fctx->parse_result.result_array[dlink_presentation].iov_base = (void *)payload;
+ fctx->parse_result.result_array[dlink_presentation].iov_len = payload_len;
+ fctx->parse_result.push_result_flags[dlink_presentation] = 0;
+
+ ftp_decoder_push_msg(sess, fctx, fenv);
+ int thread_idx = session_get_current_thread_id(sess);
+ int curdir = ftp_session_get_pkt_dir(sess);
+ if (curdir == PACKET_DIRECTION_C2S)
+ {
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_BYTES_C2S, (long long)payload_len);
+ }
+ else
+ {
+ ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_BYTES_S2C, (long long)payload_len);
+ }
+}
+
+static void ftp_decoder_entry(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct ftp_context *fctx = (struct ftp_context *)per_session_ctx;
+ if (FTP_LINK_CTRL == fctx->link_type)
+ {
+ ftp_ctrl_entry(sess, fctx, (struct ftp_decoder_env *)plugin_env);
+ }
+ else
+ {
+ ftp_data_entry(sess, fctx, (struct ftp_decoder_env *)plugin_env);
+ }
+}
+
+int ftp_message_iterate(const struct ftp_message *msg, enum ftp_msg_type *type, struct iovec *value)
+{
+ if (msg->fctx->parse_result.cursor >= FTP_MSG_MAX)
+ {
+ return 0;
+ }
+ int index = msg->fctx->parse_result.cursor;
+ for (int i = index; i < (int)FTP_MSG_MAX; i++)
+ {
+ if (msg->fctx->parse_result.push_result_flags[i] == 1)
+ {
+ continue;
+ }
+ if (msg->fctx->parse_result.result_array[i].iov_base == NULL || msg->fctx->parse_result.result_array[i].iov_len == 0)
+ {
+ continue;
+ }
+ *type = (enum ftp_msg_type)i;
+ value->iov_base = msg->fctx->parse_result.result_array[i].iov_base;
+ value->iov_len = msg->fctx->parse_result.result_array[i].iov_len;
+ msg->fctx->parse_result.push_result_flags[i] = 1;
+ msg->fctx->parse_result.cursor = index + 1;
+ return 1;
+ }
+ return 0;
+}
+
+int ftp_decoder_push_msg(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
+{
+ struct ftp_message *fmsg = (struct ftp_message *)calloc(1, sizeof(struct ftp_message));
+ fmsg->fctx = fctx;
+ fctx->parse_result.cursor = 0;
+ if (FTP_LINK_CTRL == fctx->link_type)
+ {
+ session_mq_publish_message(sess, fenv->ftp_pub_ctrl_topic_id, fmsg);
+ }
+ else
+ {
+ session_mq_publish_message(sess, fenv->ftp_pub_data_topic_id, fmsg);
+ }
+ return 0;
+}
+
+extern "C" void *FTP_ONLOAD(struct stellar *st)
+{
+ struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)calloc(1, sizeof(struct ftp_decoder_env));
+
+ fenv->thread_count = stellar_get_worker_thread_num(st);
+ assert(fenv->thread_count >= 1);
+ fenv->plugin_id = stellar_session_plugin_register(st, ftp_context_ctx_new_cb, ftp_context_ctx_free_cb, fenv);
+ fenv->tcp_sub_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
+ stellar_session_mq_subscribe(st, fenv->tcp_sub_topic_id, ftp_decoder_entry, fenv->plugin_id);
+ fenv->ftp_pub_ctrl_topic_id = stellar_session_mq_create_topic(st, FTP_DECODER_CTRL_TOPIC, ftp_msg_free_cb, fenv);
+ fenv->ftp_pub_data_topic_id = stellar_session_mq_create_topic(st, FTP_DECODER_DATA_TOPIC, ftp_msg_free_cb, fenv);
+ ftp_hash_table_create(fenv);
+ ftp_decoder_stat_init(fenv);
+ ftpd_logger_init(fenv);
+ ftp_runtime_log(RLOG_LV_DEBUG, "FTP_ONLOAD success.");
+ return fenv;
+}
+
+extern "C" void FTP_UNLOAD(void *plugin_env)
+{
+ struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)plugin_env;
+ ftp_runtime_log(RLOG_LV_DEBUG, "FTP_UNLOAD ...");
+ ftp_hash_table_destroy(fenv);
+ ftp_decoder_stat_free(fenv);
+ __ftp_decoder_logger_handle = NULL;
+ MESA_destroy_runtime_log_handle(fenv->logger_handle);
+ free(fenv);
+} \ No newline at end of file