diff options
| author | 何勇 <[email protected]> | 2024-08-21 06:43:10 +0000 |
|---|---|---|
| committer | 何勇 <[email protected]> | 2024-08-21 06:43:10 +0000 |
| commit | 18409b02c61f4ddbf965dde3b97734b3c69c5fd1 (patch) | |
| tree | eff9a62eab15a5891090bd6588bb9caad532d99f | |
| parent | 7242931925e90f498c8a9ecdc6be1ca09b7aab10 (diff) | |
上传新文件
| -rw-r--r-- | PcapNGFormatAnalys.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/PcapNGFormatAnalys.py b/PcapNGFormatAnalys.py new file mode 100644 index 0000000..21ee58b --- /dev/null +++ b/PcapNGFormatAnalys.py @@ -0,0 +1,278 @@ +#!/usr/bin/evn python
+# -*- coding:utf-8 -*-
+
+import binascii
+import sys
+import os
+import subprocess
+import json
+import struct
+
+class SectionHeaderBlock_a: # SHB
+
+ def __init__(self, data):
+
+ self._Block_Type = data[0:4]
+ self._Block_Length = data[4:8]
+ self._Byte_Order_Magic = data[8:12]
+ self._Major_Version = data[12:14]
+ self._Minor_Version = data[14:16]
+ self._Section_Length = data[16:24]
+ self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
+
+ def display_content(self):
+ print(binascii.b2a_hex(self._Block_Type))
+ print(binascii.b2a_hex(self._Block_Length))
+ print(binascii.b2a_hex(self._Byte_Order_Magic))
+ print(binascii.b2a_hex(self._Major_Version))
+ print(binascii.b2a_hex(self._Minor_Version))
+ print(binascii.b2a_hex(self._Section_Length))
+ print(self._decimal_number)
+
+class SectionHeaderBlock_b: # SHB
+ def __init__(self, data, Block_Length):
+ self._Options = data[0:Block_Length - 24 - 4 ]
+ self._Block_Length_redundant = data[Block_Length - 24 - 4 : Block_Length - 24]
+
+ def display_content(self):
+ print(binascii.b2a_hex(self._Options))
+ print(binascii.b2a_hex(self._Block_Length_redundant))
+
+class InterfaceDescriptionBlock: # IDB
+ def __init__(self, data):
+ self._Block_Type = data[0:4]
+ self._Block_Length = data[4:8]
+ self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
+
+ def display_content(self):
+ print(binascii.b2a_hex(self._Block_Type))
+ print(binascii.b2a_hex(self._Block_Length))
+ print(self._decimal_number)
+
+class PacketBlocks_a: # PB
+ def __init__(self, data):
+ self._Block_Type = data[0:4]
+ self._Block_Length = data[4:8]
+ self._Block_Length_int = int.from_bytes(self._Block_Length, byteorder='little')
+ self._Interface_ID = data[8:12]
+ self._Timestamp_Upper = data[12:16]
+ self._Timestamp_Lower = data[16:20]
+ self._Captured_Packet_Length = data[20:24]
+ self._Captured_Packet_Length_int = int.from_bytes(self._Captured_Packet_Length, byteorder='little')
+ self._Original_Packet_Length = data[24:28]
+ self._Original_Packet_Length_int = int.from_bytes(self._Original_Packet_Length, byteorder='little')
+
+ def display_content(self):
+ print('Block_Type', binascii.b2a_hex(self._Block_Type))
+ print('Block_Length', binascii.b2a_hex(self._Block_Length))
+ print('Block_Length_int', self._Block_Length_int)
+ print('Interface_ID', binascii.b2a_hex(self._Interface_ID))
+ print('Timestamp_Upper', binascii.b2a_hex(self._Timestamp_Upper))
+ print('Timestamp_Lower', binascii.b2a_hex(self._Timestamp_Lower))
+ print('Captured_Packet_Length', binascii.b2a_hex(self._Captured_Packet_Length))
+ print('Captured_Packet_Length_int', self._Captured_Packet_Length_int)
+ print('Original_Packet_Length', binascii.b2a_hex(self._Original_Packet_Length))
+ print('Original_Packet_Length_int', self._Original_Packet_Length_int)
+
+class PacketBlocks_b: # PB
+ def __init__(self, data, Block_Length, Packet_Length):
+ self._Packet_Data= data[0:Packet_Length]
+ self._Options = data[Packet_Length:Block_Length - 32]
+ self._Block_Total_Length_redundant = data[Block_Length - 32: Block_Length - 28]
+
+ def display_content(self):
+ print('Packet_Data', binascii.b2a_hex(self._Packet_Data))
+ print('Options', binascii.b2a_hex(self._Options))
+ print('Block_Total_Length_redundant', binascii.b2a_hex(self._Block_Total_Length_redundant))
+
+class PB_Options:
+ def __init__(self, data):
+ self._Option_Code= data[0:2]
+ self._Option_Length = data[2:4]
+ self._Option_Length_int = int.from_bytes(self._Option_Length, byteorder='little')
+ self._Option_Value = data[4:self._Option_Length_int + 4]
+ self._End_of_Options = data[self._Option_Length_int + 4 : ]
+
+ def display_content(self):
+ print(binascii.b2a_hex(self._Option_Code))
+ print(binascii.b2a_hex(self._Option_Length))
+ print(self._Option_Length_int)
+ print(binascii.b2a_hex(self._Option_Value))
+ print(binascii.b2a_hex(self._End_of_Options))
+
+def ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id):
+ # Use tshark to extract the five_tuple info
+ tshark_cmd = ['tshark', '-r', input_file_path, '-T', 'fields', '-e', 'frame.number', '-e', 'tcp.stream', '-e', 'udp.stream']
+ ret_str = subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+ decoded_result = ret_str.stdout
+ list_1 = decoded_result.splitlines()
+ list_output = []
+ for i in range(0, len(list_1)) :
+ list_2 = list_1[i].split("\t")
+ list_output.append(list_2)
+
+ comment_str_list = []
+ for i in range(len(list_output)):
+ # frame_id = list_output[i][0]
+ session_dict = {}
+ if list_output[i][1] != '':
+ proto = 'tcp'
+ stream_id = list_output[i][1]
+ elif list_output[i][1] == '' and list_output[i][2] != '':
+ proto = 'udp'
+ stream_id = list_output[i][2]
+ else:
+ proto = 'other'
+ stream_id = ''
+ session_dict['session'] = appsketch_api + 'session/' + pcap_file_id + '/' + proto + '/' + stream_id
+ comment_str_list.append(session_dict)
+
+ return comment_str_list
+
+def TransferPcapNG(input_file_path):
+ with open(input_file_path, 'rb') as f:
+ first_four_bytes = f.read(4)
+ if first_four_bytes == bytes.fromhex('0a0d0d0a') or first_four_bytes == bytes.fromhex('0A0D0D0A'):
+ return input_file_path
+ else:
+ # Get the file name (including extension)
+ file_name_with_extension = os.path.basename(input_file_path)
+ # Remove the extension
+ transfer_file_path = os.path.splitext(file_name_with_extension)[0] + '.pcapng'
+ # Get the directory path without the file name
+ directory_path = os.path.dirname(input_file_path)
+
+ tshark_cmd = ['tshark', '-r', input_file_path, '-w', directory_path + '/' + transfer_file_path]
+ subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+ input_file_path = './output/Transfer.pcapng'
+ return directory_path + '/' + transfer_file_path
+
+if __name__=="__main__":
+
+ input_file_path = TransferPcapNG(sys.argv[1])
+ output_file_path = sys.argv[2]
+ appsketch_api = 'https://sg.appsketch.gdnt-cloud.com/'
+
+ # Get the file name (including extension)
+ file_name_with_extension = os.path.basename(input_file_path)
+ # Remove the extension
+ pcap_file_id = os.path.splitext(file_name_with_extension)[0]
+
+ comment_str_list = ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id)
+ # for i, pkt in enumerate(comment_str_list):
+ # print(comment_str_list[i])
+
+ binfile_r = open(input_file_path, 'rb')
+ binfile_w = open(input_file_path, 'rb')
+ binfile_output = open(output_file_path, 'wb')
+
+ size = os.path.getsize(input_file_path)
+
+ # Read Pcapng SHB
+ section_header_block_a = SectionHeaderBlock_a(binfile_r.read(24))
+ # section_header_block_a.display_content()
+ SHB_length = section_header_block_a._decimal_number
+ # Remaining bytes read
+ binfile_r.read(SHB_length - 24)
+
+ # Read Pcapng IDB
+ interface_description_block = InterfaceDescriptionBlock(binfile_r.read(8))
+ # interface_description_block.display_content()
+ IDB_length = interface_description_block._decimal_number
+ # Remaining bytes read
+ binfile_r.read(IDB_length - 8 )
+
+ # 4Byte alignment
+ # Get the number of bytes currently read
+ current_position = binfile_r.tell()
+ # Calculate the next 4-byte aligned position
+ aligned_position = (current_position + 3) & ~3
+
+ align_null_bytes_SHB = bytes.fromhex('')
+ for i in range(aligned_position - current_position):
+ align_null_bytes_SHB += bytes.fromhex('00')
+
+ write_data = binfile_w.read(current_position)
+ binfile_output.write(write_data)
+ binfile_output.write(align_null_bytes_SHB)
+ binfile_w.close()
+
+ # Packets Traversal Add Option
+ for i in range(len(comment_str_list)):
+ # Read Pcapng Packet Blocks
+ packet_block_a = PacketBlocks_a(binfile_r.read(28))
+ # packet_block_a.display_content()
+
+ packet_block_data = binfile_r.read(packet_block_a._Captured_Packet_Length_int)
+ # print('Packet_Data', binascii.b2a_hex(packet_block_data))
+
+ # 4Byte alignment
+ # Get the number of bytes currently read
+ current_position = binfile_r.tell()
+ # Calculate the next 4-byte aligned position
+ aligned_position = (current_position + 3) & ~3
+ align_null_bytes_PB_a = bytes.fromhex('')
+ for j in range(aligned_position - current_position):
+ align_null_bytes_PB_a += bytes.fromhex('00')
+
+ # 使用 `json.dumps` 方法将 JSON 对象格式化为带缩进的字符串
+ remark_str = json.dumps(comment_str_list[i], indent=4)
+
+ for j in range(4 - len(remark_str.encode('utf-8')) % 4):
+ binary_str = '00'
+ char = chr(int(binary_str, 2))
+ remark_str += char
+
+ Block_Type = packet_block_a._Block_Type
+ # Modify Block_Length
+ # Block_Length = packet_block_a._Block_Length
+ Block_Length = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
+
+ Interface_ID = packet_block_a._Interface_ID
+ Timestamp_Upper = packet_block_a._Timestamp_Upper
+ Timestamp_Lower = packet_block_a._Timestamp_Lower
+ Captured_Packet_Length = packet_block_a._Captured_Packet_Length
+ Original_Packet_Length = packet_block_a._Original_Packet_Length
+
+ # Modify Option_Length
+ Option_Code = bytes.fromhex('0100')
+ Option_Length = struct.pack('<H', len(remark_str))
+ Option_Value = remark_str.encode('utf-8')
+ End_of_Options = bytes.fromhex('00000000')
+
+ current_position = 2 + 2 + len(remark_str) + 4
+ aligned_position = (current_position + 3) & ~3
+ align_null_bytes_PB_b = bytes.fromhex('')
+ for j in range(aligned_position - current_position):
+ align_null_bytes_PB_b += bytes.fromhex('00')
+
+ # Modify Block_Total_Length_Redundant
+ Block_Total_Length_Redundant = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
+
+ write_data = Block_Type + Block_Length + Interface_ID + Timestamp_Upper + Timestamp_Lower + Captured_Packet_Length + Original_Packet_Length + \
+ packet_block_data + align_null_bytes_PB_a + \
+ Option_Code + Option_Length + Option_Value + End_of_Options + align_null_bytes_PB_b + \
+ Block_Total_Length_Redundant
+
+ binfile_output.write(write_data)
+ binfile_r.read(4)
+
+ if binfile_r.tell() == size:
+ break
+
+ # Get the number of bytes currently read
+ current_position = binfile_r.tell()
+ # Calculate the next 4-byte aligned position
+ aligned_position = (current_position + 3) & ~3
+ # Move the file pointer to a 4-byte aligned position
+ binfile_r.seek(aligned_position)
+
+ try:
+ os.remove(input_file_path)
+ print(f"File '{input_file_path}' has been deleted successfully.")
+ except FileNotFoundError:
+ print(f"File '{input_file_path}' does not exist.")
+ except PermissionError:
+ print(f"Permission denied: '{input_file_path}'.")
+ except Exception as e:
+ print(f"An error occurred: {e}")
\ No newline at end of file |
