blob: c198cad99c8a06ba8b9fd6400c1d9bc78c6bc27e (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
#--------------------------------地址配置------------------------------#
#管理kafka地址
input.kafka.servers={{ consumer_kafka_servers }}
#管理输出kafka地址
output.kafka.servers={{ producer_kafka_servers }}
#网关的schema位置
schema.http=http://{{ gateway_keepalive_host }}:9999/metadata/schema/v1/fields/voip_record_log
#kafka broker下的topic名称
kafka.topic=VOIP-RECORD-LOG
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
group.id=voip-relation-20210407-1
#接收自kafka的消费者 client-id
consumer.client.id=consumer-voip-record
#回写给kafka的生产者 client-id
producer.client.id=producer-voip-record
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
#生产者ack
producer.ack=1
#latest/earliest 从当前消 or 从头消费
auto.offset.reset=latest
#输出topic
results.output.topic=VOIP-CONVERSATION-RECORD-LOG
#--------------------------------topology配置------------------------------#
#storm topology workers
topology.workers=1
#spout并行度 建议与kafka分区数相同
spout.parallelism=3
#关联SIP的bolt并行度-关联SIP后将按照四元组拆分缓存,定时按Key分发到下一个bolt,内网IP定期缓存发入下一bolt,经下一bolt入Kafka
sip.correlation.bolt.parallelism=1
#关联SIP与RTP的bolt并行度-worker的倍数-关联不上的SIP或者RTP(包含四元组,SchemaType为SIP或RTP)按四元组分发到Kafka-bolt缓存,为后续二次关联做准备
sr.correlation.bolt.parallelism=3
#写入kafka的并行度,同时负责二次关联,从上游拿到已关联的(VOIP)缓存后就直接写入Kafka,未关联的(SIP或RTP)就和缓存关联,关联上就放入发送Kafka的缓存,关联不上就放入未关联缓存
kafka.bolt.parallelism=3
#ack设置 1启动ack 0不启动ack
topology.num.acks=1
#kafka批量条数
batch.insert.num=500
#tick时钟频率-SIP关联缓存周期
sip.cr.topology.tick.tuple.freq.secs=60
#tick时钟频率-SIP-RTP关联缓存周期-暂不使用
sr.cr.topology.tick.tuple.freq.secs=60
#tick时钟频率-kafka写入周期
topology.tick.tuple.freq.secs=5
#--------------------------------默认值配置------------------------------#
#当bolt性能受限时,限制spout接收速度,理论看ack开启才有效
topology.config.max.spout.pending=70000
#spout接收睡眠时间
topology.spout.sleep.time=1
#允许发送kafka最大失败数
max.failure.num=20
#--------------------------------voip------------------------------#
#voip二次关联缓存时间-秒
sec.combine.sr.cache.secs=300
|