diff options
| author | fangshunjian <[email protected]> | 2021-08-27 11:01:18 +0800 |
|---|---|---|
| committer | fangshunjian <[email protected]> | 2021-08-27 11:01:18 +0800 |
| commit | 27dc7bc7b32c6a17dde2635776f369bec1e3dccc (patch) | |
| tree | 3aae79be8543ef9956f53c0e9c7be514b283c78f /src | |
| parent | e9fd64997b300fb55d64fb89902dd55533d00962 (diff) | |
fix: loki push 代理接口修改
1、loki push 代理接口接收到数据立即返回 204,并存放到有界队列
2、每个推送接口启动一个 线程推送 到 目标loki push 接口
3、新增 httpclient ,开启client 连接池
Diffstat (limited to 'src')
10 files changed, 641 insertions, 415 deletions
diff --git a/src/main/java/net/geedge/confagent/ConfagentApplication.java b/src/main/java/net/geedge/confagent/ConfagentApplication.java index ff8758e..c8869f5 100644 --- a/src/main/java/net/geedge/confagent/ConfagentApplication.java +++ b/src/main/java/net/geedge/confagent/ConfagentApplication.java @@ -1,35 +1,22 @@ package net.geedge.confagent; -import cn.hutool.core.io.FileUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.log.Log; -import net.geedge.confagent.util.ConfagentUtil; -import net.geedge.confagent.util.Tool; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import java.io.File; -import java.io.IOException; -import java.util.Properties; +import cn.hutool.extra.spring.EnableSpringUtil; +import cn.hutool.log.Log; +import net.geedge.confagent.util.Tool; +@EnableSpringUtil @SpringBootApplication public class ConfagentApplication { private final static Log log = Log.get(); - + public static void main(String[] args) { + log.debug("application start,args: {}",Tool.StrUtil.join(",", args)); SpringApplication.run(ConfagentApplication.class, args); - try { - Properties properties = new Properties(); - File configFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), "config/application.yml"); - properties.load(FileUtil.getInputStream(configFile)); - String tokenPath = (String) properties.get("tokenFile"); - File tokenFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), StrUtil.emptyToDefault(tokenPath, "config/token.auth")); - String token = Tool.FileUtil.readString(tokenFile, Tool.CharsetUtil.UTF_8); - ConfagentUtil.tokenInMemory = token; - } catch (IOException e) { - log.error(e); - } + } } diff --git a/src/main/java/net/geedge/confagent/config/InitAgent.java b/src/main/java/net/geedge/confagent/config/InitAgent.java new file mode 100644 index 0000000..f1ceb37 --- /dev/null +++ b/src/main/java/net/geedge/confagent/config/InitAgent.java @@ -0,0 +1,50 @@ +package net.geedge.confagent.config; + +import java.io.File; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import cn.hutool.log.Log; +import net.geedge.confagent.entity.LokiPushUrlsEntity; +import net.geedge.confagent.thread.LokiContext; +import net.geedge.confagent.util.ConfagentUtil; +import net.geedge.confagent.util.Tool; + +@Component +public class InitAgent implements ApplicationListener<ContextRefreshedEvent> { + private final static Log log = Log.get(); + + @Value("${confagent.tokenFile:config/token.auth}") + private String tokenFilePath; + + @Value("${confagent.lokiPushUrlsFile:config/lokiPushUrls.yml}") + private String lokiPushUrlsPath; + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + if (event.getApplicationContext().getParent() == null) { + // 加载 token + try { + File tokenFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), tokenFilePath); + String token = Tool.FileUtil.readString(tokenFile, Tool.CharsetUtil.UTF_8); + ConfagentUtil.tokenInMemory = token; + } catch (Exception e) { + log.error("load token error", e); + } + // 加载 loki push url + try { + File lokiFile = Tool.FileUtil.file(Tool.WebPathUtil.getRootPath(), lokiPushUrlsPath); + LokiPushUrlsEntity urlsEntity = Tool.YamlUtil.readAsLokiPushUrlsEntity(lokiFile.getAbsolutePath()); + if (urlsEntity != null && urlsEntity.getLokiPushUrls() != null) { + LokiContext.loadUrls(urlsEntity.getLokiPushUrls()); + } + } catch (Exception e) { + log.error("load loki push url error", e); + } + } + + } +}
\ No newline at end of file diff --git a/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java b/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java new file mode 100644 index 0000000..899af73 --- /dev/null +++ b/src/main/java/net/geedge/confagent/config/RestTemplateConfig.java @@ -0,0 +1,122 @@ +package net.geedge.confagent.config; + +import org.apache.http.HeaderElement; +import org.apache.http.HeaderElementIterator; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.protocol.HttpContext; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * Http resttemplate 配置 + **/ +@Configuration +public class RestTemplateConfig { + /** + * # 从连接池获取连接的timeout,不宜过大,ms + */ + @Value("${http-pool.connection-request-timeout:200}") + private int connectionRequestTimeout; + /** + * 指客户端和服务器建立连接的超时时间,ms , 最大约21秒,因为内部tcp在进行三次握手建立连接时,默认tcp超时时间是20秒 + */ + @Value("${http-pool.connection-timeout:1000}") + private int connectionTimeout; + /** + * 指客户端从服务器读取数据包的间隔超时时间,不是总读取时间,也就是socket timeout,ms + */ + @Value("${http-pool.socket-timeout:500}") + private int socketTimeout; + /** + * #每个路由的最大连接数,如果只调用一个地址,可以将其设置为最大连接数 + */ + @Value("${http-pool.max-per-route:10000}") + private int maxPerRoute; + /** + * #连接池的最大连接数,0代表不限;如果取0,需要考虑连接泄露导致系统崩溃的后果 + */ + @Value("${http-pool.max-total:1000}") + private int maxTotal; + + /** + * 长连接保持时间 单位s,不宜过长 + */ + @Value("${http-pool.keep-alive-time:60}") + private int keepAliveTime; + + @Bean + public RestTemplate restTemplate(ClientHttpRequestFactory factory) { + return new RestTemplate(factory); + } + + @Bean + public ClientHttpRequestFactory httpRequestFactory(HttpClient client) { + return new HttpComponentsClientHttpRequestFactory(client); + } + + @Bean + public HttpClient httpClient(ConnectionKeepAliveStrategy keepAliveStrategy) { + Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create() + .register("http", PlainConnectionSocketFactory.getSocketFactory()) + .register("https", SSLConnectionSocketFactory.getSocketFactory()).build(); + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry); + // 设置整个连接池最大连接数 根据自己的场景决定 + connectionManager.setMaxTotal(maxTotal); + // 路由是对maxTotal的细分 + connectionManager.setDefaultMaxPerRoute(maxPerRoute); + RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(socketTimeout)// 服务器返回数据(response)的时间,超过该时间抛出readtimeout + .setConnectTimeout(connectionTimeout)// 连接上服务器(握手成功)的时间,超出该时间抛出connect timeout + .setConnectionRequestTimeout(connectionRequestTimeout)// 从连接池中获取连接的超时时间,超过该时间未拿到可用连接,会抛出org.apache.http.conn.ConnectionPoolTimeoutException:Timeout + // waiting for connection from pool + .build(); + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .setConnectionManager(connectionManager) + .setKeepAliveStrategy(keepAliveStrategy) + .build(); + } + + /** + * 配置长连接保持策略 + * + * @return + */ + @Bean + public ConnectionKeepAliveStrategy connectionKeepAliveStrategy() { + return new ConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse response, HttpContext context) { + // Honor 'keep-alive' header + HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && "timeout".equalsIgnoreCase(param)) { + try { + return Long.parseLong(value) * 1000; + } catch (NumberFormatException ignore) { + } + } + } + return keepAliveTime*1000; + } + }; + } +} diff --git a/src/main/java/net/geedge/confagent/controller/ConfigController.java b/src/main/java/net/geedge/confagent/controller/ConfigController.java index 6dde0bc..7b94125 100644 --- a/src/main/java/net/geedge/confagent/controller/ConfigController.java +++ b/src/main/java/net/geedge/confagent/controller/ConfigController.java @@ -1,26 +1,32 @@ package net.geedge.confagent.controller; +import java.io.File; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.alibaba.fastjson.JSONObject; + import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; -import com.alibaba.fastjson.JSONObject; import net.geedge.confagent.entity.LokiPushUrlEntity; import net.geedge.confagent.entity.LokiPushUrlsEntity; +import net.geedge.confagent.thread.LokiContext; import net.geedge.confagent.util.ConfagentUtil; import net.geedge.confagent.util.R; import net.geedge.confagent.util.RCode; import net.geedge.confagent.util.Tool; import net.geedge.confagent.util.Tool.YamlUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.*; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import java.io.File; -import java.util.*; - @RestController @RequestMapping("/config") public class ConfigController extends BaseController{ @@ -35,6 +41,7 @@ public class ConfigController extends BaseController{ @Value("${confagent.loki.config}") private String lokiConfPath; + @Autowired private ConfagentUtil confagentUtil; @@ -46,19 +53,20 @@ public class ConfigController extends BaseController{ * @Author han * @Date 2021/8/3 */ + @SuppressWarnings("unchecked") @PostMapping public R overwriteConfig(HttpServletRequest request, HttpServletResponse response,@RequestBody LokiPushUrlsEntity lokiPushUrls){ + List<LokiPushUrlEntity> lokiPushUrlList = Tool.ListUtil.list(false); //进行验证:target和token均非空 if(lokiPushUrls!=null && lokiPushUrls.getLokiPushUrls()!=null) { int lokiPort = 13100; Map<String, Object> lokiMap = Tool.YamlUtil.readAsMap(lokiConfPath); - Map<String, Object> serverObj = (Map<String, Object>) lokiMap.get("server"); + Map<String, Object> serverObj = (Map<String, Object>) lokiMap.get("server"); if (serverObj != null) { lokiPort = (Integer) serverObj.get("http_listen_port"); } - - List<LokiPushUrlEntity> lokiPushUrlList = lokiPushUrls.getLokiPushUrls(); + lokiPushUrlList.addAll(lokiPushUrls.getLokiPushUrls()); for(LokiPushUrlEntity urlEntity:lokiPushUrlList) { if(!validLokiPushUrl(urlEntity)) { return R.error(RCode.LOKI_PUSH_URL_ISNULL); @@ -70,10 +78,17 @@ public class ConfigController extends BaseController{ } } } - + /* + *更新 urls ,并重启线程 + */ + LokiContext.loadUrls(lokiPushUrlList); + /* + * 写入到配置文件 + */ File lf = Tool.FileUtil.file(rootPath, lokiFile); YamlUtil.writeAsObject(lokiPushUrls, lf.getAbsolutePath()); LOKI_PUSH_URL_JSONSTR = JSONObject.toJSONString(lokiPushUrls); + log.info("config update success"); return R.ok(); } diff --git a/src/main/java/net/geedge/confagent/controller/LokiController.java b/src/main/java/net/geedge/confagent/controller/LokiController.java index b4ea551..31f41af 100644 --- a/src/main/java/net/geedge/confagent/controller/LokiController.java +++ b/src/main/java/net/geedge/confagent/controller/LokiController.java @@ -1,291 +1,248 @@ package net.geedge.confagent.controller; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.nio.charset.Charset; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import cn.hutool.core.io.IORuntimeException; import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.util.ReflectUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpConnection; import cn.hutool.log.Log; -import com.alibaba.fastjson.JSONObject; import net.geedge.confagent.annotation.UnCheckToken; -import net.geedge.confagent.entity.LokiPushUrlEntity; -import net.geedge.confagent.entity.LokiPushUrlsEntity; -import net.geedge.confagent.thread.RequestThread; +import net.geedge.confagent.thread.LokiPushThread; import net.geedge.confagent.util.ConfagentUtil; import net.geedge.confagent.util.R; import net.geedge.confagent.util.RCode; import net.geedge.confagent.util.Tool; -import net.geedge.confagent.util.Tool.YamlUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.*; - -import javax.servlet.ServletInputStream; -import javax.servlet.ServletOutputStream; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.nio.charset.Charset; -import java.util.*; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; @RestController @RequestMapping("/loki") -public class LokiController extends BaseController{ - private final static Log log = Log.get(); +public class LokiController extends BaseController { + private final static Log log = Log.get(); - @Autowired + @Autowired private ConfagentUtil confagentUtil; - - @Value("${confagent.loki.config}") - private String lokiConfPath; - - @Value("${confagent.loki.query.auth:true}") - private Boolean queryAuth; - - private final String[] QUERY_API_SUFFIX = {"query","query_range","series","labels","values"}; - @Value("${confagent.loki.startCmd:systemctl start loki.service}") - private String startCmd; - - @Value("${confagent.loki.stopCmd:systemctl stop loki.service}") - private String stopCmd; - - @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}") - private String lokiFile; - - private static String rootPath = Tool.WebPathUtil.getRootPath(); - - /** - * @Description 写入loki配置文件 - * @Author han - * @Date 2021/8/4 - */ - @PostMapping("/config") - public R overwriteConfig( @RequestBody Map<String,Object> configs){ + @Value("${confagent.loki.config}") + private String lokiConfPath; - Map<String,Object> lokiConf =(Map<String,Object>) configs.get("config"); + @Value("${confagent.loki.query.auth:true}") + private Boolean queryAuth; - if(!Tool.MapUtil.isEmpty(lokiConf)){ - log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf)); - Tool.YamlUtil.writeAsMap(lokiConf,lokiConfPath); - if(Tool.StrUtil.isNotBlank(stopCmd)) { - log.info("stop loki:"+stopCmd); - try { - Tool.RuntimeUtil.exec(stopCmd); - }catch(Exception e) { - log.error(e); - return R.error(RCode.STOP_CMD_ERROR); - } - log.info("stop loki:"+stopCmd+" end"); - } - Tool.ThreadUtil.sleep(1000); - if(Tool.StrUtil.isNotBlank(startCmd)) { - log.info("start loki:"+startCmd); - try { -// String[] b={"sh","-c",startCmd}; -// Tool.RuntimeUtil.exec(b); - Tool.RuntimeUtil.exec(startCmd); - }catch(Exception e) { - log.error(e); - return R.error(RCode.START_CMD_ERROR); - } - log.info("start loki:"+startCmd+" end"); - } - }else { - return R.error(RCode.LOKI_CONFIG_ISNULL); - } - - return R.ok(); - } - - /** - * @Description 获取loki相关配置 - * @Author han - * @Date 2021/8/4 - */ - @GetMapping("/config") - public R queryConfig(HttpServletRequest request){ + private final String[] QUERY_API_SUFFIX = { "query", "query_range", "series", "labels", "values" }; - Map<String,Object> lokiConf = Tool.YamlUtil.readAsMap(lokiConfPath); - - Map<String,Object> result = new HashMap<>(); - result.put("config",lokiConf); + @Value("${confagent.loki.startCmd:systemctl start loki.service}") + private String startCmd; - return R.ok(result); - } + @Value("${confagent.loki.stopCmd:systemctl stop loki.service}") + private String stopCmd; - /** - * @Description 代理本loki 接口 - * @Author han - * @Date 2021/8/4 - */ - @RequestMapping("/proxy/**") - @UnCheckToken - public void proxy(HttpServletRequest request, HttpServletResponse response){ - String method = request.getMethod(); - String lokiPath = request.getServletPath().replace("/loki/proxy",""); - String token = request.getHeader("Authorization"); + @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}") + private String lokiFile; - R r = confagentUtil.checkToken(token); + private static String rootPath = Tool.WebPathUtil.getRootPath(); - Boolean isQuery=false; + private static final String LOKI_PUSH_URL = "/loki/api/v1/push"; - //queryAuth 配置只限制查询是否需要校验 - if(queryAuth){ - isQuery = Tool.StrUtil.isNotBlank(Arrays.stream(QUERY_API_SUFFIX).filter(t -> lokiPath.indexOf(t) != -1).findAny().orElse(null)); - } + /** + * @Description 写入loki配置文件 + * @Author han + * @Date 2021/8/4 + */ + @PostMapping("/config") + public R overwriteConfig(@RequestBody Map<String, Object> configs) { - if(isQuery &&r.getCode() != RCode.SUCCESS.getCode()){ - ConfagentUtil.writeResponse(response,r); - return; - } + Map<String, Object> lokiConf = (Map<String, Object>) configs.get("config"); - /*int port = 3100;//ConfagentUtil.getConfFilePort(lokiConfPath,LOKI_LISTEN_ADDR,3100); - Map<String,Object> lokiMap= Tool.YamlUtil.readAsMap(lokiConfPath); - Map<String,Object> serverObj = (Map<String,Object>)lokiMap.get("server"); - if(serverObj!=null) { - port = (Integer)serverObj.get("http_listen_port"); - }*/ + if (!Tool.MapUtil.isEmpty(lokiConf)) { + log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf)); + Tool.YamlUtil.writeAsMap(lokiConf, lokiConfPath); + if (Tool.StrUtil.isNotBlank(stopCmd)) { + log.info("stop loki:" + stopCmd); + try { + Tool.RuntimeUtil.exec(stopCmd); + } catch (Exception e) { + log.error(e); + return R.error(RCode.STOP_CMD_ERROR); + } + log.info("stop loki:" + stopCmd + " end"); + } + Tool.ThreadUtil.sleep(1000); + if (Tool.StrUtil.isNotBlank(startCmd)) { + log.info("start loki:" + startCmd); + try { +// String[] b={"sh","-c",startCmd}; +// Tool.RuntimeUtil.exec(b); + Tool.RuntimeUtil.exec(startCmd); + } catch (Exception e) { + log.error(e); + return R.error(RCode.START_CMD_ERROR); + } + log.info("start loki:" + startCmd + " end"); + } + } else { + return R.error(RCode.LOKI_CONFIG_ISNULL); + } + + return R.ok(); + } + + /** + * @Description 获取loki相关配置 + * @Author han + * @Date 2021/8/4 + */ + @GetMapping("/config") + public R queryConfig(HttpServletRequest request) { + Map<String, Object> lokiConf = Tool.YamlUtil.readAsMap(lokiConfPath); + Map<String, Object> result = new HashMap<>(); + result.put("config", lokiConf); + return R.ok(result); + } + + /** + * @throws IOException + * @throws IORuntimeException + * @Description 代理本loki 接口 + * @Author han + * @Date 2021/8/4 + */ + @RequestMapping("/proxy/**") + @UnCheckToken + public void proxy(HttpServletRequest request, HttpServletResponse response) throws IORuntimeException, IOException { + String lokiPath = request.getServletPath().replace("/loki/proxy", ""); + String token = request.getHeader("Authorization"); + R r = confagentUtil.checkToken(token); + Boolean isQuery = false; + // queryAuth 配置只限制查询是否需要校验 + if (queryAuth) { + isQuery = Tool.ArrayUtil.contains(QUERY_API_SUFFIX, lokiPath); + } + + if (isQuery && r.getCode() != RCode.SUCCESS.getCode()) { + ConfagentUtil.writeResponse(response, r); + return; + } + + /* + * push 接口 特殊处理 + * 1、解析请求头 + * 2、解析请求体 + * 3、重新组装 请求参数 + * 4、放到缓存 + * 5、直接详情成功 + */ + if (Tool.StrUtil.equals(lokiPath, LOKI_PUSH_URL)) { + // 复制请求头 + HttpHeaders headers = new HttpHeaders(); + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String hn = headerNames.nextElement(); + headers.add(hn, request.getHeader(hn)); + } + // 复制 body + byte[] body = Tool.IoUtil.readBytes(request.getInputStream()); + // 重新构造请求参数内容 + HttpEntity<byte[]> httpEntity = new HttpEntity<byte[]>(body, headers); + // 放到缓存队列中,等待其它线程消费发送 + LokiPushThread.addLogCache(httpEntity); + //响应 成功 + response.setStatus(204); + return; + } + + // 其他api 代理查询本机 LOKI api + String queryString = ReflectUtil.invoke(request, "getQueryString"); + String targetUrl = String.format("%s:%s%s?%s", "127.0.0.1", 13100, lokiPath,Tool.StrUtil.emptyToDefault(queryString, "")); + requestLoki(targetUrl, token, request, response); + + } + + public void requestLoki(String targetUrl, String token, HttpServletRequest request, HttpServletResponse response) { + String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString(); + + log.info("loki url: {}", url); + String method = request.getMethod(); + HttpURLConnection conn = null; + ServletInputStream reqInputStream = null; + ServletOutputStream resOutputStream = null; + OutputStream connOutputStream = null; + InputStream connInputStream = null; + try { + conn = HttpConnection.create(url, null).getHttpURLConnection(); + reqInputStream = request.getInputStream(); + resOutputStream = response.getOutputStream(); + conn.setRequestMethod(method); + // 复制请求头 + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String hn = headerNames.nextElement(); + if (!"authorization".equalsIgnoreCase(hn)) { + ReflectUtil.invoke(conn, "addRequestProperty", hn, request.getHeader(hn)); + } + } + ReflectUtil.invoke(conn, "addRequestProperty", "authorization", token); + + if (!"GET".equalsIgnoreCase(method)) { + conn.setDoOutput(true); + conn.setDoInput(true); + conn.setUseCaches(false); + connOutputStream = conn.getOutputStream(); + Tool.IoUtil.copy(reqInputStream, connOutputStream); + } + conn.connect(); + int responseCode = conn.getResponseCode(); + connInputStream = (responseCode < 400) ? conn.getInputStream() : conn.getErrorStream(); + String responseMessage = conn.getResponseMessage(); + Map<String, List<String>> responseHeaders = conn.getHeaderFields(); + // 复制响应头 + for (Map.Entry<String, List<String>> en : responseHeaders.entrySet()) { + String key = en.getKey(); + if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) + continue; + List<String> value = en.getValue(); + ReflectUtil.invoke(response, "addHeader", key, Tool.StrUtil.join("; ", value)); + } + // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage); + response.setStatus(responseCode, responseMessage); - LokiPushUrlsEntity lokiPushUrlsEntity; - if (StrUtil.isEmpty(LOKI_PUSH_URL_JSONSTR)) { - File lf = Tool.FileUtil.file(rootPath, lokiFile); - lokiPushUrlsEntity = YamlUtil.readAsLokiPushUrlsEntity(lf.getAbsolutePath()); - } else { - lokiPushUrlsEntity = JSONObject.parseObject(LOKI_PUSH_URL_JSONSTR, LokiPushUrlsEntity.class); - } - if (lokiPushUrlsEntity != null && lokiPushUrlsEntity.getLokiPushUrls() != null - && lokiPushUrlsEntity.getLokiPushUrls().size() > 0) { - List<LokiPushUrlEntity> list = lokiPushUrlsEntity.getLokiPushUrls(); - //loki 代理接口 为/loki/api/v1/push 接口 时,根据lokiPushUrls 配置 转发所有的target,否则随机选择一个target转发即可 - if("/loki/api/v1/push".equals(lokiPath)) { - List<String> rltList = new ArrayList<String>(); - - for(LokiPushUrlEntity urlEntity:list) { - String targetUrl = urlEntity.getTarget(); - if(targetUrl.startsWith("http://")) { - targetUrl = targetUrl.substring(7); - } - RequestThread requestThread = new RequestThread(); - requestThread.setTargetUrl(targetUrl); - // requestThread.setHost(ip); - requestThread.setToken(urlEntity.getToken()); - // requestThread.setPort(port); - requestThread.setMethod(method); - // requestThread.setPath(lokiPath); - requestThread.setRequest(request); - requestThread.setResponse(response); - Future<String> f = Tool.ThreadUtil.execAsync(requestThread);//.execute(requestThread); - try { - String rlt = f.get(5, TimeUnit.SECONDS); - rltList.add(rlt); - } catch (Exception e) { - log.error(e); - } - } - if(rltList.size()>0) { - OutputStream outputStream = null; - try { - log.info("loki 返回内容:" + JSONObject.toJSONString(rltList)); - outputStream = response.getOutputStream(); - Tool.IoUtil.writeUtf8(outputStream,true, rltList.get(0)); - outputStream.flush(); - - } catch (Exception e) { - log.error(e); - } finally { - Tool.IoUtil.close(outputStream); - } - } - } else { - // 其他api 代理查询本机 LOKI api - String queryString = ReflectUtil.invoke(request, "getQueryString"); - String targetUrl = String.format("%s:%s%s?%s", "127.0.0.1", 13100, lokiPath, StrUtil.emptyToDefault(queryString, "")); - requestLoki(targetUrl, token, request, response); - } - }else { - OutputStream outputStream = null; + Tool.IoUtil.copy(connInputStream, resOutputStream); + resOutputStream.flush();// flush 输出流 + } catch (Exception e) { try { - outputStream = response.getOutputStream(); - Tool.IoUtil.writeUtf8(outputStream,true, "The lokiPushUrls file does not exist"); - outputStream.flush(); - - } catch (Exception e) { - log.error(e); - } finally { - Tool.IoUtil.close(outputStream); + response.sendError(500, "request error"); + } catch (IOException e1) { + log.error("proxy request error", e1); } - } - - } - - public void requestLoki(String targetUrl, String token, HttpServletRequest request, HttpServletResponse response) { - String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString(); - - log.info("loki url: {}", url); - String method = request.getMethod(); - HttpURLConnection conn = null; - ServletInputStream reqInputStream = null; - ServletOutputStream resOutputStream = null; - OutputStream connOutputStream = null; - InputStream connInputStream = null; - try { - conn = HttpConnection.create(url, null).getHttpURLConnection(); - reqInputStream = request.getInputStream(); - resOutputStream = response.getOutputStream(); - conn.setRequestMethod(method); - // 复制请求头 - Enumeration<String> headerNames = request.getHeaderNames(); - while (headerNames.hasMoreElements()) { - String hn = headerNames.nextElement(); - if(!"authorization".equalsIgnoreCase(hn)) { - ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn)); - } - } - ReflectUtil.invoke(conn,"addRequestProperty","authorization",token); - - if (!"GET".equalsIgnoreCase(method)) { - conn.setDoOutput(true); - conn.setDoInput(true); - conn.setUseCaches(false); - connOutputStream = conn.getOutputStream(); - Tool.IoUtil.copy(reqInputStream, connOutputStream); - } - conn.connect(); - int responseCode = conn.getResponseCode(); - connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream(); - String responseMessage = conn.getResponseMessage(); - Map<String, List<String>> responseHeaders = conn.getHeaderFields(); - //复制响应头 - for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) { - String key = en.getKey(); - if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue; - List<String> value = en.getValue(); - ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value)); - } - // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage); - response.setStatus(responseCode, responseMessage); - - Tool.IoUtil.copy(connInputStream, resOutputStream); - resOutputStream.flush();//flush 输出流 - } catch (Exception e) { - try { - response.sendError(500, "request error"); - } catch (IOException e1) { - log.error("proxy request error",e1); - } - log.error("request error : ",e); - }finally { - Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream); - if(conn != null){ - conn.disconnect(); - } - } - } + log.error("request error : ", e); + } finally { + Tool.IoUtil.close(reqInputStream, resOutputStream, connOutputStream, connInputStream); + if (conn != null) { + conn.disconnect(); + } + } + } } diff --git a/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java b/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java index 83c6be7..9bd92d1 100644 --- a/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java +++ b/src/main/java/net/geedge/confagent/entity/LokiPushUrlEntity.java @@ -4,7 +4,7 @@ import lombok.Data; @Data public class LokiPushUrlEntity { - private String mode; - private String token; - private String target; + private String mode; + private String token; + private String target; } diff --git a/src/main/java/net/geedge/confagent/thread/LokiContext.java b/src/main/java/net/geedge/confagent/thread/LokiContext.java new file mode 100644 index 0000000..764a578 --- /dev/null +++ b/src/main/java/net/geedge/confagent/thread/LokiContext.java @@ -0,0 +1,74 @@ +package net.geedge.confagent.thread; + +import java.util.Hashtable; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +import cn.hutool.log.Log; +import net.geedge.confagent.entity.LokiPushUrlEntity; +import net.geedge.confagent.util.Tool; + + +@Configuration +public class LokiContext { + private static Log log = Log.get(); + + private static int lokiPushCacheSize; + + /** + * loki push thread 注册表 + */ + public final static Hashtable<LokiPushUrlEntity,LokiPushThread> LOKI_PUSH_TABLE = new Hashtable<LokiPushUrlEntity,LokiPushThread>(); + /** + * loki push urls + */ + public static List<LokiPushUrlEntity> LOKI_PUSH_URLS = new CopyOnWriteArrayList<LokiPushUrlEntity>(); + + /** + * 更新推送路径 + * @param lokiPushUrlList + */ + public static void loadUrls(List<LokiPushUrlEntity> lokiPushUrlList) { + /* + * 更新 push url + */ + LokiContext.LOKI_PUSH_URLS.clear(); + LokiContext.LOKI_PUSH_URLS.addAll(lokiPushUrlList); + /* + * 关闭不再推送线程,开启新增url + */ + Hashtable<LokiPushUrlEntity,LokiPushThread> lokiPushTable = LokiContext.LOKI_PUSH_TABLE; + Set<Entry<LokiPushUrlEntity,LokiPushThread>> entrySet = lokiPushTable.entrySet(); + //关闭最新url中不包含的线程 + for(Entry<LokiPushUrlEntity,LokiPushThread> en : entrySet) { + LokiPushUrlEntity urlEntity = en.getKey(); + LokiPushThread thread = en.getValue(); + if(!lokiPushUrlList.contains(urlEntity)) { + thread.stop(); + lokiPushTable.remove(urlEntity); + log.info("删除loki push url: {}", urlEntity.getTarget()); + } + } + //添加新增的url并启动线程 + for(LokiPushUrlEntity urlEntity: lokiPushUrlList) { + if(!lokiPushTable.containsKey(urlEntity)) { + LokiPushThread thread = new LokiPushThread(lokiPushCacheSize,urlEntity); + //启动线程 + Tool.ThreadUtil.execute(thread); + lokiPushTable.put(urlEntity, thread); + log.info("新增loki push url: {}", urlEntity.getTarget()); + } + } + } + + @Value("${confagent.loki.push.cache.size:10000}") + public void setLokiPushCacheSize(int size) { + LokiContext.lokiPushCacheSize = size; + } + +} diff --git a/src/main/java/net/geedge/confagent/thread/LokiPushThread.java b/src/main/java/net/geedge/confagent/thread/LokiPushThread.java new file mode 100644 index 0000000..cdac7a6 --- /dev/null +++ b/src/main/java/net/geedge/confagent/thread/LokiPushThread.java @@ -0,0 +1,117 @@ +package net.geedge.confagent.thread; + +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; + +import cn.hutool.core.date.StopWatch; +import cn.hutool.log.Log; +import net.geedge.confagent.entity.LokiPushUrlEntity; +import net.geedge.confagent.util.LimitedQueue; +import net.geedge.confagent.util.Tool; + +/** + * loki 日志数据推送线程 + * @author ThinkPad + * + */ +public class LokiPushThread implements Runnable{ + private static final Log log = Log.get(); + + private final static int LOG_CACHE_DEFAULT_SIZE = 10000; + private final LokiPushUrlEntity url; + private static final String LOKI_PUSH_URL = "/loki/api/v1/push"; + /** + * 缓存list + */ + private final LimitedQueue<HttpEntity<byte[]>> LOG_CACHE; + /** + * 运行标志 + */ + private boolean runFlag = true; + + private RestTemplate restTemplate = Tool.SpringUtil.getBean(RestTemplate.class); + + public LokiPushThread(LokiPushUrlEntity url) { + this(LOG_CACHE_DEFAULT_SIZE,url); + } + + public LokiPushThread(int cacheSize,LokiPushUrlEntity url) { + this.url = url; + LOG_CACHE = new LimitedQueue<HttpEntity<byte[]>>(cacheSize); + } + + public void add(HttpEntity<byte[]> entity) { + this.LOG_CACHE.add(entity); + } + + @Override + public void run() { + //设置线程名称 + Thread.currentThread().setName("LokiPushThread-" + Tool.IdUtil.fastShortUUID()); + log.info("thread start: {}", url.getTarget()); + String targetPushUrl = LokiPushThread.genPushUrl(url.getTarget()); + while(runFlag) { + try { + StopWatch sw = StopWatch.create("test"); + sw.start(); + // 1、从队列获取内容 + HttpEntity<byte[]> entity = LOG_CACHE.poll(500, TimeUnit.MILLISECONDS); + if(entity == null) { + continue; + } + //2、设置header token + if(Tool.StrUtil.isNotBlank(url.getToken())) { + HttpHeaders headers = new HttpHeaders(); + headers.addAll(entity.getHeaders()); + headers.set("Authorization", url.getToken()); + entity = new HttpEntity<byte[]>(entity.getBody(), headers); + } + //3、发送 + ResponseEntity<String> responseEntity = restTemplate.exchange(targetPushUrl, HttpMethod.valueOf("POST"), entity, String.class); + if(!responseEntity.getStatusCode().is2xxSuccessful()) { + //发送失败记录日志 + log.error("loki push error,target:{},response:{}", url.getTarget(),responseEntity.getBody()); + } + sw.stop(); + if(log.isDebugEnabled()) { + log.debug("sw statistics: {}", Tool.DateUtil.formatBetween(sw.getTotalTimeMillis())); + } + } catch (Exception e) { + log.error("loki push error",e); + } + } + LOG_CACHE.clear(); + log.info("thread close: {}",url.getTarget()); + } + + /** + * 关闭线程 + */ + public void stop() { + this.runFlag = false; + } + + /** + * 将 loki 接收到 log 日志内容 放到 各个缓存中 + * @param entity + */ + public static void addLogCache(HttpEntity<byte[]> entity) { + Set<Entry<LokiPushUrlEntity,LokiPushThread>> entrySet = LokiContext.LOKI_PUSH_TABLE.entrySet(); + for(Entry<LokiPushUrlEntity,LokiPushThread> en : entrySet) { + en.getValue().add(entity); + } + } + + private static String genPushUrl(String url) { + url = url.endsWith("/") ? url.substring(0, url.length()-1) : url; + return Tool.StrUtil.concat(true,url, LOKI_PUSH_URL); + } +} diff --git a/src/main/java/net/geedge/confagent/thread/RequestThread.java b/src/main/java/net/geedge/confagent/thread/RequestThread.java deleted file mode 100644 index 7dc04e4..0000000 --- a/src/main/java/net/geedge/confagent/thread/RequestThread.java +++ /dev/null @@ -1,118 +0,0 @@ -package net.geedge.confagent.thread;
-
-import cn.hutool.core.net.url.UrlBuilder;
-import cn.hutool.core.util.ReflectUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.http.HttpConnection;
-import cn.hutool.log.Log;
-import lombok.Data;
-import net.geedge.confagent.util.Tool;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.nio.charset.Charset;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-@Data
-public class RequestThread implements Callable<String>{
-
- private final static Log log = Log.get();
- private String targetUrl;
- private String token;
- private String method;
- //private String host;
- //private int port;
- //private String path;
- public HttpServletRequest request;
- private HttpServletResponse response;
-
- @Override
- public String call() throws Exception {
-
- String queryString = ReflectUtil.invoke(request, "getQueryString");
- queryString = StrUtil.isNotBlank(queryString)?queryString:"";
- // String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString;
- String url = UrlBuilder.ofHttp(targetUrl, Charset.forName("utf-8")).toString();
- log.info("loki url: {}", url);
-// String method = request.getMethod();
- HttpURLConnection conn = null;
- ServletInputStream reqInputStream = null;
-// ServletOutputStream resOutputStream = null;
- OutputStream connOutputStream = null;
- InputStream connInputStream = null;
- try {
- conn = HttpConnection.create(url, null).getHttpURLConnection();
- reqInputStream = request.getInputStream();
-
-// resOutputStream = response.getOutputStream();
- conn.setRequestMethod(method);
-// conn.addRequestProperty("authorization", token);
- // 复制请求头
- Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- String hn = headerNames.nextElement();
- if(!"authorization".equalsIgnoreCase(hn)) {
- ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
- }
- log.info(hn+"===="+request.getHeader(hn));
- }
- ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
-
-
- if (!"GET".equalsIgnoreCase(method)) {
- conn.setDoOutput(true);
- conn.setDoInput(true);
- conn.setUseCaches(false);
- connOutputStream = conn.getOutputStream();
- Tool.IoUtil.copy(reqInputStream, connOutputStream);
- }
- conn.connect();
- int responseCode = conn.getResponseCode();
- connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
- String responseMessage = conn.getResponseMessage();
- Map<String, List<String>> responseHeaders = conn.getHeaderFields();
-
- if(!response.isCommitted()) {
- log.info("复制响应头");
- //复制响应头
- for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
- String key = en.getKey();
- if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
- List<String> value = en.getValue();
- ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
- }
- // ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
- response.setStatus(responseCode, responseMessage);
- }
-
- String str = Tool.IoUtil.read(connInputStream, "UTF-8");
- log.info(str);
-
-// Tool.IoUtil.copy(connInputStream, resOutputStream);
-// resOutputStream.flush();//flush 输出流
- return str;
- } catch (Exception e) {
- log.error("request error : ", e);
- try {
-// response.sendError(500, "request error");
- return "request error";
- } catch (Exception e1) {
- log.error("proxy request error",e1);
- }
- }finally {
-// Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
- Tool.IoUtil.close(reqInputStream,connOutputStream,connInputStream);
- if(conn != null){
- conn.disconnect();
- }
- }
- return "";
- }
-}
diff --git a/src/main/java/net/geedge/confagent/util/LimitedQueue.java b/src/main/java/net/geedge/confagent/util/LimitedQueue.java new file mode 100644 index 0000000..0bd2c13 --- /dev/null +++ b/src/main/java/net/geedge/confagent/util/LimitedQueue.java @@ -0,0 +1,22 @@ +package net.geedge.confagent.util; + +import java.util.concurrent.LinkedBlockingQueue; + +public class LimitedQueue<E> extends LinkedBlockingQueue<E> { + private static final long serialVersionUID = 1878117398478874074L; + + private int limit; + + public LimitedQueue(int limit) { + this.limit = limit; + } + + @Override + public boolean add(E o) { + boolean added = super.add(o); + while (added && size() > limit) { + super.remove(); + } + return added; + } +} |
