summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author何勇 <[email protected]>2024-07-03 12:48:08 +0000
committer何勇 <[email protected]>2024-07-03 12:48:08 +0000
commit87c448e2cd0b884591da21bc136f258914d42601 (patch)
tree089511284f649e7612c370d05db3f52ea7313c01
parent0444e7347f6ea6f2fb4a0cb8deac599fee006b45 (diff)
上传新文件HEADmain
-rw-r--r--SignatureExtract.py136
1 files changed, 136 insertions, 0 deletions
diff --git a/SignatureExtract.py b/SignatureExtract.py
new file mode 100644
index 0000000..23dbb7c
--- /dev/null
+++ b/SignatureExtract.py
@@ -0,0 +1,136 @@
+#!/usr/bin/evn python
+# -*- coding:utf-8 -*-
+
+import sys
+import os
+import json
+import GetSignature_2402
+import logging
+
+# Setting the log level
+logging.basicConfig(level=logging.INFO)
+# logging.basicConfig(level=logging.DEBUG)
+# logging.basicConfig(level=logging.WARNING)
+# logging.basicConfig(level=logging.ERROR)
+
+# Determine whether the selected file is a Pcap file
+def IsPcapFile(filename):
+ ret_str = os.popen("tshark -r " + filename + ' -Y "frame.number==1" -T fields -e frame.number').read()
+ if ret_str == '':
+ return False
+ return True
+
+# TCP data flow analysis
+def TcpDataFlowAnalysis(signature_object, stream_dict, tcp_frame_signature_list):
+ stream_dict["tcp.payload.c2s_first_data"] = signature_object.tcp_c2s_first_data(tcp_frame_signature_list)
+ stream_dict["tcp.payload.s2c_first_data"] = signature_object.tcp_s2c_first_data(tcp_frame_signature_list)
+ stream_dict["tcp.payload.c2s_first_data_len"] = signature_object.tcp_c2s_first_data_len(tcp_frame_signature_list)
+ stream_dict["tcp.payload.s2c_first_data_len"] = signature_object.tcp_s2c_first_data_len(tcp_frame_signature_list)
+ stream_dict["tcp.payload"] = signature_object.tcp_get_payload(tcp_frame_signature_list)
+ return
+
+# UDP data flow analysis
+def UdpDataFlowAnalysis(signature_object, stream_dict, udp_frame_signature_list):
+ stream_dict["udp.payload.c2s_first_data"] = signature_object.udp_c2s_first_data(udp_frame_signature_list)
+ stream_dict["udp.payload.s2c_first_data"] = signature_object.udp_s2c_first_data(udp_frame_signature_list)
+ stream_dict["udp.payload.c2s_first_data_len"] = signature_object.udp_c2s_first_data_len(udp_frame_signature_list)
+ stream_dict["udp.payload.s2c_first_data_len"] = signature_object.udp_s2c_first_data_len(udp_frame_signature_list)
+ stream_dict["udp.payload"] = signature_object.udp_get_payload(udp_frame_signature_list)
+ return
+
+# General data flow analysis (common, ip, dns, http, ssl)
+def GeneralDataFlowAnalysis(signature_object, stream_dict, frame_signature_list):
+ # common
+ stream_dict["common.server_fqdn"] = signature_object.ssl_extensions_server_name(frame_signature_list)
+ stream_dict["common.app_id"] = ['unknow']
+ if frame_signature_list[0]['ip.proto'] == '6' :
+ stream_dict["srcport"] = signature_object.tcp_srcport(frame_signature_list)
+ stream_dict["dstport"] = signature_object.tcp_dstport(frame_signature_list)
+ else:
+ stream_dict["srcport"] = signature_object.udp_srcport(frame_signature_list)
+ stream_dict["dstport"] = signature_object.udp_dstport(frame_signature_list)
+ # ip
+ stream_dict["ip.src"] = signature_object.ip_src(frame_signature_list)
+ stream_dict["ip.dst"] = signature_object.ip_dst(frame_signature_list)
+ stream_dict["ip.proto"] = signature_object.ip_proto(frame_signature_list)
+ stream_dict["heartbeat_flag"] = signature_object.heartbeat_flag(frame_signature_list)
+ # dns
+ stream_dict["dns.qry.name"] = signature_object.dns_qry_name(frame_signature_list)
+ # http
+ stream_dict["http.request.full_uri"] = signature_object.http_request_full_uri(frame_signature_list)
+ stream_dict["http.request.header"] = signature_object.http_request_header(frame_signature_list)
+ stream_dict["http.response.header"] = signature_object.http_response_header(frame_signature_list)
+ # ssl
+ stream_dict["ssl.handshake.certificate.algorithm_identifier"] = signature_object.ssl_algorithm_identifier(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.serial_number"] = signature_object.ssl_serial_number(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.issuer_common_name"] = signature_object.ssl_issuer_common_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.issuer_organization_name"] = signature_object.ssl_issuer_organization_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.issuer_country_name"] = signature_object.ssl_issuer_country_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.subject_common_name"] = signature_object.ssl_subject_common_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.subject_organization_name"] = signature_object.ssl_subject_organization_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.subject_country_name"] = signature_object.ssl_subject_country_name(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.not_valid_before"] = signature_object.ssl_not_valid_before(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.not_valid_after"] = signature_object.ssl_not_valid_after(frame_signature_list)
+ stream_dict["ssl.handshake.certificate.algorithm_id"] = signature_object.ssl_algorithm_id(frame_signature_list)
+ stream_dict["ssl.analysis.ja3"] = signature_object.ssl_ja3(frame_signature_list)
+ stream_dict["ssl.analysis.sni_absent"] = signature_object.sni_absent(frame_signature_list)
+ stream_dict["ssl.analysis.ech_enabled"] = signature_object.ssl_ech_enabled(frame_signature_list)
+ stream_dict["ssl.analysis.esni_enabled"] = signature_object.ssl_analysis_esni_enabled(frame_signature_list)
+ return
+
+if __name__=="__main__":
+ # Get the pcap file name in the main function parameter
+ if len(sys.argv) < 2 :
+ logging.error("Please enter the correct parameters !!")
+ sys.exit()
+ pacp_file_path = sys.argv[1]
+
+ # determine file is pcap
+ if IsPcapFile(pacp_file_path):
+ # Define the result output dict
+ result_output_dict = {}
+
+ # Creation signature extraction objects
+ signature_object = GetSignature_2402.GetStreamSignatureFromTshrak(pacp_file_path)
+ # Get all the field dictionaries parsed based on the Tshark command
+ all_frame_signature_dict_list = signature_object._output_dict_list
+
+ # Get basic information of TCP data streams
+ tcp_stream_basic_info_list = GetSignature_2402.GetTCPStreamBaseInfo(all_frame_signature_dict_list)
+ tcp_stream_all_info_list = tcp_stream_basic_info_list
+ # Get other information of TCP data streams
+ # Processing data stream by stream
+ for i in range(len(tcp_stream_all_info_list)):
+ # Get all the Frame IDs of the data stream
+ tcp_frame_signature_list = signature_object.GetOneTcpFrameSignatureList(tcp_stream_all_info_list[i]['StreamID'])
+ # Merge signature information from all Frame IDs
+ # TCP data flow analysis
+ TcpDataFlowAnalysis(signature_object, tcp_stream_all_info_list[i], tcp_frame_signature_list)
+ # General data flow analysis (common, ip, dns, http, ssl)
+ GeneralDataFlowAnalysis(signature_object, tcp_stream_all_info_list[i], tcp_frame_signature_list)
+
+ # Get basic information of UDP data streams
+ udp_stream_basic_info_list = GetSignature_2402.GetUDPStreamBaseInfo(all_frame_signature_dict_list)
+ udp_stream_all_info_list = udp_stream_basic_info_list
+ # Get other information of UDP data streams
+ # Processing data stream by stream
+ for i in range(len(udp_stream_all_info_list)):
+ # Get all the Frame IDs of the data stream
+ udp_frame_signature_list = signature_object.GetOneUdpFrameSignatureList(udp_stream_all_info_list[i]['StreamID'])
+ # Merge signature information from all Frame IDs
+ # UDP data flow analysis
+ UdpDataFlowAnalysis(signature_object, udp_stream_all_info_list[i], udp_frame_signature_list)
+ # General data flow analysis (common, ip, dns, http, ssl)
+ GeneralDataFlowAnalysis(signature_object, udp_stream_all_info_list[i], udp_frame_signature_list)
+
+ # Merge all data stream results
+ result_output_dict = tcp_stream_all_info_list + udp_stream_all_info_list
+
+ # Write signature dictionary to json
+ with open('signature.json', 'w', encoding='utf-8') as f:
+ json.dump(result_output_dict, f, ensure_ascii=False, indent=4)
+ else:
+ logging.error("The input is not a pcap file !!")
+ with open('signature.json', 'w', encoding='utf-8') as f:
+ json.dump({'Error':'The input is not a pcap file'}, f, ensure_ascii=False, indent=4)
+ pass \ No newline at end of file