diff options
| author | Handingkang <[email protected]> | 2023-07-20 20:51:11 +0800 |
|---|---|---|
| committer | Handingkang <[email protected]> | 2023-07-20 20:51:11 +0800 |
| commit | d66e93ccc19f7fb3d190952ef8f041f6462004cb (patch) | |
| tree | 08b3c223dfa62505e3c4ae6b3f05ec3545a3083e | |
| parent | 38c3fb8500e86f2d81591baa3c7b663f88bc83b7 (diff) | |
prober优化
| -rw-r--r-- | core/prober/prober.go | 415 | ||||
| -rw-r--r-- | core/prober/prober_args.go | 9 | ||||
| -rw-r--r-- | core/prober/prober_http.go | 4 | ||||
| -rw-r--r-- | core/prober/prober_serve.go | 439 | ||||
| -rw-r--r-- | go.mod | 1 | ||||
| -rw-r--r-- | go.sum | 14 | ||||
| -rw-r--r-- | plugin/pkg/prober/prober.go | 21 | ||||
| -rw-r--r-- | plugin/pkg/prober/proberutil.go | 2 | ||||
| -rw-r--r-- | plugin/prober/qname/qname.go | 76 | ||||
| -rw-r--r-- | plugin/prober/qname/qnameutil.go | 13 | ||||
| -rw-r--r-- | plugin/prober/qname/setup.go | 23 | ||||
| -rw-r--r-- | plugin/register.go | 2 |
12 files changed, 601 insertions, 418 deletions
diff --git a/core/prober/prober.go b/core/prober/prober.go index 48f7052..22f4163 100644 --- a/core/prober/prober.go +++ b/core/prober/prober.go @@ -1,413 +1,18 @@ package prober import ( - "context" - "encoding/json" - "errors" - "fmt" - "github.com/coredns/caddy" "github.com/miekg/dns" - ot "github.com/opentracing/opentracing-go" - "net" - "net/http" - "ohmydns2/plugin" - ohttp "ohmydns2/plugin/pkg/http" - olog "ohmydns2/plugin/pkg/log" - "ohmydns2/plugin/pkg/request" - "ohmydns2/plugin/pkg/reuseport" - "ohmydns2/plugin/pkg/trace" - "ohmydns2/plugin/pkg/transport" - "ohmydns2/plugin/prometheus/vars" - "runtime" - "runtime/debug" - "strconv" "sync" - "time" ) -// ProbeServer represents an instance of a server, which serves -// DNS requests at a particular address (host and port). A -// server is capable of serving numerous zones on -// the same address and the listener may be stopped for -// graceful termination (POSIX only). -type ProbeServer struct { - Addr string // Address we listen on - - server *http.Server // http服务 - m sync.Mutex // protects the servers - - conf *PBConfig // zones keyed by their port - httpWg sync.WaitGroup // used to wait on outstanding connections - graceTimeout time.Duration // the maximum duration of a graceful shutdown - trace trace.Trace // the trace plugin for the server - debug bool // disable recover() - stacktrace bool // enable stacktrace in recover error log - classChaos bool // allow non-INET class queries - idleTimeout time.Duration // Idle timeout for TCP - readTimeout time.Duration // Read timeout for TCP - writeTimeout time.Duration // Write timeout for TCP - - tsigSecret map[string]string -} - -// response 是Prober控制响应的抽象 -type response struct { - Code int `json:"code"` - Msg string `json:"msg"` -} - -// MetadataCollector is a plugin that can retrieve metadata functions from all metadata providing plugins -type ProberMetadataCollector interface { - Collect(context.Context, request.HTTPRequest) context.Context -} - -// NewServer returns a new OhmyDNS2 probe server and compiles all plugins in to it. -func NewServer(addr string, conf *PBConfig) (*ProbeServer, error) { - s := &ProbeServer{ - Addr: addr, - graceTimeout: 5 * time.Second, - idleTimeout: 10 * time.Second, - readTimeout: 3 * time.Second, - writeTimeout: 5 * time.Second, - tsigSecret: make(map[string]string), - } - olog.Infof("服务启动,监听地址: %v", addr) - - // We have to bound our wg with one increment - // to prevent a "race condition" that is hard-coded - // into sync.WaitGroup.Wait() - basically, an add - // with a positive delta must be guaranteed to - // occur before Wait() is called on the wg. - // In a way, this kind of acts as a safety barrier. - s.httpWg.Add(1) - - if conf.Debug { - s.debug = true - olog.D.Set() - } - s.stacktrace = conf.Stacktrace - - // append the config to the zone's configs - s.conf = conf - - // set timeouts - if conf.ReadTimeout != 0 { - s.readTimeout = conf.ReadTimeout - } - if conf.WriteTimeout != 0 { - s.writeTimeout = conf.WriteTimeout - } - if conf.IdleTimeout != 0 { - s.idleTimeout = conf.IdleTimeout - } - - //// copy tsig secrets - //for key, secret := range conf.TsigSecret { - // s.tsigSecret[key] = secret - //} - - // compile custom plugin for everything - var stack plugin.Prober - for i := len(conf.Plugin) - 1; i >= 0; i-- { - stack = conf.Plugin[i](stack) - - // register the *handler* also - conf.registerProber(stack) - - // If the current plugin is a MetadataCollector, bookmark it for later use. This loop traverses the plugin - // list backwards, so the first MetadataCollector plugin wins. - if mdc, ok := stack.(ProberMetadataCollector); ok { - conf.metaCollector = mdc - } - - if s.trace == nil && stack.Name() == "trace" { - // we have to stash away the plugin, not the - // Tracer object, because the Tracer won't be initialized yet - if t, ok := stack.(trace.Trace); ok { - s.trace = t - } - } - // Unblock CH class queries when any of these plugins are loaded. - if _, ok := EnableChaos[stack.Name()]; ok { - s.classChaos = true - } - conf.pluginChain = stack - } - - if !s.debug { - // When reloading we need to explicitly disable debug logging if it is now disabled. - olog.D.Clear() - } - - return s, nil -} - -// Compile-time check to ensure Server implements the caddy.GracefulServer interface -var _ caddy.GracefulServer = &ProbeServer{} - -// Serve starts the server with an existing listener. It blocks until the server stops. -// This implements caddy.TCPServer interface. -func (ps *ProbeServer) Serve(l net.Listener) error { - ps.m.Lock() - - ps.server = &http.Server{ - Addr: l.Addr().String(), - Handler: http.HandlerFunc(func(writer http.ResponseWriter, r *http.Request) { - ctx := context.WithValue(context.Background(), Key{}, ps) - ctx = context.WithValue(ctx, LoopKey{}, 0) - ps.ServeProbe(ctx, writer, r) - }), - DisableGeneralOptionsHandler: false, - ReadTimeout: ps.readTimeout, - WriteTimeout: ps.writeTimeout, - IdleTimeout: ps.idleTimeout} - //&dns.Server{Listener: l, - //Net: "tcp", - //TsigSecret: ps.tsigSecret, - //MaxTCPQueries: tcpMaxQueries, - //ReadTimeout: ps.readTimeout, - //WriteTimeout: ps.writeTimeout, - //IdleTimeout: func() time.Duration { - // return ps.idleTimeout - //}, - //Handler: dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Msg) { - // ctx := context.WithValue(context.Background(), Key{}, ps) - // ctx = context.WithValue(ctx, LoopKey{}, 0) - // s.ServeDNS(ctx, w, r) - //})} - - ps.m.Unlock() - - return ps.server.ListenAndServe() -} - -// ServePacket starts the server with an existing packetconn. It blocks until the server stops. -// This implements caddy.UDPServer interface. -func (ps *ProbeServer) ServePacket(p net.PacketConn) error { - return nil -} - -// Listen implements caddy.TCPServer interface. -func (ps *ProbeServer) Listen() (net.Listener, error) { - l, err := reuseport.Listen("tcp", ps.Addr[len(transport.DNS+"://"):]) - if err != nil { - return nil, err - } - return l, nil -} - -// WrapListener Listen implements caddy.GracefulServer interface. -func (ps *ProbeServer) WrapListener(ln net.Listener) net.Listener { - return ln +type Prober struct { + Prange []string `json:"prange"` // 探测范围 + Ptype string `json:"ptype"` // 探针类型 + AllAddr int `json:"allAddr"` // 总共需要探测的地址数 + ScannedAddr int `json:"scannedAddr"` // 已探测过的地址数 + Pid int `json:"pid"` // 探测器ID + Loop bool `json:"loop"` //是否持续探测 + m sync.Mutex + stop chan bool // stop信号量 + c *dns.Client } - -// ListenPacket implements caddy.UDPServer interface. -func (ps *ProbeServer) ListenPacket() (net.PacketConn, error) { - p, err := reuseport.ListenPacket("udp", ps.Addr[len(transport.DNS+"://"):]) - if err != nil { - return nil, err - } - - return p, nil -} - -// Stop stops the server. It blocks until the server is -// totally stopped. On POSIX systems, it will wait for -// connections to close (up to a max timeout of a few -// seconds); on Windows it will close the listener -// immediately. -// This implements Caddy.Stopper interface. -func (ps *ProbeServer) Stop() (err error) { - if runtime.GOOS != "windows" { - // force connections to close after timeout - done := make(chan struct{}) - go func() { - ps.httpWg.Done() // decrement our initial increment used as a barrier - ps.httpWg.Wait() - close(done) - }() - - // Wait for remaining connections to finish or - // force them all to close after timeout - select { - case <-time.After(ps.graceTimeout): - case <-done: - } - } - - // Close the listener now; this stops the server without delay - ps.m.Lock() - // We might not have started and initialized the full set of servers - if ps.server != nil { - err = ps.server.Shutdown(context.Background()) - } - - ps.m.Unlock() - return -} - -// Address together with Stop() implement caddy.GracefulServer. -func (ps *ProbeServer) Address() string { return ps.Addr } - -// ServeProbe 是每一个prober控制请求的入口 -// It acts as a multiplexer for the requests zonename as -// defined in the request so that the correct zone -// (configuration and plugin stack) will handle the request. -func (ps *ProbeServer) ServeProbe(ctx context.Context, w http.ResponseWriter, req *http.Request) (error, string) { - if !ps.debug { - defer func() { - // In case the user doesn't enable error plugin, we still - // need to make sure that we stay alive up here - if rec := recover(); rec != nil { - if ps.stacktrace { - olog.Errorf("Recovered from panic in server: %q %v\n%s", ps.Addr, rec, string(debug.Stack())) - } else { - olog.Errorf("Recovered from panic in server: %q %v", ps.Addr, rec) - } - vars.Panic.Inc() - errorAndMetricsFunc(ps.Addr, w, "ProbeServer-ServeHTTP-Error", http.StatusInternalServerError) - } - }() - } - - // Wrap the response writer in a ScrubWriter so we automatically make the reply fit in the client's buffer. - //w = request.NewScrubWriter(r, w) - - // 获取请求参数 - param, _ := ohttp.ParseRequest(req) - //var ( - // off int - // end bool - // dshandler *PBConfig - //) - // 用于探测的客户端 - c := new(dns.Client) - msg := new(dns.Msg) - - pcf := ps.conf - if pcf.pluginChain == nil { // can not get any plugins - errorAndMetricsFunc(ps.Addr, w, "探测器缺少插件链", http.StatusNotImplemented) - return errors.New("探测器缺少插件链"), "探测器缺少插件链" - } - - if pcf.metaCollector != nil { - // Collect metadata now, so it can be used before we send a request down the plugin chain. - ctx = pcf.metaCollector.Collect(ctx, request.HTTPRequest{Req: req, W: w}) - } - - // If all filter funcs pass, use this config. - if passAllFilterFuncs(ctx, pcf.FilterFuncs, &request.HTTPRequest{Req: req, W: w}) { - if pcf.ViewName != "" { - // if there was a view defined for this Config, set the view name in the context - ctx = context.WithValue(ctx, ViewKey{}, pcf.ViewName) - } - ctx = context.WithValue(ctx, "httpparam", param) - rcode, _ := pcf.pluginChain.ProbeDNS(ctx, c, msg) - if !plugin.HTTPClientWrite(rcode) { - errorAndMetricsFunc(ps.Addr, w, pcf.pluginChain.Name()+" 错误", rcode) - } - return errors.New(pcf.pluginChain.Name() + " 错误"), strconv.Itoa(rcode) - } - - // 都不匹配,尝试利用“.”指向的服务块 - //if z, ok := ps.zones["."]; ok { - // - // for _, h := range z { - // if h.pluginChain == nil { - // continue - // } - // - // if h.metaCollector != nil { - // // Collect metadata now, so it can be used before we send a request down the plugin chain. - // ctx = h.metaCollector.Collect(ctx, request.HTTPRequest{Req: req, W: w}) - // } - // - // // If all filter funcs pass, use this config. - // if passAllFilterFuncs(ctx, h.FilterFuncs, &request.HTTPRequest{Req: req, W: w}) { - // if h.ViewName != "" { - // // if there was a view defined for this Config, set the view name in the context - // ctx = context.WithValue(ctx, ViewKey{}, h.ViewName) - // } - // rcode, _ := h.pluginChain.ProbeDNS(ctx, c, msg) - // if !plugin.ClientWrite(rcode) { - // errorAndMetricsFunc(ps.Addr, w, " . --"+h.pluginChain.Name()+"错误", rcode) - // } - // return - // } - // } - //} - - //// Still here? Error out with REFUSED. - //errorAndMetricsFunc(ps.Addr, w, "请求存在错误", http.StatusForbidden) - return nil, "成功" -} - -// passAllFilterFuncs returns true if all filter funcs evaluate to true for the given request -func passAllFilterFuncs(ctx context.Context, filterFuncs []FilterFunc, req *request.HTTPRequest) bool { - for _, ff := range filterFuncs { - if !ff(ctx, req) { - return false - } - } - return true -} - -// OnStartupComplete lists the sites served by this server -// and any relevant information, assuming Quiet is false. -func (ps *ProbeServer) OnStartupComplete() { - if Quiet { - return - } - - out := startUpZones(transport.PROBER+"://", ps.Addr) - if out != "" { - fmt.Print(out) - } -} - -// Tracer returns the tracer in the server if defined. -func (ps *ProbeServer) Tracer() ot.Tracer { - if ps.trace == nil { - return nil - } - - return ps.trace.Tracer() -} - -// errorAndMetricsFunc 通过HTTP返回错误信息,并记录到Metrics中 -func errorAndMetricsFunc(server string, w http.ResponseWriter, rs string, rc int) { - defer vars.HTTPResponsesCount.WithLabelValues(server, http.StatusText(rc)).Inc() - w.WriteHeader(rc) - r := &response{Code: http.StatusInternalServerError, Msg: rs} - msg, _ := json.Marshal(r) - w.Write(msg) - return - -} - -const ( - tcp = 0 - - tcpMaxQueries = -1 -) - -type ( - // Key is the context key for the current server added to the context. - Key struct{} - - // LoopKey is the context key to detect server wide loops. - LoopKey struct{} - - // ViewKey is the context key for the current view, if defined - ViewKey struct{} -) - -// EnableChaos is a map with plugin names for which we should open CH class queries as we block these by default. -var EnableChaos = map[string]struct{}{ - "chaos": {}, - "forward": {}, - "proxy": {}, -} - -// Quiet mode will not show any informative output on initialization. -var Quiet bool diff --git a/core/prober/prober_args.go b/core/prober/prober_args.go new file mode 100644 index 0000000..dede273 --- /dev/null +++ b/core/prober/prober_args.go @@ -0,0 +1,9 @@ +package prober + +const ( + globalRange = "globe" +) + +const ( + rangeParam = "prange" +) diff --git a/core/prober/prober_http.go b/core/prober/prober_http.go index dc9d794..b1da528 100644 --- a/core/prober/prober_http.go +++ b/core/prober/prober_http.go @@ -31,7 +31,7 @@ type ProberHTTP struct { httpServer *http.Server listenAddr net.Addr validRequest func(*http.Request) bool - proberlist *prober.ProberList + proberlist *prober.ProberAndGoroutList m sync.Mutex } @@ -81,7 +81,7 @@ func NewProberHTTP(addr string, conf *PBConfig) (*ProberHTTP, error) { ErrorLog: stdlog.New(&loggerAdapter{}, "", 0), } - pl := &prober.ProberList{ + pl := &prober.ProberAndGoroutList{ Pl: make(map[int]*prober.Prober), } sh := &ProberHTTP{ diff --git a/core/prober/prober_serve.go b/core/prober/prober_serve.go new file mode 100644 index 0000000..2a7e770 --- /dev/null +++ b/core/prober/prober_serve.go @@ -0,0 +1,439 @@ +package prober + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/coredns/caddy" + "github.com/miekg/dns" + ot "github.com/opentracing/opentracing-go" + "net" + "net/http" + "ohmydns2/plugin" + ohttp "ohmydns2/plugin/pkg/http" + olog "ohmydns2/plugin/pkg/log" + "ohmydns2/plugin/pkg/prober" + "ohmydns2/plugin/pkg/request" + "ohmydns2/plugin/pkg/reuseport" + "ohmydns2/plugin/pkg/trace" + "ohmydns2/plugin/pkg/transport" + "ohmydns2/plugin/prometheus/vars" + "runtime" + "runtime/debug" + "strconv" + "sync" + "time" +) + +// ProbeServer represents an instance of a server, which serves +// DNS requests at a particular address (host and port). A +// server is capable of serving numerous zones on +// the same address and the listener may be stopped for +// graceful termination (POSIX only). +type ProbeServer struct { + Addr string // Address we listen on + + server *http.Server // http服务 + m sync.Mutex // protects the servers + + conf *PBConfig // zones keyed by their port + httpWg sync.WaitGroup // used to wait on outstanding connections + graceTimeout time.Duration // the maximum duration of a graceful shutdown + trace trace.Trace // the trace plugin for the server + debug bool // disable recover() + stacktrace bool // enable stacktrace in recover error log + classChaos bool // allow non-INET class queries + idleTimeout time.Duration // Idle timeout for TCP + readTimeout time.Duration // Read timeout for TCP + writeTimeout time.Duration // Write timeout for TCP + + tsigSecret map[string]string +} + +// response 是Prober控制响应的抽象 +type response struct { + Code int `json:"code"` + Msg string `json:"msg"` +} + +// MetadataCollector is a plugin that can retrieve metadata functions from all metadata providing plugins +type ProberMetadataCollector interface { + Collect(context.Context, request.HTTPRequest) context.Context +} + +// NewServer returns a new OhmyDNS2 probe server and compiles all plugins in to it. +func NewServer(addr string, conf *PBConfig) (*ProbeServer, error) { + s := &ProbeServer{ + Addr: addr, + graceTimeout: 5 * time.Second, + idleTimeout: 10 * time.Second, + readTimeout: 3 * time.Second, + writeTimeout: 5 * time.Second, + tsigSecret: make(map[string]string), + } + olog.Infof("服务启动,监听地址: %v", addr) + + // We have to bound our wg with one increment + // to prevent a "race condition" that is hard-coded + // into sync.WaitGroup.Wait() - basically, an add + // with a positive delta must be guaranteed to + // occur before Wait() is called on the wg. + // In a way, this kind of acts as a safety barrier. + s.httpWg.Add(1) + + if conf.Debug { + s.debug = true + olog.D.Set() + } + s.stacktrace = conf.Stacktrace + + // append the config to the zone's configs + s.conf = conf + + // set timeouts + if conf.ReadTimeout != 0 { + s.readTimeout = conf.ReadTimeout + } + if conf.WriteTimeout != 0 { + s.writeTimeout = conf.WriteTimeout + } + if conf.IdleTimeout != 0 { + s.idleTimeout = conf.IdleTimeout + } + + //// copy tsig secrets + //for key, secret := range conf.TsigSecret { + // s.tsigSecret[key] = secret + //} + + // compile custom plugin for everything + var stack plugin.Prober + for i := len(conf.Plugin) - 1; i >= 0; i-- { + stack = conf.Plugin[i](stack) + + // register the *handler* also + conf.registerProber(stack) + + // If the current plugin is a MetadataCollector, bookmark it for later use. This loop traverses the plugin + // list backwards, so the first MetadataCollector plugin wins. + if mdc, ok := stack.(ProberMetadataCollector); ok { + conf.metaCollector = mdc + } + + if s.trace == nil && stack.Name() == "trace" { + // we have to stash away the plugin, not the + // Tracer object, because the Tracer won't be initialized yet + if t, ok := stack.(trace.Trace); ok { + s.trace = t + } + } + // Unblock CH class queries when any of these plugins are loaded. + if _, ok := EnableChaos[stack.Name()]; ok { + s.classChaos = true + } + conf.pluginChain = stack + } + + if !s.debug { + // When reloading we need to explicitly disable debug logging if it is now disabled. + olog.D.Clear() + } + + return s, nil +} + +// Compile-time check to ensure Server implements the caddy.GracefulServer interface +var _ caddy.GracefulServer = &ProbeServer{} + +// Serve starts the server with an existing listener. It blocks until the server stops. +// This implements caddy.TCPServer interface. +func (ps *ProbeServer) Serve(l net.Listener) error { + ps.m.Lock() + + ps.server = &http.Server{ + Addr: l.Addr().String(), + Handler: http.HandlerFunc(func(writer http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(context.Background(), Key{}, ps) + ctx = context.WithValue(ctx, LoopKey{}, 0) + ps.ServeProbe(ctx, writer, r) + }), + DisableGeneralOptionsHandler: false, + ReadTimeout: ps.readTimeout, + WriteTimeout: ps.writeTimeout, + IdleTimeout: ps.idleTimeout} + //&dns.Server{Listener: l, + //Net: "tcp", + //TsigSecret: ps.tsigSecret, + //MaxTCPQueries: tcpMaxQueries, + //ReadTimeout: ps.readTimeout, + //WriteTimeout: ps.writeTimeout, + //IdleTimeout: func() time.Duration { + // return ps.idleTimeout + //}, + //Handler: dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Msg) { + // ctx := context.WithValue(context.Background(), Key{}, ps) + // ctx = context.WithValue(ctx, LoopKey{}, 0) + // s.ServeDNS(ctx, w, r) + //})} + + ps.m.Unlock() + + return ps.server.ListenAndServe() +} + +// ServePacket starts the server with an existing packetconn. It blocks until the server stops. +// This implements caddy.UDPServer interface. +func (ps *ProbeServer) ServePacket(p net.PacketConn) error { + return nil +} + +// Listen implements caddy.TCPServer interface. +func (ps *ProbeServer) Listen() (net.Listener, error) { + l, err := reuseport.Listen("tcp", ps.Addr[len(transport.DNS+"://"):]) + if err != nil { + return nil, err + } + return l, nil +} + +// WrapListener Listen implements caddy.GracefulServer interface. +func (ps *ProbeServer) WrapListener(ln net.Listener) net.Listener { + return ln +} + +// ListenPacket implements caddy.UDPServer interface. +func (ps *ProbeServer) ListenPacket() (net.PacketConn, error) { + p, err := reuseport.ListenPacket("udp", ps.Addr[len(transport.DNS+"://"):]) + if err != nil { + return nil, err + } + + return p, nil +} + +// Stop stops the server. It blocks until the server is +// totally stopped. On POSIX systems, it will wait for +// connections to close (up to a max timeout of a few +// seconds); on Windows it will close the listener +// immediately. +// This implements Caddy.Stopper interface. +func (ps *ProbeServer) Stop() (err error) { + if runtime.GOOS != "windows" { + // force connections to close after timeout + done := make(chan struct{}) + go func() { + ps.httpWg.Done() // decrement our initial increment used as a barrier + ps.httpWg.Wait() + close(done) + }() + + // Wait for remaining connections to finish or + // force them all to close after timeout + select { + case <-time.After(ps.graceTimeout): + case <-done: + } + } + + // Close the listener now; this stops the server without delay + ps.m.Lock() + // We might not have started and initialized the full set of servers + if ps.server != nil { + err = ps.server.Shutdown(context.Background()) + } + + ps.m.Unlock() + return +} + +// Address together with Stop() implement caddy.GracefulServer. +func (ps *ProbeServer) Address() string { return ps.Addr } + +// ServeProbe 是每一个prober控制请求的入口 +// It acts as a multiplexer for the requests zonename as +// defined in the request so that the correct zone +// (configuration and plugin stack) will handle the request. +func (ps *ProbeServer) ServeProbe(ctx context.Context, w http.ResponseWriter, req *http.Request) (error, string) { + if !ps.debug { + defer func() { + // In case the user doesn't enable error plugin, we still + // need to make sure that we stay alive up here + if rec := recover(); rec != nil { + if ps.stacktrace { + olog.Errorf("Recovered from panic in server: %q %v\n%s", ps.Addr, rec, string(debug.Stack())) + } else { + olog.Errorf("Recovered from panic in server: %q %v", ps.Addr, rec) + } + vars.Panic.Inc() + errorAndMetricsFunc(ps.Addr, w, "ProbeServer-ServeHTTP-Error", http.StatusInternalServerError) + } + }() + } + + // Wrap the response writer in a ScrubWriter so we automatically make the reply fit in the client's buffer. + //w = request.NewScrubWriter(r, w) + + // 获取请求参数 + param, _ := ohttp.ParseRequest(req) + //var ( + // off int + // end bool + // dshandler *PBConfig + //) + // 用于探测的客户端 + c := new(dns.Client) + + pcf := ps.conf + if pcf.pluginChain == nil { // can not get any plugins + errorAndMetricsFunc(ps.Addr, w, "探测器缺少插件链", http.StatusNotImplemented) + return errors.New("探测器缺少插件链"), "探测器缺少插件链" + } + if pcf.metaCollector != nil { + // Collect metadata now, so it can be used before we send a request down the plugin chain. + ctx = pcf.metaCollector.Collect(ctx, request.HTTPRequest{Req: req, W: w}) + } + // 生成目标,开始探测 + targets := getTarget(param[rangeParam]) + for { + msg := new(dns.Msg) + t, ok := <-targets + if !ok { + break + } + // If all filter funcs pass, use this config. + if passAllFilterFuncs(ctx, pcf.FilterFuncs, &request.HTTPRequest{Req: req, W: w}) { + if pcf.ViewName != "" { + // if there was a view defined for this Config, set the view name in the context + ctx = context.WithValue(ctx, ViewKey{}, pcf.ViewName) + } + // 将所有的http请求参数传入上下文 + ctx = context.WithValue(ctx, Paramkey, param) + // 将目标IP传入上下文 + ctx = context.WithValue(ctx, Target, t) + rcode, _ := pcf.pluginChain.ProbeDNS(ctx, c, msg) + if !plugin.HTTPClientWrite(rcode) { + errorAndMetricsFunc(ps.Addr, w, pcf.pluginChain.Name()+" 错误", rcode) + return errors.New(pcf.pluginChain.Name() + " 错误"), strconv.Itoa(rcode) + } + + } + } + + // 都不匹配,尝试利用“.”指向的服务块 + //if z, ok := ps.zones["."]; ok { + // + // for _, h := range z { + // if h.pluginChain == nil { + // continue + // } + // + // if h.metaCollector != nil { + // // Collect metadata now, so it can be used before we send a request down the plugin chain. + // ctx = h.metaCollector.Collect(ctx, request.HTTPRequest{Req: req, W: w}) + // } + // + // // If all filter funcs pass, use this config. + // if passAllFilterFuncs(ctx, h.FilterFuncs, &request.HTTPRequest{Req: req, W: w}) { + // if h.ViewName != "" { + // // if there was a view defined for this Config, set the view name in the context + // ctx = context.WithValue(ctx, ViewKey{}, h.ViewName) + // } + // rcode, _ := h.pluginChain.ProbeDNS(ctx, c, msg) + // if !plugin.ClientWrite(rcode) { + // errorAndMetricsFunc(ps.Addr, w, " . --"+h.pluginChain.Name()+"错误", rcode) + // } + // return + // } + // } + //} + + //// Still here? Error out with REFUSED. + //errorAndMetricsFunc(ps.Addr, w, "请求存在错误", http.StatusForbidden) + return nil, "成功" +} + +// passAllFilterFuncs returns true if all filter funcs evaluate to true for the given request +func passAllFilterFuncs(ctx context.Context, filterFuncs []FilterFunc, req *request.HTTPRequest) bool { + for _, ff := range filterFuncs { + if !ff(ctx, req) { + return false + } + } + return true +} + +// OnStartupComplete lists the sites served by this server +// and any relevant information, assuming Quiet is false. +func (ps *ProbeServer) OnStartupComplete() { + if Quiet { + return + } + + out := startUpZones(transport.PROBER+"://", ps.Addr) + if out != "" { + fmt.Print(out) + } +} + +// Tracer returns the tracer in the server if defined. +func (ps *ProbeServer) Tracer() ot.Tracer { + if ps.trace == nil { + return nil + } + + return ps.trace.Tracer() +} + +// errorAndMetricsFunc 通过HTTP返回错误信息,并记录到Metrics中 +func errorAndMetricsFunc(server string, w http.ResponseWriter, rs string, rc int) { + defer vars.HTTPResponsesCount.WithLabelValues(server, http.StatusText(rc)).Inc() + w.WriteHeader(rc) + r := &response{Code: http.StatusInternalServerError, Msg: rs} + msg, _ := json.Marshal(r) + w.Write(msg) + return + +} + +func getTarget(s []string) chan net.IP { + if s[0] == globalRange { + // 全球探测 + return prober.GenGlobIPv4() + } + // 局部探测 + ipchan := make(chan net.IP, 100) + go func() { + defer close(ipchan) + for _, v := range s { + ipchan <- net.ParseIP(v) + } + }() + return ipchan +} + +const ( + Paramkey = "httpparam" + Target = "targetip" +) + +type ( + // Key is the context key for the current server added to the context. + Key struct{} + + // LoopKey is the context key to detect server wide loops. + LoopKey struct{} + + // ViewKey is the context key for the current view, if defined + ViewKey struct{} +) + +// EnableChaos is a map with plugin names for which we should open CH class queries as we block these by default. +var EnableChaos = map[string]struct{}{ + "chaos": {}, + "forward": {}, + "proxy": {}, +} + +// Quiet mode will not show any informative output on initialization. +var Quiet bool @@ -10,6 +10,7 @@ require ( github.com/farsightsec/golang-framestream v0.3.0 github.com/miekg/dns v1.1.54 github.com/opentracing/opentracing-go v1.2.0 + github.com/panjf2000/ants/v2 v2.8.1 github.com/pochard/commons v1.1.2 github.com/prometheus/client_golang v1.15.1 golang.org/x/sys v0.10.0 @@ -10,6 +10,7 @@ github.com/coredns/caddy v1.1.1 h1:2eYKZT7i6yxIfGP3qLJoJ7HAsDJqYB+X68g4NYjSrE0= github.com/coredns/caddy v1.1.1/go.mod h1:A6ntJQlAWuQfFlsd9hvigKbo2WS0VUs2l1e2F+BawD4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnstap/golang-dnstap v0.4.0 h1:KRHBoURygdGtBjDI2w4HifJfMAhhOqDuktAokaSa234= github.com/dnstap/golang-dnstap v0.4.0/go.mod h1:FqsSdH58NAmkAvKcpyxht7i4FoBjKu8E4JUPt8ipSUs= github.com/farsightsec/golang-framestream v0.3.0 h1:/spFQHucTle/ZIPkYqrfshQqPe2VQEzesH243TjIwqA= @@ -39,6 +40,8 @@ github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= +github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pochard/commons v1.1.2 h1:65SlPrtLqJgCboQitD72Wrdw7xsGJ2wD6HS1hUpk6pc= @@ -52,8 +55,13 @@ github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= @@ -66,6 +74,7 @@ golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -94,4 +103,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/plugin/pkg/prober/prober.go b/plugin/pkg/prober/prober.go index 99d390f..d34160b 100644 --- a/plugin/pkg/prober/prober.go +++ b/plugin/pkg/prober/prober.go @@ -3,6 +3,7 @@ package prober import ( "context" "github.com/miekg/dns" + "github.com/panjf2000/ants/v2" "math" "net" olog "ohmydns2/plugin/pkg/log" @@ -11,18 +12,19 @@ import ( "time" ) -// 探测器列表 -type ProberList struct { - Pl map[int]*Prober // 探测器 +// 探测器和协程状态列表 +type ProberAndGoroutList struct { + Pl map[int]*Prober // 探测器 + GRPool ants.Pool } // 获取当前正在运行的探测器数量 -func (pl ProberList) GetNum() int { +func (pl *ProberAndGoroutList) GetNum() int { return len(pl.Pl) } // 增加一个探测器,并返回对应的pid -func (pl ProberList) AddProber(arg map[string][]string) (string, error) { +func (pl *ProberAndGoroutList) AddProber(arg map[string][]string) (string, error) { //当前时间戳,作为探测器ID t := time.Now().Unix() p := NewProber() @@ -42,7 +44,7 @@ func (pl ProberList) AddProber(arg map[string][]string) (string, error) { } // 列举所有探测器信息 -func (pl ProberList) ListAllProber() (int, map[int]Prober, error) { +func (pl *ProberAndGoroutList) ListAllProber() (int, map[int]Prober, error) { rm := make(map[int]Prober) for k, v := range pl.Pl { rm[k] = *v @@ -51,7 +53,7 @@ func (pl ProberList) ListAllProber() (int, map[int]Prober, error) { return pl.GetNum(), rm, nil } -func (pl ProberList) DeleteProberById(pid int) error { +func (pl *ProberAndGoroutList) DeleteProberById(pid int) error { err := pl.Pl[pid].Stop() delete(pl.Pl, pid) if err != nil { @@ -61,7 +63,7 @@ func (pl ProberList) DeleteProberById(pid int) error { } // 删除所有的运行的探测器 -func (pl ProberList) DeleteProber() error { +func (pl *ProberAndGoroutList) DeleteProber() error { for k, _ := range pl.Pl { err := pl.DeleteProberById(k) if err != nil { @@ -163,3 +165,6 @@ func (p *Prober) Probev64(ip net.IP) error { } return nil } + +var proberNum = 2000 +var goroutinPool, _ = ants.NewPool(proberNum) diff --git a/plugin/pkg/prober/proberutil.go b/plugin/pkg/prober/proberutil.go index 619f5f6..b00148c 100644 --- a/plugin/pkg/prober/proberutil.go +++ b/plugin/pkg/prober/proberutil.go @@ -61,7 +61,7 @@ func MakeProbev64(ip net.IP) string { return fmt.Sprintf("c1.rip%v.%v.%v.%v.", ipstr, strings.ToLower(randstr.RandomAlphanumeric(5)), defaultStartsubv64, defaultTarget) } -func MakeTestProbev64(subv64 string,targetzone string) string { +func MakeTestProbev64(subv64 string, targetzone string) string { ipstr := ip2Eid(net.ParseIP("0.0.0.0")) return fmt.Sprintf("c1.rip%v.%v.%v.%v", ipstr, strings.ToLower(randstr.RandomAlphanumeric(5)), subv64, targetzone) } diff --git a/plugin/prober/qname/qname.go b/plugin/prober/qname/qname.go new file mode 100644 index 0000000..81645bb --- /dev/null +++ b/plugin/prober/qname/qname.go @@ -0,0 +1,76 @@ +package qname + +import ( + "context" + "errors" + "github.com/miekg/dns" + "github.com/pochard/commons/randstr" + "ohmydns2/core/prober" + "ohmydns2/plugin" + "strconv" + "strings" +) + +type Qname struct { + next plugin.Prober + setFromFile string +} + +func (q Qname) ProbeDNS(ctx context.Context, c *dns.Client, msg *dns.Msg) (int, error) { + param := ctx.Value(prober.Paramkey).(map[string][]string) + // api中设定了qname参数,则以该参数为准 + if v, ok := param["qname"]; ok { + msg.Question[0].Name = generateQname(v[0]) + // 交由下一个插件处理 + return plugin.PNextOrFailure(q.Name(), q.next, ctx, c, msg) + } + if q.setFromFile != "" { + // api中未设定,读取配置文件 + msg.Question[0].Name = generateQname(q.setFromFile) + return plugin.PNextOrFailure(q.Name(), q.next, ctx, c, msg) + } + return 2, errors.New(q.Name() + ": 未指定参数") + +} + +func (q Qname) Name() string { + return "qname" +} + +func generateQname(arg string) string { + args := strings.Split(arg, ".") + qname := "" + for _, v := range args { + // 含有( { [的视为函数部分 + p, ok := intersectionStringsList(v, funcList) + if ok { + i := strings.Index(v, p) //进入循环表明一定有匹配 + if f, fok := funcMap[v[:i]]; fok { + qname += f(v[i+1:len(v)-1]) + "." + } + } + // 不含直接添加 + qname += v + "." + } + return qname +} + +func rand(d string) string { + num, _ := strconv.Atoi(d) + return strings.ToLower(randstr.RandomAlphanumeric(num)) +} + +// 起始水印标识生成 +func genMask(d string) string { + if d == "null" { + return + } +} + +var funcList = []string{"(", ")", "{", "}", "[", "]"} + +// 表达式中的函数与实际处理函数的映射 +var funcMap = map[string]func(str string) string{ + "rand": rand, + "smask": genMask, +} diff --git a/plugin/prober/qname/qnameutil.go b/plugin/prober/qname/qnameutil.go new file mode 100644 index 0000000..177d6c8 --- /dev/null +++ b/plugin/prober/qname/qnameutil.go @@ -0,0 +1,13 @@ +package qname + +import "strings" + +// intersectionStringsList 判断字符串内是否有sublist中的元素,并返回匹配的元素 +func intersectionStringsList(str string, sublist []string) (string, bool) { + for _, item := range sublist { + if strings.Contains(str, item) { + return item, true + } + } + return "", false +} diff --git a/plugin/prober/qname/setup.go b/plugin/prober/qname/setup.go new file mode 100644 index 0000000..ddf6f23 --- /dev/null +++ b/plugin/prober/qname/setup.go @@ -0,0 +1,23 @@ +package qname + +import ( + "github.com/coredns/caddy" + "ohmydns2/core/prober" + "ohmydns2/plugin" +) + +func init() { plugin.ProbeRegister("qname", setup) } + +func setup(c *caddy.Controller) error { + qname := new(Qname) + // 存在参数,加载到结构体中 + if c.NextArg() { + qname.setFromFile = c.RemainingArgs()[0] + } + + prober.GetPBConfig(c).AddPlugin(func(next plugin.Prober) plugin.Prober { + return Qname{} + }) + + return nil +} diff --git a/plugin/register.go b/plugin/register.go index e25c4dc..b3b7a14 100644 --- a/plugin/register.go +++ b/plugin/register.go @@ -13,7 +13,7 @@ func Register(name string, action caddy.SetupFunc) { // ProbeRegister registers your plugin with OhmyDNS2 and allows it to be called when the prober is running. func ProbeRegister(name string, action caddy.SetupFunc) { caddy.RegisterPlugin(name, caddy.Plugin{ - ServerType: "dns", + ServerType: "dnsprober", Action: action, }) } |
