diff options
| author | gujinkai <[email protected]> | 2024-04-18 11:55:00 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-04-18 11:55:00 +0800 |
| commit | 7fc963401f377041ba8d77b791414151a0628686 (patch) | |
| tree | 6213e098fbd6836faa22542374bf1fb796830e83 | |
| parent | cc03e6750cc57bab8bf0ba8917080581a6dc751a (diff) | |
feature: add operator name in LocationMetric
| -rw-r--r-- | module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java index 574c256..e552453 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/location/LocationMetric.java @@ -37,12 +37,14 @@ public class LocationMetric implements Schedule { SingleOutputStreamOperator<String> locationSubscriberMetric = process.keyBy(LocationSubscriber::getSubscriber_id) .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) - .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc()); + .reduce(new SecondAggregationReduce(), new MetricSubscriberProcessWindowFunc()) + .name("locationSubscriberMetric"); locationSubscriberMetric .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TABLE))) //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.LOCATION_SUBSCRIBER_TOPIC))) - .setParallelism(outputParallelism); + .setParallelism(outputParallelism) + .name("locationSubscriberMetricSink"); //todo GMLC数据源字段详情不清楚,后续再开发GMLC->location_subscriber的数据流 } |
