summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2024-02-22 11:27:53 +0800
committerhoujinchuan <[email protected]>2024-02-22 11:27:53 +0800
commit68991747b9dd5775ba58439b51331a686c650294 (patch)
tree3412475c7c9215a1fbbfb21cfe016dd2b896fe2c
parent5589e5c0fbf893e559cc291b69a7881c96b7df77 (diff)
[feature][connector-ipfix] GAL-486 IPFIX Collector输出有价值的Metrics
-rw-r--r--groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java14
1 files changed, 14 insertions, 0 deletions
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index f2deea5..529827c 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -5,9 +5,11 @@ import cn.hutool.log.LogFactory;
import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.types.*;
import com.geedgenetworks.core.types.DataType;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -73,9 +75,19 @@ public class IPFixSourceProvider implements SourceProvider {
serviceDiscoveryProperties);
DataStream<byte[]> source = env.addSource(udpSource).setParallelism(udpSourceParallelism);
return source.flatMap(new RichFlatMapFunction<byte[], Event>() {
+ private InternalMetrics internalMetrics;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ internalMetrics = new InternalMetrics(getRuntimeContext());
+ }
+
@Override
public void flatMap(byte[] data, Collector<Event> out) {
try {
+ internalMetrics.incrementInEvents();
+ internalMetrics.incrementInBytes(data.length);
ByteBuffer buffer = ByteBuffer.wrap(data);
RecordReader reader = new RecordReader(new ByteBufferMessageReader(buffer, sessionGroup));
while (reader.hasNext()) {
@@ -101,8 +113,10 @@ public class IPFixSourceProvider implements SourceProvider {
}
} catch (NoTemplateException e) {
LOG.error(e.getMessage() + ". The client did not send the template, or there was a delay in processing the template message.");
+ internalMetrics.incrementErrorEvents();
} catch (Exception e) {
LOG.error("IPFix parsing failed.", e);
+ internalMetrics.incrementErrorEvents();
}
}
});