feat: Add Repository::to_indexed_checked and ::to_index_ids_checked() (#168)

This allows using a repository even when index files are missing, without
running an index repair. Instead, the needed information is read from the
pack files and kept in memory, but not written back. This makes it possible
to run truly read-only commands on a repository with a defective index.
aawsome authored Feb 25, 2024
1 parent 3907a5c commit 0605c9c
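
As a minimal, crate-internal usage sketch (assumptions: the new to_indexed_checked() mirrors the existing to_indexed() in that it consumes an opened repository and returns an indexed one; the helper function below and its name are illustrative only):

use crate::{
    error::RusticResult,
    progress::ProgressBars,
    repository::{Open, Repository},
};

// Illustrative helper: obtain an indexed repository even if index files are missing.
// The checked variant reconstructs missing index information from the pack headers
// and keeps it in memory; nothing is written to the backend.
fn open_for_read_only<P: ProgressBars, S: Open>(repo: Repository<P, S>) -> RusticResult<()> {
    // to_indexed_checked() stands in for to_indexed() when the index may be defective
    let repo = repo.to_indexed_checked()?;
    // read-only commands (cat, ls, restore, ...) can now run against `repo`
    let _ = repo;
    Ok(())
}
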
Showing 4 changed files with 221 additions and 99 deletions.
74 changes: 34 additions & 40 deletions crates/core/src/commands/check.rs
@@ -269,50 +269,44 @@ fn check_packs(
IndexType::DataIds
});

let mut process_pack = |p: IndexPack, check_time: bool| {
let blob_type = p.blob_type();
let pack_size = p.pack_size();
_ = packs.insert(p.id, pack_size);
if hot_be.is_some() && blob_type == BlobType::Tree {
_ = tree_packs.insert(p.id, pack_size);
}

// Check if time is set
if check_time && p.time.is_none() {
error!("pack {}: No time is set! Run prune to correct this!", p.id);
}

// check offsets in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.tpe != blob_type {
error!(
"pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}",
p.id, blob.id, blob.tpe, blob_type
);
}

if blob.offset != expected_offset {
error!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
};

let p = pb.progress_counter("reading index...");
for index in be.stream_all::<IndexFile>(&p)? {
let index = index?.1;
index_collector.extend(index.packs.clone());
for p in index.packs {
process_pack(p, false);
}
for p in index.packs_to_delete {
process_pack(p, true);
for (p, to_delete) in index.all_packs() {
let check_time = to_delete; // Check if time is set for packs marked to delete
let blob_type = p.blob_type();
let pack_size = p.pack_size();
_ = packs.insert(p.id, pack_size);
if hot_be.is_some() && blob_type == BlobType::Tree {
_ = tree_packs.insert(p.id, pack_size);
}

// Check if time is set
if check_time && p.time.is_none() {
error!("pack {}: No time is set! Run prune to correct this!", p.id);
}

// check offsets in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.tpe != blob_type {
error!(
"pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}",
p.id, blob.id, blob.tpe, blob_type
);
}

if blob.offset != expected_offset {
error!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
}
}

164 changes: 111 additions & 53 deletions crates/core/src/commands/repair/index.rs
@@ -10,10 +10,11 @@ use crate::{
FileType, ReadBackend, WriteBackend,
},
error::{CommandErrorKind, RusticErrorKind, RusticResult},
index::indexer::Indexer,
index::{binarysorted::IndexCollector, indexer::Indexer, GlobalIndex},
progress::{Progress, ProgressBars},
repofile::{IndexFile, IndexPack, PackHeader, PackHeaderRef},
repository::{Open, Repository},
Id,
};

#[cfg_attr(feature = "clap", derive(clap::Parser))]
Expand Down Expand Up @@ -49,60 +50,14 @@ impl RepairIndexOptions {
CommandErrorKind::NotAllowedWithAppendOnly("index repair".to_string()).into(),
);
}
let be = repo.dbe();
let p = repo.pb.progress_spinner("listing packs...");
let mut packs: HashMap<_, _> = be
.list_with_size(FileType::Pack)
.map_err(RusticErrorKind::Backend)?
.into_iter()
.collect();
p.finish();

let mut pack_read_header = Vec::new();

let mut process_pack = |p: IndexPack,
to_delete: bool,
new_index: &mut IndexFile,
changed: &mut bool| {
let index_size = p.pack_size();
let id = p.id;
match packs.remove(&id) {
None => {
// this pack either does not exist or was already indexed in another index file => remove from index!
*changed = true;
debug!("removing non-existing pack {id} from index");
}
Some(size) => {
if index_size != size {
info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header");
}

if index_size != size || self.read_all {
// pack exists, but sizes do not match or we want to read all pack files
pack_read_header.push((
id,
Some(PackHeaderRef::from_index_pack(&p).size()),
size,
));
*changed = true;
} else {
new_index.add(p, to_delete);
}
}
}
};
let be = repo.dbe();
let mut checker = PackChecker::new(repo)?;

let p = repo.pb.progress_counter("reading index...");
for index in be.stream_all::<IndexFile>(&p)? {
let (index_id, index) = index?;
let mut new_index = IndexFile::default();
let mut changed = false;
for p in index.packs {
process_pack(p, false, &mut new_index, &mut changed);
}
for p in index.packs_to_delete {
process_pack(p, true, &mut new_index, &mut changed);
}
let (new_index, changed) = checker.check_pack(index, self.read_all);
match (changed, dry_run) {
(true, true) => info!("would have modified index file {index_id}"),
(true, false) => {
@@ -117,9 +72,7 @@ impl RepairIndexOptions {
}
p.finish();

// process packs which are listed but not contained in the index
pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, None, size)));

let pack_read_header = checker.into_pack_to_read();
repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _)| *id))?;

let indexer = Indexer::new(be.clone()).into_shared();
@@ -156,3 +109,108 @@ impl RepairIndexOptions {
Ok(())
}
}

struct PackChecker {
packs: HashMap<Id, u32>,
packs_to_read: Vec<(Id, Option<u32>, u32)>,
}

impl PackChecker {
fn new<P: ProgressBars, S: Open>(repo: &Repository<P, S>) -> RusticResult<Self> {
let be = repo.dbe();
let p = repo.pb.progress_spinner("listing packs...");
let packs: HashMap<_, _> = be
.list_with_size(FileType::Pack)
.map_err(RusticErrorKind::Backend)?
.into_iter()
.collect();
p.finish();

Ok(Self {
packs,
packs_to_read: Vec::new(),
})
}

fn check_pack(&mut self, indexfile: IndexFile, read_all: bool) -> (IndexFile, bool) {
let mut new_index = IndexFile::default();
let mut changed = false;
for (p, to_delete) in indexfile.all_packs() {
let index_size = p.pack_size();
let id = p.id;
match self.packs.remove(&id) {
None => {
// this pack either does not exist or was already indexed in another index file => remove from index!
debug!("removing non-existing pack {id} from index");
changed = true;
}
Some(size) => {
if index_size != size {
info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header");
}

if index_size != size || read_all {
// pack exists, but sizes do not match or we want to read all pack files
self.packs_to_read.push((
id,
Some(PackHeaderRef::from_index_pack(&p).size()),
size,
));
} else {
new_index.add(p, to_delete);
}
}
}
}
(new_index, changed)
}

fn into_pack_to_read(mut self) -> Vec<(Id, Option<u32>, u32)> {
// add packs which are listed but not contained in the index
self.packs_to_read
.extend(self.packs.into_iter().map(|(id, size)| (id, None, size)));
self.packs_to_read
}
}

pub(crate) fn index_checked_from_collector<P: ProgressBars, S: Open>(
repo: &Repository<P, S>,
mut collector: IndexCollector,
) -> RusticResult<GlobalIndex> {
let mut checker = PackChecker::new(repo)?;
let be = repo.dbe();

let p = repo.pb.progress_counter("reading index...");
for index in be.stream_all::<IndexFile>(&p)? {
collector.extend(checker.check_pack(index?.1, false).0.packs);
}
p.finish();

let pack_read_header = checker.into_pack_to_read();
repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _)| *id))?;

let p = repo.pb.progress_counter("reading pack headers");
p.set_length(
pack_read_header
.len()
.try_into()
.map_err(CommandErrorKind::ConversionToU64Failed)?,
);
let index_packs: Vec<_> = pack_read_header
.into_iter()
.map(|(id, size_hint, packsize)| {
debug!("reading pack {id}...");
let pack = IndexPack {
id,
blobs: PackHeader::from_file(be, id, size_hint, packsize)?.into_blobs(),
..Default::default()
};
p.inc(1);
Ok(pack)
})
.collect::<RusticResult<_>>()?;
p.finish();

collector.extend(index_packs);
Ok(GlobalIndex::new_from_index(collector.into_index()))
}
7 changes: 7 additions & 0 deletions crates/core/src/repofile/indexfile.rs
@@ -45,6 +45,13 @@ impl IndexFile {
self.packs.push(p);
}
}

pub(crate) fn all_packs(self) -> impl Iterator<Item = (IndexPack, bool)> {
self.packs
.into_iter()
.map(|pack| (pack, false))
.chain(self.packs_to_delete.into_iter().map(|pack| (pack, true)))
}
}

#[derive(Serialize, Deserialize, Default, Debug, Clone)]
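
For reference, a sketch of the call-site pattern the new all_packs() helper enables (compare the check.rs hunk above): a single loop over all packs together with a to_delete flag, instead of two separate loops over packs and packs_to_delete. The function name is illustrative.

use crate::repofile::IndexFile;

// Illustrative only: process every pack of an index file, with `to_delete`
// indicating whether the pack was listed in packs_to_delete.
fn visit_packs(index: IndexFile) {
    for (pack, to_delete) in index.all_packs() {
        let _ = (pack, to_delete);
    }
}
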
