summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorlee <[email protected]>2020-06-08 15:32:26 +0800
committerlee <[email protected]>2020-06-08 15:32:26 +0800
commitcb4ee7544e1c75e15fcc45e800e047ec269816db (patch)
tree68e566dabc6537248370df89fd4848dd5fdf911a /src
parent36b04e3feaaf1f1c5af1a005af20d2eb28804bc7 (diff)
OLAP预聚合代码更新original
Diffstat (limited to 'src')
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java23
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java1
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java111
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java35
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java6
-rw-r--r--src/test/java/com/wp/AppTest.java92
6 files changed, 201 insertions, 67 deletions
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
index cd179cd..f63884d 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
**/
public class AggregateTopology {
+
+
+
public static void main(String[] args) {
//TODO 创建一个topo任务
TridentTopology topology = new TridentTopology();
@@ -29,16 +32,26 @@ public class AggregateTopology {
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+ .name("one")
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度
+ .name("two")
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
- .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .each(new Fields("map"), new KafkaBolt(), new Fields());
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度
+ .name("three")
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .name("four")
+ .each(new Fields("map"), new KafkaBolt(), new Fields())
+ .name("five")
+ .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度
+ .name("six");
Config config = new Config();
+// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
config.setDebug(false);
- config.setNumWorkers(5);
+ config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量
LocalCluster cluster = new LocalCluster();
+
+
cluster.submitTopology("trident-wordcount", config, topology.build());
// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
index 21ce8d3..2583b5b 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
@@ -29,6 +29,7 @@ public class TridentKafkaSpout {
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.startOffsetTime = -1L;
+ kafkaConfig.socketTimeoutMs=60000;
//不透明事务型Spout
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..d981fe1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
@@ -0,0 +1,111 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import cn.ac.iie.trident.aggregate.AggCount;
+import cn.ac.iie.trident.aggregate.ParseJson2KV;
+import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
+import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
+import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.tuple.Fields;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Storm程序主类
+ *
+ * @author Administrator
+ */
+
+public class LogFlowWriteTopology {
+ private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
+
+ private TopologyBuilder builder;
+ private static TridentTopology tridentTopology;
+
+
+
+
+
+ private static Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+ return conf;
+ }
+
+
+ private static StormTopology buildTopology() {
+
+ OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+ tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .each(new Fields("map"), new KafkaBolt(), new Fields());
+ return tridentTopology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+
+ //TODO 创建一个topo任务
+ TridentTopology topology = new TridentTopology();
+ //TODO 为Topo绑定Spout
+ OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+ /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .each(new Fields("map"), new KafkaBolt(), new Fields());*/
+
+
+ topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .name("one")
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度
+ .name("two")
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度
+ .name("three")
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .name("four")
+ .each(new Fields("map"), new KafkaBolt(), new Fields())
+ .name("five")
+ .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度
+ .name("six");
+
+ if(args.length == 0){//本地模式运行
+
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology("trident-function", conf, topology.build());
+ Thread.sleep(100000);
+ cluster.shutdown();
+ }else{//集群模式运行
+ StormSubmitter.submitTopology(args[0], conf, topology.build());
+ }
+
+ }
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
new file mode 100644
index 0000000..708f77c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author Administrator
+ */
+public final class StormRunner {
+ private static final int MILLS_IN_SEC = 1000;
+
+ private StormRunner() {}
+
+ public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ localCluster.shutdown();
+
+ }
+
+ public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
index 0ab90a5..3905580 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
@@ -23,6 +23,10 @@ public class FlowWriteConfig {
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+
+ public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
+
+
/**
* kafka
*/
@@ -36,8 +40,6 @@ public class FlowWriteConfig {
public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
- public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
-
/**
* http
*/
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
index 3202adb..77c826f 100644
--- a/src/test/java/com/wp/AppTest.java
+++ b/src/test/java/com/wp/AppTest.java
@@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
/**
* Unit test for simple App.
*/
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
+public class AppTest{
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-
-
- private static ValueBean valueBean;
@org.junit.Test
public void test(){
- System.out.println(valueBean == null);
+ Config conf = new Config();
+// conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(1);
+
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
+ new Values("nickt1", 4),
+ new Values("nickt2", 7),
+ new Values("nickt3", 8),
+ new Values("nickt4", 9),
+ new Values("nickt5", 7),
+ new Values("nickt6", 11),
+ new Values("nickt7", 5)
+ );
+ spout.setCycle(true);
+ TridentTopology topology = new TridentTopology();
+ topology.newStream("spout1", spout)
+ .batchGlobal()
+ .each(new Fields("user"),new Debug("print:"))
+ .parallelismHint(5);
+
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology("trident-function", conf, topology.build());
}
- static class Demo{
- private String a;
- private String b;
- private String c;
-
- public String getA() {
- return a;
- }
-
- public void setA(String a) {
- this.a = a;
- }
-
- public String getB() {
- return b;
- }
-
- public void setB(String b) {
- this.b = b;
- }
-
- public String getC() {
- return c;
- }
-
- public void setC(String c) {
- this.c = c;
- }
- }
}