summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2021-12-01 16:19:14 +0800
committerhoujinchuan <[email protected]>2021-12-01 16:19:14 +0800
commit5b998bd30c956bfe12c7749aa16cd1d72289e599 (patch)
tree61f1f8350b18a6b40ff4d4b843203d4e6f2c1ea8
parent45eaa5af4acb81c7d5755c6eee72aa0e2c657151 (diff)
首次提交
-rw-r--r--config/application.yml19
-rw-r--r--config/log4j2-dev.xml56
-rw-r--r--docker/Dockerfile24
-rw-r--r--pom.xml269
-rw-r--r--src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java19
-rw-r--r--src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java125
-rw-r--r--src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java126
-rw-r--r--src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java31
-rw-r--r--src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java59
-rw-r--r--src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java89
-rw-r--r--src/main/java/com/zdjizhi/syncfile/entity/Data.java40
-rw-r--r--src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java43
-rw-r--r--src/main/java/com/zdjizhi/syncfile/entity/Source.java23
-rw-r--r--src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java51
-rw-r--r--src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java95
-rw-r--r--src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java81
-rw-r--r--src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java103
17 files changed, 1253 insertions, 0 deletions
diff --git a/config/application.yml b/config/application.yml
new file mode 100644
index 0000000..a2f9d98
--- /dev/null
+++ b/config/application.yml
@@ -0,0 +1,19 @@
+nacos:
+ config:
+ type: yaml
+ server-addr: 192.168.44.12:8848
+ namespace: dev
+ data-id: p19-file-sync-service
+ auto-refresh: true
+ group: Galaxy
+ username: nacos
+ password: nacos
+ bootstrap:
+ enable: true
+ log:
+ enable: true
+spring:
+ profiles:
+ active: dev
+logging:
+ config: ./config/log4j2-dev.xml \ No newline at end of file
diff --git a/config/log4j2-dev.xml b/config/log4j2-dev.xml
new file mode 100644
index 0000000..0cf368f
--- /dev/null
+++ b/config/log4j2-dev.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <!--日志打印相关参数配置-->
+ <Properties>
+ <!--每5M压缩日志文件-->
+ <property name="LOG_SIZE">200M</property>
+ <!--最多产生10个压缩文件-->
+ <property name="LOG_NUMS">10</property>
+ <!--日志打印等级-->
+ <property name="LOG_LEVEL">info</property>
+ <!--日志文件路径-->
+ <property name="LOG_PATH">logs</property>
+ <!--日志文件名称-->
+ <property name="LOG_FILE_NAME">p19-file-sync-service</property>
+ <!--日志打印格式-->
+ <property name="LOG_PATTERN">[%d{yyyy-MM-dd HH:mm:ss}] [%p] [Thread:%t] %l %x - %m%n</property>
+ </Properties>
+
+ <appenders>
+ <Console name="consoleSystemOutAppender" target="SYSTEM_OUT">
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="${LOG_PATTERN}"/>
+ </Console>
+
+ <RollingFile name="rollingFileAllAppender"
+ fileName="${LOG_PATH}/${LOG_FILE_NAME}.log"
+ filePattern="${LOG_PATH}/history/$${date:yyyy-MM-dd}/${LOG_FILE_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
+ <PatternLayout pattern="${LOG_PATTERN}"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="${LOG_SIZE}"/>
+ <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
+ </Policies>
+ <Filters>
+ <ThresholdFilter level="all" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ <DefaultRolloverStrategy max="${LOG_NUMS}">
+ <Delete basePath="${LOG_PATH}/history" maxDepth="1">
+ <IfFileName glob="*.log.gz">
+ <IfLastModified age="90d">
+ <IfAny>
+ <IfAccumulatedFileSize exceeds="200 GB" />
+ </IfAny>
+ </IfLastModified>
+ </IfFileName>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingFile>
+ </appenders>
+ <loggers>
+ <root level="${LOG_LEVEL}">
+ <appender-ref ref="consoleSystemOutAppender"/>
+ <appender-ref ref="rollingFileAllAppender"/>
+ </root>
+ </loggers>
+</configuration> \ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..003dc2b
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,24 @@
+ARG JDK_IMAGE
+ARG GO_IMAGE
+#编译环境,生成二进制文件
+#FROM 192.168.40.153:9080/common/golang:1.15.6 as builder
+FROM ${GO_IMAGE} as builder
+
+WORKDIR /build
+COPY xjar.go /build/
+RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o xjar .
+
+FROM ${JDK_IMAGE}
+MAINTAINER Galaxy
+VOLUME /tmp
+WORKDIR /home/tsg/galaxy/p19-file-sync-service
+COPY --from=builder /build .
+COPY config config
+ARG JAR_FILE
+COPY ${JAR_FILE} p19-file-sync-service.xjar
+#dockercompose set JAVA_OPTS
+ENV JAVA_OPTS=" -Xmx1g -Xms1g -Xmn128m -Xss256k -XX:MetaspaceSize=128m -XX:MaxPermSize=128m -XX:SurvivorRatio=2 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "
+ENV LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
+#ENV TZ=Asia/Almaty
+#RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
+ENTRYPOINT [ "sh", "-c", "./xjar java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar p19-file-sync-service.xjar" ] \ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7d84db1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,269 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>p19-file-sync-service</artifactId>
+ <version>21.12.01</version>
+ <name>p19-file-sync-service</name>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.2.7.RELEASE</version>
+ <relativePath/>
+ </parent>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public/</url>
+ </repository>
+ </repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public/</url>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <java.version>1.8</java.version>
+ <nacos.config.version>0.2.8</nacos.config.version>
+ <docker.build>192.168.40.153</docker.build>
+ <docker.build.port>2375</docker.build.port>
+ <!--docker-registry地址-->
+ <docker.registry>192.168.40.153</docker.registry>
+ <!--docker-registry的端口-->
+ <docker.registry.port>9080</docker.registry.port>
+ <docker.image.prefix>tsg/galaxy</docker.image.prefix>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.30</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.30</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.69</version>
+ </dependency>
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.5.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.1.3</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.boot</groupId>
+ <artifactId>nacos-config-spring-boot-starter</artifactId>
+ <version>${nacos.config.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ <version>1.5.4</version>
+ </dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-core</artifactId>
+ <version>1.5.4</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.mesalab.xjar-maven-plugin</groupId>
+ <artifactId>mesalab-xjar-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <password>Geedge2020!</password>
+ <excludes>
+ <exclude>
+ static/**
+ </exclude>
+ <exclude>
+ templates/**
+ </exclude>
+ <exclude>
+ resources/**
+ </exclude>
+ <exclude>
+ META-INF/resources/**
+ </exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <serverId>153-docker-repo</serverId>
+ <registryUrl>${docker.registry}:${docker.registry.port}</registryUrl>
+ <!--是否向镜像registry(harbor)中推送镜像,如果为false则需要在mvn命令时添加-DpushImage参数-->
+ <pushImage>true</pushImage>
+ <!--指定镜像名称 仓库/镜像名:标签-->
+ <imageName>${docker.registry}:${docker.registry.port}/${docker.image.prefix}/${project.artifactId}
+ </imageName>
+ <!--覆盖相同标签镜像-->
+ <forceTags>true</forceTags>
+
+ <imageTags>
+ <imageTag>${project.version}</imageTag>
+ </imageTags>
+ <!--指定仓库地址 远程docker构建,供dockerfile使用-->
+ <dockerHost>http://192.168.40.153:2375</dockerHost>
+ <!--Dockerfile文件所在目录-->
+ <dockerDirectory>docker</dockerDirectory>
+ <buildArgs>
+ <JDK_IMAGE>192.168.40.153:9080/common/jdk:1.8.0_73-jre</JDK_IMAGE>
+ <GO_IMAGE>192.168.40.153:9080/common/golang:1.15.6</GO_IMAGE>
+ <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
+ </buildArgs>
+
+
+ <!--将构建jar拷贝到/target/docker 目录下与dockerfile一起-->
+ <resources>
+ <resource>
+ <!-- 指定要复制的目录路径,这里是当前目录 -->
+ <targetPath>/</targetPath>
+ <!-- 指定要复制的根目录,这里是target目录 -->
+ <directory>${project.build.directory}</directory>
+ <!-- 指定需要拷贝的文件,这里指最后生成的jar包 -->
+ <include>${project.build.finalName}.xjar</include>
+ <include>xjar.go</include>
+ </resource>
+
+ <resource>
+ <targetPath>/config</targetPath>
+ <directory>config</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java b/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java
new file mode 100644
index 0000000..0208866
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/P19FileSyncServiceApplication.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.syncfile;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class P19FileSyncServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(P19FileSyncServiceApplication.class, args);
+ }
+ @Bean
+ MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName){
+ return registry -> registry.config().commonTags("application", applicationName);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
new file mode 100644
index 0000000..e9f47c5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
@@ -0,0 +1,125 @@
+package com.zdjizhi.syncfile.config;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+
+@Component
+//@ConfigurationProperties(prefix = "http")
+@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class HttpClientPool {
+
+ private Integer maxTotal;
+
+ private Integer defaultMaxPerRoute;
+
+ private Integer connectTimeout;
+
+ private Integer connectionRequestTimeout;
+
+ private Integer socketTimeout;
+
+ private boolean staleConnectionCheckEnabled;
+
+
+ public void setMaxTotal(Integer maxTotal) {
+ this.maxTotal = maxTotal;
+ }
+
+ public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) {
+ this.defaultMaxPerRoute = defaultMaxPerRoute;
+ }
+
+ public void setConnectTimeout(Integer connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
+ this.connectionRequestTimeout = connectionRequestTimeout;
+ }
+
+ public void setSocketTimeout(Integer socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) {
+ this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
+ }
+
+ /**
+ * 首先实例化一个连接池管理器,设置最大连接数、并发连接数
+ *
+ * @return
+ */
+ @Bean(name = "httpClientConnectionManager")
+ public PoolingHttpClientConnectionManager getHttpClientConnectionManager() {
+ PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
+ //最大连接数
+ httpClientConnectionManager.setMaxTotal(maxTotal);
+ //并发数
+ httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
+ return httpClientConnectionManager;
+ }
+
+ /**
+ * 实例化连接池,设置连接池管理器。
+ * 这里需要以参数形式注入上面实例化的连接池管理器
+ *
+ * @param httpClientConnectionManager
+ * @return
+ */
+ @Bean(name = "httpClientBuilder")
+ public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) {
+ //HttpClientBuilder中的构造方法被protected修饰,所以这里不能直接使用new来实例化一个HttpClientBuilder,可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.setConnectionManager(httpClientConnectionManager);
+ return httpClientBuilder;
+ }
+
+ /**
+ * 注入连接池,用于获取httpClient
+ *
+ * @param httpClientBuilder
+ * @return
+ */
+ @Bean
+ public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder) {
+ return httpClientBuilder.build();
+ }
+
+ /**
+ * Builder是RequestConfig的一个内部类
+ * 通过RequestConfig的custom方法来获取到一个Builder对象
+ * 设置builder的连接信息
+ * 这里还可以设置proxy,cookieSpec等属性。有需要的话可以在此设置
+ *
+ * @return
+ */
+ @Bean(name = "builder")
+ public RequestConfig.Builder getBuilder() {
+ RequestConfig.Builder builder = RequestConfig.custom();
+ return builder.setConnectTimeout(connectTimeout)
+ .setConnectionRequestTimeout(connectionRequestTimeout)
+ .setSocketTimeout(socketTimeout)
+ .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
+
+ }
+
+ /**
+ * 使用builder构建一个RequestConfig对象
+ */
+ @Bean(name = "requestConfig")
+ public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) {
+ return builder.build();
+ }
+
+
+}
+
diff --git a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..ace726a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumerConfig.java
@@ -0,0 +1,126 @@
+package com.zdjizhi.syncfile.config;
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import com.zdjizhi.syncfile.consumer.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+//@ConfigurationProperties(prefix = "kafka.consumer")
+@NacosConfigurationProperties(prefix = "kafka.consumer", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+@EnableKafka
+public class KafkaConsumerConfig {
+
+ private String servers;
+ private boolean enable_auto_commit;
+ private String session_timeout;
+ private String auto_commit_interval;
+ private String group_id;
+ private String auto_offset_reset;
+ private int poll_timeout;
+ private int concurrency;
+ private boolean batch_listener;
+ private int pool_record;
+ private String sasl_username;
+ private String sasl_password;
+
+ public void setServers(String servers) {
+ this.servers = servers;
+ }
+
+ public void setEnable_auto_commit(boolean enable_auto_commit) {
+ this.enable_auto_commit = enable_auto_commit;
+ }
+
+ public void setSession_timeout(String session_timeout) {
+ this.session_timeout = session_timeout;
+ }
+
+ public void setAuto_commit_interval(String auto_commit_interval) {
+ this.auto_commit_interval = auto_commit_interval;
+ }
+
+ public void setGroup_id(String group_id) {
+ this.group_id = group_id;
+ }
+
+ public void setAuto_offset_reset(String auto_offset_reset) {
+ this.auto_offset_reset = auto_offset_reset;
+ }
+
+ public void setPoll_timeout(int poll_timeout) {
+ this.poll_timeout = poll_timeout;
+ }
+
+ public void setConcurrency(int concurrency) {
+ this.concurrency = concurrency;
+ }
+
+ public void setBatch_listener(boolean batch_listener) {
+ this.batch_listener = batch_listener;
+ }
+
+ public void setPool_record(int pool_record) {
+ this.pool_record = pool_record;
+ }
+
+ public void setSasl_username(String sasl_username) {
+ this.sasl_username = sasl_username;
+ }
+
+ public void setSasl_password(String sasl_password) {
+ this.sasl_password = sasl_password;
+ }
+
+ @Bean
+ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.setConcurrency(concurrency);
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+ factory.setBatchListener(batch_listener);//设置为批量消费
+ factory.getContainerProperties().setPollTimeout(poll_timeout);
+ return factory;
+ }
+
+ private ConsumerFactory<String, String> consumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+ }
+
+
+ private Map<String, Object> consumerConfigs() {
+ Map<String, Object> propsMap = new HashMap<>();
+ propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
+ propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
+ propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool_record);//每一批数量
+ propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, session_timeout);
+ propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
+ propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
+ propsMap.put("security.protocol", "SASL_PLAINTEXT");
+ propsMap.put("sasl.mechanism", "PLAIN");
+ propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
+ return propsMap;
+ }
+
+ @Bean
+ public KafkaConsumerListener listener() {
+ return new KafkaConsumerListener();
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java
new file mode 100644
index 0000000..f0ea5ba
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/config/ThreadPoolFactory.java
@@ -0,0 +1,31 @@
+package com.zdjizhi.syncfile.config;
+
+
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.config.annotation.NacosConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Configuration
+//@ConfigurationProperties(prefix = "thread")
+@NacosConfigurationProperties(prefix = "thread", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
+public class ThreadPoolFactory {
+
+ private Integer maxSize;
+
+ public void setMaxSize(Integer maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public Integer getMaxSize() {
+ return maxSize;
+ }
+
+ @Bean(name = "threadPool")
+ public ExecutorService getThreadPool() {
+ return Executors.newFixedThreadPool(maxSize);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
new file mode 100644
index 0000000..ff33ba2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
@@ -0,0 +1,59 @@
+package com.zdjizhi.syncfile.consumer;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
+import com.zdjizhi.syncfile.core.SyncFiles;
+import com.zdjizhi.syncfile.entity.Source;
+import com.zdjizhi.syncfile.entity.SysFileSync;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class KafkaConsumerListener {
+ private Log log = LogFactory.get();
+
+ @Autowired
+ SyncFiles syncFiles;
+ @Autowired
+ MonitorProperties monitorProperties;
+
+ @KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")
+ public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+ try {
+ List<List<Source>> fileList = new ArrayList<>();
+ for (ConsumerRecord<?, ?> record : records) {
+ log.info("消费kafka的数据的value: " + record.value().toString());
+ JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString());
+ SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class);
+ if (sysFileSync != null) {
+ List<Source> sourceList = sysFileSync.getSourceList();
+ if(sourceList.size() < 1){
+ log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString());
+ monitorProperties.addFileSyncError();
+ }else {
+ fileList.add(sourceList);
+ }
+ }else {
+ log.error("parse kafka data error. kafka data: "+record.value().toString());
+ monitorProperties.addFileSyncError();
+ }
+ }
+ syncFiles.syncFiles(fileList);
+ } catch (Exception e) {
+ log.error("consume kafka data error.", e);
+ monitorProperties.addFileSyncError();
+ } finally {
+ ack.acknowledge();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
new file mode 100644
index 0000000..6fc49df
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
@@ -0,0 +1,89 @@
+package com.zdjizhi.syncfile.core;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.syncfile.config.ThreadPoolFactory;
+import com.zdjizhi.syncfile.entity.Source;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import com.zdjizhi.syncfile.utils.HttpUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+@Component
+public class SyncFiles {
+ private Log log = LogFactory.get();
+
+ @Autowired
+ private HttpUtil httpUtil;
+ @Autowired
+ private ExecutorService threadPool;
+ @Autowired
+ private ThreadPoolFactory threadPoolFactory;
+ @Autowired
+ MonitorProperties monitorProperties;
+
+ public void syncFiles(List<List<Source>> fileList) {
+ List<Callable<Boolean>> callableList = new ArrayList<>();
+ try {
+ for (List<Source> sourceList : fileList) {
+ callableList.add(() -> {
+ boolean status = false;
+ InputStream content = null;
+ try {
+ for (Source source : sourceList) {
+ String source_oss_path = source.getSource_oss_path();
+ String destination_oss_path = source.getDestination_oss_path();
+ if (source_oss_path != null && !"".equals(source_oss_path)
+ && destination_oss_path != null && !"".equals(destination_oss_path)) {
+ content = httpUtil.httpGetInputStream(source_oss_path);
+ if (content != null) {
+ boolean isSuccess = httpUtil.httpPostInputStream(destination_oss_path, content);
+ if (!isSuccess) {
+ log.error("Sync file failed, post oss file error. destination_oss_path: {}", destination_oss_path);
+ monitorProperties.addPostFileErrorCount();
+ return false;
+ } else {
+ status = true;
+ }
+ } else {
+ log.error("Sync file failed, get hos file error. source_oss_path: {}", source_oss_path);
+ monitorProperties.addDownloadFileErrorCount();
+ return false;
+ }
+ } else {
+ log.error("Sync file failed, source_oss_path or destination_oss_path is incorrect. source_oss_path: {} ,destination_oss_path: {}", source_oss_path, destination_oss_path);
+ monitorProperties.addFileSyncError();
+ return false;
+ }
+ }
+ } catch (Exception e) {
+ log.error("Sync file failed.", e);
+ monitorProperties.addFileSyncError();
+ status = false;
+ } finally {
+ IoUtil.close(content);
+ }
+ return status;
+ });
+ if (callableList.size() == threadPoolFactory.getMaxSize()) {
+ threadPool.invokeAll(callableList);
+ callableList.clear();
+ }
+ }
+ if (callableList.size() > 0) {
+ threadPool.invokeAll(callableList);
+ callableList.clear();
+ }
+ } catch (InterruptedException e) {
+ log.error("Sync files failed.", e);
+ monitorProperties.addFileSyncError();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/Data.java b/src/main/java/com/zdjizhi/syncfile/entity/Data.java
new file mode 100644
index 0000000..8963823
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/Data.java
@@ -0,0 +1,40 @@
+package com.zdjizhi.syncfile.entity;
+
+public class Data {
+ private int fileSize;
+ private String sign;
+ private String state;
+
+ public int getFileSize() {
+ return fileSize;
+ }
+
+ public void setFileSize(int fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public String getSign() {
+ return sign;
+ }
+
+ public void setSign(String sign) {
+ this.sign = sign;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "fileSize=" + fileSize +
+ ", sign='" + sign + '\'' +
+ ", state='" + state + '\'' +
+ '}';
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java b/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java
new file mode 100644
index 0000000..3e7a80b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/PostFileResponse.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.syncfile.entity;
+
+public class PostFileResponse {
+ private int code;
+ private Data data;
+ private String message;
+
+ public PostFileResponse() {
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public Data getData() {
+ return data;
+ }
+
+ public void setData(Data data) {
+ this.data = data;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "PostFileResponse{" +
+ "code=" + code +
+ ", data=" + data +
+ ", message='" + message + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/Source.java b/src/main/java/com/zdjizhi/syncfile/entity/Source.java
new file mode 100644
index 0000000..366787e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/Source.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.syncfile.entity;
+
+public class Source {
+
+ private String source_oss_path;
+ private String destination_oss_path;
+
+ public String getSource_oss_path() {
+ return source_oss_path;
+ }
+
+ public void setSource_oss_path(String source_oss_path) {
+ this.source_oss_path = source_oss_path;
+ }
+
+ public String getDestination_oss_path() {
+ return destination_oss_path;
+ }
+
+ public void setDestination_oss_path(String destination_oss_path) {
+ this.destination_oss_path = destination_oss_path;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java
new file mode 100644
index 0000000..d979f3a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/entity/SysFileSync.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.syncfile.entity;
+
+import java.util.List;
+
+public class SysFileSync {
+ private List<Source> sourceList;
+ private long common_log_id;
+ private long common_recv_time;
+ private String common_schema_type;
+ private long processing_time;
+
+ public List<Source> getSourceList() {
+ return sourceList;
+ }
+
+ public void setSourceList(List<Source> sourceList) {
+ this.sourceList = sourceList;
+ }
+
+ public long getCommon_log_id() {
+ return common_log_id;
+ }
+
+ public void setCommon_log_id(long common_log_id) {
+ this.common_log_id = common_log_id;
+ }
+
+ public String getCommon_schema_type() {
+ return common_schema_type;
+ }
+
+ public void setCommon_schema_type(String common_schema_type) {
+ this.common_schema_type = common_schema_type;
+ }
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public long getProcessing_time() {
+ return processing_time;
+ }
+
+ public void setProcessing_time(long processing_time) {
+ this.processing_time = processing_time;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java
new file mode 100644
index 0000000..942c0e6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/monitor/LogChartMetricsFilter.java
@@ -0,0 +1,95 @@
+package com.zdjizhi.syncfile.monitor;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.servlet.*;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class LogChartMetricsFilter implements Filter {
+ @Autowired
+ private MonitorProperties monitorProperties;
+
+ private MeterRegistry registry;
+ private Map<String, Long> dashboardMap = new HashMap<>();
+ private Map<String, Long> errorTypeMap = new HashMap<>();
+
+ public LogChartMetricsFilter(MeterRegistry registry) {
+ this.registry = registry;
+ }
+
+ @Override
+ public void init(FilterConfig filterConfig) {
+ }
+
+ @Override
+ public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
+ HttpServletRequest req = (HttpServletRequest) servletRequest;
+ filterChain.doFilter(servletRequest, servletResponse);
+ if (req.getRequestURI().contains("/prometheus")) {
+ registryDashboardInfo();
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ /**
+ * 更新info指标
+ */
+ private void registryDashboardInfo() {
+ Long downloadFileSuccessCount = monitorProperties.getDownloadFileSuccessCount();
+ Long downloadFileErrorCount = monitorProperties.getDownloadFileErrorCount();
+ Long postFileSuccessCount = monitorProperties.getPostFileSuccessCount();
+ Long postFileErrorCount = monitorProperties.getPostFileErrorCount();
+ Long downloadFileSize = monitorProperties.getDownloadFileSize();
+ Long postFileSize = monitorProperties.getPostFileSize();
+
+ Long fileSyncError = monitorProperties.getFileSyncError();
+ Long hosError = monitorProperties.getHosError();
+ Long ossError = monitorProperties.getOssError();
+
+ dashboardMap.put("downloadFileSuccessCount", downloadFileSuccessCount);
+ dashboardMap.put("downloadFileErrorCount",downloadFileErrorCount);
+ dashboardMap.put("postFileSuccessCount", postFileSuccessCount);
+ dashboardMap.put("postFileErrorCount", postFileErrorCount);
+ dashboardMap.put("downloadFileSize", downloadFileSize);
+ dashboardMap.put("postFileSize", postFileSize);
+
+ errorTypeMap.put("fileSyncError",fileSyncError);
+ errorTypeMap.put("hosError",hosError);
+ errorTypeMap.put("ossError",ossError);
+
+ registryMetrics(dashboardMap,errorTypeMap);
+ }
+
+ private void registryMetrics(Map<String, Long> map ,Map<String, Long> errorTypeMap) {
+ if (!ObjectUtils.isEmpty(map)) {
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ //去除容器中值防止影响
+ Gauge.builder("dashInfo", map, x -> x.get(entry.getKey()))
+ .tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
+ .description("file sync service info")
+ .register(registry);
+ }
+ }
+ if (!ObjectUtils.isEmpty(errorTypeMap)) {
+ for (Map.Entry<String, Long> entry : errorTypeMap.entrySet()) {
+ //去除容器中值防止影响
+ Gauge.builder("error_type_total", errorTypeMap, x -> x.get(entry.getKey()))
+ .tags("id", entry.getKey(), "name", entry.getKey(), "severity", entry.getKey())
+ .description("Number of error type")
+ .register(registry);
+ }
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java
new file mode 100644
index 0000000..d81dbd0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/monitor/MonitorProperties.java
@@ -0,0 +1,81 @@
+package com.zdjizhi.syncfile.monitor;
+
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MonitorProperties {
+ private static Long downloadFileSuccessCount = 0L;
+ private static Long downloadFileErrorCount = 0L;
+ private static Long postFileSuccessCount = 0L;
+ private static Long postFileErrorCount = 0L;
+ private static Long downloadFileSize = 0L;
+ private static Long postFileSize = 0L;
+
+ private static Long fileSyncError = 0L;
+ private static Long hosError = 0L;
+ private static Long ossError = 0L;
+
+ public void addDownloadFileSuccessCount() {
+ downloadFileSuccessCount = downloadFileSuccessCount + 1;
+ }
+ public void addDownloadFileErrorCount() {
+ downloadFileErrorCount = downloadFileErrorCount + 1;
+ }
+ public void addPostFileSuccessCount() {
+ postFileSuccessCount = postFileSuccessCount + 1;
+ }
+ public void addPostFileErrorCount() {
+ postFileErrorCount = postFileErrorCount + 1;
+ }
+ public void addDownloadFileSize(long fileSize) {
+ downloadFileSize = downloadFileSize + fileSize;
+ }
+ public void addPostFileSize(long fileSize) {
+ postFileSize = postFileSize + fileSize;
+ }
+ public void addFileSyncError() {
+ fileSyncError = fileSyncError + 1;
+ }
+ public void addHosError() {
+ hosError = hosError + 1;
+ }
+ public void addOssError() {
+ ossError = ossError + 1;
+ }
+
+ public Long getDownloadFileSuccessCount() {
+ return downloadFileSuccessCount;
+ }
+
+ public Long getDownloadFileErrorCount() {
+ return downloadFileErrorCount;
+ }
+
+ public Long getPostFileSuccessCount() {
+ return postFileSuccessCount;
+ }
+
+ public Long getPostFileErrorCount() {
+ return postFileErrorCount;
+ }
+
+ public Long getDownloadFileSize() {
+ return downloadFileSize;
+ }
+
+ public Long getPostFileSize() {
+ return postFileSize;
+ }
+
+ public Long getFileSyncError() {
+ return fileSyncError;
+ }
+
+ public Long getHosError() {
+ return hosError;
+ }
+
+ public Long getOssError() {
+ return ossError;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
new file mode 100644
index 0000000..df8e6ce
--- /dev/null
+++ b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
@@ -0,0 +1,103 @@
+package com.zdjizhi.syncfile.utils;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.syncfile.entity.PostFileResponse;
+import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.InputStream;
+
+@Component
+public class HttpUtil {
+ private static Log log = LogFactory.get();
+
+ @Autowired
+ private CloseableHttpClient httpClient;
+ @Autowired
+ private RequestConfig requestConfig;
+ @Autowired
+ MonitorProperties monitorProperties;
+
+ public InputStream httpGetInputStream(String url) {
+ InputStream result = null;
+ CloseableHttpResponse response = null;
+ try {
+ HttpGet httpGet = new HttpGet(url);
+ httpGet.setConfig(requestConfig);
+ response = httpClient.execute(httpGet);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
+ log.info("get file success. current url: {}", url);
+ monitorProperties.addDownloadFileSuccessCount();
+ monitorProperties.addDownloadFileSize(Integer.parseInt(response.getFirstHeader("Content-Length").getValue()));
+ }else if (response.getStatusLine().getStatusCode() == 500){
+ log.error("get file error. Hos service error, please check hos. current url: {}", url);
+ monitorProperties.addHosError();
+ } else {
+ log.error("get file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addFileSyncError();
+ }
+ } catch (Exception e) {
+ log.error("get file error. current url: {}, error: {}", url, e.toString());
+ monitorProperties.addFileSyncError();
+ } finally {
+ IoUtil.close(response);
+ }
+ return result;
+ }
+
+ public boolean httpPostInputStream(String url, InputStream data) {
+ boolean isSuccess = false;
+ CloseableHttpResponse response = null;
+ try {
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setConfig(requestConfig);
+ httpPost.setEntity(new InputStreamEntity(data));
+ response = httpClient.execute(httpPost);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
+ JSONObject jsonObj = (JSONObject) JSON.parse(responseEntity);
+ if(jsonObj!=null){
+ if (responseEntity.contains("\"code\": 200")) {
+ PostFileResponse postFileResponse = JSON.toJavaObject(jsonObj, PostFileResponse.class);
+ isSuccess = true;
+ log.info("post file success. current url: {}, msg: {}", url, responseEntity);
+ monitorProperties.addPostFileSuccessCount();
+ monitorProperties.addPostFileSize(postFileResponse.getData().getFileSize());
+ } else {
+ log.error("post file error. current url: {}, msg: {}", url,responseEntity);
+ monitorProperties.addFileSyncError();
+ }
+ }else {
+ log.error("post file error, response body error. current url: {}", url);
+ monitorProperties.addOssError();
+ }
+ } else if(response.getStatusLine().getStatusCode() == 500){
+ log.error("post file error. Oss service error.current url: {}, code: 500, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addOssError();
+ }else {
+ log.error("post file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8"));
+ monitorProperties.addFileSyncError();
+ }
+ } catch (Exception e) {
+ log.error("post file error. current url: " + url, e);
+ monitorProperties.addFileSyncError();
+ } finally {
+ IoUtil.close(response);
+ }
+ return isSuccess;
+ }
+}