From 38191c62e0fffc12ed73dc1b334d5f1da65c70f4 Mon Sep 17 00:00:00 2001 From: liangyongrui Date: Tue, 22 Jun 2021 11:30:52 +0800 Subject: [PATCH] fmt --- readme.md | 14 +++++++------- rustfmt.toml | 5 +++++ src/bin/cli.rs | 6 ++---- src/bin/server.rs | 10 ++++++---- src/buffer.rs | 11 ++++++----- src/client.rs | 13 +++++++++---- src/cmd/get.rs | 4 ++-- src/cmd/mod.rs | 4 ++-- src/cmd/publish.rs | 4 ++-- src/cmd/set.rs | 9 ++++++--- src/cmd/subscribe.rs | 12 +++++++----- src/cmd/unknown.rs | 4 ++-- src/connection.rs | 17 ++++++++++++----- src/db/mod.rs | 2 -- src/db/shared.rs | 27 --------------------------- src/db/slot.rs | 41 ++++++++++++++++++++++++++++++++++------- src/frame.rs | 16 +++++++++++----- src/parse.rs | 5 +++-- src/server.rs | 14 ++++++++------ 19 files changed, 124 insertions(+), 94 deletions(-) create mode 100644 rustfmt.toml delete mode 100644 src/db/shared.rs diff --git a/readme.md b/readme.md index b6fdcf0..c7700fa 100644 --- a/readme.md +++ b/readme.md @@ -4,12 +4,12 @@ rust cloud cache ## todo -1. [ ] 异步锁 -1. [ ] 使用[resp3 协议](https://www.zeekling.cn/articles/2021/01/10/1610263628832.html) -1. [ ] 多slot (shard) - - 热key 单独 slot(加锁转移) -1. [ ] 复杂数据结构(大key), 持久化数据结构 mvcc -1. [ ] 单key,多次更新聚合 +1. [x] 多 slot (shard) + - [ ] 热 key 单独 slot(加锁转移) +1. [ ] 复杂数据结构(大 key), 持久化数据结构 mvcc +1. [ ] 单 key,多次更新聚合 1. [ ] 持久化 1. [ ] 主备 -1. [ ] pipeline(停止服务器的时候,处理干净pipeline) +1. [ ] pipeline(停止服务器的时候,处理干净 pipeline) +1. [ ] 异步锁 (shutdown 改成原子的可能就好了) +1. [ ] 支持[resp3 协议](https://www.zeekling.cn/articles/2021/01/10/1610263628832.html) \ No newline at end of file diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..5d39c2d --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,5 @@ +imports_granularity = "Crate" +use_field_init_shorthand = true +format_strings = true +group_imports = "StdExternalCrate" +use_try_shorthand = true diff --git a/src/bin/cli.rs b/src/bin/cli.rs index 817e880..11ae17d 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,9 +1,7 @@ -use rcc::{client, DEFAULT_PORT}; +use std::{num::ParseIntError, str, time::Duration}; use bytes::Bytes; -use std::num::ParseIntError; -use std::str; -use std::time::Duration; +use rcc::{client, DEFAULT_PORT}; use structopt::StructOpt; #[derive(StructOpt, Debug)] diff --git a/src/bin/server.rs b/src/bin/server.rs index edb5a56..3e22df0 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -7,16 +7,18 @@ //! The `clap` crate is used for parsing arguments. use rcc::{server, DEFAULT_PORT}; - use structopt::StructOpt; -use tokio::net::TcpListener; -use tokio::signal; +use tokio::{net::TcpListener, signal}; +use tracing::Level; #[tokio::main] pub async fn main() -> rcc::Result<()> { // enable logging // see https://docs.rs/tracing for more info - tracing_subscriber::fmt::try_init()?; + // tracing_subscriber::fmt::try_init()?; + tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(Level::DEBUG) + .try_init()?; let cli = Cli::from_args(); let port = cli.port.as_deref().unwrap_or(DEFAULT_PORT); diff --git a/src/buffer.rs b/src/buffer.rs index be7b0ee..f9db64d 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -1,9 +1,10 @@ -use crate::client::Client; -use crate::Result; - use bytes::Bytes; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + oneshot, +}; + +use crate::{client::Client, Result}; /// Create a new client request buffer /// diff --git a/src/client.rs b/src/client.rs index b841d6a..1c1e464 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,17 +2,22 @@ //! //! Provides an async connect and methods for issuing the supported commands. -use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe}; -use crate::{Connection, Frame}; +use std::{ + io::{Error, ErrorKind}, + time::Duration, +}; use async_stream::try_stream; use bytes::Bytes; -use std::io::{Error, ErrorKind}; -use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_stream::Stream; use tracing::{debug, instrument}; +use crate::{ + cmd::{Get, Publish, Set, Subscribe, Unsubscribe}, + Connection, Frame, +}; + /// Established connection with a Redis server. /// /// Backed by a single `TcpStream`, `Client` provides basic network client diff --git a/src/cmd/get.rs b/src/cmd/get.rs index debc621..6ecf8ec 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -1,8 +1,8 @@ -use crate::{Connection, Frame, Parse, Db}; - use bytes::Bytes; use tracing::{debug, instrument}; +use crate::{Connection, Db, Frame, Parse}; + /// Get the value of key. /// /// If the key does not exist the special value nil is returned. An error is diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 5549e35..3280bdf 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -13,7 +13,7 @@ pub use subscribe::{Subscribe, Unsubscribe}; mod unknown; pub use unknown::Unknown; -use crate::{Connection, Frame, Parse, ParseError, Shutdown, Db}; +use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown}; /// Enumeration of supported Redis commands. /// @@ -21,8 +21,8 @@ use crate::{Connection, Frame, Parse, ParseError, Shutdown, Db}; #[derive(Debug)] pub enum Command { Get(Get), - Publish(Publish), Set(Set), + Publish(Publish), Subscribe(Subscribe), Unsubscribe(Unsubscribe), Unknown(Unknown), diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs index 9f01079..3005153 100644 --- a/src/cmd/publish.rs +++ b/src/cmd/publish.rs @@ -1,7 +1,7 @@ -use crate::{Connection, Frame, Parse, Db}; - use bytes::Bytes; +use crate::{Connection, Db, Frame, Parse}; + /// Posts a message to the given channel. /// /// Send a message into a channel without any knowledge of individual consumers. diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 18c7dbf..6607ab9 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,10 +1,13 @@ -use crate::cmd::{Parse, ParseError}; -use crate::{Connection, Db, Frame}; +use std::time::Duration; use bytes::Bytes; -use std::time::Duration; use tracing::{debug, instrument}; +use crate::{ + cmd::{Parse, ParseError}, + Connection, Db, Frame, +}; + /// Set `key` to hold the string `value`. /// /// If `key` already holds a value, it is overwritten, regardless of its type. diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index 4b339c4..5cf1022 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -1,12 +1,14 @@ -use crate::cmd::{Parse, ParseError, Unknown}; -use crate::{Command, Connection, Db, Frame, Shutdown}; +use std::pin::Pin; use bytes::Bytes; -use std::pin::Pin; -use tokio::select; -use tokio::sync::broadcast; +use tokio::{select, sync::broadcast}; use tokio_stream::{Stream, StreamExt, StreamMap}; +use crate::{ + cmd::{Parse, ParseError, Unknown}, + Command, Connection, Db, Frame, Shutdown, +}; + /// Subscribes the client to one or more channels. /// /// Once the client enters the subscribed state, it is not supposed to issue any diff --git a/src/cmd/unknown.rs b/src/cmd/unknown.rs index 79bfe28..ba83f25 100644 --- a/src/cmd/unknown.rs +++ b/src/cmd/unknown.rs @@ -1,7 +1,7 @@ -use crate::{Connection, Frame}; - use tracing::{debug, instrument}; +use crate::{Connection, Frame}; + /// Represents an "unknown" command. This is not a real `Redis` command. #[derive(Debug)] pub struct Unknown { diff --git a/src/connection.rs b/src/connection.rs index b941a36..29502b5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,9 +1,13 @@ -use crate::frame::{self, Frame}; +use std::io::{self, Cursor}; use bytes::{Buf, BytesMut}; -use std::io::{self, Cursor}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; -use tokio::net::TcpStream; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + net::TcpStream, +}; +use tracing::debug; + +use crate::frame::{self, Frame}; /// Send and receive `Frame` values from a remote peer. /// @@ -92,7 +96,10 @@ impl Connection { // which provides a number of helpful utilities for working // with bytes. let mut buf = Cursor::new(&self.buffer[..]); - + debug!( + "parse_frame, raw buf: {}", + std::str::from_utf8(&self.buffer[..]).unwrap() + ); // The first step is to check if enough data has been buffered to parse // a single frame. This step is usually much faster than doing a full // parse of the frame, and allows us to skip allocating data structures diff --git a/src/db/mod.rs b/src/db/mod.rs index 9c9302c..315366d 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -6,12 +6,10 @@ use std::{ }; use bytes::Bytes; - use tokio::sync::broadcast; use self::slot::Slot; -mod shared; mod slot; mod state; diff --git a/src/db/shared.rs b/src/db/shared.rs deleted file mode 100644 index 9c37f9e..0000000 --- a/src/db/shared.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::sync::Mutex; - -use tokio::sync::Notify; - -use super::state::State; - -#[derive(Debug)] -pub struct Shared { - /// The shared state is guarded by a mutex. This is a `std::sync::Mutex` and - /// not a Tokio mutex. This is because there are no asynchronous operations - /// being performed while holding the mutex. Additionally, the critical - /// sections are very small. - /// - /// A Tokio mutex is mostly intended to be used when locks need to be held - /// across `.await` yield points. All other cases are **usually** best - /// served by a std mutex. If the critical section does not include any - /// async operations but is long (CPU intensive or performing blocking - /// operations), then the entire operation, including waiting for the mutex, - /// is considered a "blocking" operation and `tokio::task::spawn_blocking` - /// should be used. - pub state: Mutex, - - /// Notifies the background task handling entry expiration. The background - /// task waits on this to be notified, then checks for expired values or the - /// shutdown signal. - pub background_task: Notify, -} diff --git a/src/db/slot.rs b/src/db/slot.rs index 821156b..d0fa34e 100644 --- a/src/db/slot.rs +++ b/src/db/slot.rs @@ -1,13 +1,39 @@ -use tokio::sync::{broadcast, Notify}; -use tokio::time::{self, Duration, Instant}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, Mutex}, +}; use bytes::Bytes; -use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, Mutex}; +use tokio::{ + sync::{broadcast, Notify}, + time::{self, Duration, Instant}, +}; +use tracing::debug; -use super::shared::Shared; use super::state::{Entry, State}; +#[derive(Debug)] +struct Shared { + /// The shared state is guarded by a mutex. This is a `std::sync::Mutex` and + /// not a Tokio mutex. This is because there are no asynchronous operations + /// being performed while holding the mutex. Additionally, the critical + /// sections are very small. + /// + /// A Tokio mutex is mostly intended to be used when locks need to be held + /// across `.await` yield points. All other cases are **usually** best + /// served by a std mutex. If the critical section does not include any + /// async operations but is long (CPU intensive or performing blocking + /// operations), then the entire operation, including waiting for the mutex, + /// is considered a "blocking" operation and `tokio::task::spawn_blocking` + /// should be used. + state: Mutex, + + /// Notifies the background task handling entry expiration. The background + /// task waits on this to be notified, then checks for expired values or the + /// shutdown signal. + background_task: Notify, +} + /// Server state shared across all connections. /// /// `Slot` contains a `HashMap` storing the key/value data and all @@ -24,7 +50,7 @@ use super::state::{Entry, State}; pub(crate) struct Slot { /// Handle to shared state. The background task will also have an /// `Arc`. - pub shared: Arc, + shared: Arc, } impl Slot { @@ -43,7 +69,7 @@ impl Slot { }); // Start the background task. - tokio::spawn(purge_expired_tasks(shared.clone())); + tokio::spawn(purge_expired_tasks(Arc::clone(&shared))); Slot { shared } } @@ -232,6 +258,7 @@ impl Shared { // expires. The worker task will wait until this instant. return Some(when); } + debug!("purge_expired_keys: {}", key); // The key expired, remove it state.entries.remove(key); state.expirations.remove(&(when, id)); diff --git a/src/frame.rs b/src/frame.rs index 6b26719..8c91922 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -1,14 +1,15 @@ //! Provides a type representing a Redis protocol frame as well as utilities for //! parsing frames from a byte array. +//! +//! 目前使用的是 RESP2 +//! todo 支持 RESP3 + +use std::{convert::TryInto, fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error}; use bytes::{Buf, Bytes}; -use std::convert::TryInto; -use std::fmt; -use std::io::Cursor; -use std::num::TryFromIntError; -use std::string::FromUtf8Error; /// A frame in the Redis protocol. +/// #[derive(Clone, Debug)] pub enum Frame { Simple(String), @@ -105,6 +106,7 @@ impl Frame { /// The message has already been validated with `check`. pub fn parse(src: &mut Cursor<&[u8]>) -> Result { match get_u8(src)? { + // simple string b'+' => { // Read the line and convert it to `Vec` let line = get_line(src)?.to_vec(); @@ -114,6 +116,7 @@ impl Frame { Ok(Frame::Simple(string)) } + // simple error b'-' => { // Read the line and convert it to `Vec` let line = get_line(src)?.to_vec(); @@ -123,10 +126,12 @@ impl Frame { Ok(Frame::Error(string)) } + // number b':' => { let len = get_decimal(src)?; Ok(Frame::Integer(len)) } + // blob string b'$' => { if b'-' == peek_u8(src)? { let line = get_line(src)?; @@ -153,6 +158,7 @@ impl Frame { Ok(Frame::Bulk(data)) } } + // array b'*' => { let len = get_decimal(src)?.try_into()?; let mut out = Vec::with_capacity(len); diff --git a/src/parse.rs b/src/parse.rs index f2a73b5..a941d4d 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,7 +1,8 @@ -use crate::Frame; +use std::{fmt, str, vec}; use bytes::Bytes; -use std::{fmt, str, vec}; + +use crate::Frame; /// Utility for parsing a command /// diff --git a/src/server.rs b/src/server.rs index 4f1d546..e1bac59 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,15 +3,17 @@ //! Provides an async `run` function that listens for inbound connections, //! spawning a task per connection. -use crate::{Command, Connection, Shutdown, Db}; +use std::{future::Future, sync::Arc}; -use std::future::Future; -use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{broadcast, mpsc, Semaphore}; -use tokio::time::{self, Duration}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{broadcast, mpsc, Semaphore}, + time::{self, Duration}, +}; use tracing::{debug, error, info, instrument}; +use crate::{Command, Connection, Db, Shutdown}; + /// Server listener state. Created in the `run` call. It includes a `run` method /// which performs the TCP listening and initialization of per-connection state. #[derive(Debug)]