author    wangchengcheng <[email protected]>  2024-01-22 10:16:56 +0800
committer wangchengcheng <[email protected]>  2024-01-22 10:16:56 +0800
commit    c50e2436bce9fc55a4fdb478ee569860e1e12bec (patch)
tree      135f3f517515293d70de96b7e0a7fae808558b47
parent    68b4805c4f787c99ec43bd7e6ec9241e13cf5bb7 (diff)
feat: change common_action value filling logic (tag: v24.01-rc1)
-rw-r--r--  properties/action_definition.properties  35
-rw-r--r--  properties/service_flow_config.properties  8
-rw-r--r--  src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java  46
-rw-r--r--  src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java  7
-rw-r--r--  src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java  77
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  2
6 files changed, 88 insertions, 87 deletions
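
Summary of the change: the hard-coded fillingCommonAction switch in ConvertRecordToPERCENT is removed and replaced by a lookup table loaded from the new properties/action_definition.properties file inside TypeMapCompleted.open(). Below is a minimal, self-contained sketch of that lookup flow; the class name ActionLookupSketch is illustrative and not part of the repository, and it assumes action_definition.properties is on the classpath.

    // Minimal sketch of the new lookup flow, not a class from this repository.
    // Assumption: action_definition.properties is available on the classpath.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class ActionLookupSketch {
        public static void main(String[] args) throws Exception {
            Properties actionProp = new Properties();
            actionProp.load(ActionLookupSketch.class.getClassLoader()
                    .getResourceAsStream("action_definition.properties"));

            // Load every name=value pair into an in-memory map once.
            Map<String, Integer> actionMap = new HashMap<>();
            for (String key : actionProp.stringPropertyNames()) {
                actionMap.put(key, Integer.valueOf(actionProp.getProperty(key)));
            }

            // "No Intercept" in a raw record matches the "NoIntercept" key once
            // spaces are stripped, exactly as TypeMapCompleted does.
            String rawAction = "No Intercept";
            Integer commonAction = actionMap.get(rawAction.replace(" ", ""));
            System.out.println(commonAction); // prints 3
        }
    }

Because the raw action strings may contain spaces ("No Intercept", "Active Defence", "WAN NAT"), the properties keys are the space-free forms and the lookup strips spaces before the get; the lowercase duplicates cover records that already carry lowercase action names.
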
diff --git a/properties/action_definition.properties b/properties/action_definition.properties
new file mode 100644
index 0000000..2a3f59e
--- /dev/null
+++ b/properties/action_definition.properties
@@ -0,0 +1,35 @@
+none=0
+Monitor=1
+monitor=1
+Intercept=2
+intercept=2
+NoIntercept=3
+nointercept=3
+ActiveDefence=4
+activedefence=4
+WANNAT=8
+wannat=8
+Reject=16
+reject=16
+Deny=16
+deny=16
+Shaping=32
+shaping=32
+Manipulate=48
+manipulate=48
+ServiceChaining=64
+servicechaining=64
+Allow=96
+allow=96
+Bypass=96
+bypass=96
+Shunt=128
+shunt=128
+Statistics=129
+statistics=129
+redirect=48
+replace=48
+hijack=48
+insert=48
+edit_element=48
+run_script=48
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 125e092..c65dfcd 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,7 +1,7 @@
#-------------------------------- Address configuration ------------------------------#
# Management Kafka address
-#source.kafka.servers=192.168.44.12:9094
-source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+source.kafka.servers=192.168.44.12:9094
+#source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
# Kafka address for the Percent output
percent.sink.kafka.servers=192.168.44.12:9094
# Kafka address for the file source data topic output
@@ -68,12 +68,12 @@ deal.file.statistics.time=60
#------------------------------------ Knowledge configuration ------------------------------------#
knowledge.execution.minutes=600
-knowledge.base.uri=http://192.168.44.67:9999
+knowledge.base.uri=http://192.168.44.12:9999
knowledge.base.path=/v1/knowledge_base
ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289
ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9
asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea
-hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/
+hos.url=http://192.168.44.12:9098/hos/traffic_file_bucket/
diff --git a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java
index 9bbf95f..97f0d93 100644
--- a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java
+++ b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java
@@ -2,6 +2,7 @@ package com.zdjizhi.operator.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.general.ConfigurationsUtils;
@@ -12,6 +13,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
+import java.util.HashMap;
import java.util.Properties;
@@ -22,19 +24,35 @@ import java.util.Properties;
public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
private static final Log logger = LogFactory.get();
private ConvertRecordToPERCENT convertRecordToPERCENT;
- Properties Prop = new Properties();
+ Properties prop = new Properties();
+ Properties actionProp = new Properties();
+ private HashMap<String, Integer> actionMap = new HashMap<String, Integer>();
+
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) {
- Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
+ prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
} else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) {
- Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties"));
+ prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties"));
}
- convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop);
+ convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema loaded successfully");
+
+ try {
+ actionProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("action_definition.properties"));
+ for (String key : actionProp.stringPropertyNames()) {
+ final String action = actionProp.getProperty(key);
+ actionMap.put(key, Integer.valueOf(action));
+ }
+
+ logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema loaded successfully");
+ } catch (Exception e) {
+ logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema loading failed, reason: " + e);
+ }
+
} catch (Exception e) {
logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema loading failed, reason: " + e);
}
@@ -58,6 +76,26 @@ public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
}
if (jsonObject != null) {
+
+ if (record.containsKey("security_rule_list")) {
+ jsonObject.put("common_policy_id", JSONArray.from(record.get("security_rule_list")).get(0));
+ jsonObject.put("common_action", actionMap.get(record.get("security_action").toString().replace(" ", "")));
+ }
+
+ if (record.containsKey("monitor_rule_list")) {
+ jsonObject.put("common_policy_id", JSONArray.from(record.get("monitor_rule_list")).get(0));
+ jsonObject.put("common_action", 1);
+ }
+
+ if (record.containsKey("proxy_rule_list")) {
+ jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0));
+ jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ", "")));
+
+ if ((int) jsonObject.get("common_action") == 48) {
+ jsonObject.put("common_sub_action", record.get("proxy_action"));
+ }
+ }
+
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
TransForm.transformLog(jsonObject);
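
For the new value-filling logic in TypeMapCompleted, a minimal worked example follows; the record content and the two-entry map are illustrative assumptions. A proxy record whose proxy_action resolves to 48, i.e. the Manipulate family that also covers redirect, replace, hijack, insert, edit_element and run_script in action_definition.properties, additionally keeps the concrete verb in common_sub_action.

    // Worked example of the new TypeMapCompleted filling logic; the record
    // content and the two map entries below are illustrative assumptions.
    import com.alibaba.fastjson2.JSONArray;
    import com.alibaba.fastjson2.JSONObject;
    import java.util.HashMap;
    import java.util.Map;

    public class ProxyActionFillingSketch {
        public static void main(String[] args) {
            Map<String, Integer> actionMap = new HashMap<>();
            actionMap.put("Monitor", 1);
            actionMap.put("edit_element", 48);

            JSONObject record = JSONObject.parseObject(
                    "{\"proxy_rule_list\":[1024],\"proxy_action\":\"edit_element\"}");
            JSONObject jsonObject = new JSONObject();

            if (record.containsKey("proxy_rule_list")) {
                // First rule id becomes the policy id, action string becomes the numeric code.
                jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0));
                jsonObject.put("common_action",
                        actionMap.get(record.get("proxy_action").toString().replace(" ", "")));
                // Manipulate-family values (48) keep the concrete verb as the sub-action.
                if ((int) jsonObject.get("common_action") == 48) {
                    jsonObject.put("common_sub_action", record.get("proxy_action"));
                }
            }
            // Roughly: {"common_policy_id":1024,"common_action":48,"common_sub_action":"edit_element"}
            System.out.println(jsonObject);
        }
    }
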
diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java
index 57db453..c3639e8 100644
--- a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java
+++ b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java
@@ -88,6 +88,7 @@ public class IpLookupUtils {
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
}
+
final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID);
if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
@@ -163,9 +164,7 @@ public class IpLookupUtils {
* @return filter parameter
*/
private static String getFilterParameter() {
-
String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]";
-
return expr;
}
@@ -205,8 +204,4 @@ public class IpLookupUtils {
return knowlegeBaseMeta;
}
- public static void main(String[] args) {
- final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7");
- System.out.println(countryLookup);
- }
}
diff --git a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java
index e6dc4c9..84d695f 100644
--- a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java
+++ b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java
@@ -1,6 +1,5 @@
package com.zdjizhi.tools.logtransformation;
-import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.util.*;
@@ -27,30 +26,18 @@ public class ConvertRecordToPERCENT {
}
}
- if (record.containsKey("security_rule_list")) {
- percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0));
- percent.put("common_action", fillingCommonAction((String) record.get("security_action")));
- }
-
- if (record.containsKey("monitor_rule_list")) {
- percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0));
- percent.put("common_action", 1);
- }
-
- if (record.containsKey("proxy_rule_list")){
- percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0));
- percent.put("common_action", fillingCommonAction((String) record.get("proxy_action")));
- }
-
// Fill common_start_time and common_end_time
- percent.put("common_start_time",record.get("start_timestamp_ms"));
- percent.put("common_end_time",record.get("end_timestamp_ms"));
+ percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000);
+ percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000);
+
+ // Fill common_sessions
+ percent.put("common_sessions", 1);
// Fill common_internal_ip, common_external_ip, common_direction, common_stream_dir
if (record.containsKey("flags")) {
final int flags = (int) record.get("flags");
if (flags > 0) {
- if ((8L & flags) == 8L && (16L & flags) != 16L) {
+ if ((8L & flags) == 8L && (16L & flags) != 16L) {
percent.put("common_internal_ip", record.get("common_client_ip"));
percent.put("common_external_ip", record.get("common_server_ip"));
percent.put("common_direction", 69);
@@ -70,56 +57,4 @@ public class ConvertRecordToPERCENT {
}
return percent;
}
-
-
- private int fillingCommonAction(String action) {
- int number = 0;
- switch (action) {
- case "none":
- number = 0;
- break;
- case "Monitor":
- number = 1;
- break;
- case "Intercept":
- number = 2;
- break;
- case "No Intercept":
- number = 3;
- break;
- case "Active Defence":
- number = 4;
- break;
- case "WAN NAT":
- number = 8;
- break;
- case "Reject":
- case "Deny":
- number = 16;
- break;
- case "Shaping":
- number = 32;
- break;
- case "Manipulate":
- number = 48;
- break;
- case "Service Chaining":
- number = 64;
- break;
- case "Allow":
- case "Bypass":
- number = 96;
- break;
- case "Shunt":
- number = 128;
- break;
- case "Statistics":
- number = 129;
- break;
- default:
- number = 0;
- }
- return number;
- }
-
}
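
Also visible in the ConvertRecordToPERCENT hunk above: common_start_time and common_end_time are now derived from the *_timestamp_ms fields by integer division, converting epoch milliseconds to epoch seconds, and a constant common_sessions = 1 is emitted per record. A short illustration with an assumed sample value:

    // Assumed sample value; start_timestamp_ms is taken to carry epoch milliseconds.
    long startTimestampMs = 1705889816123L;
    long commonStartTime = startTimestampMs / 1000; // 1705889816, i.e. epoch seconds
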
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index e473df6..95d2a49 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -14,8 +14,6 @@ import com.zdjizhi.tools.connections.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-
public class LogFlowWriteTopology {
private static final Log logger = LogFactory.get();