/*
 * FTP decoder plugin.
 *
 * Subscribes to the TCP stream topic, classifies each session as an FTP
 * control link or an FTP data link, and publishes decoded results as
 * struct ftp_message objects on the FTP control / data topics.
 */
#include "ftp_decoder_inner.h"
#include "ftp_decoder.h"
#include "ftp_decoder_util.h"
/*
 * NOTE(review): the source as received had four angle-bracket includes whose
 * header names were lost. The three below are reconstructed from what this
 * file actually uses (assert, calloc/free, memset). The fourth could not be
 * recovered from the text -- restore it from version control.
 */
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Logger handle shared with ftp_runtime_log(); set in ftpd_logger_init(),
 * cleared in FTP_UNLOAD(). */
void *__ftp_decoder_logger_handle;

/*
 * Free callback registered for both published topics in FTP_ONLOAD().
 * Releases the struct ftp_message allocated by ftp_decoder_push_msg().
 */
static void ftp_msg_free_cb(struct session *sess, void *msg, void *msg_free_arg)
{
    (void)sess;
    (void)msg_free_arg;
    free(msg);
}

/*
 * Per-session context constructor.
 *
 * Tries to identify the session as an FTP control link first, then as an FTP
 * data link. Returns the new context, or NULL after detaching the plugin from
 * the session when the session is neither (or when allocation fails).
 */
static void *ftp_context_ctx_new_cb(struct session *sess, void *plugin_env)
{
    struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)plugin_env;
    size_t payload_len = 0;
    const char *payload = session_get0_current_payload(sess, &payload_len);
    int curdir = ftp_session_get_pkt_dir(sess);
    int thread_idx = session_get_current_thread_id(sess);

    int is_ctrl_link = ftp_ctrl_identify(sess, payload, payload_len, curdir);
    if (is_ctrl_link) {
        struct ftp_context *fctx = (struct ftp_context *)calloc(1, sizeof(struct ftp_context));
        if (NULL == fctx) {
            /* Out of memory: give up on this session instead of dereferencing NULL. */
            ftp_runtime_log(RLOG_LV_DEBUG, "ftp ctrl link alloc failed: %s", session_get0_readable_addr(sess));
            stellar_session_plugin_dettach_current_session(sess);
            return NULL;
        }
        fctx->link_type = FTP_LINK_CTRL;
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_OPEN, 1);
        ftp_runtime_log(RLOG_LV_DEBUG, "ftp ctrl link new: %s", session_get0_readable_addr(sess));
        return fctx;
    }
    ftp_runtime_log(RLOG_LV_DEBUG, "ftp_data_identify: is not ctrl link: '%s' ", session_get0_readable_addr(sess));

    struct ftp_context *data_ctx = ftp_data_identify(sess, fenv);
    if (data_ctx) {
        data_ctx->link_type = FTP_LINK_DATA;
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_OPEN, 1);
        ftp_runtime_log(RLOG_LV_DEBUG, "ftp data link new: '%s' ", session_get0_readable_addr(sess));
        return data_ctx;
    }
    ftp_runtime_log(RLOG_LV_DEBUG, "ftp_data_identify: is not data link: '%s' ", session_get0_readable_addr(sess));

    /* Neither a control nor a data link: stop receiving this session. */
    stellar_session_plugin_dettach_current_session(sess);
    return NULL;
}

/*
 * Per-session context destructor.
 *
 * Bumps the matching link-close counter, releases the parse result and frees
 * the context. A NULL context is ignored.
 */
void ftp_context_ctx_free_cb(struct session *sess, void *session_ctx, void *plugin_env)
{
    struct ftp_context *fctx = (struct ftp_context *)session_ctx;
    if (NULL == fctx) {
        return;
    }

    int thread_idx = session_get_current_thread_id(sess);
    if (fctx->link_type == FTP_LINK_CTRL) {
        ftp_decoder_stat_incrby(thread_idx, (struct ftp_decoder_env *)plugin_env, FTPD_STAT_CTRL_LINK_CLOSE, 1);
    } else {
        ftp_decoder_stat_incrby(thread_idx, (struct ftp_decoder_env *)plugin_env, FTPD_STAT_DATA_LINK_CLOSE, 1);
    }
    ftp_runtime_log(RLOG_LV_DEBUG, "ftp_context_ctx_free_cb: %s", session_get0_readable_addr(sess));
    ftp_parse_result_free(&fctx->parse_result);
    free(fctx);
}

/*
 * Reads log_level / log_path from the [FTP] section of FTP_DECODER_CFG_FILE,
 * creates the runtime log handle and publishes it through
 * __ftp_decoder_logger_handle.
 */
