Skip to content

Commit

Permalink
add rpcs for stream p dir
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaji-kharse committed Mar 4, 2025
1 parent 8b19abf commit 57ab93a
Show file tree
Hide file tree
Showing 21 changed files with 1,803 additions and 1,072 deletions.
3 changes: 2 additions & 1 deletion dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/migrate"
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/version"
"github.com/hypermodeinc/dgraph/v24/dgraph/cmd/zero"
dgraphimport "github.com/hypermodeinc/dgraph/v24/dgraph_import"
"github.com/hypermodeinc/dgraph/v24/upgrade"
"github.com/hypermodeinc/dgraph/v24/x"
)
Expand Down Expand Up @@ -74,7 +75,7 @@ var rootConf = viper.New()
var subcommands = []*x.SubCommand{
&bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version,
&debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, &increment.Increment,
&checkupgrade.CheckUpgrade,
&checkupgrade.CheckUpgrade, &dgraphimport.Import,
}

func initCmds() {
Expand Down
4 changes: 4 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func run() {
x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", x.PortZeroGrpc+opts.portOffset)
}

if x.WorkerConfig.MyGrpcAddr == "" {
x.WorkerConfig.MyGrpcAddr = fmt.Sprintf("localhost:%d", x.Config.PortOffset+x.PortGrpc)
}

nodeId := opts.raft.GetUint64("idx")
if nodeId == 0 {
log.Fatalf("ERROR: raft.idx flag cannot be 0. Please set idx to a unique positive integer.")
Expand Down
15 changes: 15 additions & 0 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb
req.DstGroup)}, nil
}

func (s *Server) ApplyDrainmode(ctx context.Context, req *pb.Drainmode) (*pb.Status, error) {
knownGroups := s.KnownGroups()

for _, grp := range knownGroups {
pl := s.Leader(grp)
wc := pb.NewWorkerClient(pl.Get())
in := &pb.Drainmode{State: req.State}

if status, err := wc.ApplyDrainmode(ctx, in); err != nil {
return status, errors.Wrapf(err, "while applying drainmode")
}
}
return nil, nil
}

// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
// for the entire duration of predicate move. If this Zero stops being the leader, the final
// proposal of reassigning the tablet to the destination would fail automatically.
Expand Down
49 changes: 49 additions & 0 deletions dgraph_import/dgraph_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dgraphimport

import (
"context"
"fmt"
"os"

"github.com/hypermodeinc/dgraph/v24/edgraph"
"github.com/hypermodeinc/dgraph/v24/x"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
Import x.SubCommand
)

func init() {
Import.Cmd = &cobra.Command{
Use: "import",
Short: "Run the import tool",
Run: func(cmd *cobra.Command, args []string) {
run()
},
Annotations: map[string]string{"group": "tool"},
}
Import.Cmd.SetHelpTemplate(x.NonRootTemplate)
}

