diff options
| author | hyx <[email protected]> | 2021-08-05 21:14:55 +0800 |
|---|---|---|
| committer | hyx <[email protected]> | 2021-08-05 21:14:55 +0800 |
| commit | 76bed25d3fb9ef0f2f004ae0e1efd8564197c58c (patch) | |
| tree | 0990947938539b11d5fd15e2ba57df98e6cf3b26 | |
| parent | 8bbe4f87321d8424d79c5da831611aaf6a689e7b (diff) | |
NEZ-890 agent 组件增加 loki配置下发接口,代理接口
| -rw-r--r-- | src/main/java/net/geedge/confagent/controller/LokiController.java | 280 | ||||
| -rw-r--r-- | src/main/java/net/geedge/confagent/thread/RequestThread.java | 119 | ||||
| -rw-r--r-- | src/main/java/net/geedge/confagent/util/YamlUtil.java | 17 | ||||
| -rw-r--r-- | src/main/resources/application-dev.yml | 5 | ||||
| -rw-r--r-- | src/main/resources/application-prod.yml | 5 | ||||
| -rw-r--r-- | src/main/resources/application.yml | 3 | ||||
| -rw-r--r-- | src/main/resources/config/lokiPushUrls.yml | 8 |
7 files changed, 428 insertions, 9 deletions
diff --git a/src/main/java/net/geedge/confagent/controller/LokiController.java b/src/main/java/net/geedge/confagent/controller/LokiController.java new file mode 100644 index 0000000..e981e1e --- /dev/null +++ b/src/main/java/net/geedge/confagent/controller/LokiController.java @@ -0,0 +1,280 @@ +package net.geedge.confagent.controller; + +import cn.hutool.core.net.url.UrlBuilder; +import cn.hutool.core.net.url.UrlPath; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpConnection; +import cn.hutool.log.Log; +import net.geedge.confagent.annotation.UnCheckToken; +import net.geedge.confagent.entity.LokiPushUrlEntity; +import net.geedge.confagent.entity.LokiPushUrlsEntity; +import net.geedge.confagent.thread.RequestThread; +import net.geedge.confagent.util.ConfagentUtil; +import net.geedge.confagent.util.R; +import net.geedge.confagent.util.RCode; +import net.geedge.confagent.util.Tool; +import net.geedge.confagent.util.Tool.YamlUtil; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.nio.charset.Charset; +import java.util.*; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@RestController +@RequestMapping("/loki") +public class LokiController extends BaseController{ + private final static Log log = Log.get(); + + @Autowired + private ConfagentUtil confagentUtil; + + @Value("${confagent.loki.config}") + private String lokiConfPath; + + @Value("${confagent.loki.query.auth:true}") + private Boolean queryAuth; + + private final String[] QUERY_API_SUFFIX = {"query","query_range","series","labels","values"}; + + @Value("${confagent.loki.startCmd:systemctl start loki.service}") + private String startCmd; + + @Value("${confagent.loki.stopCmd:systemctl stop loki.service}") + private String stopCmd; + + @Value("${confagent.lokiPushUrlsFile:lokiPushUrls.yml}") + private String lokiFile; + + private static String rootPath = Tool.WebPathUtil.getRootPath(); + + /** + * @Description 写入loki配置文件 + * @Author han + * @Date 2021/8/4 + */ + @PostMapping("/config") + public R overwriteConfig( @RequestBody Map<String,Object> configs){ + + Map<String,Object> lokiConf =(Map<String,Object>) configs.get("config"); + + if(!Tool.MapUtil.isEmpty(lokiConf)){ + log.info("write loki conf:{}", Tool.JSONUtil.toJsonStr(lokiConf)); + Tool.YamlUtil.writeAsMap(lokiConf,lokiConfPath); + if(Tool.StrUtil.isNotBlank(stopCmd)) { + log.info("stop loki:"+stopCmd); + Tool.RuntimeUtil.exec(stopCmd); + log.info("stop loki:"+stopCmd+" end"); + } + Tool.ThreadUtil.sleep(1000); + if(Tool.StrUtil.isNotBlank(startCmd)) { + log.info("start loki:"+startCmd); +// String[] b={"sh","-c",startCmd}; +// Tool.RuntimeUtil.exec(b); + Tool.RuntimeUtil.exec(startCmd); + log.info("start loki:"+startCmd+" end"); + } + } + + return R.ok(); + } + + /** + * @Description 获取loki相关配置 + * @Author han + * @Date 2021/8/4 + */ + @GetMapping("/config") + public R queryConfig(HttpServletRequest request){ + + Map<String,Object> prometheusConf = Tool.YamlUtil.readAsMap(lokiConfPath); + + Map<String,Object> result = new HashMap<>(); + result.put("config",prometheusConf); + + return R.ok(result); + } + + /** + * @Description 代理本loki 接口 + * @Author han + * @Date 2021/8/4 + */ + @RequestMapping("/proxy/**") + @UnCheckToken + public void proxy(HttpServletRequest request, HttpServletResponse response){ + String method = request.getMethod(); + String lokiPath = request.getServletPath().replace("/loki/proxy",""); + String token = request.getHeader("Authorization"); + + R r = confagentUtil.checkToken(token); + + Boolean isQuery=false; + + //queryAuth 配置只限制查询是否需要校验 + if(queryAuth){ + isQuery = Tool.StrUtil.isNotBlank(Arrays.stream(QUERY_API_SUFFIX).filter(t -> lokiPath.indexOf(t) != -1).findAny().orElse(null)); + } + + if(isQuery &&r.getCode() != RCode.SUCCESS.getCode()){ + ConfagentUtil.writeResponse(response,r); + return; + } + + int port = 3100;//ConfagentUtil.getConfFilePort(lokiConfPath,LOKI_LISTEN_ADDR,3100); + Map<String,Object> lokiMap= Tool.YamlUtil.readAsMap(lokiConfPath); + Map<String,Object> serverObj = (Map<String,Object>)lokiMap.get("server"); + if(serverObj!=null) { + port = (Integer)serverObj.get("http_listen_port"); + } + + File lf = Tool.FileUtil.file(rootPath, lokiFile); + LokiPushUrlsEntity lokiPushUrlsEntity = YamlUtil.readAsLokiPushUrlsEntity(lf.getAbsolutePath()); + if(lokiPushUrlsEntity!=null && lokiPushUrlsEntity.getLokiPushUrls()!=null + && lokiPushUrlsEntity.getLokiPushUrls().size()>0) { + List<LokiPushUrlEntity> list = lokiPushUrlsEntity.getLokiPushUrls(); + //loki 代理接口 为/loki/api/v1/push 接口 时,根据lokiPushUrls 配置 转发所有的target,否则随机选择一个target转发即可 + if("/loki/api/v1/push".equals(lokiPath)) { + List<String> rltList = new ArrayList<String>(); + + for(LokiPushUrlEntity urlEntity:list) { + String ip = urlEntity.getTarget(); + if(ip.startsWith("http://")) { + ip = ip.substring(7); + } + RequestThread requestThread = new RequestThread(); + requestThread.setHost(ip); + requestThread.setToken(urlEntity.getToken()); + requestThread.setPort(port); + requestThread.setMethod(method); + requestThread.setPath(lokiPath); + requestThread.setRequest(request); + requestThread.setResponse(response); + Future<String> f = Tool.ThreadUtil.execAsync(requestThread);//.execute(requestThread); + try { + String rlt = f.get(5, TimeUnit.SECONDS); + rltList.add(rlt); + } catch (Exception e) { + log.error(e); + } + } + if(rltList.size()>0) { + OutputStream outputStream = null; + try { + log.info("loki 返回内容:"+rltList.toString()); + outputStream = response.getOutputStream(); + Tool.IoUtil.writeUtf8(outputStream,true, rltList.get(0)); + outputStream.flush(); + + } catch (Exception e) { + log.error(e); + } finally { + Tool.IoUtil.close(outputStream); + } + } + }else { + LokiPushUrlEntity urlEntity = list.get(0); + String ip = urlEntity.getTarget(); + if(ip.startsWith("http://")) { + ip = ip.substring(7); + } + requestProm(ip,token,port,lokiPath,request,response); + } + }else { + OutputStream outputStream = null; + try { + outputStream = response.getOutputStream(); + Tool.IoUtil.writeUtf8(outputStream,true, "The lokiPushUrls file does not exist"); + outputStream.flush(); + + } catch (Exception e) { + log.error(e); + } finally { + Tool.IoUtil.close(outputStream); + } + } + + } + + public void requestProm(String host, String token,int port, String path, HttpServletRequest request, HttpServletResponse response) { + String queryString = ReflectUtil.invoke(request, "getQueryString"); + queryString = StrUtil.isNotBlank(queryString)?queryString:""; + String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString; + + log.info("loki url: {}", url); + String method = request.getMethod(); + HttpURLConnection conn = null; + ServletInputStream reqInputStream = null; + ServletOutputStream resOutputStream = null; + OutputStream connOutputStream = null; + InputStream connInputStream = null; + try { + conn = HttpConnection.create(url, null).getHttpURLConnection(); + reqInputStream = request.getInputStream(); + resOutputStream = response.getOutputStream(); + conn.setRequestMethod(method); + // 复制请求头 + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String hn = headerNames.nextElement(); + if(!"authorization".equalsIgnoreCase(hn)) { + ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn)); + } + } + ReflectUtil.invoke(conn,"addRequestProperty","authorization",token); + + if (!"GET".equalsIgnoreCase(method)) { + conn.setDoOutput(true); + conn.setDoInput(true); + conn.setUseCaches(false); + connOutputStream = conn.getOutputStream(); + Tool.IoUtil.copy(reqInputStream, connOutputStream); + } + conn.connect(); + int responseCode = conn.getResponseCode(); + connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream(); + String responseMessage = conn.getResponseMessage(); + Map<String, List<String>> responseHeaders = conn.getHeaderFields(); + //复制响应头 + for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) { + String key = en.getKey(); + if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue; + List<String> value = en.getValue(); + ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value)); + } + ReflectUtil.invoke(response,"addHeader","Authorization",token); + // response.setStatus(responseCode, responseMessage); + ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage); + + Tool.IoUtil.copy(connInputStream, resOutputStream); + resOutputStream.flush();//flush 输出流 + } catch (Exception e) { + try { + response.sendError(500, "request error"); + } catch (IOException e1) { + log.error("proxy request error",e1); + } + log.error("request error : ",e); + }finally { + Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream); + if(conn != null){ + conn.disconnect(); + } + } + } + +} diff --git a/src/main/java/net/geedge/confagent/thread/RequestThread.java b/src/main/java/net/geedge/confagent/thread/RequestThread.java new file mode 100644 index 0000000..eec6a48 --- /dev/null +++ b/src/main/java/net/geedge/confagent/thread/RequestThread.java @@ -0,0 +1,119 @@ +package net.geedge.confagent.thread;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import cn.hutool.core.net.url.UrlBuilder;
+import cn.hutool.core.net.url.UrlPath;
+import cn.hutool.core.util.ReflectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.http.HttpConnection;
+import cn.hutool.log.Log;
+import lombok.Data;
+import net.geedge.confagent.util.Tool;
+
+@Data
+public class RequestThread implements Callable<String>{
+
+ private final static Log log = Log.get();
+ private String host;
+ private String token;
+ private String method;
+ private int port;
+ private String path;
+ public HttpServletRequest request;
+ private HttpServletResponse response;
+
+ @Override
+ public String call() throws Exception {
+
+ String queryString = ReflectUtil.invoke(request, "getQueryString");
+ queryString = StrUtil.isNotBlank(queryString)?queryString:"";
+ String url = UrlBuilder.create().setScheme("http").setHost(host).setPort(port).setPath(UrlPath.of(path, Charset.forName("UTF-8"))).toURL().toString() + "?" + queryString;
+
+ log.info("loki url: {}", url);
+// String method = request.getMethod();
+ HttpURLConnection conn = null;
+ ServletInputStream reqInputStream = null;
+// ServletOutputStream resOutputStream = null;
+ OutputStream connOutputStream = null;
+ InputStream connInputStream = null;
+ try {
+ conn = HttpConnection.create(url, null).getHttpURLConnection();
+ reqInputStream = request.getInputStream();
+
+// resOutputStream = response.getOutputStream();
+ conn.setRequestMethod(method);
+// conn.addRequestProperty("authorization", token);
+ // 复制请求头
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String hn = headerNames.nextElement();
+ if(!"authorization".equalsIgnoreCase(hn)) {
+ ReflectUtil.invoke(conn,"addRequestProperty",hn,request.getHeader(hn));
+ }
+ log.info(hn+"===="+request.getHeader(hn));
+ }
+ ReflectUtil.invoke(conn,"addRequestProperty","authorization",token);
+
+
+ if (!"GET".equalsIgnoreCase(method)) {
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ conn.setUseCaches(false);
+ connOutputStream = conn.getOutputStream();
+ Tool.IoUtil.copy(reqInputStream, connOutputStream);
+ }
+ conn.connect();
+ int responseCode = conn.getResponseCode();
+ connInputStream = (responseCode < 400)? conn.getInputStream():conn.getErrorStream();
+ String responseMessage = conn.getResponseMessage();
+ Map<String, List<String>> responseHeaders = conn.getHeaderFields();
+
+ if(!response.isCommitted()) {
+ log.info("复制响应头");
+ //复制响应头
+ for(Map.Entry<String, List<String>> en : responseHeaders.entrySet()) {
+ String key = en.getKey();
+ if (Tool.StrUtil.isEmpty(key) || "Transfer-Encoding".equals(key)) continue;
+ List<String> value = en.getValue();
+ ReflectUtil.invoke(response,"addHeader",key,Tool.StrUtil.join("; ",value));
+ }
+ ReflectUtil.invoke(response, "setStatus", responseCode, responseMessage);
+ }
+
+ String str = Tool.IoUtil.read(connInputStream, "UTF-8");
+ log.info(str);
+
+// Tool.IoUtil.copy(connInputStream, resOutputStream);
+// resOutputStream.flush();//flush 输出流
+ return str;
+ } catch (Exception e) {
+ try {
+// response.sendError(500, "request error");
+ return "request error";
+ } catch (Exception e1) {
+ log.error("proxy request error",e1);
+ }
+ log.error("request error : ",e);
+ }finally {
+// Tool.IoUtil.close(reqInputStream,resOutputStream,connOutputStream,connInputStream);
+ Tool.IoUtil.close(reqInputStream,connOutputStream,connInputStream);
+ if(conn != null){
+ conn.disconnect();
+ }
+ }
+ return "";
+ }
+}
\ No newline at end of file diff --git a/src/main/java/net/geedge/confagent/util/YamlUtil.java b/src/main/java/net/geedge/confagent/util/YamlUtil.java index 8fd2415..7849cd1 100644 --- a/src/main/java/net/geedge/confagent/util/YamlUtil.java +++ b/src/main/java/net/geedge/confagent/util/YamlUtil.java @@ -1,6 +1,7 @@ package net.geedge.confagent.util; import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; import net.geedge.confagent.entity.LokiPushUrlsEntity; import org.springframework.beans.factory.config.YamlPropertiesFactoryBean; @@ -15,7 +16,8 @@ import java.util.Map; import java.util.Properties; public class YamlUtil { - + private final static Log log = Log.get(); + public static void writeAsMap(Map map,String path){ Map writeObj = map; if(writeObj == null) @@ -53,10 +55,15 @@ public class YamlUtil { * @Date 2021/3/24 */ public static LokiPushUrlsEntity readAsLokiPushUrlsEntity(String path){ - File file = Tool.FileUtil.file(path); - Yaml yamlRead = new Yaml(); - BufferedReader reader = Tool.FileUtil.getReader(file, Tool.CharsetUtil.UTF_8); - LokiPushUrlsEntity pr = yamlRead.loadAs(reader, LokiPushUrlsEntity.class); + LokiPushUrlsEntity pr = null; + try { + File file = Tool.FileUtil.file(path); + Yaml yamlRead = new Yaml(); + BufferedReader reader = Tool.FileUtil.getReader(file, Tool.CharsetUtil.UTF_8); + pr = yamlRead.loadAs(reader, LokiPushUrlsEntity.class); + }catch(Exception e) { + log.error(e); + } return pr; } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index e28b3ee..b7c7632 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -16,6 +16,11 @@ confagent: defaultIP: 192.168.40.42 resourcePath: ./ + + loki: + config: D:\config\loki.yml + startCmd: start_loki.sh + stopCmd: kill -9 26590 logging: config: src/main/resources/logback-spring.xml
\ No newline at end of file diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 532cc61..2c66f44 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -14,6 +14,11 @@ confagent: cmdLine: /opt/nezha/nz-agent/blackbox_exporter/config.conf config: /opt/nezha/nz-agent/blackbox_exporter/blackbox.yml defaultIP: 127.0.0.1 + + loki: + config: /opt/nezha/nz-agent/loki/loki.yml + startCmd: systemctl start loki + stopCmd: systemctl stop loki logging: config: config/logback-spring.xml diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a6cfac7..2c216cf 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -16,4 +16,7 @@ confagent: lokiPushUrlsFile: config/lokiPushUrls.yml prometheus: query: + auth: false + loki: + query: auth: false
\ No newline at end of file diff --git a/src/main/resources/config/lokiPushUrls.yml b/src/main/resources/config/lokiPushUrls.yml index 88dfaaf..d090083 100644 --- a/src/main/resources/config/lokiPushUrls.yml +++ b/src/main/resources/config/lokiPushUrls.yml @@ -1,6 +1,6 @@ !!net.geedge.confagent.entity.LokiPushUrlsEntity lokiPushUrls: -- {target: 'http://192.168.3.3', token: aaa} -- {target: 'http://127.0.0.1', token: bbb} -- {target: 'http://127.0.3.1', token: cc2} -- {target: 'http://127.0.2.1', token: dd3} +- {target: 'http://192.168.44.64', token: aaa} +- {target: 'http://192.168.44.61', token: bbb} +- {target: 'http://192.168.44.64', token: cc2} +- {target: 'http://192.168.44.60', token: dd3} |
