Diffstat (limited to 'src/main/java/cn/ac/iie/spout')
-rw-r--r--  src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java            66
-rw-r--r--  src/main/java/cn/ac/iie/spout/sip/SIP_ORIGIN_ALL_KafkaSpout.java   79
2 files changed, 145 insertions(+), 0 deletions(-)
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..bddce06
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,66 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.RealtimeCountConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class CustomizedKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", RealtimeCountConfig.GROUP_ID_PREFIX + "-rc-" + RealtimeCountConfig.GROUP_ID);
+        props.put("auto.offset.reset", RealtimeCountConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        // Create the Kafka consumer and subscribe to the configured topic.
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList(RealtimeCountConfig.KAFKA_TOPIC));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        // Poll Kafka and emit each record value as a single-field tuple.
+        ConsumerRecords<String, String> records = consumer.poll(10000L);
+        for (ConsumerRecord<String, String> record : records) {
+            this.collector.emit(new Values(record.value()));
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source"));
+    }
+}
diff --git a/src/main/java/cn/ac/iie/spout/sip/SIP_ORIGIN_ALL_KafkaSpout.java b/src/main/java/cn/ac/iie/spout/sip/SIP_ORIGIN_ALL_KafkaSpout.java
new file mode 100644
index 0000000..c267533
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/sip/SIP_ORIGIN_ALL_KafkaSpout.java
@@ -0,0 +1,79 @@
+package cn.ac.iie.spout.sip;
+
+import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
+import cn.ac.iie.common.RealtimeCountConfig;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+public class SIP_ORIGIN_ALL_KafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(SIP_ORIGIN_ALL_KafkaSpout.class);
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", RealtimeCountConfig.GROUP_ID_PREFIX + "-sip-" + RealtimeCountConfig.GROUP_ID_SUFFIX);
+        props.put("fetch.max.bytes", RealtimeCountConfig.FETCH_MAX_BYTES); // default 52428800
+        props.put("max.partition.fetch.bytes", RealtimeCountConfig.MAX_PARTITION_FETCH_BYTES); // default 1048576
+        props.put("max.poll.interval.ms", RealtimeCountConfig.MAX_POLL_INTERVAL_MS); // default 300000
+        props.put("max.poll.records", RealtimeCountConfig.MAX_POLL_RECORDS); // default 500
+        props.put("session.timeout.ms", RealtimeCountConfig.SESSION_TIMEOUT_MS); // default 10000
+        props.put("auto.offset.reset", RealtimeCountConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        // Create the Kafka consumer and subscribe to the SIP origin topic.
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList(RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        // Poll Kafka, drop SIP OPTIONS messages, and emit the rest with the topic as log type.
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(10000L);
+            for (ConsumerRecord<String, String> record : records) {
+                SipOriginALL sipOriginLogDisable = JSONObject.parseObject(record.value(), SipOriginALL.class);
+                if (!("OPTIONS".equals(sipOriginLogDisable.getMethod()))) {
+                    this.collector.emit(new Values(record.value(), RealtimeCountConfig.KAFKA_SIP_ORIGIN_TOPIC));
+                }
+            }
+        } catch (Exception e) {
+            logger.error("SIP_ORIGIN_ALL_KafkaSpout-->filter OPTIONS error ---> " + e + " <---");
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source", "logtype"));
+    }
+}
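
For context, the sketch below is not part of the commit above; it is a minimal illustration of how a spout like SIP_ORIGIN_ALL_KafkaSpout might be wired into a Storm topology. The component IDs, the commented-out downstream bolt, the worker count, and the topology name are assumptions for illustration only.

// Minimal topology wiring sketch (assumption, not from this repository).
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

import cn.ac.iie.spout.sip.SIP_ORIGIN_ALL_KafkaSpout;

public class SipTopologySketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Register the spout; it emits tuples with the fields ("source", "logtype").
        builder.setSpout("sip-origin-spout", new SIP_ORIGIN_ALL_KafkaSpout(), 1);

        // A downstream bolt would consume those tuples, e.g. a hypothetical counting bolt:
        // builder.setBolt("sip-count-bolt", new SipCountBolt(), 2)
        //        .shuffleGrouping("sip-origin-spout");

        Config conf = new Config();
        conf.setNumWorkers(2); // assumed worker count for illustration

        // Topology name is an assumption; submission requires a running Storm cluster.
        StormSubmitter.submitTopology("sip-realtime-count", conf, builder.createTopology());
    }
}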
