Diffstat (limited to 'src')
6 files changed, 201 insertions, 67 deletions
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
index cd179cd..f63884d 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
  **/
 public class AggregateTopology {
+
+
+
     public static void main(String[] args) {
         //TODO create a topology job
         TridentTopology topology = new TridentTopology();
@@ -29,16 +32,26 @@ public class AggregateTopology {
         OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
 
         topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
-                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+                .name("one")
+                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
+                .name("two")
                 .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
-                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
-                .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
-                .each(new Fields("map"), new KafkaBolt(), new Fields());
+                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
+                .name("three")
+                .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+                .name("four")
+                .each(new Fields("map"), new KafkaBolt(), new Fields())
+                .name("five")
+                .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism of the Kafka writer
+                .name("six");
 
         Config config = new Config();
+//        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
         config.setDebug(false);
-        config.setNumWorkers(5);
+        config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //number of workers
 
         LocalCluster cluster = new LocalCluster();
+
+
         cluster.submitTopology("trident-wordcount", config, topology.build());
 //        StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config, topology.build());
     }
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
index 21ce8d3..2583b5b 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
@@ -29,6 +29,7 @@ public class TridentKafkaSpout {
         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
         kafkaConfig.startOffsetTime = -1L;
+        kafkaConfig.socketTimeoutMs=60000;
 
         //opaque transactional spout
         opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
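Note: AggCount is referenced by the hunks above but its source is not part of this commit. Purely for orientation, a minimal sketch of what a per-key counting aggregator wired this way could look like, assuming it extends Trident's BaseAggregator and emits one key-to-count map per window batch (the input/output field names come from the stream definition; the class body is an assumption):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    // Hypothetical sketch of AggCount: counts occurrences per "key" within one
    // window and emits the resulting map as the single "map" output field.
    public class AggCount extends BaseAggregator<Map<String, Long>> {

        @Override
        public Map<String, Long> init(Object batchId, TridentCollector collector) {
            // fresh state for each window batch
            return new HashMap<>();
        }

        @Override
        public void aggregate(Map<String, Long> state, TridentTuple tuple, TridentCollector collector) {
            // the tuple carries the ("key", "value") fields produced by ParseJson2KV
            String key = tuple.getStringByField("key");
            state.merge(key, 1L, Long::sum);
        }

        @Override
        public void complete(Map<String, Long> state, TridentCollector collector) {
            // emit the aggregated map once the window closes
            collector.emit(new Values(state));
        }
    }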
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..d981fe1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
@@ -0,0 +1,111 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import cn.ac.iie.trident.aggregate.AggCount;
+import cn.ac.iie.trident.aggregate.ParseJson2KV;
+import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
+import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
+import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.tuple.Fields;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Main class of the Storm program
+ *
+ * @author Administrator
+ */
+
+public class LogFlowWriteTopology {
+    private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
+
+    private TopologyBuilder builder;
+    private static TridentTopology tridentTopology;
+
+
+
+
+
+    private static Config createTopologConfig() {
+        Config conf = new Config();
+        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(60);
+        conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+        return conf;
+    }
+
+
+    private static StormTopology buildTopology() {
+
+        OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+        tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+                .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+                .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+                .each(new Fields("map"), new KafkaBolt(), new Fields());
+        return tridentTopology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+
+
+        Config conf = new Config();
+        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(60);
+        conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+
+        //TODO create a topology job
+        TridentTopology topology = new TridentTopology();
+        //TODO bind the spout to the topology
+        OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+        /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6
+                .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9
+                .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+                .each(new Fields("map"), new KafkaBolt(), new Fields());*/
+
+
+        topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+                .name("one")
+                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout parallelism
+                .name("two")
+                .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //parallelism of the data-processing stage
+                .name("three")
+                .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+                .name("four")
+                .each(new Fields("map"), new KafkaBolt(), new Fields())
+                .name("five")
+                .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //parallelism of the Kafka writer
+                .name("six");
+
+        if (args.length == 0) { //run in local mode
+
+            LocalCluster cluster = new LocalCluster();
+
+            cluster.submitTopology("trident-function", conf, topology.build());
+            Thread.sleep(100000);
+            cluster.shutdown();
+        } else { //run in cluster mode
+            StormSubmitter.submitTopology(args[0], conf, topology.build());
+        }
+
+    }
+}
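Note: ParseJson2KV, which both topologies use to split the raw "str" field into ("key", "value"), is likewise absent from this commit. A hedged sketch, assuming a Trident BaseFunction and fastjson parsing (the JSON library and the payload field names are assumptions, not taken from the commit):

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    // Hypothetical sketch of ParseJson2KV: parses each raw JSON record into the
    // ("key", "value") pair consumed by the sliding window downstream.
    public class ParseJson2KV extends BaseFunction {

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String raw = tuple.getStringByField("str");
            try {
                JSONObject json = JSON.parseObject(raw);
                collector.emit(new Values(json.getString("key"), json.getString("value")));
            } catch (Exception e) {
                // drop malformed records instead of failing the whole batch
            }
        }
    }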
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
new file mode 100644
index 0000000..708f77c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author Administrator
+ */
+public final class StormRunner {
+    private static final int MILLS_IN_SEC = 1000;
+
+    private StormRunner() {}
+
+    public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+        LocalCluster localCluster = new LocalCluster();
+        localCluster.submitTopology(topologyName, conf, builder.createTopology());
+        Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+        localCluster.shutdown();
+
+    }
+
+    public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+    }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
index 0ab90a5..3905580 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
@@ -23,6 +23,10 @@ public class FlowWriteConfig {
     public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
     public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
 
+
+    public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
+
+
     /**
      * kafka
      */
@@ -36,8 +40,6 @@ public class FlowWriteConfig {
     public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
     public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
 
-    public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
-
     /**
      * http
      */
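Note: the new StormRunner helper only accepts a TopologyBuilder and calls createTopology(), while both topology classes in this commit produce a StormTopology via TridentTopology.build(), so neither can use the helper as written. A sketch of an overload that would close that gap (a suggestion, not part of the commit):

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.generated.StormTopology;

    // Hypothetical StormRunner overload: accepts an already-built StormTopology
    // (e.g. the result of tridentTopology.build()) instead of a TopologyBuilder.
    public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, topology);
        Thread.sleep((long) runtimeInSeconds * 1000L);
        localCluster.shutdown();
    }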
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
index 3202adb..77c826f 100644
--- a/src/test/java/com/wp/AppTest.java
+++ b/src/test/java/com/wp/AppTest.java
@@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 /**
  * Unit test for simple App.
  */
-public class AppTest
-    extends TestCase
-{
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public AppTest( String testName )
-    {
-        super( testName );
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite()
-    {
-        return new TestSuite( AppTest.class );
-    }
+public class AppTest{
 
-    /**
-     * Rigourous Test :-)
-     */
-    public void testApp()
-    {
-        assertTrue( true );
-    }
-
-
-    private static ValueBean valueBean;
 
     @org.junit.Test
     public void test(){
-        System.out.println(valueBean == null);
+        Config conf = new Config();
+//        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(60);
+        conf.setNumWorkers(1);
+
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
+                new Values("nickt1", 4),
+                new Values("nickt2", 7),
+                new Values("nickt3", 8),
+                new Values("nickt4", 9),
+                new Values("nickt5", 7),
+                new Values("nickt6", 11),
+                new Values("nickt7", 5)
+        );
+        spout.setCycle(true);
+        TridentTopology topology = new TridentTopology();
+        topology.newStream("spout1", spout)
+                .batchGlobal()
+                .each(new Fields("user"), new Debug("print:"))
+                .parallelismHint(5);
+
+        LocalCluster cluster = new LocalCluster();
+
+        cluster.submitTopology("trident-function", conf, topology.build());
     }
 
-    static class Demo{
-        private String a;
-        private String b;
-        private String c;
-
-        public String getA() {
-            return a;
-        }
-
-        public void setA(String a) {
-            this.a = a;
-        }
-
-        public String getB() {
-            return b;
-        }
-
-        public void setB(String b) {
-            this.b = b;
-        }
-
-        public String getC() {
-            return c;
-        }
-
-        public void setC(String c) {
-            this.c = c;
-        }
-    }
 }
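Note: the new FlowWriteConfig.AGG_TIME constant is resolved from an agg.time key, but the backing properties file is not touched by this commit. A hypothetical entry, assuming the value is in seconds to match the TimeUnit.SECONDS arguments above (10 mirrors the hard-coded window duration this commit replaces):

    # sliding-window length and slide interval, in seconds (hypothetical entry)
    agg.time=10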
