diff options
| author | gujinkai <[email protected]> | 2024-01-25 10:29:56 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-01-25 14:34:34 +0800 |
| commit | 31969400245dad3fa480c1db0aa7bdb031ee2fcf (patch) | |
| tree | 8c0892fd6f2d79db2e6c88793145521d0318f045 | |
| parent | e5ae0295619de64f4669f646a769ef0ce9fa89c9 (diff) | |
feat: adapt to groot streamrelease-24.01-rc3
7 files changed, 49 insertions, 12 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java index c69e763..0c767a9 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/operator/FirstAggregation.java @@ -298,7 +298,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions()); String commonL4Protocol = cnRecordLog.getCommon_l4_protocol(); - if ("TCP".equals(commonL4Protocol)) { + if ("TCP".equalsIgnoreCase(commonL4Protocol)) { cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num()); cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num()); cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num()); @@ -377,7 +377,7 @@ public class FirstAggregation extends AbstractFirstAggregation<CnMetricLog> { cnMetricLog.setCommon_sessions(cnRecordLog.getCommon_sessions() + cnMetricLog.getCommon_sessions()); String commonL4Protocol = cnRecordLog.getCommon_l4_protocol(); - if ("TCP".equals(commonL4Protocol)) { + if ("TCP".equalsIgnoreCase(commonL4Protocol)) { cnMetricLog.setC2s_tcp_pkt_num(cnRecordLog.getCommon_c2s_pkt_num() + cnMetricLog.getC2s_tcp_pkt_num()); cnMetricLog.setS2c_tcp_pkt_num(cnRecordLog.getCommon_s2c_pkt_num() + cnMetricLog.getS2c_tcp_pkt_num()); cnMetricLog.setC2s_tcp_byte_num(cnRecordLog.getCommon_c2s_byte_num() + cnMetricLog.getC2s_tcp_byte_num()); diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index 31feace..18da277 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -23,7 +23,7 @@ public class CnRelationMetric implements Schedule { @Override public void schedule() throws Exception { SingleOutputStreamOperator<CnRecordLog> source = CnRecordEtl.singleOutputStreamOperator; - SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equals(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) + SingleOutputStreamOperator<RelationMetricLog> process = source.filter(log -> "TCP".equalsIgnoreCase(log.getCommon_l4_protocol()) || log.getCommon_server_port() == 53 || log.getCommon_server_port() == 443) .process(new FirstAggregation()).name("relationProcess"); SingleOutputStreamOperator<String> relationMetric = process.filter(log -> log.getMetricKey() == 1) diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java index cead171..ef64bc7 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java +++ b/platform-base/src/main/java/com/zdjizhi/base/common/CnRecordLog.java @@ -187,10 +187,15 @@ public class CnRecordLog implements Serializable { @JSONField(serialize = false) private long external_query_num; + @JSONField(name = "cn_internal_rule_id_list") private List<Long> rule_id_list = new ArrayList<>(); - + @JSONField(name = "cn_internal_ioc_type_list") private List<String> ioc_type_list = new ArrayList<>(); + public void setClient_ip_tags(List<String> client_ip_tags) { + this.client_ip_tags = client_ip_tags; + } + public List<String> getClient_ip_tags() { return Collections.unmodifiableList(client_ip_tags); } @@ -207,6 +212,10 @@ public class CnRecordLog implements Serializable { } } + public void setServer_ip_tags(List<String> server_ip_tags) { + this.server_ip_tags = server_ip_tags; + } + public List<String> getServer_ip_tags() { return Collections.unmodifiableList(server_ip_tags); } @@ -223,6 +232,10 @@ public class CnRecordLog implements Serializable { } } + public void setDomain_tags(List<String> domain_tags) { + this.domain_tags = domain_tags; + } + public List<String> getDomain_tags() { return Collections.unmodifiableList(domain_tags); } @@ -239,6 +252,10 @@ public class CnRecordLog implements Serializable { } } + public void setApp_tags(List<String> app_tags) { + this.app_tags = app_tags; + } + public List<String> getApp_tags() { return Collections.unmodifiableList(app_tags); } diff --git a/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java b/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java index 6021a80..6cd3557 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java +++ b/platform-base/src/main/java/com/zdjizhi/base/common/CommonConfig.java @@ -264,4 +264,8 @@ public class CommonConfig { public static final ConfigOption<String> CLICKHOUSE_PASSWORD = ConfigOptions.key("clickhouse.password") .stringType() .defaultValue("ceiec2019"); + + public static final ConfigOption<Boolean> ENABLE_ETL = ConfigOptions.key("etl.enable") + .booleanType() + .defaultValue(true); } diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java index 20e296b..ae80f64 100644 --- a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordEtl.java @@ -1,5 +1,6 @@ package com.zdjizhi.etl; +import com.alibaba.fastjson2.JSON; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.common.CommonConfig; import com.zdjizhi.base.common.SourceConfig; @@ -19,17 +20,31 @@ public class CnRecordEtl { public static SingleOutputStreamOperator<CnRecordLog> singleOutputStreamOperator; public CnRecordEtl() { - singleOutputStreamOperator = getCnRecordSource(); + if (Configs.get(CommonConfig.ENABLE_ETL)) { + singleOutputStreamOperator = getCnRecordSource(); + } else { + singleOutputStreamOperator = getCnRecordSourceWithoutEtl(); + } } private SingleOutputStreamOperator<CnRecordLog> getCnRecordSource() { return getKafkaSource().process(new EtlProcessFunc()).name("etlProcess") .assignTimestampsAndWatermarks( - FlinkEnvironmentUtils.createWatermarkStrategy( - Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)), - (event, timestamp) -> event.getCommon_recv_time() * 1000 - ) - ); + FlinkEnvironmentUtils.createWatermarkStrategy( + Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)), + (event, timestamp) -> event.getCommon_recv_time() * 1000 + ) + ); + } + + private SingleOutputStreamOperator<CnRecordLog> getCnRecordSourceWithoutEtl() { + return getKafkaSource().map(message -> JSON.parseObject(message, CnRecordLog.class)) + .assignTimestampsAndWatermarks( + FlinkEnvironmentUtils.createWatermarkStrategy( + Duration.ofSeconds(Configs.get(CommonConfig.WATERMARK_SECONDS)), + (event, timestamp) -> event.getCommon_recv_time() * 1000 + ) + ); } private DataStreamSource<String> getKafkaSource() { diff --git a/platform-schedule/src/main/resources/business.properties b/platform-schedule/src/main/resources/business.properties index ebdc14a..c6c8197 100644 --- a/platform-schedule/src/main/resources/business.properties +++ b/platform-schedule/src/main/resources/business.properties @@ -1,5 +1,5 @@ # session-record-cn?? -cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence +#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence # ??? cn.pre.metric.class=com.zdjizhi.pre.CnPreMetric # ????????? diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties index 34b0b71..fc5577f 100644 --- a/platform-schedule/src/main/resources/common.properties +++ b/platform-schedule/src/main/resources/common.properties @@ -37,4 +37,5 @@ nacos.server.addr=192.168.44.55:8848 # api detection url rule.full.url=http://192.168.44.54:8090/v1/rule/detection rule.inc.url=http://192.168.44.54:8090/v1/rule/detection/increase -gateway.host=192.168.44.55
\ No newline at end of file +gateway.host=192.168.44.55 +etl.enable=false
\ No newline at end of file |
