summaryrefslogtreecommitdiff
path: root/src/nat_format.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/nat_format.cpp')
-rw-r--r--src/nat_format.cpp109
1 files changed, 71 insertions, 38 deletions
diff --git a/src/nat_format.cpp b/src/nat_format.cpp
index 72f99a5..c67716d 100644
--- a/src/nat_format.cpp
+++ b/src/nat_format.cpp
@@ -21,7 +21,7 @@
#define HW_EVENT_ADD "SESSION_BUILT"
#define HW_EVENT_DEL "SESSION_TEARDOWN"
-#define HW_EVENT_NEW 0x01
+#define HW_EVENT_NEW 0x00
#define HW_EVENT_AGED 0x02
#define HW_EVENT_PERIOD 0x03
@@ -42,10 +42,13 @@
struct nat_format_global_info g_nat_format_info;
-char *multicast_payload;
-int cur_pkt = 0;
+// 组播报文缓冲区和socket都是每个线程一个
+int platform_thread_num = get_thread_count();
-int udp_socket;
+char **multicast_payloads;
+int *cur_pkts;
+
+int *udp_sockets;
struct sockaddr_in dst_addr = {0};
// 函数:子串匹配,返回子串结尾+1的位置,即值的起始位置
@@ -138,22 +141,22 @@ int extract_action_hs(char *data, int data_len, const char *key, char *dst) {
}
// 组播报文发送
-int send_multicast() {
+int send_multicast(int thread_seq) {
// 发之前统一填充发送时间
time_t now;
time(&now);
unsigned int now_timestamp = (int)now;
for (int i = 0; i < g_nat_format_info.batch_size; i++) {
- memcpy(multicast_payload + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4);
+ memcpy(multicast_payloads[thread_seq] + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4);
}
// 进行发送
- if (sendto(udp_socket, multicast_payload, PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) {
+ if (sendto(udp_sockets[thread_seq], multicast_payloads[thread_seq], PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) {
MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format", "Send multicast failed: %s", strerror(errno));
- cur_pkt--;
+ cur_pkts[thread_seq]--;
return -1;
}
- cur_pkt = 0;
+ cur_pkts[thread_seq] = 0;
return 0;
}
@@ -185,31 +188,54 @@ int nat_format_init(void) {
}
// 分配并初始化组播报文存储空间,长度为 batch_size*46Bytes
- multicast_payload = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size);
- memset(multicast_payload, 0, PAYLOAD_LEN * g_nat_format_info.batch_size);
-
- // 创建socket用于发包
- udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
- if (udp_socket == -1) {
- printf("UDP multicast socket creation failed:%d, %s\n", errno, strerror(errno));
+ multicast_payloads = (char **)malloc(platform_thread_num * sizeof(char *));
+ if (multicast_payloads == NULL) {
+ printf("Memory of multicast_payloads allocation failure.\n");
return -1;
}
- // 绑定组播地址
+ for (int i = 0; i < platform_thread_num; i++) {
+ multicast_payloads[i] = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size);
+ if (multicast_payloads[i] == NULL) {
+ for (int j = 0; j < i; j++) {
+ free(multicast_payloads[j]);
+ }
+ free(multicast_payloads);
+ printf("Memory of multicast_payload for thread %d allocation failure.\n", i);
+ return -1;
+ }
+ memset(multicast_payloads[i], 0, PAYLOAD_LEN * g_nat_format_info.batch_size);
+ }
+ cur_pkts = (int *)malloc(platform_thread_num * sizeof(int));
+ memset(cur_pkts, 0, platform_thread_num * sizeof(int));
+
+ // 组播地址
dst_addr.sin_family = AF_INET;
dst_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.multicast_ip);
dst_addr.sin_port = htons(g_nat_format_info.multicast_port);
struct in_addr multicast_addr;
inet_pton(AF_INET, g_nat_format_info.multicast_ip, &multicast_addr.s_addr);
- setsockopt(udp_socket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr));
- // 绑定发包端口
+ // 发包端口
struct sockaddr_in src_addr;
src_addr.sin_family = AF_INET;
src_addr.sin_port = htons(g_nat_format_info.host_port);
src_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.host_ip);
- if (bind(udp_socket, (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) {
- printf("socket bind failed\n");
- close(udp_socket);
- return -1;
+
+ // 创建socket用于发包
+ udp_sockets = (int *)malloc(platform_thread_num * sizeof(int));
+ for (int i = 0; i < platform_thread_num; i++) {
+ udp_sockets[i] = socket(AF_INET, SOCK_DGRAM, 0);
+ if (udp_sockets[i] == -1) {
+ free(udp_sockets);
+ printf("UDP multicast socket for thread %d creation failed:%d, %s\n", i, errno, strerror(errno));
+ return -1;
+ }
+ setsockopt(udp_sockets[i], IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr));
+ if (bind(udp_sockets[i], (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) {
+ free(udp_sockets);
+ printf("socket bind for thread %d failed:%d, %s\n", i, errno, strerror(errno));
+ close(udp_sockets[i]);
+ return -1;
+ }
}
return 0;
@@ -217,7 +243,14 @@ int nat_format_init(void) {
// 卸载函数
void nat_format_destroy(void) {
- free(multicast_payload);
+ for (int j = 0; j < platform_thread_num; j++) {
+ free(multicast_payloads[j]);
+ }
+ free(multicast_payloads);
+
+ free(cur_pkts);
+
+ free(udp_sockets);
}
// 入口函数
@@ -328,12 +361,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
}
// 华三syslog
@@ -409,12 +442,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
}
// 迪普syslog TODO
@@ -447,7 +480,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format Huawei Binary", "Unknown Version %d", hb_head->Version);
return APP_STATE_GIVEME;
}
- u_int16_t log_num = hb_head->Count;
+ u_int16_t log_num = ntohs(hb_head->Count);
// 提取防火墙日志生成时间
nat_payload.fw_log_timestamp = hb_head->Second;
// 分别处理每一个body
@@ -486,12 +519,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
// 定位下一个body的offset -- 03版本28,08版本的IPV4、NOPAT为44,USER为209,URL、TLV每个不同,由附加长度确定
@@ -500,7 +533,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
offset += body_len;
} else {
char *tmp = (char *)hb_body;
- u_int16_t AppendLength = *(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET);
+ u_int16_t AppendLength = ntohs(*(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET));
offset += (HW_BINARY_BODY_LENGTH_V8_IPV4 + AppendLength);
}
}