Skip to content

Commit

Permalink
*: update rust toolchain to 2022-01-07 (tikv#11875)
Browse files Browse the repository at this point in the history
* update rust toolchain to 2022-01-17

Signed-off-by: tabokie <[email protected]>

* address comment: clean up unnecessary `as_ref()`

Signed-off-by: tabokie <[email protected]>

* try fixing the underflow

Signed-off-by: tabokie <[email protected]>

* mute underflow warning for raft metrics

Signed-off-by: tabokie <[email protected]>

* clean up unused data members

Signed-off-by: tabokie <[email protected]>

* format

Signed-off-by: tabokie <[email protected]>

* step back to 2022-01-07

Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie authored Jan 20, 2022
1 parent 5f0e9ae commit a401f78
Show file tree
Hide file tree
Showing 95 changed files with 258 additions and 300 deletions.
21 changes: 5 additions & 16 deletions components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ impl fmt::Debug for Task {
}
}

#[derive(Clone)]
struct LimitedStorage {
limiter: Limiter,
storage: Arc<dyn ExternalStorage>,
}

impl Task {
/// Create a backup task based on the given backup request.
pub fn new(
Expand Down Expand Up @@ -483,7 +477,7 @@ impl BackupRange {
&self,
engine: E,
db: Arc<DB>,
storage: &LimitedStorage,
limiter: &Limiter,
file_name: String,
cf: CfNameWrap,
compression_type: Option<SstCompressionType>,
Expand All @@ -495,7 +489,7 @@ impl BackupRange {
db,
&file_name,
cf,
storage.limiter.clone(),
limiter.clone(),
compression_type,
compression_level,
cipher,
Expand Down Expand Up @@ -793,7 +787,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
request: Request,
saver_tx: async_channel::Sender<InMemBackupFiles>,
resp_tx: UnboundedSender<BackupResponse>,
backend: Arc<dyn ExternalStorage>,
_backend: Arc<dyn ExternalStorage>,
) {
let start_ts = request.start_ts;
let backup_ts = request.end_ts;
Expand All @@ -806,11 +800,6 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
let limit = self.softlimit.limit();

self.pool.borrow_mut().spawn(async move {
let storage = LimitedStorage {
limiter: request.limiter,
storage: backend,
};

loop {
// when get the guard, release it until we finish scanning a batch,
// because if we were suspended during scanning,
Expand Down Expand Up @@ -868,7 +857,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
.backup_raw_kv_to_file(
engine,
db.clone(),
&storage,
&request.limiter,
name,
cf.into(),
ct,
Expand All @@ -880,7 +869,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
} else {
let writer_builder = BackupWriterBuilder::new(
store_id,
storage.limiter.clone(),
request.limiter.clone(),
brange.region.clone(),
db.clone(),
ct,
Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,12 @@ impl<'a> Drain {
});
let (event_bytes, resolved_ts_bytes) = batcher.statistics();
let resps = batcher.build();
let last_idx = resps.len() - 1;
let resps_len = resps.len();
// Events are about to be sent, free pending events memory counter.
memory_quota.free(bytes as _);
for (i, e) in resps.into_iter().enumerate() {
// Buffer messages and flush them at once.
let write_flags = WriteFlags::default().buffer_hint(i != last_idx);
let write_flags = WriteFlags::default().buffer_hint(i + 1 != resps_len);
sink.feed((e, write_flags)).await?;
}
sink.flush().await?;
Expand Down
2 changes: 2 additions & 0 deletions components/cdc/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ impl TestSuiteBuilder {
}
}

#[must_use]
pub fn cluster(mut self, cluster: Cluster<ServerCluster>) -> TestSuiteBuilder {
self.cluster = Some(cluster);
self
}

#[must_use]
pub fn memory_quota(mut self, memory_quota: usize) -> TestSuiteBuilder {
self.memory_quota = Some(memory_quota);
self
Expand Down
8 changes: 4 additions & 4 deletions components/encryption/src/file_dict_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,14 @@ mod tests {
)
.unwrap();

file_dict.insert(&"f1".to_owned(), &info1).unwrap();
file_dict.insert(&"f2".to_owned(), &info2).unwrap();
file_dict.insert(&"f3".to_owned(), &info3).unwrap();
file_dict.insert("f1", &info1).unwrap();
file_dict.insert("f2", &info2).unwrap();
file_dict.insert("f3", &info3).unwrap();

file_dict.insert("f4", &info4).unwrap();
file_dict.remove("f3").unwrap();

file_dict.remove(&"f2".to_owned()).unwrap();
file_dict.remove("f2").unwrap();
}
// Try open as v1 file. Should fail.
{
Expand Down
5 changes: 1 addition & 4 deletions components/encryption/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,7 @@ impl CrypterCore {

fn reset_buffer(&mut self, size: usize) {
// OCrypter require the output buffer to have block_size extra bytes, or it will panic.
self.buffer.reserve(size + self.block_size);
unsafe {
self.buffer.set_len(size + self.block_size);
}
self.buffer.resize(size + self.block_size, 0);
}

pub fn reset_crypter(&mut self, offset: u64) -> IoResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion components/engine_rocks/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub mod compression_type_serde {
"disable" => DBCompressionType::Disable,
_ => {
return Err(E::invalid_value(
Unexpected::Other(&"invalid compression type".to_string()),
Unexpected::Other("invalid compression type"),
&self,
));
}
Expand Down
14 changes: 7 additions & 7 deletions components/engine_rocks/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,19 +598,19 @@ mod tests {
assert_eq!(props.get_approximate_keys_in_range(b"", b"k"), 11_u64);

assert_eq!(props.offsets.len(), 7);
let a = props.get(b"a".as_ref());
let a = props.get(b"a");
assert_eq!(a.size, 1);
let e = props.get(b"e".as_ref());
let e = props.get(b"e");
assert_eq!(e.size, DEFAULT_PROP_SIZE_INDEX_DISTANCE + 5);
let i = props.get(b"i".as_ref());
let i = props.get(b"i");
assert_eq!(i.size, DEFAULT_PROP_SIZE_INDEX_DISTANCE / 8 * 17 + 9);
let k = props.get(b"k".as_ref());
let k = props.get(b"k");
assert_eq!(k.size, DEFAULT_PROP_SIZE_INDEX_DISTANCE / 8 * 25 + 11);
let m = props.get(b"m".as_ref());
let m = props.get(b"m");
assert_eq!(m.keys, 11 + DEFAULT_PROP_KEYS_INDEX_DISTANCE);
let n = props.get(b"n".as_ref());
let n = props.get(b"n");
assert_eq!(n.keys, 11 + 2 * DEFAULT_PROP_KEYS_INDEX_DISTANCE);
let o = props.get(b"o".as_ref());
let o = props.get(b"o");
assert_eq!(o.keys, 12 + 2 * DEFAULT_PROP_KEYS_INDEX_DISTANCE);
let empty = RangeOffsets::default();
let cases = [
Expand Down
11 changes: 1 addition & 10 deletions components/engine_traits/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Default for ReadOptions {
}
}

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct WriteOptions {
sync: bool,
no_slowdown: bool,
Expand Down Expand Up @@ -60,15 +60,6 @@ impl WriteOptions {
}
}

impl Default for WriteOptions {
fn default() -> WriteOptions {
WriteOptions {
sync: false,
no_slowdown: false,
}
}
}

#[derive(Clone, PartialEq)]
pub enum SeekMode {
TotalOrder,
Expand Down
5 changes: 5 additions & 0 deletions components/engine_traits/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,22 @@ where
fn new() -> Self;

/// Set DB for the builder. The builder may need some config from the DB.
#[must_use]
fn set_db(self, db: &E) -> Self;

/// Set CF for the builder. The builder may need some config from the CF.
#[must_use]
fn set_cf(self, cf: &str) -> Self;

/// Set it to true, the builder builds a in-memory SST builder.
#[must_use]
fn set_in_memory(self, in_memory: bool) -> Self;

/// set other config specified by writer
#[must_use]
fn set_compression_type(self, compression: Option<SstCompressionType>) -> Self;

#[must_use]
fn set_compression_level(self, level: i32) -> Self;

/// Builder a SstWriter.
Expand Down
10 changes: 2 additions & 8 deletions components/file_system/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(test)]
#![feature(duration_consts_2)]
#![feature(duration_consts_float)]

#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -108,18 +108,12 @@ impl Drop for WithIOType {
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Default)]
pub struct IOBytes {
read: u64,
write: u64,
}

impl Default for IOBytes {
fn default() -> Self {
IOBytes { read: 0, write: 0 }
}
}

impl std::ops::Sub for IOBytes {
type Output = Self;

Expand Down
4 changes: 2 additions & 2 deletions components/raftstore/src/coprocessor/split_check/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ mod tests {
type Case = (Option<Vec<u8>>, Option<Vec<u8>>, Option<i64>);
let mut check_cases = |cases: Vec<Case>| {
for (encoded_start_key, encoded_end_key, table_id) in cases {
region.set_start_key(encoded_start_key.unwrap_or_else(Vec::new));
region.set_end_key(encoded_end_key.unwrap_or_else(Vec::new));
region.set_start_key(encoded_start_key.unwrap_or_default());
region.set_end_key(encoded_end_key.unwrap_or_default());
runnable.run(SplitCheckTask::split_check(
region.clone(),
true,
Expand Down
1 change: 0 additions & 1 deletion components/raftstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#![cfg_attr(test, feature(test))]
#![feature(cell_update)]
#![feature(shrink_to)]
#![feature(div_duration)]
#![feature(min_specialization)]
#![feature(box_patterns)]
Expand Down
2 changes: 0 additions & 2 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4794,9 +4794,7 @@ mod tests {

#[derive(Clone, Default)]
struct ApplyObserver {
pre_admin_count: Arc<AtomicUsize>,
pre_query_count: Arc<AtomicUsize>,
post_admin_count: Arc<AtomicUsize>,
post_query_count: Arc<AtomicUsize>,
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
}
Expand Down
27 changes: 11 additions & 16 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ where
// FIXME: should use `bcast_check_stale_peer_message` instead.
// Sending a new enum type msg to a old tikv may cause panic during rolling update
// we should change the protobuf behavior and check if properly handled in all place
self.fsm.peer.bcast_wake_up_message(&mut self.ctx);
self.fsm.peer.bcast_wake_up_message(self.ctx);
}
}
CasualMessage::SnapshotGenerated => {
Expand Down Expand Up @@ -920,7 +920,7 @@ where
.iter()
.any(|p| p.get_id() == self.fsm.peer_id())
{
self.fsm.peer.send_wake_up_message(&mut self.ctx, &leader);
self.fsm.peer.send_wake_up_message(self.ctx, &leader);
}
}
CasualMessage::RejectRaftAppend { peer_id } => {
Expand All @@ -930,7 +930,7 @@ where
msg.from = self.fsm.peer.peer_id();

let raft_msg = self.fsm.peer.build_raft_messages(self.ctx, vec![msg]);
self.fsm.peer.send_raft_messages(&mut self.ctx, raft_msg);
self.fsm.peer.send_raft_messages(self.ctx, raft_msg);
}
}
}
Expand Down Expand Up @@ -1695,9 +1695,7 @@ where
}
}

if result.is_err() {
return result;
}
result?;

if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
self.fsm.peer.heartbeat_pd(self.ctx);
Expand Down Expand Up @@ -2289,7 +2287,7 @@ where
match self
.fsm
.peer
.ready_to_transfer_leader(&mut self.ctx, msg.get_index(), &from)
.ready_to_transfer_leader(self.ctx, msg.get_index(), &from)
{
Some(reason) => {
info!(
Expand Down Expand Up @@ -2337,12 +2335,9 @@ where
}
}
} else {
self.fsm.peer.execute_transfer_leader(
&mut self.ctx,
msg.get_from(),
peer_disk_usage,
false,
);
self.fsm
.peer
.execute_transfer_leader(self.ctx, msg.get_from(), peer_disk_usage, false);
}
}

Expand Down Expand Up @@ -3336,7 +3331,7 @@ where
.as_ref()
.unwrap()
.get_commit(),
&mut self.ctx,
self.ctx,
);
}
}
Expand Down Expand Up @@ -4632,7 +4627,7 @@ where
"expect" => %self.ctx.cfg.max_leader_missing_duration,
);

self.fsm.peer.bcast_check_stale_peer_message(&mut self.ctx);
self.fsm.peer.bcast_check_stale_peer_message(self.ctx);

let task = PdTask::ValidatePeer {
peer: self.fsm.peer.peer.clone(),
Expand Down Expand Up @@ -4736,7 +4731,7 @@ where
// As the leader can propose the TransferLeader request successfully, the disk of
// the leader is probably not full.
self.fsm.peer.execute_transfer_leader(
&mut self.ctx,
self.ctx,
self.fsm.peer.leader_id(),
DiskUsage::Normal,
true,
Expand Down
20 changes: 16 additions & 4 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,21 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
self.tag,
self.poll_ctx.pending_count,
self.poll_ctx.ready_count,
self.poll_ctx.raft_metrics.ready.append - self.previous_metrics.append,
self.poll_ctx.raft_metrics.ready.message - self.previous_metrics.message,
self.poll_ctx.raft_metrics.ready.snapshot - self.previous_metrics.snapshot
self.poll_ctx
.raft_metrics
.ready
.append
.saturating_sub(self.previous_metrics.append),
self.poll_ctx
.raft_metrics
.ready
.message
.saturating_sub(self.previous_metrics.message),
self.poll_ctx
.raft_metrics
.ready
.snapshot
.saturating_sub(self.previous_metrics.snapshot),
);
dur
} else {
Expand Down Expand Up @@ -1263,7 +1275,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
}

pub fn refresh_config_scheduler(&mut self) -> Scheduler<RefreshConfigTask> {
assert!(!self.workers.is_none());
assert!(self.workers.is_some());
self.workers
.as_ref()
.unwrap()
Expand Down
Loading

0 comments on commit a401f78

Please sign in to comment.