From ee13b31a917cb369ae9eaaeb779a02381fa1f3bc Mon Sep 17 00:00:00 2001 From: ww9xlll Date: Tue, 5 Nov 2024 17:17:04 +0800 Subject: [PATCH] opensearch support --- go.mod | 7 +- go.sum | 30 ++++---- storage/es/base.go | 6 +- storage/es/opensearch.go | 150 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 23 deletions(-) create mode 100644 storage/es/opensearch.go diff --git a/go.mod b/go.mod index cb3c42c..af3cd88 100644 --- a/go.mod +++ b/go.mod @@ -16,19 +16,17 @@ require ( github.com/milvus-io/milvus-sdk-go v1.1.1 github.com/milvus-io/milvus-sdk-go/v2 v2.4.2-0.20240821073850-0c84a71df153 github.com/minio/minio-go/v7 v7.0.66 - github.com/olivere/elastic/v7 v7.0.32 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/satori/go.uuid v1.2.0 github.com/shopspring/decimal v1.3.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/swaggo/swag v1.16.2 github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.853 - github.com/tidwall/gjson v1.17.0 + github.com/tidwall/gjson v1.17.1 github.com/uber/jaeger-client-go v2.30.0+incompatible go.uber.org/atomic v1.11.0 go.uber.org/zap v1.26.0 @@ -98,6 +96,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/opensearch-project/opensearch-go/v4 v4.3.0 github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 65bcaa5..0d66f27 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,6 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= -github.com/congqixia/milvus-sdk-go/v2 v2.0.0-20240819072515-04d118f1639d h1:JkYXo7c1r/ncfAqt32clf196c1mVr3aucCOIVTVjKDs= -github.com/congqixia/milvus-sdk-go/v2 v2.0.0-20240819072515-04d118f1639d/go.mod h1:kQ7SuDWugDhCqEHIvokitjVO9CyATIFa+h1dHEk86Zc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -70,8 +68,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -185,14 +181,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.6 h1:rSkwp5Mg/7KBSUqXcrPBUgTQGZNdvYWEKB+rHo9YJtk= -github.com/milvus-io/milvus-proto/go-api/v2 v2.4.6/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-proto/go-api/v2 v2.4.10-0.20240819025435-512e3b98866a h1:0B/8Fo66D8Aa23Il0yrQvg1KKz92tE/BJ5BvkUxxAAk= github.com/milvus-io/milvus-proto/go-api/v2 v2.4.10-0.20240819025435-512e3b98866a/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-sdk-go v1.1.1 h1:QseeGBb92T4ny5jSscaWJ+toG74p5zE0kaxFyeskWSE= github.com/milvus-io/milvus-sdk-go v1.1.1/go.mod h1:jyYc5vzEQS9V+ZrSL2h/oeKcYiixvKBV/TrxXbLu01w= -github.com/milvus-io/milvus-sdk-go/v2 v2.4.2-0.20240819094545-542b5a0158c1 h1:tmPS7djDZ0VrXmgp/UbWKDFwsBuIVTjF+p4jF4Pd3KI= -github.com/milvus-io/milvus-sdk-go/v2 v2.4.2-0.20240819094545-542b5a0158c1/go.mod h1:kQ7SuDWugDhCqEHIvokitjVO9CyATIFa+h1dHEk86Zc= github.com/milvus-io/milvus-sdk-go/v2 v2.4.2-0.20240821073850-0c84a71df153 h1:3kIbxhjuXuVUMOnJI3R1oPkfM4IIRKs7W1tH1spnLnk= github.com/milvus-io/milvus-sdk-go/v2 v2.4.2-0.20240821073850-0c84a71df153/go.mod h1:ulO1YUXKH0PGg50q27grw048GDY9ayB4FPmh7D+FFTA= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= @@ -209,8 +201,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= -github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= +github.com/opensearch-project/opensearch-go/v4 v4.3.0 h1:gmQ+ILFJW6AJimivf+lHGVqCS2SCr/PBBf2Qr1xOCgE= +github.com/opensearch-project/opensearch-go/v4 v4.3.0/go.mod h1:+w6KAvEX3S0fVVmZciNLN0CkXhxxem26+F6Y7DoPp04= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= @@ -240,8 +232,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -265,8 +255,9 @@ github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMV github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -277,8 +268,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= @@ -289,19 +281,23 @@ github.com/swaggo/swag v1.16.2 h1:28Pp+8DkQoV+HLzLx8RGJZXNGKbFqnuvSbAAtoxiY04= github.com/swaggo/swag v1.16.2/go.mod h1:6YzXnDcpr0767iOejs318CwYkCQqyGer6BizOg03f+E= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.853 h1:TNYjF1jDLLNTirAkq7zRT9iF9xC2ZjgwpXsVSEBQvgQ= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.853/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0= -github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= -github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= +github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/wI2L/jsondiff v0.6.0 h1:zrsH3FbfVa3JO9llxrcDy/XLkYPLgoMX6Mz3T2PP2AI= +github.com/wI2L/jsondiff v0.6.0/go.mod h1:D6aQ5gKgPF9g17j+E9N7aasmU1O+XvfmWm1y8UMmNpw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -368,8 +364,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/storage/es/base.go b/storage/es/base.go index 773f5db..75d89b8 100644 --- a/storage/es/base.go +++ b/storage/es/base.go @@ -2,16 +2,18 @@ package es import ( "bytes" + "io" + "github.com/tidwall/gjson" "github.com/zilliztech/milvus-migration/core/config" "github.com/zilliztech/milvus-migration/core/type/estype" "github.com/zilliztech/milvus-migration/internal/log" "go.uber.org/zap" - "io" ) const VER7 = "7" const VER8 = "8" +const VER_OS = "o" type ESServerClient interface { InitScroll(idxCfg *estype.IdxCfg, batchSize int) (*SearchRes, error) @@ -45,6 +47,8 @@ func CreateESClient(esCfg *config.ESConfig) (*ESClient, error) { esClient.Cli, err = NewES7ServerCli(esCfg) case VER8: esClient.Cli, err = NewES8ServerCli(esCfg) + case VER_OS: + esClient.Cli, err = NewOpenSearchServerCli(esCfg) default: log.Warn("ES version not contain, will use default sdk version", zap.String("Version", esCfg.Version)) esClient.Cli, err = NewES8ServerCli(esCfg) diff --git a/storage/es/opensearch.go b/storage/es/opensearch.go new file mode 100644 index 0000000..6700cd3 --- /dev/null +++ b/storage/es/opensearch.go @@ -0,0 +1,150 @@ +package es + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/opensearch-project/opensearch-go/v4" + "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "github.com/tidwall/gjson" + "github.com/zilliztech/milvus-migration/core/common" + "github.com/zilliztech/milvus-migration/core/config" + "github.com/zilliztech/milvus-migration/core/type/estype" + "github.com/zilliztech/milvus-migration/internal/log" + "go.uber.org/zap" +) + +type OpenSearchServerClient struct { + _client *opensearchapi.Client +} + +func (os *OpenSearchServerClient) Close(scrollId string) error { + if os._client != nil { + os._client.Scroll.Delete(context.Background(), opensearchapi.ScrollDeleteReq{ScrollIDs: []string{scrollId}}) + } + return nil +} + +func NewOpenSearchServerCli(esConfig *config.ESConfig) (ESServerClient, error) { + osCli, err := _createOpenSearchClient(esConfig) + if err != nil { + return nil, err + } + return &OpenSearchServerClient{ + _client: osCli, + }, nil +} + +func _createOpenSearchClient(esConfig *config.ESConfig) (*opensearchapi.Client, error) { + cfg := opensearchapi.Config{ + Client: opensearch.Config{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Addresses: esConfig.Urls, + Username: esConfig.Username, + Password: esConfig.Password, + }, + } + esClient, err := opensearchapi.NewClient(cfg) + if err != nil { + log.Error("new ES _client error", zap.Error(err)) + } + return esClient, err +} + +func (os *OpenSearchServerClient) InitScroll(idxCfg *estype.IdxCfg, batchSize int) (*SearchRes, error) { + err := os.Count(idxCfg) + if err != nil { + return nil, err + } + + log.Info("start scrolling index", zap.String("index", idxCfg.Index), + zap.Int("BatchSize", batchSize)) + + resp, err := os._client.Search(context.Background(), &opensearchapi.SearchReq{ + Indices: []string{idxCfg.Index}, + Params: opensearchapi.SearchParams{ + Size: &batchSize, + Sort: []string{"_doc"}, + Scroll: time.Minute, + SourceIncludes: getFieldNames(idxCfg), + }, + }) + if err != nil { + log.Error("init es search err", zap.Error(err)) + return nil, err + } + return os.packResult(resp.Hits.Hits, *resp.ScrollID) +} + +func (os *OpenSearchServerClient) NextScroll(scrollID string) (*SearchRes, error) { + + var start time.Time + if common.DEBUG { + start = time.Now() + } + + resp, err := os._client.Scroll.Get(context.Background(), opensearchapi.ScrollGetReq{ + ScrollID: scrollID, + Params: opensearchapi.ScrollGetParams{ + Scroll: time.Minute, + }, + }) + + if common.DEBUG { + log.Info("[ES] 1 NextScroll data ======>", zap.Float64("Cost", time.Since(start).Seconds())) + start = time.Now() + } + + if err != nil { + log.Error("next scroll Error", zap.Error(err)) + return nil, err + } + res, err := os.packResult(resp.Hits.Hits, *resp.ScrollID) + if common.DEBUG { + log.Debug("[ES] 2.NextScroll data pack to Result =====>", zap.Float64("Cost", time.Since(start).Seconds())) + } + return res, err +} + +func (os *OpenSearchServerClient) Count(idxCfg *estype.IdxCfg) error { + resp, err := os._client.Indices.Count(context.Background(), &opensearchapi.IndicesCountReq{ + Indices: []string{idxCfg.Index}, + }) + if err != nil { + log.Error("Count ES Index Response Error", + zap.String("Index", idxCfg.Index), zap.Error(err)) + return err + } + log.Info("[Count ES Info]", zap.String("Index", idxCfg.Index), zap.Int("CountInfo", resp.Count)) + count := resp.Count + if count <= 0 { + log.Warn("Count ES data is empty", zap.String("Index", idxCfg.Index)) + return errors.New("Count ES data is empty, Index:" + idxCfg.Index) + } + idxCfg.Rows = int64(count) + return nil +} + +func (os *OpenSearchServerClient) packResult(hits []opensearchapi.SearchHit, newScrollID string) (*SearchRes, error) { + isFinish := false + if len(hits) <= 0 { + log.Info("Finished scrolling") + isFinish = true + } + bytes, err := json.Marshal(hits) + if err != nil { + log.Error("ES response error", zap.ByteString("Response", bytes)) + return nil, err + } + return &SearchRes{ + ScrollId: newScrollID, + Hits: gjson.Parse(string(bytes)), + IsEmpty: isFinish, + }, nil +}