summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-13 11:10:40 +0800
committerwangkuan <[email protected]>2024-08-13 11:10:40 +0800
commitb6d37a57e16fee20b2725333dd9740fbe03dc142 (patch)
tree15c927ccaa218940e082b49c50c3cc70f35b3b01
parent8ff8fb17f7cf59c163452a62d9d3ecb66672c1b8 (diff)
[feature][core][common][bootstrap]支持tableProcessor,增加UnRoll函数
-rw-r--r--config/udf.plugins3
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java57
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java20
-rw-r--r--groot-common/src/main/resources/udf.plugins3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java15
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java138
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java33
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java132
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor3
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java132
13 files changed, 572 insertions, 7 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 2978bbe..9eb32c4 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -21,4 +21,5 @@ com.geedgenetworks.core.udf.udaf.CollectSet
com.geedgenetworks.core.udf.udaf.LongCount
com.geedgenetworks.core.udf.udaf.Mean
com.geedgenetworks.core.udf.udaf.LastValue
-com.geedgenetworks.core.udf.udaf.FirstValue \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.FirstValue
+com.geedgenetworks.core.udf.udtf.UnRoll \ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index bd8b75c..66c0b0f 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,15 +2,14 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.AggregateConfigOptions;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.pojo.TableConfig;
+import com.geedgenetworks.core.processor.table.TableProcessor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
@@ -35,6 +34,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
case "aggregate":
dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
break;
+ case "table":
+ dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig);
+ break;
case "projection":
dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
break;
@@ -43,7 +45,32 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
return dataStream;
}
+ protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+ TableProcessor tableProcessor;
+ if (processorMap.containsKey(tableConfig.getType())) {
+ tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(tableConfig.getType());
+ tableProcessor = (TableProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ tableConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ tableProcessor.processorFunction(
+ dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ } catch (Exception e) {
+ throw new JobExecuteException("Create orderby pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
@@ -114,6 +141,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
case "aggregate":
projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig);
break;
+ case "table":
+ projectionConfig = checkTableProcessorConfig(key, value, processorsConfig);
+ break;
default://兼容历史版本
projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
}
@@ -164,5 +194,24 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
return aggregateConfig;
}
+ protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) {
+
+ CheckResult result = CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key),
+ TableConfigOptions.OUTPUT_FIELDS.key(),
+ TableConfigOptions.REMOVE_FIELDS.key(),
+ TableConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Table processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ TableConfigOptions.OUTPUT_FIELDS.key(),
+ TableConfigOptions.REMOVE_FIELDS.key(),
+ TableConfigOptions.FUNCTIONS.key())));
+ }
+
+ TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class);
+ tableConfig.setName(key);
+ return tableConfig;
+ }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java
new file mode 100644
index 0000000..480496d
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.util.List;
+
+public interface TableConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of processor.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be removed.");
+
+ Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
+ .type(new TypeReference<List<UDFContext>>() {})
+ .noDefaultValue()
+ .withDescription("The functions to be executed.");
+
+
+
+
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java
new file mode 100644
index 0000000..e602291
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Event;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface TableFunction extends Serializable {
+
+ void open(RuntimeContext runtimeContext, UDFContext udfContext);
+
+ List<Event> evaluate(Event event);
+
+ String functionName();
+
+ void close();
+
+}
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 1b7fca4..f5a4c3f 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -20,4 +20,5 @@ com.geedgenetworks.core.udf.udaf.CollectSet
com.geedgenetworks.core.udf.udaf.LongCount
com.geedgenetworks.core.udf.udaf.Mean
com.geedgenetworks.core.udf.udaf.LastValue
-com.geedgenetworks.core.udf.udaf.FirstValue \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.FirstValue
+com.geedgenetworks.core.udf.udtf.UnRoll \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java
new file mode 100644
index 0000000..3efb8e1
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.udf.UDFContext;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class TableConfig extends ProcessorConfig {
+
+ private List<UDFContext> functions;
+ private String format;
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
index 34267a6..ab6a6f5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.projection;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.TableFunction;
import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.Expression;
import lombok.Data;
@@ -13,6 +14,7 @@ import java.io.Serializable;
public class UdfEntity implements Serializable {
private ScalarFunction scalarFunction;
private AggregateFunction aggregateFunction;
+ private TableFunction tableFunction;
private Expression filterExpression;
private String name;
private String className;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
new file mode 100644
index 0000000..4078997
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.core.processor.table;
+
+import com.geedgenetworks.core.pojo.TableConfig;
+import com.geedgenetworks.core.processor.Processor;
+public interface TableProcessor extends Processor<TableConfig> {
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
new file mode 100644
index 0000000..7b6a5e2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -0,0 +1,138 @@
+package com.geedgenetworks.core.processor.table;
+
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.common.utils.ColumnUtil;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.TableConfig;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+import com.google.common.collect.Lists;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.checkerframework.checker.units.qual.A;
+
+import java.util.*;
+
+import static com.geedgenetworks.core.utils.UDFUtils.filterExecute;
+import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
+@Slf4j
+public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
+ private LinkedList<UdfEntity> functions;
+ private final TableConfig tableConfig;
+ private transient InternalMetrics internalMetrics;
+
+ public TableProcessorFunction(TableConfig tableConfig) {
+ this.tableConfig = tableConfig;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ functions = Lists.newLinkedList();
+ try {
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ List<UDFContext> udfContexts = tableConfig.getFunctions();
+ if (udfContexts == null || udfContexts.isEmpty()) {
+ return;
+ }
+ Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
+ for (UDFContext udfContext : udfContexts) {
+ Expression filterExpression = null;
+ UdfEntity udfEntity = new UdfEntity();
+ // 平台注册的函数包含任务中配置的函数则对函数进行实例化
+ if (udfClassReflect.containsKey(udfContext.getFunction())) {
+ Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
+ TableFunction tableFunction = (TableFunction) cls.getConstructor().newInstance();
+ tableFunction.open(getRuntimeContext(), udfContext);
+ // 函数如果包含filter,对表达式进行编译
+ if (udfContext.getFilter() != null) {
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ filterExpression = instance.compile(udfContext.getFilter(), true);
+ }
+ udfEntity.setTableFunction(tableFunction);
+ udfEntity.setFilterExpression(filterExpression);
+ udfEntity.setName(udfContext.getFunction());
+ udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ functions.add(udfEntity);
+ } else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Unsupported UDTF: " + udfContext.getFunction());
+ }
+
+ }
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDTF failed!", e);
+ }
+ }
+
+
+ @Override
+ public void flatMap(Event event, Collector<Event> out) throws Exception {
+ internalMetrics.incrementInEvents();
+ int errorCount = 0;
+ List<Event> events = new ArrayList<>();
+ events.add(event);
+ for (UdfEntity udfEntity : functions) {
+ List<Event> newEvents = new ArrayList<>();
+ for(int i=0;i<events.size();i++) {
+ try {
+ boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", events.get(i).getExtractedFields())) : true;
+ if (!events.get(i).isDropped() && result) {
+ newEvents.addAll(udfEntity.getTableFunction().evaluate(events.get(i)));
+ }
+ } catch (ExpressionRuntimeException ignore) {
+ log.error("Function " + udfEntity.getName() + " Invalid filter ! ");
+ errorCount++;
+ } catch (Exception e) {
+ log.error("Function " + udfEntity.getName() + " execute exception !", e);
+ errorCount++;
+ }
+
+ }
+ events.clear();
+ events.addAll(newEvents);
+ }
+ if(errorCount>0){
+ internalMetrics.incrementErrorEvents();
+ }
+ for(Event newEvent:events){
+ if (tableConfig.getOutput_fields() != null
+ && !tableConfig.getOutput_fields().isEmpty()) {
+ newEvent.setExtractedFields(
+ ColumnUtil.columnSelector(
+ newEvent.getExtractedFields(), tableConfig.getOutput_fields()));
+ }
+ if (tableConfig.getRemove_fields() != null
+ && !tableConfig.getRemove_fields().isEmpty()) {
+ newEvent.setExtractedFields(
+ ColumnUtil.columnRemover(
+ newEvent.getExtractedFields(), tableConfig.getRemove_fields()));
+ }
+ if (!event.isDropped()) {
+ out.collect(newEvent);
+ internalMetrics.incrementOutEvents();
+ } else {
+ internalMetrics.incrementDroppedEvents();
+ }
+
+ }
+
+
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
new file mode 100644
index 0000000..f36f8db
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.core.processor.table;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.TableConfig;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.time.Duration;
+
+public class TableProcessorImpl implements TableProcessor {
+
+ @Override
+ public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception {
+
+ if (tableConfig.getParallelism() != 0) {
+ return grootEventSingleOutputStreamOperator
+ .flatMap(new TableProcessorFunction(tableConfig))
+ .setParallelism(tableConfig.getParallelism())
+ .name(tableConfig.getName());
+ } else {
+ return grootEventSingleOutputStreamOperator
+ .flatMap(new TableProcessorFunction(tableConfig))
+ .name(tableConfig.getName());
+ }
+ }
+
+ @Override
+ public String type() {
+ return "table";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java
new file mode 100644
index 0000000..82bea7b
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java
@@ -0,0 +1,132 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.util.*;
+
+
+@Slf4j
+public class UnRoll implements TableFunction {
+
+ private String lookupFieldName;
+ private String outputFieldName;
+ private Expression compiledExp;
+ private String expression;
+ private String outputFieldType;
+
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldType="object";
+ if(udfContext.getParameters()==null ){
+ expression="";
+ }
+ else {
+ this.outputFieldType=udfContext.getParameters().getOrDefault("output_field_type", "object").toString().trim();
+ expression=udfContext.getParameters().getOrDefault("path", "").toString().trim();
+ }
+ if(!expression.isEmpty()){
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ compiledExp = instance.compile("event."+expression, true);
+ }
+
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ try {
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
+ Object object;
+ if(event.getExtractedFields().get(lookupFieldName) instanceof String){
+ object = JSONObject.parseObject((String) event.getExtractedFields().get(lookupFieldName), Object.class);
+ }
+ else {
+ object = event.getExtractedFields().get(lookupFieldName);
+ }
+ try {
+ if(compiledExp!=null){
+ Object obj = compiledExp.execute(compiledExp.newEnv("event", object));
+ if(obj instanceof List) {
+ return parseList(obj,event);
+ }
+ else if(obj instanceof String){
+ object = JSONObject.parseObject((String)obj, Object.class);
+ if(object instanceof List){
+ return parseList(object,event);
+ }
+ else {
+ log.error("Invalid unroll ! Object is not instance of list. expression=" +expression);
+ }
+ }else {
+ log.error("Invalid unroll ! Object is not instance of String or List. expression=" +expression);
+ }
+ }
+ else {
+ if(object instanceof List){
+ return parseList(object,event);
+ }
+ else {
+ log.error("Invalid unroll ! Object is not instance of list. ");
+ }
+ }
+ }catch (Exception e) {
+ log.error("Invalid unroll ! expression=" +expression + " Exception :" + e.getMessage());
+ }
+ }
+ }catch (Exception e) {
+ log.error("Invalid parseObject ! expression=" +expression + " Exception :" + e.getMessage());
+ }
+ return Collections.singletonList(event);
+ }
+
+ private List<Event> parseList(Object object,Event event) {
+ List list = (List) object;
+ List<Event> eventList = new ArrayList<>();
+ for (Object obj : list) {
+ Event newEvent = new Event();
+ newEvent.setExtractedFields(new HashMap<>());
+ newEvent.getExtractedFields().putAll(event.getExtractedFields());
+ newEvent.getExtractedFields().remove(lookupFieldName);
+ if("string".equals(outputFieldType)) {
+ String jsonString = JSON.toJSONString(obj);
+ newEvent.getExtractedFields().put(outputFieldName, jsonString);
+ }
+ else {
+ newEvent.getExtractedFields().put(outputFieldName, obj);
+ }
+ eventList.add(newEvent);
+ }
+ return eventList;
+ }
+
+ @Override
+ public String functionName() {
+ return "UNROLL";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor
index 727b42b..1f32ffa 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor
@@ -1,2 +1,3 @@
com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
-com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file
+com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+com.geedgenetworks.core.processor.table.TableProcessorImpl \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java
new file mode 100644
index 0000000..5686a42
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java
@@ -0,0 +1,132 @@
+package com.geedgenetworks.core.udf.test.table;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udtf.UnRoll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnRollFunctionTest {
+
+ private static Map<String, Object> nestedMap;
+ @BeforeAll
+ public static void setUp() {
+ nestedMap = Map.of(
+ "k1", List.of(
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
+ ),
+ "k2", Map.of("name", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}"),
+ "k3","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]",
+ "k4","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}",
+ "k5", Map.of("name",List.of(
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
+ )
+ ), "k6", Map.of("name",List.of(
+ Map.of("name2", List.of(
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
+ ), "source_mac", "52:d4:18:c7:e5:11"),
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
+ )
+ )
+ );
+ }
+
+ // 测试方法
+ @Test
+ public void testUnrollFunction1() {
+
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("k1"));
+ udfContext.setOutput_fields(List.of("newk1"));
+ Map<String, Object> params = new HashMap<>();
+ params.put("path", "");
+ udfContext.setParameters(params);
+ UnRoll unroll = new UnRoll();
+ unroll.open(null, udfContext);
+ Event event = new Event();
+ event.setExtractedFields(nestedMap);
+ List<Event> result = unroll.evaluate(event);
+ assertEquals(2, result.size());
+ Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1");
+ assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac"));
+ Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1");
+ assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac"));
+
+
+
+ }
+
+ @Test
+ public void testUnrollFunction2() {
+ UDFContext udfContext = new UDFContext();
+ UnRoll unroll = new UnRoll();
+ Event event = new Event();
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k3"));
+ udfContext.setOutput_fields(List.of("newk3"));
+ unroll.open(null, udfContext);
+ List<Event> result3 = unroll.evaluate(event);
+ assertEquals(2, result3.size());
+ Map<String, Object> map3 = (Map<String, Object>) result3.get(0).getExtractedFields().get("newk3");
+ assertEquals("52:d4:18:c7:e5:11", map3.get("source_mac"));
+ Map<String, Object> map4 = (Map<String, Object>) result3.get(1).getExtractedFields().get("newk3");
+ assertEquals("ff:ff:ff:ff:ff:ff", map4.get("source_mac"));
+
+
+ Map<String, Object> params = new HashMap<>();
+ params.put("path", "name");
+ udfContext.setParameters(params);
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k2"));
+ udfContext.setOutput_fields(List.of("newk2"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(2, result2.size());
+ Map<String, Object> map5 = (Map<String, Object>) result2.get(0).getExtractedFields().get("newk2");
+ assertEquals("52:d4:18:c7:e5:11", map5.get("source_mac"));
+ Map<String, Object> map6 = (Map<String, Object>) result2.get(1).getExtractedFields().get("newk2");
+ assertEquals("ff:ff:ff:ff:ff:ff", map6.get("source_mac"));
+
+
+ Map<String, Object> params1 = new HashMap<>();
+ params.put("path", "name.0.name2");
+ udfContext.setParameters(params1);
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k6"));
+ udfContext.setOutput_fields(List.of("newk6"));
+ unroll.open(null, udfContext);
+ List<Event> result6 = unroll.evaluate(event);
+ assertEquals(2, result6.size());
+ Map<String, Object> map9 = (Map<String, Object>) result6.get(0).getExtractedFields().get("newk6");
+ assertEquals("52:d4:18:c7:e5:11", map9.get("source_mac"));
+ Map<String, Object> map10 = (Map<String, Object>) result6.get(1).getExtractedFields().get("newk6");
+ assertEquals("ff:ff:ff:ff:ff:ff", map10.get("source_mac"));
+ }
+ @Test
+ public void testUnrollFunction4() {
+
+
+ UDFContext udfContext = new UDFContext();
+ UnRoll unroll = new UnRoll();
+ Event event = new Event();
+ Map<String, Object> params = new HashMap<>();
+ udfContext.setParameters(params);
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k4"));
+ udfContext.setOutput_fields(List.of("newk4"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(1, result2.size());
+
+ }
+}