summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml4
-rw-r--r--properties/service_flow_config.properties24
-rw-r--r--src/main/java/com/zdjizhi/bean/FileMeta.java10
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java37
-rw-r--r--src/main/java/com/zdjizhi/utils/general/FileEdit.java12
-rw-r--r--src/main/java/com/zdjizhi/utils/general/TransFormMap.java2
-rw-r--r--src/test/java/com/zdjizhi/EncryptorTest.java35
-rw-r--r--src/test/java/com/zdjizhi/FunctionTest.java52
-rw-r--r--src/test/java/com/zdjizhi/HBaseTest.java54
-rw-r--r--src/test/java/com/zdjizhi/json/JsonPathTest.java79
-rw-r--r--src/test/java/com/zdjizhi/nacos/NacosTest.java100
-rw-r--r--src/test/java/com/zdjizhi/nacos/SchemaListener.java136
12 files changed, 42 insertions, 503 deletions
diff --git a/pom.xml b/pom.xml
index 08db21b..3222397 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-doublewrite</artifactId>
- <version>22.04</version>
+ <version>22.04-v3</version>
<name>log-stream-doublewrite</name>
<url>http://www.example.com</url>
@@ -39,7 +39,7 @@
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
-<!-- <scope.type>provided</scope.type>-->
+ <scope.type>provided</scope.type>
</properties>
<build>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 241d28c..06a33b3 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,35 +1,35 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-source.kafka.servers=10.3.60.3:9094
+source.kafka.servers=192.168.44.12:9094
#百分点输出kafka地址
-percent.sink.kafka.servers=10.3.45.126:6667,10.3.45.127:6667,10.3.45.128:6667
+percent.sink.kafka.servers=192.168.44.12:9094
#文件源数据topic输出kafka地址
-file.data.sink.kafka.servers=10.3.60.3:9094
+file.data.sink.kafka.servers=192.168.44.12:9094
#zookeeper 地址 用于配置log_id
-zookeeper.servers=10.3.60.3:2181
+zookeeper.servers=192.168.44.12:2181
#hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=10.3.60.3:2181
+hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------#
#定位库地址
-tools.library=/opt/dat/
+tools.library=C:\\workspace\\dat\\
#--------------------------------nacos配置------------------------------#
#nacos 地址
-nacos.server=10.3.60.3:8848
+nacos.server=192.168.44.12:8848
#nacos namespace
-nacos.schema.namespace=prod
+nacos.schema.namespace=P19
#nacos topology_common_config.properties namespace
-nacos.common.namespace=prod
+nacos.common.namespace=P19
#nacos data id
-nacos.data.id=session_record.json
+nacos.data.id=security_event.json
#------------------------------------OOS配置------------------------------------#
#oos地址
@@ -38,10 +38,10 @@ oos.servers=10.3.45.100:8057
#--------------------------------Kafka消费/生产配置------------------------------#
#kafka 接收数据topic
-source.kafka.topic=SESSION-RECORD
+source.kafka.topic=test
#百分点对应的topic
-percent.kafka.topic=SESSION-RECORD
+percent.kafka.topic=PERCENT-RECORD
#文件源数据topic
file.data.kafka.topic=test-file-data
diff --git a/src/main/java/com/zdjizhi/bean/FileMeta.java b/src/main/java/com/zdjizhi/bean/FileMeta.java
index e24e0b4..96a18ea 100644
--- a/src/main/java/com/zdjizhi/bean/FileMeta.java
+++ b/src/main/java/com/zdjizhi/bean/FileMeta.java
@@ -6,7 +6,7 @@ public class FileMeta {
private long common_log_id;
protected int common_recv_time;
private String common_schema_type;
- private JSONArray sourceList;
+ private JSONArray source_list;
private int processing_time;
public long getCommon_log_id() {
@@ -33,12 +33,12 @@ public class FileMeta {
this.common_schema_type = common_schema_type;
}
- public JSONArray getSourceList() {
- return sourceList;
+ public JSONArray getSource_list() {
+ return source_list;
}
- public void setSourceList(JSONArray sourceList) {
- this.sourceList = sourceList;
+ public void setSource_list(JSONArray source_list) {
+ this.source_list = source_list;
}
public int getProcessing_time() {
diff --git a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
index a90f2f2..bec1fe5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
@@ -10,7 +10,6 @@ import com.zdjizhi.bean.SourceList;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.FileEdit;
-import com.zdjizhi.utils.json.JsonTypeUtil;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -53,56 +52,50 @@ public class DealFileProcessFunction extends ProcessFunction<Map<String, Object>
public void processElement(Map<String, Object> message, Context context, Collector<String> collector) throws Exception {
try {
if (message.size() > 0) {
-// jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-// jsonMap = JsonTypeUtil.typeTransform(map);
+
rpUrlValue = (String) message.get("http_response_body");
rqUrlValue = (String) message.get("http_request_body");
emailUrlValue = (String) message.get("mail_eml_file");
-
if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
- cfgId = (long) message.get("common_policy_id");
+ cfgId = (long) message.getOrDefault("common_policy_id",0L);
sIp = (String) message.get("common_client_ip");
sPort = (int) message.get("common_client_port");
dIp = (String) message.get("common_server_ip");
dPort = (int) message.get("common_server_port");
foundTime = (long) message.get("common_recv_time");
schemaType = (String) message.get("common_schema_type");
+ domain = (String)message.getOrDefault("http_domain","");
+ account = (String)message.getOrDefault("common_subscribe_id","");
- if (StringUtil.isNotBlank((String) message.get("http_domain"))) {
- domain = message.get("http_domain").toString();
- } else {
- domain = "NA";
- }
- if (StringUtil.isNotBlank((String) message.get("common_subscribe_id"))) {
- account = message.get("common_subscribe_id").toString();
- } else {
- account = "NA";
- }
FileMeta fileMeta = new FileMeta();
JSONArray jsonarray = new JSONArray();
if (StringUtil.isNotBlank(rqUrlValue)) {
- message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1"));
+ System.out.println(rqUrlValue);
+ String fileId = FileEdit.getFileId(rqUrlValue,"_1");
+ message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
SourceList request = new SourceList();
request.setSource_oss_path(rqUrlValue);
- request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
+ request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId));
jsonarray.add(request);
}
if (StringUtil.isNotBlank(rpUrlValue)) {
- message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2"));
+ String fileId = FileEdit.getFileId(rpUrlValue,"_2");
+ message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
SourceList response = new SourceList();
response.setSource_oss_path(rpUrlValue);
- response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
+ response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId));
jsonarray.add(response);
}
if (StringUtil.isNotBlank(emailUrlValue)) {
- message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9"));
+ String fileId = FileEdit.getFileId(emailUrlValue,"_9");
+ message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
SourceList emailFile = new SourceList();
emailFile.setSource_oss_path(emailUrlValue);
- emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
+ emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId));
jsonarray.add(emailFile);
}
- fileMeta.setSourceList(jsonarray);
+ fileMeta.setSource_list(jsonarray);
fileMeta.setCommon_log_id((long) message.get("common_log_id"));
fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString()));
fileMeta.setCommon_schema_type((String) message.get("common_schema_type"));
diff --git a/src/main/java/com/zdjizhi/utils/general/FileEdit.java b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
index 8c3da79..144dedd 100644
--- a/src/main/java/com/zdjizhi/utils/general/FileEdit.java
+++ b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
@@ -12,7 +12,7 @@ import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
public class FileEdit {
- public static String fileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileSuffix) throws Exception {
+ public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){
String fileType = null;
if (judgeFileType(getFileType(urlValue))){
fileType = getFileType(urlValue);
@@ -24,11 +24,12 @@ public class FileEdit {
fileType = "eml";
}
}
- return "http://"+ FlowWriteConfig.OOS_SERVERS+"/upload_v2"+"/"+cfgId+"/"+fileType+"/"+sIp+"/"+sPort+"/"+dIp+"/"+dPort+"/"+foundTime+"/"+account+"/"+domain+"/"+getFileName(urlValue,fileSuffix);
+
+ return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account;
}
- public static String fileDownloadUrl( String urlValue,String fileSuffix) throws Exception {
- return "http://"+ FlowWriteConfig.OOS_SERVERS+"/download_v2"+"/"+getFileName(urlValue,fileSuffix);
+ public static String getFileDownloadUrl(String fileId){
+ return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId;
}
@@ -37,7 +38,8 @@ public class FileEdit {
return split[split.length-1];
}
- public static String getFileName(String url,String fileSuffix) throws Exception {
+ public static String getFileId(String url,String fileSuffix) throws Exception {
+
String[] arr = url.split("/");
String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
String prefix = MD5Utils.md5Encode(filename);
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 6023dd5..9aa54c8 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -44,7 +44,7 @@ public class TransFormMap {
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
return null;
- }
+ }
}
diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java
deleted file mode 100644
index 170086c..0000000
--- a/src/test/java/com/zdjizhi/EncryptorTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.zdjizhi;
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-import org.junit.Test;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/1610:55
- */
-public class EncryptorTest {
-
-
- @Test
- public void passwordTest(){
- StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
- // 配置加密解密的密码/salt值
- encryptor.setPassword("galaxy");
- // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
- String pin = "galaxy2019";
- String encPin = encryptor.encrypt(pin);
- String user = "admin";
- String encUser = encryptor.encrypt(user);
- System.out.println(encPin);
- System.out.println(encUser);
- // 再进行解密:raw_password
- String rawPwd = encryptor.decrypt("6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ");
- String rawUser = encryptor.decrypt("nsyGpHKGFA4KW0zro9MDdw==");
-
- System.out.println("The username is: "+rawPwd);
- System.out.println("The pin is: "+rawUser);
- }
-
-}
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
deleted file mode 100644
index 2dd5837..0000000
--- a/src/test/java/com/zdjizhi/FunctionTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.zdjizhi;
-
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.IpLookupV2;
-import com.zdjizhi.utils.general.CityHash;
-import org.junit.Test;
-
-import java.math.BigInteger;
-import java.util.Calendar;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/11/611:38
- */
-public class FunctionTest {
-
- private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
- .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
- .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
- .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
- .build();
-
- @Test
- public void CityHashTest() {
-
- byte[] dataBytes = String.valueOf(613970406986188816L).getBytes();
- long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
- String decimalValue = Long.toUnsignedString(hashValue, 10);
- BigInteger result = new BigInteger(decimalValue);
- System.out.println(result);
- }
-
- @Test
- public void ipLookupTest() {
- String ip = "61.144.36.144";
- System.out.println(ipLookup.cityLookupDetail(ip));
- System.out.println(ipLookup.countryLookup(ip));
- }
-
- @Test
- public void timestampTest(){
- Calendar cal = Calendar.getInstance();
- Long utcTime=cal.getTimeInMillis();
- System.out.println(utcTime);
- System.out.println(System.currentTimeMillis());
- }
-}
diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java
deleted file mode 100644
index 5f94e32..0000000
--- a/src/test/java/com/zdjizhi/HBaseTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.zdjizhi;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/12/310:42
- */
-public class HBaseTest {
-
- @Test
- public void getColumn() {
- // 管理Hbase的配置信息
- Configuration configuration = HBaseConfiguration.create();
- // 设置zookeeper节点
- configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181");
- configuration.set("hbase.client.retries.number", "3");
- configuration.set("hbase.bulkload.retries.number", "3");
- configuration.set("zookeeper.recovery.retry", "3");
- try {
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account"));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType;
- boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
- if (hasType) {
- acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
- } else {
- acctStatusType = 3;
- }
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
- System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account);
-// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
deleted file mode 100644
index cd7ada3..0000000
--- a/src/test/java/com/zdjizhi/json/JsonPathTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.zdjizhi.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.json
- * @Description:
- * @date 2022/3/2410:22
- */
-public class JsonPathTest {
- private static final Log logger = LogFactory.get();
-
- private static Properties propNacos = new Properties();
-
- /**
- * 获取需要删除字段的列表
- */
- private static ArrayList<String> dropList = new ArrayList<>();
-
- /**
- * 在内存中加载反射类用的map
- */
- private static HashMap<String, Class> map;
-
- /**
- * 获取任务列表
- * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
- * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
- */
- private static ArrayList<String[]> jobList;
-
- private static String schema;
-
- static {
- propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
- propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
- propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
- propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
- try {
- ConfigService configService = NacosFactory.createConfigService(propNacos);
- String dataId = FlowWriteConfig.NACOS_DATA_ID;
- String group = FlowWriteConfig.NACOS_GROUP;
- String config = configService.getConfig(dataId, group, 5000);
- if (StringUtil.isNotBlank(config)) {
- schema = config;
- }
- } catch (NacosException e) {
- logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
- }
- }
-
- @Test
- public void parseSchemaGetFields() {
- DocumentContext parse = JsonPath.parse(schema);
- List<Object> fields = parse.read("$.fields[*]");
- for (Object field : fields) {
- String name = JsonPath.read(field, "$.name").toString();
- String type = JsonPath.read(field, "$.type").toString();
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
deleted file mode 100644
index 52b99e5..0000000
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.zdjizhi.nacos;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/1016:58
- */
-public class NacosTest {
-
- /**
- * <dependency>
- * <groupId>com.alibaba.nacos</groupId>
- * <artifactId>nacos-client</artifactId>
- * <version>1.2.0</version>
- * </dependency>
- */
-
- private static Properties properties = new Properties();
- /**
- * config data id = config name
- */
- private static final String DATA_ID = "test";
- /**
- * config group
- */
- private static final String GROUP = "Galaxy";
-
- private void getProperties() {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
- properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
- properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
- }
-
-
- @Test
- public void GetConfigurationTest() {
- try {
- getProperties();
- ConfigService configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig(DATA_ID, GROUP, 5000);
- Properties nacosConfigMap = new Properties();
- nacosConfigMap.load(new StringReader(content));
- System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
- } catch (NacosException | IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- @Test
- public void ListenerConfigurationTest() {
- getProperties();
- try {
- //first get config
- ConfigService configService = NacosFactory.createConfigService(properties);
- String config = configService.getConfig(DATA_ID, GROUP, 5000);
- System.out.println(config);
-
- //start listenner
- configService.addListener(DATA_ID, GROUP, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configMsg) {
- System.out.println(configMsg);
- }
- });
- } catch (NacosException e) {
- e.printStackTrace();
- }
-
- //keep running,change nacos config,print new config
- while (true) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
deleted file mode 100644
index c81b809..0000000
--- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package com.zdjizhi.nacos;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.nacos
- * @Description:
- * @date 2022/3/1714:57
- */
-public class SchemaListener {
-
- private static Properties properties = new Properties();
- private static ArrayList<String[]> jobList;
-
-
- static {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
- properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
- properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
-
- try {
- ConfigService configService = NacosFactory.createConfigService(properties);
- String dataId = "session_record.json";
- String group = "Galaxy";
- jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000));
- configService.addListener(dataId, group, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configMsg) {
- jobList = getJobListFromHttp(configMsg);
- }
- });
- } catch (NacosException e) {
- e.printStackTrace();
- }
- }
-
-
- @Test
- public void dealCommonMessage() {
- //keep running,change nacos config,print new config
- while (true) {
- try {
- System.out.println(Arrays.toString(jobList.get(0)));
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
- *
- * @return 任务列表
- */
- private static ArrayList<String[]> getJobListFromHttp(String schema) {
- ArrayList<String[]> list = new ArrayList<>();
-
- //获取fields,并转化为数组,数组的每个元素都是一个name doc type
- JSONObject schemaJson = JSON.parseObject(schema);
- JSONArray fields = (JSONArray) schemaJson.get("fields");
-
- for (Object field : fields) {
-
- if (JSON.parseObject(field.toString()).containsKey("doc")) {
- Object doc = JSON.parseObject(field.toString()).get("doc");
-
- if (JSON.parseObject(doc.toString()).containsKey("format")) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- Object format = JSON.parseObject(doc.toString()).get("format");
- JSONObject formatObject = JSON.parseObject(format.toString());
-
- String functions = formatObject.get("functions").toString();
- String appendTo = null;
- String params = null;
-
- if (formatObject.containsKey("appendTo")) {
- appendTo = formatObject.get("appendTo").toString();
- }
-
- if (formatObject.containsKey("param")) {
- params = formatObject.get("param").toString();
- }
-
-
- if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], null});
- }
-
- } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
-
- }
- } else {
- list.add(new String[]{name, name, functions, params});
- }
-
- }
- }
-
- }
- return list;
- }
-
-}