diff options
| author | doufenghu <[email protected]> | 2024-10-29 20:42:50 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-10-29 20:42:50 +0800 |
| commit | d2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (patch) | |
| tree | 062db25f5d5740cc76a6a66edc2ef3484b624614 /groot-bootstrap | |
| parent | 06975ee829f9395f095a12c10eaedffcd89b3d83 (diff) | |
[Improve][core] Add CheckUDFContextUtil for verifying UDF configurations. Rename lookup_fields and output_fields to lookupFields and outputFields.
Diffstat (limited to 'groot-bootstrap')
2 files changed, 4 insertions, 12 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index b45d643..f5b1a5d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -14,7 +14,7 @@ import java.net.URLClassLoader; import java.util.*; import java.util.function.BiConsumer; -public abstract class AbstractExecutor<K, V> +public abstract class AbstractExecutor<K, V> implements Executor<DataStream<Event>, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 1719059..42a3a11 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -1,22 +1,14 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.core.pojo.ProcessorConfig; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.pojo.TableConfig; import com.geedgenetworks.core.processor.Processor; -import com.geedgenetworks.core.processor.table.TableProcessor; -import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; @@ -59,7 +51,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) { - ProcessorConfig projectionConfig = new ProcessorConfig(); + ProcessorConfig ProcessorConfig = new ProcessorConfig(); boolean found = false; // 标志变量 CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key), ProjectionConfigOptions.TYPE.key()); @@ -73,7 +65,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, if(processor.type().equals(value.getOrDefault("type", "").toString())){ found = true; try { - projectionConfig = processor.checkConfig(key, value, processorsConfig); + ProcessorConfig = processor.checkConfig(key, value, processorsConfig); } catch (Exception e) { throw new JobExecuteException("Create orderby pipeline instance failed!", e); @@ -84,7 +76,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, if (!found) { throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString()); } - return projectionConfig; + return ProcessorConfig; } |
