summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHDK <[email protected]>2023-12-26 10:53:49 +0000
committerHDK <[email protected]>2023-12-26 10:53:49 +0000
commit85b676d3b1d3a2b855619cf8bbff12471d65ab85 (patch)
treee20ce126c4e914502288e9c00d081237ce84d896
parentae0ee1e271cd7aecbc62ed63834731f4ed26d475 (diff)
1. 解决内存泄露问题
2. 精简无用代码 3. 优化分析器处理逻辑
-rw-r--r--conf/Corefile1
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--plugin/v64dns/analyze/analyze.go38
-rw-r--r--plugin/v64dns/analyze/analyze_test.go2
-rw-r--r--plugin/v64dns/analyze/pb/analyzer.py85
-rw-r--r--plugin/v64dns/setup.go12
-rw-r--r--plugin/v64dns/v64dns.go38
-rw-r--r--plugin/v64dns/v64dns_policy.go41
9 files changed, 78 insertions, 143 deletions
diff --git a/conf/Corefile b/conf/Corefile
index 8006b84..29a6cc1 100644
--- a/conf/Corefile
+++ b/conf/Corefile
@@ -1,5 +1,4 @@
n64.top:53 {
- debug
v64dns n64.top {
v4ns ns41 139.180.153.62
v6ns ns61 2401:c080:1400:6e47:5400:04ff:fea1:1856
diff --git a/go.mod b/go.mod
index cfb141f..e9c9ace 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
github.com/openzipkin/zipkin-go v0.4.2
github.com/oschwald/geoip2-golang v1.9.0
github.com/panjf2000/ants/v2 v2.9.0
+ github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67
github.com/pochard/commons v1.1.2
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_model v0.5.0
@@ -79,6 +80,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
+ github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
diff --git a/go.sum b/go.sum
index 62e519d..04109f5 100644
--- a/go.sum
+++ b/go.sum
@@ -223,6 +223,8 @@ github.com/outcaste-io/ristretto v0.2.3 h1:AK4zt/fJ76kjlYObOeNwh4T3asEuaCmp26pOv
github.com/outcaste-io/ristretto v0.2.3/go.mod h1:W8HywhmtlopSB1jeMg3JtdIhf+DYkLAr0VN/s4+MHac=
github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo=
github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/plugin/v64dns/analyze/analyze.go b/plugin/v64dns/analyze/analyze.go
index b656e52..e8ced05 100644
--- a/plugin/v64dns/analyze/analyze.go
+++ b/plugin/v64dns/analyze/analyze.go
@@ -77,25 +77,23 @@ func newGrpcConn(target string) (*grpc.ClientConn, error) {
}
// 远程调用Python脚本方法
-func (a Analyzer) Go2py(i []string) {
- lck.Lock()
- defer lck.Unlock()
- // 调用服务端函数
- r, err := a.grpcClient.AnalyzeService(context.Background(), &pb.DnsChain{
- Gtype: a.Graphtype,
- Guri: a.Graphuri,
- Guser: a.GraphUser,
- Gpass: a.GraphPass,
- Data: i,
- })
- if err != nil {
- print(r)
- olog.Errorf("调用解析链分析器代码失败: %s", err)
- return
+func (a Analyzer) Go2py(q chan []string) {
+ for i := range q {
+ // 调用服务端函数
+ r, err := a.grpcClient.AnalyzeService(context.Background(), &pb.DnsChain{
+ Gtype: a.Graphtype,
+ Guri: a.Graphuri,
+ Guser: a.GraphUser,
+ Gpass: a.GraphPass,
+ Data: i,
+ })
+ if err != nil {
+ olog.Errorf("调用解析链分析器代码失败: %s", err)
+ return
+ }
+ // 处理不成功则显示警告信息
+ if r.Res != "success" {
+ olog.Warning(r.Res)
+ }
}
- // 处理不成功则显示警告信息
- if r.Res != "success" {
- olog.Warning(r.Res)
- }
-
}
diff --git a/plugin/v64dns/analyze/analyze_test.go b/plugin/v64dns/analyze/analyze_test.go
index cc2bac5..d6a0c6b 100644
--- a/plugin/v64dns/analyze/analyze_test.go
+++ b/plugin/v64dns/analyze/analyze_test.go
@@ -36,7 +36,7 @@ func TestAnalyzer_go2py(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
NewAnalyzer(tt.fields.Graphtype, tt.fields.Graphuri, tt.fields.GraphUser, tt.fields.GraphPass)
- A.Go2py(tt.args.i)
+ // A.Go2py(tt.args.i)
})
}
}
diff --git a/plugin/v64dns/analyze/pb/analyzer.py b/plugin/v64dns/analyze/pb/analyzer.py
index 54dd238..d38d3ad 100644
--- a/plugin/v64dns/analyze/pb/analyzer.py
+++ b/plugin/v64dns/analyze/pb/analyzer.py
@@ -94,7 +94,7 @@ def serve():
class RelResolver53(StructuredRel):
W = IntegerProperty(default=1)
- FTIME = DateTimeFormatProperty(format="%Y-%m-%d %H:%M:%S")
+ # FTIME = DateTimeFormatProperty(default_now=True,format="%Y-%m-%d %H:%M:%S")
# 查询记录定义
class NodeResolverQuery(StructuredNode):
@@ -102,7 +102,7 @@ class NodeResolverQuery(StructuredNode):
QTYPE=StringProperty()
# 解析器和查询记录的关系
class RelResolverQuery(StructuredRel):
- W = IntegerProperty()
+ W = IntegerProperty(default=1)
class NodeResolver53(StructuredNode):
IP = StringProperty(required=True, unique_index=True)
@@ -127,9 +127,6 @@ class NodeResolver53(StructuredNode):
class neo4j_connector:
graph = ""
- # nodematcher = ""
- # relatmatcher = ""
-
def __init__(self, url):
# 连接neo4j
#config.ENCRYPTED = True
@@ -162,83 +159,25 @@ class neo4j_connector:
nodelist[0].LINK.connect(nodelist[1]).save()
else:
L.W+=1
- L.FTIME=datetime.datetime.now(pytz.UTC)
+ # L.FTIME=datetime.datetime.now(pytz.UTC)
L.save()
-
-
-
################################################ 对查询记录节点进行处理################################################
- # 查询是否存在节点
- # q,exist=self.checknode_neo4j(q=data[3],qtype=data[4])
- # # 不存在则新建
- # if not exist:
- # q = NodeResolverQuery(QNAME=data[3],QTYPE=data[4])
- # q.save()
- # 存在则不做处理
-
- ############################################ 查询解析器是否存在关系#############################################
- # L, lexist = self.checklink_neo4j(nodelist[0], nodelist[1])
- # # 数据存在问题则退出
- # if L == "Err":
- # return "node err when link"
- # # 不存在则建立关联
- # if not lexist:
-
-
+ # 查询是否存在节点,不存在则新建
+ querynode=NodeResolverQuery.get_or_create({"QNAME":data[3], "QTYPE":data[4],})[0]
+ querynode.save()
-
-
- # 存在则修改权重
- # else:
- # L.W += 1
- # L.LTIME = datetime.datetime.now(pytz.UTC)
- # L.save()
- # 提交链接
-
- ############################################查询解析器和记录间的关系#########################################
- # QL, lexist = self.checkquerylink(data[1], data[3],data[4])
- # # 数据存在问题则退出
- # if QL == "Err":
- # return "node err when link"
- # # 不存在则建立关联
- # if not lexist:
- # QL[0].QLINK.connect(QL[1], {'W': 1}).save()
- # # 存在则修改权重
- # else:
- # QL.W += 1
- # QL.save()
+ QL=nodelist[0].QLINK.relationship(querynode)
+ if QL is None:
+ nodelist[0].QLINK.connect(querynode).save()
+ else:
+ QL.W+=1
+ QL.save()
# 完成处理,返回
logging.debug("完成处理数据:{"+datastr+"}")
return "success"
- # def checknode_neo4j(self, ip=None,q=None,qtype=None):
- # # 查询IP
- # if ip!=None:
- # a = NodeResolver53.nodes.get_or_none(IP=ip)
- # # 查询记录
- # else:
- # a=NodeResolverQuery.nodes.get_or_none(QNAME=q,QTYPE=qtype)
- # if a is not None:
- # return a, True
- # return None, False
-
- def checklink_neo4j(self, ip_from, ip_to):
- rel = ip_from.LINK.relationship(ip_to)
- if rel is not None:
- return rel, True
- return [ip_from, ip_to], False
-
- def checkquerylink(self,ip,qname,qtype):
- r=NodeResolver53.nodes.get_or_none(IP=ip)
- q=NodeResolverQuery.nodes.get_or_none(QNAME=qname,QTYPE=qtype)
- if r is None or q is None:
- return "Err", False
- rel=r.QLINK.relationship(q)
- if rel is not None:
- return rel, True
- return [r, q], False
if __name__ == '__main__':
serve()
diff --git a/plugin/v64dns/setup.go b/plugin/v64dns/setup.go
index 0da4577..437dbd4 100644
--- a/plugin/v64dns/setup.go
+++ b/plugin/v64dns/setup.go
@@ -1,7 +1,6 @@
package v64dns
import (
- "github.com/panjf2000/ants/v2"
"ohmydns/core/dnsserver"
"ohmydns/plugin"
clog "ohmydns/plugin/pkg/log"
@@ -34,7 +33,7 @@ func parseArg(c *caddy.Controller) (*V64dns, error) {
v.p.dmChange = false
var wg sync.WaitGroup
v.w = &wg
- v.pool, _ = ants.NewPool(3000, ants.WithPreAlloc(true))
+ v.debug = dnsserver.GetConfig(c).Debug
for c.Next() {
arg := c.RemainingArgs()
@@ -92,11 +91,14 @@ func parseArg(c *caddy.Controller) (*V64dns, error) {
}
break
}
- // 监控停止信号
- //go EL.Stop()
+
log.Infof("v64权威服务器启动, 工作参数为 \n Zone:%v, NS4:%v, NS6:%v, IPv4子域:%v, IPv6子域:%v", v.zone, v.ipv4NS, v.ipv6NS, v.p.v4subdomain, v.p.v6subdomain)
log.Infof("分析器启动, 工作参数为 \n url:%v maxlen:%v", v.a.Graphuri, v.p.maxLen)
log.Infof("测试样例: " + v.makeProbev64())
+ v.workqueue = make(chan []string, quelen)
+ for i := 0; i < worker; i++ {
+ go v.a.Go2py(v.workqueue)
+ }
return v, nil
}
@@ -104,4 +106,6 @@ var log = clog.NewWithPlugin("v64dns")
const (
chain_maxlen = 4
+ quelen = 50000
+ worker = 100
)
diff --git a/plugin/v64dns/v64dns.go b/plugin/v64dns/v64dns.go
index 140d2c6..577262f 100644
--- a/plugin/v64dns/v64dns.go
+++ b/plugin/v64dns/v64dns.go
@@ -2,31 +2,27 @@ package v64dns
import (
"context"
- "github.com/miekg/dns"
- "github.com/panjf2000/ants/v2"
olog "ohmydns/plugin/pkg/log"
"ohmydns/plugin/pkg/request"
"ohmydns/plugin/v64dns/analyze"
"strings"
"sync"
-)
-// 针对v64dns请求的抽象
-//type v64Request struct {
-// eid string // 请求归属的实验ID
-//}
+ "github.com/miekg/dns"
+)
// V64dns代表了水印权威
type V64dns struct {
- zone string
- ipv4NS string
- ipv6NS string
- V4NSAddr string
- V6NSAddr string
- p Policy // 生成响应的策略
- a *analyze.Analyzer // 分析器配置
- pool *ants.Pool
- w *sync.WaitGroup
+ zone string
+ ipv4NS string
+ ipv6NS string
+ V4NSAddr string
+ V6NSAddr string
+ p Policy // 生成响应的策略
+ a *analyze.Analyzer // 分析器配置
+ workqueue chan []string
+ w *sync.WaitGroup
+ debug bool // debug输出控制
}
// ServeDNS
@@ -48,11 +44,15 @@ func (v V64dns) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg)
qname := strings.ToLower(state.QName())
switch v.VaildRequest(qname) {
case 0:
- // 正常请求
- log.Info("Receive:" + state.QName() + "====>" + state.Type())
+ // 正常请求
+ if v.debug {
+ log.Info("Receive:" + state.QName() + "====>" + state.Type())
+ }
msg = v.ResponseHandler(msg, state)
case 2:
- log.Info("按照mini请求处理" + state.QName())
+ if v.debug {
+ log.Info("按照mini请求处理" + state.QName())
+ }
// Qname mini
msg = v.ResponseNSorAdd(msg, state, 1)
case 1:
diff --git a/plugin/v64dns/v64dns_policy.go b/plugin/v64dns/v64dns_policy.go
index 85a2343..e249e06 100644
--- a/plugin/v64dns/v64dns_policy.go
+++ b/plugin/v64dns/v64dns_policy.go
@@ -1,11 +1,12 @@
package v64dns
import (
- "github.com/miekg/dns"
"net"
"ohmydns/plugin/pkg/request"
"strconv"
"strings"
+
+ "github.com/miekg/dns"
)
// Policy 定义了权威生成记录可用的方法
@@ -30,12 +31,12 @@ func (v V64dns) ResponseHandler(msg *dns.Msg, state request.Request) *dns.Msg {
// 到达最后一步
switch state.QType() {
// 只处理TXT,CNAME,AAAA记录
- case dns.TypeTXT, dns.TypeCNAME, dns.TypeAAAA:
+ case dns.TypeTXT, dns.TypeCNAME, dns.TypeAAAA, dns.TypeA:
return v.ResponseTXT(msg, state)
case dns.TypeNS:
return v.ResponseNSorAdd(msg, state, 0)
default:
- log.Info("no")
+ log.Infof("域名 %v 的 %v 记录查询不符合要求", state.QName(), state.Type())
// 不符合要求的请求类型直接返回空响应
return msg
}
@@ -62,11 +63,7 @@ func (v V64dns) ResponseTXT(msg *dns.Msg, state request.Request) *dns.Msg {
iaddr = net.ParseIP(iaddr).String()
oaddr := state.IP()
// 调用Python
- v.w.Add(1)
- _ = v.pool.Submit(func() {
- v.a.Go2py([]string{iaddr, oaddr, "1", state.QName(), state.Type()})
- v.w.Done()
- })
+ v.handledata([]string{iaddr, oaddr, "1", state.QName(), state.Type()})
}
@@ -80,13 +77,6 @@ func (v V64dns) ResponseTXT(msg *dns.Msg, state request.Request) *dns.Msg {
// ResponseNSorAdd 0返回NS记录应答,1返回胶水记录
func (v V64dns) ResponseNSorAdd(msg *dns.Msg, state request.Request, flag int) *dns.Msg {
qname := strings.ToLower(state.QName())
- //sub := ""
- //for _, qs := range strings.Split(qname, ".") {
- // switch v.MatchType(qs) {
- // case 2:
- // sub = qs
- // }
- //}
answer := new(dns.NS)
answer.Hdr.Ttl = 3600
@@ -134,17 +124,10 @@ func (v V64dns) ResponseCNAME(msg *dns.Msg, state request.Request) *dns.Msg {
oaddr := state.IP()
// 调用Python脚本
step, _ := strconv.Atoi(string(rune(state.QName()[1])))
- v.w.Add(1)
if step == 1 {
- _ = v.pool.Submit(func() {
- v.a.Go2py([]string{iaddr, oaddr, "0", state.QName(), state.Type()})
- v.w.Done()
- })
+ v.handledata([]string{iaddr, oaddr, "0", state.QName(), state.Type()})
} else {
- _ = v.pool.Submit(func() {
- v.a.Go2py([]string{iaddr, oaddr, "1", state.QName(), state.Type()})
- v.w.Done()
- })
+ v.handledata([]string{iaddr, oaddr, "1", state.QName(), state.Type()})
}
}
@@ -169,10 +152,18 @@ func (v V64dns) ResponseCNAME(msg *dns.Msg, state request.Request) *dns.Msg {
answer.Target += i + "."
}
msg.Answer = append(msg.Answer, answer)
- log.Debug(answer)
return msg
}
+func (v V64dns) handledata(d []string) {
+ select {
+ case v.workqueue <- d:
+ return
+ default:
+ return
+ }
+}
+
const (
v6 = 0
v4 = 1