diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 11:47:29 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 11:47:29 +0800 |
| commit | d8ceb36b4b1efa62f28f426aed472afe4571c9b9 (patch) | |
| tree | 1941b140395c5876b873d38aa90e1c81b68c36c2 | |
| parent | 23c4bf3fa22387daa0cd90895f0e0846a2b943a9 (diff) | |
| -rw-r--r-- | dependency-reduced-pom.xml | 100 | ||||
| -rw-r--r-- | flink-vpn-recommend.iml | 171 | ||||
| -rw-r--r-- | pom.xml | 240 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/common/CipEntity.java | 37 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/common/Entity.java | 55 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/common/ObjectEntity.java | 55 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/common/ResultEntity.java | 57 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/config/CommonConfigurations.java | 62 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/config/commonConfig.java | 40 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/function/TopNHotItems.java | 137 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/function/TopnHotItem.java | 98 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/function/metricsCalculate.java | 30 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/cn/sink/HbaseSink.java | 67 | ||||
| -rw-r--r-- | src/main/java/com/galaxy/recommend/Recommendation.java | 143 | ||||
| -rw-r--r-- | src/main/resources/common.properties | 74 | ||||
| -rw-r--r-- | src/test/java/com/galaxy/cn/catalog/CatalogTest.java | 7 |
16 files changed, 1373 insertions, 0 deletions
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..d75d52a --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.galaxy.recommend</groupId> + <artifactId>flink-vpn-recommend</artifactId> + <version>21-01-07</version> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <useIncrementalCompilation>false</useIncrementalCompilation> + <compilerArgs> + <arg>-Xpkginfo:always</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>flink-vpn-recommend</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>flink-vpn-recommend-21-01-07</finalName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer> + <mainClass>com.galaxy.recommend.Recommendation</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + </repositories> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-csv</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + </dependencies> + <properties> + <hadoop.version>2.7.1</hadoop.version> + <flink.version>1.13.1</flink.version> + </properties> +</project> diff --git a/flink-vpn-recommend.iml b/flink-vpn-recommend.iml new file mode 100644 index 0000000..e574d38 --- /dev/null +++ b/flink-vpn-recommend.iml @@ -0,0 +1,171 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> + <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8"> + <output url="file://$MODULE_DIR$/target/classes" /> + <output-test url="file://$MODULE_DIR$/target/test-classes" /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> + </content> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + <orderEntry type="library" name="Maven: com.zdjizhi:galaxy:1.0.6" level="project" /> + <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" /> + <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" /> + <orderEntry type="library" name="Maven: log4j:log4j:1.2.14" level="project" /> + <orderEntry type="library" name="Maven: joda-time:joda-time:2.10" level="project" /> + <orderEntry type="library" name="Maven: com.maxmind.geoip:geoip-api:1.3.1" level="project" /> + <orderEntry type="library" name="Maven: com.maxmind.geoip2:geoip2:2.12.0" level="project" /> + <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.5.5" level="project" /> + <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.9" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.9.5" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.9.5" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.9.5" level="project" /> + <orderEntry type="library" name="Maven: com.maxmind.db:maxmind-db:1.2.2" level="project" /> + <orderEntry type="library" name="Maven: com.google.guava:guava:23.0" level="project" /> + <orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" /> + <orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14" level="project" /> + <orderEntry type="library" name="Maven: cn.hutool:hutool-all:5.5.2" level="project" /> + <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.21" level="project" /> + <orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-streaming-java_2.12:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-file-sink-common:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-runtime_2.12:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-queryable-state-client-java:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-hadoop-fs:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-netty:4.1.49.Final-13.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-jackson:2.12.1-13.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-zookeeper-3:3.4.14-13.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.javassist:javassist:3.24.0-GA" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang:scala-library:2.12.7" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-actor_2.12:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe:config:1.3.3" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang.modules:scala-java8-compat_2.12:0.8.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-stream_2.12:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.reactivestreams:reactive-streams:1.0.2" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe:ssl-config-core_2.12:0.3.7" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.12:1.1.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-protobuf_2.12:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-slf4j_2.12:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.clapper:grizzled-slf4j_2.12:1.3.2" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.github.scopt:scopt_2.12:3.5.0" level="project" /> + <orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.8.3" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill_2.12:0.7.6" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill-java:0.7.6" level="project" /> + <orderEntry type="library" name="Maven: org.lz4:lz4-java:1.6.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-guava:18.0-13.0" level="project" /> + <orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.5" level="project" /> + <orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" /> + <orderEntry type="library" name="Maven: org.apache.flink:force-shading:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-java:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-core:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-annotations:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-metrics-core:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-shaded-asm-7:7.1-13.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.kryo:kryo:2.24.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.objenesis:objenesis:2.1" level="project" /> + <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" /> + <orderEntry type="library" name="Maven: org.apache.commons:commons-compress:1.20" level="project" /> + <orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka_2.12:1.13.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:2.4.1" level="project" /> + <orderEntry type="library" name="Maven: com.github.luben:zstd-jni:1.4.3-1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.flink:flink-connector-base:1.13.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.9" level="project" /> + <orderEntry type="library" name="Maven: jline:jline:0.9.94" level="project" /> + <orderEntry type="library" name="Maven: io.netty:netty:3.10.5.Final" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-client:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase.thirdparty:hbase-shaded-protobuf:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-common:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: com.github.stephenc.findbugs:findbugs-annotations:1.3.9-1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-hadoop-compat:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-metrics-api:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-hadoop2-compat:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-metrics:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-protocol-shaded:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase:hbase-protocol:2.2.3" level="project" /> + <orderEntry type="library" name="Maven: commons-codec:commons-codec:1.10" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.5.0" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hbase.thirdparty:hbase-shaded-netty:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.htrace:htrace-core4:4.2.0-incubating" level="project" /> + <orderEntry type="library" name="Maven: org.jruby.jcodings:jcodings:1.0.18" level="project" /> + <orderEntry type="library" name="Maven: org.jruby.joni:joni:2.1.11" level="project" /> + <orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.6" level="project" /> + <orderEntry type="library" name="Maven: org.apache.commons:commons-crypto:1.0.0" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-auth:2.8.5" level="project" /> + <orderEntry type="library" name="Maven: com.nimbusds:nimbus-jose-jwt:4.41.1" level="project" /> + <orderEntry type="library" name="Maven: com.github.stephenc.jcip:jcip-annotations:1.0-1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.directory.server:apacheds-kerberos-codec:2.0.0-M15" level="project" /> + <orderEntry type="library" name="Maven: org.apache.directory.server:apacheds-i18n:2.0.0-M15" level="project" /> + <orderEntry type="library" name="Maven: org.apache.directory.api:api-asn1-api:1.0.0-M20" level="project" /> + <orderEntry type="library" name="Maven: org.apache.directory.api:api-util:1.0.0-M20" level="project" /> + <orderEntry type="library" name="Maven: org.apache.curator:curator-framework:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-common:2.8.5" level="project" /> + <orderEntry type="library" name="Maven: xmlenc:xmlenc:0.52" level="project" /> + <orderEntry type="library" name="Maven: commons-net:commons-net:3.1" level="project" /> + <orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-sslengine:6.1.26" level="project" /> + <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.3" level="project" /> + <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" /> + <orderEntry type="library" name="Maven: commons-configuration:commons-configuration:1.6" level="project" /> + <orderEntry type="library" name="Maven: commons-digester:commons-digester:1.8" level="project" /> + <orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils:1.7.0" level="project" /> + <orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils-core:1.8.0" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" /> + <orderEntry type="library" name="Maven: org.apache.avro:avro:1.7.4" level="project" /> + <orderEntry type="library" name="Maven: com.thoughtworks.paranamer:paranamer:2.3" level="project" /> + <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" /> + <orderEntry type="library" name="Maven: com.jcraft:jsch:0.1.54" level="project" /> + <orderEntry type="library" name="Maven: org.apache.curator:curator-client:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.curator:curator-recipes:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.yetus:audience-annotations:0.5.0" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-client:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-hdfs:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" /> + <orderEntry type="library" name="Maven: io.netty:netty-all:4.0.23.Final" level="project" /> + <orderEntry type="library" name="Maven: xerces:xercesImpl:2.9.1" level="project" /> + <orderEntry type="library" name="Maven: xml-apis:xml-apis:1.3.04" level="project" /> + <orderEntry type="library" name="Maven: org.apache.htrace:htrace-core:3.1.0-incubating" level="project" /> + <orderEntry type="library" name="Maven: org.fusesource.leveldbjni:leveldbjni-all:1.8" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-app:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-common:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-client:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-server-common:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-api:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-core:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-common:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.2" level="project" /> + <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" /> + <orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" /> + <orderEntry type="library" name="Maven: com.sun.jersey:jersey-core:1.9" level="project" /> + <orderEntry type="library" name="Maven: com.sun.jersey:jersey-client:1.9" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-jaxrs:1.9.13" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.9.13" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-jobclient:2.7.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-annotations:2.7.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-json:1.13.1" level="project" /> + <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.70" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-csv:1.13.1" level="project" /> + <orderEntry type="library" name="Maven: com.opencsv:opencsv:3.3" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-clients_2.11:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-runtime_2.11:1.13.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-actor_2.11:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang.modules:scala-java8-compat_2.11:0.7.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-stream_2.11:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe:ssl-config-core_2.11:0.3.7" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.1.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-protobuf_2.11:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.typesafe.akka:akka-slf4j_2.11:2.5.21" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.clapper:grizzled-slf4j_2.11:1.3.2" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.github.scopt:scopt_2.11:3.5.0" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: com.twitter:chill_2.11:0.7.6" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-optimizer_2.11:1.13.1" level="project" /> + <orderEntry type="library" name="Maven: commons-cli:commons-cli:1.3.1" level="project" /> + <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.flink:flink-streaming-java_2.11:1.13.1" level="project" /> + </component> +</module>
\ No newline at end of file @@ -0,0 +1,240 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.galaxy.recommend</groupId> + <artifactId>flink-vpn-recommend</artifactId> + <version>21-01-07</version> + <repositories> + + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + </repositories> + <properties> + <flink.version>1.13.1</flink.version> + <hadoop.version>2.7.1</hadoop.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.6</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.21</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.21</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>1.13.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.9</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>2.2.3</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>2.7.1</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + + </exclusions> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> + <!-- <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table</artifactId> + <version>${flink.version}</version> + <type>pom</type> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-kafka_2.11</artifactId> + <version>${flink.version}</version> + </dependency>--> + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.70</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-csv</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.opencsv</groupId> + <artifactId>opencsv</artifactId> + <version>3.3</version> + </dependency> + + + <!-- Flink modules --> + <!-- <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency>--> + + + <!-- <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> +--> + <!-- <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency>--> + + <!-- CLI dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <!-- The semantics of this option are reversed, see MCOMPILER-209. --> + <useIncrementalCompilation>false</useIncrementalCompilation> + <compilerArgs> + <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 --> + <arg>-Xpkginfo:always</arg> + </compilerArgs> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>flink-vpn-recommend</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + + <configuration> + <finalName>flink-vpn-recommend-21-01-07</finalName> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.galaxy.recommend.Recommendation</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + +</project>
\ No newline at end of file diff --git a/src/main/java/com/galaxy/cn/common/CipEntity.java b/src/main/java/com/galaxy/cn/common/CipEntity.java new file mode 100644 index 0000000..dd6c1ba --- /dev/null +++ b/src/main/java/com/galaxy/cn/common/CipEntity.java @@ -0,0 +1,37 @@ +package com.galaxy.cn.common; + +public class CipEntity { + + public long common_recv_time ; + public String common_client_list ; + public String common_app_label ; + + + public long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_recv_time = common_recv_time; + } + + public String getCommon_client_list() { + return common_client_list; + } + + public void setCommon_client_list(String common_client_list) { + this.common_client_list = common_client_list; + } + + public String getCommon_app_label() { + return common_app_label; + } + + public void setCommon_app_label(String common_app_label) { + this.common_app_label = common_app_label; + } + + public CipEntity() { + } + +} diff --git a/src/main/java/com/galaxy/cn/common/Entity.java b/src/main/java/com/galaxy/cn/common/Entity.java new file mode 100644 index 0000000..e1a8324 --- /dev/null +++ b/src/main/java/com/galaxy/cn/common/Entity.java @@ -0,0 +1,55 @@ +package com.galaxy.cn.common; + +import java.io.Serializable; + +public class Entity implements Serializable { + + public int ifError; + public long common_processing_time ; + public String common_client_ip ; + public String common_app_label ; + public long common_sessions; + + public Entity() { + } + + public int getIfError() { + return ifError; + } + + public void setIfError(int ifError) { + this.ifError = ifError; + } + + public long getCommon_recv_time() { + return common_processing_time; + } + + public String getCommon_app_label() { + return common_app_label; + } + + public void setCommon_app_label(String common_app_label) { + this.common_app_label = common_app_label; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_processing_time = common_recv_time; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public long getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(long common_sessions) { + this.common_sessions = common_sessions; + } +} diff --git a/src/main/java/com/galaxy/cn/common/ObjectEntity.java b/src/main/java/com/galaxy/cn/common/ObjectEntity.java new file mode 100644 index 0000000..f45641b --- /dev/null +++ b/src/main/java/com/galaxy/cn/common/ObjectEntity.java @@ -0,0 +1,55 @@ +package com.galaxy.cn.common; + +public class ObjectEntity implements Comparable<ObjectEntity> { + + public String common_client_ip ; + public String common_app_label ; + public long sessions ; + public long common_recv_time ; + + public long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_recv_time = common_recv_time; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public String getCommon_app_label() { + return common_app_label; + } + + public void setCommon_app_label(String common_app_label) { + this.common_app_label = common_app_label; + } + + public long getSessions() { + return sessions; + } + + public void setSessions(long sessions) { + this.sessions = sessions; + } + + @Override + public int compareTo(ObjectEntity per) { + if(this.sessions<per.sessions){ + return -1 ; + }else if(this.sessions>per.sessions){ + return 1 ; + }else{ + return this.common_client_ip.compareTo(per.common_client_ip) ; // 调用String中的compareTo()方法 + } + } + + + +} diff --git a/src/main/java/com/galaxy/cn/common/ResultEntity.java b/src/main/java/com/galaxy/cn/common/ResultEntity.java new file mode 100644 index 0000000..31d3697 --- /dev/null +++ b/src/main/java/com/galaxy/cn/common/ResultEntity.java @@ -0,0 +1,57 @@ +package com.galaxy.cn.common; + +public class ResultEntity implements Comparable<ResultEntity> { + + public String common_client_ip ; + public long sessions ; +/* + public String common_app_label ; + public long common_recv_time ; +*/ + +/* public String getCommon_app_label() { + return common_app_label; + } + + public void setCommon_app_label(String common_app_label) { + this.common_app_label = common_app_label; + } + + public long getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(long common_recv_time) { + this.common_recv_time = common_recv_time; + }*/ + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public long getSessions() { + return sessions; + } + + public void setSessions(long sessions) { + this.sessions = sessions; + } + + @Override + public int compareTo(ResultEntity per) { + if(this.sessions<per.sessions){ + return -1 ; + }else if(this.sessions>per.sessions){ + return 1 ; + }else{ + return this.common_client_ip.compareTo(per.common_client_ip) ; // 调用String中的compareTo()方法 + } + } + + + +} diff --git a/src/main/java/com/galaxy/cn/config/CommonConfigurations.java b/src/main/java/com/galaxy/cn/config/CommonConfigurations.java new file mode 100644 index 0000000..88ebc6d --- /dev/null +++ b/src/main/java/com/galaxy/cn/config/CommonConfigurations.java @@ -0,0 +1,62 @@ +package com.galaxy.cn.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class CommonConfigurations { + + private static Properties propService = new Properties(); + + public static Map<String,String> getHashTableProperty(String key) { + + Map<String,String> map = new HashMap<>(); + + + String[] keyarray = propService.getProperty(key).split(","); + for(String k :keyarray){ + + if(k!=null && !"".equals(k.trim())){ + map.put(k,""); + } + + } + + return map; + } + + public static String getStringProperty(String key) { + + return propService.getProperty(key); + + + } + + public static Integer getIntProperty( String key) { + + return Integer.parseInt(propService.getProperty(key)); + + } + + public static Long getLongProperty(String key) { + return Long.parseLong(propService.getProperty(key)); + + } + + public static Boolean getBooleanProperty(String key) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } + + static { + try { + propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); + } catch (Exception e) { + propService = null; + } + } +} diff --git a/src/main/java/com/galaxy/cn/config/commonConfig.java b/src/main/java/com/galaxy/cn/config/commonConfig.java new file mode 100644 index 0000000..b242a31 --- /dev/null +++ b/src/main/java/com/galaxy/cn/config/commonConfig.java @@ -0,0 +1,40 @@ +package com.galaxy.cn.config; + +import java.util.Map; + +/** + * Created by wk on 2021/1/6. + */ +public class commonConfig { + + + public static final String SOURCE_KAFKA_BROKER = CommonConfigurations.getStringProperty("source.kafka.broker"); + public static final String SOURCE_KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("source.kafka.group.id"); + public static final String SOURCE_KAFKA_TOPIC = CommonConfigurations.getStringProperty("source.kafka.topic"); + public static final String ZK_HOST = CommonConfigurations.getStringProperty("zk.host"); + + public static final int SOURCE_KAFKA_PARALLELISM = CommonConfigurations.getIntProperty("source.kafka.parallelism"); + public static final int SINK_HBASE_PARALLELISM = CommonConfigurations.getIntProperty("sink.hbase.parallelism"); + public static final String SINK_HBASE_FM = CommonConfigurations.getStringProperty("sink.hbase.fm"); + public static final String SINK_HBASE_TABLE = CommonConfigurations.getStringProperty("sink.hbase.table"); + + public static final int TASK_PARALLELISM = CommonConfigurations.getIntProperty("task.parallelism"); + + public static final int WATERMARK_TIME = CommonConfigurations.getIntProperty("watermark.time"); + public static final int SLIDINGWINDOW_TIME_MINUTE = CommonConfigurations.getIntProperty("slidingwindow.time.minute"); + public static final int SLIDINGWINDOWSLOT_TIME_MINUTE = CommonConfigurations.getIntProperty("slidingwindowslot.time.minute"); + public static final int TOP_LIMIT = CommonConfigurations.getIntProperty("top.limit"); + + public static final String KAFKA_USER = CommonConfigurations.getStringProperty("kafka.user"); + public static final String KAFKA_PIN = CommonConfigurations.getStringProperty("kafka.pin"); + public static final int KAFKA_SECURITY = CommonConfigurations.getIntProperty("kafka.security"); + public static final String TOOLS_LIBRARY = CommonConfigurations.getStringProperty("tools.library"); + + public static final Boolean no_filter = CommonConfigurations.getBooleanProperty("has.filter"); + public static final Map<String,String> app_white_list= CommonConfigurations.getHashTableProperty("app.white.list"); + + public static final String SESSION_TIMEOUT_MS=CommonConfigurations.getStringProperty("session.timeout.ms"); + public static final String MAX_POLL_RECORD=CommonConfigurations.getStringProperty("max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES=CommonConfigurations.getStringProperty("max.partition.fetch.bytes"); + +} diff --git a/src/main/java/com/galaxy/cn/function/TopNHotItems.java b/src/main/java/com/galaxy/cn/function/TopNHotItems.java new file mode 100644 index 0000000..f43a257 --- /dev/null +++ b/src/main/java/com/galaxy/cn/function/TopNHotItems.java @@ -0,0 +1,137 @@ +package com.galaxy.cn.function; + +import com.alibaba.fastjson.JSONObject; +import com.galaxy.cn.common.ObjectEntity; +import com.galaxy.cn.common.ResultEntity; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.*; + +public class TopNHotItems extends KeyedProcessFunction<Tuple1<String>, ObjectEntity, Tuple3<String,String,Long>> { + private final int topSize; + // Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ; + private Map<String,PriorityQueue<ResultEntity>> resultMap ; + private Map<String,Map<String,Long>> countMap ; + + public TopNHotItems(int i) { + this.topSize = i; + + + } + @Override + public void open(Configuration parameters) throws Exception { + this.resultMap= new HashMap<>(); + this.countMap= new HashMap<>(); + + } + + + @Override + public void processElement(ObjectEntity objectEntity, Context context, Collector<Tuple3<String,String,Long>> collector) throws Exception { + //allSet.add(objectEntity); + + + if(countMap.containsKey(objectEntity.getCommon_app_label())) { + + PriorityQueue<ResultEntity> queue =resultMap.get(objectEntity.getCommon_app_label()); + Map<String,Long> map = countMap.get(objectEntity.getCommon_app_label()); + if (map.size() < topSize) { + ResultEntity resultEntity =new ResultEntity(); + resultEntity.setSessions(objectEntity.getSessions()); + resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip()); + // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label()); + // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time()); + queue.add(resultEntity); + map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions()); + } else { + if (queue.peek() != null) { + ResultEntity res=queue.peek(); + if (res.getSessions() <= objectEntity.getSessions()) { + queue.poll(); + map.remove(res.getCommon_client_ip()); + ResultEntity resultEntity =new ResultEntity(); + resultEntity.setSessions(objectEntity.getSessions()); + resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip()); + // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label()); + // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time()); + queue.add(resultEntity); + map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions()); + } + } + } + } + else{ + + PriorityQueue<ResultEntity> que = new PriorityQueue<>(); + Map<String,Long> map = new HashMap<>(); + + ResultEntity resultEntity =new ResultEntity(); + resultEntity.setSessions(objectEntity.getSessions()); + resultEntity.setCommon_client_ip(objectEntity.getCommon_client_ip()); + // resultEntity.setCommon_app_label(objectEntity.getCommon_app_label()); + // resultEntity.setCommon_recv_time(objectEntity.getCommon_recv_time()); + que.add(resultEntity); + map.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions()); + resultMap.put(objectEntity.getCommon_app_label(),que); + countMap.put(objectEntity.getCommon_app_label(),map); + + } + //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据 + context.timerService().registerEventTimeTimer(objectEntity.getCommon_recv_time() + 1); + } + + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String,String,Long>> out) throws Exception { + + if(resultMap.size()>0) { +/* + List<Map.Entry<String, Long>> objectlist = new ArrayList<Map.Entry<String, Long>>(resultMap.entrySet()); + + + Collections.sort(objectlist, new Comparator<Map.Entry<String, Long>>() { + public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { + return Math.toIntExact((o2.getValue() - o1.getValue())); + } + }); +*/ + + for (Map.Entry<String, PriorityQueue<ResultEntity>> entry : resultMap.entrySet()) { + PriorityQueue<ResultEntity> queue= entry.getValue(); + String jsonStr = JSONObject.toJSONString(queue); + Tuple3<String,String,Long> emit= new Tuple3<>(entry.getKey(),jsonStr,ctx.timestamp()); + out.collect(emit); + } + resultMap.clear(); + countMap.clear(); + } + + /* if(queue.size()>0) { + int i = 0; + StringBuilder sb = new StringBuilder(); + CipEntity ce = new CipEntity(); + for (ObjectEntity obj : queue) { + + if (i == 0) { + ce.setCommon_app_label(obj.getCommon_app_label()); + ce.setCommon_recv_time(obj.getCommon_recv_time()); + + } + sb.append(obj.getCommon_client_ip()); + sb.append(","); + i++; + if (i >= topSize) { + break; + } + } + ce.setCommon_client_list(sb.toString()); + out.collect(ce); + queue.clear(); + }*/ + + } +}
\ No newline at end of file diff --git a/src/main/java/com/galaxy/cn/function/TopnHotItem.java b/src/main/java/com/galaxy/cn/function/TopnHotItem.java new file mode 100644 index 0000000..02f1fab --- /dev/null +++ b/src/main/java/com/galaxy/cn/function/TopnHotItem.java @@ -0,0 +1,98 @@ +package com.galaxy.cn.function; + +import com.alibaba.fastjson.JSONObject; +import com.galaxy.cn.common.ObjectEntity; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.*; + +public class TopnHotItem extends KeyedProcessFunction<Tuple1<String>, ObjectEntity, Tuple3<String,String,Long>> { + private final int topSize; + // Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ; + private PriorityQueue<ObjectEntity> queue ; + private Map<String,Long> resultMap ; + + public TopnHotItem(int i) { + this.topSize = i; + + + } + @Override + public void open(Configuration parameters) throws Exception { + this.queue= new PriorityQueue<>(); + this.resultMap= new HashMap<>(); + } + + + @Override + public void processElement(ObjectEntity objectEntity, Context context, Collector<Tuple3<String,String,Long>> collector) throws Exception { + //allSet.add(objectEntity); + + if (resultMap.size() < topSize) { + queue.add(objectEntity); + resultMap.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions()); + } else { + if(queue.peek()!=null){ + if (queue.peek().getSessions()<objectEntity.getSessions()) { + + resultMap.put(objectEntity.getCommon_client_ip(),objectEntity.getSessions()); + queue.poll(); + queue.add(objectEntity); + } + } + } + //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据 + context.timerService().registerEventTimeTimer(objectEntity.getCommon_recv_time() + 1); + + } + + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String,String,Long>> out) throws Exception { + + if(resultMap.size()>0) { + List<Map.Entry<String, Long>> objectlist = new ArrayList<Map.Entry<String, Long>>(resultMap.entrySet()); + + + Collections.sort(objectlist, new Comparator<Map.Entry<String, Long>>() { + public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { + return Math.toIntExact((o2.getValue() - o1.getValue())); + } + }); + + String jsonStr = JSONObject.toJSONString(objectlist); + + Tuple3<String,String,Long> emit= new Tuple3<>(ctx.getCurrentKey().f0,jsonStr,ctx.timestamp()); + out.collect(emit); + queue.clear(); + resultMap.clear(); + } + /* if(queue.size()>0) { + int i = 0; + StringBuilder sb = new StringBuilder(); + CipEntity ce = new CipEntity(); + for (ObjectEntity obj : queue) { + + if (i == 0) { + ce.setCommon_app_label(obj.getCommon_app_label()); + ce.setCommon_recv_time(obj.getCommon_recv_time()); + + } + sb.append(obj.getCommon_client_ip()); + sb.append(","); + i++; + if (i >= topSize) { + break; + } + } + ce.setCommon_client_list(sb.toString()); + out.collect(ce); + queue.clear(); + }*/ + + } +}
\ No newline at end of file diff --git a/src/main/java/com/galaxy/cn/function/metricsCalculate.java b/src/main/java/com/galaxy/cn/function/metricsCalculate.java new file mode 100644 index 0000000..55f8d76 --- /dev/null +++ b/src/main/java/com/galaxy/cn/function/metricsCalculate.java @@ -0,0 +1,30 @@ +package com.galaxy.cn.function; + +import com.galaxy.cn.common.Entity; +import com.galaxy.cn.common.ObjectEntity; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +public class metricsCalculate extends ProcessWindowFunction< + Entity, // 输入类型 + ObjectEntity, // 输出类型 + Tuple2<String,String>, // 键类型 + TimeWindow> { // 窗口类型 + @Override + public void process(Tuple2<String,String> s, + Context context, + Iterable<Entity> elements, Collector<ObjectEntity> out) throws Exception { + + ObjectEntity objEntity= new ObjectEntity(); + objEntity.setCommon_recv_time(context.window().getEnd()/1000); + objEntity.setCommon_app_label(s.f0); + objEntity.setCommon_client_ip(s.f1); + + for (Entity event : elements) { + objEntity.sessions+=event.common_sessions; + } + out.collect(objEntity); + } +}
\ No newline at end of file diff --git a/src/main/java/com/galaxy/cn/sink/HbaseSink.java b/src/main/java/com/galaxy/cn/sink/HbaseSink.java new file mode 100644 index 0000000..4a14b34 --- /dev/null +++ b/src/main/java/com/galaxy/cn/sink/HbaseSink.java @@ -0,0 +1,67 @@ +package com.galaxy.cn.sink; + +import com.galaxy.cn.config.commonConfig; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.Serializable; + +import static cn.hutool.crypto.SecureUtil.md5; + +public class HbaseSink extends RichSinkFunction<Tuple3<String,String,Long>> implements Serializable, SinkFunction<Tuple3<String, String,Long>> { + private Logger log; + + private String hbase_zookeeper_host; + + private Connection connection; + private Admin admin; + + public HbaseSink(String hbase_zookeeper_host) { + this.hbase_zookeeper_host = hbase_zookeeper_host; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + log = Logger.getLogger(HbaseSink.class); + + org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); + + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + } + + public void invoke(Tuple3<String,String,Long> data, Context context) throws Exception { + // 按 project:table 归纳 + + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(commonConfig.SINK_HBASE_TABLE)); + Put put = new Put(Bytes.toBytes(md5(data.f0))); + put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("app_label"), Bytes.toBytes(data.f0)); + put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("client_ip_list"), Bytes.toBytes(data.f1)); + put.addColumn(Bytes.toBytes(commonConfig.SINK_HBASE_FM), Bytes.toBytes("last_update_time"), Bytes.toBytes(data.f2)); + table.put(put); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + + } + + @Override + public void close() throws Exception { + super.close(); + } + +} diff --git a/src/main/java/com/galaxy/recommend/Recommendation.java b/src/main/java/com/galaxy/recommend/Recommendation.java new file mode 100644 index 0000000..da19bf2 --- /dev/null +++ b/src/main/java/com/galaxy/recommend/Recommendation.java @@ -0,0 +1,143 @@ +package com.galaxy.recommend; + +import com.alibaba.fastjson.JSON; +import com.galaxy.cn.common.Entity; +import com.galaxy.cn.common.ObjectEntity; +import com.galaxy.cn.config.commonConfig; +import com.galaxy.cn.function.TopNHotItems; +import com.galaxy.cn.function.metricsCalculate; +import com.galaxy.cn.sink.HbaseSink; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.common.config.SslConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Properties; + +public class Recommendation { + private static final Logger LOG = LoggerFactory.getLogger(Recommendation.class); + + public static void main(String[] args) throws Exception { + + //1.创建执行环境 + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + //指定使用事件时间 + //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + String topic = parameterTool.get("topic", commonConfig.SOURCE_KAFKA_TOPIC); + Properties properties = parameterTool.getProperties(); + properties.setProperty("group.id", commonConfig.SOURCE_KAFKA_GROUP_ID); + properties.setProperty("bootstrap.servers", commonConfig.SOURCE_KAFKA_BROKER); + properties.setProperty("session.timeout.ms", commonConfig.SESSION_TIMEOUT_MS); + properties.setProperty("max.poll.records", commonConfig.MAX_POLL_RECORD); + properties.setProperty("max.partition.fetch.bytes", commonConfig.MAX_PARTITION_FETCH_BYTES); + + switch (commonConfig.KAFKA_SECURITY) { + case 1: + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN); + properties.put("ssl.key.password", commonConfig.KAFKA_PIN); + break; + case 2: + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";"); + break; + default: + } + DataStream<String> source = env.addSource(new FlinkKafkaConsumer<String>( + topic, + new SimpleStringSchema(), properties)).setParallelism(commonConfig.SOURCE_KAFKA_PARALLELISM); + WatermarkStrategy<Entity> strategy = WatermarkStrategy + .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(commonConfig.WATERMARK_TIME)) + .withTimestampAssigner((Entity, timestamp) -> Entity.getCommon_recv_time()*1000); + + + SingleOutputStreamOperator<Entity> input = source.map(new MapFunction<String, Entity>() { + @Override + public Entity map(String message) { + Entity entity =new Entity(); + try { + entity = JSON.parseObject(message, Entity.class); + + + if(!commonConfig.no_filter){ + + if(entity.getCommon_app_label()==null ||"".equals(entity.getCommon_app_label().trim())){ + entity.setIfError(1); + } + } + else{ + if(!commonConfig.app_white_list.containsKey(entity.getCommon_app_label())){ + entity.setIfError(1); + } + } + } + catch (Exception e){ + LOG.error("Entity Parsing ERROR"); + entity.setIfError(1); + } + return entity; + } + }).filter(new FilterFunction<Entity>() { + @Override + public boolean filter(Entity entity) throws Exception { + + return entity.ifError!=1; + } + }); + + + + KeyedStream<Entity, Tuple2<String,String>> keyedStream = input.assignTimestampsAndWatermarks(strategy).keyBy(new MyKeySelector()); + + SingleOutputStreamOperator<ObjectEntity> windowedStream = keyedStream + .window(SlidingEventTimeWindows.of(Time.minutes(commonConfig.SLIDINGWINDOW_TIME_MINUTE), Time.minutes(commonConfig.SLIDINGWINDOWSLOT_TIME_MINUTE))) + .process(new metricsCalculate()); + + SingleOutputStreamOperator<Tuple3<String,String,Long>> windoweddStream = windowedStream.keyBy(new oneKeySelector()) + .process(new TopNHotItems(commonConfig.TOP_LIMIT)); + windoweddStream.addSink(new HbaseSink(commonConfig.ZK_HOST)).setParallelism(commonConfig.SINK_HBASE_PARALLELISM); + env.setParallelism(commonConfig.TASK_PARALLELISM); + env.execute("RECOMMENDATION-APP-CIP"); + + } + public static class MyKeySelector implements KeySelector<Entity,Tuple2<String,String>> { + + @Override + public Tuple2<String,String> getKey(Entity entity) throws Exception { + return new Tuple2<>(entity.getCommon_app_label(),entity.getCommon_client_ip()); + } + } + public static class oneKeySelector implements KeySelector<ObjectEntity,Tuple1<String>> { + + @Override + public Tuple1<String> getKey(ObjectEntity entity) throws Exception { + return new Tuple1<>(entity.getCommon_app_label()); + } + } + + +}
\ No newline at end of file diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties new file mode 100644 index 0000000..d3f24f4 --- /dev/null +++ b/src/main/resources/common.properties @@ -0,0 +1,74 @@ +#kafka�ĵ�ַ��Ϣ +source.kafka.broker=192.168.44.11:9094 +source.kafka.group.id =vpn-1206-1 +source.kafka.topic=SESSION-RECORD-COMPLETED +source.kafka.parallelism=1 +max.poll.records=3000 +session.timeout.ms=60000 +max.partition.fetch.bytes=31457280 +#hbase��zk��ַ +zk.host=192.168.44.12:2181 +#д��hbase���ж� +sink.hbase.parallelism=1 +#д��hbase�д� +sink.hbase.fm=common +#д��hbase���� +sink.hbase.table=tsg_galaxy:recommendation_app_cip +#�����ж� +task.parallelism=1 +#�����ӳٵȴ�ʱ�䵥λ�� +watermark.time=1 +#top������� +top.limit=2 +#����������ʱ�䵥λ���� +slidingwindow.time.minute=30 +#ÿ������ʱ�䵥λ���� +slidingwindowslot.time.minute=1 +#kafka�Ƿ�����ȫ��֤ 0������ 1SSL 2 SASL +kafka.security=2 +#kafka SASL��֤�û��� +kafka.user=admin +#kafka SASL��SSL��֤���� +kafka.pin=galaxy2019 +#1SSL��Ҫ +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +#�Ƿ����ȫ��app ��������false ����������true +has.filter=false +#ֻ����filter���е�common_app_label���ŷָ� baidu.com,qq �ɲ���д +app.white.list= + +#source.kafka.broker=10.111.136.55:9092,10.111.136.56:9092,10.111.136.57:9092,10.111.136.58:9092,10.111.136.59:9092,10.111.136.60:9092,10.111.136.61:9092,10.111.136.62:9092,10.111.136.63:9092,10.111.136.64:9092,10.111.136.65:9092,10.111.136.66:9092,10.111.136.67:9092,10.111.136.68:9092,10.111.136.69:9092,10.111.136.70:9092,10.111.136.71:9092,10.111.136.72:9092,10.111.136.73:9092,10.111.136.74:9092,10.111.136.75:9092,10.111.136.76:9092,10.111.136.77:9092,10.111.136.78:9092,10.111.136.79:9092,10.111.136.80:9092,10.111.136.81:9092,10.111.136.82:9092,10.111.136.83:9092,10.111.136.84:9092,10.111.136.85:9092,10.111.136.86:9092,10.111.136.87:9092,10.111.136.88:9092,10.111.136.89:9092,10.111.136.90:9092,10.111.136.91:9092,10.111.136.92:9092,10.111.136.93:9092,10.111.136.94:9092,10.111.136.95:9092,10.111.136.96:9092,10.111.136.97:9092,10.111.136.98:9092,10.111.136.99:9092,10.111.136.100:9092,10.111.136.101:9092,10.111.136.102:9092,10.111.136.103:9092,10.111.136.104:9092 +##source.kafka.broker=10.111.200.135:9092,10.111.200.136:9092,10.111.200.137:9092,10.111.200.138:9092,10.111.200.139:9092,10.111.200.140:9092,10.111.200.141:9092,10.111.200.142:9092,10.111.200.143:9092,10.111.200.144:9092,10.111.200.145:9092,10.111.200.146:9092,10.111.200.147:9092,10.111.200.148:9092,10.111.200.149:9092,10.111.200.150:9092,10.111.200.151:9092,10.111.200.152:9092,10.111.200.153:9092,10.111.200.154:9092,10.111.200.155:9092,10.111.200.156:9092,10.111.200.158:9092,10.111.200.159:9092,10.111.200.160:9092,10.111.200.161:9092,10.111.200.162:9092,10.111.200.163:9092,10.111.200.164:9092 +#source.kafka.group.id=vpn-1120-1 +#source.kafka.topic=test +##source.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG +#source.kafka.parallelism=60 +#sink.hbase.parallelism=60 +#sink.hbase.fm=common +#sink.hbase.table=tsg_galaxy:recommendation_app_cip +# +#task.parallelism=60 +#watermark.time=30 +#top.limit=10000 +#zk.host=10.111.200.165,10.111.200.166,10.111.200.167,10.111.200.168,10.111.200.169 +#zk.port=2181 +#slidingwindow.time.minute=30 +#slidingwindowslot.time.minute=1 +# +# +# +# +#kafka.security=0 +##kafka SASL��֤�û��� +#kafka.user=admin +##kafka SASL��SSL��֤���� +#kafka.pin=galaxy2019 +##1SSL��Ҫ +#tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +# +# +##�Ƿ����ȫ��app ��������false ����������true +#has.filter=false +##ֻ����filter���е�common_app_label���ŷָ� baidu.com,qq +#app.white.list=, +# diff --git a/src/test/java/com/galaxy/cn/catalog/CatalogTest.java b/src/test/java/com/galaxy/cn/catalog/CatalogTest.java new file mode 100644 index 0000000..be8c696 --- /dev/null +++ b/src/test/java/com/galaxy/cn/catalog/CatalogTest.java @@ -0,0 +1,7 @@ +package com.galaxy.cn.catalog; + +public class CatalogTest { + public static void main(String[] args) { + + } +} |
