summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-10-29 20:42:50 +0800
committerdoufenghu <[email protected]>2024-10-29 20:42:50 +0800
commitd2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (patch)
tree062db25f5d5740cc76a6a66edc2ef3484b624614 /groot-bootstrap
parent06975ee829f9395f095a12c10eaedffcd89b3d83 (diff)
[Improve][core] Add CheckUDFContextUtil for verifying UDF configurations. Rename lookup_fields and output_fields to lookupFields and outputFields.
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java14
2 files changed, 4 insertions, 12 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index b45d643..f5b1a5d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -14,7 +14,7 @@ import java.net.URLClassLoader;
import java.util.*;
import java.util.function.BiConsumer;
-public abstract class AbstractExecutor<K, V>
+public abstract class AbstractExecutor<K, V>
implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 1719059..42a3a11 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -1,22 +1,14 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.pojo.ProcessorConfig;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.pojo.TableConfig;
import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.table.TableProcessor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
@@ -59,7 +51,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
- ProcessorConfig projectionConfig = new ProcessorConfig();
+ ProcessorConfig ProcessorConfig = new ProcessorConfig();
boolean found = false; // 标志变量
CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
ProjectionConfigOptions.TYPE.key());
@@ -73,7 +65,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if(processor.type().equals(value.getOrDefault("type", "").toString())){
found = true;
try {
- projectionConfig = processor.checkConfig(key, value, processorsConfig);
+ ProcessorConfig = processor.checkConfig(key, value, processorsConfig);
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
@@ -84,7 +76,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if (!found) {
throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString());
}
- return projectionConfig;
+ return ProcessorConfig;
}