summaryrefslogtreecommitdiff
path: root/kafka/kafka-operation.sh
blob: fbcbf11d529e9f19d1dd999cd89060adf14dab39 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/bin/bash
# Thin wrapper around the Kafka command-line tools (console producer/consumer,
# kafka-topics.sh, kafka-consumer-groups.sh, kafka-leader-election.sh and
# kafka-configs.sh). Run it without arguments to print the available
# sub-commands.

# Address handed to the console producer (--broker-list) and the console
# consumer (--bootstrap-server).
LOCAL_IP=127.0.0.1:9094

# NOTE(review): not referenced by any sub-command in the visible part of this
# script - confirm whether it is still needed.
ZK_SERVER=127.0.0.1:2181/kafka
# Address handed as --bootstrap-server to the topic, consumer-group,
# leader-election and config tools.
KAFKA_SERVER=127.0.0.1:9092
# NOTE(review): not referenced by any sub-command in the visible part of this
# script; create-topic takes its partition count from the command line.
PARTITIONS=1

#######################################
# Print the help text for every sub-command.
# The text is a sequence of sections; each section is a divider line, a
# heading followed by an empty line, and one or more sample invocations.
# Each sample line is piped through `column -t` on its own.
# Arguments: none
# Outputs:   help text on stdout
#######################################
usage() {
  local -r divider="------------------------------------------------------------------------"
  local row
  # Row prefix "H:" marks a section heading, "C:" marks a sample command line.
  local -a rows=(
    "H:生产数据"
    "C:交互式:kafka-operation.sh producer MOCK-DATA-SESSION-RECORD"
    "C:读取文件:kafka-operation.sh producer MOCK-DATA-SESSION-RECORD < mock_data.txt"
    "H:消费数据"
    "C:从当前消费:kafka-operation.sh consumer MOCK-DATA-SESSION-RECORD"
    "C:从头消费:kafka-operation.sh consumer-begin MOCK-DATA-SESSION-RECORD"
    "H:创建Topic"
    "C:kafka-operation.sh create-topic [topic name] [partition num] [replication num]"
    "C:样例:kafka-operation.sh create-topic MOCK-DATA-SESSION-RECORD 3 1"
    "H:删除Topic"
    "C:kafka-operation.sh delete-topic [topic name]"
    "C:样例:kafka-operation.sh delete-topic MOCK-DATA-SESSION-RECORD"
    "H:查看Topic列表"
    "C:kafka-operation.sh list"
    "H:查看Topic详情"
    "C:kafka-operation.sh desc-topic [topic name]"
    "C:样例:kafka-operation.sh desc-topic MOCK-DATA-SESSION-RECORD"
    "H:查看消费组列表"
    "C:kafka-operation.sh groups"
    "H:查看消费组详情"
    "C:kafka-operation.sh desc-group [consumer group name]"
    "C:样例:kafka-operation.sh desc-group etl-session-record-kafka-to-kafka"
    "H:增加Topic分区"
    "C:kafka-operation.sh add-partition [topic name] [new partition num]"
    "C:样例:kafka-operation.sh add-partition MOCK-DATA-SESSION-RECORD 5"
    "H:Topic Leader平衡"
    "C:kafka-operation.sh election-leader"
    "H:查看Quotas"
    "C:kafka-operation.sh desc-quota"
    "H:增加Quotas"
    "C:kafka-operation.sh add-quota [quatos rule] [user] [client.id]"
    "C:kafka-operation.sh add-quota 'producer_byte_rate=10485760' tsg_olap SESSION-RECORD"
    "H:删除Quotas"
    "C:kafka-operation.sh delete-quota [quatos rule] [user] [client.id]"
    "C:kafka-operation.sh delete-quota 'producer_byte_rate' tsg_olap SESSION-RECORD"
  )

  for row in "${rows[@]}"; do
    case "$row" in
      H:*)
        # A heading opens a new section: divider first, then title + blank line.
        printf '%s\n' "$divider"
        printf '%s\n\n' "${row:2}"
        ;;
      C:*)
        printf '%s\n' "${row:2}" | column -t
        ;;
    esac
  done

  # Closing divider after the last section.
  printf '%s\n' "$divider"
}

