diff options
| author | wangkuan <[email protected]> | 2024-08-13 11:10:40 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-13 11:10:40 +0800 |
| commit | b6d37a57e16fee20b2725333dd9740fbe03dc142 (patch) | |
| tree | 15c927ccaa218940e082b49c50c3cc70f35b3b01 | |
| parent | 8ff8fb17f7cf59c163452a62d9d3ecb66672c1b8 (diff) | |
[feature][core][common][bootstrap]支持tableProcessor,增加UnRoll函数
13 files changed, 572 insertions, 7 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 2978bbe..9eb32c4 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -21,4 +21,5 @@ com.geedgenetworks.core.udf.udaf.CollectSet com.geedgenetworks.core.udf.udaf.LongCount com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue -com.geedgenetworks.core.udf.udaf.FirstValue
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.FirstValue +com.geedgenetworks.core.udf.udtf.UnRoll
\ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index bd8b75c..66c0b0f 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -2,15 +2,14 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.AggregateConfigOptions; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.geedgenetworks.core.pojo.ProjectionConfig; +import com.geedgenetworks.core.pojo.TableConfig; +import com.geedgenetworks.core.processor.table.TableProcessor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; @@ -35,6 +34,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, case "aggregate": dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig); break; + case "table": + dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig); + break; case "projection": dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); break; @@ -43,7 +45,32 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } return dataStream; } + protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { + TableProcessor tableProcessor; + if (processorMap.containsKey(tableConfig.getType())) { + tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(tableConfig.getType()); + tableProcessor = (TableProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); + } + } + if (node.getParallelism() > 0) { + tableConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + tableProcessor.processorFunction( + dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + } catch (Exception e) { + throw new JobExecuteException("Create orderby pipeline instance failed!", e); + } + return dataStream; + } protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; @@ -114,6 +141,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, case "aggregate": projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig); break; + case "table": + projectionConfig = checkTableProcessorConfig(key, value, processorsConfig); + break; default://兼容历史版本 projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig); } @@ -164,5 +194,24 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, return aggregateConfig; } + protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) { + + CheckResult result = CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key), + TableConfigOptions.OUTPUT_FIELDS.key(), + TableConfigOptions.REMOVE_FIELDS.key(), + TableConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Table processor: %s, At least one of [%s] should be specified.", + key, String.join(",", + TableConfigOptions.OUTPUT_FIELDS.key(), + TableConfigOptions.REMOVE_FIELDS.key(), + TableConfigOptions.FUNCTIONS.key()))); + } + + TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class); + tableConfig.setName(key); + return tableConfig; + } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java new file mode 100644 index 0000000..480496d --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java @@ -0,0 +1,34 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.UDFContext; + +import java.util.List; + +public interface TableConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + .noDefaultValue() + .withDescription("The type of processor."); + + Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be removed."); + + Option<List<UDFContext>> FUNCTIONS = Options.key("functions") + .type(new TypeReference<List<UDFContext>>() {}) + .noDefaultValue() + .withDescription("The functions to be executed."); + + + + + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java new file mode 100644 index 0000000..e602291 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java @@ -0,0 +1,20 @@ +package com.geedgenetworks.common.udf; + +import com.geedgenetworks.common.Event; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.List; + +public interface TableFunction extends Serializable { + + void open(RuntimeContext runtimeContext, UDFContext udfContext); + + List<Event> evaluate(Event event); + + String functionName(); + + void close(); + +} diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 1b7fca4..f5a4c3f 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -20,4 +20,5 @@ com.geedgenetworks.core.udf.udaf.CollectSet com.geedgenetworks.core.udf.udaf.LongCount com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue -com.geedgenetworks.core.udf.udaf.FirstValue
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.FirstValue +com.geedgenetworks.core.udf.udtf.UnRoll
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java new file mode 100644 index 0000000..3efb8e1 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/TableConfig.java @@ -0,0 +1,15 @@ +package com.geedgenetworks.core.pojo; + +import com.geedgenetworks.common.udf.UDFContext; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.List; + +@EqualsAndHashCode(callSuper = true) +@Data +public class TableConfig extends ProcessorConfig { + + private List<UDFContext> functions; + private String format; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java index 34267a6..ab6a6f5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.projection; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.ScalarFunction; +import com.geedgenetworks.common.udf.TableFunction; import com.geedgenetworks.common.udf.UDFContext; import com.googlecode.aviator.Expression; import lombok.Data; @@ -13,6 +14,7 @@ import java.io.Serializable; public class UdfEntity implements Serializable { private ScalarFunction scalarFunction; private AggregateFunction aggregateFunction; + private TableFunction tableFunction; private Expression filterExpression; private String name; private String className; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java new file mode 100644 index 0000000..4078997 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java @@ -0,0 +1,7 @@ +package com.geedgenetworks.core.processor.table; + +import com.geedgenetworks.core.pojo.TableConfig; +import com.geedgenetworks.core.processor.Processor; +public interface TableProcessor extends Processor<TableConfig> { + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java new file mode 100644 index 0000000..7b6a5e2 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -0,0 +1,138 @@ +package com.geedgenetworks.core.processor.table; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.common.utils.ColumnUtil; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.TableConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.checkerframework.checker.units.qual.A; + +import java.util.*; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; +@Slf4j +public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { + private LinkedList<UdfEntity> functions; + private final TableConfig tableConfig; + private transient InternalMetrics internalMetrics; + + public TableProcessorFunction(TableConfig tableConfig) { + this.tableConfig = tableConfig; + } + + @Override + public void open(Configuration parameters) { + functions = Lists.newLinkedList(); + try { + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + List<UDFContext> udfContexts = tableConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + return; + } + Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // 平台注册的函数包含任务中配置的函数则对函数进行实例化 + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + TableFunction tableFunction = (TableFunction) cls.getConstructor().newInstance(); + tableFunction.open(getRuntimeContext(), udfContext); + // 函数如果包含filter,对表达式进行编译 + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setTableFunction(tableFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDTF: " + udfContext.getFunction()); + } + + } + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDTF failed!", e); + } + } + + + @Override + public void flatMap(Event event, Collector<Event> out) throws Exception { + internalMetrics.incrementInEvents(); + int errorCount = 0; + List<Event> events = new ArrayList<>(); + events.add(event); + for (UdfEntity udfEntity : functions) { + List<Event> newEvents = new ArrayList<>(); + for(int i=0;i<events.size();i++) { + try { + boolean result = udfEntity.getFilterExpression() != null ? filterExecute(udfEntity.getFilterExpression(), udfEntity.getFilterExpression().newEnv("event", events.get(i).getExtractedFields())) : true; + if (!events.get(i).isDropped() && result) { + newEvents.addAll(udfEntity.getTableFunction().evaluate(events.get(i))); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udfEntity.getName() + " Invalid filter ! "); + errorCount++; + } catch (Exception e) { + log.error("Function " + udfEntity.getName() + " execute exception !", e); + errorCount++; + } + + } + events.clear(); + events.addAll(newEvents); + } + if(errorCount>0){ + internalMetrics.incrementErrorEvents(); + } + for(Event newEvent:events){ + if (tableConfig.getOutput_fields() != null + && !tableConfig.getOutput_fields().isEmpty()) { + newEvent.setExtractedFields( + ColumnUtil.columnSelector( + newEvent.getExtractedFields(), tableConfig.getOutput_fields())); + } + if (tableConfig.getRemove_fields() != null + && !tableConfig.getRemove_fields().isEmpty()) { + newEvent.setExtractedFields( + ColumnUtil.columnRemover( + newEvent.getExtractedFields(), tableConfig.getRemove_fields())); + } + if (!event.isDropped()) { + out.collect(newEvent); + internalMetrics.incrementOutEvents(); + } else { + internalMetrics.incrementDroppedEvents(); + } + + } + + + } +}
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java new file mode 100644 index 0000000..f36f8db --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java @@ -0,0 +1,33 @@ +package com.geedgenetworks.core.processor.table; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.TableConfig; +import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.time.Duration; + +public class TableProcessorImpl implements TableProcessor { + + @Override + public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception { + + if (tableConfig.getParallelism() != 0) { + return grootEventSingleOutputStreamOperator + .flatMap(new TableProcessorFunction(tableConfig)) + .setParallelism(tableConfig.getParallelism()) + .name(tableConfig.getName()); + } else { + return grootEventSingleOutputStreamOperator + .flatMap(new TableProcessorFunction(tableConfig)) + .name(tableConfig.getName()); + } + } + + @Override + public String type() { + return "table"; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java new file mode 100644 index 0000000..82bea7b --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java @@ -0,0 +1,132 @@ +package com.geedgenetworks.core.udf.udtf; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.util.*; + + +@Slf4j +public class UnRoll implements TableFunction { + + private String lookupFieldName; + private String outputFieldName; + private Expression compiledExp; + private String expression; + private String outputFieldType; + + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldType="object"; + if(udfContext.getParameters()==null ){ + expression=""; + } + else { + this.outputFieldType=udfContext.getParameters().getOrDefault("output_field_type", "object").toString().trim(); + expression=udfContext.getParameters().getOrDefault("path", "").toString().trim(); + } + if(!expression.isEmpty()){ + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + compiledExp = instance.compile("event."+expression, true); + } + + } + + @Override + public List<Event> evaluate(Event event) { + try { + if(event.getExtractedFields().containsKey(lookupFieldName)){ + Object object; + if(event.getExtractedFields().get(lookupFieldName) instanceof String){ + object = JSONObject.parseObject((String) event.getExtractedFields().get(lookupFieldName), Object.class); + } + else { + object = event.getExtractedFields().get(lookupFieldName); + } + try { + if(compiledExp!=null){ + Object obj = compiledExp.execute(compiledExp.newEnv("event", object)); + if(obj instanceof List) { + return parseList(obj,event); + } + else if(obj instanceof String){ + object = JSONObject.parseObject((String)obj, Object.class); + if(object instanceof List){ + return parseList(object,event); + } + else { + log.error("Invalid unroll ! Object is not instance of list. expression=" +expression); + } + }else { + log.error("Invalid unroll ! Object is not instance of String or List. expression=" +expression); + } + } + else { + if(object instanceof List){ + return parseList(object,event); + } + else { + log.error("Invalid unroll ! Object is not instance of list. "); + } + } + }catch (Exception e) { + log.error("Invalid unroll ! expression=" +expression + " Exception :" + e.getMessage()); + } + } + }catch (Exception e) { + log.error("Invalid parseObject ! expression=" +expression + " Exception :" + e.getMessage()); + } + return Collections.singletonList(event); + } + + private List<Event> parseList(Object object,Event event) { + List list = (List) object; + List<Event> eventList = new ArrayList<>(); + for (Object obj : list) { + Event newEvent = new Event(); + newEvent.setExtractedFields(new HashMap<>()); + newEvent.getExtractedFields().putAll(event.getExtractedFields()); + newEvent.getExtractedFields().remove(lookupFieldName); + if("string".equals(outputFieldType)) { + String jsonString = JSON.toJSONString(obj); + newEvent.getExtractedFields().put(outputFieldName, jsonString); + } + else { + newEvent.getExtractedFields().put(outputFieldName, obj); + } + eventList.add(newEvent); + } + return eventList; + } + + @Override + public String functionName() { + return "UNROLL"; + } + + @Override + public void close() { + + } + +} diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor index 727b42b..1f32ffa 100644 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.Processor @@ -1,2 +1,3 @@ com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl -com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
\ No newline at end of file +com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl +com.geedgenetworks.core.processor.table.TableProcessorImpl
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java new file mode 100644 index 0000000..5686a42 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java @@ -0,0 +1,132 @@ +package com.geedgenetworks.core.udf.test.table; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udtf.UnRoll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class UnRollFunctionTest { + + private static Map<String, Object> nestedMap; + @BeforeAll + public static void setUp() { + nestedMap = Map.of( + "k1", List.of( + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") + ), + "k2", Map.of("name", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}"), + "k3","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]", + "k4","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", + "k5", Map.of("name",List.of( + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") + ) + ), "k6", Map.of("name",List.of( + Map.of("name2", List.of( + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") + ), "source_mac", "52:d4:18:c7:e5:11"), + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") + ) + ) + ); + } + + // 测试方法 + @Test + public void testUnrollFunction1() { + + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("k1")); + udfContext.setOutput_fields(List.of("newk1")); + Map<String, Object> params = new HashMap<>(); + params.put("path", ""); + udfContext.setParameters(params); + UnRoll unroll = new UnRoll(); + unroll.open(null, udfContext); + Event event = new Event(); + event.setExtractedFields(nestedMap); + List<Event> result = unroll.evaluate(event); + assertEquals(2, result.size()); + Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1"); + assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac")); + Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1"); + assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac")); + + + + } + + @Test + public void testUnrollFunction2() { + UDFContext udfContext = new UDFContext(); + UnRoll unroll = new UnRoll(); + Event event = new Event(); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k3")); + udfContext.setOutput_fields(List.of("newk3")); + unroll.open(null, udfContext); + List<Event> result3 = unroll.evaluate(event); + assertEquals(2, result3.size()); + Map<String, Object> map3 = (Map<String, Object>) result3.get(0).getExtractedFields().get("newk3"); + assertEquals("52:d4:18:c7:e5:11", map3.get("source_mac")); + Map<String, Object> map4 = (Map<String, Object>) result3.get(1).getExtractedFields().get("newk3"); + assertEquals("ff:ff:ff:ff:ff:ff", map4.get("source_mac")); + + + Map<String, Object> params = new HashMap<>(); + params.put("path", "name"); + udfContext.setParameters(params); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k2")); + udfContext.setOutput_fields(List.of("newk2")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(2, result2.size()); + Map<String, Object> map5 = (Map<String, Object>) result2.get(0).getExtractedFields().get("newk2"); + assertEquals("52:d4:18:c7:e5:11", map5.get("source_mac")); + Map<String, Object> map6 = (Map<String, Object>) result2.get(1).getExtractedFields().get("newk2"); + assertEquals("ff:ff:ff:ff:ff:ff", map6.get("source_mac")); + + + Map<String, Object> params1 = new HashMap<>(); + params.put("path", "name.0.name2"); + udfContext.setParameters(params1); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k6")); + udfContext.setOutput_fields(List.of("newk6")); + unroll.open(null, udfContext); + List<Event> result6 = unroll.evaluate(event); + assertEquals(2, result6.size()); + Map<String, Object> map9 = (Map<String, Object>) result6.get(0).getExtractedFields().get("newk6"); + assertEquals("52:d4:18:c7:e5:11", map9.get("source_mac")); + Map<String, Object> map10 = (Map<String, Object>) result6.get(1).getExtractedFields().get("newk6"); + assertEquals("ff:ff:ff:ff:ff:ff", map10.get("source_mac")); + } + @Test + public void testUnrollFunction4() { + + + UDFContext udfContext = new UDFContext(); + UnRoll unroll = new UnRoll(); + Event event = new Event(); + Map<String, Object> params = new HashMap<>(); + udfContext.setParameters(params); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k4")); + udfContext.setOutput_fields(List.of("newk4")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(1, result2.size()); + + } +} |
