summaryrefslogtreecommitdiff
path: root/decoders/sip
diff options
context:
space:
mode:
authorzhuzhenjun <[email protected]>2024-11-03 16:55:14 +0000
committerzhuzhenjun <[email protected]>2024-11-06 10:06:51 +0000
commit54837dbc1e9ed341610f3ac7409299f91b6ef55f (patch)
tree082a7c6e1f0a09cc1777956bb729a641410c4b44 /decoders/sip
parent8ddef31bb58c529763d565674ba5036584323921 (diff)
sip decoder initialdev-sip-decoder
Diffstat (limited to 'decoders/sip')
-rw-r--r--decoders/sip/CMakeLists.txt13
-rw-r--r--decoders/sip/sip.c1493
-rw-r--r--decoders/sip/sip_internal.h193
-rw-r--r--decoders/sip/version.map13
4 files changed, 1712 insertions, 0 deletions
diff --git a/decoders/sip/CMakeLists.txt b/decoders/sip/CMakeLists.txt
new file mode 100644
index 0000000..8e50eb3
--- /dev/null
+++ b/decoders/sip/CMakeLists.txt
@@ -0,0 +1,13 @@
+set(DECODER_NAME sip)
+
+file(GLOB DECODER_SRC "${CMAKE_CURRENT_SOURCE_DIR}/${DECODER_NAME}*.c")
+
+add_library(
+ ${DECODER_NAME}
+ ${DECODER_SRC}
+)
+
+set_target_properties(
+ ${DECODER_NAME} PROPERTIES
+ LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/version.map"
+)
diff --git a/decoders/sip/sip.c b/decoders/sip/sip.c
new file mode 100644
index 0000000..830ac7c
--- /dev/null
+++ b/decoders/sip/sip.c
@@ -0,0 +1,1493 @@
+#include <stdio.h>
+#include <limits.h>
+#include <ctype.h>
+#include <assert.h>
+#include <sys/time.h>
+
+#include "stellar/packet.h"
+#include "stellar/utils.h"
+#include "stellar/mq.h"
+#include "stellar/session.h"
+#include "stellar/sip.h"
+
+#include "sip_internal.h"
+
+const char* g_sip_version[] = {
+ "unknown",
+ "sip/1.0",
+ "sip/1.1",
+ "sip/2.0"
+};
+
+
+const char* g_sip_method[] = {
+ "unknown",
+ "invite",
+ "ack",
+ "options",
+ "register",
+ "bye",
+ "cancel",
+ "do",
+ "info",
+ "message",
+ "notify",
+ "prack",
+ "qauth",
+ "refer",
+ "sprack",
+ "subscribe",
+ "update",
+ "publish"
+};
+
+static int strntoi(char *buff, size_t len) {
+ if (buff == NULL || len == 0) {
+ return 0;
+ }
+
+ int result = 0;
+ int sign = 1;
+ size_t i = 0;
+
+ while (i < len && isspace(buff[i])) {
+ i++;
+ }
+
+ if (i < len && buff[i] == '-') {
+ sign = -1;
+ i++;
+ } else if (i < len && buff[i] == '+') {
+ i++;
+ }
+
+ for (; i < len; i++) {
+ if (!isdigit(buff[i])) {
+ break;
+ }
+
+ int digit = buff[i] - '0';
+
+ if (result > (INT_MAX - digit) / 10) {
+ return (sign == 1) ? INT_MAX : INT_MIN;
+ }
+
+ result = result * 10 + digit;
+ }
+
+ return result * sign;
+}
+
+static enum sip_version sip_strn2version(const char *data, size_t len)
+{
+ int i;
+ for(i = 1; i < SIP_VERSION_NUM; i++) {
+ if(0 == SAFE_STRNCASECMP(data, len, g_sip_version[i], strlen(g_sip_version[i]))) {
+ break;
+ }
+ }
+ if (i == SIP_VERSION_NUM) {
+ return SIP_VERSION_UNKNOWN;
+ }
+ return (enum sip_version)i;
+}
+
+enum sip_method sip_strn2method(const char *data, size_t len)
+{
+ int i;
+ if (data == NULL || len == 0) {
+ return SIP_METHOD_UNKNOWN;
+ }
+ for(i = 1; i < SIP_METHOD_NUM; i++) {
+ if(0 == SAFE_STRNCASECMP(data, len, g_sip_method[i], strlen(g_sip_method[i]))) {
+ break;
+ }
+ }
+ if (i == SIP_METHOD_NUM) {
+ return SIP_METHOD_UNKNOWN;
+ }
+ return (enum sip_method)i;
+}
+
+static char* strntrim_sp(char *str, size_t *len) {
+ char *end;
+
+ while (*str == ' ' && *len > 0) {
+ str++;
+ (*len)--;
+ }
+
+ if (*len == 0) {
+ return str;
+ }
+
+ end = str + *len - 1;
+ while (end > str && *end == ' ') {
+ end--;
+ (*len)--;
+ }
+
+ return str;
+}
+
+static int strnsplit(struct iovec *out, size_t n_out, char delimiter, const char *data, size_t data_len)
+{
+ size_t i;
+ size_t offset;
+ size_t part_len;
+ const char *data_end;
+ const char *part_start;
+ const char *part_end;
+
+ if (out == NULL || n_out == 0 || data == NULL || data_len == 0) {
+ return -1;
+ }
+
+ offset = 0;
+ data_end = data + data_len;
+
+ for (i = 0; i < n_out - 1; i++) {
+ part_start = data + offset;
+ part_end = memchr(part_start, delimiter, data_end - part_start);
+
+ if (part_end != NULL) {
+ // Found a delimiter
+ part_len = part_end - part_start;
+ out[i].iov_base = strntrim_sp((char*)part_start, &part_len);
+ out[i].iov_len = part_len;
+ offset += part_len + 1;
+ } else {
+ // Last part, no delimiter found
+ part_len = data_end - part_start;
+ out[i].iov_base = strntrim_sp((char*)part_start, &part_len);
+ out[i].iov_len = part_len;
+ offset += part_len;
+ return i + 1;
+ }
+
+ if (offset >= data_len) {
+ return i + 1;
+ }
+ }
+
+ // last part
+ part_start = data + offset;
+ part_len = data_end - part_start;
+ out[i].iov_base = strntrim_sp((char*)part_start, &part_len);
+ out[i].iov_len = part_len;
+
+ return n_out;
+}
+
+static inline struct sip_header_field_pool* field_pool_new(size_t pool_size)
+{
+ struct sip_header_field_pool *pool;
+
+ pool = calloc(1, sizeof(struct sip_header_field_pool));
+ pool->arr = calloc(pool_size, sizeof(struct sip_header_field));
+ pool->size = pool_size;
+ pool->used = 0;
+
+ return pool;
+}
+
+static inline void field_pool_free(struct sip_header_field_pool *pool)
+{
+ if (pool) {
+ if (pool->arr) {
+ free(pool->arr);
+ }
+ free(pool);
+ }
+}
+
+static inline void field_pool_reset(struct sip_header_field_pool *pool)
+{
+ if (pool && pool->arr && pool->used != 0) {
+ pool->used = 0;
+ }
+}
+
+static inline struct sip_header_field *field_pool_series_get(struct sip_header_field_pool *pool)
+{
+ struct sip_header_field *header_field;
+ if (pool && pool ->arr && pool->used < pool->size) {
+ header_field = &pool->arr[pool->used++];
+ memset(header_field, 0, sizeof(struct sip_header_field));
+ return header_field;
+ }
+ return NULL;
+}
+static inline void stream_buffer_deinit(struct stream_buffer *sb)
+{
+ sb->buf_size = 0;
+ sb->buf_off = 0;
+ if (sb->buf) {
+ free(sb->buf);
+ }
+}
+
+static inline void stream_buffer_init(struct stream_buffer *sb, size_t size)
+{
+ sb->buf_size = size;
+ sb->buf_off = 0;
+ sb->buf = NULL;
+}
+
+static inline int stream_buffer_is_full(struct stream_buffer *sb)
+{
+ if (sb->buf && sb->buf_off == sb->buf_size) {
+ return 1;
+ }
+ return 0;
+}
+
+static inline int stream_buffer_is_empty(struct stream_buffer *sb)
+{
+ if (sb->buf == NULL || sb->buf_off == 0) {
+ return 1;
+ }
+ return 0;
+}
+
+static inline void stream_buffer_reset(struct stream_buffer *sb)
+{
+ if (sb->buf && sb->buf_off > 0) {
+ sb->buf_off = 0;
+ }
+}
+
+static inline void stream_buffer_append(struct stream_buffer *sb, const char *data, size_t data_len)
+{
+ if (sb->buf == NULL && sb->buf_size > 0) {
+ sb->buf = (char *)malloc(sb->buf_size);
+ sb->buf_off = 0;
+ }
+ if (sb->buf_off + data_len > sb->buf_size) {
+ data_len = sb->buf_size - sb->buf_off;
+ }
+ memcpy(sb->buf + sb->buf_off, data, data_len);
+}
+
+static inline void stream_buffer_slide(struct stream_buffer *sb, size_t offset)
+{
+ if (sb->buf) {
+ if (sb->buf_off > offset) {
+ memmove(sb->buf, sb->buf + offset, sb->buf_off - offset);
+ sb->buf_off = sb->buf_off - offset;
+ } else {
+ stream_buffer_reset(sb);
+ }
+ }
+}
+
+static inline void stream_buffer_get_data(struct stream_buffer *sb, const char **data, size_t *data_len, size_t offset)
+{
+ *data = NULL;
+ *data_len = 0;
+ if (sb->buf && sb->buf_off > offset) {
+ *data = sb->buf + offset;
+ *data_len = sb->buf_off - offset;
+ }
+}
+
+static void sip_message_free(struct sip_message *msg)
+{
+ if (msg) {
+ free(msg);
+ }
+}
+
+static struct sip_message* sip_message_new(void)
+{
+ struct sip_message *msg;
+ msg = (struct sip_message *)calloc(1, sizeof(struct sip_message));
+ return msg;
+}
+
+void sip_message_print(struct sip_message *msg)
+{
+ if (msg == NULL) {
+ printf("NULL SIP message\n");
+ return;
+ }
+
+ printf("SIP Message Type: %d\n", msg->type);
+ printf("Call State: %d\n", msg->call_state);
+ printf("Transaction Sequence: %d\n", msg->transaction_seq);
+
+ if (msg->type == SIP_MESSAGE_TYPE_REQUEST) {
+ printf("Request Line:\n");
+ printf(" Method: %d\n", msg->request_line.method);
+ printf(" URI: %.*s\n", (int)msg->request_line.uri_len, msg->request_line.uri);
+ printf(" Version: %.*s\n", (int)msg->request_line.version_len, msg->request_line.version);
+ } else if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) {
+ printf("Status Line:\n");
+ printf(" Version: %.*s\n", (int)msg->status_line.version_len, msg->status_line.version);
+ printf(" Code: %.*s\n", (int)msg->status_line.code_len, msg->status_line.code);
+ printf(" Reason: %.*s\n", (int)msg->status_line.reason_len, msg->status_line.reason);
+ }
+
+ printf("Headers:\n");
+ for (size_t i = 0; i < msg->header.n_header_fields; i++) {
+ struct sip_header_field *field = &msg->header.header_fields[i];
+ printf(" %.*s: %.*s\n", (int)field->field_name_len, field->field_name,
+ (int)field->field_value_len, field->field_value);
+ }
+
+ if (msg->body.body_len > 0) {
+ printf("Body:\n");
+ printf(" Media IP: %.*s\n", (int)msg->body.media_ip_len, msg->body.media_ip);
+ printf(" Media Audio Port: %u\n", msg->body.media_audio_port);
+ printf(" Media Video Port: %u\n", msg->body.media_video_port);
+ printf(" Body: %.*s\n", (int)msg->body.body_len, msg->body.body);
+ } else {
+ printf("Body: Empty\n");
+ }
+}
+
+int sip_message_publish(struct sip_decoder *decoder, struct sip_message *msg, void *arg)
+{
+ UNUSED(arg);
+ int topic_id;
+ struct mq_runtime *runtime;
+
+ runtime = module_manager_get_mq_runtime(decoder->mod_mgr);
+ if (runtime == NULL) {
+ return -1;
+ }
+
+ switch (msg->type) {
+ case SIP_MESSAGE_TYPE_REQUEST:
+ topic_id = decoder->request_message_topic_id;
+ break;
+ case SIP_MESSAGE_TYPE_RESPONSE:
+ topic_id = decoder->response_message_topic_id;
+ break;
+ default:
+ return -1;
+ }
+
+ //sip_message_print(msg);
+
+ return mq_runtime_publish_message(runtime, topic_id, msg);
+}
+
+static void sip_message_dispatch(int topic, void *msg, on_msg_cb_func *msg_cb, void *arg, void *dispatch_arg)
+{
+ struct sip_message *sip_msg;
+ struct sip_decoder *decoder;
+
+ sip_msg = (struct sip_message *)msg;
+ decoder = (struct sip_decoder *)dispatch_arg;
+
+ if (topic == decoder->request_message_topic_id) {
+ sip_request_message_callback_func *req_cb = (sip_request_message_callback_func *)(void *)msg_cb;
+ req_cb(sip_msg->sess_ref, &sip_msg->request_line, &sip_msg->header, &sip_msg->body, arg);
+ return;
+ }
+
+ if (topic == decoder->response_message_topic_id) {
+ sip_response_message_callback_func *res_cb = (sip_response_message_callback_func *)(void *)msg_cb;
+ res_cb(sip_msg->sess_ref, &sip_msg->status_line, &sip_msg->header, &sip_msg->body, arg);
+ return;
+ }
+}
+
+static int sip_message_subscribe(struct module_manager *mod_mgr, const char *topic_name, on_msg_cb_func *cb, void *arg)
+{
+ int topic;
+ struct mq_schema *schema;
+
+ if (mod_mgr == NULL) {
+ return -1;
+ }
+
+ schema = module_manager_get_mq_schema(mod_mgr);
+ if (schema == NULL) {
+ return -1;
+ }
+
+ topic = mq_schema_get_topic_id(schema, topic_name);
+ if (topic < 0) {
+ return -1;
+ }
+
+ return mq_schema_subscribe(schema, topic, (on_msg_cb_func *)(void *)cb, arg);
+}
+
+static int sip_transaction_call_state_check(struct sip_transaction *transaction, enum sip_call_state call_state)
+{
+ return transaction->call_state == call_state;
+}
+
+static void sip_transaction_call_state_set(struct sip_transaction *transaction, enum sip_call_state call_state)
+{
+ transaction->call_state = call_state;
+}
+
+static void sip_transaction_call_state_update(struct sip_transaction *transaction, struct sip_message *msg)
+{
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_OPENING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_WAITING);
+ }
+
+ if (msg->type == SIP_MESSAGE_TYPE_REQUEST) {
+ enum sip_method method = msg->request_line.method;
+ switch (method) {
+ case SIP_METHOD_INVITE:
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CALLING);
+ }
+ break;
+ case SIP_METHOD_BYE:
+ if (transaction->dir != SIP_DIR_DOUBLE) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED);
+ } else {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTING);
+ }
+ break;
+ case SIP_METHOD_ACK:
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CONNECTING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CONFIRMED);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) {
+ int stauts_code = strntoi((char*)msg->status_line.code, msg->status_line.code_len);
+ enum sip_method cseq_method = msg->header.cseq_method;
+
+ // rfc3261#section-7.2
+ switch (stauts_code/100) {
+ // status code 1xx
+ case 1:
+ if (cseq_method == SIP_METHOD_INVITE) {
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CALLING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_EARLY);
+ }
+ if (transaction->dir != SIP_DIR_DOUBLE &&
+ sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) {
+
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_EARLY);
+ }
+ }
+ break;
+ case 2:
+ if (stauts_code == 200) {
+ if (cseq_method == SIP_METHOD_INVITE) {
+
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_EARLY) ||
+ sip_transaction_call_state_check(transaction, SIP_CALL_STATE_CALLING)) {
+
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CONNECTING);
+ }
+ }
+ if (cseq_method == SIP_METHOD_BYE) {
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_DISCONNECTING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED);
+ }
+
+ if (transaction->dir != SIP_DIR_DOUBLE &&
+ !sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) {
+
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED);
+ }
+ }
+ }
+ break;
+ case 5:
+ if (!sip_transaction_call_state_check(transaction, SIP_CALL_STATE_WAITING)) {
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_DISCONNECTED);
+ }
+ break;
+ case 3:
+ case 4:
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+static void sip_transaction_dir_update(struct sip_transaction *transaction, struct sip_message *msg)
+{
+ if (transaction->dir != SIP_DIR_DOUBLE) {
+ switch (transaction->dir) {
+ case SIP_DIR_UNKNOWN:
+ if (msg->type == SIP_MESSAGE_TYPE_REQUEST) {
+ transaction->dir = SIP_DIR_REQUEST;
+ }
+ if (msg->type == SIP_MESSAGE_TYPE_RESPONSE){
+ transaction->dir = SIP_DIR_RESPONSE;
+ }
+ break;
+ case SIP_DIR_REQUEST:
+ if (msg->type == SIP_MESSAGE_TYPE_RESPONSE) {
+ transaction->dir = SIP_DIR_DOUBLE;
+ }
+ break;
+ case SIP_DIR_RESPONSE:
+ if (msg->type == SIP_MESSAGE_TYPE_REQUEST) {
+ transaction->dir = SIP_DIR_DOUBLE;
+ }
+ break;
+ case SIP_DIR_DOUBLE:
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+static void sip_transaction_free(struct sip_transaction *transaction)
+{
+ if (transaction) {
+ if (transaction->key) {
+ free(transaction->key);
+ }
+ free(transaction);
+ }
+}
+
+static struct sip_transaction* sip_transaction_new(void)
+{
+ return (struct sip_transaction *)calloc(1, sizeof(struct sip_transaction));
+}
+
+static void sip_transaction_close(struct sip_decoder *decoder, struct sip_exdata *exdata, struct sip_transaction *transaction)
+{
+ UNUSED(decoder);
+ UNUSED(exdata);
+// int ret;
+// struct sip_message *msg;
+
+ sip_transaction_call_state_set(transaction, SIP_CALL_STATE_CLOSING);
+
+// msg = sip_message_new();
+// msg->sess_ref = exdata->sess_ref;
+// msg->transaction_seq = transaction->seq;
+// msg->call_state = SIP_CALL_STATE_CLOSING;
+// msg->type = SIP_MESSAGE_TYPE_REQUEST;
+// ret = sip_message_publish(decoder, msg, NULL);
+// if (ret < 0) {
+// sip_message_free(msg);
+// }
+//
+// msg = sip_message_new();
+// msg->sess_ref = exdata->sess_ref;
+// msg->transaction_seq = transaction->seq;
+// msg->call_state = SIP_CALL_STATE_CLOSING;
+// msg->type = SIP_MESSAGE_TYPE_RESPONSE;
+// ret = sip_message_publish(decoder, msg, NULL);
+// if (ret < 0) {
+// sip_message_free(msg);
+// }
+
+ sip_transaction_free(transaction);
+}
+
+static struct sip_transaction *sip_transaction_open(struct sip_decoder *decoder, struct sip_exdata *exdata, const char *call_id, size_t call_id_len)
+{
+ UNUSED(decoder);
+ UNUSED(exdata);
+ // int ret;
+// struct sip_message *msg;
+ struct sip_transaction *transaction;
+
+ transaction = sip_transaction_new();
+ transaction->seq = exdata->transaction_seq++;
+ transaction->call_state = SIP_CALL_STATE_OPENING;
+ transaction->key = malloc(call_id_len);
+ transaction->key_len = call_id_len;
+ memcpy(transaction->key, call_id, call_id_len);
+
+// msg = sip_message_new();
+// msg->sess_ref = exdata->sess_ref;
+// msg->transaction_seq = transaction->seq;
+// msg->call_state = SIP_CALL_STATE_OPENING;
+// msg->type = SIP_MESSAGE_TYPE_REQUEST;
+// ret = sip_message_publish(decoder, msg, NULL);
+// if (ret < 0) {
+// sip_message_free(msg);
+// }
+// msg = sip_message_new();
+// msg->sess_ref = exdata->sess_ref;
+// msg->transaction_seq = transaction->seq;
+// msg->call_state = SIP_CALL_STATE_OPENING;
+// msg->type = SIP_MESSAGE_TYPE_RESPONSE;
+// ret = sip_message_publish(decoder, msg, NULL);
+// if (ret < 0) {
+// sip_message_free(msg);
+// }
+
+ return transaction;
+}
+
+static void sip_transaction_prune(struct sip_decoder *decoder, struct sip_exdata *exdata, struct timeval ts)
+{
+ struct sip_transaction *transaction = NULL, *tmp = NULL, *oldest;
+
+ if (exdata->transaction_table == NULL || decoder->transaction_timeout_ms == 0) {
+ return;
+ }
+
+ if (decoder->transaction_limit_count > 0 &&
+ decoder->transaction_limit_count <= (int)HASH_CNT(hh, exdata->transaction_table)) {
+ oldest = exdata->transaction_table;
+ HASH_DELETE(hh, exdata->transaction_table, oldest);
+ sip_transaction_close(decoder, exdata, oldest);
+ }
+
+ HASH_ITER(hh, exdata->transaction_table, transaction, tmp) {
+ if (timeval_delta_ms(transaction->last_update, ts) < decoder->transaction_timeout_ms) {
+ break;
+ }
+
+ HASH_DELETE(hh, exdata->transaction_table, transaction);
+ sip_transaction_close(decoder, exdata, transaction);
+ }
+}
+
+static void sip_transaction_process(struct sip_decoder *decoder, struct sip_exdata *exdata, struct sip_message *msgs[], size_t n_msgs)
+{
+ int ret;
+ size_t i;
+ struct timeval ts;
+ struct sip_message *msg;
+ struct sip_transaction *transaction;
+ struct sip_header_field *call_id;
+
+ gettimeofday(&ts, NULL);
+
+ for (i = 0 ; i < n_msgs; i++) {
+ msg = msgs[i];
+ call_id = msg->header.call_id;
+
+ if (call_id == NULL || call_id->field_value == NULL || call_id->field_value_len == 0) {
+ continue;
+ }
+
+ // get transaction
+ HASH_FIND(hh, exdata->transaction_table, call_id->field_value, call_id->field_value_len, transaction);
+ if (transaction == NULL) {
+ transaction = sip_transaction_open(decoder, exdata, call_id->field_value, call_id->field_value_len);
+ } else {
+ HASH_DELETE(hh, exdata->transaction_table, transaction);
+ }
+ HASH_ADD_KEYPTR(hh, exdata->transaction_table, transaction->key, transaction->key_len, transaction);
+
+ // update transaction
+ transaction->last_update = ts;
+ sip_transaction_dir_update(transaction, msg);
+ sip_transaction_call_state_update(transaction, msg);
+
+ // publish message
+ msg->transaction_seq = transaction->seq;
+ msg->call_state = transaction->call_state;
+ ret = sip_message_publish(decoder, msg, NULL);
+ if (ret < 0) {
+ sip_message_free(msg);
+ }
+
+ // transaction that call state is disconnected, should be closed immediately
+ if (sip_transaction_call_state_check(transaction, SIP_CALL_STATE_DISCONNECTED)) {
+ HASH_DELETE(hh, exdata->transaction_table, transaction);
+ sip_transaction_close(decoder, exdata, transaction);
+ }
+
+ // timeout transaction
+ sip_transaction_prune(decoder, exdata, ts);
+ }
+}
+
+int sip_decode_sdp_media(struct sip_message *msg, const char *line_start, size_t line_len)
+{
+ int ret;
+ struct iovec parts[SDP_MEDIA_PART_NUM];
+ const char *media;
+ size_t media_len;
+ const char *port;
+ size_t port_len;
+
+ if (line_start == NULL || line_len <= 2) {
+ return -1;
+ }
+
+ // skip "m="
+ line_start += strlen("m=");
+ line_len -= strlen("m=");
+
+ ret = strnsplit(parts, SIP_DIM(parts), ' ', line_start, line_len);
+ if (ret != SIP_DIM(parts)) {
+ return -1;
+ }
+
+ media = (const char *)parts[SDP_MEDIA_PART_MEDIA].iov_base;
+ media_len = parts[SDP_MEDIA_PART_MEDIA].iov_len;
+ port = (const char *)parts[SDP_MEDIA_PART_PORT].iov_base;
+ port_len = parts[SDP_MEDIA_PART_PORT].iov_len;
+
+ if (0 == SAFE_STRNCASECMP(media, media_len, "audio", strlen("audio"))) {
+ msg->body.media_audio_port = strntoi((char*)port, port_len);
+ return 0;
+ }
+ if (0 == SAFE_STRNCASECMP(media, media_len, "video", strlen("video"))) {
+ msg->body.media_video_port = strntoi((char*)port, port_len);
+ return 0;
+ }
+
+ return 0;
+}
+
+int sip_decode_sdp_connection(struct sip_message *msg, const char *line_start, size_t line_len)
+{
+ int ret;
+ struct iovec parts[SDP_CONNECTION_PART_NUM];
+ const char *addr;
+ size_t addr_len;
+ const char *nettype;
+ size_t nettype_len;
+
+ if (line_start == NULL || line_len <= 2) {
+ return -1;
+ }
+
+ // skip "c="
+ line_start += strlen("c=");
+ line_len -= strlen("c=");
+
+ ret = strnsplit(parts, SIP_DIM(parts), ' ', line_start, line_len);
+ if (ret != SIP_DIM(parts)) {
+ return -1;
+ }
+
+ addr = (const char *)parts[SDP_CONNECTION_PART_ADDRESS].iov_base;
+ addr_len = parts[SDP_CONNECTION_PART_ADDRESS].iov_len;
+ nettype = (const char *)parts[SDP_CONNECTION_PART_NETTYPE].iov_base;
+ nettype_len = parts[SDP_CONNECTION_PART_NETTYPE].iov_len;
+
+ if (0 == SAFE_STRNCASECMP(nettype, nettype_len, "IN", strlen("IN"))) {
+ msg->body.media_ip = addr;
+ msg->body.media_ip_len = addr_len;
+ return 0;
+ }
+
+ return 0;
+}
+
+static int sip_decode_body(struct sip_message *msg, const char *body, size_t body_len)
+{
+ int ret;
+ size_t remain, offset;
+ struct sip_header_field *content_type;
+
+ if (body == NULL || body_len == 0) {
+ return -1;
+ }
+
+ msg->body.body = body;
+ msg->body.body_len = body_len;
+
+ content_type = msg->header.content_type;
+ if (content_type == NULL) {
+ return 0;
+ }
+
+ // parse sdp media
+ if (0 != SAFE_STRNCASECMP(content_type->field_value, content_type->field_value_len, "application/sdp", strlen("application/sdp"))) {
+ return 0;
+ }
+
+ msg->body.sdp_content = body;
+ msg->body.sdp_content_len = body_len;
+
+ offset = 0;
+ remain = body_len;
+
+ // body lines
+ while (remain > 0) {
+ const char *line_start, *line_end;
+ size_t line_len;
+
+ // line
+ line_start = body + offset;
+ line_end = (const char *)memmem(line_start, remain, SIP_LINE_END, strlen(SIP_LINE_END));
+ if (line_end == NULL || line_end == line_start) {
+ break;
+ }
+
+ line_len = line_end - line_start + strlen(SIP_LINE_END);
+ switch (line_start[0]) {
+ case 'c':
+ ret = sip_decode_sdp_connection(msg, line_start, line_len - strlen(SIP_LINE_END));
+ if (ret != 0) {
+ return -1;
+ }
+ break;
+ case 'm':
+ ret = sip_decode_sdp_media(msg, line_start, line_len - strlen(SIP_LINE_END));
+ if (ret != 0) {
+ return -1;
+ }
+ break;
+ default:
+ break;
+ }
+
+ offset += line_len;
+ remain -= line_len;
+ }
+ return 0;
+}
+
+static int sip_decode_header_field(struct sip_message *msg, const char *line_start, size_t line_len)
+{
+ int ret;
+ struct iovec field_parts[SIP_HEADER_FIELD_PART_NUM];
+ struct sip_header_field *header_field;
+ struct sip_header *header = &msg->header;
+
+ if (line_start == NULL || line_len == 0) {
+ return -1;
+ }
+
+ ret = strnsplit(field_parts, SIP_DIM(field_parts), ':', line_start, line_len);
+ if (ret != SIP_DIM(field_parts)) {
+ return -1;
+ }
+
+ header_field = field_pool_series_get(msg->header_field_pool);
+ if (header_field == NULL) {
+ return -1;
+ }
+
+ if (header->header_fields == NULL) {
+ header->header_fields = header_field;
+ }
+ header->n_header_fields++;
+ header_field->field_name = (const char*)field_parts[SIP_HEADER_FIELD_PART_NAME].iov_base;
+ header_field->field_name_len = field_parts[SIP_HEADER_FIELD_PART_NAME].iov_len;
+ header_field->field_value = (const char*)field_parts[SIP_HEADER_FIELD_PART_VALUE].iov_base;
+ header_field->field_value_len = field_parts[SIP_HEADER_FIELD_PART_VALUE].iov_len;
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Call-ID", strlen("Call-ID"))) {
+ msg->header.call_id = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "From", strlen("From"))) {
+ msg->header.from = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "To", strlen("To"))) {
+ msg->header.to = header_field;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "CSeq", strlen("CSeq"))) {
+ msg->header.cseq = header_field;
+
+ struct iovec cseq_parts[SIP_CSEQ_PART_NUM];
+ ret = strnsplit(cseq_parts, SIP_DIM(cseq_parts), ' ', header_field->field_value, header_field->field_value_len);
+ if (ret != SIP_DIM(cseq_parts)) {
+ return 0;
+ }
+ msg->header.cseq_method = sip_strn2method((const char *)cseq_parts[SIP_CSEQ_PART_METHOD].iov_base,
+ cseq_parts[SIP_CSEQ_PART_METHOD].iov_len);
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Via", strlen("Via"))) {
+ msg->header.via = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Server", strlen("Server"))) {
+ msg->header.server = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Reason", strlen("Reason"))) {
+ msg->header.reason = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "User-Agent", strlen("User-Agent"))) {
+ msg->header.user_agent = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Content-Type", strlen("Content-Type"))) {
+ msg->header.content_type = header_field;
+ return 0;
+ }
+
+ if (0 == SAFE_STRNCASECMP(header_field->field_name, header_field->field_name_len, "Content-Length", strlen("Content-Length"))) {
+ msg->header.content_length = header_field;
+ return 0;
+ }
+
+ return 0;
+}
+
+int sip_decode_header(struct sip_message *msg, const char *header_start, size_t header_len)
+{
+ int ret = 0;
+ size_t remain, offset;
+
+ offset = 0;
+ remain = header_len;
+
+ // header lines
+ while (remain > 0) {
+ const char *line_start, *line_end;
+ size_t line_len;
+
+ // line
+ line_start = header_start + offset;
+ line_end = (const char *)memmem(line_start, remain, SIP_LINE_END, strlen(SIP_LINE_END));
+ if (line_end == NULL || line_end == line_start) {
+ break;
+ }
+
+ line_len = line_end - line_start + strlen(SIP_LINE_END);
+ ret = sip_decode_header_field(msg, line_start, line_len - strlen(SIP_LINE_END));
+ if (ret != 0) {
+ break;
+ }
+
+ offset += line_len;
+ remain -= line_len;
+ }
+
+ return ret;
+}
+
+int sip_decode_start_line(struct sip_message *msg, const char *payload, size_t payload_len)
+{
+ int ret;
+ enum sip_method method;
+ enum sip_version version;
+ struct iovec parts[SIP_START_LINE_PART_NUM];
+
+ ret = strnsplit(parts, SIP_DIM(parts), ' ', payload, payload_len);
+ if (ret != SIP_DIM(parts)) {
+ return -1;
+ }
+
+ // try to decode as request line
+ method = sip_strn2method((const char *)parts[SIP_REQUEST_LINE_PART_METHOD].iov_base, parts[SIP_REQUEST_LINE_PART_METHOD].iov_len);
+ if (method != SIP_METHOD_UNKNOWN) {
+ msg->request_line.method = method;
+ msg->request_line.uri = (const char *)parts[SIP_REQUEST_LINE_PART_URI].iov_base;
+ msg->request_line.uri_len = parts[SIP_REQUEST_LINE_PART_URI].iov_len;
+ msg->request_line.version = (const char *)parts[SIP_REQUEST_LINE_PART_VERSION].iov_base;
+ msg->request_line.version_len = parts[SIP_REQUEST_LINE_PART_VERSION].iov_len;
+ msg->type = SIP_MESSAGE_TYPE_REQUEST;
+ return 0;
+ }
+
+ // try to decode as status line
+ version = sip_strn2version((const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base, parts[SIP_STATUS_LINE_PART_VERSION].iov_len);
+ if (version != SIP_VERSION_UNKNOWN) {
+ msg->status_line.version = (const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base;
+ msg->status_line.version_len = parts[SIP_STATUS_LINE_PART_VERSION].iov_len;
+ msg->status_line.code = (const char *)parts[SIP_STATUS_LINE_PART_CODE].iov_base;
+ msg->status_line.code_len = parts[SIP_STATUS_LINE_PART_CODE].iov_len;
+ msg->status_line.reason = (const char *)parts[SIP_STATUS_LINE_PART_REASON].iov_base;
+ msg->status_line.reason_len = parts[SIP_STATUS_LINE_PART_REASON].iov_len;
+ msg->type = SIP_MESSAGE_TYPE_RESPONSE;
+ return 0;
+ }
+
+ return -1;
+}
+
+int sip_decode(struct sip_message *msg, const char *payload, size_t payload_len)
+{
+ int ret = 0;
+ int content_length;
+ size_t offset;
+ size_t line_len = 0;
+ size_t header_len = 0;
+ size_t body_len = 0;
+ const char *line_start = NULL;
+ const char *header_start = NULL;
+ const char *body_start = NULL;
+ const char *line_end;
+ const char *header_end;
+
+ offset = 0;
+
+ // check start-line complete
+ line_start = payload + offset;
+ line_end = (const char *)memmem(line_start, payload_len - offset, SIP_LINE_END, strlen(SIP_LINE_END));
+ if (line_end == NULL) {
+ goto exit;
+ }
+
+ line_len = line_end - line_start + strlen(SIP_LINE_END);
+ offset += line_len;
+
+ if (offset > payload_len) {
+ goto exit;
+ }
+
+ // decode start line
+ ret = sip_decode_start_line(msg, line_start, line_len);
+ if (ret != 0) {
+ ret = -1;
+ goto exit;
+ }
+
+ // check message-header complete
+ header_start = payload + offset;
+ header_end = (const char *)memmem(header_start, payload_len - offset, SIP_HEADER_END, strlen(SIP_HEADER_END));
+ if (header_end == NULL) {
+ goto exit;
+ }
+
+ header_len = header_end - header_start + strlen(SIP_HEADER_END);
+ offset += header_len;
+
+ if (offset > payload_len) {
+ goto exit;
+ }
+
+ // decode header
+ ret = sip_decode_header(msg, header_start, header_len);
+ if (ret != 0) {
+ ret = -1;
+ goto exit;
+ }
+
+ // check body complete
+ body_start = NULL;
+ body_len = 0;
+ if (msg->header.content_length &&
+ msg->header.content_length->field_value && msg->header.content_length->field_value_len > 0) {
+ content_length = strntoi((char*)msg->header.content_length->field_value, msg->header.content_length->field_value_len);
+ if (content_length > 0) {
+ body_start = payload + offset;
+ body_len = content_length;
+
+ offset += body_len;
+ if (offset > payload_len) {
+ goto exit;
+ }
+
+ ret = sip_decode_body(msg, body_start, body_len);
+ if (ret != 0) {
+ goto exit;
+ }
+ }
+ }
+
+ ret = offset;
+exit:
+ return ret;
+}
+
+static enum sip_identify_state sip_identify(struct sip_decoder *decoder, struct sip_exdata *exdata, const char *payload, size_t payload_len)
+{
+ UNUSED(decoder);
+ int ret;
+ size_t i;
+ size_t start_line_len = 0;
+ const char *start_line = NULL;
+ struct iovec parts[SIP_START_LINE_PART_NUM];
+ enum sip_method method;
+ enum sip_version version;
+
+ if (exdata->identify_state == SIP_IDENTIFY_STATE_FALSE ||
+ exdata->identify_state == SIP_IDENTIFY_STATE_TRUE) {
+ goto exit;
+ }
+
+ if (exdata->identify_times++ > SIP_IDENTIFY_TIMES_MAX) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_FALSE;
+ goto exit;
+ }
+
+ // skip first '\r\n'
+ for (i = 0; i < payload_len; i++) {
+ if (payload[i] != '\r' && payload[i] != '\n') {
+ payload += i;
+ payload_len -= i;
+ break;
+ }
+ }
+
+ if (payload_len == 0) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_HALF_TRUE;
+ goto exit;
+ }
+
+ start_line = payload;
+ for (i = 0; i < payload_len; i++) {
+ if (payload[i] == '\r' || payload[i] == '\n') {
+ start_line_len = i;
+ break;
+ }
+ }
+
+ if (start_line_len == 0) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_FALSE;
+ goto exit;
+ }
+
+ // split start line
+ ret = strnsplit(parts, SIP_DIM(parts), ' ', (char *)start_line, start_line_len);
+ if (ret != SIP_START_LINE_PART_NUM) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_FALSE;
+ goto exit;
+ }
+
+ // it's sip request ?
+ method = sip_strn2method((const char *)parts[SIP_REQUEST_LINE_PART_METHOD].iov_base,
+ parts[SIP_REQUEST_LINE_PART_METHOD].iov_len);
+ version = sip_strn2version((const char *)parts[SIP_REQUEST_LINE_PART_VERSION].iov_base,
+ parts[SIP_REQUEST_LINE_PART_VERSION].iov_len);
+ if (method != SIP_METHOD_UNKNOWN && version != SIP_VERSION_UNKNOWN) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_TRUE;
+ goto exit;
+ }
+
+ // it's sip response ?
+ version = sip_strn2version((const char *)parts[SIP_STATUS_LINE_PART_VERSION].iov_base,
+ parts[SIP_STATUS_LINE_PART_VERSION].iov_len);
+ if (version != SIP_VERSION_UNKNOWN && parts[SIP_STATUS_LINE_PART_CODE].iov_len == SIP_STATUS_CODE_LEN) {
+ exdata->identify_state = SIP_IDENTIFY_STATE_TRUE;
+ goto exit;
+ }
+
+ exdata->identify_state = SIP_IDENTIFY_STATE_FALSE;
+exit:
+ return exdata->identify_state;
+}
+
+static void sip_exdata_free(struct sip_exdata *exdata)
+{
+ struct sip_transaction *transaction = NULL, *tmp = NULL;
+
+ if (exdata) {
+ if (exdata->transaction_table) {
+ HASH_ITER(hh, exdata->transaction_table, transaction, tmp) {
+ HASH_DELETE(hh, exdata->transaction_table, transaction);
+
+ sip_transaction_close(exdata->decoder_ref, exdata, transaction);
+ }
+ }
+
+ if (exdata->streambuffer.buf) {
+ free(exdata->streambuffer.buf);
+ exdata->streambuffer.buf = NULL;
+ }
+ free(exdata);
+ }
+}
+
+static struct sip_exdata *sip_exdata_new(void)
+{
+ struct sip_exdata *exdata;
+ exdata = (struct sip_exdata *)calloc(1, sizeof(struct sip_exdata));
+ stream_buffer_init(&exdata->streambuffer, SIP_STREAM_BUFFER_DEFAULT_SIZE);
+ return exdata;
+}
+
+static void sip_on_payload(struct session *sess, enum session_state state, const char *payload, size_t payload_len, void *arg)
+{
+ UNUSED(state);
+ struct sip_exdata *exdata;
+ struct sip_decoder *decoder;
+ enum sip_identify_state identify_state;
+
+ // session is closing?
+ if (payload == NULL) {
+ return;
+ }
+
+ decoder = (struct sip_decoder *)arg;
+
+ exdata = (struct sip_exdata *)session_get_exdata(sess, decoder->exdata_id);
+ if (exdata == NULL) {
+ exdata = sip_exdata_new();
+ exdata->decoder_ref = decoder;
+ exdata->sess_ref = sess;
+ session_set_exdata(sess, decoder->exdata_id, exdata);
+ }
+
+ if (exdata->sess_ignored) {
+ return;
+ }
+
+ identify_state = sip_identify(decoder, exdata, payload, payload_len);
+ if (identify_state == SIP_IDENTIFY_STATE_FALSE) {
+ exdata->sess_ignored = 1;
+ return;
+ }
+ if (identify_state != SIP_IDENTIFY_STATE_TRUE) {
+ return;
+ }
+
+ /*
+ * We need both remaining data from last sip message and
+ * current sip message
+ */
+ if (!stream_buffer_is_empty(&exdata->streambuffer)) {
+ stream_buffer_append(&exdata->streambuffer, payload, payload_len);
+ if (stream_buffer_is_full(&exdata->streambuffer)) {
+ stream_buffer_reset(&exdata->streambuffer);
+ return;
+ }
+
+ stream_buffer_get_data(&exdata->streambuffer, &payload, &payload_len, 0);
+ }
+
+ int tid, msg_cnt = 0;
+ int decoded = 0;
+ int offset;
+ int remain;
+ struct sip_message *msg;
+ struct sip_message* msg_list[SIP_PER_PAYLOAD_MESSAGE_MAX];
+ struct sip_header_field_pool *field_pool;
+
+ tid = module_manager_get_thread_id(decoder->mod_mgr);
+ field_pool = decoder->field_pools[tid];
+ field_pool_reset(field_pool);
+
+ /*
+ * There may be multiple SIP messages in the payload. We should parse all of them
+ * and cache the last incomplete one, if it exists.
+ */
+ offset = 0;
+ remain = payload_len;
+ while (remain > 0 && msg_cnt < (int)sizeof(msg_list)) {
+ msg= sip_message_new();
+ msg->sess_ref = sess;
+ msg->header_field_pool = field_pool;
+
+ decoded = sip_decode(msg, payload + offset, remain);
+ // decode failed
+ if (decoded < 0) {
+ sip_message_free(msg);
+ return;
+ }
+
+ // incomplete SIP message, break and cache it
+ if (decoded == 0) {
+ sip_message_free(msg);
+ break;
+ }
+
+ msg_list[msg_cnt++] = msg;
+
+ offset += decoded;
+ remain -= decoded;
+ }
+
+ // messages should be published or freed in sip_transaction_process
+ if (msg_cnt > 0) {
+ sip_transaction_process(decoder, exdata, msg_list, msg_cnt);
+ }
+
+ // remove decoded data
+ if (!stream_buffer_is_empty(&exdata->streambuffer)) {
+ stream_buffer_slide(&exdata->streambuffer, offset);
+ }
+
+ // cache remaining data
+ if (remain > 0) {
+ stream_buffer_append(&exdata->streambuffer, payload + offset, remain);
+ if (stream_buffer_is_full(&exdata->streambuffer)) {
+ // reach streambuffer limit, just reset
+ stream_buffer_reset(&exdata->streambuffer);
+ }
+ }
+}
+
+static void sip_on_tcp_payload(struct session *sess, enum session_state state, const char *payload, uint32_t payload_len, void *arg)
+{
+ sip_on_payload(sess, state, payload, (size_t)payload_len, arg);
+}
+
+static void sip_on_udp_packet(struct session *sess, enum session_state state, struct packet *pkt, void *arg)
+{
+ const char *payload;
+ size_t payload_len;
+
+ if (pkt == NULL) {
+ payload = NULL;
+ payload_len = 0;
+ } else {
+ payload = packet_get_payload_data(pkt);
+ payload_len = packet_get_payload_len(pkt);
+ }
+
+ sip_on_payload(sess, state, payload, payload_len, arg);
+}
+
+void sip_on_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ UNUSED(idx);
+ UNUSED(arg);
+ struct sip_exdata *exdata = (struct sip_exdata *)ex_ptr;
+
+ if (exdata) {
+ sip_exdata_free(exdata);
+ }
+}
+
+static void sip_on_message_free(void *msg, void *arg)
+{
+ UNUSED(arg);
+ if (msg) {
+ free(msg);
+ }
+}
+
+long long sip_get_call_duration_ms(struct sip_decoder *decoder, struct session *sess, const char *call_id, size_t call_id_len)
+{
+ //struct module *mod;
+ //struct sip_decoder *decoder;
+ struct sip_exdata *exdata;
+ struct sip_transaction *transaction = NULL;
+
+ //mod = module_manager_get_module(mod_mgr, SIP_MODULE_NAME);
+ //if (mod == NULL) {
+ // return -1;
+ //}
+
+ //decoder = module_get_ctx(mod);
+ //if (decoder == NULL) {
+ // return -1;
+ //}
+
+ exdata = (struct sip_exdata *)session_get_exdata(sess, decoder->exdata_id);
+ if (exdata == NULL) {
+ return -1;
+ }
+
+ HASH_FIND(hh, exdata->transaction_table, call_id, call_id_len, transaction);
+ if (transaction == NULL) {
+ return -1;
+ }
+
+ if (transaction->call_duration_ms == 0) {
+ return -1;
+ }
+
+ return transaction->call_duration_ms;
+}
+
+struct sip_decoder *module_to_sip_decoder(struct module *mod)
+{
+ assert(mod);
+ assert(strcmp(module_get_name(mod), SIP_MODULE_NAME) == 0);
+ return module_get_ctx(mod);
+}
+
+int sip_subscribe(struct sip_decoder *decoder,
+ sip_request_message_callback_func *request_cb,
+ sip_response_message_callback_func *response_cb,
+ void *arg)
+{
+ int ret;
+ ret = sip_message_subscribe(decoder->mod_mgr, SIP_REQUEST_TOPIC_NAME, (on_msg_cb_func *)(void *)request_cb, arg);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = sip_message_subscribe(decoder->mod_mgr, SIP_RESPONSE_TOPIC_NAME, (on_msg_cb_func *)(void *)response_cb, arg);
+ if (ret < 0) {
+ return ret;
+ }
+ return 0;
+}
+
+void sip_exit(struct module_manager *mod_mgr, struct module *mod)
+{
+ (void)(mod_mgr);
+ int i;
+ struct sip_decoder *decoder;
+ struct mq_schema *schema;
+
+ if (mod) {
+ decoder = (struct sip_decoder *)module_get_ctx(mod);
+ if (decoder) {
+ schema = module_manager_get_mq_schema(decoder->mod_mgr);
+ mq_schema_destroy_topic(schema, decoder->request_message_topic_id);
+ mq_schema_destroy_topic(schema, decoder->response_message_topic_id);
+
+ for (i = 0; i < decoder->thread_num; i++) {
+ field_pool_free(decoder->field_pools[i]);
+ }
+ free(decoder->field_pools);
+ free(decoder);
+ }
+
+ module_free(mod);
+ }
+}
+
+struct module* sip_init(struct module_manager *mod_mgr)
+{
+ int i, ret, thread_num;
+ struct mq_schema *schema;
+ struct session_manager *sess_mgr;
+ struct module *mod;
+ struct sip_decoder *decoder;
+
+ decoder = (struct sip_decoder *)calloc(1, sizeof(struct sip_decoder));
+ decoder->mod_mgr = mod_mgr;
+ mod = module_new(SIP_MODULE_NAME, decoder);
+ sess_mgr = module_to_session_manager(module_manager_get_module(mod_mgr, SESSION_MANAGER_MODULE_NAME));
+ schema = module_manager_get_mq_schema(mod_mgr);
+
+ if (sess_mgr == NULL || schema == NULL) {
+ goto exit;
+ }
+
+ decoder->exdata_id = session_manager_new_session_exdata_index(sess_mgr, SIP_EXDATA_NAME, sip_on_exdata_free, NULL);
+ if (decoder->exdata_id < 0) {
+ goto exit;
+ }
+
+ ret = session_manager_subscribe_udp(sess_mgr, sip_on_udp_packet, decoder);
+ if (ret < 0) {
+ goto exit;
+ }
+
+ ret = session_manager_subscribe_tcp_stream(sess_mgr, sip_on_tcp_payload, decoder);
+ if (ret < 0) {
+ goto exit;
+ }
+
+ decoder->request_message_topic_id = mq_schema_create_topic(schema, SIP_REQUEST_TOPIC_NAME, sip_message_dispatch, decoder, sip_on_message_free, NULL);
+ if (decoder->request_message_topic_id < 0) {
+ goto exit;
+ }
+
+ decoder->response_message_topic_id = mq_schema_create_topic(schema, SIP_RESPONSE_TOPIC_NAME, sip_message_dispatch, decoder, sip_on_message_free, NULL);
+ if (decoder->response_message_topic_id < 0) {
+ goto exit;
+ }
+
+ // init per thread sip header field pools
+ thread_num = module_manager_get_max_thread_num(mod_mgr);
+ decoder->thread_num = thread_num;
+ decoder->field_pools = (struct sip_header_field_pool **)calloc(thread_num, sizeof(struct sip_header_field_pool *));
+ for (i = 0; i < thread_num; i++) {
+ decoder->field_pools[i] = field_pool_new(SIP_HEADER_FIELD_POOL_DEFAULT_SIZE);
+ }
+
+ return mod;
+exit:
+ sip_exit(mod_mgr, mod);
+ return NULL;
+}
+
+
diff --git a/decoders/sip/sip_internal.h b/decoders/sip/sip_internal.h
new file mode 100644
index 0000000..4acd653
--- /dev/null
+++ b/decoders/sip/sip_internal.h
@@ -0,0 +1,193 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "uthash/uthash.h"
+
+#ifndef UNUSED
+#define UNUSED(x) (void)(x)
+#endif
+#define SIP_DIM(a) (sizeof (a) / sizeof ((a)[0]))
+
+#define timeval_usec(t) ((t).tv_usec)
+#define timeval_sec(t) ((t).tv_sec)
+#define timeval_add(a, b, r) \
+ do { \
+ (r).tv_sec = (a).tv_sec + (b).tv_sec; \
+ (r).tv_usec = (a).tv_usec + (b).tv_usec; \
+ } while (0)
+#define timeval_cmp(a, b, CMP) \
+ ((timeval_sec(a) == timeval_sec(b)) ? (timeval_usec(a) CMP timeval_usec(b)) \
+ : (timeval_sec(a) CMP timeval_sec(b)))
+#define timeval_cmp_gte(a, b) timeval_cmp((a), (b), >=)
+#define timeval_cmp_gt(a, b) timeval_cmp((a), (b), >)
+#define timeval_cmp_lt(a, b) timeval_cmp((a), (b), <)
+#define timeval_cmp_lte(a, b) timeval_cmp((a), (b), <=)
+#define timeval_cmp_neq(a, b) timeval_cmp((a), (b), !=)
+
+#define timeval_delta_ms(start, end) (((end).tv_sec-(start).tv_sec)*1000 + ((end).tv_usec-(start).tv_usec)/1000)
+#define timeval_delta_us(start, end) (((end).tv_sec-(start).tv_sec)*1000*1000 + ((end).tv_usec-(start).tv_usec))
+#define timeval_to_ms(t) ((t).tv_sec*1000+(t).tv_usec/1000)
+
+#define SAFE_STRNCASECMP(s1, s1_len, s2, s2_len) ((s1_len) >= (s2_len) ? strncasecmp(s1, s2, s2_len) : -1)
+
+#define SIP_EXDATA_NAME "SIP_EXDATA"
+#define SIP_REQUEST_TOPIC_NAME "SIP_REQUEST"
+#define SIP_RESPONSE_TOPIC_NAME "SIP_RESPONSE"
+#define SIP_TRANSACTION_TOPIC_NAME "SIP_TRANSACTION"
+#define SIP_IDENTIFY_TIMES_MAX 128
+
+#define SIP_REQUEST_LINE_PART_METHOD 0
+#define SIP_REQUEST_LINE_PART_URI 1
+#define SIP_REQUEST_LINE_PART_VERSION 2
+#define SIP_STATUS_LINE_PART_VERSION 0
+#define SIP_STATUS_LINE_PART_CODE 1
+#define SIP_STATUS_LINE_PART_REASON 2
+#define SIP_START_LINE_PART_NUM 3
+#define SIP_HEADER_FIELD_PART_NAME 0
+#define SIP_HEADER_FIELD_PART_VALUE 1
+#define SIP_HEADER_FIELD_PART_NUM 2
+#define SDP_MEDIA_PART_MEDIA 0
+#define SDP_MEDIA_PART_PORT 1
+#define SDP_MEDIA_PART_PROTO 2
+#define SDP_MEDIA_PART_FMT 3
+#define SDP_MEDIA_PART_NUM 4
+#define SDP_CONNECTION_PART_NETTYPE 0
+#define SDP_CONNECTION_PART_ADDRTYPE 1
+#define SDP_CONNECTION_PART_ADDRESS 2
+#define SDP_CONNECTION_PART_NUM 3
+
+#define SIP_FROM_PART_ADDR 0
+#define SIP_FROM_PART_TAG 1
+#define SIP_FROM_PART_NUM 2
+#define SIP_TO_PART_ADDR 0
+#define SIP_TO_PART_TAG 1
+#define SIP_TO_PART_NUM 2
+#define SIP_CSEQ_PART_SEQ 0
+#define SIP_CSEQ_PART_METHOD 1
+#define SIP_CSEQ_PART_NUM 2
+
+#define SIP_HEADER_FIELD_POOL_DEFAULT_SIZE 256
+#define SIP_STREAM_BUFFER_DEFAULT_SIZE 4096
+#define SIP_LINE_END "\r\n"
+#define SIP_HEADER_END "\r\n\r\n"
+#define SIP_STATUS_CODE_LEN 3
+#define SIP_PER_PAYLOAD_MESSAGE_MAX 8
+
+enum sip_call_state {
+ SIP_CALL_STATE_OPENING,
+ SIP_CALL_STATE_WAITING, /* before seeing INVITE*/
+ SIP_CALL_STATE_CALLING, /* after seeing INVITE */
+ SIP_CALL_STATE_EARLY, /* provisional response (1xx status code) */
+ SIP_CALL_STATE_CONNECTING, /* after seeing 200/OK response */
+ SIP_CALL_STATE_CONFIRMED, /* after seeing ACK */
+ SIP_CALL_STATE_DISCONNECTING, /* after seeing BYE */
+ SIP_CALL_STATE_DISCONNECTED, /* call is disconnected */
+ SIP_CALL_STATE_CLOSING,
+};
+
+enum sip_identify_state {
+ SIP_IDENTIFY_STATE_UNKNOWN,
+ SIP_IDENTIFY_STATE_HALF_TRUE,
+ SIP_IDENTIFY_STATE_TRUE,
+ SIP_IDENTIFY_STATE_FALSE,
+};
+
+enum sip_message_type {
+ SIP_MESSAGE_TYPE_UNKNOWN,
+ SIP_MESSAGE_TYPE_REQUEST,
+ SIP_MESSAGE_TYPE_RESPONSE,
+ SIP_MESSAGE_TYPE_MAX,
+};
+
+enum sip_dir {
+ SIP_DIR_UNKNOWN,
+ SIP_DIR_REQUEST,
+ SIP_DIR_RESPONSE,
+ SIP_DIR_DOUBLE,
+};
+
+enum sip_version {
+ SIP_VERSION_UNKNOWN,
+ SIP_VERSION_1_0,
+ SIP_VERSION_1_1,
+ SIP_VERSION_2_0,
+
+ SIP_VERSION_NUM,
+};
+
+struct stream_buffer {
+ char *buf;
+ size_t buf_size;
+ size_t buf_off;
+};
+
+struct sip_header_field_pool {
+ struct sip_header_field *arr;
+ unsigned int used;
+ unsigned int size;
+} __attribute__((aligned(64)));
+
+struct sip_transaction {
+ char *key; // use sip Call-ID string for uthash key
+ size_t key_len;
+ int seq;
+ enum sip_dir dir;
+ enum sip_call_state call_state;
+ long long call_duration_ms;
+ struct timeval last_update;
+ UT_hash_handle hh;
+};
+
+struct sip_message {
+ enum sip_message_type type;
+ enum sip_call_state call_state;
+ int transaction_seq;
+
+ union {
+ struct sip_request_line request_line;
+ struct sip_status_line status_line;
+ };
+ struct sip_header header;
+ struct sip_body body;
+
+ struct sip_header_field_pool *header_field_pool;
+ struct session *sess_ref;
+};
+
+struct sip_exdata {
+ struct sip_decoder *decoder_ref;
+ struct session *sess_ref;
+
+ int sess_ignored;
+
+ enum sip_identify_state identify_state;
+ int identify_times;
+
+ struct sip_transaction *transaction_table; // sip transaction uthash head (key: call_id)
+ int transaction_seq;
+ enum sip_dir dir; // request only/response only/ double
+ enum sip_call_state call_state; // sip call state
+
+ struct stream_buffer streambuffer; // temporary buffer for incomplete sip message
+};
+
+struct sip_decoder {
+ int request_message_topic_id;
+ int response_message_topic_id;
+ int exdata_id;
+
+ int transaction_limit_count;
+ int transaction_timeout_ms;
+
+ int thread_num;
+ struct sip_header_field_pool **field_pools;// per thread field pool
+ struct module_manager *mod_mgr;
+};
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/decoders/sip/version.map b/decoders/sip/version.map
new file mode 100644
index 0000000..afa02ed
--- /dev/null
+++ b/decoders/sip/version.map
@@ -0,0 +1,13 @@
+VERS_2.4{
+global:
+extern "C" {
+ sip_init;
+ sip_exit;
+ sip_subscribe;
+ sip_get_call_duration_ms;
+ module_to_sip_decoder;
+ GIT_VERSION_*;
+};
+
+local: *;
+};