diff options
| author | doufenghu <[email protected]> | 2023-12-14 19:17:50 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2023-12-14 19:17:50 +0800 |
| commit | 21a1b77c9cfcaec53e6ef1e8eb9ece7bd564288c (patch) | |
| tree | 5128a957df5df84c2a14189b6952737b037099c8 | |
| parent | 8fd641686064bcc8eca22044c9fb11046fe4ab61 (diff) | |
[improve][Drop] The isFinal flag has been replaced by the isDropped flag. Because the isFinal indicates that the event has been tagged as end and will not be processed by the next operator.But don't describe events that have been dropped.
5 files changed, 7 insertions, 9 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java index e8a7426..97bdd82 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java @@ -9,10 +9,9 @@ import java.util.Map; public class Event implements Serializable { public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp"; private Map<String, Object> extractedFields; - // Event final flag, default is false. if set to true, stops ingestion data to the downstream functions and don't output next processor or sink. - private boolean isFinal = false; - - + //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing. + private boolean isDropped = false; + public Map<String, Object> getExtractedFields() { return extractedFields; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 57bbfcd..2c90a56 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -78,7 +78,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { for(UdfEntity udfEntity: functions){ try { - if (event.isFinal()) { + if (event.isDropped()) { break; } else { boolean executionUDF = udfEntity.getFilterExpression() != null @@ -100,7 +100,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { ColumnUtil.columnSelector( event.getExtractedFields(), projectionConfig.getOutput_fields())); } - if(!event.isFinal()){out.collect(event);} + if(!event.isDropped()){out.collect(event);} } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java index d1b5509..6426a33 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java @@ -13,7 +13,7 @@ public class Drop implements UDF { @Override public Event evaluate(Event event) { - event.setFinal(true); + event.setDropped(true); return event; } diff --git a/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java b/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java index 7391b07..93a4277 100644 --- a/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java +++ b/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java @@ -22,7 +22,6 @@ public class InlineToConsoleExample { executeCommandArgs.setTargetType(TargetType.LOCAL); executeCommandArgs.setVariables(null); GrootStreamServer.run(executeCommandArgs.buildCommand()); - } public static String getTestConfigFile(String configFile) diff --git a/groot-example/src/main/resources/examples/inline-to-console-job.yaml b/groot-example/src/main/resources/examples/inline-to-console-job.yaml index e107617..c565457 100644 --- a/groot-example/src/main/resources/examples/inline-to-console-job.yaml +++ b/groot-example/src/main/resources/examples/inline-to-console-job.yaml @@ -40,7 +40,7 @@ preprocessing_pipelines: processing_pipelines: session_record_processor: # [object] Processing Pipeline type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: + remove_fields: [log_id] output_fields: properties: key: value |
