summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfangshunjian <[email protected]>2021-08-27 11:01:18 +0800
committerfangshunjian <[email protected]>2021-08-27 11:01:18 +0800
commit27dc7bc7b32c6a17dde2635776f369bec1e3dccc (patch)
tree3aae79be8543ef9956f53c0e9c7be514b283c78f
parente9fd64997b300fb55d64fb89902dd55533d00962 (diff)
fix: loki push 代理接口修改
1、loki push 代理接口接收到数据立即返回 204,并存放到有界队列 2、每个推送接口启动一个 线程推送 到 目标loki push 接口 3、新增 httpclient ,开启client 连接池
-rw-r--r--pom.xml109
-rw-r--r--src/main/java/net/geedge/confagent/ConfagentApplication.java27
-rw-r--r--src/main/java/net/geedge/confagent/config/InitAgent.java50
-rw-r--r--src/main/java/net/geedge/confagent/config/RestTemplateConfig.java122
-rw-r--r--src/main/java/net/geedge/confagent/controller/ConfigController.java45
-rw-r--r--src/main/java/net/geedge/confagent/controller/LokiController.java475
-rw-r--r--src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java6
-rw-r--r--src/main/java/net/geedge/confagent/thread/LokiContext.java74
-rw-r--r--src/main/java/net/geedge/confagent/thread/LokiPushThread.java117
-rw-r--r--src/main/java/net/geedge/confagent/thread/RequestThread.java118
-rw-r--r--src/main/java/net/geedge/confagent/util/LimitedQueue.java22
11 files changed, 680 insertions, 485 deletions
diff --git a/pom.xml b/pom.xml
index 9e6509e..1eb7af3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,13 +47,19 @@
<artifactId>fastjson</artifactId>
<version>1.2.45</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
<!-- 添加 XJar 依赖 -->
-<!-- <dependency>-->
-<!-- <groupId>com.github.core-lib</groupId>-->
-<!-- <artifactId>xjar</artifactId>-->
-<!-- <version>4.0.1</version>-->
-<!-- &lt;!&ndash; <scope>test</scope> &ndash;&gt;-->
-<!-- </dependency>-->
+ <!-- <dependency> -->
+ <!-- <groupId>com.github.core-lib</groupId> -->
+ <!-- <artifactId>xjar</artifactId> -->
+ <!-- <version>4.0.1</version> -->
+ <!-- &lt;!&ndash; <scope>test</scope> &ndash;&gt; -->
+ <!-- </dependency> -->
</dependencies>
<build>
@@ -123,78 +129,41 @@
<phase>package</phase>
<configuration>
-<!-- <password>111111</password>-->
- <!-- <includes>-->
- <!-- <include>cn/nis/ntc/**</include>-->
- <!-- </includes>-->
- <!-- <excludes>-->
- <!-- <exclude>cn/nis/ntc/api/config/Swagger2Configuration.class</exclude>-->
- <!-- </excludes>-->
+ <!-- <password>111111</password> -->
+ <!-- <includes> -->
+ <!-- <include>cn/nis/ntc/**</include> -->
+ <!-- </includes> -->
+ <!-- <excludes> -->
+ <!-- <exclude>cn/nis/ntc/api/config/Swagger2Configuration.class</exclude> -->
+ <!-- </excludes> -->
</configuration>
</execution>
</executions>
</plugin>
- <!-- <plugin>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- <version>4.0.0</version>
-
- <executions>
- <execution>
- <id>get-the-git-infos</id>
- <goals>
- 目标:revision
- <goal>revision</goal>
- </goals>
- 默认绑定阶段initialize
- <phase>initialize</phase>
- </execution>
- </executions>
- <configuration>
- 检查的仓库根目录,${project.basedir}:项目根目录,即包含pom.xml文件的目录
- <dotGitDirectory>${project.basedir}/.git</dotGitDirectory>
- false:扫描路径时不打印更多信息,默认值false,可以不配置
- <verbose>true</verbose>
- 定义插件中所有时间格式,默认值:yyyy-MM-dd’T’HH:mm:ssZ
- <dateFormat>yyyy-MM-dd HH:mm:ss</dateFormat>
- git属性文件中各属性前缀,默认值git,可以不配置
- <prefix>git</prefix>
- 生成git属性文件,默认false:不生成
- <generateGitPropertiesFile>true</generateGitPropertiesFile>
- 生成git属性文件路径及文件名,默认${project.build.outputDirectory}/git.properties
- <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
- 生成git属性文件格式,默认值properties
- <format>json</format>
- <commitIdGenerationMode>full</commitIdGenerationMode>
-
- 配置git-describe命令
- <gitDescribe>
- <skip>false</skip>
- <always>false</always>
- <dirty>-dirty</dirty>
- </gitDescribe>
-
- 获取指定属性
- <includeOnlyProperties>
- <includeOnlyProperty>^git.branch$</includeOnlyProperty>
- <includeOnlyProperty>^git.build.version$</includeOnlyProperty>
- <includeOnlyProperty>^git.commit.id.full$</includeOnlyProperty>
- <includeOnlyProperty>^git.commit.time$</includeOnlyProperty>
- <includeOnlyProperty>^git.commit.tags$</includeOnlyProperty>
- <includeOnlyProperty>^git.closest.tag.name$</includeOnlyProperty>
- </includeOnlyProperties>
-
- <evaluateOnCommit>HEAD</evaluateOnCommit>
- <useBranchNameFromBuildEnvironment>true</useBranchNameFromBuildEnvironment>
- <injectIntoSysProperties>false</injectIntoSysProperties>
- </configuration>
- </plugin> -->
+ <!-- <plugin> <groupId>pl.project13.maven</groupId> <artifactId>git-commit-id-plugin</artifactId>
+ <version>4.0.0</version> <executions> <execution> <id>get-the-git-infos</id>
+ <goals> 目标:revision <goal>revision</goal> </goals> 默认绑定阶段initialize <phase>initialize</phase>
+ </execution> </executions> <configuration> 检查的仓库根目录,${project.basedir}:项目根目录,即包含pom.xml文件的目录
+ <dotGitDirectory>${project.basedir}/.git</dotGitDirectory> false:扫描路径时不打印更多信息,默认值false,可以不配置
+ <verbose>true</verbose> 定义插件中所有时间格式,默认值:yyyy-MM-dd’T’HH:mm:ssZ <dateFormat>yyyy-MM-dd
+ HH:mm:ss</dateFormat> git属性文件中各属性前缀,默认值git,可以不配置 <prefix>git</prefix> 生成git属性文件,默认false:不生成
+ <generateGitPropertiesFile>true</generateGitPropertiesFile> 生成git属性文件路径及文件名,默认${project.build.outputDirectory}/git.properties
+ <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
+ 生成git属性文件格式,默认值properties <format>json</format> <commitIdGenerationMode>full</commitIdGenerationMode>
+ 配置git-describe命令 <gitDescribe> <skip>false</skip> <always>false</always>
+ <dirty>-dirty</dirty> </gitDescribe> 获取指定属性 <includeOnlyProperties> <includeOnlyProperty>^git.branch$</includeOnlyProperty>
+ <includeOnlyProperty>^git.build.version$</includeOnlyProperty> <includeOnlyProperty>^git.commit.id.full$</includeOnlyProperty>
+ <includeOnlyProperty>^git.commit.time$</includeOnlyProperty> <includeOnlyProperty>^git.commit.tags$</includeOnlyProperty>
+ <includeOnlyProperty>^git.closest.tag.name$</includeOnlyProperty> </includeOnlyProperties>
+ <evaluateOnCommit>HEAD</evaluateOnCommit> <useBranchNameFromBuildEnvironment>true</useBranchNameFromBuildEnvironment>
+ <injectIntoSysProperties>false</injectIntoSysProperties> </configuration>
+ </plugin> -->
</plugins>
</build>
<repositories>
- <!-- 配置私服地址 -->
+ <!-- 配置私服地址 -->
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
@@ -209,7 +178,7 @@
</releases>
</repository>
- <!--opennum仓库-->
+ <!--opennum仓库 -->
<repository>
<id>opennms</id>
<name>opennms</name>
diff --git a/src/main/java/net/geedge/confagent/ConfagentApplication.java b/src/main/java/net/geedge/confagent/ConfagentApplication.java
index ff8758e..c8869f5 100644
--- a/src/main/java/net/geedge/confagent/ConfagentApplication.java
+++ b/src/main/java/net/geedge/confagent/ConfagentApplication.java
@@ -1,35 +1,22 @@
package net.geedge.confagent;
-import cn.hutool.core.io.FileUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import net.geedge.confagent.util.ConfagentUtil;
-import net.geedge.confagent.util.Tool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
+import cn.hutool.extra.spring.EnableSpringUtil;
+import cn.hutool.log.Log;
+import net.geedge.confagent.util.Tool;
+@EnableSpringUtil
@SpringBootApplication
public class ConfagentApplication {
private final static Log log = Log.get();
-
+
public static void main(String[] args) {
+ log.debug("application start,args: {}",Tool.StrUtil.join(",", args));
SpringApplication.run(ConfagentApplication.class, args);
- try {
- Properties properties = new Properties();
- File configFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), "config/application.yml");
- properties.load(FileUtil.getInputStream(configFile));
- String tokenPath = (String) properties.get("tokenFile");
- File tokenFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), StrUtil.emptyToDefault(tokenPath, "config/token.auth"));
- String token = Tool.FileUtil.readString(tokenFile, Tool.CharsetUtil.UTF_8);
- ConfagentUtil.tokenInMemory = token;
- } catch (IOException e) {
- log.error(e);
- }
+
}
}
diff --git a/src/main/java/net/geedge/confagent/config/InitAgent.java b/src/main/java/net/geedge/confagent/config/InitAgent.java
new file mode 100644
index 0000000..f1ceb37
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/config/InitAgent.java
@@ -0,0 +1,50 @@
+package net.geedge.confagent.config;
+
+import java.io.File;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import cn.hutool.log.Log;
+import net.geedge.confagent.entity.LokiPushUrlsEntity;
+import net.geedge.confagent.thread.LokiContext;
+import net.geedge.confagent.util.ConfagentUtil;
+import net.geedge.confagent.util.Tool;
+
+@Component
+public class InitAgent implements ApplicationListener<ContextRefreshedEvent> {
+ private final static Log log = Log.get();
+
+ @Value("${confagent.tokenFile:config/token.auth}")
+ private String tokenFilePath;
+
+ @Value("${confagent.lokiPushUrlsFile:config/lokiPushUrls.yml}")
+ private String lokiPushUrlsPath;
+
+ @Override
+ public void onApplicationEvent(ContextRefreshedEvent event) {
+ if (event.getApplicationContext().getParent() == null) {
+ // 加载 token
+ try {
+ File tokenFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), tokenFilePath);
+ String token = Tool.FileUtil.readString(tokenFile, Tool.CharsetUtil.UTF_8);
+ ConfagentUtil.tokenInMemory = token;
+ } catch (Exception e) {
+ log.error("load token error", e);
+ }
+ // 加载 loki push url
+ try {
+ File lokiFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), lokiPushUrlsPath);
+ LokiPushUrlsEntity urlsEntity = Tool.YamlUtil.readAsLokiPushUrlsEntity(lokiFile.getAbsolutePath());
+ if (urlsEntity != null && urlsEntity.getLokiPushUrls() != null) {
+ LokiContext.loadUrls(urlsEntity.getLokiPushUrls());
+ }
+ } catch (Exception e) {
+ log.error("load loki push url error", e);
+ }
+ }
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java b/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java
new file mode 100644
index 0000000..899af73
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java
@@ -0,0 +1,122 @@
+package net.geedge.confagent.config;
+
+import org.apache.http.HeaderElement;
+import org.apache.http.HeaderElementIterator;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.protocol.HttpContext;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.ClientHttpRequestFactory;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Http resttemplate 配置
+ **/
+@Configuration
+public class RestTemplateConfig {
+ /**
+ * # 从连接池获取连接的timeout,不宜过大,ms
+ */
+ @Value("${http-pool.connection-request-timeout:200}")
+ private int connectionRequestTimeout;
+ /**
+ * 指客户端和服务器建立连接的超时时间,ms , 最大约21秒,因为内部tcp在进行三次握手建立连接时,默认tcp超时时间是20秒
+ */
+ @Value("${http-pool.connection-timeout:1000}")
+ private int connectionTimeout;
+ /**
+ * 指客户端从服务器读取数据包的间隔超时时间,不是总读取时间,也就是socket timeout,ms
+ */
+ @Value("${http-pool.socket-timeout:500}")
+ private int socketTimeout;
+ /**
+ * #每个路由的最大连接数,如果只调用一个地址,可以将其设置为最大连接数
+ */
+ @Value("${http-pool.max-per-route:10000}")
+ private int maxPerRoute;
+ /**
+ * #连接池的最大连接数,0代表不限;如果取0,需要考虑连接泄露导致系统崩溃的后果
+ */
+ @Value("${http-pool.max-total:1000}")
+ private int maxTotal;
+
+ /**
+ * 长连接保持时间 单位s,不宜过长
+ */
+ @Value("${http-pool.keep-alive-time:60}")
+ private int keepAliveTime;
+
+ @Bean
+ public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
+ return new RestTemplate(factory);
+ }
+
+ @Bean
+ public ClientHttpRequestFactory httpRequestFactory(HttpClient client) {
+ return new HttpComponentsClientHttpRequestFactory(client);
+ }
+
+ @Bean
+ public HttpClient httpClient(ConnectionKeepAliveStrategy keepAliveStrategy) {
+ Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.getSocketFactory())
+ .register("https", SSLConnectionSocketFactory.getSocketFactory()).build();
+ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);
+ // 设置整个连接池最大连接数 根据自己的场景决定
+ connectionManager.setMaxTotal(maxTotal);
+ // 路由是对maxTotal的细分
+ connectionManager.setDefaultMaxPerRoute(maxPerRoute);
+ RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(socketTimeout)// 服务器返回数据(response)的时间,超过该时间抛出readtimeout
+ .setConnectTimeout(connectionTimeout)// 连接上服务器(握手成功)的时间,超出该时间抛出connect timeout
+ .setConnectionRequestTimeout(connectionRequestTimeout)// 从连接池中获取连接的超时时间,超过该时间未拿到可用连接,会抛出org.apache.http.conn.ConnectionPoolTimeoutException:Timeout
+ // waiting for connection from pool
+ .build();
+ return HttpClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig)
+ .setConnectionManager(connectionManager)
+ .setKeepAliveStrategy(keepAliveStrategy)
+ .build();
+ }
+
+ /**
+ * 配置长连接保持策略
+ *
+ * @return
+ */
+ @Bean
+ public ConnectionKeepAliveStrategy connectionKeepAliveStrategy() {
+ return new ConnectionKeepAliveStrategy() {
+ @Override
+ public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
+ // Honor 'keep-alive' header
+ HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+ while (it.hasNext()) {
+ HeaderElement he = it.nextElement();
+ String param = he.getName();
+ String value = he.getValue();
+ if (value != null && "timeout".equalsIgnoreCase(param)) {
+ try {
+ return Long.parseLong(value) * 1000;
+ } catch (NumberFormatException ignore) {
+ }
+ }
+ }
+ return keepAliveTime*1000;
+ }
+ };
+ }
+}
diff --git a/src/main/java/net/geedge/confagent/controller/ConfigController.java b/src/main/java/net/geedge/confagent/controller/ConfigController.java
index 6dde0bc..7b94125 100644
--- a/src/main/java/net/geedge/confagent/controller/ConfigController.java
+++ b/src/main/java/net/geedge/confagent/controller/ConfigController.java
@@ -1,26 +1,32 @@
package net.geedge.confagent.controller;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.alibaba.fastjson.JSONObject;
+
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
-import com.alibaba.fastjson.JSONObject;
import net.geedge.confagent.entity.LokiPushUrlEntity;
import net.geedge.confagent.entity.LokiPushUrlsEntity;
+import net.geedge.confagent.thread.LokiContext;
import net.geedge.confagent.util.ConfagentUtil;
import net.geedge.confagent.util.R;
import net.geedge.confagent.util.RCode;
import net.geedge.confagent.util.Tool;
import net.geedge.confagent.util.Tool.YamlUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.bind.annotation.*;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import java.io.File;
-import java.util.*;
-
@RestController
@RequestMapping("/config")
public class ConfigController extends BaseController{
@@ -35,6 +41,7 @@ public class ConfigController extends BaseController{
@Value("${confagent.loki.config}")
private String lokiConfPath;
+
@Autowired
private ConfagentUtil confagentUtil;
@@ -46,19 +53,20 @@ public class ConfigController extends BaseController{
* @Author han
* @Date 2021/8/3
*/
+ @SuppressWarnings("unchecked")
@PostMapping
public R overwriteConfig(HttpServletRequest request, HttpServletResponse response,@RequestBody LokiPushUrlsEntity lokiPushUrls){
+ List<LokiPushUrlEntity> lokiPushUrlList = Tool.ListUtil.list(false);
//进行验证:target和token均非空
if(lokiPushUrls!=null && lokiPushUrls.getLokiPushUrls()!=null) {
int lokiPort = 13100;
Map<String, Object> lokiMap = Tool.YamlUtil.readAsMap(lokiConfPath);
- Map<String, Object> serverObj = (Map<String, Object>) lokiMap.get("server");
+ Map<String, Object> serverObj = (Map<String, Object>) lokiMap.get("server");
if (serverObj != null) {
lokiPort = (Integer) serverObj.get("http_listen_port");
}
-
- List<LokiPushUrlEntity> lokiPushUrlList = lokiPushUrls.getLokiPushUrls();
+ lokiPushUrlList.addAll(lokiPushUrls.getLokiPushUrls());
for(LokiPushUrlEntity urlEntity:lokiPushUrlList) {
if(!validLokiPushUrl(urlEntity)) {
return R.error(RCode.LOKI_PUSH_URL_ISNULL);
@@ -70,10 +78,17 @@ public class ConfigController extends BaseController{
}
}
}
-
+ /*
+ *更新 urls ,并重启线程
+ */
+ LokiContext.loadUrls(lokiPushUrlList);
+ /*
+ * 写入到配置文件
+ */
File lf = Tool.FileUtil.file(rootPath, lokiFile);
YamlUtil.writeAsObject(lokiPushUrls, lf.getAbsolutePath());
LOKI_PUSH_URL_JSONSTR = JSONObject.toJSONString(lokiPushUrls);
+ log.info("config update success");
return R.ok();
}
diff --git a/src/main/java/net/geedge/confagent/controller/LokiController.java b/src/main/java/net/geedge/confagent/controller/LokiController.java
index b4ea551..31f41af 100644
--- a/src/main/java/net/geedge/confagent/controller/LokiController.java
+++ b/src/main/java/net/geedge/confagent/controller/LokiController.java
@@ -1,291 +1,248 @@
package net.geedge.confagent.controller;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.util.ReflectUtil;
-import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpConnection;
import cn.hutool.log.Log;
-import com.alibaba.fastjson.JSONObject;
import net.geedge.confagent.annotation.UnCheckToken;
-import net.geedge.confagent.entity.LokiPushUrlEntity;
-import net.geedge.confagent.entity.LokiPushUrlsEntity;
-import net.geedge.confagent.thread.RequestThread;
+import net.geedge.confagent.thread.LokiPushThread;
import net.geedge.confagent.util.ConfagentUtil;
import net.geedge.confagent.util.R;
import net.geedge.confagent.util.RCode;
import net.geedge.confagent.util.Tool;
-import net.geedge.confagent.util.Tool.YamlUtil;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.bind.annotation.*;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.nio.charset.Charset;
-import java.util.*;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/loki")
-public class LokiController extends BaseController{
- private final static Log log = Log.get();
+public class LokiController extends BaseController {
+ private final static Log log = Log.get();
- @Autowired
+ @Autowired
private ConfagentUtil confagentUtil;
-
- @Value("${confagent.loki.config}")
- private String lokiConfPath;
-
- @Value("${confagent.loki.query.auth:true}")
- private Boolean queryAuth;
-
- private final String[] QUERY_API_SUFFIX = {"query","query_range","series","labels","values"};
- @Value("${confagent.loki.startCmd:systemctl start loki.service}")
- private String startCmd;
-
- @Value("${confagent.loki.stopCmd:systemctl stop loki.service}")
- private String stopCmd;
-
- @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}")
- private String lokiFile;
-
- private static String rootPath = Tool.WebPathUtil.getRootPath();
-
- /**
- * @Description 写入loki配置文件
- * @Author han
- * @Date 2021/8/4
- */
- @PostMapping("/config")
- public R overwriteConfig( @RequestBody Map<String,Object> configs){
+ @Value("${confagent.loki.config}")
+ private String lokiConfPath;
- Map<String,Object> lokiConf =(Map<String,Object>) configs.get("config");
+ @Value("${confagent.loki.query.auth:true}")
+ private Boolean queryAuth;
- if(!Tool.MapUtil.isEmpty(lokiConf)){
- log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf));
- Tool.YamlUtil.writeAsMap(lokiConf,lokiConfPath);
- if(Tool.StrUtil.isNotBlank(stopCmd)) {
- log.info("stop loki:"+stopCmd);
- try {
- Tool.RuntimeUtil.exec(stopCmd);
- }catch(Exception e) {
- log.error(e);
- return R.error(RCode.STOP_CMD_ERROR);
- }
- log.info("stop loki:"+stopCmd+" end");
- }
- Tool.ThreadUtil.sleep(1000);
- if(Tool.StrUtil.isNotBlank(startCmd)) {
- log.info("start loki:"+startCmd);
- try {
-// String[] b={"sh","-c",startCmd};
-// Tool.RuntimeUtil.exec(b);
- Tool.RuntimeUtil.exec(startCmd);
- }catch(Exception e) {
- log.error(e);
- return R.error(RCode.START_CMD_ERROR);
- }
- log.info("start loki:"+startCmd+" end");
- }
- }else {
- return R.error(RCode.LOKI_CONFIG_ISNULL);
- }
-
- return R.ok();
- }
-
- /**
- * @Description 获取loki相关配置
- * @Author han
- * @Date 2021/8/4
- */
- @GetMapping("/config")
- public R queryConfig(HttpServletRequest request){
+ private final String[] QUERY_API_SUFFIX = { "query", "query_range", "series", "labels", "values" };
- Map<String,Object> lokiConf = Tool.YamlUtil.readAsMap(lokiConfPath);
-
- Map<String,Object> result = new HashMap<>();
- result.put("config",lokiConf);
+ @Value("${confagent.loki.startCmd:systemctl start loki.service}")
+ private String startCmd;
- return R.ok(result);
- }
+ @Value("${confagent.loki.stopCmd:systemctl stop loki.service}")
+ private String stopCmd;
- /**
- * @Description 代理本loki 接口
- * @Author han
- * @Date 2021/8/4
- */
- @RequestMapping("/proxy/**")
- @UnCheckToken
- public void proxy(HttpServletRequest request, HttpServletResponse response){
- String method = request.getMethod();
- String lokiPath = request.getServletPath().replace("/loki/proxy","");
- String token = request.getHeader("Authorization");
+ @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}")
+ private String lokiFile;
- R r = confagentUtil.checkToken(token);
+ private static String rootPath = Tool.WebPathUtil.getRootPath();
- Boolean isQuery=false;
+ private static final String LOKI_PUSH_URL = "/loki/api/v1/push";
- //queryAuth 配置只限制查询是否需要校验
- if(queryAuth){
- isQuery = Tool.StrUtil.isNotBlank(Arrays.stream(QUERY_API_SUFFIX).filter(t -> lokiPath.indexOf(t) != -1).findAny().orElse(null));
- }
+ /**
+ * @Description 写入loki配置文件
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @PostMapping("/config")
+ public R overwriteConfig(@RequestBody Map<String, Object> configs) {
- if(isQuery &&r.getCode() != RCode.SUCCESS.getCode()){
- ConfagentUtil.writeResponse(response,r);
- return;
- }
+ Map<String, Object> lokiConf = (Map<String, Object>) configs.get("config");
- /*int port = 3100;//ConfagentUtil.getConfFilePort(lokiConfPath,LOKI_LISTEN_ADDR,3100);
- Map<String,Object> lokiMap= Tool.YamlUtil.readAsMap(lokiConfPath);
- Map<String,Object> serverObj = (Map<String,Object>)lokiMap.get("server");
- if(serverObj!=null) {
- port = (Integer)serverObj.get("http_listen_port");
- }*/
+ if (!Tool.MapUtil.isEmpty(lokiConf)) {
+ log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf));
+ Tool.YamlUtil.writeAsMap(lokiConf, lokiConfPath);
+ if (Tool.StrUtil.isNotBlank(stopCmd)) {
+ log.info("stop loki:" + stopCmd);
+ try {
+ Tool.RuntimeUtil.exec(stopCmd);
+ } catch (Exception e) {
+ log.error(e);
+ return R.error(RCode.STOP_CMD_ERROR);
+ }
+ log.info("stop loki:" + stopCmd + " end");
+ }
+ Tool.ThreadUtil.sleep(1000);
+ if (Tool.StrUtil.isNotBlank(startCmd)) {
+ log.info("start loki:" + startCmd);
+ try {
+// String[] b={"sh","-c",startCmd};
+// Tool.RuntimeUtil.exec(b);
+ Tool.RuntimeUtil.exec(startCmd);
+ } catch (Exception e) {
+ log.error(e);
+ return R.error(RCode.START_CMD_ERROR);
+ }
+ log.info("start loki:" + startCmd + " end");
+ }
+ } else {
+ return R.error(RCode.LOKI_CONFIG_ISNULL);
+ }
+
+ return R.ok();
+ }
+
+ /**
+ * @Description 获取loki相关配置
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @GetMapping("/config")
+ public R queryConfig(HttpServletRequest request) {
+ Map<String, Object> lokiConf = Tool.YamlUtil.readAsMap(lokiConfPath);
+ Map<String, Object> result = new HashMap<>();
+ result.put("config", lokiConf);
+ return R.ok(result);
+ }
+
+ /**
+ * @throws IOException
+ * @throws IORuntimeException
+ * @Description 代理本loki 接口
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @RequestMapping("/proxy/**")
+ @UnCheckToken
+ public void proxy(HttpServletRequest request, HttpServletResponse response) throws IORuntimeException, IOException {
+ String lokiPath = request.getServletPath().replace("/loki/proxy", "");
+ String token = request.getHeader("Authorization");
+ R r = confagentUtil.checkToken(token);
+ Boolean isQuery = false;
+ // queryAuth 配置只限制查询是否需要校验
+ if (queryAuth) {
+ isQuery = Tool.ArrayUtil.contains(QUERY_API_SUFFIX, lokiPath);
+ }
+
+ if (isQuery && r.getCode() != RCode.SUCCESS.getCode()) {
+ ConfagentUtil.writeResponse(response, r);
+ return;
+ }
+
+ /*
+ * push 接口 特殊处理
+ * 1、解析请求头
+ * 2、解析请求体
+ * 3、重新组装 请求参数
+ * 4、放到缓存
+ * 5、直接详情成功
+ */
+ if (Tool.StrUtil.equals(lokiPath, LOKI_PUSH_URL)) {
+ // 复制请求头
+ HttpHeaders headers = new HttpHeaders();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String hn = headerNames.nextElement();
+ headers.add(hn, request.getHeader(hn));
+ }
+ // 复制 body
+ byte[] body = Tool.IoUtil.readBytes(request.getInputStream());
+ // 重新构造请求参数内容
+ HttpEntity<byte[]> httpEntity = new HttpEntity<byte[]>(body, headers);
+ // 放到缓存队列中,等待其它线程消费发送
+ LokiPushThread.addLogCache(httpEntity);
+ //响应 成功
+ response.setStatus(204);
+ return;
+ }
+
+ // 其他api 代理查询本机 LOKI api
+ String queryString = ReflectUtil.invoke(request, "getQueryString");
+ String targetUrl = String.format("%s:%s%s?%s", "127.0.0.1", 13100, lokiPath,Tool.StrUtil.emptyToDefault(queryString, ""));
+ requestLoki(targetUrl, token, request, response);
+
+ }
+
+ public void requestLoki(String targetUrl, String token, HttpServletRequest request, HttpServletResponse response) {
+ String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString();
+
+ log.info("loki url: {}", url);
+ String method = request.getMethod();
+ HttpURLConnection conn = null;
+ ServletInputStream reqInputStream = null;
+ ServletOutputStream resOutputStream = null;
+ OutputStream connOutputStream = null;
+ InputStream connInputStream = null;
+ try {
+ conn = HttpConnection.create(url, null).getHttpURLConnection();
+ reqInputStream = request.getInputStream();
+ resOutputStream = response.getOutputStream();
+ conn.setRequestMethod(method);
+ // 复制请求头
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String hn = headerNames.nextElement();
+ if (!"authorization".equalsIgnoreCase(hn)) {
+ ReflectUtil.invoke(conn, "addRequestProperty", hn, request.getHeader(hn));
+ }
+ }
+ ReflectUtil.invoke(conn, "addRequestProperty", "authorization", token);
+
+ if (!"GET".equalsIgnoreCase(method)) {
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ conn.setUseCaches(false);
+ connOutputStream = conn.getOutputStream();
+ Tool.IoUtil.copy(reqInputStream, connOutputStream);
+ }
+ conn.connect();
+ int responseCode = conn.getResponseCode();
+ connInputStream = (responseCode < 400) ? conn.getInputStream() : conn.getErrorStream();
+ String responseMessage = conn.getResponseMessage();
+ Map<String, List<String>> responseHeaders = conn.getHeaderFields();
+ // 复制响应头
+ for (Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
+ String key = en.getKey();
+ if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key))
+ continue;
+ List<String> value = en.getValue();
+ ReflectUtil.invoke(response, "addHeader", key, Tool.StrUtil.join("; ", value));
+ }
+ // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
+ response.setStatus(responseCode, responseMessage);
- LokiPushUrlsEntity lokiPushUrlsEntity;
- if (StrUtil.isEmpty(LOKI_PUSH_URL_JSONSTR)) {
- File lf = Tool.FileUtil.file(rootPath, lokiFile);
- lokiPushUrlsEntity = YamlUtil.readAsLokiPushUrlsEntity(lf.getAbsolutePath());
- } else {
- lokiPushUrlsEntity = JSONObject.parseObject(LOKI_PUSH_URL_JSONSTR, LokiPushUrlsEntity.class);
- }
- if (lokiPushUrlsEntity != null && lokiPushUrlsEntity.getLokiPushUrls() != null
- && lokiPushUrlsEntity.getLokiPushUrls().size() > 0) {
- List<LokiPushUrlEntity> list = lokiPushUrlsEntity.getLokiPushUrls();
- //loki 代理接口 为/loki/api/v1/push 接口 时,根据lokiPushUrls 配置 转发所有的target,否则随机选择一个target转发即可
- if("/loki/api/v1/push".equals(lokiPath)) {
- List<String> rltList = new ArrayList<String>();
-
- for(LokiPushUrlEntity urlEntity:list) {
- String targetUrl = urlEntity.getTarget();
- if(targetUrl.startsWith("http://")) {
- targetUrl = targetUrl.substring(7);
- }
- RequestThread requestThread = new RequestThread();
- requestThread.setTargetUrl(targetUrl);
- // requestThread.setHost(ip);
- requestThread.setToken(urlEntity.getToken());
- // requestThread.setPort(port);
- requestThread.setMethod(method);
- // requestThread.setPath(lokiPath);
- requestThread.setRequest(request);
- requestThread.setResponse(response);
- Future<String> f = Tool.ThreadUtil.execAsync(requestThread);//.execute(requestThread);
- try {
- String rlt = f.get(5, TimeUnit.SECONDS);
- rltList.add(rlt);
- } catch (Exception e) {
- log.error(e);
- }
- }
- if(rltList.size()>0) {
- OutputStream outputStream = null;
- try {
- log.info("loki 返回内容:" + JSONObject.toJSONString(rltList));
- outputStream = response.getOutputStream();
- Tool.IoUtil.writeUtf8(outputStream,true, rltList.get(0));
- outputStream.flush();
-
- } catch (Exception e) {
- log.error(e);
- } finally {
- Tool.IoUtil.close(outputStream);
- }
- }
- } else {
- // 其他api 代理查询本机 LOKI api
- String queryString = ReflectUtil.invoke(request, "getQueryString");
- String targetUrl = String.format("%s:%s%s?%s", "127.0.0.1", 13100, lokiPath, StrUtil.emptyToDefault(queryString, ""));
- requestLoki(targetUrl, token, request, response);
- }
- }else {
- OutputStream outputStream = null;
+ Tool.IoUtil.copy(connInputStream, resOutputStream);
+ resOutputStream.flush();// flush 输出流
+ } catch (Exception e) {
try {
- outputStream = response.getOutputStream();
- Tool.IoUtil.writeUtf8(outputStream,true, "The lokiPushUrls file does not exist");
- outputStream.flush();
-
- } catch (Exception e) {
- log.error(e);
- } finally {
- Tool.IoUtil.close(outputStream);
+ response.sendError(500, "request error");
+ } catch (IOException e1) {
+ log.error("proxy request error", e1);
}
- }
-
- }
-
- public void requestLoki(String targetUrl, String token, HttpServletRequest request, HttpServletResponse response) {
- String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString();
-
- log.info("loki url: {}", url);
- String method = request.getMethod();
- HttpURLConnection conn = null;
- ServletInputStream reqInputStream = null;
- ServletOutputStream resOutputStream = null;
- OutputStream connOutputStream = null;
- InputStream connInputStream = null;
- try {
- conn = HttpConnection.create(url, null).getHttpURLConnection();
- reqInputStream = request.getInputStream();
- resOutputStream = response.getOutputStream();
- conn.setRequestMethod(method);
- // 复制请求头
- Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- String hn = headerNames.nextElement();
- if(!"authorization".equalsIgnoreCase(hn)) {
- ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
- }
- }
- ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
-
- if (!"GET".equalsIgnoreCase(method)) {
- conn.setDoOutput(true);
- conn.setDoInput(true);
- conn.setUseCaches(false);
- connOutputStream = conn.getOutputStream();
- Tool.IoUtil.copy(reqInputStream, connOutputStream);
- }
- conn.connect();
- int responseCode = conn.getResponseCode();
- connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
- String responseMessage = conn.getResponseMessage();
- Map<String, List<String>> responseHeaders = conn.getHeaderFields();
- //复制响应头
- for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
- String key = en.getKey();
- if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
- List<String> value = en.getValue();
- ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
- }
- // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
- response.setStatus(responseCode, responseMessage);
-
- Tool.IoUtil.copy(connInputStream, resOutputStream);
- resOutputStream.flush();//flush 输出流
- } catch (Exception e) {
- try {
- response.sendError(500, "request error");
- } catch (IOException e1) {
- log.error("proxy request error",e1);
- }
- log.error("request error : ",e);
- }finally {
- Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
- if(conn != null){
- conn.disconnect();
- }
- }
- }
+ log.error("request error : ", e);
+ } finally {
+ Tool.IoUtil.close(reqInputStream, resOutputStream, connOutputStream, connInputStream);
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
}
diff --git a/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java b/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java
index 83c6be7..9bd92d1 100644
--- a/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java
+++ b/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java
@@ -4,7 +4,7 @@ import lombok.Data;
@Data
public class LokiPushUrlEntity {
- private String mode;
- private String token;
- private String target;
+ private String mode;
+ private String token;
+ private String target;
}
diff --git a/src/main/java/net/geedge/confagent/thread/LokiContext.java b/src/main/java/net/geedge/confagent/thread/LokiContext.java
new file mode 100644
index 0000000..764a578
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/thread/LokiContext.java
@@ -0,0 +1,74 @@
+package net.geedge.confagent.thread;
+
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import cn.hutool.log.Log;
+import net.geedge.confagent.entity.LokiPushUrlEntity;
+import net.geedge.confagent.util.Tool;
+
+
+@Configuration
+public class LokiContext {
+ private static Log log = Log.get();
+
+ private static int lokiPushCacheSize;
+
+ /**
+ * loki push thread 注册表
+ */
+ public final static Hashtable<LokiPushUrlEntity,LokiPushThread> LOKI_PUSH_TABLE = new Hashtable<LokiPushUrlEntity,LokiPushThread>();
+ /**
+ * loki push urls
+ */
+ public static List<LokiPushUrlEntity> LOKI_PUSH_URLS = new CopyOnWriteArrayList<LokiPushUrlEntity>();
+
+ /**
+ * 更新推送路径
+ * @param lokiPushUrlList
+ */
+ public static void loadUrls(List<LokiPushUrlEntity> lokiPushUrlList) {
+ /*
+ * 更新 push url
+ */
+ LokiContext.LOKI_PUSH_URLS.clear();
+ LokiContext.LOKI_PUSH_URLS.addAll(lokiPushUrlList);
+ /*
+ * 关闭不再推送线程,开启新增url
+ */
+ Hashtable<LokiPushUrlEntity,LokiPushThread> lokiPushTable = LokiContext.LOKI_PUSH_TABLE;
+ Set<Entry<LokiPushUrlEntity,LokiPushThread>> entrySet = lokiPushTable.entrySet();
+ //关闭最新url中不包含的线程
+ for(Entry<LokiPushUrlEntity,LokiPushThread> en : entrySet) {
+ LokiPushUrlEntity urlEntity = en.getKey();
+ LokiPushThread thread = en.getValue();
+ if(!lokiPushUrlList.contains(urlEntity)) {
+ thread.stop();
+ lokiPushTable.remove(urlEntity);
+ log.info("删除loki push url: {}", urlEntity.getTarget());
+ }
+ }
+ //添加新增的url并启动线程
+ for(LokiPushUrlEntity urlEntity: lokiPushUrlList) {
+ if(!lokiPushTable.containsKey(urlEntity)) {
+ LokiPushThread thread = new LokiPushThread(lokiPushCacheSize,urlEntity);
+ //启动线程
+ Tool.ThreadUtil.execute(thread);
+ lokiPushTable.put(urlEntity, thread);
+ log.info("新增loki push url: {}", urlEntity.getTarget());
+ }
+ }
+ }
+
+ @Value("${confagent.loki.push.cache.size:10000}")
+ public void setLokiPushCacheSize(int size) {
+ LokiContext.lokiPushCacheSize = size;
+ }
+
+}
diff --git a/src/main/java/net/geedge/confagent/thread/LokiPushThread.java b/src/main/java/net/geedge/confagent/thread/LokiPushThread.java
new file mode 100644
index 0000000..cdac7a6
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/thread/LokiPushThread.java
@@ -0,0 +1,117 @@
+package net.geedge.confagent.thread;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import cn.hutool.core.date.StopWatch;
+import cn.hutool.log.Log;
+import net.geedge.confagent.entity.LokiPushUrlEntity;
+import net.geedge.confagent.util.LimitedQueue;
+import net.geedge.confagent.util.Tool;
+
+/**
+ * loki 日志数据推送线程
+ * @author ThinkPad
+ *
+ */
+public class LokiPushThread implements Runnable{
+ private static final Log log = Log.get();
+
+ private final static int LOG_CACHE_DEFAULT_SIZE = 10000;
+ private final LokiPushUrlEntity url;
+ private static final String LOKI_PUSH_URL = "/loki/api/v1/push";
+ /**
+ * 缓存list
+ */
+ private final LimitedQueue<HttpEntity<byte[]>> LOG_CACHE;
+ /**
+ * 运行标志
+ */
+ private boolean runFlag = true;
+
+ private RestTemplate restTemplate = Tool.SpringUtil.getBean(RestTemplate.class);
+
+ public LokiPushThread(LokiPushUrlEntity url) {
+ this(LOG_CACHE_DEFAULT_SIZE,url);
+ }
+
+ public LokiPushThread(int cacheSize,LokiPushUrlEntity url) {
+ this.url = url;
+ LOG_CACHE = new LimitedQueue<HttpEntity<byte[]>>(cacheSize);
+ }
+
+ public void add(HttpEntity<byte[]> entity) {
+ this.LOG_CACHE.add(entity);
+ }
+
+ @Override
+ public void run() {
+ //设置线程名称
+ Thread.currentThread().setName("LokiPushThread-" + Tool.IdUtil.fastShortUUID());
+ log.info("thread start: {}", url.getTarget());
+ String targetPushUrl = LokiPushThread.genPushUrl(url.getTarget());
+ while(runFlag) {
+ try {
+ StopWatch sw = StopWatch.create("test");
+ sw.start();
+ // 1、从队列获取内容
+ HttpEntity<byte[]> entity = LOG_CACHE.poll(500, TimeUnit.MILLISECONDS);
+ if(entity == null) {
+ continue;
+ }
+ //2、设置header token
+ if(Tool.StrUtil.isNotBlank(url.getToken())) {
+ HttpHeaders headers = new HttpHeaders();
+ headers.addAll(entity.getHeaders());
+ headers.set("Authorization", url.getToken());
+ entity = new HttpEntity<byte[]>(entity.getBody(), headers);
+ }
+ //3、发送
+ ResponseEntity<String> responseEntity = restTemplate.exchange(targetPushUrl, HttpMethod.valueOf("POST"), entity, String.class);
+ if(!responseEntity.getStatusCode().is2xxSuccessful()) {
+ //发送失败记录日志
+ log.error("loki push error,target:{},response:{}", url.getTarget(),responseEntity.getBody());
+ }
+ sw.stop();
+ if(log.isDebugEnabled()) {
+ log.debug("sw statistics: {}", Tool.DateUtil.formatBetween(sw.getTotalTimeMillis()));
+ }
+ } catch (Exception e) {
+ log.error("loki push error",e);
+ }
+ }
+ LOG_CACHE.clear();
+ log.info("thread close: {}",url.getTarget());
+ }
+
+ /**
+ * 关闭线程
+ */
+ public void stop() {
+ this.runFlag = false;
+ }
+
+ /**
+ * 将 loki 接收到 log 日志内容 放到 各个缓存中
+ * @param entity
+ */
+ public static void addLogCache(HttpEntity<byte[]> entity) {
+ Set<Entry<LokiPushUrlEntity,LokiPushThread>> entrySet = LokiContext.LOKI_PUSH_TABLE.entrySet();
+ for(Entry<LokiPushUrlEntity,LokiPushThread> en : entrySet) {
+ en.getValue().add(entity);
+ }
+ }
+
+ private static String genPushUrl(String url) {
+ url = url.endsWith("/") ? url.substring(0, url.length()-1) : url;
+ return Tool.StrUtil.concat(true,url, LOKI_PUSH_URL);
+ }
+}
diff --git a/src/main/java/net/geedge/confagent/thread/RequestThread.java b/src/main/java/net/geedge/confagent/thread/RequestThread.java
deleted file mode 100644
index 7dc04e4..0000000
--- a/src/main/java/net/geedge/confagent/thread/RequestThread.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package net.geedge.confagent.thread;
-
-import cn.hutool.core.net.url.UrlBuilder;
-import cn.hutool.core.util.ReflectUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.http.HttpConnection;
-import cn.hutool.log.Log;
-import lombok.Data;
-import net.geedge.confagent.util.Tool;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.nio.charset.Charset;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-@Data
-public class RequestThread implements Callable<String>{
-
- private final static Log log = Log.get();
- private String targetUrl;
- private String token;
- private String method;
- //private String host;
- //private int port;
- //private String path;
- public HttpServletRequest request;
- private HttpServletResponse response;
-
- @Override
- public String call() throws Exception {
-
- String queryString = ReflectUtil.invoke(request, "getQueryString");
- queryString = StrUtil.isNotBlank(queryString)?queryString:"";
- // String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString;
- String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString();
- log.info("loki url: {}", url);
-// String method = request.getMethod();
- HttpURLConnection conn = null;
- ServletInputStream reqInputStream = null;
-// ServletOutputStream resOutputStream = null;
- OutputStream connOutputStream = null;
- InputStream connInputStream = null;
- try {
- conn = HttpConnection.create(url, null).getHttpURLConnection();
- reqInputStream = request.getInputStream();
-
-// resOutputStream = response.getOutputStream();
- conn.setRequestMethod(method);
-// conn.addRequestProperty("authorization", token);
- // 复制请求头
- Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- String hn = headerNames.nextElement();
- if(!"authorization".equalsIgnoreCase(hn)) {
- ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
- }
- log.info(hn+"===="+request.getHeader(hn));
- }
- ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
-
-
- if (!"GET".equalsIgnoreCase(method)) {
- conn.setDoOutput(true);
- conn.setDoInput(true);
- conn.setUseCaches(false);
- connOutputStream = conn.getOutputStream();
- Tool.IoUtil.copy(reqInputStream, connOutputStream);
- }
- conn.connect();
- int responseCode = conn.getResponseCode();
- connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
- String responseMessage = conn.getResponseMessage();
- Map<String, List<String>> responseHeaders = conn.getHeaderFields();
-
- if(!response.isCommitted()) {
- log.info("复制响应头");
- //复制响应头
- for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
- String key = en.getKey();
- if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
- List<String> value = en.getValue();
- ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
- }
- // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
- response.setStatus(responseCode, responseMessage);
- }
-
- String str = Tool.IoUtil.read(connInputStream, "UTF-8");
- log.info(str);
-
-// Tool.IoUtil.copy(connInputStream, resOutputStream);
-// resOutputStream.flush();//flush 输出流
- return str;
- } catch (Exception e) {
- log.error("request error : ", e);
- try {
-// response.sendError(500, "request error");
- return "request error";
- } catch (Exception e1) {
- log.error("proxy request error",e1);
- }
- }finally {
-// Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
- Tool.IoUtil.close(reqInputStream,connOutputStream,connInputStream);
- if(conn != null){
- conn.disconnect();
- }
- }
- return "";
- }
-}
diff --git a/src/main/java/net/geedge/confagent/util/LimitedQueue.java b/src/main/java/net/geedge/confagent/util/LimitedQueue.java
new file mode 100644
index 0000000..0bd2c13
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/util/LimitedQueue.java
@@ -0,0 +1,22 @@
+package net.geedge.confagent.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class LimitedQueue<E> extends LinkedBlockingQueue<E> {
+ private static final long serialVersionUID = 1878117398478874074L;
+
+ private int limit;
+
+ public LimitedQueue(int limit) {
+ this.limit = limit;
+ }
+
+ @Override
+ public boolean add(E o) {
+ boolean added = super.add(o);
+ while (added && size() > limit) {
+ super.remove();
+ }
+ return added;
+ }
+}