summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-12-08 15:57:25 +0800
committerqidaijie <[email protected]>2023-12-08 15:57:25 +0800
commit3e11e59d9f9a89f3a9c78deac044d46bd3a12f3d (patch)
treee38e0cf064c57f45a98b3f4eaa3f38aa74117fe0
parent0ecaf2c47cf34df5115c8053ee25d85fae995cc8 (diff)
fix:优化类型转换处理逻辑fix/23.09
fix:优化类型转换时null值处理逻辑 fix:优化雪花ID配置workerid获取方式
-rw-r--r--pom.xml2
-rw-r--r--properties/service_flow_config.properties3
-rw-r--r--src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java11
-rw-r--r--src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java5
-rw-r--r--src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java6
-rw-r--r--src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java31
-rw-r--r--src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java2
-rw-r--r--src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java9
-rw-r--r--src/test/java/com/zdjizhi/etl/function/CombineTest.java24
-rw-r--r--src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java3
10 files changed, 27 insertions, 69 deletions
diff --git a/pom.xml b/pom.xml
index 34cf196..32ed7f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-processing</artifactId>
- <version>1.2.1</version>
+ <version>1.0.1</version>
<name>log-stream-processing</name>
<url>http://www.example.com</url>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 0c5ba13..3bc1ad7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -29,9 +29,6 @@ nacos.schema.namespace=prod
nacos.schema.data.id=session_record.json
#--------------------------------topology配置------------------------------#
-#HOS地址
-hos.endpoint=http://192.168.44.12:9098
-
#工具库地址,存放秘钥文件等。
tools.library=D:\\workerspace\\dat\\
diff --git a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
index b11627c..705984f 100644
--- a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
@@ -124,15 +124,4 @@ public class FlowWriteConfig {
public static final String ZOOKEEPER_SERVERS = ConfigurationsUtils.getStringProperty(propService, "zookeeper.servers");
public static final String TOOLS_LIBRARY = ConfigurationsUtils.getStringProperty(propService, "tools.library");
-
- /**
- * HOS
- * #Galaxy-hos-service服务地址
- * hos.endpoint=192.168.44.12:9098
- *
- * #TSG处理流量产生的文件的存储桶名
- * hos.traffic.file.bucket=traffic_file_bucket
- */
- public static final String HOS_ENDPOINT = ConfigurationsUtils.getStringProperty(propService, "hos.endpoint");
- public static final String HOS_TRAFFIC_FILE_BUCKET = ConfigurationsUtils.getStringProperty(propDefault, "hos.traffic.file.bucket");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
index 9691123..85e3805 100644
--- a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
+++ b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
@@ -140,11 +140,6 @@ public class TransFormProcess extends ProcessFunction<String, String> {
transformFunction.getValue(jsonObject, appendToKey, logValue);
}
break;
- case "combine":
- if (logValue != null && param != null) {
- jsonObject.put(appendToKey, transformFunction.combine(logValue.toString(), param));
- }
- break;
default:
}
}
diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java
index 6c7c5c4..43deb80 100644
--- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java
+++ b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java
@@ -162,9 +162,9 @@ class GtpCRelation {
*/
private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
HashMap<String, Object> tmpMap = new HashMap<>(4);
- tmpMap.put("phone_number", phoneNumber);
- tmpMap.put("imsi", imsi);
- tmpMap.put("imei", imei);
+ tmpMap.put("common_phone_number", phoneNumber);
+ tmpMap.put("common_imsi", imsi);
+ tmpMap.put("common_imei", imei);
tmpMap.put("last_update_time", lastUpdateTime);
return tmpMap;
}
diff --git a/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java
index 7bae58d..854c415 100644
--- a/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java
+++ b/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java
@@ -26,11 +26,13 @@ import java.util.concurrent.Executor;
*/
public class IpLookupUtils {
private static final Log logger = LogFactory.get();
- private static final String ipBuiltInName = "ip_builtin.mmdb";
+ private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb";
+ private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb";
+ private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb";
+ private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb";
+ private static final String asnV4Name = "asn_v4.mmdb";
+ private static final String asnV6Name = "asn_v6.mmdb";
- private static final String ipUserDefinedName = "ip_user_defined.mmdb";
-
- private static final String asnName = "asn_builtin.mmdb";
/**
* ip定位库
*/
@@ -118,14 +120,23 @@ public class IpLookupUtils {
if (metaSha256.equals(downloadFileSha256)) {
ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
switch (fileName) {
- case ipBuiltInName:
- builder.loadDataFile(inputStream);
+ case ipv4BuiltInName:
+ builder.loadDataFileV4(inputStream);
+ break;
+ case ipv6BuiltInName:
+ builder.loadDataFileV6(inputStream);
+ break;
+ case ipv4UserDefinedName:
+ builder.loadDataFilePrivateV4(inputStream);
+ break;
+ case ipv6UserDefinedName:
+ builder.loadDataFilePrivateV6(inputStream);
break;
- case ipUserDefinedName:
- builder.loadDataFilePrivate(inputStream);
+ case asnV4Name:
+ builder.loadAsnDataFileV4(inputStream);
break;
- case asnName:
- builder.loadAsnDataFile(inputStream);
+ case asnV6Name:
+ builder.loadAsnDataFileV6(inputStream);
break;
default:
}
diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
index cb644ce..991c051 100644
--- a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
+++ b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
@@ -38,6 +38,4 @@ public interface TransformFunction {
void fromUnixTimestampMS(JSONObject jsonObject, String appendToKey, Object logValue);
- String combine(String fileName,String bucketName);
-
}
diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
index e78957c..22cc0de 100644
--- a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
+++ b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
@@ -121,7 +121,7 @@ public class TransformFunctionImpl implements TransformFunction {
@Override
public String radiusMatch(JSONObject jsonObject, String ip) {
if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- int vsysId = jsonObject.getIntValue("vsys_id", 1);
+ int vsysId = jsonObject.getIntValue("common_vsys_id", 1);
return HBaseUtils.getAccount(ip + vsysId);
} else {
return HBaseUtils.getAccount(ip);
@@ -154,7 +154,7 @@ public class TransformFunctionImpl implements TransformFunction {
if (teid != null) {
if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- int vsysId = jsonObject.getIntValue("vsys_id", 1);
+ int vsysId = jsonObject.getIntValue("common_vsys_id", 1);
teid = teid + vsysId;
}
String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
@@ -310,11 +310,6 @@ public class TransformFunctionImpl implements TransformFunction {
}
}
- @Override
- public String combine(String fileName, String bucketName) {
- return FlowWriteConfig.HOS_ENDPOINT + "/hos/" + bucketName + "/" + fileName;
- }
-
/**
* 判断是否为日志字段,是则返回对应value,否则返回原始字符串
*
diff --git a/src/test/java/com/zdjizhi/etl/function/CombineTest.java b/src/test/java/com/zdjizhi/etl/function/CombineTest.java
deleted file mode 100644
index 5551184..0000000
--- a/src/test/java/com/zdjizhi/etl/function/CombineTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.zdjizhi.etl.function;
-
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import org.junit.Test;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.etl.function
- * @Description:
- * @date 2023/10/1118:52
- */
-public class CombineTest {
-
- @Test
- public void hosCombineTest() {
- //http://192.168.44.12:9098/hos/traffic_file_bucket/54470ead-8fb9-4559-a996-c8459f36ebf6
- String logValue = "54470ead-8fb9-4559-a996-c8459f36ebf6";
- long start = System.currentTimeMillis();
- String uri = FlowWriteConfig.HOS_ENDPOINT + "/hos/" + FlowWriteConfig.HOS_TRAFFIC_FILE_BUCKET + "/" + logValue;
- System.out.println(uri);
- System.out.println("time:" + System.currentTimeMillis() + "\tstart:" + start);
-
- }
-}
diff --git a/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java b/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java
index 7a513ed..8a57827 100644
--- a/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java
+++ b/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java
@@ -38,8 +38,6 @@ public class NewSchemaTest {
private static HashMap<String, Object> defaultFieldsMap = new HashMap<>(16);
-
-
static {
// properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.11:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
@@ -67,7 +65,6 @@ public class NewSchemaTest {
HashMap<String, Class> fieldsFromSchema = getFieldsFromSchema(schema);
for (String key : fieldsFromSchema.keySet()) {
System.out.println("fileName:" + key + " Class:" + fieldsFromSchema.get(key));
-
}
} catch (NacosException e) {
e.printStackTrace();