From b6771e01c482a9b7db26072638024dd22efeef55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= <stephane.duchesneau@streamingfast.io> Date: Thu, 7 Dec 2023 15:32:01 -0500 Subject: [PATCH] fix merger to convert legacy blocks --- merger/merger_io.go | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/merger/merger_io.go b/merger/merger_io.go index 647835e..aeec9d1 100644 --- a/merger/merger_io.go +++ b/merger/merger_io.go @@ -1,11 +1,11 @@ package merger import ( + "bytes" "context" "errors" "fmt" "io" - "io/ioutil" "sort" "strconv" "strings" @@ -174,6 +174,34 @@ func (s *DStoreIO) WalkOneBlockFiles(ctx context.Context, lowestBlock uint64, ca } +// fixLegacyBlock reads the header and looks for "Version 0", rewriting to Version 1 on the fly if needed +func fixLegacyBlock(in []byte) ([]byte, error) { + dbinReader, err := bstream.NewDBinBlockReader(bytes.NewReader(in)) + if err != nil { + return nil, fmt.Errorf("creating block reader in fixLegacyBlock: %w", err) + } + + if dbinReader.Header.Version != 0 { + return in, nil + } + + blk, err := dbinReader.Read() + if err != nil { + return nil, err + } + + out := new(bytes.Buffer) + writer, err := bstream.NewDBinBlockWriter(out) + if err != nil { + return nil, err + } + if err := writer.Write(blk); err != nil { + return nil, err + } + return out.Bytes(), nil + +} + func (s *DStoreIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error) { for filename := range oneBlockFile.Filenames { // will try to get MemoizeData from any of those files var out io.ReadCloser @@ -190,9 +218,14 @@ func (s *DStoreIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstre default: } - data, err = ioutil.ReadAll(out) + data, err = io.ReadAll(out) + if err != nil { + continue + } + + data, err = fixLegacyBlock(data) if err == nil { - return data, nil + break } }