summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-04-01 14:22:44 +0800
committerqidaijie <[email protected]>2022-04-01 14:22:44 +0800
commitfdaa5822295b02df66bfa29ef9bc00c713d481eb (patch)
tree7573bc5285733d92fa98211310f6ee9106b18b18
parent5ab76e4335136324df56b4426320c40298d8de8a (diff)
提交Nacos动态获取schema功能。(GAL-144)
-rw-r--r--pom.xml3
-rw-r--r--properties/service_flow_config.properties19
-rw-r--r--src/main/java/com/zdjizhi/common/FlowWriteConfig.java21
-rw-r--r--src/main/java/com/zdjizhi/common/NacosConfig.java3
-rw-r--r--src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java2
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java48
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java)15
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java4
-rw-r--r--src/test/java/com/zdjizhi/json/JsonPathTest.java79
-rw-r--r--src/test/java/com/zdjizhi/nacos/NacosTest.java58
-rw-r--r--src/test/java/com/zdjizhi/nacos/SchemaListener.java28
11 files changed, 197 insertions, 83 deletions
diff --git a/pom.xml b/pom.xml
index c0c2faa..773429f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
+ <zdjz.tools.version>1.0.8</zdjz.tools.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -116,7 +117,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
- <version>1.0.8</version>
+ <version>${zdjz.tools.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 7a8f907..d548d24 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,3 +1,20 @@
+#--------------------------------地址配置------------------------------#
+#管理kafka地址
+source.kafka.servers=192.168.44.12:9094
+
+#管理输出kafka地址
+sink.kafka.servers=192.168.44.12:9094
+
+#zookeeper 地址 用于配置log_id
+zookeeper.servers=192.168.44.12:2181
+
+#hbase zookeeper地址 用于连接HBase
+hbase.zookeeper.servers=192.168.44.12:2181
+
+#--------------------------------HTTP/定位库------------------------------#
+#定位库地址
+tools.library=D:\\workerspace\\dat\\
+
#--------------------------------nacos配置------------------------------#
#nacos 地址
nacos.server=192.168.44.12:8848
@@ -6,7 +23,7 @@ nacos.server=192.168.44.12:8848
nacos.schema.namespace=test
#nacos topology_common_config.properties namespace
-nacos.common.namespace=flink
+nacos.common.namespace=test
#nacos data id
nacos.data.id=session_record.json
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index fc0c194..bab2a29 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -66,14 +66,12 @@ public class FlowWriteConfig {
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
/**
* kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
-
/**
* kafka source config
*/
@@ -83,7 +81,6 @@ public class FlowWriteConfig {
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
-
/**
* kafka sink config
*/
@@ -110,11 +107,17 @@ public class FlowWriteConfig {
/**
* common config
*/
- public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
- public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("etl.sink.kafka.servers");
- public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
- public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
- public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
-
+ /**
+ * public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
+ * public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
+ * public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
+ * public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
+ * public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
+ */
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/NacosConfig.java b/src/main/java/com/zdjizhi/common/NacosConfig.java
index f71732f..08bb92a 100644
--- a/src/main/java/com/zdjizhi/common/NacosConfig.java
+++ b/src/main/java/com/zdjizhi/common/NacosConfig.java
@@ -19,6 +19,7 @@ import java.util.Properties;
* @Description:
* @date 2022/3/189:36
*/
+@Deprecated
public class NacosConfig {
private static final Log logger = LogFactory.get();
private static Properties propCommon = new Properties();
@@ -48,7 +49,7 @@ public class NacosConfig {
propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
ConfigService configService = NacosFactory.createConfigService(propNacos);
- String commonConfig = configService.getConfig("topology_common_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
+ String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
if (StringUtil.isNotBlank(commonConfig)) {
propCommon.load(new StringReader(commonConfig));
}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index c98687b..801e07a 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -27,6 +27,8 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.enableCheckpointing(180 * 1000);
+
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index d69e864..f477848 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -12,16 +12,12 @@ import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.NacosConfig;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
-import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
-import static com.zdjizhi.utils.json.JsonTypeUtils.*;
/**
* 使用FastJson解析json的工具类
@@ -36,10 +32,12 @@ public class JsonParseUtil {
* 获取需要删除字段的列表
*/
private static ArrayList<String> dropList = new ArrayList<>();
+
/**
* 在内存中加载反射类用的map
*/
- private static HashMap<String, Class> map;
+ private static HashMap<String, Class> jsonFieldsMap;
+
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
@@ -58,8 +56,8 @@ public class JsonParseUtil {
String group = FlowWriteConfig.NACOS_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
+ jsonFieldsMap = getMapFromHttp(schema);
jobList = getJobListFromHttp(schema);
- map = getMapFromHttp(schema);
}
configService.addListener(dataId, group, new Listener() {
@Override
@@ -70,7 +68,8 @@ public class JsonParseUtil {
@Override
public void receiveConfigInfo(String configMsg) {
if (StringUtil.isNotBlank(configMsg)) {
- map = getMapFromHttp(configMsg);
+ clearCache();
+ jsonFieldsMap = getMapFromHttp(configMsg);
jobList = getJobListFromHttp(configMsg);
}
}
@@ -200,29 +199,29 @@ public class JsonParseUtil {
JsonParseUtil.dropJsonField(jsonMap);
HashMap<String, Object> tmpMap = new HashMap<>(192);
for (String key : jsonMap.keySet()) {
- if (map.containsKey(key)) {
- String simpleName = map.get(key).getSimpleName();
+ if (jsonFieldsMap.containsKey(key)) {
+ String simpleName = jsonFieldsMap.get(key).getSimpleName();
switch (simpleName) {
case "String":
- tmpMap.put(key, checkString(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
break;
case "Integer":
- tmpMap.put(key, getIntValue(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
break;
case "long":
- tmpMap.put(key, checkLongValue(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
break;
case "List":
- tmpMap.put(key, checkArray(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
break;
case "Map":
- tmpMap.put(key, checkObject(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
break;
case "double":
- tmpMap.put(key, checkDouble(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
break;
default:
- tmpMap.put(key, checkString(jsonMap.get(key)));
+ tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
}
}
}
@@ -241,7 +240,7 @@ public class JsonParseUtil {
*
* @return 用于反射生成schema类型的对象的一个map集合
*/
- public static HashMap<String, Class> getMapFromHttp(String schema) {
+ private static HashMap<String, Class> getMapFromHttp(String schema) {
HashMap<String, Class> map = new HashMap<>(16);
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
@@ -298,12 +297,12 @@ public class JsonParseUtil {
}
/**
- * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
+ * 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
*
- * @param schema 网关url
+ * @param schema 日志schema
* @return 任务列表
*/
- public static ArrayList<String[]> getJobListFromHttp(String schema) {
+ private static ArrayList<String[]> getJobListFromHttp(String schema) {
ArrayList<String[]> list = new ArrayList<>();
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
@@ -361,4 +360,13 @@ public class JsonParseUtil {
return list;
}
+ /**
+ * 在配置变动时,清空缓存重新获取
+ */
+ private static void clearCache() {
+ jobList.clear();
+ jsonFieldsMap.clear();
+ dropList.clear();
+ }
+
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
index 63af9d5..0cf16ff 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
@@ -1,24 +1,11 @@
package com.zdjizhi.utils.json;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.exception.FlowWriteException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import static com.zdjizhi.utils.json.JsonParseUtil.getJobListFromHttp;
-import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp;
/**
* @author qidaijie
@@ -26,7 +13,7 @@ import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp;
* @Description:
* @date 2021/7/1217:34
*/
-public class JsonTypeUtils {
+public class JsonTypeUtil {
/**
* String 类型检验转换方法
*
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index f3d979b..f935689 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -40,6 +40,7 @@ public class KafkaConsumer {
*
* @return kafka logs -> map
*/
+ @SuppressWarnings("unchecked")
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new TimestampDeserializationSchema(), createConsumerConfig());
@@ -62,7 +63,10 @@ public class KafkaConsumer {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
+ //随着checkpoint提交,将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+ //从消费组当前的offset开始消费
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
new file mode 100644
index 0000000..cd7ada3
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2022/3/2410:22
+ */
+public class JsonPathTest {
+ private static final Log logger = LogFactory.get();
+
+ private static Properties propNacos = new Properties();
+
+ /**
+ * 获取需要删除字段的列表
+ */
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+ /**
+ * 在内存中加载反射类用的map
+ */
+ private static HashMap<String, Class> map;
+
+ /**
+ * 获取任务列表
+ * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList;
+
+ private static String schema;
+
+ static {
+ propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
+ propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
+ propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
+ propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+ try {
+ ConfigService configService = NacosFactory.createConfigService(propNacos);
+ String dataId = FlowWriteConfig.NACOS_DATA_ID;
+ String group = FlowWriteConfig.NACOS_GROUP;
+ String config = configService.getConfig(dataId, group, 5000);
+ if (StringUtil.isNotBlank(config)) {
+ schema = config;
+ }
+ } catch (NacosException e) {
+ logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void parseSchemaGetFields() {
+ DocumentContext parse = JsonPath.parse(schema);
+ List<Object> fields = parse.read("$.fields[*]");
+ for (Object field : fields) {
+ String name = JsonPath.read(field, "$.name").toString();
+ String type = JsonPath.read(field, "$.type").toString();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
index c2b6267..52b99e5 100644
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java
@@ -5,8 +5,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
import org.junit.Test;
import java.io.IOException;
@@ -22,10 +20,26 @@ import java.util.concurrent.Executor;
* @date 2022/3/1016:58
*/
public class NacosTest {
+
+ /**
+ * <dependency>
+ * <groupId>com.alibaba.nacos</groupId>
+ * <artifactId>nacos-client</artifactId>
+ * <version>1.2.0</version>
+ * </dependency>
+ */
+
private static Properties properties = new Properties();
+ /**
+ * config data id = config name
+ */
+ private static final String DATA_ID = "test";
+ /**
+ * config group
+ */
+ private static final String GROUP = "Galaxy";
- @Test
- public void getProperties() {
+ private void getProperties() {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
@@ -38,11 +52,10 @@ public class NacosTest {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig("topology_common_config.properties", "Galaxy", 5000);
+ String content = configService.getConfig(DATA_ID, GROUP, 5000);
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
- System.out.println(nacosConfigMap.getProperty("schema.http"));
} catch (NacosException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -53,16 +66,14 @@ public class NacosTest {
@Test
public void ListenerConfigurationTest() {
getProperties();
- ConfigService configService = null;
try {
- configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig("ETL-SESSION-RECORD-COMPLETED", "etl", 5000);
-
- Properties nacosConfigMap = new Properties();
- nacosConfigMap.load(new StringReader(content));
- System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
+ //first get config
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String config = configService.getConfig(DATA_ID, GROUP, 5000);
+ System.out.println(config);
- configService.addListener("ETL-SESSION-RECORD-COMPLETED", "etl", new Listener() {
+ //start listenner
+ configService.addListener(DATA_ID, GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
@@ -70,17 +81,20 @@ public class NacosTest {
@Override
public void receiveConfigInfo(String configMsg) {
- try {
- Properties nacosConfigMap = new Properties();
- nacosConfigMap.load(new StringReader(configMsg));
- System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
- } catch (IOException e) {
- e.printStackTrace();
- }
+ System.out.println(configMsg);
}
});
- } catch (NacosException | IOException e) {
+ } catch (NacosException e) {
e.printStackTrace();
}
+
+ //keep running,change nacos config,print new config
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
}
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
index b64d9eb..c81b809 100644
--- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java
+++ b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
@@ -9,16 +9,11 @@ import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import com.zdjizhi.utils.json.JsonTypeUtils;
+import org.junit.Test;
-import java.io.IOException;
-import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@@ -62,11 +57,17 @@ public class SchemaListener {
}
- @SuppressWarnings("unchecked")
- public static void dealCommonMessage() {
-
- System.out.println(Arrays.toString(jobList.get(0)));
-
+ @Test
+ public void dealCommonMessage() {
+ //keep running,change nacos config,print new config
+ while (true) {
+ try {
+ System.out.println(Arrays.toString(jobList.get(0)));
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
/**
@@ -74,12 +75,9 @@ public class SchemaListener {
*
* @return 任务列表
*/
- public static ArrayList<String[]> getJobListFromHttp(String schema) {
+ private static ArrayList<String[]> getJobListFromHttp(String schema) {
ArrayList<String[]> list = new ArrayList<>();
- //解析data
-// Object data = JSON.parseObject(schema).get("data");
-
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(schema);
JSONArray fields = (JSONArray) schemaJson.get("fields");