diff options
| author | doufenghu <[email protected]> | 2023-12-13 17:45:53 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2023-12-13 17:45:53 +0800 |
| commit | 2154ce4e9ec8d4ace4f98d90ecf9c796e1871dc2 (patch) | |
| tree | 5d8c605f2bbe84c6e7ae555497136ca027eb69c0 /groot-connectors | |
| parent | 1f63d977b41318e2538f5c8b2501cce2af6e4bbf (diff) | |
[fix][UDF] 修复DROP函数,isFinal为事件结束标志,为true下游函数不再执行,Process Function不再将该事件发送到下游Process Function或Sink。isFinal控制事件流在pipeline执行,只在Process Function有效。
Diffstat (limited to 'groot-connectors')
2 files changed, 0 insertions, 3 deletions
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java index e014d87..407b820 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java @@ -73,7 +73,6 @@ public class IPFixSourceProvider implements SourceProvider { } fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond() * 1000); event.setExtractedFields(fieldMap); - event.setFinal(true); out.collect(event); } } catch (NoTemplateException e) { diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java index b961dca..f9fd4b4 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java @@ -41,7 +41,6 @@ public class KafkaSource implements Source { public Event map(Tuple2<String, Long> message) { Event event = new Event(); - event.setFinal(true); try { Map<String, Object> values = JSON.parseObject(message.f0); //event.set__timestamp(message.f1); @@ -65,7 +64,6 @@ public class KafkaSource implements Source { event.setExtractedFields(values); return event; } catch (Exception e) { - event.setFinal(false); logger.error(e.toString()); return event; } |
