summaryrefslogtreecommitdiff
path: root/groot-connectors
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2023-12-13 17:45:53 +0800
committerdoufenghu <[email protected]>2023-12-13 17:45:53 +0800
commit2154ce4e9ec8d4ace4f98d90ecf9c796e1871dc2 (patch)
tree5d8c605f2bbe84c6e7ae555497136ca027eb69c0 /groot-connectors
parent1f63d977b41318e2538f5c8b2501cce2af6e4bbf (diff)
[fix][UDF] 修复DROP函数,isFinal为事件结束标志,为true下游函数不再执行,Process Function不再将该事件发送到下游Process Function或Sink。isFinal控制事件流在pipeline执行,只在Process Function有效。
Diffstat (limited to 'groot-connectors')
-rw-r--r--groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java1
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java2
2 files changed, 0 insertions, 3 deletions
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index e014d87..407b820 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -73,7 +73,6 @@ public class IPFixSourceProvider implements SourceProvider {
}
fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond() * 1000);
event.setExtractedFields(fieldMap);
- event.setFinal(true);
out.collect(event);
}
} catch (NoTemplateException e) {
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java
index b961dca..f9fd4b4 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java
@@ -41,7 +41,6 @@ public class KafkaSource implements Source {
public Event map(Tuple2<String, Long> message) {
Event event = new Event();
- event.setFinal(true);
try {
Map<String, Object> values = JSON.parseObject(message.f0);
//event.set__timestamp(message.f1);
@@ -65,7 +64,6 @@ public class KafkaSource implements Source {
event.setExtractedFields(values);
return event;
} catch (Exception e) {
- event.setFinal(false);
logger.error(e.toString());
return event;
}