Skip to content

Commit

Permalink
using disk storage for large kv
Browse files Browse the repository at this point in the history
  • Loading branch information
Mzack9999 committed Mar 1, 2024
1 parent 6abff96 commit 4d29bff
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 36 deletions.
7 changes: 7 additions & 0 deletions cmd/integration-test/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/reporting"
"github.com/projectdiscovery/nuclei/v3/pkg/testutils"
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/storage"
"github.com/projectdiscovery/ratelimit"
)

Expand Down Expand Up @@ -99,6 +100,11 @@ func executeNucleiAsLibrary(templatePath, templateURL string) ([]string, error)
catalog := disk.NewCatalog(path.Join(home, "nuclei-templates"))
ratelimiter := ratelimit.New(context.Background(), 150, time.Second)
defer ratelimiter.Stop()
storage, err := storage.New()
if err != nil {
return nil, errors.Wrap(err, "could not create storage")
}
defer storage.Close()
executerOpts := protocols.ExecutorOptions{
Output: outputWriter,
Options: defaultOpts,
Expand All @@ -110,6 +116,7 @@ func executeNucleiAsLibrary(templatePath, templateURL string) ([]string, error)
HostErrorsCache: cache,
Colorizer: aurora.NewAurora(true),
ResumeCfg: types.NewResumeCfg(),
Storage: storage,
}
engine := core.New(defaultOpts)
engine.SetExecuterOptions(executerOpts)
Expand Down
13 changes: 13 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/nuclei/v3/pkg/utils"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/stats"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/storage"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/yaml"
"github.com/projectdiscovery/retryablehttp-go"
ptrutil "github.com/projectdiscovery/utils/ptr"
Expand Down Expand Up @@ -83,6 +84,7 @@ type Runner struct {
hostErrors hosterrorscache.CacheInterface
resumeCfg *types.ResumeCfg
pprofServer *http.Server
storage *storage.Storage
// pdcp auto-save options
pdcpUploadErrMsg string
}
Expand Down Expand Up @@ -315,6 +317,13 @@ func New(options *types.Options) (*Runner, error) {
} else {
runner.rateLimiter = ratelimit.NewUnlimited(context.Background())
}

if storage, err := storage.New(); err != nil {
gologger.Error().Msgf("Could not create storage: %s", err)
} else {
runner.storage = storage
}

return runner, nil
}

Expand Down Expand Up @@ -349,6 +358,9 @@ func (r *Runner) Close() {
if r.browser != nil {
r.browser.Close()
}
if r.storage != nil {
r.storage.Close()
}
}

// setupPDCPUpload sets up the PDCP upload writer
Expand Down Expand Up @@ -420,6 +432,7 @@ func (r *Runner) RunEnumeration() error {
ResumeCfg: r.resumeCfg,
ExcludeMatchers: excludematchers.New(r.options.ExcludeMatchers),
InputHelper: input.NewHelper(),
Storage: r.storage,
}

