summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhyx <[email protected]>2021-08-05 21:14:55 +0800
committerhyx <[email protected]>2021-08-05 21:14:55 +0800
commit76bed25d3fb9ef0f2f004ae0e1efd8564197c58c (patch)
tree0990947938539b11d5fd15e2ba57df98e6cf3b26
parent8bbe4f87321d8424d79c5da831611aaf6a689e7b (diff)
NEZ-890 agent 组件增加 loki配置下发接口,代理接口
-rw-r--r--src/main/java/net/geedge/confagent/controller/LokiController.java280
-rw-r--r--src/main/java/net/geedge/confagent/thread/RequestThread.java119
-rw-r--r--src/main/java/net/geedge/confagent/util/YamlUtil.java17
-rw-r--r--src/main/resources/application-dev.yml5
-rw-r--r--src/main/resources/application-prod.yml5
-rw-r--r--src/main/resources/application.yml3
-rw-r--r--src/main/resources/config/lokiPushUrls.yml8
7 files changed, 428 insertions, 9 deletions
diff --git a/src/main/java/net/geedge/confagent/controller/LokiController.java b/src/main/java/net/geedge/confagent/controller/LokiController.java
new file mode 100644
index 0000000..e981e1e
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/controller/LokiController.java
@@ -0,0 +1,280 @@
+package net.geedge.confagent.controller;
+
+import cn.hutool.core.net.url.UrlBuilder;
+import cn.hutool.core.net.url.UrlPath;
+import cn.hutool.core.util.ReflectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.http.HttpConnection;
+import cn.hutool.log.Log;
+import net.geedge.confagent.annotation.UnCheckToken;
+import net.geedge.confagent.entity.LokiPushUrlEntity;
+import net.geedge.confagent.entity.LokiPushUrlsEntity;
+import net.geedge.confagent.thread.RequestThread;
+import net.geedge.confagent.util.ConfagentUtil;
+import net.geedge.confagent.util.R;
+import net.geedge.confagent.util.RCode;
+import net.geedge.confagent.util.Tool;
+import net.geedge.confagent.util.Tool.YamlUtil;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@RestController
+@RequestMapping("/loki")
+public class LokiController extends BaseController{
+ private final static Log log = Log.get();
+
+ @Autowired
+ private ConfagentUtil confagentUtil;
+
+ @Value("${confagent.loki.config}")
+ private String lokiConfPath;
+
+ @Value("${confagent.loki.query.auth:true}")
+ private Boolean queryAuth;
+
+ private final String[] QUERY_API_SUFFIX = {"query","query_range","series","labels","values"};
+
+ @Value("${confagent.loki.startCmd:systemctl start loki.service}")
+ private String startCmd;
+
+ @Value("${confagent.loki.stopCmd:systemctl stop loki.service}")
+ private String stopCmd;
+
+ @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}")
+ private String lokiFile;
+
+ private static String rootPath = Tool.WebPathUtil.getRootPath();
+
+ /**
+ * @Description 写入loki配置文件
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @PostMapping("/config")
+ public R overwriteConfig( @RequestBody Map<String,Object> configs){
+
+ Map<String,Object> lokiConf =(Map<String,Object>) configs.get("config");
+
+ if(!Tool.MapUtil.isEmpty(lokiConf)){
+ log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf));
+ Tool.YamlUtil.writeAsMap(lokiConf,lokiConfPath);
+ if(Tool.StrUtil.isNotBlank(stopCmd)) {
+ log.info("stop loki:"+stopCmd);
+ Tool.RuntimeUtil.exec(stopCmd);
+ log.info("stop loki:"+stopCmd+" end");
+ }
+ Tool.ThreadUtil.sleep(1000);
+ if(Tool.StrUtil.isNotBlank(startCmd)) {
+ log.info("start loki:"+startCmd);
+// String[] b={"sh","-c",startCmd};
+// Tool.RuntimeUtil.exec(b);
+ Tool.RuntimeUtil.exec(startCmd);
+ log.info("start loki:"+startCmd+" end");
+ }
+ }
+
+ return R.ok();
+ }
+
+ /**
+ * @Description 获取loki相关配置
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @GetMapping("/config")
+ public R queryConfig(HttpServletRequest request){
+
+ Map<String,Object> prometheusConf = Tool.YamlUtil.readAsMap(lokiConfPath);
+
+ Map<String,Object> result = new HashMap<>();
+ result.put("config",prometheusConf);
+
+ return R.ok(result);
+ }
+
+ /**
+ * @Description 代理本loki 接口
+ * @Author han
+ * @Date 2021/8/4
+ */
+ @RequestMapping("/proxy/**")
+ @UnCheckToken
+ public void proxy(HttpServletRequest request, HttpServletResponse response){
+ String method = request.getMethod();
+ String lokiPath = request.getServletPath().replace("/loki/proxy","");
+ String token = request.getHeader("Authorization");
+
+ R r = confagentUtil.checkToken(token);
+
+ Boolean isQuery=false;
+
+ //queryAuth 配置只限制查询是否需要校验
+ if(queryAuth){
+ isQuery = Tool.StrUtil.isNotBlank(Arrays.stream(QUERY_API_SUFFIX).filter(t -> lokiPath.indexOf(t) != -1).findAny().orElse(null));
+ }
+
+ if(isQuery &&r.getCode() != RCode.SUCCESS.getCode()){
+ ConfagentUtil.writeResponse(response,r);
+ return;
+ }
+
+ int port = 3100;//ConfagentUtil.getConfFilePort(lokiConfPath,LOKI_LISTEN_ADDR,3100);
+ Map<String,Object> lokiMap= Tool.YamlUtil.readAsMap(lokiConfPath);
+ Map<String,Object> serverObj = (Map<String,Object>)lokiMap.get("server");
+ if(serverObj!=null) {
+ port = (Integer)serverObj.get("http_listen_port");
+ }
+
+ File lf = Tool.FileUtil.file(rootPath, lokiFile);
+ LokiPushUrlsEntity lokiPushUrlsEntity = YamlUtil.readAsLokiPushUrlsEntity(lf.getAbsolutePath());
+ if(lokiPushUrlsEntity!=null && lokiPushUrlsEntity.getLokiPushUrls()!=null
+ && lokiPushUrlsEntity.getLokiPushUrls().size()>0) {
+ List<LokiPushUrlEntity> list = lokiPushUrlsEntity.getLokiPushUrls();
+ //loki 代理接口 为/loki/api/v1/push 接口 时,根据lokiPushUrls 配置 转发所有的target,否则随机选择一个target转发即可
+ if("/loki/api/v1/push".equals(lokiPath)) {
+ List<String> rltList = new ArrayList<String>();
+
+ for(LokiPushUrlEntity urlEntity:list) {
+ String ip = urlEntity.getTarget();
+ if(ip.startsWith("http://")) {
+ ip = ip.substring(7);
+ }
+ RequestThread requestThread = new RequestThread();
+ requestThread.setHost(ip);
+ requestThread.setToken(urlEntity.getToken());
+ requestThread.setPort(port);
+ requestThread.setMethod(method);
+ requestThread.setPath(lokiPath);
+ requestThread.setRequest(request);
+ requestThread.setResponse(response);
+ Future<String> f = Tool.ThreadUtil.execAsync(requestThread);//.execute(requestThread);
+ try {
+ String rlt = f.get(5, TimeUnit.SECONDS);
+ rltList.add(rlt);
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ if(rltList.size()>0) {
+ OutputStream outputStream = null;
+ try {
+ log.info("loki 返回内容:"+rltList.toString());
+ outputStream = response.getOutputStream();
+ Tool.IoUtil.writeUtf8(outputStream,true, rltList.get(0));
+ outputStream.flush();
+
+ } catch (Exception e) {
+ log.error(e);
+ } finally {
+ Tool.IoUtil.close(outputStream);
+ }
+ }
+ }else {
+ LokiPushUrlEntity urlEntity = list.get(0);
+ String ip = urlEntity.getTarget();
+ if(ip.startsWith("http://")) {
+ ip = ip.substring(7);
+ }
+ requestProm(ip,token,port,lokiPath,request,response);
+ }
+ }else {
+ OutputStream outputStream = null;
+ try {
+ outputStream = response.getOutputStream();
+ Tool.IoUtil.writeUtf8(outputStream,true, "The lokiPushUrls file does not exist");
+ outputStream.flush();
+
+ } catch (Exception e) {
+ log.error(e);
+ } finally {
+ Tool.IoUtil.close(outputStream);
+ }
+ }
+
+ }
+
+ public void requestProm(String host, String token,int port, String path, HttpServletRequest request, HttpServletResponse response) {
+ String queryString = ReflectUtil.invoke(request, "getQueryString");
+ queryString = StrUtil.isNotBlank(queryString)?queryString:"";
+ String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString;
+
+ log.info("loki url: {}", url);
+ String method = request.getMethod();
+ HttpURLConnection conn = null;
+ ServletInputStream reqInputStream = null;
+ ServletOutputStream resOutputStream = null;
+ OutputStream connOutputStream = null;
+ InputStream connInputStream = null;
+ try {
+ conn = HttpConnection.create(url, null).getHttpURLConnection();
+ reqInputStream = request.getInputStream();
+ resOutputStream = response.getOutputStream();
+ conn.setRequestMethod(method);
+ // 复制请求头
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String hn = headerNames.nextElement();
+ if(!"authorization".equalsIgnoreCase(hn)) {
+ ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
+ }
+ }
+ ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
+
+ if (!"GET".equalsIgnoreCase(method)) {
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ conn.setUseCaches(false);
+ connOutputStream = conn.getOutputStream();
+ Tool.IoUtil.copy(reqInputStream, connOutputStream);
+ }
+ conn.connect();
+ int responseCode = conn.getResponseCode();
+ connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
+ String responseMessage = conn.getResponseMessage();
+ Map<String, List<String>> responseHeaders = conn.getHeaderFields();
+ //复制响应头
+ for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
+ String key = en.getKey();
+ if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
+ List<String> value = en.getValue();
+ ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
+ }
+ ReflectUtil.invoke(response,"addHeader","Authorization",token);
+ // response.setStatus(responseCode, responseMessage);
+ ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
+
+ Tool.IoUtil.copy(connInputStream, resOutputStream);
+ resOutputStream.flush();//flush 输出流
+ } catch (Exception e) {
+ try {
+ response.sendError(500, "request error");
+ } catch (IOException e1) {
+ log.error("proxy request error",e1);
+ }
+ log.error("request error : ",e);
+ }finally {
+ Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
+ if(conn != null){
+ conn.disconnect();
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/net/geedge/confagent/thread/RequestThread.java b/src/main/java/net/geedge/confagent/thread/RequestThread.java
new file mode 100644
index 0000000..eec6a48
--- /dev/null
+++ b/src/main/java/net/geedge/confagent/thread/RequestThread.java
@@ -0,0 +1,119 @@
+package net.geedge.confagent.thread;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import cn.hutool.core.net.url.UrlBuilder;
+import cn.hutool.core.net.url.UrlPath;
+import cn.hutool.core.util.ReflectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.http.HttpConnection;
+import cn.hutool.log.Log;
+import lombok.Data;
+import net.geedge.confagent.util.Tool;
+
+@Data
+public class RequestThread implements Callable<String>{
+
+ private final static Log log = Log.get();
+ private String host;
+ private String token;
+ private String method;
+ private int port;
+ private String path;
+ public HttpServletRequest request;
+ private HttpServletResponse response;
+
+ @Override
+ public String call() throws Exception {
+
+ String queryString = ReflectUtil.invoke(request, "getQueryString");
+ queryString = StrUtil.isNotBlank(queryString)?queryString:"";
+ String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString;
+
+ log.info("loki url: {}", url);
+// String method = request.getMethod();
+ HttpURLConnection conn = null;
+ ServletInputStream reqInputStream = null;
+// ServletOutputStream resOutputStream = null;
+ OutputStream connOutputStream = null;
+ InputStream connInputStream = null;
+ try {
+ conn = HttpConnection.create(url, null).getHttpURLConnection();
+ reqInputStream = request.getInputStream();
+
+// resOutputStream = response.getOutputStream();
+ conn.setRequestMethod(method);
+// conn.addRequestProperty("authorization", token);
+ // 复制请求头
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String hn = headerNames.nextElement();
+ if(!"authorization".equalsIgnoreCase(hn)) {
+ ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
+ }
+ log.info(hn+"===="+request.getHeader(hn));
+ }
+ ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
+
+
+ if (!"GET".equalsIgnoreCase(method)) {
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ conn.setUseCaches(false);
+ connOutputStream = conn.getOutputStream();
+ Tool.IoUtil.copy(reqInputStream, connOutputStream);
+ }
+ conn.connect();
+ int responseCode = conn.getResponseCode();
+ connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
+ String responseMessage = conn.getResponseMessage();
+ Map<String, List<String>> responseHeaders = conn.getHeaderFields();
+
+ if(!response.isCommitted()) {
+ log.info("复制响应头");
+ //复制响应头
+ for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
+ String key = en.getKey();
+ if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
+ List<String> value = en.getValue();
+ ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
+ }
+ ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
+ }
+
+ String str = Tool.IoUtil.read(connInputStream, "UTF-8");
+ log.info(str);
+
+// Tool.IoUtil.copy(connInputStream, resOutputStream);
+// resOutputStream.flush();//flush 输出流
+ return str;
+ } catch (Exception e) {
+ try {
+// response.sendError(500, "request error");
+ return "request error";
+ } catch (Exception e1) {
+ log.error("proxy request error",e1);
+ }
+ log.error("request error : ",e);
+ }finally {
+// Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
+ Tool.IoUtil.close(reqInputStream,connOutputStream,connInputStream);
+ if(conn != null){
+ conn.disconnect();
+ }
+ }
+ return "";
+ }
+} \ No newline at end of file
diff --git a/src/main/java/net/geedge/confagent/util/YamlUtil.java b/src/main/java/net/geedge/confagent/util/YamlUtil.java
index 8fd2415..7849cd1 100644
--- a/src/main/java/net/geedge/confagent/util/YamlUtil.java
+++ b/src/main/java/net/geedge/confagent/util/YamlUtil.java
@@ -1,6 +1,7 @@
package net.geedge.confagent.util;
import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
import net.geedge.confagent.entity.LokiPushUrlsEntity;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
@@ -15,7 +16,8 @@ import java.util.Map;
import java.util.Properties;
public class YamlUtil {
-
+ private final static Log log = Log.get();
+
public static void writeAsMap(Map map,String path){
Map writeObj = map;
if(writeObj == null)
@@ -53,10 +55,15 @@ public class YamlUtil {
* @Date 2021/3/24
*/
public static LokiPushUrlsEntity readAsLokiPushUrlsEntity(String path){
- File file = Tool.FileUtil.file(path);
- Yaml yamlRead = new Yaml();
- BufferedReader reader = Tool.FileUtil.getReader(file, Tool.CharsetUtil.UTF_8);
- LokiPushUrlsEntity pr = yamlRead.loadAs(reader, LokiPushUrlsEntity.class);
+ LokiPushUrlsEntity pr = null;
+ try {
+ File file = Tool.FileUtil.file(path);
+ Yaml yamlRead = new Yaml();
+ BufferedReader reader = Tool.FileUtil.getReader(file, Tool.CharsetUtil.UTF_8);
+ pr = yamlRead.loadAs(reader, LokiPushUrlsEntity.class);
+ }catch(Exception e) {
+ log.error(e);
+ }
return pr;
}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index e28b3ee..b7c7632 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -16,6 +16,11 @@ confagent:
defaultIP: 192.168.40.42
resourcePath: ./
+
+ loki:
+ config: D:\config\loki.yml
+ startCmd: start_loki.sh
+ stopCmd: kill -9 26590
logging:
config: src/main/resources/logback-spring.xml \ No newline at end of file
diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml
index 532cc61..2c66f44 100644
--- a/src/main/resources/application-prod.yml
+++ b/src/main/resources/application-prod.yml
@@ -14,6 +14,11 @@ confagent:
cmdLine: /opt/nezha/nz-agent/blackbox_exporter/config.conf
config: /opt/nezha/nz-agent/blackbox_exporter/blackbox.yml
defaultIP: 127.0.0.1
+
+ loki:
+ config: /opt/nezha/nz-agent/loki/loki.yml
+ startCmd: systemctl start loki
+ stopCmd: systemctl stop loki
logging:
config: config/logback-spring.xml
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a6cfac7..2c216cf 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -16,4 +16,7 @@ confagent:
lokiPushUrlsFile: config/lokiPushUrls.yml
prometheus:
query:
+ auth: false
+ loki:
+ query:
auth: false \ No newline at end of file
diff --git a/src/main/resources/config/lokiPushUrls.yml b/src/main/resources/config/lokiPushUrls.yml
index 88dfaaf..d090083 100644
--- a/src/main/resources/config/lokiPushUrls.yml
+++ b/src/main/resources/config/lokiPushUrls.yml
@@ -1,6 +1,6 @@
!!net.geedge.confagent.entity.LokiPushUrlsEntity
lokiPushUrls:
-- {target: 'http://192.168.3.3', token: aaa}
-- {target: 'http://127.0.0.1', token: bbb}
-- {target: 'http://127.0.3.1', token: cc2}
-- {target: 'http://127.0.2.1', token: dd3}
+- {target: 'http://192.168.44.64', token: aaa}
+- {target: 'http://192.168.44.61', token: bbb}
+- {target: 'http://192.168.44.64', token: cc2}
+- {target: 'http://192.168.44.60', token: dd3}