diff options
| author | lee <[email protected]> | 2020-06-04 15:55:12 +0800 |
|---|---|---|
| committer | lee <[email protected]> | 2020-06-04 15:55:12 +0800 |
| commit | 36b04e3feaaf1f1c5af1a005af20d2eb28804bc7 (patch) | |
| tree | 807a56280a57df565d2ce6030073c02951b397cb /src | |
| parent | a34f8c4df6280dc0a80b7e500c2869ba97971f9c (diff) | |
OLAP预聚合代码初始版本
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java new file mode 100644 index 0000000..21ce8d3 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java @@ -0,0 +1,39 @@ +package cn.ac.iie.trident.aggregate.spout; + +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import org.apache.storm.kafka.BrokerHosts; +import org.apache.storm.kafka.StringScheme; +import org.apache.storm.kafka.ZkHosts; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; +import org.apache.storm.kafka.trident.TridentKafkaConfig; +import org.apache.storm.spout.SchemeAsMultiScheme; + +/** + * @ClassNameKafkaSpout + * @Author [email protected] + * @Date2020/6/4 11:55 + * @Version V1.0 + **/ +public class TridentKafkaSpout { + + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout; + + public static OpaqueTridentKafkaSpout getInstance() { + if (opaqueTridentKafkaSpout == null) { + + BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS); + TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC); + kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); + kafkaConfig.startOffsetTime = -1L; + + //不透明事务型Spout + opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig); + } + return opaqueTridentKafkaSpout; + } + +} |
