diff options
| author | lee <[email protected]> | 2020-06-08 15:32:26 +0800 |
|---|---|---|
| committer | lee <[email protected]> | 2020-06-08 15:32:26 +0800 |
| commit | cb4ee7544e1c75e15fcc45e800e047ec269816db (patch) | |
| tree | 68e566dabc6537248370df89fd4848dd5fdf911a | |
| parent | 36b04e3feaaf1f1c5af1a005af20d2eb28804bc7 (diff) | |
OLAP预聚合代码更新original
| -rw-r--r-- | dependency-reduced-pom.xml | 125 | ||||
| -rw-r--r-- | log-stream-aggregation.properties | 4 | ||||
| -rw-r--r-- | log-stream-aggregation.xml | 318 | ||||
| -rw-r--r-- | pom.xml | 143 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 3 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java | 23 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java | 1 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java | 111 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java | 35 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java | 6 | ||||
| -rw-r--r-- | src/test/java/com/wp/AppTest.java | 92 | ||||
| -rw-r--r-- | storm-topology.log | 0 |
12 files changed, 728 insertions, 133 deletions
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..cc9ded2 --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,125 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>cn.ac.iie</groupId> + <artifactId>log-stream-aggregation</artifactId> + <name>log-stream-aggregation</name> + <version>0.0.1-SNAPSHOT</version> + <url>http://maven.apache.org</url> + <build> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + </includes> + </resource> + <resource> + <directory>properties</directory> + <includes> + <include>log4j.properties</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer> + <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass> + </transformer> + <transformer> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer> + <resource>META-INF/spring.schemas</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + </repositories> + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>1.0.2</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + <exclusion> + <artifactId>clojure</artifactId> + <groupId>org.clojure</groupId> + </exclusion> + <exclusion> + <artifactId>disruptor</artifactId> + <groupId>com.lmax</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-core</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>hamcrest-core</artifactId> + <groupId>org.hamcrest</groupId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> +</project> + diff --git a/log-stream-aggregation.properties b/log-stream-aggregation.properties new file mode 100644 index 0000000..c77a9da --- /dev/null +++ b/log-stream-aggregation.properties @@ -0,0 +1,4 @@ +path.variable.kotlin_bundled=F\:\\tools\\ideaIU-2018.1.4\\plugins\\Kotlin\\kotlinc +path.variable.maven_repository=C\:\\Users\\lixikang\\.m2\\repository +jdk.home.1.8=C\:/developer_tools/jdk1.8.0_131 +javac2.instrumentation.includeJavaRuntime=false
\ No newline at end of file diff --git a/log-stream-aggregation.xml b/log-stream-aggregation.xml new file mode 100644 index 0000000..197b825 --- /dev/null +++ b/log-stream-aggregation.xml @@ -0,0 +1,318 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project name="log-stream-aggregation" default="all"> + + + <property file="log-stream-aggregation.properties"/> + <!-- Uncomment the following property if no tests compilation is needed --> + <!-- + <property name="skip.tests" value="true"/> + --> + + <!-- Compiler options --> + + <property name="compiler.debug" value="on"/> + <property name="compiler.generate.no.warnings" value="off"/> + <property name="compiler.args" value=""/> + <property name="compiler.max.memory" value="700m"/> + <patternset id="ignored.files"> + <exclude name="**/*.hprof/**"/> + <exclude name="**/*.pyc/**"/> + <exclude name="**/*.pyo/**"/> + <exclude name="**/*.rbc/**"/> + <exclude name="**/*.yarb/**"/> + <exclude name="**/*~/**"/> + <exclude name="**/.DS_Store/**"/> + <exclude name="**/.git/**"/> + <exclude name="**/.hg/**"/> + <exclude name="**/.svn/**"/> + <exclude name="**/CVS/**"/> + <exclude name="**/__pycache__/**"/> + <exclude name="**/_svn/**"/> + <exclude name="**/vssver.scc/**"/> + <exclude name="**/vssver2.scc/**"/> + </patternset> + <patternset id="library.patterns"> + <include name="*.egg"/> + <include name="*.jar"/> + <include name="*.ear"/> + <include name="*.swc"/> + <include name="*.war"/> + <include name="*.zip"/> + <include name="*.ane"/> + </patternset> + <patternset id="compiler.resources"> + <exclude name="**/?*.java"/> + <exclude name="**/?*.form"/> + <exclude name="**/?*.class"/> + <exclude name="**/?*.groovy"/> + <exclude name="**/?*.scala"/> + <exclude name="**/?*.flex"/> + <exclude name="**/?*.kt"/> + <exclude name="**/?*.clj"/> + <exclude name="**/?*.aj"/> + </patternset> + + <!-- JDK definitions --> + + <property name="jdk.bin.1.8" value="${jdk.home.1.8}/bin"/> + <path id="jdk.classpath.1.8"> + <fileset dir="${jdk.home.1.8}"> + <include name="jre/lib/charsets.jar"/> + <include name="jre/lib/deploy.jar"/> + <include name="jre/lib/ext/access-bridge-64.jar"/> + <include name="jre/lib/ext/cldrdata.jar"/> + <include name="jre/lib/ext/dnsns.jar"/> + <include name="jre/lib/ext/jaccess.jar"/> + <include name="jre/lib/ext/jfxrt.jar"/> + <include name="jre/lib/ext/localedata.jar"/> + <include name="jre/lib/ext/nashorn.jar"/> + <include name="jre/lib/ext/sunec.jar"/> + <include name="jre/lib/ext/sunjce_provider.jar"/> + <include name="jre/lib/ext/sunmscapi.jar"/> + <include name="jre/lib/ext/sunpkcs11.jar"/> + <include name="jre/lib/ext/zipfs.jar"/> + <include name="jre/lib/javaws.jar"/> + <include name="jre/lib/jce.jar"/> + <include name="jre/lib/jfr.jar"/> + <include name="jre/lib/jfxswt.jar"/> + <include name="jre/lib/jsse.jar"/> + <include name="jre/lib/management-agent.jar"/> + <include name="jre/lib/plugin.jar"/> + <include name="jre/lib/resources.jar"/> + <include name="jre/lib/rt.jar"/> + </fileset> + </path> + + <property name="project.jdk.home" value="${jdk.home.1.8}"/> + <property name="project.jdk.bin" value="${jdk.bin.1.8}"/> + <property name="project.jdk.classpath" value="jdk.classpath.1.8"/> + + + <!-- Project Libraries --> + + <path id="library.maven:_com.101tec:zkclient:0.10.classpath"> + <pathelement location="${path.variable.maven_repository}/com/101tec/zkclient/0.10/zkclient-0.10.jar"/> + </path> + + <path id="library.maven:_com.alibaba:fastjson:1.2.59.classpath"> + <pathelement location="${path.variable.maven_repository}/com/alibaba/fastjson/1.2.59/fastjson-1.2.59.jar"/> + </path> + + <path id="library.maven:_com.esotericsoftware:kryo:3.0.3.classpath"> + <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/kryo/3.0.3/kryo-3.0.3.jar"/> + </path> + + <path id="library.maven:_com.esotericsoftware:minlog:1.3.0.classpath"> + <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/minlog/1.3.0/minlog-1.3.0.jar"/> + </path> + + <path id="library.maven:_com.esotericsoftware:reflectasm:1.10.1.classpath"> + <pathelement location="${path.variable.maven_repository}/com/esotericsoftware/reflectasm/1.10.1/reflectasm-1.10.1.jar"/> + </path> + + <path id="library.maven:_com.fasterxml.jackson.core:jackson-annotations:2.9.5.classpath"> + <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-annotations/2.9.5/jackson-annotations-2.9.5.jar"/> + </path> + + <path id="library.maven:_com.fasterxml.jackson.core:jackson-core:2.9.5.classpath"> + <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-core/2.9.5/jackson-core-2.9.5.jar"/> + </path> + + <path id="library.maven:_com.fasterxml.jackson.core:jackson-databind:2.9.1.classpath"> + <pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-databind/2.9.1/jackson-databind-2.9.1.jar"/> + </path> + + <path id="library.maven:_com.google.guava:guava:16.0.1.classpath"> + <pathelement location="${path.variable.maven_repository}/com/google/guava/guava/16.0.1/guava-16.0.1.jar"/> + </path> + + <path id="library.maven:_com.googlecode.json-simple:json-simple:1.1.classpath"> + <pathelement location="${path.variable.maven_repository}/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar"/> + </path> + + <path id="library.maven:_com.lmax:disruptor:3.3.2.classpath"> + <pathelement location="${path.variable.maven_repository}/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar"/> + </path> + + <path id="library.maven:_com.maxmind.db:maxmind-db:1.2.2.classpath"> + <pathelement location="${path.variable.maven_repository}/com/maxmind/db/maxmind-db/1.2.2/maxmind-db-1.2.2.jar"/> + </path> + + <path id="library.maven:_com.maxmind.geoip2:geoip2:2.12.0.classpath"> + <pathelement location="${path.variable.maven_repository}/com/maxmind/geoip2/geoip2/2.12.0/geoip2-2.12.0.jar"/> + </path> + + <path id="library.maven:_com.maxmind.geoip:geoip-api:1.3.1.classpath"> + <pathelement location="${path.variable.maven_repository}/com/maxmind/geoip/geoip-api/1.3.1/geoip-api-1.3.1.jar"/> + </path> + + <path id="library.maven:_com.yammer.metrics:metrics-core:2.2.0.classpath"> + <pathelement location="${path.variable.maven_repository}/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar"/> + </path> + + <path id="library.maven:_com.zdjizhi:galaxy:1.0.3.classpath"> + <pathelement location="${path.variable.maven_repository}/com/zdjizhi/galaxy/1.0.3/galaxy-1.0.3.jar"/> + </path> + + <path id="library.maven:_commons-codec:commons-codec:1.10.classpath"> + <pathelement location="${path.variable.maven_repository}/commons-codec/commons-codec/1.10/commons-codec-1.10.jar"/> + </path> + + <path id="library.maven:_commons-io:commons-io:2.5.classpath"> + <pathelement location="${path.variable.maven_repository}/commons-io/commons-io/2.5/commons-io-2.5.jar"/> + </path> + + <path id="library.maven:_commons-lang:commons-lang:2.5.classpath"> + <pathelement location="${path.variable.maven_repository}/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/> + </path> + + <path id="library.maven:_commons-logging:commons-logging:1.2.classpath"> + <pathelement location="${path.variable.maven_repository}/commons-logging/commons-logging/1.2/commons-logging-1.2.jar"/> + </path> + + <path id="library.maven:_io.netty:netty:3.10.5.final.classpath"> + <pathelement location="${path.variable.maven_repository}/io/netty/netty/3.10.5.Final/netty-3.10.5.Final.jar"/> + </path> + + <path id="library.maven:_javax.servlet:servlet-api:2.5.classpath"> + <pathelement location="${path.variable.maven_repository}/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/> + </path> + + <path id="library.maven:_jline:jline:0.9.94.classpath"> + <pathelement location="${path.variable.maven_repository}/jline/jline/0.9.94/jline-0.9.94.jar"/> + </path> + + <path id="library.maven:_joda-time:joda-time:2.10.classpath"> + <pathelement location="${path.variable.maven_repository}/joda-time/joda-time/2.10/joda-time-2.10.jar"/> + </path> + + <path id="library.maven:_junit:junit:4.11.classpath"> + <pathelement location="${path.variable.maven_repository}/junit/junit/4.11/junit-4.11.jar"/> + </path> + + <path id="library.maven:_log4j:log4j:1.2.14.classpath"> + <pathelement location="${path.variable.maven_repository}/log4j/log4j/1.2.14/log4j-1.2.14.jar"/> + </path> + + <path id="library.maven:_net.sf.jopt-simple:jopt-simple:5.0.4.classpath"> + <pathelement location="${path.variable.maven_repository}/net/sf/jopt-simple/jopt-simple/5.0.4/jopt-simple-5.0.4.jar"/> + </path> + + <path id="library.maven:_org.apache.curator:curator-client:2.10.0.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-client/2.10.0/curator-client-2.10.0.jar"/> + </path> + + <path id="library.maven:_org.apache.curator:curator-framework:2.10.0.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-framework/2.10.0/curator-framework-2.10.0.jar"/> + </path> + + <path id="library.maven:_org.apache.httpcomponents:httpclient:4.5.5.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpclient/4.5.5/httpclient-4.5.5.jar"/> + </path> + + <path id="library.maven:_org.apache.httpcomponents:httpcore:4.4.9.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpcore/4.4.9/httpcore-4.4.9.jar"/> + </path> + + <path id="library.maven:_org.apache.kafka:kafka-clients:1.0.0.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka-clients/1.0.0/kafka-clients-1.0.0.jar"/> + </path> + + <path id="library.maven:_org.apache.kafka:kafka_2.11:1.0.0.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka_2.11/1.0.0/kafka_2.11-1.0.0.jar"/> + </path> + + <path id="library.maven:_org.apache.logging.log4j:log4j-api:2.1.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-api/2.1/log4j-api-2.1.jar"/> + </path> + + <path id="library.maven:_org.apache.logging.log4j:log4j-core:2.1.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-core/2.1/log4j-core-2.1.jar"/> + </path> + + <path id="library.maven:_org.apache.logging.log4j:log4j-slf4j-impl:2.1.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar"/> + </path> + + <path id="library.maven:_org.apache.storm:storm-core:1.0.2.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-core/1.0.2/storm-core-1.0.2.jar"/> + </path> + + <path id="library.maven:_org.apache.storm:storm-kafka:1.0.2.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-kafka/1.0.2/storm-kafka-1.0.2.jar"/> + </path> + + <path id="library.maven:_org.apache.zookeeper:zookeeper:3.4.9.classpath"> + <pathelement location="${path.variable.maven_repository}/org/apache/zookeeper/zookeeper/3.4.9/zookeeper-3.4.9.jar"/> + </path> + + <path id="library.maven:_org.clojure:clojure:1.7.0.classpath"> + <pathelement location="${path.variable.maven_repository}/org/clojure/clojure/1.7.0/clojure-1.7.0.jar"/> + </path> + + <path id="library.maven:_org.hamcrest:hamcrest-core:1.3.classpath"> + <pathelement location="${path.variable.maven_repository}/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/> + </path> + + <path id="library.maven:_org.lz4:lz4-java:1.4.classpath"> + <pathelement location="${path.variable.maven_repository}/org/lz4/lz4-java/1.4/lz4-java-1.4.jar"/> + </path> + + <path id="library.maven:_org.objenesis:objenesis:2.1.classpath"> + <pathelement location="${path.variable.maven_repository}/org/objenesis/objenesis/2.1/objenesis-2.1.jar"/> + </path> + + <path id="library.maven:_org.ow2.asm:asm:5.0.3.classpath"> + <pathelement location="${path.variable.maven_repository}/org/ow2/asm/asm/5.0.3/asm-5.0.3.jar"/> + </path> + + <path id="library.maven:_org.scala-lang:scala-library:2.11.11.classpath"> + <pathelement location="${path.variable.maven_repository}/org/scala-lang/scala-library/2.11.11/scala-library-2.11.11.jar"/> + </path> + + <path id="library.maven:_org.slf4j:log4j-over-slf4j:1.6.6.classpath"> + <pathelement location="${path.variable.maven_repository}/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar"/> + </path> + + <path id="library.maven:_org.slf4j:slf4j-api:1.7.7.classpath"> + <pathelement location="${path.variable.maven_repository}/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar"/> + </path> + + <path id="library.maven:_org.xerial.snappy:snappy-java:1.1.4.classpath"> + <pathelement location="${path.variable.maven_repository}/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar"/> + </path> + + + <!-- Global Libraries --> + + + <!-- Application Server Libraries --> + <!-- Register Custom Compiler Taskdefs --> + <property name="javac2.home" value="${idea.home}/lib"/> + <path id="javac2.classpath"> + <fileset dir="${javac2.home}"> + <include name="javac2.jar"/> + <include name="jdom.jar"/> + <include name="asm-all*.jar"/> + <include name="jgoodies-forms.jar"/> + </fileset> + </path> + <target name="register.custom.compilers"> + <taskdef name="javac2" classname="com.intellij.ant.Javac2" classpathref="javac2.classpath"/> + <taskdef name="instrumentIdeaExtensions" classname="com.intellij.ant.InstrumentIdeaExtensions" classpathref="javac2.classpath"/> + </target> + + <!-- Modules --> + + <import file="${basedir}/.idea/module_log-stream-aggregation.xml"/> + + <target name="init" description="Build initialization"> + <!-- Perform any build initialization in this target --> + </target> + + <target name="clean" depends="clean.module.log-stream-aggregation" description="cleanup all"/> + + <target name="build.modules" depends="init, clean, compile.module.log-stream-aggregation" description="build all modules"/> + + <target name="all" depends="build.modules" description="build all"/> +</project>
\ No newline at end of file @@ -12,27 +12,80 @@ <url>http://maven.apache.org</url> + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + </repositories> + - <!--<repositories>--> - <!--<repository>--> - <!--<id>nexus</id>--> - <!--<name>Team Nexus Repository</name>--> - <!--<url>http://192.168.40.125:8099/content/groups/public</url>--> - <!--</repository>--> - <!--</repositories>--> <build> <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> <configuration> - <source>8</source> - <target>8</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + </includes> + <filtering>false</filtering> + </resource> + <resource> + <directory>properties</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> @@ -42,25 +95,35 @@ <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.2</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.0.2</version> + <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> - </dependency> <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> <version>1.0.3</version> <exclusions> + + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> @@ -69,26 +132,22 @@ <artifactId>log4j-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> + <exclusion> + <artifactId>guava</artifactId> + <groupId>com.google.guava</groupId> + </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> - <version>0.10.0.1</version> + <version>1.0.0</version> <exclusions> <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> </exclusions> </dependency> @@ -99,20 +158,6 @@ <scope>test</scope> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift --> - <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - <version>0.10.0</version> - <type>pom</type> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.thrift.tools/maven-thrift-plugin --> - <dependency> - <groupId>org.apache.thrift.tools</groupId> - <artifactId>maven-thrift-plugin</artifactId> - <version>0.1.11</version> - </dependency> <dependency> <groupId>com.alibaba</groupId> @@ -120,37 +165,5 @@ <version>1.2.59</version> </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>3.2.2</version> - </dependency> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>1.10</version> - </dependency> - - <!--<!– https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core –>--> - <!--<dependency>--> - <!--<groupId>org.jgrapht</groupId>--> - <!--<artifactId>jgrapht-core</artifactId>--> - <!--<version>1.1.0</version>--> - <!--</dependency>--> - <!-- https://mvnrepository.com/artifact/org.jgrapht/jgrapht-dist --> - <dependency> - <groupId>org.jgrapht</groupId> - <artifactId>jgrapht-dist</artifactId> - <version>1.0.1</version> - <type>pom</type> - </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-api</artifactId> - <version>5.0.0</version> - <scope>compile</scope> - </dependency> - - </dependencies> </project> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index ce08511..b3eb718 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -30,7 +30,8 @@ group.id=lxk-200512 #输出topic results.output.topic=agg_test #results.output.topic=SECURITY-EVENT-COMPLETED-LOG - +#聚合时间,单位秒 +agg.time=30 #storm topology workers topology.workers=1 diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java index cd179cd..f63884d 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java @@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit; **/ public class AggregateTopology { + + + public static void main(String[] args) { //TODO 创建一个topo任务 TridentTopology topology = new TridentTopology(); @@ -29,16 +32,26 @@ public class AggregateTopology { OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) + .name("one") + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 + .name("two") .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) - .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .each(new Fields("map"), new KafkaBolt(), new Fields()); + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 + .name("three") + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .name("four") + .each(new Fields("map"), new KafkaBolt(), new Fields()) + .name("five") + .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 + .name("six"); Config config = new Config(); +// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); config.setDebug(false); - config.setNumWorkers(5); + config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量 LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-wordcount", config, topology.build()); // StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build()); } diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java index 21ce8d3..2583b5b 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java @@ -29,6 +29,7 @@ public class TridentKafkaSpout { TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC); kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); kafkaConfig.startOffsetTime = -1L; + kafkaConfig.socketTimeoutMs=60000; //不透明事务型Spout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig); diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..d981fe1 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java @@ -0,0 +1,111 @@ +package cn.ac.iie.trident.aggregate.topology; + + +import cn.ac.iie.trident.aggregate.AggCount; +import cn.ac.iie.trident.aggregate.ParseJson2KV; +import cn.ac.iie.trident.aggregate.bolt.KafkaBolt; +import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout; +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.tuple.Fields; + +import java.util.concurrent.TimeUnit; + +/** + * Storm程序主类 + * + * @author Administrator + */ + +public class LogFlowWriteTopology { + private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + + private TopologyBuilder builder; + private static TridentTopology tridentTopology; + + + + + + private static Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + return conf; + } + + + private static StormTopology buildTopology() { + + OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); + + tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .each(new Fields("map"), new KafkaBolt(), new Fields()); + return tridentTopology.build(); + } + + public static void main(String[] args) throws Exception { + + + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + + //TODO 创建一个topo任务 + TridentTopology topology = new TridentTopology(); + //TODO 为Topo绑定Spout + OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); + + /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6 + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9 + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .each(new Fields("map"), new KafkaBolt(), new Fields());*/ + + + topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .name("one") + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 + .name("two") + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 + .name("three") + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .name("four") + .each(new Fields("map"), new KafkaBolt(), new Fields()) + .name("five") + .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 + .name("six"); + + if(args.length == 0){//本地模式运行 + + LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-function", conf, topology.build()); + Thread.sleep(100000); + cluster.shutdown(); + }else{//集群模式运行 + StormSubmitter.submitTopology(args[0], conf, topology.build()); + } + + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java new file mode 100644 index 0000000..708f77c --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java @@ -0,0 +1,35 @@ +package cn.ac.iie.trident.aggregate.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * @author Administrator + */ +public final class StormRunner { + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() {} + + public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + + } + + public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java index 0ab90a5..3905580 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java @@ -23,6 +23,10 @@ public class FlowWriteConfig { public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); + + public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time"); + + /** * kafka */ @@ -36,8 +40,6 @@ public class FlowWriteConfig { public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset"); public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type"); - public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); - /** * http */ diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java index 3202adb..77c826f 100644 --- a/src/test/java/com/wp/AppTest.java +++ b/src/test/java/com/wp/AppTest.java @@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; /** * Unit test for simple App. */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } +public class AppTest{ - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - } - - - private static ValueBean valueBean; @org.junit.Test public void test(){ - System.out.println(valueBean == null); + Config conf = new Config(); +// conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(1); + + FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3, + new Values("nickt1", 4), + new Values("nickt2", 7), + new Values("nickt3", 8), + new Values("nickt4", 9), + new Values("nickt5", 7), + new Values("nickt6", 11), + new Values("nickt7", 5) + ); + spout.setCycle(true); + TridentTopology topology = new TridentTopology(); + topology.newStream("spout1", spout) + .batchGlobal() + .each(new Fields("user"),new Debug("print:")) + .parallelismHint(5); + + LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-function", conf, topology.build()); } - static class Demo{ - private String a; - private String b; - private String c; - - public String getA() { - return a; - } - - public void setA(String a) { - this.a = a; - } - - public String getB() { - return b; - } - - public void setB(String b) { - this.b = b; - } - - public String getC() { - return c; - } - - public void setC(String c) { - this.c = c; - } - } } diff --git a/storm-topology.log b/storm-topology.log new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/storm-topology.log |