if r.options.ShouldUseHostError() {
Expand Down
1 change: 1 addition & 0 deletions lib/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func createEphemeralObjects(base *NucleiEngine, opts *types.Options) (*unsafeOpt
HostErrorsCache: base.hostErrCache,
Colorizer: aurora.NewAurora(true),
ResumeCfg: types.NewResumeCfg(),
Storage: base.storage,
}
if opts.RateLimitMinute > 0 {
u.executerOpts.RateLimiter = ratelimit.New(context.Background(), uint(opts.RateLimitMinute), time.Minute)
Expand Down
3 changes: 3 additions & 0 deletions lib/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/templates"
"github.com/projectdiscovery/nuclei/v3/pkg/templates/signer"
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/storage"
"github.com/projectdiscovery/ratelimit"
"github.com/projectdiscovery/retryablehttp-go"
errorutil "github.com/projectdiscovery/utils/errors"
Expand Down Expand Up @@ -71,6 +72,7 @@ type NucleiEngine struct {
mode engineMode
browserInstance *engine.Browser
httpClient *retryablehttp.Client
storage *storage.Storage

// unexported meta options
opts *types.Options
Expand Down Expand Up @@ -169,6 +171,7 @@ func (e *NucleiEngine) Close() {
e.customWriter.Close()
e.hostErrCache.Close()
e.executerOpts.RateLimiter.Stop()
e.storage.Close()
}

// ExecuteWithCallback executes templates on targets and calls callback on each result(only if results are found)
Expand Down
8 changes: 8 additions & 0 deletions lib/sdk_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/reporting"
"github.com/projectdiscovery/nuclei/v3/pkg/testutils"
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/storage"
"github.com/projectdiscovery/ratelimit"
)

Expand Down Expand Up @@ -141,6 +142,12 @@ func (e *NucleiEngine) init() error {

e.catalog = disk.NewCatalog(config.DefaultConfig.TemplatesDirectory)

if storage, err := storage.New(); err != nil {
return err
} else {
e.storage = storage
}

e.executerOpts = protocols.ExecutorOptions{
Output: e.customWriter,
Options: e.opts,
Expand All @@ -153,6 +160,7 @@ func (e *NucleiEngine) init() error {
Colorizer: aurora.NewAurora(true),
ResumeCfg: types.NewResumeCfg(),
Browser: e.browserInstance,
Storage: e.storage,
}

if e.opts.RateLimitMinute > 0 {
Expand Down
8 changes: 0 additions & 8 deletions pkg/protocols/common/tostring/tostring.go

This file was deleted.

12 changes: 10 additions & 2 deletions pkg/protocols/http/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,16 @@ func (request *Request) responseToDSLMap(resp *http.Response, host, matched, raw
data["host"] = host
data["type"] = request.Type().String()
data["matched"] = matched
data["request"] = rawReq
data["response"] = rawResp
if hash, err := request.options.Storage.SetString(rawReq); err == nil {
data["request"] = hash
} else {
data["request"] = rawReq
}
if hash, err := request.options.Storage.SetString(rawResp); err == nil {
data["request"] = hash
} else {
data["response"] = rawResp
}
data["status_code"] = resp.StatusCode
data["body"] = body
data["all_headers"] = headers
Expand Down
3 changes: 1 addition & 2 deletions pkg/protocols/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/helpers/eventcreator"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/helpers/responsehighlighter"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/interactsh"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/tostring"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/http/httpclientpool"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/http/httputils"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/http/signer"
Expand Down Expand Up @@ -770,7 +769,7 @@ func (request *Request) executeRequest(input *contextargs.Context, generatedRequ
// In case of interactsh markers and request times out, still send
// a callback event so in case we receive an interaction, correlation is possible.
// Also, to log failed use-cases.
outputEvent := request.responseToDSLMap(&http.Response{}, input.MetaInput.Input, formedURL, tostring.UnsafeToString(dumpedRequest), "", "", "", 0, generatedRequest.meta)
outputEvent := request.responseToDSLMap(&http.Response{}, input.MetaInput.Input, formedURL, convUtil.String(dumpedRequest), "", "", "", 0, generatedRequest.meta)
if i := strings.LastIndex(hostname, ":"); i != -1 {
hostname = hostname[:i]
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/protocols/offlinehttp/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/contextargs"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/generators"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/helpers/eventcreator"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/tostring"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/utils"
templateTypes "github.com/projectdiscovery/nuclei/v3/pkg/templates/types"
"github.com/projectdiscovery/utils/conversion"
)

var _ protocols.Request = &Request{}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (request *Request) ExecuteWithResults(input *contextargs.Context, metadata
gologger.Error().Msgf("Could not read file path %s: %s\n", data, err)
return
}
dataStr := tostring.UnsafeToString(buffer)
dataStr := conversion.String(buffer)

resp, err := readResponseFromString(dataStr)
if err != nil {
Expand All @@ -86,7 +86,7 @@ func (request *Request) ExecuteWithResults(input *contextargs.Context, metadata
return
}

outputEvent := request.responseToDSLMap(resp, data, data, data, tostring.UnsafeToString(dumpedResponse), tostring.UnsafeToString(body), utils.HeadersToString(resp.Header), 0, nil)
outputEvent := request.responseToDSLMap(resp, data, data, data, conversion.String(dumpedResponse), conversion.String(body), utils.HeadersToString(resp.Header), 0, nil)
// add response fields to template context and merge templatectx variables to output event
request.options.AddTemplateVars(input.MetaInput, request.Type(), request.GetID(), outputEvent)
if request.options.HasTemplateCtx(input.MetaInput) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocols/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/projectdiscovery/nuclei/v3/pkg/scan"
templateTypes "github.com/projectdiscovery/nuclei/v3/pkg/templates/types"
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/nuclei/v3/pkg/utils/storage"
)

// Optional Callback to update Thread count in payloads across all requests
Expand Down Expand Up @@ -116,6 +117,7 @@ type ExecutorOptions struct {
// based on given logic. by default nuclei reverts to using value of `-c` when threads count
// is not specified or is 0 in template
OverrideThreadsCount PayloadThreadSetterCallback
Storage *storage.Storage
}

// GetThreadsForPayloadRequests returns the number of threads to use as default for
Expand Down
29 changes: 10 additions & 19 deletions pkg/reporting/dedupe/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package dedupe
import (
"crypto/sha1"
"os"
"unsafe"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"

"github.com/projectdiscovery/nuclei/v3/pkg/output"
"github.com/projectdiscovery/nuclei/v3/pkg/types"
"github.com/projectdiscovery/utils/conversion"
)

// Storage is a duplicate detecting storage for nuclei scan events.
Expand Down Expand Up @@ -75,29 +75,29 @@ func (s *Storage) Close() {
func (s *Storage) Index(result *output.ResultEvent) (bool, error) {
hasher := sha1.New()
if result.TemplateID != "" {
_, _ = hasher.Write(unsafeToBytes(result.TemplateID))
_, _ = hasher.Write(conversion.Bytes(result.TemplateID))
}
if result.MatcherName != "" {
_, _ = hasher.Write(unsafeToBytes(result.MatcherName))
_, _ = hasher.Write(conversion.Bytes(result.MatcherName))
}
if result.ExtractorName != "" {
_, _ = hasher.Write(unsafeToBytes(result.ExtractorName))
_, _ = hasher.Write(conversion.Bytes(result.ExtractorName))
}
if result.Type != "" {
_, _ = hasher.Write(unsafeToBytes(result.Type))
_, _ = hasher.Write(conversion.Bytes(result.Type))
}
if result.Host != "" {
_, _ = hasher.Write(unsafeToBytes(result.Host))
_, _ = hasher.Write(conversion.Bytes(result.Host))
}
if result.Matched != "" {
_, _ = hasher.Write(unsafeToBytes(result.Matched))
_, _ = hasher.Write(conversion.Bytes(result.Matched))
}
for _, v := range result.ExtractedResults {
_, _ = hasher.Write(unsafeToBytes(v))
_, _ = hasher.Write(conversion.Bytes(v))
}
for k, v := range result.Metadata {
_, _ = hasher.Write(unsafeToBytes(k))
_, _ = hasher.Write(unsafeToBytes(types.ToString(v)))
_, _ = hasher.Write(conversion.Bytes(k))
_, _ = hasher.Write(conversion.Bytes(types.ToString(v)))
}
hash := hasher.Sum(nil)

Expand All @@ -112,12 +112,3 @@ func (s *Storage) Index(result *output.ResultEvent) (bool, error) {
}
return false, err
}

// unsafeToBytes converts a string to byte slice and does it with
// zero allocations.
//
// Reference - https://stackoverflow.com/questions/59209493/how-to-use-unsafe-get-a-byte-slice-from-a-string-without-memory-copy
func unsafeToBytes(data string) []byte {
var buf = (*[]byte)(unsafe.Pointer(&data))
return *buf
}
10 changes: 8 additions & 2 deletions pkg/scan/scan_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type ScanContext struct {
Input *contextargs.Context

// callbacks or hooks
OnError func(error)
OnResult func(e *output.InternalWrappedEvent)
OnError func(error)
OnResult func(e *output.InternalWrappedEvent)
OnWarning func(string)

// unexported state fields
errors []error
Expand Down Expand Up @@ -82,6 +83,11 @@ func (s *ScanContext) LogWarning(format string, args ...any) {
s.m.Lock()
defer s.m.Unlock()
val := fmt.Sprintf(format, args...)

if s.OnWarning != nil {
s.OnWarning(val)
}

s.warnings = append(s.warnings, val)

for _, e := range s.events {
Expand Down
70 changes: 70 additions & 0 deletions pkg/utils/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package storage

import (
"crypto/sha1"
"encoding/hex"
"os"

"github.com/projectdiscovery/utils/conversion"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

type Storage struct {
dbPath string
storage *leveldb.DB
}

func New() (*Storage, error) {
storage := &Storage{}

dbPath, err := os.MkdirTemp("", "nuclei-storage-*")
storage.dbPath = dbPath
if err != nil {
return nil, err
}

storage.storage, err = leveldb.OpenFile(dbPath, &opt.Options{})
if err != nil {
return nil, err
}
return storage, nil
}

func (s *Storage) Close() {
s.storage.Close()
os.RemoveAll(s.dbPath)
}

func Hash(v []byte) []byte {
hasher := sha1.New()
_, _ = hasher.Write(v)
return hasher.Sum(nil)
}

func HashString(v []byte) string {
return hex.EncodeToString(v)
}

func HashBytes(v string) []byte {
hash, _ := hex.DecodeString(v)
return hash
}

func (s *Storage) Get(k string) (string, error) {
hash := HashBytes(k)

v, err := s.storage.Get(hash, nil)

return conversion.String(v), err
}

func (s *Storage) SetString(v string) (string, error) {
return s.Set(conversion.Bytes(v))
}

func (s *Storage) Set(v []byte) (string, error) {
hash := Hash(v)

return HashString(hash), s.storage.Put(hash, v, nil)
}

0 comments on commit 4d29bff

Please sign in to comment.