summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-09-10 18:39:48 +0800
committerwanglihui <[email protected]>2021-09-10 18:39:48 +0800
commit8ca0e70ceba87f9fe3daddcd42c6734b3f841832 (patch)
tree35a77bb4a5c87157e6f97b1984fb6ebe59f5657d
parent6a1dcfe8a7bfb5dfd09c3d000e9854f6a7e1fd4d (diff)
新增MiddleStreamBolt并行度配置tsg-v08
-rw-r--r--src/main/java/com/zdjizhi/common/CommonConfig.java2
-rw-r--r--src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java2
-rw-r--r--src/main/resources/common.properties3
3 files changed, 6 insertions, 1 deletions
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index cc53165..52436cb 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -30,6 +30,8 @@ public class CommonConfig {
public static final int STORM_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("storm.window.max.time");
+ public static final int MIDDLE_STREAM_BOLT_PARALLELISM = CommonConfigurations.getIntProperty("middle.stream.bolt.parallelism");
+
public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit");
public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num");
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index d9059d6..10db13b 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -61,7 +61,7 @@ public class LogFlowWriteTopology {
builder.setSpout("SketchLogSpout", new CustomizedKafkaSpout(), CommonConfig.KAFKA_INPUT_PARALLELISM);
builder.setBolt("ParseSketchLogBolt", new ParseSketchLogBolt(), CommonConfig.KAFKA_INPUT_PARALLELISM)
.localOrShuffleGrouping("SketchLogSpout");
- builder.setBolt("MiddleStreamBolt", new MiddleStreamBolt())
+ builder.setBolt("MiddleStreamBolt", new MiddleStreamBolt(),CommonConfig.MIDDLE_STREAM_BOLT_PARALLELISM)
.fieldsGrouping("ParseSketchLogBolt", new Fields("attack_type", "destination_ip"));
builder.setBolt("ServerIpMetricsBolt", new ServerIpMetricsBolt(), CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM)
.localOrShuffleGrouping("MiddleStreamBolt");
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index d63be2f..b3620e6 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -50,6 +50,9 @@ hbase.baseline.total.num=1000000
#计算窗口大小,默认600s
storm.window.max.time=10
+#聚合计算并行度大小
+middle.stream.bolt.parallelism=1
+
#dos event结果中distinct source IP限制
source.ip.list.limit=10000