Skip to content

Commit

Permalink
fix(pb): change type of ranges Init and End from int32 to int64
Browse files Browse the repository at this point in the history
  • Loading branch information
leandro-driguez committed Jun 10, 2023
1 parent f452d8f commit b04e788
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 84 deletions.
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
keyHash := utils.GetSha1Hash(data)
dataBytes := []byte(data)
//fmt.Println("data bytes", dataBytes)
err = sender.Send(&pb.StoreData{Key: keyHash, Value: &pb.Data{Init: 0, End: int32(len(dataBytes)), Buffer: dataBytes}})
err = sender.Send(&pb.StoreData{Key: keyHash, Value: &pb.Data{Init: 0, End: int64(len(dataBytes)), Buffer: dataBytes}})
if err != nil {
fmt.Println(err.Error())
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func main() {

buffer := []byte{}
nearestNeighbors := []*pb.Node{}
var init int32 = 0
var init int64 = 0

for {
data, err := receiver.Recv()
Expand Down
9 changes: 7 additions & 2 deletions core/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ type DHT struct {
}

func (fn *DHT) Store(key []byte, data *[]byte) error {
fmt.Println("I'm inside Store() in DHT and key is:", key)
fmt.Printf("INIT DHT.Store(%v) len(*data)=%d\n", key, len(*data))
defer fmt.Printf("END DHT.Store(%v)\n", key)

fmt.Println("Before Storage.Create()")
err := fn.Storage.Create(key, data)
fmt.Println("After Storage.Create()")
if err != nil {
fmt.Println("ERROR line:23 DHT.Storage.Create()")
return err
}
return nil
}

func (fn *DHT) FindValue(infoHash *[]byte, start int32, end int32) (value *[]byte, neighbors *[]structs.Node) {
func (fn *DHT) FindValue(infoHash *[]byte, start int64, end int64) (value *[]byte, neighbors *[]structs.Node) {
value, err := fn.Storage.Read(*infoHash, start, end)
if err != nil {
//fmt.Println("Find Value error: ", err)
Expand Down
120 changes: 60 additions & 60 deletions core/fullNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"errors"
"fmt"
"log"
"math"
"net"
"sort"
"time"

base58 "github.com/jbenet/go-base58"
"github.com/science-engineering-art/kademlia-grpc/interfaces"
"github.com/science-engineering-art/kademlia-grpc/pb"
"github.com/science-engineering-art/kademlia-grpc/structs"
Expand Down Expand Up @@ -95,21 +97,26 @@ func (fn *FullNode) Ping(ctx context.Context, sender *pb.Node) (*pb.Node, error)
}

func (fn *FullNode) Store(stream pb.FullNode_StoreServer) error {
//fmt.Printf("INIT Store() method\n\n")
fmt.Printf("INIT FullNode.Store()\n\n")
defer fmt.Printf("END FullNode.Store()\n\n")

key := []byte{}
buffer := []byte{}
var init int32 = 0
var init int64 = 0

for {
data, err := stream.Recv()
if data == nil {
//fmt.Printf("END Streaming\n\n")
fmt.Printf("END Streaming\n\n")
break
}
if err != nil {
fmt.Printf("EXIT line:133 Store() method\n\n")
return errors.New("missing chunck")
}

if init == 0 {
//fmt.Printf("INIT Streaming\n\n")
fmt.Printf("INIT Streaming\n\n")
// add the sender to the Routing Table
sender := structs.Node{
ID: data.Sender.ID,
Expand All @@ -124,22 +131,18 @@ func (fn *FullNode) Store(stream pb.FullNode_StoreServer) error {
buffer = append(buffer, data.Value.Buffer...)
init = data.Value.End
} else {
//fmt.Printf("EXIT Store() method\n\n")
return err
}

if err != nil {
//fmt.Printf("EXIT Store() method\n\n")
fmt.Printf("ERROR missing chunck\n\n")
return err
}
fmt.Printf("OKKKK ===> FullNode(%s).Recv(%d, %d)\n", fn.dht.IP, data.Value.Init, data.Value.End)
}
// fmt.Println("Received Data:", buffer)

err := fn.dht.Store(key, &buffer)
if err != nil {
//fmt.Printf("EXIT Store() method\n\n")
fmt.Printf("ERROR line:140 DHT.Store()\n\n")
return err
}
//fmt.Printf("EXIT Store() method\n\n")
return nil
}

Expand Down Expand Up @@ -184,7 +187,7 @@ func (fn *FullNode) FindValue(target *pb.Target, stream pb.FullNode_FindValueSer
KNeartestBuckets: &pb.KBucket{Bucket: []*pb.Node{}},
Value: &pb.Data{
Init: 0,
End: int32(len(*value)),
End: int64(len(*value)),
Buffer: *value,
},
}
Expand All @@ -203,7 +206,6 @@ func (fn *FullNode) FindValue(target *pb.Target, stream pb.FullNode_FindValueSer
///////////////////////////////////////////////////////

func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) {

sl := fn.dht.RoutingTable.GetClosestContacts(structs.Alpha, target, []*structs.Node{})

contacted := make(map[string]bool)
Expand Down Expand Up @@ -300,86 +302,84 @@ func (fn *FullNode) LookUp(target []byte) ([]structs.Node, error) {
}

func (fn *FullNode) StoreValue(key string, data *[]byte) (string, error) {
fmt.Printf("INIT StoreValue() method\n\n")
fmt.Println("The requested key is:", key)
keyHash := utils.GetSha1Hash(key)
fmt.Println("Keyhash before:", keyHash)
fmt.Printf("INIT FullNode.StoreValue(%s) method\n\n", key)
defer fmt.Printf("EXIT FullNode.StoreValue(%s) method\n\n", key)

keyHash := base58.Decode(key)
nearestNeighbors, err := fn.LookUp(keyHash)
//fmt.Printf("Neartest Neighbors:\n%v\n", nearestNeighbors)
if err != nil {
//fmt.Printf("ERROR LookUP() method\n\n")
//fmt.Printf("EXIT StoreValue() method\n\n")
fmt.Printf("ERROR LookUP() method\n\n")
return "", err
}
fmt.Println("Keyhash after:", keyHash)

if len(nearestNeighbors) < structs.K {
err := fn.dht.Store(keyHash, data)
if err != nil {
//fmt.Printf("ERROR Store(Me)\n\n")
//fmt.Printf("EXIT StoreValue() method\n\n")
return "", nil
fmt.Printf("ERROR DHT.Store(Me)\n\n")
}
//fmt.Printf("EXIT StoreValue() method\n\n")
return key, nil
}

for index, node := range nearestNeighbors {
if index == len(nearestNeighbors)-1 && utils.ClosestNodeToKey(keyHash, fn.dht.ID, node.ID) == -1 {
if index == structs.K-1 && utils.ClosestNodeToKey(keyHash, fn.dht.ID, node.ID) == -1 {
err := fn.dht.Store(keyHash, data)
if err != nil {
//fmt.Printf("ERROR Store(Me)\n\n")
//fmt.Printf("EXIT StoreValue() method\n\n")
return "", nil
fmt.Printf("ERROR DHT.Store(Me)\n\n")
}
//fmt.Printf("EXIT StoreValue() method\n\n")
return key, nil
break
}

client := NewClientNode(node.IP, node.Port)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

sender, err := client.Store(ctx)
if err != nil {
//fmt.Printf("ERROR Store(%v, %d) method", node.IP, node.Port)
fmt.Printf("ERROR Store(%v, %d) method", node.IP, node.Port)
if ctx.Err() == context.DeadlineExceeded {
// Handle timeout error
//fmt.Println("Timeout exceeded")
fmt.Println("Timeout exceeded")
continue
}
fmt.Println(err.Error())
}
//fmt.Println("data bytes", dataBytes)
err = sender.Send(
&pb.StoreData{
Sender: &pb.Node{
ID: fn.dht.ID,
IP: fn.dht.IP,
Port: int32(fn.dht.Port),
},
Key: keyHash,
// leandro_driguez: tiene sentido pasar todo el archivo?
Value: &pb.Data{
Init: 0,
End: int32(len(*data)),
Buffer: *data,
// fmt.Println("data bytes", dataBytes)

for i := 0; i < len(*data); i += 1024 {
j := int(math.Min(float64(i+1024), float64(len(*data))))

err = sender.Send(
&pb.StoreData{
Sender: &pb.Node{
ID: fn.dht.ID,
IP: fn.dht.IP,
Port: int32(fn.dht.Port),
},
Key: keyHash,
Value: &pb.Data{
Init: int64(i),
End: int64(j),
Buffer: (*data)[i:j],
},
},
},
)
if err != nil {
//fmt.Printf("ERROR SendChunck(0, %d) method\n\n", len(*data))
//fmt.Printf("EXIT StoreValue() method\n\n")
return "", err
)
if err != nil {
fmt.Printf("ERROR SendChunck(0, %d) method\n\n", len(*data))
break
// return "", err
}
fmt.Printf("OKKKK ===> FullNode(%s).Send(%d, %d)\n", fn.dht.IP, i, j)
}

}

//fmt.Println("Stored ID: ", key, "Stored Data:", data)
//fmt.Printf("EXIT StoreValue() method\n\n")
// fmt.Println("Stored ID: ", key, "Stored Data:", data)
fmt.Println("===> OKKKK")
return key, nil
}

func (fn *FullNode) GetValue(target string, start int32, end int32) ([]byte, error) {
keyHash := utils.GetSha1Hash(target)
func (fn *FullNode) GetValue(target string, start int64, end int64) ([]byte, error) {
keyHash := base58.Decode(target)

val, err := fn.dht.Storage.Read(keyHash, start, end)
if err == nil {
Expand Down Expand Up @@ -449,7 +449,7 @@ func (fn *FullNode) GetValue(target string, start int32, end int32) ([]byte, err
fmt.Println(err.Error())
continue
}
var init int32 = 0
var init int64 = 0

for {
data, err := receiver.Recv()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/science-engineering-art/kademlia-grpc
go 1.20

require (
github.com/jbenet/go-base58 v0.0.0-20150317085156-6237cf65f3a6
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
gopkg.in/readline.v1 v1.0.0-20160726135117-62c6fe619375
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/jbenet/go-base58 v0.0.0-20150317085156-6237cf65f3a6 h1:4zOlv2my+vf98jT1nQt4bT/yKWUImevYPJ2H344CloE=
github.com/jbenet/go-base58 v0.0.0-20150317085156-6237cf65f3a6/go.mod h1:r/8JmuR0qjuCiEhAolkfvdZgmPiHTnJaG0UXCSeR1Zo=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
Expand Down
2 changes: 1 addition & 1 deletion interfaces/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package interfaces
type Persistence interface {
Create(key []byte, data *[]byte) error

Read(key []byte, start int32, end int32) (data *[]byte, err error)
Read(key []byte, start int64, end int64) (data *[]byte, err error)

Delete(key []byte) error
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
start, _ := strconv.ParseInt(input[2], 10, 32)
end, _ := strconv.ParseInt(input[3], 10, 32)

value, err := fullNode.GetValue(key, int32(start), int32(end))
value, err := fullNode.GetValue(key, int64(start), int64(end))
if err != nil {
fmt.Println(err.Error())
}
Expand Down
24 changes: 12 additions & 12 deletions pb/kademlia.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b04e788

Please sign in to comment.