Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream bulk #9308

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/migrate"
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/version"
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/zero"
dgraphimport "github.com/hypermodeinc/dgraph/v24/dgraph_import"
"github.com/hypermodeinc/dgraph/v24/upgrade"
"github.com/hypermodeinc/dgraph/v24/x"
)
Expand Down Expand Up @@ -74,7 +75,7 @@ var rootConf = viper.New()
var subcommands = []*x.SubCommand{
&bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version,
&debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, &increment.Increment,
&checkupgrade.CheckUpgrade,
&checkupgrade.CheckUpgrade, &dgraphimport.Import,
}

func initCmds() {
Expand Down
4 changes: 4 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func run() {
x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", x.PortZeroGrpc+opts.portOffset)
}

if x.WorkerConfig.MyGrpcAddr == "" {
x.WorkerConfig.MyGrpcAddr = fmt.Sprintf("localhost:%d", x.Config.PortOffset+x.PortGrpc)
}

nodeId := opts.raft.GetUint64("idx")
if nodeId == 0 {
log.Fatalf("ERROR: raft.idx flag cannot be 0. Please set idx to a unique positive integer.")
Expand Down
15 changes: 15 additions & 0 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb
req.DstGroup)}, nil
}

func (s *Server) ApplyDrainmode(ctx context.Context, req *pb.Drainmode) (*pb.Status, error) {
knownGroups := s.KnownGroups()

for _, grp := range knownGroups {
pl := s.Leader(grp)
wc := pb.NewWorkerClient(pl.Get())
in := &pb.Drainmode{State: req.State}

if status, err := wc.ApplyDrainmode(ctx, in); err != nil {
return status, errors.Wrapf(err, "while applying drainmode")
}
}
return nil, nil
}

// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
// for the entire duration of predicate move. If this Zero stops being the leader, the final
// proposal of reassigning the tablet to the destination would fail automatically.
Expand Down
49 changes: 49 additions & 0 deletions dgraph_import/dgraph_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dgraphimport

import (
"context"
"fmt"
"os"

"github.com/hypermodeinc/dgraph/v24/edgraph"
"github.com/hypermodeinc/dgraph/v24/x"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
Import x.SubCommand
)

func init() {
Import.Cmd = &cobra.Command{
Use: "import",
Short: "Run the import tool",
Run: func(cmd *cobra.Command, args []string) {
run()
},
Annotations: map[string]string{"group": "tool"},
}
Import.Cmd.SetHelpTemplate(x.NonRootTemplate)
}