# Without a sub-command there is nothing to dispatch: show the help text and
# stop with a success status.
[[ $# -gt 0 ]] || {
  usage
  exit 0
}

#######################################
# Check that a sub-command received all of its required arguments.
# Arguments:
#   $1    - number of arguments the sub-command needs
#   $2    - sub-command name (only used in the error message)
#   $3... - the arguments actually supplied after the sub-command
# Outputs:   an error message on stderr when an argument is missing or empty
# Returns:   0 when all required arguments are non-empty, 1 otherwise
#######################################
require_args() {
  local -r needed=$1 cmd=$2
  shift 2
  local i
  for (( i = 1; i <= needed; i++ )); do
    # ${!i} is the i-th remaining positional parameter.
    if [[ -z "${!i:-}" ]]; then
      printf 'kafka-operation.sh: %s: expected %d argument(s); run without arguments for help\n' \
        "$cmd" "$needed" >&2
      return 1
    fi
  done
}

#######################################
# Dispatch a sub-command to the matching Kafka command-line tool.
# Globals:   KAFKA_HOME, LOCAL_IP, KAFKA_SERVER (read)
# Arguments: $1 - sub-command name, $2... - sub-command arguments
# Outputs:   whatever the invoked Kafka tool prints; for an unknown
#            sub-command an error on stderr followed by the help text
# Returns:   the exit status of the invoked tool, or 1 when the sub-command
#            is unknown or a required argument is missing
#######################################
main() {
  # All expansions are quoted so values containing spaces or glob characters
  # reach the Kafka tools as a single, unmodified argument.
  case "${1:-}" in
    producer)
      require_args 1 "$@" || return 1
      kafka-console-producer.sh \
        --producer.config "$KAFKA_HOME/config/producer.properties" \
        --broker-list "$LOCAL_IP" --topic "$2"
      ;;
    consumer)
      require_args 1 "$@" || return 1
      kafka-console-consumer.sh \
        --consumer.config "$KAFKA_HOME/config/consumer.properties" \
        --bootstrap-server "$LOCAL_IP" --topic "$2"
      ;;
    consumer-begin)
      require_args 1 "$@" || return 1
      kafka-console-consumer.sh \
        --consumer.config "$KAFKA_HOME/config/consumer.properties" \
        --from-beginning --bootstrap-server "$LOCAL_IP" --topic "$2"
      ;;
    create-topic)
      require_args 3 "$@" || return 1
      kafka-topics.sh --if-not-exists --create --bootstrap-server "$KAFKA_SERVER" \
        --topic "$2" --partitions "$3" --replication-factor "$4"
      ;;
    delete-topic)
      require_args 1 "$@" || return 1
      kafka-topics.sh --if-exists --delete --bootstrap-server "$KAFKA_SERVER" --topic "$2"
      ;;
    list)
      kafka-topics.sh --list --bootstrap-server "$KAFKA_SERVER"
      ;;
    desc-topic)
      require_args 1 "$@" || return 1
      kafka-topics.sh --if-exists --bootstrap-server "$KAFKA_SERVER" --describe --topic "$2"
      ;;
    groups)
      kafka-consumer-groups.sh --all-groups --all-topics --list --bootstrap-server "$KAFKA_SERVER"
      ;;
    desc-group)
      require_args 1 "$@" || return 1
      kafka-consumer-groups.sh --bootstrap-server "$KAFKA_SERVER" --describe --group "$2"
      ;;
    add-partition)
      require_args 2 "$@" || return 1
      kafka-topics.sh --if-exists --bootstrap-server "$KAFKA_SERVER" --alter \
        --topic "$2" --partitions "$3"
      ;;
    election-leader)
      kafka-leader-election.sh --bootstrap-server "$KAFKA_SERVER" \
        --all-topic-partitions --election-type PREFERRED
      ;;
    desc-quota)
      kafka-configs.sh --bootstrap-server "$KAFKA_SERVER" --describe \
        --entity-type users --entity-type clients
      ;;
    add-quota)
      require_args 3 "$@" || return 1
      kafka-configs.sh --bootstrap-server "$KAFKA_SERVER" --alter --add-config "$2" \
        --entity-type users --entity-name "$3" --entity-type clients --entity-name "$4"
      ;;
    delete-quota)
      require_args 3 "$@" || return 1
      kafka-configs.sh --bootstrap-server "$KAFKA_SERVER" --alter --delete-config "$2" \
        --entity-type users --entity-name "$3" --entity-type clients --entity-name "$4"
      ;;
    *)
      # An unrecognised sub-command is an error: show the help, but do not
      # report success to the caller.
      printf 'kafka-operation.sh: unknown sub-command: %s\n' "${1:-}" >&2
      usage
      return 1
      ;;
  esac
}

# Must stay the last command so the script exits with the tool's status.
main "$@"