summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-12-19 10:18:15 +0800
committerwangkuan <[email protected]>2023-12-19 10:18:15 +0800
commit23b8e2b20bbad2684256a5d68ed841f6d20e8d27 (patch)
treec8245e27a00956665c898b2a14bfd1e947135007
parent0702fb66ffd49c4d6f98800152e394814cade21c (diff)
[improve][common][core]修改udf注册文件读取方式,与配置文件读取方式相同,修改配置文件名
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java31
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java10
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java62
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java47
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java3
-rw-r--r--groot-common/src/main/resources/groot-platform-plugin11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java42
9 files changed, 195 insertions, 14 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 9db4d11..2090243 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -113,6 +113,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
} else {
environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
}
+ configuration.setString(Constants.MAPPING_GROOTSTREAM_UDF, JSON.toJSONString(grootStreamConfig.getUdfMappingConfig()));
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getEngineConfig()));
environment.getConfig().enableObjectReuse();
environment.getConfig().setGlobalJobParameters(configuration);
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 40f5b23..1a7d88f 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -37,7 +37,9 @@ public final class Constants {
public static final String GROOTSTREAM_CLUSTER_NAME = "gt:impl:grootStreamServer";
+ public static final String MAPPING_GROOTSTREAM_UDF = "groot-platform-udf";
+ public static final String MAPPING_GROOTSTREAM_UDF_DEFAULT_PLUGIN = "groot-platform-udf.plugin";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
index af33030..9ec2aae 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
@@ -9,6 +9,7 @@ import com.hazelcast.config.YamlConfigBuilder;
import com.hazelcast.internal.config.YamlConfigLocator;
import java.io.ByteArrayInputStream;
+import java.util.List;
import java.util.Properties;
import static com.hazelcast.internal.config.DeclarativeConfigUtil.*;
@@ -118,4 +119,34 @@ public class ConfigProvider {
return config;
}
+ public static List<String> locateAndGetMappingConfig(Properties properties) {
+ //validateSuffixInSystemProperty(SYSPROP_MEMBER_CONFIG);
+
+ List<String> config;
+
+ MappingUdfGrootStreamConfigLocator mappingGrootStreamConfigLocator = new MappingUdfGrootStreamConfigLocator();
+ if (mappingGrootStreamConfigLocator.locateFromSystemProperty()) {
+ // 1. Try loading config if provided in system property, and it is an YAML file
+
+
+ config =
+ new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn())
+ .setProperties(properties)
+ .build();
+ } else if (mappingGrootStreamConfigLocator.locateInWorkDirOrOnClasspath()) {
+ // 2. Try loading YAML config from the working directory or from the classpath
+ config =
+ new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn())
+ .setProperties(properties)
+ .build();
+ } else {
+ // 3. Loading the default YAML configuration file
+ mappingGrootStreamConfigLocator.locateDefault();
+ config =
+ new MappingUdfGrootStreamConfigBuilder(mappingGrootStreamConfigLocator.getIn())
+ .setProperties(properties)
+ .build();
+ }
+ return config;
+ }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
index f36091c..ac61c46 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
@@ -5,6 +5,7 @@ import com.hazelcast.config.Config;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.util.List;
@Slf4j
public class GrootStreamConfig {
@@ -12,6 +13,7 @@ public class GrootStreamConfig {
private Config hazelcastConfig;
+ private List<String> udfMappingConfig;
static {
String value = grootStreamHome();
@@ -55,4 +57,12 @@ public class GrootStreamConfig {
return engineConfig;
}
+
+ public List<String> getUdfMappingConfig() {
+ return udfMappingConfig;
+ }
+
+ public void setUdfMappingConfig(List<String> udfMappingConfig) {
+ this.udfMappingConfig = udfMappingConfig;
+ }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java
new file mode 100644
index 0000000..fb87546
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigBuilder.java
@@ -0,0 +1,62 @@
+package com.geedgenetworks.common.config;
+
+import com.hazelcast.config.*;
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.util.Preconditions;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class MappingUdfGrootStreamConfigBuilder extends AbstractConfigBuilder {
+ private final InputStream in;
+ private Properties properties = System.getProperties();
+
+ public MappingUdfGrootStreamConfigBuilder() {
+ this((MappingUdfGrootStreamConfigLocator) null);
+ }
+
+ public MappingUdfGrootStreamConfigBuilder(MappingUdfGrootStreamConfigLocator locator) {
+ if (locator == null) {
+ locator = new MappingUdfGrootStreamConfigLocator();
+ locator.locateEverywhere();
+ }
+ this.in = locator.getIn();
+ }
+ public MappingUdfGrootStreamConfigBuilder(InputStream inputStream) {
+ Preconditions.checkTrue(inputStream != null, "inputStream can't be null");
+ this.in = inputStream;
+ }
+ public List<String> build() {
+ List<String> lines = new ArrayList<>();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.in))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lines.add(line);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ IOUtil.closeResource(this.in);
+ }
+
+
+ return lines;
+ }
+
+ public InputStream getIn() {
+ return in;
+ }
+
+ public MappingUdfGrootStreamConfigBuilder setProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java
new file mode 100644
index 0000000..5bccc7e
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/MappingUdfGrootStreamConfigLocator.java
@@ -0,0 +1,47 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.common.Constants;
+import com.hazelcast.internal.config.AbstractConfigLocator;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+@Slf4j
+public final class MappingUdfGrootStreamConfigLocator extends AbstractConfigLocator {
+
+ public static Collection<String> ALL_ACCEPTED_SUFFIXES =Collections.unmodifiableList(Arrays.asList("plugin"));
+
+
+ public MappingUdfGrootStreamConfigLocator() {
+ }
+ @Override
+ public boolean locateFromSystemProperty() {
+ return loadFromSystemProperty(Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateFromSystemPropertyOrFailOnUnacceptedSuffix() {
+ return loadFromSystemPropertyOrFailOnUnacceptedSuffix(
+ Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateInWorkDir() {
+ return loadFromWorkingDirectory(
+ Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateOnClasspath() {
+ return loadConfigurationFromClasspath(
+ Constants.MAPPING_GROOTSTREAM_UDF, ALL_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ public boolean locateDefault() {
+ loadDefaultConfigurationFromClasspath(Constants.MAPPING_GROOTSTREAM_UDF_DEFAULT_PLUGIN);
+ return true;
+ }
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java
index 7ecc8d3..104eb9b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamConfigBuilder.java
@@ -14,6 +14,7 @@ import org.w3c.dom.Node;
import java.io.InputStream;
import java.util.Properties;
+import static com.geedgenetworks.common.config.ConfigProvider.locateAndGetMappingConfig;
import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode;
public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
@@ -24,6 +25,7 @@ public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
}
public YamlGrootStreamConfigBuilder(YamlGrootStreamConfigLocator locator) {
+
if (locator == null) {
locator = new YamlGrootStreamConfigLocator();
locator.locateEverywhere();
@@ -51,6 +53,7 @@ public class YamlGrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
throw ExceptionUtil.rethrow(e);
}
config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
+ config.setUdfMappingConfig(ConfigProvider.locateAndGetMappingConfig(getProperties()));
return config;
}
diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin
deleted file mode 100644
index d22c057..0000000
--- a/groot-common/src/main/resources/groot-platform-plugin
+++ /dev/null
@@ -1,11 +0,0 @@
-com.geedgenetworks.core.udf.SnowflakeId
-com.geedgenetworks.core.udf.Drop
-com.geedgenetworks.core.udf.AsnLookup
-com.geedgenetworks.core.udf.Eval
-com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.UnixTimestamp
-com.geedgenetworks.core.udf.Domain
-com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.GeoIpLookup
-com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.UnixTimestampConverter
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 416257e..e0e7ffe 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -2,6 +2,8 @@ package com.geedgenetworks.core.processor.projection;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
@@ -15,12 +17,17 @@ import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Expression;
import com.googlecode.aviator.Options;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.quartz.SchedulerException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
@@ -37,6 +44,15 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
@Override
public void open(Configuration parameters) throws Exception {
+ Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+ List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.MAPPING_GROOTSTREAM_UDF), List.class);
+ Map<String,String> udfClassReflect =getClassReflect(udfClassNameLists);
+
+
+
+
+
this.functions = new LinkedList<>();
try {
// KnowledgeBaseUpdateJob.getInstance();
@@ -44,9 +60,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
Expression filterExpression = null;
UdfEntity udfEntity = new UdfEntity();
// 平台注册的函数包含任务中配置的函数则对函数进行实例化
- if (CommonConfig.UDF_CLASS_REFLECT.containsKey(udfContext.getFunction())) {
+ if (udfClassReflect.containsKey(udfContext.getFunction())) {
- Class<?> cls = Class.forName(CommonConfig.UDF_CLASS_REFLECT.get(udfContext.getFunction()));
+ Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
UDF udf = (UDF) cls.getConstructor().newInstance();
udf.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
@@ -60,7 +76,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
udfEntity.setUdfFunction(udf);
udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
- udfEntity.setClassName(CommonConfig.UDF_CLASS_REFLECT.get(udfContext.getFunction()));
+ udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
functions.add(udfEntity);
} else {
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "No such udf function "+udfContext.getFunction());
@@ -102,6 +118,26 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
}
if(!event.isDropped()){out.collect(event);}
}
+ public static Map<String, String> getClassReflect(List<String> plugins) {
+
+ Map<String, String> classReflect = new HashMap<>();
+
+ for (String classPath : plugins) {
+
+ Class cls = null;
+ try {
+ cls = Class.forName(classPath);
+ Method method = cls.getMethod("functionName");
+ Object object = cls.newInstance();
+ String result = (String) method.invoke(object);
+ classReflect.put(result, classPath);
+ System.out.println("Returned Value: " + result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return classReflect;
+ }
@Override
public void close() throws Exception {