summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaoyijun <[email protected]>2024-11-20 17:58:13 +0800
committerzhaoyijun <[email protected]>2024-11-20 17:58:13 +0800
commit779c148a2fd12558c20e97973105e29895652da1 (patch)
treee52065d78c4344c244e0e9b6046ca479be93ac09
parent7ecbffbccc8a91a084bca69cbd68246cff25c2d5 (diff)
add multithreaded logic, add package format checking scriptHEADmain
-rw-r--r--src/nat_format.cpp109
-rw-r--r--tools/binary_filed_extraction.py102
2 files changed, 173 insertions, 38 deletions
diff --git a/src/nat_format.cpp b/src/nat_format.cpp
index 72f99a5..c67716d 100644
--- a/src/nat_format.cpp
+++ b/src/nat_format.cpp
@@ -21,7 +21,7 @@
#define HW_EVENT_ADD "SESSION_BUILT"
#define HW_EVENT_DEL "SESSION_TEARDOWN"
-#define HW_EVENT_NEW 0x01
+#define HW_EVENT_NEW 0x00
#define HW_EVENT_AGED 0x02
#define HW_EVENT_PERIOD 0x03
@@ -42,10 +42,13 @@
struct nat_format_global_info g_nat_format_info;
-char *multicast_payload;
-int cur_pkt = 0;
+// 组播报文缓冲区和socket都是每个线程一个
+int platform_thread_num = get_thread_count();
-int udp_socket;
+char **multicast_payloads;
+int *cur_pkts;
+
+int *udp_sockets;
struct sockaddr_in dst_addr = {0};
// 函数:子串匹配,返回子串结尾+1的位置,即值的起始位置
@@ -138,22 +141,22 @@ int extract_action_hs(char *data, int data_len, const char *key, char *dst) {
}
// 组播报文发送
-int send_multicast() {
+int send_multicast(int thread_seq) {
// 发之前统一填充发送时间
time_t now;
time(&now);
unsigned int now_timestamp = (int)now;
for (int i = 0; i < g_nat_format_info.batch_size; i++) {
- memcpy(multicast_payload + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4);
+ memcpy(multicast_payloads[thread_seq] + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4);
}
// 进行发送
- if (sendto(udp_socket, multicast_payload, PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) {
+ if (sendto(udp_sockets[thread_seq], multicast_payloads[thread_seq], PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) {
MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format", "Send multicast failed: %s", strerror(errno));
- cur_pkt--;
+ cur_pkts[thread_seq]--;
return -1;
}
- cur_pkt = 0;
+ cur_pkts[thread_seq] = 0;
return 0;
}
@@ -185,31 +188,54 @@ int nat_format_init(void) {
}
// 分配并初始化组播报文存储空间,长度为 batch_size*46Bytes
- multicast_payload = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size);
- memset(multicast_payload, 0, PAYLOAD_LEN * g_nat_format_info.batch_size);
-
- // 创建socket用于发包
- udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
- if (udp_socket == -1) {
- printf("UDP multicast socket creation failed:%d, %s\n", errno, strerror(errno));
+ multicast_payloads = (char **)malloc(platform_thread_num * sizeof(char *));
+ if (multicast_payloads == NULL) {
+ printf("Memory of multicast_payloads allocation failure.\n");
return -1;
}
- // 绑定组播地址
+ for (int i = 0; i < platform_thread_num; i++) {
+ multicast_payloads[i] = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size);
+ if (multicast_payloads[i] == NULL) {
+ for (int j = 0; j < i; j++) {
+ free(multicast_payloads[j]);
+ }
+ free(multicast_payloads);
+ printf("Memory of multicast_payload for thread %d allocation failure.\n", i);
+ return -1;
+ }
+ memset(multicast_payloads[i], 0, PAYLOAD_LEN * g_nat_format_info.batch_size);
+ }
+ cur_pkts = (int *)malloc(platform_thread_num * sizeof(int));
+ memset(cur_pkts, 0, platform_thread_num * sizeof(int));
+
+ // 组播地址
dst_addr.sin_family = AF_INET;
dst_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.multicast_ip);
dst_addr.sin_port = htons(g_nat_format_info.multicast_port);
struct in_addr multicast_addr;
inet_pton(AF_INET, g_nat_format_info.multicast_ip, &multicast_addr.s_addr);
- setsockopt(udp_socket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr));
- // 绑定发包端口
+ // 发包端口
struct sockaddr_in src_addr;
src_addr.sin_family = AF_INET;
src_addr.sin_port = htons(g_nat_format_info.host_port);
src_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.host_ip);
- if (bind(udp_socket, (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) {
- printf("socket bind failed\n");
- close(udp_socket);
- return -1;
+
+ // 创建socket用于发包
+ udp_sockets = (int *)malloc(platform_thread_num * sizeof(int));
+ for (int i = 0; i < platform_thread_num; i++) {
+ udp_sockets[i] = socket(AF_INET, SOCK_DGRAM, 0);
+ if (udp_sockets[i] == -1) {
+ free(udp_sockets);
+ printf("UDP multicast socket for thread %d creation failed:%d, %s\n", i, errno, strerror(errno));
+ return -1;
+ }
+ setsockopt(udp_sockets[i], IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr));
+ if (bind(udp_sockets[i], (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) {
+ free(udp_sockets);
+ printf("socket bind for thread %d failed:%d, %s\n", i, errno, strerror(errno));
+ close(udp_sockets[i]);
+ return -1;
+ }
}
return 0;
@@ -217,7 +243,14 @@ int nat_format_init(void) {
// 卸载函数
void nat_format_destroy(void) {
- free(multicast_payload);
+ for (int j = 0; j < platform_thread_num; j++) {
+ free(multicast_payloads[j]);
+ }
+ free(multicast_payloads);
+
+ free(cur_pkts);
+
+ free(udp_sockets);
}
// 入口函数
@@ -328,12 +361,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
}
// 华三syslog
@@ -409,12 +442,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
}
// 迪普syslog TODO
@@ -447,7 +480,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format Huawei Binary", "Unknown Version %d", hb_head->Version);
return APP_STATE_GIVEME;
}
- u_int16_t log_num = hb_head->Count;
+ u_int16_t log_num = ntohs(hb_head->Count);
// 提取防火墙日志生成时间
nat_payload.fw_log_timestamp = hb_head->Second;
// 分别处理每一个body
@@ -486,12 +519,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
}
// 将提取出来的信息写进组播载荷
- memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
- cur_pkt++;
+ memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN);
+ cur_pkts[thread_seq]++;
// 攒够20个进行发送
- if (cur_pkt == g_nat_format_info.batch_size) {
- send_multicast();
+ if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) {
+ send_multicast(thread_seq);
}
// 定位下一个body的offset -- 03版本28,08版本的IPV4、NOPAT为44,USER为209,URL、TLV每个不同,由附加长度确定
@@ -500,7 +533,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void
offset += body_len;
} else {
char *tmp = (char *)hb_body;
- u_int16_t AppendLength = *(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET);
+ u_int16_t AppendLength = ntohs(*(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET));
offset += (HW_BINARY_BODY_LENGTH_V8_IPV4 + AppendLength);
}
}
diff --git a/tools/binary_filed_extraction.py b/tools/binary_filed_extraction.py
new file mode 100644
index 0000000..31573ad
--- /dev/null
+++ b/tools/binary_filed_extraction.py
@@ -0,0 +1,102 @@
+from scapy.all import rdpcap, UDP
+
+def parser_hw_binary_detail(s):
+ len_list = [i * 2 for i in [1, 1, 2, 4, 4, 2, 1, 1]]
+ head_s = s[0:sum(len_list)]
+ head = []
+ start = 0
+ for l in len_list:
+ head.append(head_s[start:start + l])
+ start += l
+ [Version, LogType, Count, Second, FlowSequence, DeviceId, Slot, Reserved] = head
+ print(head)
+
+ body_start = sum(len_list)
+ len_list = [i * 2 for i in [1, 1, 1, 1, 4, 4, 4, 4, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 2, 2, 1, 1, 2, 4]]
+ bodys = []
+ for i in range(int(Count, 16)):
+ body_s = s[body_start:body_start+sum(len_list)]
+ body = []
+ start = 0
+ for l in len_list:
+ body.append(body_s[start:start + l])
+ start += l
+ [Prot, Operator, IpVersion, TosIPv4, SourceIP, SrcNatIP, DestIP, DestNatIP, SrcPort, SrcNatPort, DestPort, DestNatPort, StartTime, EndTime, InTotalPkg, InTotalByte, OutTotalPkg, OutTotalByte, SourVpnIndex, DestVpnIndex, Reserved1, EventTrend, Reserved2, Reserved3] = body
+ print(body)
+
+def parser_hw_binary(s):
+ sum_ = 0
+ counts = [0, 0, 0, 0, 0]
+
+ len_list = [i * 2 for i in [1, 1, 2, 4, 4, 2, 1, 1]]
+ head_s = s[0:sum(len_list)]
+ Count = head_s[4:8]
+ sum_ += int(Count, 16)
+
+ body_start = sum(len_list)
+ len_list = [i * 2 for i in [1, 1, 1, 1, 4, 4, 4, 4, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 2, 2, 1, 1, 2, 4]]
+ for i in range(int(Count, 16)):
+ body_s = s[body_start:body_start+sum(len_list)]
+ EventTrend = min(int(body_s[114:116]), 4)
+ counts[EventTrend] += 1
+ body_start += sum(len_list)
+ return (sum_, counts)
+
+def parser_nat(s):
+ counts = [0, 0, 0]
+ for i in range(20):
+ body_s = s[i*46*2:(i+1)*46*2]
+ EventTrend = body_s[36:38]
+ EventTrend = min(int(body_s[36:38]), 2)
+ counts[EventTrend] += 1
+ return counts
+
+def read_and_filter_pcap_hw(pcap_file, protocol="UDP"):
+ nat_num = 0
+ count_nums = [0, 0, 0, 0, 0]
+
+ packets = rdpcap(pcap_file)
+ for pkt in packets:
+ if protocol in pkt:
+ # print(packet.show()) # 显示每个符合条件的数据包详情
+ udp_payload = bytes(pkt[UDP].payload)
+ hex_payload = ''.join(f'{byte:02x}' for byte in udp_payload)
+ (sum_, counts) = parser_hw_binary(hex_payload)
+ nat_num += sum_
+ for i in range(5):
+ count_nums[i] += counts[i]
+
+ print(f"Total {len(packets)} {protocol} packets in hw_binary found.")
+ print(f"unknown(00) log:\t{count_nums[0]}/{nat_num}({count_nums[0]/nat_num:.2f}%)")
+ print(f"build(01) log:\t{count_nums[1]}/{nat_num}({count_nums[1]/nat_num:.2f}%)")
+ print(f"aged(02) log:\t{count_nums[2]}/{nat_num}({count_nums[2]/nat_num:.2f}%)")
+ print(f"period(03) log:\t{count_nums[3]}/{nat_num}({count_nums[3]/nat_num:.2f}%)")
+ print(f"unknown(>03) log:\t{count_nums[4]}/{nat_num}({count_nums[4]/nat_num:.2f}%)")
+
+def read_and_filter_pcap_nat(pcap_file, protocol="UDP"):
+ nat_num = 0
+ count_nums = [0, 0, 0]
+
+ packets = rdpcap(pcap_file)
+ for pkt in packets:
+ if protocol in pkt:
+ # print(packet.show()) # 显示每个符合条件的数据包详情
+ udp_payload = bytes(pkt[UDP].payload)
+ hex_payload = ''.join(f'{byte:02x}' for byte in udp_payload)
+ counts = parser_nat(hex_payload)
+ for i in range(3):
+ count_nums[i] += counts[i]
+
+ print(f"Total {len(packets)} {protocol} packets in nat_format found.")
+ nat_num = len(packets)*20
+ print(f"add(00) log:\t{count_nums[0]}/{nat_num}({count_nums[0]/nat_num:.2f}%)")
+ print(f"del(01) log:\t{count_nums[1]}/{nat_num}({count_nums[1]/nat_num:.2f}%)")
+ print(f"unknown(>01) log:\t{count_nums[2]}/{nat_num}({count_nums[2]/nat_num:.2f}%)")
+
+
+if __name__ == "__main__":
+ pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//hw_binary_1021.pcap"
+ read_and_filter_pcap_hw(pcap_file_path)
+ pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//nat_1021.pcap"
+ read_and_filter_pcap_nat(pcap_file_path)
+ # print('ok!') \ No newline at end of file