diff options
| author | houjinchuan <[email protected]> | 2024-02-22 11:27:53 +0800 |
|---|---|---|
| committer | houjinchuan <[email protected]> | 2024-02-22 11:27:53 +0800 |
| commit | 68991747b9dd5775ba58439b51331a686c650294 (patch) | |
| tree | 3412475c7c9215a1fbbfb21cfe016dd2b896fe2c | |
| parent | 5589e5c0fbf893e559cc291b69a7881c96b7df77 (diff) | |
[feature][connector-ipfix] GAL-486 IPFIX Collector输出有价值的Metrics
| -rw-r--r-- | groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java index f2deea5..529827c 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java @@ -5,9 +5,11 @@ import cn.hutool.log.LogFactory; import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.types.*; import com.geedgenetworks.core.types.DataType; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -73,9 +75,19 @@ public class IPFixSourceProvider implements SourceProvider { serviceDiscoveryProperties); DataStream<byte[]> source = env.addSource(udpSource).setParallelism(udpSourceParallelism); return source.flatMap(new RichFlatMapFunction<byte[], Event>() { + private InternalMetrics internalMetrics; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + internalMetrics = new InternalMetrics(getRuntimeContext()); + } + @Override public void flatMap(byte[] data, Collector<Event> out) { try { + internalMetrics.incrementInEvents(); + internalMetrics.incrementInBytes(data.length); ByteBuffer buffer = ByteBuffer.wrap(data); RecordReader reader = new RecordReader(new ByteBufferMessageReader(buffer, sessionGroup)); while (reader.hasNext()) { @@ -101,8 +113,10 @@ public class IPFixSourceProvider implements SourceProvider { } } catch (NoTemplateException e) { LOG.error(e.getMessage() + ". The client did not send the template, or there was a delay in processing the template message."); + internalMetrics.incrementErrorEvents(); } catch (Exception e) { LOG.error("IPFix parsing failed.", e); + internalMetrics.incrementErrorEvents(); } } }); |
