summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dependency-reduced-pom.xml125
-rw-r--r--log-stream-aggregation.properties4
-rw-r--r--log-stream-aggregation.xml318
-rw-r--r--pom.xml143
-rw-r--r--properties/service_flow_config.properties3
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java23
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java1
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java111
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java35
-rw-r--r--src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java6
-rw-r--r--src/test/java/com/wp/AppTest.java92
-rw-r--r--storm-topology.log0
12 files changed, 728 insertions, 133 deletions
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
new file mode 100644
index 0000000..cc9ded2
--- /dev/null
+++ b/dependency-reduced-pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>cn.ac.iie</groupId>
+ <artifactId>log-stream-aggregation</artifactId>
+ <name>log-stream-aggregation</name>
+ <version>0.0.1-SNAPSHOT</version>
+ <url>http://maven.apache.org</url>
+ <build>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer>
+ <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
+ </transformer>
+ <transformer>
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer>
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>1.0.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>kryo</artifactId>
+ <groupId>com.esotericsoftware</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>clojure</artifactId>
+ <groupId>org.clojure</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>disruptor</artifactId>
+ <groupId>com.lmax</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-core</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hamcrest-core</artifactId>
+ <groupId>org.hamcrest</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+</project>
+
diff --git a/log-stream-aggregation.properties b/log-stream-aggregation.properties
new file mode 100644
index 0000000..c77a9da
--- /dev/null
+++ b/log-stream-aggregation.properties
@@ -0,0 +1,4 @@
+path.variable.kotlin_bundled=F\:\\tools\\ideaIU-2018.1.4\\plugins\\Kotlin\\kotlinc
+path.variable.maven_repository=C\:\\Users\\lixikang\\.m2\\repository
+jdk.home.1.8=C\:/developer_tools/jdk1.8.0_131
+javac2.instrumentation.includeJavaRuntime=false \ No newline at end of file
diff --git a/log-stream-aggregation.xml b/log-stream-aggregation.xml
new file mode 100644
index 0000000..197b825
--- /dev/null
+++ b/log-stream-aggregation.xml
@@ -0,0 +1,318 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project name="log-stream-aggregation" default="all">
+
+
+ <property file="log-stream-aggregation.properties"/>
+ <!-- Uncomment the following property if no tests compilation is needed -->
+ <!--
+ <property name="skip.tests" value="true"/>
+ -->
+
+ <!-- Compiler options -->
+
+ <property name="compiler.debug" value="on"/>
+ <property name="compiler.generate.no.warnings" value="off"/>
+ <property name="compiler.args" value=""/>
+ <property name="compiler.max.memory" value="700m"/>
+ <patternset id="ignored.files">
+ <exclude name="**/*.hprof/**"/>
+ <exclude name="**/*.pyc/**"/>
+ <exclude name="**/*.pyo/**"/>
+ <exclude name="**/*.rbc/**"/>
+ <exclude name="**/*.yarb/**"/>
+ <exclude name="**/*~/**"/>
+ <exclude name="**/.DS_Store/**"/>
+ <exclude name="**/.git/**"/>
+ <exclude name="**/.hg/**"/>
+ <exclude name="**/.svn/**"/>
+ <exclude name="**/CVS/**"/>
+ <exclude name="**/__pycache__/**"/>
+ <exclude name="**/_svn/**"/>
+ <exclude name="**/vssver.scc/**"/>
+ <exclude name="**/vssver2.scc/**"/>
+ </patternset>
+ <patternset id="library.patterns">
+ <include name="*.egg"/>
+ <include name="*.jar"/>
+ <include name="*.ear"/>
+ <include name="*.swc"/>
+ <include name="*.war"/>
+ <include name="*.zip"/>
+ <include name="*.ane"/>
+ </patternset>
+ <patternset id="compiler.resources">
+ <exclude name="**/?*.java"/>
+ <exclude name="**/?*.form"/>
+ <exclude name="**/?*.class"/>
+ <exclude name="**/?*.groovy"/>
+ <exclude name="**/?*.scala"/>
+ <exclude name="**/?*.flex"/>
+ <exclude name="**/?*.kt"/>
+ <exclude name="**/?*.clj"/>
+ <exclude name="**/?*.aj"/>
+ </patternset>
+
+ <!-- JDK definitions -->
+
+ <property name="jdk.bin.1.8" value="${jdk.home.1.8}/bin"/>
+ <path id="jdk.classpath.1.8">
+ <fileset dir="${jdk.home.1.8}">
+ <include name="jre/lib/charsets.jar"/>
+ <include name="jre/lib/deploy.jar"/>
+ <include name="jre/lib/ext/access-bridge-64.jar"/>
+ <include name="jre/lib/ext/cldrdata.jar"/>
+ <include name="jre/lib/ext/dnsns.jar"/>
+ <include name="jre/lib/ext/jaccess.jar"/>
+ <include name="jre/lib/ext/jfxrt.jar"/>
+ <include name="jre/lib/ext/localedata.jar"/>
+ <include name="jre/lib/ext/nashorn.jar"/>
+ <include name="jre/lib/ext/sunec.jar"/>
+ <include name="jre/lib/ext/sunjce_provider.jar"/>
+ <include name="jre/lib/ext/sunmscapi.jar"/>
+ <include name="jre/lib/ext/sunpkcs11.jar"/>
+ <include name="jre/lib/ext/zipfs.jar"/>
+ <include name="jre/lib/javaws.jar"/>
+ <include name="jre/lib/jce.jar"/>
+ <include name="jre/lib/jfr.jar"/>
+ <include name="jre/lib/jfxswt.jar"/>
+ <include name="jre/lib/jsse.jar"/>
+ <include name="jre/lib/management-agent.jar"/>
+ <include name="jre/lib/plugin.jar"/>
+ <include name="jre/lib/resources.jar"/>
+ <include name="jre/lib/rt.jar"/>
+ </fileset>
+ </path>
+
+ <property name="project.jdk.home" value="${jdk.home.1.8}"/>
+ <property name="project.jdk.bin" value="${jdk.bin.1.8}"/>
+ <property name="project.jdk.classpath" value="jdk.classpath.1.8"/>
+
+
+ <!-- Project Libraries -->
+
+ <path id="library.maven:_com.101tec:zkclient:0.10.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/101tec/zkclient/0.10/zkclient-0.10.jar"/>
+ </path>
+
+ <path id="library.maven:_com.alibaba:fastjson:1.2.59.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/alibaba/fastjson/1.2.59/fastjson-1.2.59.jar"/>
+ </path>
+
+ <path id="library.maven:_com.esotericsoftware:kryo:3.0.3.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/kryo/3.0.3/kryo-3.0.3.jar"/>
+ </path>
+
+ <path id="library.maven:_com.esotericsoftware:minlog:1.3.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/minlog/1.3.0/minlog-1.3.0.jar"/>
+ </path>
+
+ <path id="library.maven:_com.esotericsoftware:reflectasm:1.10.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/reflectasm/1.10.1/reflectasm-1.10.1.jar"/>
+ </path>
+
+ <path id="library.maven:_com.fasterxml.jackson.core:jackson-annotations:2.9.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-annotations/2.9.5/jackson-annotations-2.9.5.jar"/>
+ </path>
+
+ <path id="library.maven:_com.fasterxml.jackson.core:jackson-core:2.9.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-core/2.9.5/jackson-core-2.9.5.jar"/>
+ </path>
+
+ <path id="library.maven:_com.fasterxml.jackson.core:jackson-databind:2.9.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-databind/2.9.1/jackson-databind-2.9.1.jar"/>
+ </path>
+
+ <path id="library.maven:_com.google.guava:guava:16.0.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/google/guava/guava/16.0.1/guava-16.0.1.jar"/>
+ </path>
+
+ <path id="library.maven:_com.googlecode.json-simple:json-simple:1.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar"/>
+ </path>
+
+ <path id="library.maven:_com.lmax:disruptor:3.3.2.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar"/>
+ </path>
+
+ <path id="library.maven:_com.maxmind.db:maxmind-db:1.2.2.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/maxmind/db/maxmind-db/1.2.2/maxmind-db-1.2.2.jar"/>
+ </path>
+
+ <path id="library.maven:_com.maxmind.geoip2:geoip2:2.12.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/maxmind/geoip2/geoip2/2.12.0/geoip2-2.12.0.jar"/>
+ </path>
+
+ <path id="library.maven:_com.maxmind.geoip:geoip-api:1.3.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/maxmind/geoip/geoip-api/1.3.1/geoip-api-1.3.1.jar"/>
+ </path>
+
+ <path id="library.maven:_com.yammer.metrics:metrics-core:2.2.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar"/>
+ </path>
+
+ <path id="library.maven:_com.zdjizhi:galaxy:1.0.3.classpath">
+ <pathelement location="${path.variable.maven_repository}/com/zdjizhi/galaxy/1.0.3/galaxy-1.0.3.jar"/>
+ </path>
+
+ <path id="library.maven:_commons-codec:commons-codec:1.10.classpath">
+ <pathelement location="${path.variable.maven_repository}/commons-codec/commons-codec/1.10/commons-codec-1.10.jar"/>
+ </path>
+
+ <path id="library.maven:_commons-io:commons-io:2.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/commons-io/commons-io/2.5/commons-io-2.5.jar"/>
+ </path>
+
+ <path id="library.maven:_commons-lang:commons-lang:2.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
+ </path>
+
+ <path id="library.maven:_commons-logging:commons-logging:1.2.classpath">
+ <pathelement location="${path.variable.maven_repository}/commons-logging/commons-logging/1.2/commons-logging-1.2.jar"/>
+ </path>
+
+ <path id="library.maven:_io.netty:netty:3.10.5.final.classpath">
+ <pathelement location="${path.variable.maven_repository}/io/netty/netty/3.10.5.Final/netty-3.10.5.Final.jar"/>
+ </path>
+
+ <path id="library.maven:_javax.servlet:servlet-api:2.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
+ </path>
+
+ <path id="library.maven:_jline:jline:0.9.94.classpath">
+ <pathelement location="${path.variable.maven_repository}/jline/jline/0.9.94/jline-0.9.94.jar"/>
+ </path>
+
+ <path id="library.maven:_joda-time:joda-time:2.10.classpath">
+ <pathelement location="${path.variable.maven_repository}/joda-time/joda-time/2.10/joda-time-2.10.jar"/>
+ </path>
+
+ <path id="library.maven:_junit:junit:4.11.classpath">
+ <pathelement location="${path.variable.maven_repository}/junit/junit/4.11/junit-4.11.jar"/>
+ </path>
+
+ <path id="library.maven:_log4j:log4j:1.2.14.classpath">
+ <pathelement location="${path.variable.maven_repository}/log4j/log4j/1.2.14/log4j-1.2.14.jar"/>
+ </path>
+
+ <path id="library.maven:_net.sf.jopt-simple:jopt-simple:5.0.4.classpath">
+ <pathelement location="${path.variable.maven_repository}/net/sf/jopt-simple/jopt-simple/5.0.4/jopt-simple-5.0.4.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.curator:curator-client:2.10.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-client/2.10.0/curator-client-2.10.0.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.curator:curator-framework:2.10.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-framework/2.10.0/curator-framework-2.10.0.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.httpcomponents:httpclient:4.5.5.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpclient/4.5.5/httpclient-4.5.5.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.httpcomponents:httpcore:4.4.9.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpcore/4.4.9/httpcore-4.4.9.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.kafka:kafka-clients:1.0.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka-clients/1.0.0/kafka-clients-1.0.0.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.kafka:kafka_2.11:1.0.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka_2.11/1.0.0/kafka_2.11-1.0.0.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.logging.log4j:log4j-api:2.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-api/2.1/log4j-api-2.1.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.logging.log4j:log4j-core:2.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-core/2.1/log4j-core-2.1.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.logging.log4j:log4j-slf4j-impl:2.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.storm:storm-core:1.0.2.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-core/1.0.2/storm-core-1.0.2.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.storm:storm-kafka:1.0.2.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-kafka/1.0.2/storm-kafka-1.0.2.jar"/>
+ </path>
+
+ <path id="library.maven:_org.apache.zookeeper:zookeeper:3.4.9.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/apache/zookeeper/zookeeper/3.4.9/zookeeper-3.4.9.jar"/>
+ </path>
+
+ <path id="library.maven:_org.clojure:clojure:1.7.0.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/clojure/clojure/1.7.0/clojure-1.7.0.jar"/>
+ </path>
+
+ <path id="library.maven:_org.hamcrest:hamcrest-core:1.3.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/>
+ </path>
+
+ <path id="library.maven:_org.lz4:lz4-java:1.4.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/lz4/lz4-java/1.4/lz4-java-1.4.jar"/>
+ </path>
+
+ <path id="library.maven:_org.objenesis:objenesis:2.1.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/objenesis/objenesis/2.1/objenesis-2.1.jar"/>
+ </path>
+
+ <path id="library.maven:_org.ow2.asm:asm:5.0.3.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/ow2/asm/asm/5.0.3/asm-5.0.3.jar"/>
+ </path>
+
+ <path id="library.maven:_org.scala-lang:scala-library:2.11.11.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/scala-lang/scala-library/2.11.11/scala-library-2.11.11.jar"/>
+ </path>
+
+ <path id="library.maven:_org.slf4j:log4j-over-slf4j:1.6.6.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar"/>
+ </path>
+
+ <path id="library.maven:_org.slf4j:slf4j-api:1.7.7.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar"/>
+ </path>
+
+ <path id="library.maven:_org.xerial.snappy:snappy-java:1.1.4.classpath">
+ <pathelement location="${path.variable.maven_repository}/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar"/>
+ </path>
+
+
+ <!-- Global Libraries -->
+
+
+ <!-- Application Server Libraries -->
+ <!-- Register Custom Compiler Taskdefs -->
+ <property name="javac2.home" value="${idea.home}/lib"/>
+ <path id="javac2.classpath">
+ <fileset dir="${javac2.home}">
+ <include name="javac2.jar"/>
+ <include name="jdom.jar"/>
+ <include name="asm-all*.jar"/>
+ <include name="jgoodies-forms.jar"/>
+ </fileset>
+ </path>
+ <target name="register.custom.compilers">
+ <taskdef name="javac2" classname="com.intellij.ant.Javac2" classpathref="javac2.classpath"/>
+ <taskdef name="instrumentIdeaExtensions" classname="com.intellij.ant.InstrumentIdeaExtensions" classpathref="javac2.classpath"/>
+ </target>
+
+ <!-- Modules -->
+
+ <import file="${basedir}/.idea/module_log-stream-aggregation.xml"/>
+
+ <target name="init" description="Build initialization">
+ <!-- Perform any build initialization in this target -->
+ </target>
+
+ <target name="clean" depends="clean.module.log-stream-aggregation" description="cleanup all"/>
+
+ <target name="build.modules" depends="init, clean, compile.module.log-stream-aggregation" description="build all modules"/>
+
+ <target name="all" depends="build.modules" description="build all"/>
+</project> \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f6c2779..8b12d9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,27 +12,80 @@
<url>http://maven.apache.org</url>
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+ </repositories>
+
- <!--<repositories>-->
- <!--<repository>-->
- <!--<id>nexus</id>-->
- <!--<name>Team Nexus Repository</name>-->
- <!--<url>http://192.168.40.125:8099/content/groups/public</url>-->
- <!--</repository>-->
- <!--</repositories>-->
<build>
<plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
<configuration>
- <source>8</source>
- <target>8</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
</plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+
</build>
+
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@@ -42,25 +95,35 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
+
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
-
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.3</version>
<exclusions>
+
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
@@ -69,26 +132,22 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
- <version>0.10.0.1</version>
+ <version>1.0.0</version>
<exclusions>
<exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
</exclusions>
</dependency>
@@ -99,20 +158,6 @@
<scope>test</scope>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.10.0</version>
- <type>pom</type>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.thrift.tools/maven-thrift-plugin -->
- <dependency>
- <groupId>org.apache.thrift.tools</groupId>
- <artifactId>maven-thrift-plugin</artifactId>
- <version>0.1.11</version>
- </dependency>
<dependency>
<groupId>com.alibaba</groupId>
@@ -120,37 +165,5 @@
<version>1.2.59</version>
</dependency>
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>3.2.2</version>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.10</version>
- </dependency>
-
- <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core &ndash;&gt;-->
- <!--<dependency>-->
- <!--<groupId>org.jgrapht</groupId>-->
- <!--<artifactId>jgrapht-core</artifactId>-->
- <!--<version>1.1.0</version>-->
- <!--</dependency>-->
- <!-- https://mvnrepository.com/artifact/org.jgrapht/jgrapht-dist -->
- <dependency>
- <groupId>org.jgrapht</groupId>
- <artifactId>jgrapht-dist</artifactId>
- <version>1.0.1</version>
- <type>pom</type>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>5.0.0</version>
- <scope>compile</scope>
- </dependency>
-
-
</dependencies>
</project>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index ce08511..b3eb718 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -30,7 +30,8 @@ group.id=lxk-200512
#输出topic
results.output.topic=agg_test
#results.output.topic=SECURITY-EVENT-COMPLETED-LOG
-
+#聚合时间,单位秒
+agg.time=30
#storm topology workers
topology.workers=1
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
index cd179cd..f63884d 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
**/
public class AggregateTopology {
+
+
+
public static void main(String[] args) {
//TODO 创建一个topo任务
TridentTopology topology = new TridentTopology();
@@ -29,16 +32,26 @@ public class AggregateTopology {
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
- .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+ .name("one")
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度
+ .name("two")
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
- .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
- .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
- .each(new Fields("map"), new KafkaBolt(), new Fields());
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度
+ .name("three")
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .name("four")
+ .each(new Fields("map"), new KafkaBolt(), new Fields())
+ .name("five")
+ .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度
+ .name("six");
Config config = new Config();
+// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
config.setDebug(false);
- config.setNumWorkers(5);
+ config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量
LocalCluster cluster = new LocalCluster();
+
+
cluster.submitTopology("trident-wordcount", config, topology.build());
// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
index 21ce8d3..2583b5b 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java
@@ -29,6 +29,7 @@ public class TridentKafkaSpout {
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.startOffsetTime = -1L;
+ kafkaConfig.socketTimeoutMs=60000;
//不透明事务型Spout
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..d981fe1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java
@@ -0,0 +1,111 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import cn.ac.iie.trident.aggregate.AggCount;
+import cn.ac.iie.trident.aggregate.ParseJson2KV;
+import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
+import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
+import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.tuple.Fields;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Storm程序主类
+ *
+ * @author Administrator
+ */
+
+public class LogFlowWriteTopology {
+ private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
+
+ private TopologyBuilder builder;
+ private static TridentTopology tridentTopology;
+
+
+
+
+
+ private static Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+ return conf;
+ }
+
+
+ private static StormTopology buildTopology() {
+
+ OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+ tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .each(new Fields("map"), new KafkaBolt(), new Fields());
+ return tridentTopology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
+
+ //TODO 创建一个topo任务
+ TridentTopology topology = new TridentTopology();
+ //TODO 为Topo绑定Spout
+ OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
+
+ /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .each(new Fields("map"), new KafkaBolt(), new Fields());*/
+
+
+ topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
+ .name("one")
+ .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度
+ .name("two")
+ .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
+ .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度
+ .name("three")
+ .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
+ .name("four")
+ .each(new Fields("map"), new KafkaBolt(), new Fields())
+ .name("five")
+ .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度
+ .name("six");
+
+ if(args.length == 0){//本地模式运行
+
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology("trident-function", conf, topology.build());
+ Thread.sleep(100000);
+ cluster.shutdown();
+ }else{//集群模式运行
+ StormSubmitter.submitTopology(args[0], conf, topology.build());
+ }
+
+ }
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
new file mode 100644
index 0000000..708f77c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.trident.aggregate.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author Administrator
+ */
+public final class StormRunner {
+ private static final int MILLS_IN_SEC = 1000;
+
+ private StormRunner() {}
+
+ public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ localCluster.shutdown();
+
+ }
+
+ public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
index 0ab90a5..3905580 100644
--- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java
@@ -23,6 +23,10 @@ public class FlowWriteConfig {
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+
+ public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
+
+
/**
* kafka
*/
@@ -36,8 +40,6 @@ public class FlowWriteConfig {
public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
- public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
-
/**
* http
*/
diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java
index 3202adb..77c826f 100644
--- a/src/test/java/com/wp/AppTest.java
+++ b/src/test/java/com/wp/AppTest.java
@@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
/**
* Unit test for simple App.
*/
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
+public class AppTest{
- /**
- * Rigourous Test :-)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-
-
- private static ValueBean valueBean;
@org.junit.Test
public void test(){
- System.out.println(valueBean == null);
+ Config conf = new Config();
+// conf.setDebug(false);
+ conf.setMessageTimeoutSecs(60);
+ conf.setNumWorkers(1);
+
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
+ new Values("nickt1", 4),
+ new Values("nickt2", 7),
+ new Values("nickt3", 8),
+ new Values("nickt4", 9),
+ new Values("nickt5", 7),
+ new Values("nickt6", 11),
+ new Values("nickt7", 5)
+ );
+ spout.setCycle(true);
+ TridentTopology topology = new TridentTopology();
+ topology.newStream("spout1", spout)
+ .batchGlobal()
+ .each(new Fields("user"),new Debug("print:"))
+ .parallelismHint(5);
+
+ LocalCluster cluster = new LocalCluster();
+
+ cluster.submitTopology("trident-function", conf, topology.build());
}
- static class Demo{
- private String a;
- private String b;
- private String c;
-
- public String getA() {
- return a;
- }
-
- public void setA(String a) {
- this.a = a;
- }
-
- public String getB() {
- return b;
- }
-
- public void setB(String b) {
- this.b = b;
- }
-
- public String getC() {
- return c;
- }
-
- public void setC(String c) {
- this.c = c;
- }
- }
}
diff --git a/storm-topology.log b/storm-topology.log
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/storm-topology.log