diff options
| author | wangkuan <[email protected]> | 2024-02-20 17:58:27 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-02-20 17:58:27 +0800 |
| commit | f0c8b24fefaaa9d5567120a95c62290b7a909792 (patch) | |
| tree | 36c9966911e69f3711f91b9cce94b3aa837b1fa3 | |
| parent | 346dda6ac57bc8ead0939f30f63f926a0d3c07e1 (diff) | |
[feature][core]统一内置metrics输出,修改ProjectionProcessFunction对error日志的统计逻辑
3 files changed, 71 insertions, 28 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java index 144d138..facb4af 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java @@ -2,6 +2,7 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.pojo.FilterConfig; import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.AviatorEvaluatorInstance; @@ -19,32 +20,26 @@ public class FilterFunction extends RichFilterFunction<Event> { private final com.geedgenetworks.core.pojo.FilterConfig FilterConfig; private static Expression compiledExp; private static String expression; - private transient Counter droppedEvents; - private transient Counter errorEvents; - private transient Counter outEvents; - private transient Counter inEvents; + private transient InternalMetrics internalMetrics; + public FilterFunction(FilterConfig FilterConfig) { this.FilterConfig = FilterConfig; } @Override public void open(Configuration parameters) throws Exception { - + this.internalMetrics = new InternalMetrics(getRuntimeContext()); expression = FilterConfig.getProperties().getOrDefault("expression", "").toString(); AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); instance.setCachedExpressionByDefault(true); instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); instance.setFunctionMissing(null); compiledExp = instance.compile(expression, true); - this.errorEvents = getRuntimeContext().getMetricGroup().counter("error_events"); - this.droppedEvents = getRuntimeContext().getMetricGroup().counter("dropped_events"); - this.inEvents = getRuntimeContext().getMetricGroup().counter("in_events"); - this.outEvents = getRuntimeContext().getMetricGroup().counter("out_events"); } @Override public boolean filter(Event value) { - inEvents.inc(); + internalMetrics.incrementInEvents(); boolean isFilter ; try { @@ -59,17 +54,17 @@ public class FilterFunction extends RichFilterFunction<Event> { } catch (ExpressionRuntimeException e){ isFilter = false; log.error("Invalid filter ! expression=" +expression); - errorEvents.inc(); + internalMetrics.incrementErrorEvents(); } catch (RuntimeException ignored) { isFilter = false; - errorEvents.inc(); + internalMetrics.incrementErrorEvents(); } if(isFilter){ - outEvents.inc(); + internalMetrics.incrementOutEvents(); } else { - droppedEvents.inc(); + internalMetrics.incrementDroppedEvents(); } return isFilter; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java b/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java new file mode 100644 index 0000000..ff68845 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java @@ -0,0 +1,49 @@ +package com.geedgenetworks.core.metrics; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +public class InternalMetrics { + private final MetricGroup metricGroup; + private final Counter errorEvents; + private final Counter droppedEvents; + private final Counter inEvents; + private final Counter outEvents; + private final Counter inBytes; + private final Counter outBytes; + + public InternalMetrics(RuntimeContext runtimeContext) { + metricGroup = runtimeContext.getMetricGroup().addGroup("internal_metrics"); + errorEvents = metricGroup.counter("error_events"); + droppedEvents = metricGroup.counter("dropped_events"); + inEvents = metricGroup.counter("in_events"); + outEvents = metricGroup.counter("out_events"); + inBytes = metricGroup.counter("in_bytes"); + outBytes = metricGroup.counter("out_bytes"); + } + + public void incrementErrorEvents() { + errorEvents.inc(); + } + + public void incrementDroppedEvents() { + droppedEvents.inc(); + } + + public void incrementInEvents() { + inEvents.inc(); + } + + public void incrementOutEvents() { + outEvents.inc(); + } + + public void incrementInBytes(long bytes) { + inBytes.inc(bytes); + } + + public void incrementOutBytes(long bytes) { + outBytes.inc(bytes); + } +}
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 8cd889c..71454dc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.common.udf.UDF; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler; @@ -33,10 +34,7 @@ import java.util.Map; public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { private final ProjectionConfig projectionConfig; private LinkedList<UdfEntity> functions; - private transient Counter droppedEvents; - private transient Counter errorEvents; - private transient Counter outEvents; - private transient Counter inEvents; + private transient InternalMetrics internalMetrics; public ProjectionProcessFunction(ProjectionConfig projectionConfig) { @@ -47,6 +45,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { public void open(Configuration parameters) throws Exception { functions = Lists.newLinkedList(); try { + this.internalMetrics = new InternalMetrics(getRuntimeContext()); List<UDFContext> udfContexts = projectionConfig.getFunctions(); if (udfContexts == null || udfContexts.isEmpty()) { return; @@ -83,10 +82,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } } - this.errorEvents = getRuntimeContext().getMetricGroup().counter("error_events"); - this.droppedEvents = getRuntimeContext().getMetricGroup().counter("dropped_events"); - this.inEvents = getRuntimeContext().getMetricGroup().counter("in_events"); - this.outEvents = getRuntimeContext().getMetricGroup().counter("out_events"); } catch (Exception e) { throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF failed!", e); } @@ -95,8 +90,10 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { @Override public void processElement(Event event, Context ctx, Collector<Event> out) { - inEvents.inc(); + internalMetrics.incrementInEvents(); + int errorCount = 0; for (UdfEntity udfEntity : functions) { + try { if (event.isDropped()) { break; @@ -108,14 +105,16 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } } catch (ExpressionRuntimeException ignore) { log.error("Function " + udfEntity.getName() + " Invalid filter ! "); - errorEvents.inc(); + errorCount++; } catch (Exception e) { log.error("Function " + udfEntity.getName() + " execute exception !", e); - errorEvents.inc(); + errorCount++; } } - + if(errorCount>0){ + internalMetrics.incrementErrorEvents(); + } // 判断函数是否有output fields,减少输出字段 if (projectionConfig.getOutput_fields() != null && !projectionConfig.getOutput_fields().isEmpty()) { @@ -131,9 +130,9 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { } if (!event.isDropped()) { out.collect(event); - outEvents.inc(); + internalMetrics.incrementOutEvents(); } else { - droppedEvents.inc(); + internalMetrics.incrementDroppedEvents(); } } |
