summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-02-20 17:58:27 +0800
committerwangkuan <[email protected]>2024-02-20 17:58:27 +0800
commitf0c8b24fefaaa9d5567120a95c62290b7a909792 (patch)
tree36c9966911e69f3711f91b9cce94b3aa837b1fa3
parent346dda6ac57bc8ead0939f30f63f926a0d3c07e1 (diff)
[feature][core]统一内置metrics输出,修改ProjectionProcessFunction对error日志的统计逻辑
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java49
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java27
3 files changed, 71 insertions, 28 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
index 144d138..facb4af 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.FilterConfig;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
@@ -19,32 +20,26 @@ public class FilterFunction extends RichFilterFunction<Event> {
private final com.geedgenetworks.core.pojo.FilterConfig FilterConfig;
private static Expression compiledExp;
private static String expression;
- private transient Counter droppedEvents;
- private transient Counter errorEvents;
- private transient Counter outEvents;
- private transient Counter inEvents;
+ private transient InternalMetrics internalMetrics;
+
public FilterFunction(FilterConfig FilterConfig) {
this.FilterConfig = FilterConfig;
}
@Override
public void open(Configuration parameters) throws Exception {
-
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
expression = FilterConfig.getProperties().getOrDefault("expression", "").toString();
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
instance.setCachedExpressionByDefault(true);
instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
instance.setFunctionMissing(null);
compiledExp = instance.compile(expression, true);
- this.errorEvents = getRuntimeContext().getMetricGroup().counter("error_events");
- this.droppedEvents = getRuntimeContext().getMetricGroup().counter("dropped_events");
- this.inEvents = getRuntimeContext().getMetricGroup().counter("in_events");
- this.outEvents = getRuntimeContext().getMetricGroup().counter("out_events");
}
@Override
public boolean filter(Event value) {
- inEvents.inc();
+ internalMetrics.incrementInEvents();
boolean isFilter ;
try {
@@ -59,17 +54,17 @@ public class FilterFunction extends RichFilterFunction<Event> {
} catch (ExpressionRuntimeException e){
isFilter = false;
log.error("Invalid filter ! expression=" +expression);
- errorEvents.inc();
+ internalMetrics.incrementErrorEvents();
}
catch (RuntimeException ignored) {
isFilter = false;
- errorEvents.inc();
+ internalMetrics.incrementErrorEvents();
}
if(isFilter){
- outEvents.inc();
+ internalMetrics.incrementOutEvents();
}
else {
- droppedEvents.inc();
+ internalMetrics.incrementDroppedEvents();
}
return isFilter;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java b/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java
new file mode 100644
index 0000000..ff68845
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java
@@ -0,0 +1,49 @@
+package com.geedgenetworks.core.metrics;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class InternalMetrics {
+ private final MetricGroup metricGroup;
+ private final Counter errorEvents;
+ private final Counter droppedEvents;
+ private final Counter inEvents;
+ private final Counter outEvents;
+ private final Counter inBytes;
+ private final Counter outBytes;
+
+ public InternalMetrics(RuntimeContext runtimeContext) {
+ metricGroup = runtimeContext.getMetricGroup().addGroup("internal_metrics");
+ errorEvents = metricGroup.counter("error_events");
+ droppedEvents = metricGroup.counter("dropped_events");
+ inEvents = metricGroup.counter("in_events");
+ outEvents = metricGroup.counter("out_events");
+ inBytes = metricGroup.counter("in_bytes");
+ outBytes = metricGroup.counter("out_bytes");
+ }
+
+ public void incrementErrorEvents() {
+ errorEvents.inc();
+ }
+
+ public void incrementDroppedEvents() {
+ droppedEvents.inc();
+ }
+
+ public void incrementInEvents() {
+ inEvents.inc();
+ }
+
+ public void incrementOutEvents() {
+ outEvents.inc();
+ }
+
+ public void incrementInBytes(long bytes) {
+ inBytes.inc(bytes);
+ }
+
+ public void incrementOutBytes(long bytes) {
+ outBytes.inc(bytes);
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 8cd889c..71454dc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
@@ -33,10 +34,7 @@ import java.util.Map;
public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
private final ProjectionConfig projectionConfig;
private LinkedList<UdfEntity> functions;
- private transient Counter droppedEvents;
- private transient Counter errorEvents;
- private transient Counter outEvents;
- private transient Counter inEvents;
+ private transient InternalMetrics internalMetrics;
public ProjectionProcessFunction(ProjectionConfig projectionConfig) {
@@ -47,6 +45,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
public void open(Configuration parameters) throws Exception {
functions = Lists.newLinkedList();
try {
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
List<UDFContext> udfContexts = projectionConfig.getFunctions();
if (udfContexts == null || udfContexts.isEmpty()) {
return;
@@ -83,10 +82,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
}
}
- this.errorEvents = getRuntimeContext().getMetricGroup().counter("error_events");
- this.droppedEvents = getRuntimeContext().getMetricGroup().counter("dropped_events");
- this.inEvents = getRuntimeContext().getMetricGroup().counter("in_events");
- this.outEvents = getRuntimeContext().getMetricGroup().counter("out_events");
} catch (Exception e) {
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF failed!", e);
}
@@ -95,8 +90,10 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
@Override
public void processElement(Event event, Context ctx, Collector<Event> out) {
- inEvents.inc();
+ internalMetrics.incrementInEvents();
+ int errorCount = 0;
for (UdfEntity udfEntity : functions) {
+
try {
if (event.isDropped()) {
break;
@@ -108,14 +105,16 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
}
} catch (ExpressionRuntimeException ignore) {
log.error("Function " + udfEntity.getName() + " Invalid filter ! ");
- errorEvents.inc();
+ errorCount++;
} catch (Exception e) {
log.error("Function " + udfEntity.getName() + " execute exception !", e);
- errorEvents.inc();
+ errorCount++;
}
}
-
+ if(errorCount>0){
+ internalMetrics.incrementErrorEvents();
+ }
// 判断函数是否有output fields,减少输出字段
if (projectionConfig.getOutput_fields() != null
&& !projectionConfig.getOutput_fields().isEmpty()) {
@@ -131,9 +130,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
}
if (!event.isDropped()) {
out.collect(event);
- outEvents.inc();
+ internalMetrics.incrementOutEvents();
} else {
- droppedEvents.inc();
+ internalMetrics.incrementDroppedEvents();
}
}