summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/FileChunkCombiner.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/FileChunkCombiner.java')
-rw-r--r--src/main/java/com/zdjizhi/FileChunkCombiner.java9
1 files changed, 5 insertions, 4 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 6955039..e56c018 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -1,6 +1,6 @@
package com.zdjizhi;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
import com.zdjizhi.function.map.ParseMessagePackMapFunction;
@@ -42,7 +42,7 @@ public class FileChunkCombiner {
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(ProcessingTimeTrigger.create());
- if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+ if (Boolean.TRUE.equals(configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER))) {
triggers.add(LastChunkTrigger.create());
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
@@ -58,8 +58,9 @@ public class FileChunkCombiner {
SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator;
for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
switch (sinkType) {
+ default:
case "hos":
- if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
.name("Filter: Hos")
@@ -75,7 +76,7 @@ public class FileChunkCombiner {
}
break;
case "hbase":
- if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
.name("Filter: HBase")