summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-04-18 11:55:00 +0800
committergujinkai <[email protected]>2024-04-18 11:55:00 +0800
commit7fc963401f377041ba8d77b791414151a0628686 (patch)
tree6213e098fbd6836faa22542374bf1fb796830e83
parentcc03e6750cc57bab8bf0ba8917080581a6dc751a (diff)
feature: add operator name in LocationMetric
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java6
1 files changed, 4 insertions, 2 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
index 574c256..e552453 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java
@@ -37,12 +37,14 @@ public class LocationMetric implements Schedule {
SingleOutputStreamOperator<String> locationSubscriberMetric = process.keyBy(LocationSubscriber::getSubscriber_id)
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
- .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc());
+ .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc())
+ .name("locationSubscriberMetric");
locationSubscriberMetric
.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE)))
//.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC)))
- .setParallelism(outputParallelism);
+ .setParallelism(outputParallelism)
+ .name("locationSubscriberMetricSink");
//todo GMLC数据源字段详情不清楚,后续再开发GMLC->location_subscriber的数据流
}