summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/topology/StormRunner.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/cn/ac/iie/topology/StormRunner.java')
-rw-r--r--src/main/java/cn/ac/iie/topology/StormRunner.java32
1 files changed, 32 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
new file mode 100644
index 0000000..d2d4ab9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -0,0 +1,32 @@
+package cn.ac.iie.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+public final class StormRunner{
+ private static final int MILLS_IN_SEC = 1000;
+
+ private StormRunner() {}
+
+ public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ localCluster.shutdown();
+
+ }
+
+ public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+
+
+}