diff options
| author | HDK <[email protected]> | 2023-12-26 10:53:49 +0000 |
|---|---|---|
| committer | HDK <[email protected]> | 2023-12-26 10:53:49 +0000 |
| commit | 85b676d3b1d3a2b855619cf8bbff12471d65ab85 (patch) | |
| tree | e20ce126c4e914502288e9c00d081237ce84d896 | |
| parent | ae0ee1e271cd7aecbc62ed63834731f4ed26d475 (diff) | |
1. 解决内存泄露问题
2. 精简无用代码
3. 优化分析器处理逻辑
| -rw-r--r-- | conf/Corefile | 1 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | go.sum | 2 | ||||
| -rw-r--r-- | plugin/v64dns/analyze/analyze.go | 38 | ||||
| -rw-r--r-- | plugin/v64dns/analyze/analyze_test.go | 2 | ||||
| -rw-r--r-- | plugin/v64dns/analyze/pb/analyzer.py | 85 | ||||
| -rw-r--r-- | plugin/v64dns/setup.go | 12 | ||||
| -rw-r--r-- | plugin/v64dns/v64dns.go | 38 | ||||
| -rw-r--r-- | plugin/v64dns/v64dns_policy.go | 41 |
9 files changed, 78 insertions, 143 deletions
diff --git a/conf/Corefile b/conf/Corefile index 8006b84..29a6cc1 100644 --- a/conf/Corefile +++ b/conf/Corefile @@ -1,5 +1,4 @@ n64.top:53 { - debug v64dns n64.top { v4ns ns41 139.180.153.62 v6ns ns61 2401:c080:1400:6e47:5400:04ff:fea1:1856 @@ -23,6 +23,7 @@ require ( github.com/openzipkin/zipkin-go v0.4.2 github.com/oschwald/geoip2-golang v1.9.0 github.com/panjf2000/ants/v2 v2.9.0 + github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 github.com/pochard/commons v1.1.2 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_model v0.5.0 @@ -79,6 +80,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -223,6 +223,8 @@ github.com/outcaste-io/ristretto v0.2.3 h1:AK4zt/fJ76kjlYObOeNwh4T3asEuaCmp26pOv github.com/outcaste-io/ristretto v0.2.3/go.mod h1:W8HywhmtlopSB1jeMg3JtdIhf+DYkLAr0VN/s4+MHac= github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo= github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/plugin/v64dns/analyze/analyze.go b/plugin/v64dns/analyze/analyze.go index b656e52..e8ced05 100644 --- a/plugin/v64dns/analyze/analyze.go +++ b/plugin/v64dns/analyze/analyze.go @@ -77,25 +77,23 @@ func newGrpcConn(target string) (*grpc.ClientConn, error) { } // 远程调用Python脚本方法 -func (a Analyzer) Go2py(i []string) { - lck.Lock() - defer lck.Unlock() - // 调用服务端函数 - r, err := a.grpcClient.AnalyzeService(context.Background(), &pb.DnsChain{ - Gtype: a.Graphtype, - Guri: a.Graphuri, - Guser: a.GraphUser, - Gpass: a.GraphPass, - Data: i, - }) - if err != nil { - print(r) - olog.Errorf("调用解析链分析器代码失败: %s", err) - return +func (a Analyzer) Go2py(q chan []string) { + for i := range q { + // 调用服务端函数 + r, err := a.grpcClient.AnalyzeService(context.Background(), &pb.DnsChain{ + Gtype: a.Graphtype, + Guri: a.Graphuri, + Guser: a.GraphUser, + Gpass: a.GraphPass, + Data: i, + }) + if err != nil { + olog.Errorf("调用解析链分析器代码失败: %s", err) + return + } + // 处理不成功则显示警告信息 + if r.Res != "success" { + olog.Warning(r.Res) + } } - // 处理不成功则显示警告信息 - if r.Res != "success" { - olog.Warning(r.Res) - } - } diff --git a/plugin/v64dns/analyze/analyze_test.go b/plugin/v64dns/analyze/analyze_test.go index cc2bac5..d6a0c6b 100644 --- a/plugin/v64dns/analyze/analyze_test.go +++ b/plugin/v64dns/analyze/analyze_test.go @@ -36,7 +36,7 @@ func TestAnalyzer_go2py(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { NewAnalyzer(tt.fields.Graphtype, tt.fields.Graphuri, tt.fields.GraphUser, tt.fields.GraphPass) - A.Go2py(tt.args.i) + // A.Go2py(tt.args.i) }) } } diff --git a/plugin/v64dns/analyze/pb/analyzer.py b/plugin/v64dns/analyze/pb/analyzer.py index 54dd238..d38d3ad 100644 --- a/plugin/v64dns/analyze/pb/analyzer.py +++ b/plugin/v64dns/analyze/pb/analyzer.py @@ -94,7 +94,7 @@ def serve(): class RelResolver53(StructuredRel): W = IntegerProperty(default=1) - FTIME = DateTimeFormatProperty(format="%Y-%m-%d %H:%M:%S") + # FTIME = DateTimeFormatProperty(default_now=True,format="%Y-%m-%d %H:%M:%S") # 查询记录定义 class NodeResolverQuery(StructuredNode): @@ -102,7 +102,7 @@ class NodeResolverQuery(StructuredNode): QTYPE=StringProperty() # 解析器和查询记录的关系 class RelResolverQuery(StructuredRel): - W = IntegerProperty() + W = IntegerProperty(default=1) class NodeResolver53(StructuredNode): IP = StringProperty(required=True, unique_index=True) @@ -127,9 +127,6 @@ class NodeResolver53(StructuredNode): class neo4j_connector: graph = "" - # nodematcher = "" - # relatmatcher = "" - def __init__(self, url): # 连接neo4j #config.ENCRYPTED = True @@ -162,83 +159,25 @@ class neo4j_connector: nodelist[0].LINK.connect(nodelist[1]).save() else: L.W+=1 - L.FTIME=datetime.datetime.now(pytz.UTC) + # L.FTIME=datetime.datetime.now(pytz.UTC) L.save() - - - ################################################ 对查询记录节点进行处理################################################ - # 查询是否存在节点 - # q,exist=self.checknode_neo4j(q=data[3],qtype=data[4]) - # # 不存在则新建 - # if not exist: - # q = NodeResolverQuery(QNAME=data[3],QTYPE=data[4]) - # q.save() - # 存在则不做处理 - - ############################################ 查询解析器是否存在关系############################################# - # L, lexist = self.checklink_neo4j(nodelist[0], nodelist[1]) - # # 数据存在问题则退出 - # if L == "Err": - # return "node err when link" - # # 不存在则建立关联 - # if not lexist: - - + # 查询是否存在节点,不存在则新建 + querynode=NodeResolverQuery.get_or_create({"QNAME":data[3], "QTYPE":data[4],})[0] + querynode.save() - - - # 存在则修改权重 - # else: - # L.W += 1 - # L.LTIME = datetime.datetime.now(pytz.UTC) - # L.save() - # 提交链接 - - ############################################查询解析器和记录间的关系######################################### - # QL, lexist = self.checkquerylink(data[1], data[3],data[4]) - # # 数据存在问题则退出 - # if QL == "Err": - # return "node err when link" - # # 不存在则建立关联 - # if not lexist: - # QL[0].QLINK.connect(QL[1], {'W': 1}).save() - # # 存在则修改权重 - # else: - # QL.W += 1 - # QL.save() + QL=nodelist[0].QLINK.relationship(querynode) + if QL is None: + nodelist[0].QLINK.connect(querynode).save() + else: + QL.W+=1 + QL.save() # 完成处理,返回 logging.debug("完成处理数据:{"+datastr+"}") return "success" - # def checknode_neo4j(self, ip=None,q=None,qtype=None): - # # 查询IP - # if ip!=None: - # a = NodeResolver53.nodes.get_or_none(IP=ip) - # # 查询记录 - # else: - # a=NodeResolverQuery.nodes.get_or_none(QNAME=q,QTYPE=qtype) - # if a is not None: - # return a, True - # return None, False - - def checklink_neo4j(self, ip_from, ip_to): - rel = ip_from.LINK.relationship(ip_to) - if rel is not None: - return rel, True - return [ip_from, ip_to], False - - def checkquerylink(self,ip,qname,qtype): - r=NodeResolver53.nodes.get_or_none(IP=ip) - q=NodeResolverQuery.nodes.get_or_none(QNAME=qname,QTYPE=qtype) - if r is None or q is None: - return "Err", False - rel=r.QLINK.relationship(q) - if rel is not None: - return rel, True - return [r, q], False if __name__ == '__main__': serve() diff --git a/plugin/v64dns/setup.go b/plugin/v64dns/setup.go index 0da4577..437dbd4 100644 --- a/plugin/v64dns/setup.go +++ b/plugin/v64dns/setup.go @@ -1,7 +1,6 @@ package v64dns import ( - "github.com/panjf2000/ants/v2" "ohmydns/core/dnsserver" "ohmydns/plugin" clog "ohmydns/plugin/pkg/log" @@ -34,7 +33,7 @@ func parseArg(c *caddy.Controller) (*V64dns, error) { v.p.dmChange = false var wg sync.WaitGroup v.w = &wg - v.pool, _ = ants.NewPool(3000, ants.WithPreAlloc(true)) + v.debug = dnsserver.GetConfig(c).Debug for c.Next() { arg := c.RemainingArgs() @@ -92,11 +91,14 @@ func parseArg(c *caddy.Controller) (*V64dns, error) { } break } - // 监控停止信号 - //go EL.Stop() + log.Infof("v64权威服务器启动, 工作参数为 \n Zone:%v, NS4:%v, NS6:%v, IPv4子域:%v, IPv6子域:%v", v.zone, v.ipv4NS, v.ipv6NS, v.p.v4subdomain, v.p.v6subdomain) log.Infof("分析器启动, 工作参数为 \n url:%v maxlen:%v", v.a.Graphuri, v.p.maxLen) log.Infof("测试样例: " + v.makeProbev64()) + v.workqueue = make(chan []string, quelen) + for i := 0; i < worker; i++ { + go v.a.Go2py(v.workqueue) + } return v, nil } @@ -104,4 +106,6 @@ var log = clog.NewWithPlugin("v64dns") const ( chain_maxlen = 4 + quelen = 50000 + worker = 100 ) diff --git a/plugin/v64dns/v64dns.go b/plugin/v64dns/v64dns.go index 140d2c6..577262f 100644 --- a/plugin/v64dns/v64dns.go +++ b/plugin/v64dns/v64dns.go @@ -2,31 +2,27 @@ package v64dns import ( "context" - "github.com/miekg/dns" - "github.com/panjf2000/ants/v2" olog "ohmydns/plugin/pkg/log" "ohmydns/plugin/pkg/request" "ohmydns/plugin/v64dns/analyze" "strings" "sync" -) -// 针对v64dns请求的抽象 -//type v64Request struct { -// eid string // 请求归属的实验ID -//} + "github.com/miekg/dns" +) // V64dns代表了水印权威 type V64dns struct { - zone string - ipv4NS string - ipv6NS string - V4NSAddr string - V6NSAddr string - p Policy // 生成响应的策略 - a *analyze.Analyzer // 分析器配置 - pool *ants.Pool - w *sync.WaitGroup + zone string + ipv4NS string + ipv6NS string + V4NSAddr string + V6NSAddr string + p Policy // 生成响应的策略 + a *analyze.Analyzer // 分析器配置 + workqueue chan []string + w *sync.WaitGroup + debug bool // debug输出控制 } // ServeDNS @@ -48,11 +44,15 @@ func (v V64dns) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) qname := strings.ToLower(state.QName()) switch v.VaildRequest(qname) { case 0: - // 正常请求 - log.Info("Receive:" + state.QName() + "====>" + state.Type()) + // 正常请求 + if v.debug { + log.Info("Receive:" + state.QName() + "====>" + state.Type()) + } msg = v.ResponseHandler(msg, state) case 2: - log.Info("按照mini请求处理" + state.QName()) + if v.debug { + log.Info("按照mini请求处理" + state.QName()) + } // Qname mini msg = v.ResponseNSorAdd(msg, state, 1) case 1: diff --git a/plugin/v64dns/v64dns_policy.go b/plugin/v64dns/v64dns_policy.go index 85a2343..e249e06 100644 --- a/plugin/v64dns/v64dns_policy.go +++ b/plugin/v64dns/v64dns_policy.go @@ -1,11 +1,12 @@ package v64dns import ( - "github.com/miekg/dns" "net" "ohmydns/plugin/pkg/request" "strconv" "strings" + + "github.com/miekg/dns" ) // Policy 定义了权威生成记录可用的方法 @@ -30,12 +31,12 @@ func (v V64dns) ResponseHandler(msg *dns.Msg, state request.Request) *dns.Msg { // 到达最后一步 switch state.QType() { // 只处理TXT,CNAME,AAAA记录 - case dns.TypeTXT, dns.TypeCNAME, dns.TypeAAAA: + case dns.TypeTXT, dns.TypeCNAME, dns.TypeAAAA, dns.TypeA: return v.ResponseTXT(msg, state) case dns.TypeNS: return v.ResponseNSorAdd(msg, state, 0) default: - log.Info("no") + log.Infof("域名 %v 的 %v 记录查询不符合要求", state.QName(), state.Type()) // 不符合要求的请求类型直接返回空响应 return msg } @@ -62,11 +63,7 @@ func (v V64dns) ResponseTXT(msg *dns.Msg, state request.Request) *dns.Msg { iaddr = net.ParseIP(iaddr).String() oaddr := state.IP() // 调用Python - v.w.Add(1) - _ = v.pool.Submit(func() { - v.a.Go2py([]string{iaddr, oaddr, "1", state.QName(), state.Type()}) - v.w.Done() - }) + v.handledata([]string{iaddr, oaddr, "1", state.QName(), state.Type()}) } @@ -80,13 +77,6 @@ func (v V64dns) ResponseTXT(msg *dns.Msg, state request.Request) *dns.Msg { // ResponseNSorAdd 0返回NS记录应答,1返回胶水记录 func (v V64dns) ResponseNSorAdd(msg *dns.Msg, state request.Request, flag int) *dns.Msg { qname := strings.ToLower(state.QName()) - //sub := "" - //for _, qs := range strings.Split(qname, ".") { - // switch v.MatchType(qs) { - // case 2: - // sub = qs - // } - //} answer := new(dns.NS) answer.Hdr.Ttl = 3600 @@ -134,17 +124,10 @@ func (v V64dns) ResponseCNAME(msg *dns.Msg, state request.Request) *dns.Msg { oaddr := state.IP() // 调用Python脚本 step, _ := strconv.Atoi(string(rune(state.QName()[1]))) - v.w.Add(1) if step == 1 { - _ = v.pool.Submit(func() { - v.a.Go2py([]string{iaddr, oaddr, "0", state.QName(), state.Type()}) - v.w.Done() - }) + v.handledata([]string{iaddr, oaddr, "0", state.QName(), state.Type()}) } else { - _ = v.pool.Submit(func() { - v.a.Go2py([]string{iaddr, oaddr, "1", state.QName(), state.Type()}) - v.w.Done() - }) + v.handledata([]string{iaddr, oaddr, "1", state.QName(), state.Type()}) } } @@ -169,10 +152,18 @@ func (v V64dns) ResponseCNAME(msg *dns.Msg, state request.Request) *dns.Msg { answer.Target += i + "." } msg.Answer = append(msg.Answer, answer) - log.Debug(answer) return msg } +func (v V64dns) handledata(d []string) { + select { + case v.workqueue <- d: + return + default: + return + } +} + const ( v6 = 0 v4 = 1 |
