summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorlee <[email protected]>2020-06-04 15:55:12 +0800
committerlee <[email protected]>2020-06-04 15:55:12 +0800
commit36b04e3feaaf1f1c5af1a005af20d2eb28804bc7 (patch)
tree807a56280a57df565d2ce6030073c02951b397cb /src
parenta34f8c4df6280dc0a80b7e500c2869ba97971f9c (diff)
OLAP预聚合代码初始版本
Diffstat (limited to 'src')
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java39
1 files changed, 39 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
new file mode 100644
index 0000000..21ce8d3
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
@@ -0,0 +1,39 @@
+package cn.ac.iie.trident.aggregate.spout;
+
+import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+
+/**
+ * @ClassNameKafkaSpout
+ * @Author [email protected]
+ * @Date2020/6/4 11:55
+ * @Version V1.0
+ **/
+public class TridentKafkaSpout {
+
+
+ /**
+ * kafka生产者适配器(单例),用来代理kafka生产者发送消息
+ */
+ private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout;
+
+ public static OpaqueTridentKafkaSpout getInstance() {
+ if (opaqueTridentKafkaSpout == null) {
+
+ BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS);
+ TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
+ kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
+ kafkaConfig.startOffsetTime = -1L;
+
+ //不透明事务型Spout
+ opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
+ }
+ return opaqueTridentKafkaSpout;
+ }
+
+}