diff options
| author | qidaijie <[email protected]> | 2023-12-08 15:57:25 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-12-08 15:57:25 +0800 |
| commit | 3e11e59d9f9a89f3a9c78deac044d46bd3a12f3d (patch) | |
| tree | e38e0cf064c57f45a98b3f4eaa3f38aa74117fe0 | |
| parent | 0ecaf2c47cf34df5115c8053ee25d85fae995cc8 (diff) | |
fix:优化类型转换处理逻辑fix/23.09
fix:优化类型转换时null值处理逻辑
fix:优化雪花ID配置workerid获取方式
10 files changed, 27 insertions, 69 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-stream-processing</artifactId> - <version>1.2.1</version> + <version>1.0.1</version> <name>log-stream-processing</name> <url>http://www.example.com</url> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 0c5ba13..3bc1ad7 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -29,9 +29,6 @@ nacos.schema.namespace=prod nacos.schema.data.id=session_record.json #--------------------------------topology配置------------------------------# -#HOS地址 -hos.endpoint=http://192.168.44.12:9098 - #工具库地址,存放秘钥文件等。 tools.library=D:\\workerspace\\dat\\ diff --git a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java index b11627c..705984f 100644 --- a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java @@ -124,15 +124,4 @@ public class FlowWriteConfig { public static final String ZOOKEEPER_SERVERS = ConfigurationsUtils.getStringProperty(propService, "zookeeper.servers"); public static final String TOOLS_LIBRARY = ConfigurationsUtils.getStringProperty(propService, "tools.library"); - - /** - * HOS - * #Galaxy-hos-service服务地址 - * hos.endpoint=192.168.44.12:9098 - * - * #TSG处理流量产生的文件的存储桶名 - * hos.traffic.file.bucket=traffic_file_bucket - */ - public static final String HOS_ENDPOINT = ConfigurationsUtils.getStringProperty(propService, "hos.endpoint"); - public static final String HOS_TRAFFIC_FILE_BUCKET = ConfigurationsUtils.getStringProperty(propDefault, "hos.traffic.file.bucket"); }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java index 9691123..85e3805 100644 --- a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java +++ b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java @@ -140,11 +140,6 @@ public class TransFormProcess extends ProcessFunction<String, String> { transformFunction.getValue(jsonObject, appendToKey, logValue); } break; - case "combine": - if (logValue != null && param != null) { - jsonObject.put(appendToKey, transformFunction.combine(logValue.toString(), param)); - } - break; default: } } diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java index 6c7c5c4..43deb80 100644 --- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java +++ b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java @@ -162,9 +162,9 @@ class GtpCRelation { */ private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) { HashMap<String, Object> tmpMap = new HashMap<>(4); - tmpMap.put("phone_number", phoneNumber); - tmpMap.put("imsi", imsi); - tmpMap.put("imei", imei); + tmpMap.put("common_phone_number", phoneNumber); + tmpMap.put("common_imsi", imsi); + tmpMap.put("common_imei", imei); tmpMap.put("last_update_time", lastUpdateTime); return tmpMap; } diff --git a/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java index 7bae58d..854c415 100644 --- a/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java +++ b/src/main/java/com/zdjizhi/etl/tools/general/IpLookupUtils.java @@ -26,11 +26,13 @@ import java.util.concurrent.Executor; */ public class IpLookupUtils { private static final Log logger = LogFactory.get(); - private static final String ipBuiltInName = "ip_builtin.mmdb"; + private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb"; + private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb"; + private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb"; + private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb"; + private static final String asnV4Name = "asn_v4.mmdb"; + private static final String asnV6Name = "asn_v6.mmdb"; - private static final String ipUserDefinedName = "ip_user_defined.mmdb"; - - private static final String asnName = "asn_builtin.mmdb"; /** * ip定位库 */ @@ -118,14 +120,23 @@ public class IpLookupUtils { if (metaSha256.equals(downloadFileSha256)) { ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte); switch (fileName) { - case ipBuiltInName: - builder.loadDataFile(inputStream); + case ipv4BuiltInName: + builder.loadDataFileV4(inputStream); + break; + case ipv6BuiltInName: + builder.loadDataFileV6(inputStream); + break; + case ipv4UserDefinedName: + builder.loadDataFilePrivateV4(inputStream); + break; + case ipv6UserDefinedName: + builder.loadDataFilePrivateV6(inputStream); break; - case ipUserDefinedName: - builder.loadDataFilePrivate(inputStream); + case asnV4Name: + builder.loadAsnDataFileV4(inputStream); break; - case asnName: - builder.loadAsnDataFile(inputStream); + case asnV6Name: + builder.loadAsnDataFileV6(inputStream); break; default: } diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java index cb644ce..991c051 100644 --- a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java +++ b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java @@ -38,6 +38,4 @@ public interface TransformFunction { void fromUnixTimestampMS(JSONObject jsonObject, String appendToKey, Object logValue); - String combine(String fileName,String bucketName); - } diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java index e78957c..22cc0de 100644 --- a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java +++ b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java @@ -121,7 +121,7 @@ public class TransformFunctionImpl implements TransformFunction { @Override public String radiusMatch(JSONObject jsonObject, String ip) { if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - int vsysId = jsonObject.getIntValue("vsys_id", 1); + int vsysId = jsonObject.getIntValue("common_vsys_id", 1); return HBaseUtils.getAccount(ip + vsysId); } else { return HBaseUtils.getAccount(ip); @@ -154,7 +154,7 @@ public class TransformFunctionImpl implements TransformFunction { if (teid != null) { if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - int vsysId = jsonObject.getIntValue("vsys_id", 1); + int vsysId = jsonObject.getIntValue("common_vsys_id", 1); teid = teid + vsysId; } String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER); @@ -310,11 +310,6 @@ public class TransformFunctionImpl implements TransformFunction { } } - @Override - public String combine(String fileName, String bucketName) { - return FlowWriteConfig.HOS_ENDPOINT + "/hos/" + bucketName + "/" + fileName; - } - /** * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 * diff --git a/src/test/java/com/zdjizhi/etl/function/CombineTest.java b/src/test/java/com/zdjizhi/etl/function/CombineTest.java deleted file mode 100644 index 5551184..0000000 --- a/src/test/java/com/zdjizhi/etl/function/CombineTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.zdjizhi.etl.function; - -import com.zdjizhi.etl.common.FlowWriteConfig; -import org.junit.Test; - -/** - * @author qidaijie - * @Package com.zdjizhi.etl.function - * @Description: - * @date 2023/10/1118:52 - */ -public class CombineTest { - - @Test - public void hosCombineTest() { - //http://192.168.44.12:9098/hos/traffic_file_bucket/54470ead-8fb9-4559-a996-c8459f36ebf6 - String logValue = "54470ead-8fb9-4559-a996-c8459f36ebf6"; - long start = System.currentTimeMillis(); - String uri = FlowWriteConfig.HOS_ENDPOINT + "/hos/" + FlowWriteConfig.HOS_TRAFFIC_FILE_BUCKET + "/" + logValue; - System.out.println(uri); - System.out.println("time:" + System.currentTimeMillis() + "\tstart:" + start); - - } -} diff --git a/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java b/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java index 7a513ed..8a57827 100644 --- a/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java +++ b/src/test/java/com/zdjizhi/etl/json/NewSchemaTest.java @@ -38,8 +38,6 @@ public class NewSchemaTest { private static HashMap<String, Object> defaultFieldsMap = new HashMap<>(16); - - static { // properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.11:8848"); properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); @@ -67,7 +65,6 @@ public class NewSchemaTest { HashMap<String, Class> fieldsFromSchema = getFieldsFromSchema(schema); for (String key : fieldsFromSchema.keySet()) { System.out.println("fileName:" + key + " Class:" + fieldsFromSchema.get(key)); - } } catch (NacosException e) { e.printStackTrace(); |