static void ftpd_logger_init(struct ftp_decoder_env *fenv)
{
    const int default_log_level = 30;
    /* Pre-initialised so the values are defined even if a loader leaves them untouched. */
    int log_level = default_log_level;
    char log_path[256] = {0};

    MESA_load_profile_int_def(FTP_DECODER_CFG_FILE, "FTP", "log_level", &log_level, default_log_level);
    MESA_load_profile_string_def(FTP_DECODER_CFG_FILE, "FTP", "log_path", log_path, sizeof(log_path), "./log/ftp_decoder.log");
    MESA_handle_runtime_log_creation("./etc/sapp_log.conf");
    fenv->logger_handle = MESA_create_runtime_log_handle(log_path, log_level);
    __ftp_decoder_logger_handle = fenv->logger_handle;
}

/*
 * Rewinds the iteration cursor of the message's parse result to 0.
 * Only the cursor is reset; the per-entry delivered flags set by
 * ftp_message_iterate() are left as they are. NULL input is ignored.
 */
void ftp_message_reset_iterator(const struct ftp_message *msg)
{
    if (NULL == msg || NULL == msg->fctx) {
        return;
    }
    msg->fctx->parse_result.cursor = 0;
}

/*
 * Handles one payload on a control link: feeds it to the command line reader
 * and command processor, then accounts the bytes per direction.
 * Empty payloads are skipped.
 */
static void ftp_ctrl_entry(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
{
    size_t payload_len = 0;
    const char *payload = session_get0_current_payload(sess, &payload_len);
    int curdir = ftp_session_get_pkt_dir(sess);
    if (NULL == payload || payload_len == 0) {
        return;
    }

    int thread_idx = session_get_current_thread_id(sess);
    ftp_cmd_readline(&fctx->cmd_result, payload, payload_len);
    ftp_cmd_process(sess, fctx, fenv, curdir);

    if (curdir == PACKET_DIRECTION_C2S) {
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_BYTES_C2S, (long long)payload_len);
    } else {
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_CTRL_LINK_BYTES_S2C, (long long)payload_len);
    }
}

/*
 * Handles one payload on a data link: stores the payload as the inventory or
 * file-content result, publishes a message, then accounts the bytes per
 * direction. Empty payloads are skipped.
 */
static void ftp_data_entry(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
{
    enum session_state sstate = session_get_current_state(sess);
    if (SESSION_STATE_OPENING == sstate) {
        /* Clear all delivered flags when the session is opening. */
        memset(fctx->parse_result.push_result_flags, 0, sizeof(fctx->parse_result.push_result_flags));
        ftp_runtime_log(RLOG_LV_DEBUG, "ftp data link opening: %s", session_get0_readable_addr(sess));
    }

    size_t payload_len = 0;
    const char *payload = session_get0_current_payload(sess, &payload_len);
    if (NULL == payload || payload_len == 0) {
        return;
    }

    enum ftp_msg_type dlink_presentation = fctx->data_link_presentation;
    if (FTP_INVENTORY != dlink_presentation && FTP_FILE_CONTENT != dlink_presentation) {
        dlink_presentation = FTP_FILE_CONTENT; // unknown data link type, default as FTP_FILE_CONTENT
    }

    /* The result only borrows the payload pointer; nothing is copied here. */
    fctx->parse_result.result_array[dlink_presentation].iov_base = (void *)payload;
    fctx->parse_result.result_array[dlink_presentation].iov_len = payload_len;
    fctx->parse_result.push_result_flags[dlink_presentation] = 0;
    ftp_decoder_push_msg(sess, fctx, fenv);

    int thread_idx = session_get_current_thread_id(sess);
    int curdir = ftp_session_get_pkt_dir(sess);
    if (curdir == PACKET_DIRECTION_C2S) {
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_BYTES_C2S, (long long)payload_len);
    } else {
        ftp_decoder_stat_incrby(thread_idx, fenv, FTPD_STAT_DATA_LINK_BYTES_S2C, (long long)payload_len);
    }
}

/*
 * TCP stream topic subscriber registered in FTP_ONLOAD().
 * Dispatches to the control or data handler by the context's link type.
 * topic_id and msg are not used.
 */
