summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2023-12-14 19:17:50 +0800
committerdoufenghu <[email protected]>2023-12-14 19:17:50 +0800
commit21a1b77c9cfcaec53e6ef1e8eb9ece7bd564288c (patch)
tree5128a957df5df84c2a14189b6952737b037099c8
parent8fd641686064bcc8eca22044c9fb11046fe4ab61 (diff)
[improve][Drop] The isFinal flag has been replaced by the isDropped flag. Because the isFinal indicates that the event has been tagged as end and will not be processed by the next operator.But don't describe events that have been dropped.
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java2
-rw-r--r--groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java1
-rw-r--r--groot-example/src/main/resources/examples/inline-to-console-job.yaml2
5 files changed, 7 insertions, 9 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java
index e8a7426..97bdd82 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/Event.java
@@ -9,10 +9,9 @@ import java.util.Map;
public class Event implements Serializable {
public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
private Map<String, Object> extractedFields;
- // Event final flag, default is false. if set to true, stops ingestion data to the downstream functions and don't output next processor or sink.
- private boolean isFinal = false;
-
-
+ //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
+ private boolean isDropped = false;
+
public Map<String, Object> getExtractedFields() {
return extractedFields;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 57bbfcd..2c90a56 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -78,7 +78,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
for(UdfEntity udfEntity: functions){
try {
- if (event.isFinal()) {
+ if (event.isDropped()) {
break;
} else {
boolean executionUDF = udfEntity.getFilterExpression() != null
@@ -100,7 +100,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
ColumnUtil.columnSelector(
event.getExtractedFields(), projectionConfig.getOutput_fields()));
}
- if(!event.isFinal()){out.collect(event);}
+ if(!event.isDropped()){out.collect(event);}
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
index d1b5509..6426a33 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
@@ -13,7 +13,7 @@ public class Drop implements UDF {
@Override
public Event evaluate(Event event) {
- event.setFinal(true);
+ event.setDropped(true);
return event;
}
diff --git a/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java b/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java
index 7391b07..93a4277 100644
--- a/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java
+++ b/groot-example/src/main/java/com/geedgenetworks/example/InlineToConsoleExample.java
@@ -22,7 +22,6 @@ public class InlineToConsoleExample {
executeCommandArgs.setTargetType(TargetType.LOCAL);
executeCommandArgs.setVariables(null);
GrootStreamServer.run(executeCommandArgs.buildCommand());
-
}
public static String getTestConfigFile(String configFile)
diff --git a/groot-example/src/main/resources/examples/inline-to-console-job.yaml b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
index e107617..c565457 100644
--- a/groot-example/src/main/resources/examples/inline-to-console-job.yaml
+++ b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
@@ -40,7 +40,7 @@ preprocessing_pipelines:
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
- remove_fields:
+ remove_fields: [log_id]
output_fields:
properties:
key: value