diff options
| author | wangkuan <[email protected]> | 2023-12-19 10:18:15 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-12-19 10:18:15 +0800 |
| commit | 23b8e2b20bbad2684256a5d68ed841f6d20e8d27 (patch) | |
| tree | c8245e27a00956665c898b2a14bfd1e947135007 | |
| parent | 0702fb66ffd49c4d6f98800152e394814cade21c (diff) | |
[improve][common][core]修改udf注册文件读取方式,与配置文件读取方式相同,修改配置文件名
9 files changed, 195 insertions, 14 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 9db4d11..2090243 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -113,6 +113,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } else { environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration); } + configuration.setString(Constants.MAPPING_GROOTSTREAM_UDF, JSON.toJSONString(grootStreamConfig.getUdfMappingConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getEngineConfig())); environment.getConfig().enableObjectReuse(); environment.getConfig().setGlobalJobParameters(configuration); diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index 40f5b23..1a7d88f 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -37,7 +37,9 @@ public final class Constants { public static final String GROOTSTREAM_CLUSTER_NAME = "gt:impl:grootStreamServer"; + public static final String MAPPING_GROOTSTREAM_UDF = "groot-platform-udf"; + public static final String MAPPING_GROOTSTREAM_UDF_DEFAULT_PLUGIN = "groot-platform-udf.plugin"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java index af33030..9ec2aae 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java @@ -9,6 +9,7 @@ import com.hazelcast.config.YamlConfigBuilder; import com.hazelcast.internal.config.YamlConfigLocator; import java.io.ByteArrayInputStream; +import java.util.List; import java.util.Properties; import static com.hazelcast.internal.config.DeclarativeConfigUtil.*; @@ -118,4 +119,34 @@ public class ConfigProvider { return config; } + public static List<String> locateAndGetMappingConfig(Properties properties) { + //validateSuffixInSystemProperty(SYSPROP_MEMBER_CONFIG); + + List<String> config; + + MappingUdfGrootStreamConfigLocator mappingGrootStreamConfigLocator = new MappingUdfGrootStreamConfigLocator(); + if (mappingGrootStreamConfigLocator.locateFromSystemProperty()) { + // 1. Try loading config if provided in system property, and it is an YAML file + + + config = + new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn()) + .setProperties(properties) + .build(); + } else if (mappingGrootStreamConfigLocator.locateInWorkDirOrOnClasspath()) { + // 2. Try loading YAML config from the working directory or from the classpath + config = + new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn()) + .setProperties(properties) + .build(); + } else { + // 3. Loading the default YAML configuration file + mappingGrootStreamConfigLocator.locateDefault(); + config = + new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn()) + .setProperties(properties) + .build(); + } + return config; + } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java index f36091c..ac61c46 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java @@ -5,6 +5,7 @@ import com.hazelcast.config.Config; import lombok.extern.slf4j.Slf4j; import java.io.File; +import java.util.List; @Slf4j public class GrootStreamConfig { @@ -12,6 +13,7 @@ public class GrootStreamConfig { private Config hazelcastConfig; + private List<String> udfMappingConfig; static { String value = grootStreamHome(); @@ -55,4 +57,12 @@ public class GrootStreamConfig { return engineConfig; } + + public List<String> getUdfMappingConfig() { + return udfMappingConfig; + } + + public void setUdfMappingConfig(List<String> udfMappingConfig) { + this.udfMappingConfig = udfMappingConfig; + } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java new file mode 100644 index 0000000..fb87546 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java @@ -0,0 +1,62 @@ +package com.geedgenetworks.common.config; + +import com.hazelcast.config.*; +import com.hazelcast.internal.nio.IOUtil; +import com.hazelcast.internal.util.Preconditions; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +public class MappingUdfGrootStreamConfigBuilder extends AbstractConfigBuilder { + private final InputStream in; + private Properties properties = System.getProperties(); + + public MappingUdfGrootStreamConfigBuilder() { + this((MappingUdfGrootStreamConfigLocator) null); + } + + public MappingUdfGrootStreamConfigBuilder(MappingUdfGrootStreamConfigLocator locator) { + if (locator == null) { + locator = new MappingUdfGrootStreamConfigLocator(); + locator.locateEverywhere(); + } + this.in = locator.getIn(); + } + public MappingUdfGrootStreamConfigBuilder(InputStream inputStream) { + Preconditions.checkTrue(inputStream != null, "inputStream can't be null"); + this.in = inputStream; + } + public List<String> build() { + List<String> lines = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.in))) { + String line; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + finally { + IOUtil.closeResource(this.in); + } + + + return lines; + } + + public InputStream getIn() { + return in; + } + + public MappingUdfGrootStreamConfigBuilder setProperties(Properties properties) { + this.properties = properties; + return this; + } + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java new file mode 100644 index 0000000..5bccc7e --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java @@ -0,0 +1,47 @@ +package com.geedgenetworks.common.config; + +import com.geedgenetworks.common.Constants; +import com.hazelcast.internal.config.AbstractConfigLocator; +import lombok.extern.slf4j.Slf4j; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +@Slf4j +public final class MappingUdfGrootStreamConfigLocator extends AbstractConfigLocator { + + public static Collection<String> ALL_ACCEPTED_SUFFIXES =Collections.unmodifiableList(Arrays.asList("plugin")); + + + public MappingUdfGrootStreamConfigLocator() { + } + @Override + public boolean locateFromSystemProperty() { + return loadFromSystemProperty(Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES); + } + + @Override + protected boolean locateFromSystemPropertyOrFailOnUnacceptedSuffix() { + return loadFromSystemPropertyOrFailOnUnacceptedSuffix( + Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES); + } + + @Override + protected boolean locateInWorkDir() { + return loadFromWorkingDirectory( + Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES); + } + + @Override + protected boolean locateOnClasspath() { + return loadConfigurationFromClasspath( + Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES); + } + + @Override + public boolean locateDefault() { + loadDefaultConfigurationFromClasspath(Constants.MAPPING_GROOTSTREAM_UDF_DEFAULT_PLUGIN); + return true; + } +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java index 7ecc8d3..104eb9b 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java @@ -14,6 +14,7 @@ import org.w3c.dom.Node; import java.io.InputStream; import java.util.Properties; +import static com.geedgenetworks.common.config.ConfigProvider.locateAndGetMappingConfig; import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode; public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder { @@ -24,6 +25,7 @@ public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder { } public YamlGrootStreamConfigBuilder(YamlGrootStreamConfigLocator locator) { + if (locator == null) { locator = new YamlGrootStreamConfigLocator(); locator.locateEverywhere(); @@ -51,6 +53,7 @@ public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder { throw ExceptionUtil.rethrow(e); } config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties())); + config.setUdfMappingConfig(ConfigProvider.locateAndGetMappingConfig(getProperties())); return config; } diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin deleted file mode 100644 index d22c057..0000000 --- a/groot-common/src/main/resources/groot-platform-plugin +++ /dev/null @@ -1,11 +0,0 @@ -com.geedgenetworks.core.udf.SnowflakeId -com.geedgenetworks.core.udf.Drop -com.geedgenetworks.core.udf.AsnLookup -com.geedgenetworks.core.udf.Eval -com.geedgenetworks.core.udf.JsonExtract -com.geedgenetworks.core.udf.UnixTimestamp -com.geedgenetworks.core.udf.Domain -com.geedgenetworks.core.udf.DecodeBase64 -com.geedgenetworks.core.udf.GeoIpLookup -com.geedgenetworks.core.udf.PathCombine -com.geedgenetworks.core.udf.UnixTimestampConverter diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 416257e..e0e7ffe 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -2,6 +2,8 @@ package com.geedgenetworks.core.processor.projection; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; @@ -15,12 +17,17 @@ import com.googlecode.aviator.AviatorEvaluatorInstance; import com.googlecode.aviator.Expression; import com.googlecode.aviator.Options; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.quartz.SchedulerException; +import java.lang.reflect.Method; +import java.util.HashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler; @@ -37,6 +44,15 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { @Override public void open(Configuration parameters) throws Exception { + Configuration configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class); + Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists); + + + + + this.functions = new LinkedList<>(); try { // KnowledgeBaseUpdateJob.getInstance(); @@ -44,9 +60,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 - if (CommonConfig.UDF_CLASS_REFLECT.containsKey(udfContext.getFunction())) { + if (udfClassReflect.containsKey(udfContext.getFunction())) { - Class<?> cls = Class.forName(CommonConfig.UDF_CLASS_REFLECT.get(udfContext.getFunction())); + Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); UDF udf = (UDF) cls.getConstructor().newInstance(); udf.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 @@ -60,7 +76,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { udfEntity.setUdfFunction(udf); udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); - udfEntity.setClassName(CommonConfig.UDF_CLASS_REFLECT.get(udfContext.getFunction())); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); functions.add(udfEntity); } else { throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "No such udf function "+udfContext.getFunction()); @@ -102,6 +118,26 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } if(!event.isDropped()){out.collect(event);} } + public static Map<String, String> getClassReflect(List<String> plugins) { + + Map<String, String> classReflect = new HashMap<>(); + + for (String classPath : plugins) { + + Class cls = null; + try { + cls = Class.forName(classPath); + Method method = cls.getMethod("functionName"); + Object object = cls.newInstance(); + String result = (String) method.invoke(object); + classReflect.put(result, classPath); + System.out.println("Returned Value: " + result); + } catch (Exception e) { + e.printStackTrace(); + } + } + return classReflect; + } @Override public void close() throws Exception { |
