Diffstat (limited to 'src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java')
-rw-r--r--  src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java  |  66
1 file changed, 66 insertions(+), 0 deletions(-)
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..bddce06
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,66 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.RealtimeCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class CustomizedKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private static final Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", RealtimeCountConfig.GROUP_ID_PREFIX + "-rc-" + RealtimeCountConfig.GROUP_ID);
+        props.put("auto.offset.reset", RealtimeCountConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        // Create the Kafka consumer and subscribe to the configured topic.
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList(RealtimeCountConfig.KAFKA_TOPIC));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        // Poll Kafka (10-second timeout) and emit each record value as a single-field tuple.
+        ConsumerRecords<String, String> records = consumer.poll(10000L);
+        for (ConsumerRecord<String, String> record : records) {
+            this.collector.emit(new Values(record.value()));
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Single output field carrying the raw Kafka record value.
+        declarer.declare(new Fields("source"));
+    }
+}
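
For context, a minimal sketch of how this spout could be wired into a topology. It is not part of this commit: the topology class, component ids, parallelism hints, and the placeholder LogBolt are illustrative assumptions (written against the Storm 1.x API that the raw-Map open() signature above suggests); only CustomizedKafkaSpout comes from the file in this diff.

// Hypothetical wiring sketch -- not part of this commit.
package cn.ac.iie.topology;

import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class RealtimeCountTopologySketch {

    /** Placeholder terminal bolt: just prints the "source" field emitted by the spout. */
    public static class LogBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            System.out.println(input.getStringByField("source"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt: declares no output fields.
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // One spout executor to start; raise the parallelism hint up to the topic's partition count.
        builder.setSpout("kafka-spout", new CustomizedKafkaSpout(), 1);
        builder.setBolt("log-bolt", new LogBolt(), 2).shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setNumWorkers(1);

        // Local mode for a quick test; on a real cluster use StormSubmitter.submitTopology(...).
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("realtime-count", conf, builder.createTopology());
        Thread.sleep(60_000);
        cluster.shutdown();
    }
}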