diff options
Diffstat (limited to 'decoders/sip/sip.c')
| -rw-r--r-- | decoders/sip/sip.c | 1493 |
1 files changed, 1493 insertions, 0 deletions
diff --git a/decoders/sip/sip.c b/decoders/sip/sip.c new file mode 100644 index 0000000..830ac7c --- /dev/null +++ b/decoders/sip/sip.c @@ -0,0 +1,1493 @@ +#include <stdio.h> +#include <limits.h> +#include <ctype.h> +#include <assert.h> +#include <sys/time.h> + +#include "stellar/packet.h" +#include "stellar/utils.h" +#include "stellar/mq.h" +#include "stellar/session.h" +#include "stellar/sip.h" + +#include "sip_internal.h" + +const char* g_sip_version[] = { + "unknown", + "sip/1.0", + "sip/1.1", + "sip/2.0" +}; + + +const char* g_sip_method[] = { + "unknown", + "invite", + "ack", + "options", + "register", + "bye", + "cancel", + "do", + "info", + "message", + "notify", + "prack", + "qauth", + "refer", + "sprack", + "subscribe", + "update", + "publish" +}; + +static int strntoi(char *buff, size_t len) { + if (buff == NULL || len == 0) { + return 0; + } + + int result = 0; + int sign = 1; + size_t i = 0; + + while (i < len && isspace(buff[i])) { + i++; + } + + if (i < len && buff[i] == '-') { + sign = -1; + i++; + } else if (i < len && buff[i] == '+') { + i++; + } + + for (; i < len; i++) { + if (!isdigit(buff[i])) { + break; + } + + int digit = buff[i] - '0'; + + if (result > (INT_MAX - digit) / 10) { + return (sign == 1) ? INT_MAX : INT_MIN; + } + + result = result * 10 + digit; + } + + return result * sign; +} + +static enum sip_version sip_strn2version(const char *data, size_t len) +{ + int i; + for(i = 1; i < SIP_VERSION_NUM; i++) { + if(0 == SAFE_STRNCASECMP(data, len, g_sip_version[i], strlen(g_sip_version[i]))) { + break; + } + } + if (i == SIP_VERSION_NUM) { + return SIP_VERSION_UNKNOWN; + } + return (enum sip_version)i; +} + +enum sip_method sip_strn2method(const char *data, size_t len) +{ + int i; + if (data == NULL || len == 0) { + return SIP_METHOD_UNKNOWN; + } + for(i = 1; i < SIP_METHOD_NUM; i++) { + if(0 == SAFE_STRNCASECMP(data, len, g_sip_method[i], strlen(g_sip_method[i]))) { + break; + } + } + if (i == SIP_METHOD_NUM) { + return SIP_METHOD_UNKNOWN; + } + return (enum sip_method)i; +} + +static char* strntrim_sp(char *str, size_t *len) { + char *end; + + while (*str == ' ' && *len > 0) { + str++; + (*len)--; + } + + if (*len == 0) { + return str; + } + + end = str + *len - 1; + while (end > str && *end == ' ') { + end--; + (*len)--; + } + + return str; +} + +static int strnsplit(struct iovec *out, size_t n_out, char delimiter, const char *data, size_t data_len) +{ + size_t i; + size_t offset; + size_t part_len; + const char *data_end; + const char *part_start; + const char *part_end; + + if (out == NULL || n_out == 0 || data == NULL || data_len == 0) { + return -1; + } + + offset = 0; + data_end = data + data_len; + + for (i = 0; i < n_out - 1; i++) { + part_start = data + offset; + part_end = memchr(part_start, delimiter, data_end - part_start); + + if (part_end != NULL) { + // Found a delimiter + part_len = part_end - part_start; + out[i].iov_base = strntrim_sp((char*)part_start, &part_len); + out[i].iov_len = part_len; + offset += part_len + 1; + } else { + // Last part, no delimiter found + part_len = data_end - part_start; + out[i].iov_base = strntrim_sp((char*)part_start, &part_len); + out[i].iov_len = part_len; + offset += part_len; + return i + 1; + } + + if (offset >= data_len) { + return i + 1; + } + } + + // last part + part_start = data + offset; + part_len = data_end - part_start; + out[i].iov_base = strntrim_sp((char*)part_start, &part_len); + out[i].iov_len = part_len; + + return n_out; +} + +static inline struct sip_header_field_pool* field_pool_new(size_t pool_size) +{ + struct sip_header_field_pool *pool; + + pool = calloc(1, sizeof(struct sip_header_field_pool)); + pool->arr = calloc(pool_size, sizeof(struct sip_header_field)); + pool->size = pool_size; + pool->used = 0; + + return pool; +} + +static inline void field_pool_free(struct sip_header_field_pool *pool) +{ + if (pool) { + if (pool->arr) { + free(pool->arr); + } + free(pool); + } +} + +static inline void field_pool_reset(struct sip_header_field_pool *pool) +{ + if (pool && pool->arr && pool->used != 0) { + pool->used = 0; + } +} + +static inline struct sip_header_field *field_pool_series_get(struct sip_header_field_pool *pool) +{ + struct sip_header_field *header_field; + if (pool && pool ->arr && pool->used < pool->size) { + header_field = &pool->arr[pool->used++]; + memset(header_field, 0, sizeof(struct sip_header_field)); + return header_field; + } + return NULL; +} +static inline void stream_buffer_deinit(struct stream_buffer *sb) +{ + sb->buf_size = 0; + sb->buf_off = 0; + if (sb->buf) { + free(sb->buf); + } +} + +static inline void stream_buffer_init(struct stream_buffer *sb, size_t size) +{ + sb->buf_size = size; + sb->buf_off = 0; + sb->buf = NULL; +} + +static inline int stream_buffer_is_full(struct stream_buffer *sb) +{ + if (sb->buf && sb->buf_off == sb->buf_size) { + return 1; + } + return 0; +} + +static inline int stream_buffer_is_empty(struct stream_buffer *sb) +{ + if (sb->buf == NULL || sb->buf_off == 0) { + return 1; + } + return 0; +} + +static inline void stream_buffer_reset(struct stream_buffer *sb) +{ + if (sb->buf && sb->buf_off > 0) { + sb->buf_off = 0; + } +} + +static inline void stream_buffer_append(struct stream_buffer *sb, const char *data, size_t data_len) +{ + if (sb->buf == NULL && sb->buf_size > 0) { + sb->buf = (char *)malloc(sb->buf_size); + sb->buf_off = 0; + } + if (sb->buf_off + data_len > sb->buf_size) { + data_len = sb->buf_size - sb->buf_off; + } + memcpy(sb->buf + sb->buf_off, data, data_len); +} + +static inline void stream_buffer_slide(struct stream_buffer *sb, size_t offset) +{ + if (sb->buf) { + if (sb->buf_off > offset) { + memmove(sb->buf, sb->buf + offset, sb->buf_off - offset); + sb->buf_off = sb->buf_off - offset; + } else { + stream_buffer_reset(sb); + } + } +} + +static inline void stream_buffer_get_data(struct stream_buffer *sb, const char **data, size_t *data_len, size_t offset) +{ + *data = NULL; + *data_len = 0; + if (sb->buf && sb->buf_off > offset) { + *data = sb->buf + offset; + *data_len = sb->buf_off - offset; + } +} + +static void sip_message_free(struct sip_message *msg) +{ + if (msg) { + free(msg); + } +} + +static struct sip_message* sip_message_new(void) +{ + struct sip_message *msg; + msg = (struct sip_message *)calloc(1, sizeof(struct sip_message)); + return msg; +} + +void sip_message_print(struct sip_message *msg) +{ + if (msg == NULL) { + printf("NULL SIP message\n"); + return; + } + + printf("SIP Message Type: %d\n", msg->type); + printf("Call State: %d\n", msg->call_state); + printf("Transaction Sequence: %d\n", msg->transaction_seq); + + if (msg->type == SIP_MESSAGE_TYPE_REQUEST) { + printf("Request Line:\n"); + printf(" Method: %d\n", msg->request_line.method); + printf(" URI: %.*s\n", (int)msg->request_line.uri_len, msg->request_line.uri); + printf(" Version: %.*s\n", (int)msg->request_line.version_len, msg->request_line.version); + } else if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) { + printf("Status Line:\n"); + printf(" Version: %.*s\n", (int)msg->status_line.version_len, msg->status_line.version); + printf(" Code: %.*s\n", (int)msg->status_line.code_len, msg->status_line.code); + printf(" Reason: %.*s\n", (int)msg->status_line.reason_len, msg->status_line.reason); + } + + printf("Headers:\n"); + for (size_t i = 0; i < msg->header.n_header_fields; i++) { + struct sip_header_field *field = &msg->header.header_fields[i]; + printf(" %.*s: %.*s\n", (int)field->field_name_len, field->field_name, + (int)field->field_value_len, field->field_value); + } + + if (msg->body.body_len > 0) { + printf("Body:\n"); + printf(" Media IP: %.*s\n", (int)msg->body.media_ip_len, msg->body.media_ip); + printf(" Media Audio Port: %u\n", msg->body.media_audio_port); + printf(" Media Video Port: %u\n", msg->body.media_video_port); + printf(" Body: %.*s\n", (int)msg->body.body_len, msg->body.body); + } else { + printf("Body: Empty\n"); + } +} + +int sip_message_publish(struct sip_decoder *decoder, struct sip_message *msg, void *arg) +{ + UNUSED(arg); + int topic_id; + struct mq_runtime *runtime; + + runtime = module_manager_get_mq_runtime(decoder->mod_mgr); + if (runtime == NULL) { + return -1; + } + + switch (msg->type) { + case SIP_MESSAGE_TYPE_REQUEST: + topic_id = decoder->request_message_topic_id; + break; + case SIP_MESSAGE_TYPE_RESPONSE: + topic_id = decoder->response_message_topic_id; + break; + default: + return -1; + } + + //sip_message_print(msg); + + return mq_runtime_publish_message(runtime, topic_id, msg); +} + +static void sip_message_dispatch(int topic, void *msg, on_msg_cb_func *msg_cb, void *arg, void *dispatch_arg) +{ + struct sip_message *sip_msg; + struct sip_decoder *decoder; + + sip_msg = (struct sip_message *)msg; + decoder = (struct sip_decoder *)dispatch_arg; + + if (topic == decoder->request_message_topic_id) { + sip_request_message_callback_func *req_cb = (sip_request_message_callback_func *)(void *)msg_cb; + req_cb(sip_msg->sess_ref, &sip_msg->request_line, &sip_msg->header, &sip_msg->body, arg); + return; + } + + if (topic == decoder->response_message_topic_id) { + sip_response_message_callback_func *res_cb = (sip_response_message_callback_func *)(void *)msg_cb; + res_cb(sip_msg->sess_ref, &sip_msg->status_line, &sip_msg->header, &sip_msg->body, arg); + return; + } +} + +static int sip_message_subscribe(struct module_manager *mod_mgr, const char *topic_name, on_msg_cb_func *cb, void *arg) +{ + int topic; + struct mq_schema *schema; + + if (mod_mgr == NULL) { + return -1; + } + + schema = module_manager_get_mq_schema(mod_mgr); + if (schema == NULL) { + return -1; + } + + topic = mq_schema_get_topic_id(schema, topic_name); + if (topic < 0) { + return -1; + } + + return mq_schema_subscribe(schema, topic, (on_msg_cb_func *)(void *)cb, arg); +} + +static int sip_transaction_call_state_check(struct sip_transaction *transaction, enum sip_call_state call_state) +{ + return transaction->call_state == call_state; +} + +static void sip_transaction_call_state_set(struct sip_transaction *transaction, enum sip_call_state call_state) +{ + transaction->call_state = call_state; +} + +static void sip_transaction_call_state_update(struct sip_transaction *transaction, struct sip_message *msg) +{ + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_OPENING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_WAITING); + } + + if (msg->type == SIP_MESSAGE_TYPE_REQUEST) { + enum sip_method method = msg->request_line.method; + switch (method) { + case SIP_METHOD_INVITE: + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CALLING); + } + break; + case SIP_METHOD_BYE: + if (transaction->dir != SIP_DIR_DOUBLE) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED); + } else { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTING); + } + break; + case SIP_METHOD_ACK: + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CONNECTING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CONFIRMED); + } + break; + default: + break; + } + } + + if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) { + int stauts_code = strntoi((char*)msg->status_line.code, msg->status_line.code_len); + enum sip_method cseq_method = msg->header.cseq_method; + + // rfc3261#section-7.2 + switch (stauts_code/100) { + // status code 1xx + case 1: + if (cseq_method == SIP_METHOD_INVITE) { + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CALLING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_EARLY); + } + if (transaction->dir != SIP_DIR_DOUBLE && + sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) { + + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_EARLY); + } + } + break; + case 2: + if (stauts_code == 200) { + if (cseq_method == SIP_METHOD_INVITE) { + + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_EARLY) || + sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CALLING)) { + + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CONNECTING); + } + } + if (cseq_method == SIP_METHOD_BYE) { + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_DISCONNECTING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED); + } + + if (transaction->dir != SIP_DIR_DOUBLE && + !sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) { + + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED); + } + } + } + break; + case 5: + if (!sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) { + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED); + } + break; + case 3: + case 4: + break; + default: + break; + } + } +} + +static void sip_transaction_dir_update(struct sip_transaction *transaction, struct sip_message *msg) +{ + if (transaction->dir != SIP_DIR_DOUBLE) { + switch (transaction->dir) { + case SIP_DIR_UNKNOWN: + if (msg->type == SIP_MESSAGE_TYPE_REQUEST) { + transaction->dir = SIP_DIR_REQUEST; + } + if (msg->type == SIP_MESSAGE_TYPE_RESPONSE){ + transaction->dir = SIP_DIR_RESPONSE; + } + break; + case SIP_DIR_REQUEST: + if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) { + transaction->dir = SIP_DIR_DOUBLE; + } + break; + case SIP_DIR_RESPONSE: + if (msg->type == SIP_MESSAGE_TYPE_REQUEST) { + transaction->dir = SIP_DIR_DOUBLE; + } + break; + case SIP_DIR_DOUBLE: + break; + default: + break; + } + } +} + +static void sip_transaction_free(struct sip_transaction *transaction) +{ + if (transaction) { + if (transaction->key) { + free(transaction->key); + } + free(transaction); + } +} + +static struct sip_transaction* sip_transaction_new(void) +{ + return (struct sip_transaction *)calloc(1, sizeof(struct sip_transaction)); +} + +static void sip_transaction_close(struct sip_decoder *decoder, struct sip_exdata *exdata, struct sip_transaction *transaction) +{ + UNUSED(decoder); + UNUSED(exdata); +// int ret; +// struct sip_message *msg; + + sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CLOSING); + +// msg = sip_message_new(); +// msg->sess_ref = exdata->sess_ref; +// msg->transaction_seq = transaction->seq; +// msg->call_state = SIP_CALL_STATE_CLOSING; +// msg->type = SIP_MESSAGE_TYPE_REQUEST; +// ret = sip_message_publish(decoder, msg, NULL); +// if (ret < 0) { +// sip_message_free(msg); +// } +// +// msg = sip_message_new(); +// msg->sess_ref = exdata->sess_ref; +// msg->transaction_seq = transaction->seq; +// msg->call_state = SIP_CALL_STATE_CLOSING; +// msg->type = SIP_MESSAGE_TYPE_RESPONSE; +// ret = sip_message_publish(decoder, msg, NULL); +// if (ret < 0) { +// sip_message_free(msg); +// } + + sip_transaction_free(transaction); +} + +static struct sip_transaction *sip_transaction_open(struct sip_decoder *decoder, struct sip_exdata *exdata, const char *call_id, size_t call_id_len) +{ + UNUSED(decoder); + UNUSED(exdata); + // int ret; +// struct sip_message *msg; + struct sip_transaction *transaction; + + transaction = sip_transaction_new(); + transaction->seq = exdata->transaction_seq++; + transaction->call_state = SIP_CALL_STATE_OPENING; + transaction->key = malloc(call_id_len); + transaction->key_len = call_id_len; + memcpy(transaction->key, call_id, call_id_len); + +// msg = sip_message_new(); +// msg->sess_ref = exdata->sess_ref; +// msg->transaction_seq = transaction->seq; +// msg->call_state = SIP_CALL_STATE_OPENING; +// msg->type = SIP_MESSAGE_TYPE_REQUEST; +// ret = sip_message_publish(decoder, msg, NULL); +// if (ret < 0) { +// sip_message_free(msg); +// } +// msg = sip_message_new(); +// msg->sess_ref = exdata->sess_ref; +// msg->transaction_seq = transaction->seq; +// msg->call_state = SIP_CALL_STATE_OPENING; +// msg->type = SIP_MESSAGE_TYPE_RESPONSE; +// ret = sip_message_publish(decoder, msg, NULL); +// if (ret < 0) { +// sip_message_free(msg); +// } + + return transaction; +} + +static void sip_transaction_prune(struct sip_decoder *decoder, struct sip_exdata *exdata, struct timeval ts) +{ + struct sip_transaction *transaction = NULL, *tmp = NULL, *oldest; + + if (exdata->transaction_table == NULL || decoder->transaction_timeout_ms == 0) { + return; + } + + if (decoder->transaction_limit_count > 0 && + decoder->transaction_limit_count <= (int)HASH_CNT(hh, exdata->transaction_table)) { + oldest = exdata->transaction_table; + HASH_DELETE(hh, exdata->transaction_table, oldest); + sip_transaction_close(decoder, exdata, oldest); + } + + HASH_ITER(hh, exdata->transaction_table, transaction, tmp) { + if (timeval_delta_ms(transaction->last_update, ts) < decoder->transaction_timeout_ms) { + break; + } + + HASH_DELETE(hh, exdata->transaction_table, transaction); + sip_transaction_close(decoder, exdata, transaction); + } +} + +static void sip_transaction_process(struct sip_decoder *decoder, struct sip_exdata *exdata, struct sip_message *msgs[], size_t n_msgs) +{ + int ret; + size_t i; + struct timeval ts; + struct sip_message *msg; + struct sip_transaction *transaction; + struct sip_header_field *call_id; + + gettimeofday(&ts, NULL); + + for (i = 0 ; i < n_msgs; i++) { + msg = msgs[i]; + call_id = msg->header.call_id; + + if (call_id == NULL || call_id->field_value == NULL || call_id->field_value_len == 0) { + continue; + } + + // get transaction + HASH_FIND(hh, exdata->transaction_table, call_id->field_value, call_id->field_value_len, transaction); + if (transaction == NULL) { + transaction = sip_transaction_open(decoder, exdata, call_id->field_value, call_id->field_value_len); + } else { + HASH_DELETE(hh, exdata->transaction_table, transaction); + } + HASH_ADD_KEYPTR(hh, exdata->transaction_table, transaction->key, transaction->key_len, transaction); + + // update transaction + transaction->last_update = ts; + sip_transaction_dir_update(transaction, msg); + sip_transaction_call_state_update(transaction, msg); + + // publish message + msg->transaction_seq = transaction->seq; + msg->call_state = transaction->call_state; + ret = sip_message_publish(decoder, msg, NULL); + if (ret < 0) { + sip_message_free(msg); + } + + // transaction that call state is disconnected, should be closed immediately + if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_DISCONNECTED)) { + HASH_DELETE(hh, exdata->transaction_table, transaction); + sip_transaction_close(decoder, exdata, transaction); + } + + // timeout transaction + sip_transaction_prune(decoder, exdata, ts); + } +} + +int sip_decode_sdp_media(struct sip_message *msg, const char *line_start, size_t line_len) +{ + int ret; + struct iovec parts[SDP_MEDIA_PART_NUM]; + const char *media; + size_t media_len; + const char *port; + size_t port_len; + + if (line_start == NULL || line_len <= 2) { + return -1; + } + + // skip "m=" + line_start += strlen("m="); + line_len -= strlen("m="); + + ret = strnsplit(parts, SIP_DIM(parts), ' ', line_start, line_len); + if (ret != SIP_DIM(parts)) { + return -1; + } + + media = (const char *)parts[SDP_MEDIA_PART_MEDIA].iov_base; + media_len = parts[SDP_MEDIA_PART_MEDIA].iov_len; + port = (const char *)parts[SDP_MEDIA_PART_PORT].iov_base; + port_len = parts[SDP_MEDIA_PART_PORT].iov_len; + + if (0 == SAFE_STRNCASECMP(media, media_len, "audio", strlen("audio"))) { + msg->body.media_audio_port = strntoi((char*)port, port_len); + return 0; + } + if (0 == SAFE_STRNCASECMP(media, media_len, "video", strlen("video"))) { + msg->body.media_video_port = strntoi((char*)port, port_len); + return 0; + } + + return 0; +} + +int sip_decode_sdp_connection(struct sip_message *msg, const char *line_start, size_t line_len) +{ + int ret; + struct iovec parts[SDP_CONNECTION_PART_NUM]; + const char *addr; + size_t addr_len; + const char *nettype; + size_t nettype_len; + + if (line_start == NULL || line_len <= 2) { + return -1; + } + + // skip "c=" + line_start += strlen("c="); + line_len -= strlen("c="); + + ret = strnsplit(parts, SIP_DIM(parts), ' ', line_start, line_len); + if (ret != SIP_DIM(parts)) { + return -1; + } + + addr = (const char *)parts[SDP_CONNECTION_PART_ADDRESS].iov_base; + addr_len = parts[SDP_CONNECTION_PART_ADDRESS].iov_len; + nettype = (const char *)parts[SDP_CONNECTION_PART_NETTYPE].iov_base; + nettype_len = parts[SDP_CONNECTION_PART_NETTYPE].iov_len; + + if (0 == SAFE_STRNCASECMP(nettype, nettype_len, "IN", strlen("IN"))) { + msg->body.media_ip = addr; + msg->body.media_ip_len = addr_len; + return 0; + } + + return 0; +} + +static int sip_decode_body(struct sip_message *msg, const char *body, size_t body_len) +{ + int ret; + size_t remain, offset; + struct sip_header_field *content_type; + + if (body == NULL || body_len == 0) { + return -1; + } + + msg->body.body = body; + msg->body.body_len = body_len; + + content_type = msg->header.content_type; + if (content_type == NULL) { + return 0; + } + + // parse sdp media + if (0 != SAFE_STRNCASECMP(content_type->field_value, content_type->field_value_len, "application/sdp", strlen("application/sdp"))) { + return 0; + } + + msg->body.sdp_content = body; + msg->body.sdp_content_len = body_len; + + offset = 0; + remain = body_len; + + // body lines + while (remain > 0) { + const char *line_start, *line_end; + size_t line_len; + + // line + line_start = body + offset; + line_end = (const char *)memmem(line_start, remain, SIP_LINE_END, strlen(SIP_LINE_END)); + if (line_end == NULL || line_end == line_start) { + break; + } + + line_len = line_end - line_start + strlen(SIP_LINE_END); + switch (line_start[0]) { + case 'c': + ret = sip_decode_sdp_connection(msg, line_start, line_len - strlen(SIP_LINE_END)); + if (ret != 0) { + return -1; + } + break; + case 'm': + ret = sip_decode_sdp_media(msg, line_start, line_len - strlen(SIP_LINE_END)); + if (ret != 0) { + return -1; + } + break; + default: + break; + } + + offset += line_len; + remain -= line_len; + } + return 0; +} + +static int sip_decode_header_field(struct sip_message *msg, const char *line_start, size_t line_len) +{ + int ret; + struct iovec field_parts[SIP_HEADER_FIELD_PART_NUM]; + struct sip_header_field *header_field; + struct sip_header *header = &msg->header; + + if (line_start == NULL || line_len == 0) { + return -1; + } + + ret = strnsplit(field_parts, SIP_DIM(field_parts), ':', line_start, line_len); + if (ret != SIP_DIM(field_parts)) { + return -1; + } + + header_field = field_pool_series_get(msg->header_field_pool); + if (header_field == NULL) { + return -1; + } + + if (header->header_fields == NULL) { + header->header_fields = header_field; + } + header->n_header_fields++; + header_field->field_name = (const char*)field_parts[SIP_HEADER_FIELD_PART_NAME].iov_base; + header_field->field_name_len = field_parts[SIP_HEADER_FIELD_PART_NAME].iov_len; + header_field->field_value = (const char*)field_parts[SIP_HEADER_FIELD_PART_VALUE].iov_base; + header_field->field_value_len = field_parts[SIP_HEADER_FIELD_PART_VALUE].iov_len; + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Call-ID", strlen("Call-ID"))) { + msg->header.call_id = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "From", strlen("From"))) { + msg->header.from = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "To", strlen("To"))) { + msg->header.to = header_field; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "CSeq", strlen("CSeq"))) { + msg->header.cseq = header_field; + + struct iovec cseq_parts[SIP_CSEQ_PART_NUM]; + ret = strnsplit(cseq_parts, SIP_DIM(cseq_parts), ' ', header_field->field_value, header_field->field_value_len); + if (ret != SIP_DIM(cseq_parts)) { + return 0; + } + msg->header.cseq_method = sip_strn2method((const char *)cseq_parts[SIP_CSEQ_PART_METHOD].iov_base, + cseq_parts[SIP_CSEQ_PART_METHOD].iov_len); + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Via", strlen("Via"))) { + msg->header.via = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Server", strlen("Server"))) { + msg->header.server = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Reason", strlen("Reason"))) { + msg->header.reason = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "User-Agent", strlen("User-Agent"))) { + msg->header.user_agent = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Content-Type", strlen("Content-Type"))) { + msg->header.content_type = header_field; + return 0; + } + + if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Content-Length", strlen("Content-Length"))) { + msg->header.content_length = header_field; + return 0; + } + + return 0; +} + +int sip_decode_header(struct sip_message *msg, const char *header_start, size_t header_len) +{ + int ret = 0; + size_t remain, offset; + + offset = 0; + remain = header_len; + + // header lines + while (remain > 0) { + const char *line_start, *line_end; + size_t line_len; + + // line + line_start = header_start + offset; + line_end = (const char *)memmem(line_start, remain, SIP_LINE_END, strlen(SIP_LINE_END)); + if (line_end == NULL || line_end == line_start) { + break; + } + + line_len = line_end - line_start + strlen(SIP_LINE_END); + ret = sip_decode_header_field(msg, line_start, line_len - strlen(SIP_LINE_END)); + if (ret != 0) { + break; + } + + offset += line_len; + remain -= line_len; + } + + return ret; +} + +int sip_decode_start_line(struct sip_message *msg, const char *payload, size_t payload_len) +{ + int ret; + enum sip_method method; + enum sip_version version; + struct iovec parts[SIP_START_LINE_PART_NUM]; + + ret = strnsplit(parts, SIP_DIM(parts), ' ', payload, payload_len); + if (ret != SIP_DIM(parts)) { + return -1; + } + + // try to decode as request line + method = sip_strn2method((const char *)parts[SIP_REQUEST_LINE_PART_METHOD].iov_base, parts[SIP_REQUEST_LINE_PART_METHOD].iov_len); + if (method != SIP_METHOD_UNKNOWN) { + msg->request_line.method = method; + msg->request_line.uri = (const char *)parts[SIP_REQUEST_LINE_PART_URI].iov_base; + msg->request_line.uri_len = parts[SIP_REQUEST_LINE_PART_URI].iov_len; + msg->request_line.version = (const char *)parts[SIP_REQUEST_LINE_PART_VERSION].iov_base; + msg->request_line.version_len = parts[SIP_REQUEST_LINE_PART_VERSION].iov_len; + msg->type = SIP_MESSAGE_TYPE_REQUEST; + return 0; + } + + // try to decode as status line + version = sip_strn2version((const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base, parts[SIP_STATUS_LINE_PART_VERSION].iov_len); + if (version != SIP_VERSION_UNKNOWN) { + msg->status_line.version = (const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base; + msg->status_line.version_len = parts[SIP_STATUS_LINE_PART_VERSION].iov_len; + msg->status_line.code = (const char *)parts[SIP_STATUS_LINE_PART_CODE].iov_base; + msg->status_line.code_len = parts[SIP_STATUS_LINE_PART_CODE].iov_len; + msg->status_line.reason = (const char *)parts[SIP_STATUS_LINE_PART_REASON].iov_base; + msg->status_line.reason_len = parts[SIP_STATUS_LINE_PART_REASON].iov_len; + msg->type = SIP_MESSAGE_TYPE_RESPONSE; + return 0; + } + + return -1; +} + +int sip_decode(struct sip_message *msg, const char *payload, size_t payload_len) +{ + int ret = 0; + int content_length; + size_t offset; + size_t line_len = 0; + size_t header_len = 0; + size_t body_len = 0; + const char *line_start = NULL; + const char *header_start = NULL; + const char *body_start = NULL; + const char *line_end; + const char *header_end; + + offset = 0; + + // check start-line complete + line_start = payload + offset; + line_end = (const char *)memmem(line_start, payload_len - offset, SIP_LINE_END, strlen(SIP_LINE_END)); + if (line_end == NULL) { + goto exit; + } + + line_len = line_end - line_start + strlen(SIP_LINE_END); + offset += line_len; + + if (offset > payload_len) { + goto exit; + } + + // decode start line + ret = sip_decode_start_line(msg, line_start, line_len); + if (ret != 0) { + ret = -1; + goto exit; + } + + // check message-header complete + header_start = payload + offset; + header_end = (const char *)memmem(header_start, payload_len - offset, SIP_HEADER_END, strlen(SIP_HEADER_END)); + if (header_end == NULL) { + goto exit; + } + + header_len = header_end - header_start + strlen(SIP_HEADER_END); + offset += header_len; + + if (offset > payload_len) { + goto exit; + } + + // decode header + ret = sip_decode_header(msg, header_start, header_len); + if (ret != 0) { + ret = -1; + goto exit; + } + + // check body complete + body_start = NULL; + body_len = 0; + if (msg->header.content_length && + msg->header.content_length->field_value && msg->header.content_length->field_value_len > 0) { + content_length = strntoi((char*)msg->header.content_length->field_value, msg->header.content_length->field_value_len); + if (content_length > 0) { + body_start = payload + offset; + body_len = content_length; + + offset += body_len; + if (offset > payload_len) { + goto exit; + } + + ret = sip_decode_body(msg, body_start, body_len); + if (ret != 0) { + goto exit; + } + } + } + + ret = offset; +exit: + return ret; +} + +static enum sip_identify_state sip_identify(struct sip_decoder *decoder, struct sip_exdata *exdata, const char *payload, size_t payload_len) +{ + UNUSED(decoder); + int ret; + size_t i; + size_t start_line_len = 0; + const char *start_line = NULL; + struct iovec parts[SIP_START_LINE_PART_NUM]; + enum sip_method method; + enum sip_version version; + + if (exdata->identify_state == SIP_IDENTIFY_STATE_FALSE || + exdata->identify_state == SIP_IDENTIFY_STATE_TRUE) { + goto exit; + } + + if (exdata->identify_times++ > SIP_IDENTIFY_TIMES_MAX) { + exdata->identify_state = SIP_IDENTIFY_STATE_FALSE; + goto exit; + } + + // skip first '\r\n' + for (i = 0; i < payload_len; i++) { + if (payload[i] != '\r' && payload[i] != '\n') { + payload += i; + payload_len -= i; + break; + } + } + + if (payload_len == 0) { + exdata->identify_state = SIP_IDENTIFY_STATE_HALF_TRUE; + goto exit; + } + + start_line = payload; + for (i = 0; i < payload_len; i++) { + if (payload[i] == '\r' || payload[i] == '\n') { + start_line_len = i; + break; + } + } + + if (start_line_len == 0) { + exdata->identify_state = SIP_IDENTIFY_STATE_FALSE; + goto exit; + } + + // split start line + ret = strnsplit(parts, SIP_DIM(parts), ' ', (char *)start_line, start_line_len); + if (ret != SIP_START_LINE_PART_NUM) { + exdata->identify_state = SIP_IDENTIFY_STATE_FALSE; + goto exit; + } + + // it's sip request ? + method = sip_strn2method((const char *)parts[SIP_REQUEST_LINE_PART_METHOD].iov_base, + parts[SIP_REQUEST_LINE_PART_METHOD].iov_len); + version = sip_strn2version((const char *)parts[SIP_REQUEST_LINE_PART_VERSION].iov_base, + parts[SIP_REQUEST_LINE_PART_VERSION].iov_len); + if (method != SIP_METHOD_UNKNOWN && version != SIP_VERSION_UNKNOWN) { + exdata->identify_state = SIP_IDENTIFY_STATE_TRUE; + goto exit; + } + + // it's sip response ? + version = sip_strn2version((const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base, + parts[SIP_STATUS_LINE_PART_VERSION].iov_len); + if (version != SIP_VERSION_UNKNOWN && parts[SIP_STATUS_LINE_PART_CODE].iov_len == SIP_STATUS_CODE_LEN) { + exdata->identify_state = SIP_IDENTIFY_STATE_TRUE; + goto exit; + } + + exdata->identify_state = SIP_IDENTIFY_STATE_FALSE; +exit: + return exdata->identify_state; +} + +static void sip_exdata_free(struct sip_exdata *exdata) +{ + struct sip_transaction *transaction = NULL, *tmp = NULL; + + if (exdata) { + if (exdata->transaction_table) { + HASH_ITER(hh, exdata->transaction_table, transaction, tmp) { + HASH_DELETE(hh, exdata->transaction_table, transaction); + + sip_transaction_close(exdata->decoder_ref, exdata, transaction); + } + } + + if (exdata->streambuffer.buf) { + free(exdata->streambuffer.buf); + exdata->streambuffer.buf = NULL; + } + free(exdata); + } +} + +static struct sip_exdata *sip_exdata_new(void) +{ + struct sip_exdata *exdata; + exdata = (struct sip_exdata *)calloc(1, sizeof(struct sip_exdata)); + stream_buffer_init(&exdata->streambuffer, SIP_STREAM_BUFFER_DEFAULT_SIZE); + return exdata; +} + +static void sip_on_payload(struct session *sess, enum session_state state, const char *payload, size_t payload_len, void *arg) +{ + UNUSED(state); + struct sip_exdata *exdata; + struct sip_decoder *decoder; + enum sip_identify_state identify_state; + + // session is closing? + if (payload == NULL) { + return; + } + + decoder = (struct sip_decoder *)arg; + + exdata = (struct sip_exdata *)session_get_exdata(sess, decoder->exdata_id); + if (exdata == NULL) { + exdata = sip_exdata_new(); + exdata->decoder_ref = decoder; + exdata->sess_ref = sess; + session_set_exdata(sess, decoder->exdata_id, exdata); + } + + if (exdata->sess_ignored) { + return; + } + + identify_state = sip_identify(decoder, exdata, payload, payload_len); + if (identify_state == SIP_IDENTIFY_STATE_FALSE) { + exdata->sess_ignored = 1; + return; + } + if (identify_state != SIP_IDENTIFY_STATE_TRUE) { + return; + } + + /* + * We need both remaining data from last sip message and + * current sip message + */ + if (!stream_buffer_is_empty(&exdata->streambuffer)) { + stream_buffer_append(&exdata->streambuffer, payload, payload_len); + if (stream_buffer_is_full(&exdata->streambuffer)) { + stream_buffer_reset(&exdata->streambuffer); + return; + } + + stream_buffer_get_data(&exdata->streambuffer, &payload, &payload_len, 0); + } + + int tid, msg_cnt = 0; + int decoded = 0; + int offset; + int remain; + struct sip_message *msg; + struct sip_message* msg_list[SIP_PER_PAYLOAD_MESSAGE_MAX]; + struct sip_header_field_pool *field_pool; + + tid = module_manager_get_thread_id(decoder->mod_mgr); + field_pool = decoder->field_pools[tid]; + field_pool_reset(field_pool); + + /* + * There may be multiple SIP messages in the payload. We should parse all of them + * and cache the last incomplete one, if it exists. + */ + offset = 0; + remain = payload_len; + while (remain > 0 && msg_cnt < (int)sizeof(msg_list)) { + msg= sip_message_new(); + msg->sess_ref = sess; + msg->header_field_pool = field_pool; + + decoded = sip_decode(msg, payload + offset, remain); + // decode failed + if (decoded < 0) { + sip_message_free(msg); + return; + } + + // incomplete SIP message, break and cache it + if (decoded == 0) { + sip_message_free(msg); + break; + } + + msg_list[msg_cnt++] = msg; + + offset += decoded; + remain -= decoded; + } + + // messages should be published or freed in sip_transaction_process + if (msg_cnt > 0) { + sip_transaction_process(decoder, exdata, msg_list, msg_cnt); + } + + // remove decoded data + if (!stream_buffer_is_empty(&exdata->streambuffer)) { + stream_buffer_slide(&exdata->streambuffer, offset); + } + + // cache remaining data + if (remain > 0) { + stream_buffer_append(&exdata->streambuffer, payload + offset, remain); + if (stream_buffer_is_full(&exdata->streambuffer)) { + // reach streambuffer limit, just reset + stream_buffer_reset(&exdata->streambuffer); + } + } +} + +static void sip_on_tcp_payload(struct session *sess, enum session_state state, const char *payload, uint32_t payload_len, void *arg) +{ + sip_on_payload(sess, state, payload, (size_t)payload_len, arg); +} + +static void sip_on_udp_packet(struct session *sess, enum session_state state, struct packet *pkt, void *arg) +{ + const char *payload; + size_t payload_len; + + if (pkt == NULL) { + payload = NULL; + payload_len = 0; + } else { + payload = packet_get_payload_data(pkt); + payload_len = packet_get_payload_len(pkt); + } + + sip_on_payload(sess, state, payload, payload_len, arg); +} + +void sip_on_exdata_free(int idx, void *ex_ptr, void *arg) +{ + UNUSED(idx); + UNUSED(arg); + struct sip_exdata *exdata = (struct sip_exdata *)ex_ptr; + + if (exdata) { + sip_exdata_free(exdata); + } +} + +static void sip_on_message_free(void *msg, void *arg) +{ + UNUSED(arg); + if (msg) { + free(msg); + } +} + +long long sip_get_call_duration_ms(struct sip_decoder *decoder, struct session *sess, const char *call_id, size_t call_id_len) +{ + //struct module *mod; + //struct sip_decoder *decoder; + struct sip_exdata *exdata; + struct sip_transaction *transaction = NULL; + + //mod = module_manager_get_module(mod_mgr, SIP_MODULE_NAME); + //if (mod == NULL) { + // return -1; + //} + + //decoder = module_get_ctx(mod); + //if (decoder == NULL) { + // return -1; + //} + + exdata = (struct sip_exdata *)session_get_exdata(sess, decoder->exdata_id); + if (exdata == NULL) { + return -1; + } + + HASH_FIND(hh, exdata->transaction_table, call_id, call_id_len, transaction); + if (transaction == NULL) { + return -1; + } + + if (transaction->call_duration_ms == 0) { + return -1; + } + + return transaction->call_duration_ms; +} + +struct sip_decoder *module_to_sip_decoder(struct module *mod) +{ + assert(mod); + assert(strcmp(module_get_name(mod), SIP_MODULE_NAME) == 0); + return module_get_ctx(mod); +} + +int sip_subscribe(struct sip_decoder *decoder, + sip_request_message_callback_func *request_cb, + sip_response_message_callback_func *response_cb, + void *arg) +{ + int ret; + ret = sip_message_subscribe(decoder->mod_mgr, SIP_REQUEST_TOPIC_NAME, (on_msg_cb_func *)(void *)request_cb, arg); + if (ret < 0) { + return ret; + } + ret = sip_message_subscribe(decoder->mod_mgr, SIP_RESPONSE_TOPIC_NAME, (on_msg_cb_func *)(void *)response_cb, arg); + if (ret < 0) { + return ret; + } + return 0; +} + +void sip_exit(struct module_manager *mod_mgr, struct module *mod) +{ + (void)(mod_mgr); + int i; + struct sip_decoder *decoder; + struct mq_schema *schema; + + if (mod) { + decoder = (struct sip_decoder *)module_get_ctx(mod); + if (decoder) { + schema = module_manager_get_mq_schema(decoder->mod_mgr); + mq_schema_destroy_topic(schema, decoder->request_message_topic_id); + mq_schema_destroy_topic(schema, decoder->response_message_topic_id); + + for (i = 0; i < decoder->thread_num; i++) { + field_pool_free(decoder->field_pools[i]); + } + free(decoder->field_pools); + free(decoder); + } + + module_free(mod); + } +} + +struct module* sip_init(struct module_manager *mod_mgr) +{ + int i, ret, thread_num; + struct mq_schema *schema; + struct session_manager *sess_mgr; + struct module *mod; + struct sip_decoder *decoder; + + decoder = (struct sip_decoder *)calloc(1, sizeof(struct sip_decoder)); + decoder->mod_mgr = mod_mgr; + mod = module_new(SIP_MODULE_NAME, decoder); + sess_mgr = module_to_session_manager(module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME)); + schema = module_manager_get_mq_schema(mod_mgr); + + if (sess_mgr == NULL || schema == NULL) { + goto exit; + } + + decoder->exdata_id = session_manager_new_session_exdata_index(sess_mgr, SIP_EXDATA_NAME, sip_on_exdata_free, NULL); + if (decoder->exdata_id < 0) { + goto exit; + } + + ret = session_manager_subscribe_udp(sess_mgr, sip_on_udp_packet, decoder); + if (ret < 0) { + goto exit; + } + + ret = session_manager_subscribe_tcp_stream(sess_mgr, sip_on_tcp_payload, decoder); + if (ret < 0) { + goto exit; + } + + decoder->request_message_topic_id = mq_schema_create_topic(schema, SIP_REQUEST_TOPIC_NAME, sip_message_dispatch, decoder, sip_on_message_free, NULL); + if (decoder->request_message_topic_id < 0) { + goto exit; + } + + decoder->response_message_topic_id = mq_schema_create_topic(schema, SIP_RESPONSE_TOPIC_NAME, sip_message_dispatch, decoder, sip_on_message_free, NULL); + if (decoder->response_message_topic_id < 0) { + goto exit; + } + + // init per thread sip header field pools + thread_num = module_manager_get_max_thread_num(mod_mgr); + decoder->thread_num = thread_num; + decoder->field_pools = (struct sip_header_field_pool **)calloc(thread_num, sizeof(struct sip_header_field_pool *)); + for (i = 0; i < thread_num; i++) { + decoder->field_pools[i] = field_pool_new(SIP_HEADER_FIELD_POOL_DEFAULT_SIZE); + } + + return mod; +exit: + sip_exit(mod_mgr, mod); + return NULL; +} + + |