static void ftp_decoder_entry(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
{
    (void)topic_id;
    (void)msg;

    struct ftp_context *fctx = (struct ftp_context *)per_session_ctx;
    if (NULL == fctx) {
        /* No context for this session (constructor returned NULL): nothing to decode. */
        return;
    }

    if (FTP_LINK_CTRL == fctx->link_type) {
        ftp_ctrl_entry(sess, fctx, (struct ftp_decoder_env *)plugin_env);
    } else {
        ftp_data_entry(sess, fctx, (struct ftp_decoder_env *)plugin_env);
    }
}

/*
 * Yields the next undelivered, non-empty result of a message.
 *
 * On success stores the result's type in *type and its buffer in *value,
 * marks the entry as delivered and returns 1. Returns 0 when nothing is left
 * or when any argument is NULL.
 */
int ftp_message_iterate(const struct ftp_message *msg, enum ftp_msg_type *type, struct iovec *value)
{
    if (NULL == msg || NULL == msg->fctx || NULL == type || NULL == value) {
        return 0;
    }
    if (msg->fctx->parse_result.cursor >= FTP_MSG_MAX) {
        return 0;
    }

    int index = msg->fctx->parse_result.cursor;
    for (int i = index; i < (int)FTP_MSG_MAX; i++) {
        if (msg->fctx->parse_result.push_result_flags[i] == 1) {
            continue; /* already delivered */
        }
        if (msg->fctx->parse_result.result_array[i].iov_base == NULL || msg->fctx->parse_result.result_array[i].iov_len == 0) {
            continue; /* empty slot */
        }
        *type = (enum ftp_msg_type)i;
        value->iov_base = msg->fctx->parse_result.result_array[i].iov_base;
        value->iov_len = msg->fctx->parse_result.result_array[i].iov_len;
        msg->fctx->parse_result.push_result_flags[i] = 1;
        /* NOTE(review): the cursor advances by one from the starting position,
         * not past i; entries in between are skipped on the next call via the
         * delivered flag. Kept as in the original. */
        msg->fctx->parse_result.cursor = index + 1;
        return 1;
    }
    return 0;
}

/*
 * Publishes the context's current parse result as a new ftp_message on the
 * control or data topic, chosen by the context's link type. The iteration
 * cursor is rewound to 0 first.
 *
 * Returns 0 on success, -1 if the message could not be allocated.
 */
int ftp_decoder_push_msg(struct session *sess, struct ftp_context *fctx, struct ftp_decoder_env *fenv)
{
    struct ftp_message *fmsg = (struct ftp_message *)calloc(1, sizeof(struct ftp_message));
    if (NULL == fmsg) {
        return -1;
    }
    fmsg->fctx = fctx;
    fctx->parse_result.cursor = 0;

    if (FTP_LINK_CTRL == fctx->link_type) {
        session_mq_publish_message(sess, fenv->ftp_pub_ctrl_topic_id, fmsg);
    } else {
        session_mq_publish_message(sess, fenv->ftp_pub_data_topic_id, fmsg);
    }
    return 0;
}

/*
 * Plugin load entry point.
 *
 * Allocates the plugin environment, registers the session plugin, subscribes
 * to the TCP stream topic, creates the FTP control/data topics and initialises
 * the hash table, statistics and logger.
 *
 * Returns the plugin environment, or NULL if it could not be allocated.
 */
extern "C" void *FTP_ONLOAD(struct stellar *st)
{
    struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)calloc(1, sizeof(struct ftp_decoder_env));
    if (NULL == fenv) {
        return NULL;
    }

    fenv->thread_count = stellar_get_worker_thread_num(st);
    assert(fenv->thread_count >= 1);

    fenv->plugin_id = stellar_session_plugin_register(st, ftp_context_ctx_new_cb, ftp_context_ctx_free_cb, fenv);
    fenv->tcp_sub_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
    stellar_session_mq_subscribe(st, fenv->tcp_sub_topic_id, ftp_decoder_entry, fenv->plugin_id);
    fenv->ftp_pub_ctrl_topic_id = stellar_session_mq_create_topic(st, FTP_DECODER_CTRL_TOPIC, ftp_msg_free_cb, fenv);
    fenv->ftp_pub_data_topic_id = stellar_session_mq_create_topic(st, FTP_DECODER_DATA_TOPIC, ftp_msg_free_cb, fenv);

    ftp_hash_table_create(fenv);
    ftp_decoder_stat_init(fenv);
    ftpd_logger_init(fenv);
    ftp_runtime_log(RLOG_LV_DEBUG, "FTP_ONLOAD success.");
    return fenv;
}

/*
 * Plugin unload entry point.
 *
 * Destroys the hash table and statistics, clears the shared logger handle,
 * destroys the log handle and frees the environment. NULL is ignored.
 */
extern "C" void FTP_UNLOAD(void *plugin_env)
{
    struct ftp_decoder_env *fenv = (struct ftp_decoder_env *)plugin_env;
    if (NULL == fenv) {
        return;
    }

    ftp_runtime_log(RLOG_LV_DEBUG, "FTP_UNLOAD ...");
    ftp_hash_table_destroy(fenv);
    ftp_decoder_stat_free(fenv);
    /* Clear the shared handle before destroying it so it is never left dangling. */
    __ftp_decoder_logger_handle = NULL;
    MESA_destroy_runtime_log_handle(fenv->logger_handle);
    free(fenv);
}