summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-01-25 10:29:56 +0800
committergujinkai <[email protected]>2024-01-25 14:34:34 +0800
commit31969400245dad3fa480c1db0aa7bdb031ee2fcf (patch)
tree8c0892fd6f2d79db2e6c88793145521d0318f045
parente5ae0295619de64f4669f646a769ef0ce9fa89c9 (diff)
feat: adapt to groot streamrelease-24.01-rc3
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java4
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java2
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java19
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java4
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java27
-rw-r--r--platform-schedule/src/main/resources/business.properties2
-rw-r--r--platform-schedule/src/main/resources/common.properties3
7 files changed, 49 insertions, 12 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
index c69e763..0c767a9 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java
@@ -298,7 +298,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions());
String commonL4Protocol = cnRecordLog.getCommon_l4_protocol();
- if ("TCP".equals(commonL4Protocol)) {
+ if ("TCP".equalsIgnoreCase(commonL4Protocol)) {
cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num());
cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num());
cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num());
@@ -377,7 +377,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> {
cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions() + cnMetricLog.getCommon_sessions());
String commonL4Protocol = cnRecordLog.getCommon_l4_protocol();
- if ("TCP".equals(commonL4Protocol)) {
+ if ("TCP".equalsIgnoreCase(commonL4Protocol)) {
cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num() + cnMetricLog.getC2s_tcp_pkt_num());
cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num() + cnMetricLog.getS2c_tcp_pkt_num());
cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num() + cnMetricLog.getC2s_tcp_byte_num());
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index 31feace..18da277 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -23,7 +23,7 @@ public class CnRelationMetric implements Schedule {
@Override
public void schedule() throws Exception {
SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator;
- SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
+ SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443)
.process(new FirstAggregation()).name("relationProcess");
SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1)
diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
index cead171..ef64bc7 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java
@@ -187,10 +187,15 @@ public class CnRecordLog implements Serializable {
@JSONField(serialize = false)
private long external_query_num;
+ @JSONField(name = "cn_internal_rule_id_list")
private List<Long> rule_id_list = new ArrayList<>();
-
+ @JSONField(name = "cn_internal_ioc_type_list")
private List<String> ioc_type_list = new ArrayList<>();
+ public void setClient_ip_tags(List<String> client_ip_tags) {
+ this.client_ip_tags = client_ip_tags;
+ }
+
public List<String> getClient_ip_tags() {
return Collections.unmodifiableList(client_ip_tags);
}
@@ -207,6 +212,10 @@ public class CnRecordLog implements Serializable {
}
}
+ public void setServer_ip_tags(List<String> server_ip_tags) {
+ this.server_ip_tags = server_ip_tags;
+ }
+
public List<String> getServer_ip_tags() {
return Collections.unmodifiableList(server_ip_tags);
}
@@ -223,6 +232,10 @@ public class CnRecordLog implements Serializable {
}
}
+ public void setDomain_tags(List<String> domain_tags) {
+ this.domain_tags = domain_tags;
+ }
+
public List<String> getDomain_tags() {
return Collections.unmodifiableList(domain_tags);
}
@@ -239,6 +252,10 @@ public class CnRecordLog implements Serializable {
}
}
+ public void setApp_tags(List<String> app_tags) {
+ this.app_tags = app_tags;
+ }
+
public List<String> getApp_tags() {
return Collections.unmodifiableList(app_tags);
}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java b/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java
index 6021a80..6cd3557 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java
@@ -264,4 +264,8 @@ public class CommonConfig {
public static final ConfigOption<String> CLICKHOUSE_PASSWORD = ConfigOptions.key("clickhouse.password")
.stringType()
.defaultValue("ceiec2019");
+
+ public static final ConfigOption<Boolean> ENABLE_ETL = ConfigOptions.key("etl.enable")
+ .booleanType()
+ .defaultValue(true);
}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java
index 20e296b..ae80f64 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java
@@ -1,5 +1,6 @@
package com.zdjizhi.etl;
+import com.alibaba.fastjson2.JSON;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.common.SourceConfig;
@@ -19,17 +20,31 @@ public class CnRecordEtl {
public static SingleOutputStreamOperator<CnRecordLog> singleOutputStreamOperator;
public CnRecordEtl() {
- singleOutputStreamOperator = getCnRecordSource();
+ if (Configs.get(CommonConfig.ENABLE_ETL)) {
+ singleOutputStreamOperator = getCnRecordSource();
+ } else {
+ singleOutputStreamOperator = getCnRecordSourceWithoutEtl();
+ }
}
private SingleOutputStreamOperator<CnRecordLog> getCnRecordSource() {
return getKafkaSource().process(new EtlProcessFunc()).name("etlProcess")
.assignTimestampsAndWatermarks(
- FlinkEnvironmentUtils.createWatermarkStrategy(
- Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)),
- (event, timestamp) -> event.getCommon_recv_time() * 1000
- )
- );
+ FlinkEnvironmentUtils.createWatermarkStrategy(
+ Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)),
+ (event, timestamp) -> event.getCommon_recv_time() * 1000
+ )
+ );
+ }
+
+ private SingleOutputStreamOperator<CnRecordLog> getCnRecordSourceWithoutEtl() {
+ return getKafkaSource().map(message -> JSON.parseObject(message, CnRecordLog.class))
+ .assignTimestampsAndWatermarks(
+ FlinkEnvironmentUtils.createWatermarkStrategy(
+ Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)),
+ (event, timestamp) -> event.getCommon_recv_time() * 1000
+ )
+ );
}
private DataStreamSource<String> getKafkaSource() {
diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties
index ebdc14a..c6c8197 100644
--- a/platform-schedule/src/main/resources/business.properties
+++ b/platform-schedule/src/main/resources/business.properties
@@ -1,5 +1,5 @@
# session-record-cn??
-cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
+#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
# ???
cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric
# ?????????
diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties
index 34b0b71..fc5577f 100644
--- a/platform-schedule/src/main/resources/common.properties
+++ b/platform-schedule/src/main/resources/common.properties
@@ -37,4 +37,5 @@ nacos.server.addr=192.168.44.55:8848
# api detection url
rule.full.url=http://192.168.44.54:8090/v1/rule/detection
rule.inc.url=http://192.168.44.54:8090/v1/rule/detection/increase
-gateway.host=192.168.44.55 \ No newline at end of file
+gateway.host=192.168.44.55
+etl.enable=false \ No newline at end of file