Skip to content

Commit

Permalink
feat: store stamp hash (#4717)
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill authored Jul 16, 2024
1 parent 62eed01 commit 289f4c8
Show file tree
Hide file tree
Showing 26 changed files with 786 additions and 186 deletions.
14 changes: 14 additions & 0 deletions pkg/postage/stamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ func (s *Stamp) Clone() swarm.Stamp {
}
}

// Hash returns the hash of the stamp.
func (s *Stamp) Hash() ([]byte, error) {
hasher := swarm.NewHasher()
b, err := s.MarshalBinary()
if err != nil {
return nil, err
}
_, err = hasher.Write(b)
if err != nil {
return nil, err
}
return hasher.Sum(nil), nil
}

// MarshalBinary gives the byte slice serialisation of a stamp:
// batchID[32]|index[8]|timestamp[8]|Signature[65].
func (s *Stamp) MarshalBinary() ([]byte, error) {
Expand Down
99 changes: 76 additions & 23 deletions pkg/pullsync/pb/pullsync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/pullsync/pb/pullsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message Get {
message Chunk {
bytes Address = 1;
bytes BatchID = 2;
bytes StampHash = 3;
}

message Offer {
Expand Down
13 changes: 7 additions & 6 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/pullsync/pb"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"resenje.org/singleflight"
)
Expand All @@ -35,7 +35,7 @@ const loggerName = "pullsync"

const (
protocolName = "pullsync"
protocolVersion = "1.3.0"
protocolVersion = "1.4.0"
streamName = "pullsync"
cursorStreamName = "cursors"
)
Expand Down Expand Up @@ -170,6 +170,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start

addr := offer.Chunks[i].Address
batchID := offer.Chunks[i].BatchID
stampHash := offer.Chunks[i].StampHash
if len(addr) != swarm.HashSize {
return 0, 0, fmt.Errorf("inconsistent hash length")
}
Expand All @@ -182,7 +183,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
}
s.metrics.Offered.Inc()
if s.store.IsWithinStorageRadius(a) {
have, err = s.store.ReserveHas(a, batchID)
have, err = s.store.ReserveHas(a, batchID, stampHash)
if err != nil {
s.logger.Debug("storage has", "error", err)
return 0, 0, err
Expand Down Expand Up @@ -377,7 +378,7 @@ func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) {
o.Topmost = top
o.Chunks = make([]*pb.Chunk, 0, len(addrs))
for _, v := range addrs {
o.Chunks = append(o.Chunks, &pb.Chunk{Address: v.Address.Bytes(), BatchID: v.BatchID})
o.Chunks = append(o.Chunks, &pb.Chunk{Address: v.Address.Bytes(), BatchID: v.BatchID, StampHash: v.StampHash})
}
return o, nil
}
Expand Down Expand Up @@ -419,7 +420,7 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
break LOOP // The stream has been closed.
}

chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID})
chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID, StampHash: c.StampHash})
if c.BinID > topmost {
topmost = c.BinID
}
Expand Down Expand Up @@ -465,7 +466,7 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw
if bv.Get(i) {
ch := o.Chunks[i]
addr := swarm.NewAddress(ch.Address)
c, err := s.store.ReserveGet(ctx, addr, ch.BatchID)
c, err := s.store.ReserveGet(ctx, addr, ch.BatchID, ch.StampHash)
if err != nil {
s.logger.Debug("processing want: unable to find chunk", "chunk_address", addr, "batch_id", hex.EncodeToString(ch.BatchID))
chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil))
Expand Down
25 changes: 18 additions & 7 deletions pkg/pullsync/pullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ func init() {
for i := 0; i < n; i++ {
chunks[i] = testingc.GenerateTestRandomChunk()
addrs[i] = chunks[i].Address()
stampHash, _ := chunks[i].Stamp().Hash()
results[i] = &storer.BinC{
Address: addrs[i],
BatchID: chunks[i].Stamp().BatchID(),
BinID: uint64(i),
Address: addrs[i],
BatchID: chunks[i].Stamp().BatchID(),
BinID: uint64(i),
StampHash: stampHash,
}
}
}
Expand Down Expand Up @@ -158,10 +160,15 @@ func TestIncoming_WantErrors(t *testing.T) {

tResults := make([]*storer.BinC, len(tChunks))
for i, c := range tChunks {
stampHash, err := c.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
tResults[i] = &storer.BinC{
Address: c.Address(),
BatchID: c.Stamp().BatchID(),
BinID: uint64(i + 5), // start from a higher bin id
Address: c.Address(),
BatchID: c.Stamp().BatchID(),
BinID: uint64(i + 5), // start from a higher bin id
StampHash: stampHash,
}
}

Expand Down Expand Up @@ -305,7 +312,11 @@ func TestGetCursorsError(t *testing.T) {
func haveChunks(t *testing.T, s *mock.ReserveStore, chunks ...swarm.Chunk) {
t.Helper()
for _, c := range chunks {
have, err := s.ReserveHas(c.Address(), c.Stamp().BatchID())
stampHash, err := c.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
have, err := s.ReserveHas(c.Address(), c.Stamp().BatchID(), stampHash)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storagetest/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"

"github.com/ethersphere/bee/v2/pkg/encryption"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/storageutil"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -793,7 +793,7 @@ func TestItemMarshalAndUnmarshal(t *testing.T, test *ItemMarshalAndUnmarshalTest

want, have := test.Item, item2
if !cmp.Equal(want, have, test.CmpOpts...) {
t.Errorf("Marshal/Unmarshal mismatch (-want +have):\n%s", cmp.Diff(want, have))
t.Errorf("Marshal/Unmarshal mismatch (-want +have):\n%s", cmp.Diff(want, have, test.CmpOpts...))
}
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/storer/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
pullerMock "github.com/ethersphere/bee/v2/pkg/puller/mock"
chunk "github.com/ethersphere/bee/v2/pkg/storage/testing"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -92,7 +92,11 @@ func TestCompact(t *testing.T) {
}

for _, ch := range chunks {
has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID())
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID(), stampHash)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -174,7 +178,11 @@ func TestCompactNoEvictions(t *testing.T) {
}

for _, ch := range chunks {
has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID())
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
has, err := st.ReserveHas(ch.Address(), ch.Stamp().BatchID(), stampHash)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 289f4c8

Please sign in to comment.