diff options
Diffstat (limited to 'decoders/rtp/rtp.c')
| -rw-r--r-- | decoders/rtp/rtp.c | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/decoders/rtp/rtp.c b/decoders/rtp/rtp.c new file mode 100644 index 0000000..03bd10e --- /dev/null +++ b/decoders/rtp/rtp.c @@ -0,0 +1,271 @@ +#include "stellar/packet.h" +#include "stellar/utils.h" +#include "stellar/mq.h" +#include "stellar/session.h" +#include "stellar/rtp.h" + +#include "rtp_internal.h" + +static void rtp_message_free(void *msg, void *arg) +{ + UNUSED(arg); + if (msg) { + free(msg); + } +} + +static void rtp_message_dispatch(int topic, void *msg, on_msg_cb_func *msg_cb, void *arg, void *dispatch_arg) +{ + UNUSED(topic); + UNUSED(dispatch_arg); + struct rtp_message *rtp_msg; + rtp_callback_func *rtp_msg_cb; + + rtp_msg = (struct rtp_message *)msg; + rtp_msg_cb = (rtp_callback_func *)(void *)msg_cb; + rtp_msg_cb(rtp_msg->sess, rtp_msg->hdr, rtp_msg->payload, rtp_msg->payload_len, arg); +} + +static int rtp_decode(struct rtp_module_ctx *mod_ctx, struct rtp_exdata *exdata, struct session *sess, const struct packet *pkt) +{ + UNUSED(exdata); + int ret; + struct rtp_message *msg; + + msg = (struct rtp_message *)calloc(1, sizeof(struct rtp_message)); + msg->sess = sess; + msg->hdr = (struct rtp_header *)packet_get_payload_data(pkt); + msg->payload = (char *)msg->hdr + RTP_HEADER_LEN; + msg->payload_len = packet_get_payload_len(pkt) - RTP_HEADER_LEN; + + ret = mq_runtime_publish_message(stellar_module_manager_get_mq_runtime(mod_ctx->mod_mgr), mod_ctx->topic_id, msg); + if (ret != 0) { + rtp_message_free(msg, NULL); + } + + return ret; +} + +static enum rtp_identify_state rtp_session_identify(struct rtp_module_ctx *mod_ctx, struct rtp_exdata *exdata, struct session *sess, const struct packet *pkt) +{ + UNUSED(mod_ctx); + UNUSED(sess); + unsigned short src_port, dst_port; + unsigned short last_seq, curr_seq; + unsigned int last_ssrc, curr_ssrc; + size_t pkt_len; + enum packet_direction pkt_dir; + const struct layer *packet_layer; + struct stun_header *stun_hdr; + struct rtp_header* rtp_hdr; + + if (exdata->identify_state == RTP_IDENTIFY_STATE_FALSE || + exdata->identify_state == RTP_IDENTIFY_STATE_TRUE) { + goto exit; + } + + if (exdata->identify_times++ > RTP_IDENTIFY_TIMES_MAX) { + exdata->sess_ignore = 1; + } + + pkt_len = packet_get_payload_len(pkt); + if (pkt_len < RTP_HEADER_LEN) { + exdata->identify_state = RTP_IDENTIFY_STATE_TRUE; + goto exit; + } + + packet_layer = packet_get_layer_by_idx(pkt, packet_get_layer_count(pkt) - 1); + src_port = packet_layer->hdr.udp->source; + dst_port = packet_layer->hdr.udp->dest; + if (dst_port ==5060 || dst_port==5061 || dst_port==53 || dst_port==443 || + src_port==5060 || src_port==5061 || src_port==53 || src_port==443) { + exdata->identify_state = RTP_IDENTIFY_STATE_FALSE; + goto exit; + } + + stun_hdr = (struct stun_header *)packet_get_payload_data(pkt); + if(((ntohs(stun_hdr->type) == STUN_BINDING_REQUEST) || + (ntohs(stun_hdr->type) == STUN_BINDING_INDECATION) || + (ntohs(stun_hdr->type) == STUN_BINDING_RESPONSE) || + (ntohs(stun_hdr->type) == STUN_BINDING_ERROR_RESPONSE)) && + pkt_len >= STUN_HEADER_LEN && (ntohs(stun_hdr->len) == pkt_len - STUN_HEADER_LEN)) { + goto exit; + } + + rtp_hdr = (struct rtp_header *)packet_get_payload_data(pkt); + if(rtp_hdr->version != 2 || rtp_hdr->padding != 0 || rtp_hdr->extension != 0 || rtp_hdr->csrc_len != 0) { + exdata->identify_state = RTP_IDENTIFY_STATE_FALSE; + goto exit; + } + + curr_seq = (unsigned short)ntohs(rtp_hdr->seq); + curr_ssrc = (unsigned int)ntohl(rtp_hdr->ssrc); + + pkt_dir = packet_get_direction(pkt); + switch (exdata->identify_state) { + case RTP_IDENTIFY_STATE_UNKNOWN: + exdata->first_pkt_dir = pkt_dir; + exdata->identify_state = RTP_IDENTIFY_STATE_HALF_TRUE; + break; + case RTP_IDENTIFY_STATE_HALF_TRUE: + if(exdata->first_pkt_dir == pkt_dir) { + last_seq = exdata->last_client_seq; + last_ssrc = exdata->last_client_ssrc; + } else { + last_seq = exdata->last_server_seq; + last_ssrc = exdata->last_server_ssrc; + } + + if (last_ssrc == 0 || last_seq == 0) { + break; + } + + if (curr_ssrc != last_ssrc) { + exdata->identify_state = RTP_IDENTIFY_STATE_FALSE; + break; + } + + if (curr_seq < last_seq) { + exdata->identify_state = RTP_IDENTIFY_STATE_FALSE; + break; + } + + if (curr_seq - last_seq == 1) { + exdata->identify_state = RTP_IDENTIFY_STATE_TRUE; + break; + } + break; + default: + break; + } + + if (exdata->first_pkt_dir == pkt_dir) { + exdata->last_client_seq = curr_seq; + exdata->last_client_ssrc = curr_ssrc; + } else { + exdata->last_server_seq = curr_seq; + exdata->last_server_ssrc = curr_ssrc; + } + +exit: + return exdata->identify_state; +} + +static void rtp_session_entry(struct session *sess, enum session_state state, struct packet *pkt, void *arg) +{ + UNUSED(state); + int ret; + enum rtp_identify_state identify_state; + struct rtp_exdata *exdata; + struct rtp_module_ctx *mod_ctx; + + if (pkt == NULL) { + return; + } + + mod_ctx = (struct rtp_module_ctx *)arg; + exdata = (struct rtp_exdata *)session_get_exdata(sess, mod_ctx->exdata_id); + if (exdata == NULL) { + exdata= (struct rtp_exdata *)calloc(1, sizeof(struct rtp_exdata)); + session_set_exdata(sess, mod_ctx->exdata_id, exdata); + } + + if (exdata->sess_ignore) { + return; + } + + identify_state = rtp_session_identify(mod_ctx, exdata, sess, (const struct packet *)pkt); + if (identify_state != RTP_IDENTIFY_STATE_TRUE) { + return; + } + + ret = rtp_decode(mod_ctx, exdata, sess, (const struct packet *)pkt); + if (ret != 0) { + exdata->sess_ignore = 1; + return; + } +} + +static void rtp_exdata_free(int idx, void *ex_ptr, void *arg) +{ + UNUSED(idx); + UNUSED(arg); + if (ex_ptr) { + free(ex_ptr); + } +} + +int rtp_subscribe(struct stellar_module_manager *mod_mgr, rtp_callback_func *cb, void *arg) +{ + int topic; + struct mq_schema *schema; + + if (mod_mgr == NULL) { + return -1; + } + + schema = stellar_module_manager_get_mq_schema(mod_mgr); + if (schema == NULL) { + return -1; + } + + topic = mq_schema_get_topic_id(schema, RTP_TOPIC_NAME); + if (topic < 0) { + topic = mq_schema_create_topic(schema, RTP_TOPIC_NAME, rtp_message_dispatch, NULL, rtp_message_free, NULL); + } + + return mq_schema_subscribe(schema, topic, (on_msg_cb_func *)(void *)cb, arg); +} + +void rtp_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod) +{ + struct rtp_module_ctx *mod_ctx; + + if (mod_mgr && mod) { + mod_ctx = (struct rtp_module_ctx *)stellar_module_get_ctx(mod); + if (mod_ctx) { + free(mod_ctx); + } + + stellar_module_free(mod); + } +} + +struct stellar_module* rtp_init(struct stellar_module_manager *mod_mgr) +{ + struct mq_schema *schema; + struct session_manager *sess_mgr; + struct stellar_module *mod; + struct rtp_module_ctx *mod_ctx; + + mod_ctx = (struct rtp_module_ctx *)calloc(1, sizeof(struct rtp_module_ctx)); + mod = stellar_module_new(RTP_MODULE_NAME, mod_ctx); + sess_mgr = stellar_module_get_session_manager(mod_mgr); + schema = stellar_module_manager_get_mq_schema(mod_mgr); + + if (mod_mgr == NULL || sess_mgr == NULL || schema == NULL) { + goto exit; + } + + mod_ctx->exdata_id = session_manager_new_session_exdata_index(sess_mgr, RTP_EXDATA_NAME, rtp_exdata_free, NULL); + if (mod_ctx->exdata_id < 0) { + goto exit; + } + + mod_ctx->topic_id = mq_schema_get_topic_id(schema, RTP_TOPIC_NAME); + if (mod_ctx->topic_id < 0) { + mod_ctx->topic_id = mq_schema_create_topic(schema, RTP_TOPIC_NAME, rtp_message_dispatch, mod_ctx, rtp_message_free, NULL); + if (mod_ctx->topic_id < 0) { + goto exit; + } + } + + session_manager_subscribe_udp(sess_mgr, rtp_session_entry, mod_ctx); + + return mod; +exit: + rtp_exit(mod_mgr, mod); + return NULL; +} + + |