func run() {
if err := importP(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}

func importP() error {
client, err := edgraph.NewImportClient("localhost:9080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

alphas, err := client.InitiateSnapShotStream(context.Background())

Check failure on line 43 in dgraph_import/dgraph_import.go

View check run for this annotation

Trunk.io / Trunk Check

golangci-lint(ineffassign)

[new] ineffectual assignment to err
if err := client.StreamSnapshot(context.Background(), "/home/shiva/workspace/dgraph-work/stream_data/out", alphas.LeaderAlphas); err != nil {
fmt.Println("error is---------", err)
}

return nil
}
54 changes: 40 additions & 14 deletions dgraphapi/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"

"github.com/dgraph-io/dgo/v240"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/hypermodeinc/dgraph/v24/graphql/schema"
"github.com/hypermodeinc/dgraph/v24/protos/pb"
"github.com/hypermodeinc/dgraph/v24/x"
)

Expand Down Expand Up @@ -57,12 +59,13 @@ type HttpToken struct {
// HTTPClient allows doing operations on Dgraph over http
type HTTPClient struct {
*HttpToken
adminURL string
graphqlURL string
licenseURL string
stateURL string
dqlURL string
dqlMutateUrl string
adminURL string
graphqlURL string
licenseURL string
stateURL string
dqlURL string
dqlMutateUrl string
alphaStateURL string
}

// GraphQLParams are used for making graphql requests to dgraph
Expand Down Expand Up @@ -656,7 +659,7 @@ func (hc *HTTPClient) ApplyLicenseGraphQL(license []byte) ([]byte, error) {
return hc.RunGraphqlQuery(params, true)
}

func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
func (hc *HTTPClient) GetLicenseInfo() (*LicenseResponse, error) {
response, err := http.Get(hc.stateURL)
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
Expand All @@ -679,6 +682,27 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
return &stateResponse, nil
}

func (hc *HTTPClient) GetAlphaState() (*pb.MembershipState, error) {
req, err := http.NewRequest(http.MethodGet, hc.alphaStateURL, nil)
if err != nil {
return nil, err
}
if hc.HttpToken != nil {
req.Header.Add("X-Dgraph-AccessToken", hc.AccessJwt)
}

resp, err := DoReq(req)
if err != nil {
return nil, err
}

var state pb.MembershipState
if err = protojson.Unmarshal(resp, &state); err != nil {
return nil, err
}
return &state, err
}

func (hc *HTTPClient) PostDqlQuery(query string) ([]byte, error) {
req, err := http.NewRequest(http.MethodPost, hc.dqlURL, bytes.NewBufferString(query))
if err != nil {
Expand Down Expand Up @@ -795,13 +819,15 @@ func GetHttpClient(alphaUrl, zeroUrl string) (*HTTPClient, error) {
stateUrl := "http://" + zeroUrl + "/state"
dqlUrl := "http://" + alphaUrl + "/query"
dqlMutateUrl := "http://" + alphaUrl + "/mutate"
alphaStateUrl := "http://" + alphaUrl + "/state"
return &HTTPClient{
adminURL: adminUrl,
graphqlURL: graphQLUrl,
licenseURL: licenseUrl,
stateURL: stateUrl,
dqlURL: dqlUrl,
dqlMutateUrl: dqlMutateUrl,
HttpToken: &HttpToken{},
adminURL: adminUrl,
graphqlURL: graphQLUrl,
licenseURL: licenseUrl,
stateURL: stateUrl,
dqlURL: dqlUrl,
dqlMutateUrl: dqlMutateUrl,
alphaStateURL: alphaStateUrl,
HttpToken: &HttpToken{},
}, nil
}
6 changes: 5 additions & 1 deletion dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (c *LocalCluster) serverURL(server, endpoint string) (string, error) {
if err != nil {
return "", err
}
url := "0.0.0.0:" + pubPort + endpoint
url := GetLocalHostUrl(pubPort, endpoint)
return url, nil
}

Expand Down Expand Up @@ -1190,3 +1190,7 @@ func (c *LocalCluster) GetZeroGrpcPublicPort() (string, error) {
func (c *LocalCluster) GetTempDir() string {
return c.tempBinDir
}

func GetLocalHostUrl(pubPort, endpoint string) string {
return "0.0.0.0:" + pubPort + endpoint
}
138 changes: 138 additions & 0 deletions edgraph/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package edgraph

import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"strconv"

"github.com/dgraph-io/badger/v4"
apiv25 "github.com/dgraph-io/dgo/v240/protos/api.v25"
"github.com/dgraph-io/ristretto/v2/z"
"github.com/golang/glog"
"google.golang.org/grpc"
)

type Client struct {
opts grpc.DialOption
dg apiv25.DgraphHMClient
}

func NewImportClient(endpoint string, opts grpc.DialOption) (*Client, error) {
conn, err := grpc.NewClient(endpoint, opts)
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
}

return &Client{dg: apiv25.NewDgraphHMClient(conn), opts: opts}, nil
}

func (c *Client) InitiateSnapShotStream(ctx context.Context) (*apiv25.InitiateSnapShotStreamResponse, error) {
req := &apiv25.InitiateSnapShotStreamRequest{}
return c.dg.InitiateSnapShotStream(ctx, req)
}
func (c *Client) StreamSnapshot(ctx context.Context, pDir string, alphas map[uint32]string) error {
groupDirs, err := getPDiPdirrectories(pDir)
if err != nil {
return fmt.Errorf("Error getting p directories: %v", err)
}

for key, leader := range alphas {
pDir, exists := groupDirs[key-1]
if !exists {
fmt.Printf("No p directory found for group %d, skipping...\n", key)
continue
}

if _, err := os.Stat(pDir); os.IsNotExist(err) {
fmt.Printf("p directory does not exist: %s, skipping...\n", pDir)
continue
}

conn, err := grpc.NewClient(leader, c.opts)
if err != nil {
return fmt.Errorf("Failed to connect to leader %s: %v", leader, err)
}
defer conn.Close()

dg := apiv25.NewDgraphHMClient(conn)
err = stream(ctx, dg, pDir)
if err != nil {
return err
}
}

return nil
}

func stream(ctx context.Context, dg apiv25.DgraphHMClient, pdir string) error {
out, err := dg.StreamPSnapshot(ctx)
if err != nil {
return fmt.Errorf("failed to start snapshot stream: %w", err)
}

opt := badger.DefaultOptions(pdir)
ps, err := badger.OpenManaged(opt)
if err != nil {
return fmt.Errorf("failed to open BadgerDB: %w", err)
}
defer ps.Close()

stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Sending P dir"
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
kvs := &apiv25.KVS{Data: buf.Bytes()}
if err := out.Send(kvs); err != nil {
return fmt.Errorf("failed to send data: %w", err)
}
return nil
}

if err := stream.Orchestrate(ctx); err != nil {
return fmt.Errorf("stream orchestration failed: %w", err)
}

done := &apiv25.KVS{
Done: true,
Predicates: []string{},
Types: []string{},
}
if err := out.Send(done); err != nil {
return fmt.Errorf("failed to send 'done' signal: %w", err)
}

fmt.Println("Snapshot writes DONE. Sending ACK")
ack, err := out.CloseAndRecv()
if err != nil {
return fmt.Errorf("failed to receive ACK: %w", err)
}
glog.Infof("Received ACK with message: %v\n", ack.Message)

return nil
}

func getPDiPdirrectories(basePath string) (map[uint32]string, error) {
groupDirs := make(map[uint32]string)

entries, err := os.ReadDir(basePath)
if err != nil {
return nil, err
}

for _, entry := range entries {
if entry.IsDir() {
groupID, err := strconv.Atoi(entry.Name())
if err == nil {
pDir := filepath.Join(basePath, entry.Name(), "p")
if _, err := os.Stat(pDir); err == nil {
groupDirs[uint32(groupID)] = pDir

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an integer with architecture-dependent bit size from
strconv.Atoi
to a lower bit size type uint32 without an upper bound check.
}
}
}
}

return groupDirs, nil
}
Loading

0 comments on commit 57ab93a

Please sign in to comment.