func run() {
if err := importP(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}

func importP() error {
client, err := edgraph.NewImportClient("localhost:9080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

alphas, err := client.InitiateSnapShotStream(context.Background())

Check failure on line 43 in dgraph_import/dgraph_import.go

View check run for this annotation

Trunk.io / Trunk Check

golangci-lint(ineffassign)

[new] ineffectual assignment to err
if err := client.StreamSnapshot(context.Background(), "/home/shiva/workspace/dgraph-work/stream_data/out", alphas.LeaderAlphas); err != nil {
fmt.Println("error is---------", err)
}

return nil
}
54 changes: 40 additions & 14 deletions dgraphapi/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"

"github.com/dgraph-io/dgo/v240"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/hypermodeinc/dgraph/v24/graphql/schema"
"github.com/hypermodeinc/dgraph/v24/protos/pb"
"github.com/hypermodeinc/dgraph/v24/x"
)

Expand Down Expand Up @@ -57,12 +59,13 @@ type HttpToken struct {
// HTTPClient allows doing operations on Dgraph over http
type HTTPClient struct {
*HttpToken
adminURL string
graphqlURL string
licenseURL string
stateURL string
dqlURL string
dqlMutateUrl string
adminURL string
graphqlURL string
licenseURL string
stateURL string
dqlURL string
dqlMutateUrl string
alphaStateURL string
}

// GraphQLParams are used for making graphql requests to dgraph
Expand Down Expand Up @@ -656,7 +659,7 @@ func (hc *HTTPClient) ApplyLicenseGraphQL(license []byte) ([]byte, error) {
return hc.RunGraphqlQuery(params, true)
}

func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
func (hc *HTTPClient) GetLicenseInfo() (*LicenseResponse, error) {
response, err := http.Get(hc.stateURL)
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
Expand All @@ -679,6 +682,27 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
return &stateResponse, nil
}

func (hc *HTTPClient) GetAlphaState() (*pb.MembershipState, error) {
req, err := http.NewRequest(http.MethodGet, hc.alphaStateURL, nil)
if err != nil {
return nil, err
}
if hc.HttpToken != nil {
req.Header.Add("X-Dgraph-AccessToken", hc.AccessJwt)
}

resp, err := DoReq(req)
if err != nil {
return nil, err
}

var state pb.MembershipState
if err = protojson.Unmarshal(resp, &state); err != nil {
return nil, err
}
return &state, err
}

func (hc *HTTPClient) PostDqlQuery(query string) ([]byte, error) {
req, err := http.NewRequest(http.MethodPost, hc.dqlURL, bytes.NewBufferString(query))
if err != nil {
Expand Down Expand Up @@ -795,13 +819,15 @@ func GetHttpClient(alphaUrl, zeroUrl string) (*HTTPClient, error) {
stateUrl := "http://" + zeroUrl + "/state"
dqlUrl := "http://" + alphaUrl + "/query"
dqlMutateUrl := "http://" + alphaUrl + "/mutate"
alphaStateUrl := "http://" + alphaUrl + "/state"
return &HTTPClient{
adminURL: adminUrl,
graphqlURL: graphQLUrl,
licenseURL: licenseUrl,
stateURL: stateUrl,
dqlURL: dqlUrl,
dqlMutateUrl: dqlMutateUrl,
HttpToken: &HttpToken{},
adminURL: adminUrl,
graphqlURL: graphQLUrl,
licenseURL: licenseUrl,
stateURL: stateUrl,
dqlURL: dqlUrl,
dqlMutateUrl: dqlMutateUrl,
alphaStateURL: alphaStateUrl,
HttpToken: &HttpToken{},
}, nil
}
6 changes: 5 additions & 1 deletion dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (c *LocalCluster) serverURL(server, endpoint string) (string, error) {
if err != nil {
return "", err
}
url := "0.0.0.0:" + pubPort + endpoint
url := GetLocalHostUrl(pubPort, endpoint)
return url, nil
}

Expand Down Expand Up @@ -1190,3 +1190,7 @@ func (c *LocalCluster) GetZeroGrpcPublicPort() (string, error) {
func (c *LocalCluster) GetTempDir() string {
return c.tempBinDir
}

func GetLocalHostUrl(pubPort, endpoint string) string {
return "0.0.0.0:" + pubPort + endpoint
}
138 changes: 138 additions & 0 deletions edgraph/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package edgraph

import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"strconv"

"github.com/dgraph-io/badger/v4"
apiv25 "github.com/dgraph-io/dgo/v240/protos/api.v25"
"github.com/dgraph-io/ristretto/v2/z"
"github.com/golang/glog"
"google.golang.org/grpc"
)

type Client struct {
opts grpc.DialOption
dg apiv25.DgraphHMClient
}

func NewImportClient(endpoint string, opts grpc.DialOption) (*Client, error) {
conn, err := grpc.NewClient(endpoint, opts)
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
}

return &Client{dg: apiv25.NewDgraphHMClient(conn), opts: opts}, nil
}

func (c *Client) InitiateSnapShotStream(ctx context.Context) (*apiv25.InitiateSnapShotStreamResponse, error) {
req := &apiv25.InitiateSnapShotStreamRequest{}
return c.dg.InitiateSnapShotStream(ctx, req)
}
func (c *Client) StreamSnapshot(ctx context.Context, pDir string, alphas map[uint32]string) error {
groupDirs, err := getPDiPdirrectories(pDir)
if err != nil {
return fmt.Errorf("Error getting p directories: %v", err)
}

for key, leader := range alphas {
pDir, exists := groupDirs[key-1]
if !exists {
fmt.Printf("No p directory found for group %d, skipping...\n", key)
continue
}

if _, err := os.Stat(pDir); os.IsNotExist(err) {
fmt.Printf("p directory does not exist: %s, skipping...\n", pDir)
continue
}

conn, err := grpc.NewClient(leader, c.opts)
if err != nil {
return fmt.Errorf("Failed to connect to leader %s: %v", leader, err)
}
defer conn.Close()

dg := apiv25.NewDgraphHMClient(conn)
err = stream(ctx, dg, pDir)
if err != nil {
return err
}
}

return nil
}

func stream(ctx context.Context, dg apiv25.DgraphHMClient, pdir string) error {
out, err := dg.StreamPSnapshot(ctx)
if err != nil {
return fmt.Errorf("failed to start snapshot stream: %w", err)
}

opt := badger.DefaultOptions(pdir)
ps, err := badger.OpenManaged(opt)
if err != nil {
return fmt.Errorf("failed to open BadgerDB: %w", err)
}
defer ps.Close()

stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Sending P dir"
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
kvs := &apiv25.KVS{Data: buf.Bytes()}
if err := out.Send(kvs); err != nil {
return fmt.Errorf("failed to send data: %w", err)
}
return nil
}

if err := stream.Orchestrate(ctx); err != nil {
return fmt.Errorf("stream orchestration failed: %w", err)
}

done := &apiv25.KVS{
Done: true,
Predicates: []string{},
Types: []string{},
}
if err := out.Send(done); err != nil {
return fmt.Errorf("failed to send 'done' signal: %w", err)
}

fmt.Println("Snapshot writes DONE. Sending ACK")
ack, err := out.CloseAndRecv()
if err != nil {
return fmt.Errorf("failed to receive ACK: %w", err)
}
glog.Infof("Received ACK with message: %v\n", ack.Message)

return nil
}

func getPDiPdirrectories(basePath string) (map[uint32]string, error) {
groupDirs := make(map[uint32]string)

entries, err := os.ReadDir(basePath)
if err != nil {
return nil, err
}

for _, entry := range entries {
if entry.IsDir() {
groupID, err := strconv.Atoi(entry.Name())
if err == nil {
pDir := filepath.Join(basePath, entry.Name(), "p")
if _, err := os.Stat(pDir); err == nil {
groupDirs[uint32(groupID)] = pDir

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an integer with architecture-dependent bit size from
strconv.Atoi
to a lower bit size type uint32 without an upper bound check.

Copilot Autofix AI 7 days ago

To fix the problem, we need to ensure that the value parsed from the directory name is within the valid range for uint32 before performing the conversion. We can achieve this by using strconv.ParseUint with a bit size of 32, which directly parses the string into a uint32 value. This approach avoids the need for an additional bounds check.

Suggested changeset 1
edgraph/import.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/edgraph/import.go b/edgraph/import.go
--- a/edgraph/import.go
+++ b/edgraph/import.go
@@ -126,3 +126,3 @@
 		if entry.IsDir() {
-			groupID, err := strconv.Atoi(entry.Name())
+			groupID, err := strconv.ParseUint(entry.Name(), 10, 32)
 			if err == nil {
EOF
@@ -126,3 +126,3 @@
if entry.IsDir() {
groupID, err := strconv.Atoi(entry.Name())
groupID, err := strconv.ParseUint(entry.Name(), 10, 32)
if err == nil {
Copilot is powered by AI and may make mistakes. Always verify output.
Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options
}
}
}
}

return groupDirs, nil
}
Loading
Loading