diff options
| author | wanglihui <[email protected]> | 2021-09-10 18:39:48 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2021-09-10 18:39:48 +0800 |
| commit | 8ca0e70ceba87f9fe3daddcd42c6734b3f841832 (patch) | |
| tree | 35a77bb4a5c87157e6f97b1984fb6ebe59f5657d | |
| parent | 6a1dcfe8a7bfb5dfd09c3d000e9854f6a7e1fd4d (diff) | |
新增MiddleStreamBolt并行度配置tsg-v08
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CommonConfig.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 2 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 3 |
3 files changed, 6 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index cc53165..52436cb 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -30,6 +30,8 @@ public class CommonConfig { public static final int STORM_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("storm.window.max.time"); + public static final int MIDDLE_STREAM_BOLT_PARALLELISM = CommonConfigurations.getIntProperty("middle.stream.bolt.parallelism"); + public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit"); public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num"); diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index d9059d6..10db13b 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -61,7 +61,7 @@ public class LogFlowWriteTopology { builder.setSpout("SketchLogSpout", new CustomizedKafkaSpout(), CommonConfig.KAFKA_INPUT_PARALLELISM); builder.setBolt("ParseSketchLogBolt", new ParseSketchLogBolt(), CommonConfig.KAFKA_INPUT_PARALLELISM) .localOrShuffleGrouping("SketchLogSpout"); - builder.setBolt("MiddleStreamBolt", new MiddleStreamBolt()) + builder.setBolt("MiddleStreamBolt", new MiddleStreamBolt(),CommonConfig.MIDDLE_STREAM_BOLT_PARALLELISM) .fieldsGrouping("ParseSketchLogBolt", new Fields("attack_type", "destination_ip")); builder.setBolt("ServerIpMetricsBolt", new ServerIpMetricsBolt(), CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM) .localOrShuffleGrouping("MiddleStreamBolt"); diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index d63be2f..b3620e6 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -50,6 +50,9 @@ hbase.baseline.total.num=1000000 #计算窗口大小,默认600s storm.window.max.time=10 +#聚合计算并行度大小 +middle.stream.bolt.parallelism=1 + #dos event结果中distinct source IP限制 source.ip.list.limit=10000 |
