| author | qidaijie <[email protected]> | 2022-04-01 14:22:44 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-04-01 14:22:44 +0800 |
| commit | fdaa5822295b02df66bfa29ef9bc00c713d481eb (patch) | |
| tree | 7573bc5285733d92fa98211310f6ee9106b18b18 | |
| parent | 5ab76e4335136324df56b4426320c40298d8de8a (diff) | |
Add support for dynamically fetching the schema from Nacos. (GAL-144)
| -rw-r--r-- | pom.xml | 3 |
| -rw-r--r-- | properties/service_flow_config.properties | 19 |
| -rw-r--r-- | src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 21 |
| -rw-r--r-- | src/main/java/com/zdjizhi/common/NacosConfig.java | 3 |
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 2 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java | 48 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java) | 15 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 4 |
| -rw-r--r-- | src/test/java/com/zdjizhi/json/JsonPathTest.java | 79 |
| -rw-r--r-- | src/test/java/com/zdjizhi/nacos/NacosTest.java | 58 |
| -rw-r--r-- | src/test/java/com/zdjizhi/nacos/SchemaListener.java | 28 |
11 files changed, 197 insertions, 83 deletions
diff --git a/pom.xml b/pom.xml
@@ -38,6 +38,7 @@
         <kafka.version>1.0.0</kafka.version>
         <hbase.version>2.2.3</hbase.version>
         <nacos.version>1.2.0</nacos.version>
+        <zdjz.tools.version>1.0.8</zdjz.tools.version>
         <scope.type>provided</scope.type>
         <!--<scope.type>compile</scope.type>-->
     </properties>
@@ -116,7 +117,7 @@
         <dependency>
             <groupId>com.zdjizhi</groupId>
             <artifactId>galaxy</artifactId>
-            <version>1.0.8</version>
+            <version>${zdjz.tools.version}</version>
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 7a8f907..d548d24 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,3 +1,20 @@
+#-------------------------------- address config ------------------------------#
+# source Kafka servers
+source.kafka.servers=192.168.44.12:9094
+
+# sink Kafka servers
+sink.kafka.servers=192.168.44.12:9094
+
+# ZooKeeper servers, used for log_id configuration
+zookeeper.servers=192.168.44.12:2181
+
+# HBase ZooKeeper servers, used to connect to HBase
+hbase.zookeeper.servers=192.168.44.12:2181
+
+#-------------------------------- HTTP / location library ------------------------------#
+# location library path
+tools.library=D:\\workerspace\\dat\\
+
 #-------------------------------- Nacos config ------------------------------#
 # Nacos server address
 nacos.server=192.168.44.12:8848
@@ -6,7 +23,7 @@ nacos.server=192.168.44.12:8848
 nacos.schema.namespace=test
 
 #nacos topology_common_config.properties namespace
-nacos.common.namespace=flink
+nacos.common.namespace=test
 
 #nacos data id
 nacos.data.id=session_record.json
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index fc0c194..bab2a29 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -66,14 +66,12 @@ public class FlowWriteConfig {
     public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
     public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
     /**
      * kafka common
      */
     public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
     public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
-
     /**
      * kafka source config
      */
@@ -83,7 +81,6 @@ public class FlowWriteConfig {
     public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
     public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
-
     /**
      * kafka sink config
      */
@@ -110,11 +107,17 @@ public class FlowWriteConfig {
     /**
      * common config
      */
-    public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
-    public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("etl.sink.kafka.servers");
-    public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
-    public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
-    public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
-
+    /**
+     * public static final String SOURCE_KAFKA_SERVERS = NacosConfig.getStringProperty("source.kafka.servers");
+     * public static final String SINK_KAFKA_SERVERS = NacosConfig.getStringProperty("sink.kafka.servers");
+     * public static final String ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("zookeeper.servers");
+     * public static final String TOOLS_LIBRARY = NacosConfig.getStringProperty("tools.library");
+     * public static final String HBASE_ZOOKEEPER_SERVERS = NacosConfig.getStringProperty("hbase.zookeeper.servers");
+     */
+    public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
+    public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
+    public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+    public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+    public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
 }
\ No newline at end of file
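Note on the change above: the connection settings move out of the Nacos-backed NacosConfig and back into the local service_flow_config.properties, read through FlowWriteConfigurations with a source index (0 appears to select the local properties file, 1 the common config). A minimal sketch of such an indexed lookup follows; the class is hypothetical and only illustrates the pattern, not the project's actual FlowWriteConfigurations:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical sketch: index 0 reads the local service_flow_config.properties,
// index 1 reads a remotely fetched properties blob. The real
// FlowWriteConfigurations may resolve its indices differently.
public class IndexedConfigSketch {
    private static final Properties LOCAL = new Properties();
    private static final Properties REMOTE = new Properties();

    static {
        try (FileInputStream in = new FileInputStream("properties/service_flow_config.properties")) {
            LOCAL.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
        // REMOTE would be populated from a config center (e.g. Nacos) at startup.
    }

    public static String getStringProperty(int source, String key) {
        Properties props = (source == 0) ? LOCAL : REMOTE;
        return props.getProperty(key);
    }

    public static Integer getIntProperty(int source, String key) {
        String value = getStringProperty(source, key);
        return value == null ? null : Integer.parseInt(value);
    }
}
```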
diff --git a/src/main/java/com/zdjizhi/common/NacosConfig.java b/src/main/java/com/zdjizhi/common/NacosConfig.java
index f71732f..08bb92a 100644
--- a/src/main/java/com/zdjizhi/common/NacosConfig.java
+++ b/src/main/java/com/zdjizhi/common/NacosConfig.java
@@ -19,6 +19,7 @@ import java.util.Properties;
  * @Description:
  * @date 2022/3/18 9:36
  */
+@Deprecated
 public class NacosConfig {
     private static final Log logger = LogFactory.get();
     private static Properties propCommon = new Properties();
@@ -48,7 +49,7 @@ public class NacosConfig {
             propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
             propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
             ConfigService configService = NacosFactory.createConfigService(propNacos);
-            String commonConfig = configService.getConfig("topology_common_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
+            String commonConfig = configService.getConfig("etl_connection_config.properties", FlowWriteConfig.NACOS_GROUP, 5000);
             if (StringUtil.isNotBlank(commonConfig)) {
                 propCommon.load(new StringReader(commonConfig));
             }
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index c98687b..801e07a 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -27,6 +27,8 @@ public class LogFlowWriteTopology {
     public static void main(String[] args) {
         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
 
+        environment.enableCheckpointing(180 * 1000);
+
         // maximum time between two output flushes (in milliseconds)
         environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index d69e864..f477848 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -12,16 +12,12 @@ import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.jayway.jsonpath.JsonPath;
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.NacosConfig;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
-import net.sf.cglib.beans.BeanGenerator;
 import net.sf.cglib.beans.BeanMap;
 
 import java.util.*;
 import java.util.concurrent.Executor;
 
-import static com.zdjizhi.utils.json.JsonTypeUtils.*;
 
 /**
  * Utility class for parsing JSON with FastJson
@@ -36,10 +32,12 @@ public class JsonParseUtil {
      * list of fields to be dropped
      */
     private static ArrayList<String> dropList = new ArrayList<>();
+
     /**
      * map used to load the reflection classes in memory
      */
-    private static HashMap<String, Class> map;
+    private static HashMap<String, Class> jsonFieldsMap;
+
     /**
      * the task list
      * each element is a four-part string array (field with a format flag, field to complete, function used, parameter used), e.g.:
     * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
@@ -58,8 +56,8 @@ public class JsonParseUtil {
             String group = FlowWriteConfig.NACOS_GROUP;
             String schema = configService.getConfig(dataId, group, 5000);
             if (StringUtil.isNotBlank(schema)) {
+                jsonFieldsMap = getMapFromHttp(schema);
                 jobList = getJobListFromHttp(schema);
-                map = getMapFromHttp(schema);
             }
             configService.addListener(dataId, group, new Listener() {
                 @Override
@@ -70,7 +68,8 @@ public class JsonParseUtil {
                 @Override
                 public void receiveConfigInfo(String configMsg) {
                     if (StringUtil.isNotBlank(configMsg)) {
-                        map = getMapFromHttp(configMsg);
+                        clearCache();
+                        jsonFieldsMap = getMapFromHttp(configMsg);
                         jobList = getJobListFromHttp(configMsg);
                     }
                 }
@@ -200,29 +199,29 @@ public class JsonParseUtil {
         JsonParseUtil.dropJsonField(jsonMap);
         HashMap<String, Object> tmpMap = new HashMap<>(192);
         for (String key : jsonMap.keySet()) {
-            if (map.containsKey(key)) {
-                String simpleName = map.get(key).getSimpleName();
+            if (jsonFieldsMap.containsKey(key)) {
+                String simpleName = jsonFieldsMap.get(key).getSimpleName();
                 switch (simpleName) {
                     case "String":
-                        tmpMap.put(key, checkString(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
                         break;
                     case "Integer":
-                        tmpMap.put(key, getIntValue(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.getIntValue(jsonMap.get(key)));
                         break;
                     case "long":
-                        tmpMap.put(key, checkLongValue(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkLongValue(jsonMap.get(key)));
                         break;
                     case "List":
-                        tmpMap.put(key, checkArray(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkArray(jsonMap.get(key)));
                         break;
                     case "Map":
-                        tmpMap.put(key, checkObject(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkObject(jsonMap.get(key)));
                         break;
                     case "double":
-                        tmpMap.put(key, checkDouble(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkDouble(jsonMap.get(key)));
                         break;
                     default:
-                        tmpMap.put(key, checkString(jsonMap.get(key)));
+                        tmpMap.put(key, JsonTypeUtil.checkString(jsonMap.get(key)));
                 }
             }
         }
@@ -241,7 +240,7 @@ public class JsonParseUtil {
      *
      * @return a map used to reflectively create objects of the schema's types
      */
-    public static HashMap<String, Class> getMapFromHttp(String schema) {
+    private static HashMap<String, Class> getMapFromHttp(String schema) {
         HashMap<String, Class> map = new HashMap<>(16);
 
         // get fields and convert them to an array; each element is a name/doc/type
@@ -298,12 +297,12 @@ public class JsonParseUtil {
     }
 
     /**
-     * Fetch the schema from an HTTP link and return a task list after parsing (useList toList funcList paramlist)
+     * Parse the schema and return a task list (useList toList funcList paramlist)
      *
-     * @param schema gateway url
+     * @param schema log schema
      * @return task list
      */
-    public static ArrayList<String[]> getJobListFromHttp(String schema) {
+    private static ArrayList<String[]> getJobListFromHttp(String schema) {
         ArrayList<String[]> list = new ArrayList<>();
 
         // get fields and convert them to an array; each element is a name/doc/type
@@ -361,4 +360,13 @@ public class JsonParseUtil {
         return list;
     }
 
+    /**
+     * when the config changes, clear the caches so they are rebuilt
+     */
+    private static void clearCache() {
+        jobList.clear();
+        jsonFieldsMap.clear();
+        dropList.clear();
+    }
+
 }
\ No newline at end of file
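The JsonParseUtil change above follows the standard nacos-client hot-reload pattern: fetch the config once with getConfig, then register a Listener whose receiveConfigInfo rebuilds the schema-derived caches. A self-contained sketch of that pattern (server address, data id, group, and the parseSchema stub are placeholders, not the project's values):

```java
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class SchemaHotReloadSketch {
    // cache derived from the schema; rebuilt on every config change
    private static volatile Map<String, Class<?>> fieldTypes = new HashMap<>();

    public static void main(String[] args) throws NacosException {
        Properties props = new Properties();
        props.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); // placeholder
        ConfigService configService = NacosFactory.createConfigService(props);

        // initial load
        String schema = configService.getConfig("session_record.json", "DEFAULT_GROUP", 5000);
        fieldTypes = parseSchema(schema);

        // rebuild the cache whenever the config changes
        configService.addListener("session_record.json", "DEFAULT_GROUP", new Listener() {
            @Override
            public Executor getExecutor() {
                return null; // null = notify on the client's internal thread
            }

            @Override
            public void receiveConfigInfo(String newSchema) {
                // swap in a freshly built map instead of clearing the old one,
                // so readers never observe a half-empty cache
                fieldTypes = parseSchema(newSchema);
            }
        });
    }

    private static Map<String, Class<?>> parseSchema(String schema) {
        Map<String, Class<?>> map = new HashMap<>();
        // real code would parse the schema's fields here
        return map;
    }
}
```

Swapping the reference, as sketched here, avoids the brief window in the committed clearCache() approach where jobList and jsonFieldsMap are empty while a message is still being processed.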
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
index 63af9d5..0cf16ff 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
@@ -1,24 +1,11 @@
 package com.zdjizhi.utils.json;
 
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.exception.FlowWriteException;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
 
-import static com.zdjizhi.utils.json.JsonParseUtil.getJobListFromHttp;
-import static com.zdjizhi.utils.json.JsonParseUtil.getMapFromHttp;
 
 /**
  * @author qidaijie
@@ -26,7 +13,7 @@
  * @Description:
  * @date 2021/7/12 17:34
  */
-public class JsonTypeUtils {
+public class JsonTypeUtil {
     /**
      * String type check and conversion method
      *
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index f3d979b..f935689 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -40,6 +40,7 @@ public class KafkaConsumer {
      *
      * @return kafka logs -> map
      */
+    @SuppressWarnings("unchecked")
     public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
         FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
                 new TimestampDeserializationSchema(), createConsumerConfig());
@@ -62,7 +63,10 @@ public class KafkaConsumer {
         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
                 new SimpleStringSchema(), createConsumerConfig());
 
+        // commit offsets to Kafka as checkpoints complete
         kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+        // start consuming from the consumer group's current offsets
         kafkaConsumer.setStartFromGroupOffsets();
 
         return kafkaConsumer;
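The two comments added to KafkaConsumer.java describe how this generation of the Flink Kafka connector handles offsets: with checkpointing enabled, FlinkKafkaConsumer commits its offsets back to Kafka only as part of a completed checkpoint. A minimal sketch of how the consumer pairs with the 180-second checkpoint interval enabled in LogFlowWriteTopology (topic, servers, and group id are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class CheckpointedConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 3 minutes, matching the 180 * 1000 ms in LogFlowWriteTopology
        env.enableCheckpointing(180 * 1000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // placeholder
        props.setProperty("group.id", "log-flow-write");          // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("session_record", new SimpleStringSchema(), props);
        // offsets are committed to Kafka only when a checkpoint completes, so the
        // group offset in Kafka always reflects a consistent restore point
        consumer.setCommitOffsetsOnCheckpoints(true);
        // on a fresh start (no Flink state), resume from the group's committed offsets;
        // on recovery, offsets come from the checkpointed state instead
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("checkpointed-consumer-sketch");
    }
}
```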
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
new file mode 100644
index 0000000..cd7ada3
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.json
+ * @Description:
+ * @date 2022/3/24 10:22
+ */
+public class JsonPathTest {
+    private static final Log logger = LogFactory.get();
+
+    private static Properties propNacos = new Properties();
+
+    /**
+     * list of fields to be dropped
+     */
+    private static ArrayList<String> dropList = new ArrayList<>();
+
+    /**
+     * map used to load the reflection classes in memory
+     */
+    private static HashMap<String, Class> map;
+
+    /**
+     * the task list
+     * each element is a four-part string array (field with a format flag, field to complete, function used, parameter used), e.g.:
+     * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+     */
+    private static ArrayList<String[]> jobList;
+
+    private static String schema;
+
+    static {
+        propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
+        propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
+        propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
+        propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+        try {
+            ConfigService configService = NacosFactory.createConfigService(propNacos);
+            String dataId = FlowWriteConfig.NACOS_DATA_ID;
+            String group = FlowWriteConfig.NACOS_GROUP;
+            String config = configService.getConfig(dataId, group, 5000);
+            if (StringUtil.isNotBlank(config)) {
+                schema = config;
+            }
+        } catch (NacosException e) {
+            logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void parseSchemaGetFields() {
+        DocumentContext parse = JsonPath.parse(schema);
+        List<Object> fields = parse.read("$.fields[*]");
+        for (Object field : fields) {
+            String name = JsonPath.read(field, "$.name").toString();
+            String type = JsonPath.read(field, "$.type").toString();
+        }
+    }
+}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
index c2b6267..52b99e5 100644
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java
@@ -5,8 +5,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -22,10 +20,26 @@ import java.util.concurrent.Executor;
  * @date 2022/3/10 16:58
  */
 public class NacosTest {
+
+    /**
+     * <dependency>
+     *     <groupId>com.alibaba.nacos</groupId>
+     *     <artifactId>nacos-client</artifactId>
+     *     <version>1.2.0</version>
+     * </dependency>
+     */
     private static Properties properties = new Properties();
+
+    /**
+     * config data id = config name
+     */
+    private static final String DATA_ID = "test";
+
+    /**
+     * config group
+     */
+    private static final String GROUP = "Galaxy";
 
-    @Test
-    public void getProperties() {
+    private void getProperties() {
         properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
         properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
         properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
@@ -38,11 +52,10 @@
         try {
             getProperties();
             ConfigService configService = NacosFactory.createConfigService(properties);
-            String content = configService.getConfig("topology_common_config.properties", "Galaxy", 5000);
+            String content = configService.getConfig(DATA_ID, GROUP, 5000);
             Properties nacosConfigMap = new Properties();
             nacosConfigMap.load(new StringReader(content));
             System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
-            System.out.println(nacosConfigMap.getProperty("schema.http"));
         } catch (NacosException | IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
@@ -53,16 +66,14 @@ public class NacosTest {
 
     @Test
     public void ListenerConfigurationTest() {
         getProperties();
-        ConfigService configService = null;
         try {
-            configService = NacosFactory.createConfigService(properties);
-            String content = configService.getConfig("ETL-SESSION-RECORD-COMPLETED", "etl", 5000);
-
-            Properties nacosConfigMap = new Properties();
-            nacosConfigMap.load(new StringReader(content));
-            System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
+            // first get the config
+            ConfigService configService = NacosFactory.createConfigService(properties);
+            String config = configService.getConfig(DATA_ID, GROUP, 5000);
+            System.out.println(config);
 
-            configService.addListener("ETL-SESSION-RECORD-COMPLETED", "etl", new Listener() {
+            // start the listener
+            configService.addListener(DATA_ID, GROUP, new Listener() {
                 @Override
                 public Executor getExecutor() {
                     return null;
@@ -70,17 +81,20 @@ public class NacosTest {
 
                 @Override
                 public void receiveConfigInfo(String configMsg) {
-                    try {
-                        Properties nacosConfigMap = new Properties();
-                        nacosConfigMap.load(new StringReader(configMsg));
-                        System.out.println(nacosConfigMap.getProperty("source.kafka.servers"));
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
+                    System.out.println(configMsg);
                 }
             });
-        } catch (NacosException | IOException e) {
+        } catch (NacosException e) {
             e.printStackTrace();
         }
+
+        // keep running; change the Nacos config and the new config is printed
+        while (true) {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
     }
 }
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
index b64d9eb..c81b809 100644
--- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java
+++ b/src/test/java/com/zdjizhi/nacos/SchemaListener.java
@@ -9,16 +9,11 @@ import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import com.zdjizhi.utils.json.JsonTypeUtils;
+import org.junit.Test;
 
-import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
 
@@ -62,11 +57,17 @@ public class SchemaListener {
     }
 
-    @SuppressWarnings("unchecked")
-    public static void dealCommonMessage() {
-
-        System.out.println(Arrays.toString(jobList.get(0)));
-
+    @Test
+    public void dealCommonMessage() {
+        // keep running; change the Nacos config and the new job list is printed
+        while (true) {
+            try {
+                System.out.println(Arrays.toString(jobList.get(0)));
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     /**
@@ -74,12 +75,9 @@ public class SchemaListener {
      *
      * @return task list
      */
-    public static ArrayList<String[]> getJobListFromHttp(String schema) {
+    private static ArrayList<String[]> getJobListFromHttp(String schema) {
         ArrayList<String[]> list = new ArrayList<>();
 
-        // parse data
-//        Object data = JSON.parseObject(schema).get("data");
-
         // get fields and convert them to an array; each element is a name/doc/type
         JSONObject schemaJson = JSON.parseObject(schema);
         JSONArray fields = (JSONArray) schemaJson.get("fields");
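For reference, the new JsonPathTest reads the same schema shape that getJobListFromHttp consumes with FastJson: a top-level "fields" array of name/doc/type objects. A standalone sketch of that extraction against an inline example schema (the field names and types here are illustrative, not the real session_record.json):

```java
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

import java.util.List;

public class SchemaFieldsSketch {
    public static void main(String[] args) {
        // illustrative schema; the real one is the session_record.json config in Nacos
        String schema = "{\"fields\":["
                + "{\"name\":\"mail_subject\",\"doc\":\"\",\"type\":\"String\"},"
                + "{\"name\":\"log_id\",\"doc\":\"\",\"type\":\"long\"}]}";

        DocumentContext ctx = JsonPath.parse(schema);
        // each element of the list is one field object from the "fields" array
        List<Object> fields = ctx.read("$.fields[*]");
        for (Object field : fields) {
            String name = JsonPath.read(field, "$.name").toString();
            String type = JsonPath.read(field, "$.type").toString();
            System.out.println(name + " -> " + type);
        }
    }
}
```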
