diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/FileChunkCombiner.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/FileChunkCombiner.java | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java index 6955039..e56c018 100644 --- a/src/main/java/com/zdjizhi/FileChunkCombiner.java +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -1,6 +1,6 @@ package com.zdjizhi; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.text.CharSequenceUtil; import com.zdjizhi.config.Configs; import com.zdjizhi.function.*; import com.zdjizhi.function.map.ParseMessagePackMapFunction; @@ -42,7 +42,7 @@ public class FileChunkCombiner { List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>(); triggers.add(ProcessingTimeTrigger.create()); - if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) { + if (Boolean.TRUE.equals(configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER))) { triggers.add(LastChunkTrigger.create()); } Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers); @@ -58,8 +58,9 @@ public class FileChunkCombiner { SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator; for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) { switch (sinkType) { + default: case "hos": - if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { + if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { windowStream .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos")) .name("Filter: Hos") @@ -75,7 +76,7 @@ public class FileChunkCombiner { } break; case "hbase": - if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { + if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) { windowStream .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase")) .name("Filter: HBase") |
