Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyongrui committed Jun 22, 2021
1 parent d00dd8a commit 38191c6
Show file tree
Hide file tree
Showing 19 changed files with 124 additions and 94 deletions.
14 changes: 7 additions & 7 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ rust cloud cache

## todo

1. [ ] 异步锁
1. [ ] 使用[resp3 协议](https://www.zeekling.cn/articles/2021/01/10/1610263628832.html)
1. [ ] 多slot (shard)
- 热key 单独 slot(加锁转移)
1. [ ] 复杂数据结构(大key), 持久化数据结构 mvcc
1. [ ] 单key,多次更新聚合
1. [x] 多 slot (shard)
- [ ] 热 key 单独 slot(加锁转移)
1. [ ] 复杂数据结构(大 key), 持久化数据结构 mvcc
1. [ ] 单 key,多次更新聚合
1. [ ] 持久化
1. [ ] 主备
1. [ ] pipeline(停止服务器的时候,处理干净pipeline)
1. [ ] pipeline(停止服务器的时候,处理干净 pipeline)
1. [ ] 异步锁 (shutdown 改成原子的可能就好了)
1. [ ] 支持[resp3 协议](https://www.zeekling.cn/articles/2021/01/10/1610263628832.html)
5 changes: 5 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
imports_granularity = "Crate"
use_field_init_shorthand = true
format_strings = true
group_imports = "StdExternalCrate"
use_try_shorthand = true
6 changes: 2 additions & 4 deletions src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use rcc::{client, DEFAULT_PORT};
use std::{num::ParseIntError, str, time::Duration};

use bytes::Bytes;
use std::num::ParseIntError;
use std::str;
use std::time::Duration;
use rcc::{client, DEFAULT_PORT};
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
Expand Down
10 changes: 6 additions & 4 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
//! The `clap` crate is used for parsing arguments.
use rcc::{server, DEFAULT_PORT};

use structopt::StructOpt;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::{net::TcpListener, signal};
use tracing::Level;

#[tokio::main]
pub async fn main() -> rcc::Result<()> {
// enable logging
// see https://docs.rs/tracing for more info
tracing_subscriber::fmt::try_init()?;
// tracing_subscriber::fmt::try_init()?;
tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(Level::DEBUG)
.try_init()?;

let cli = Cli::from_args();
let port = cli.port.as_deref().unwrap_or(DEFAULT_PORT);
Expand Down
11 changes: 6 additions & 5 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::client::Client;
use crate::Result;

use bytes::Bytes;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot,
};

use crate::{client::Client, Result};

/// Create a new client request buffer
///
Expand Down
13 changes: 9 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
//!
//! Provides an async connect and methods for issuing the supported commands.
use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe};
use crate::{Connection, Frame};
use std::{
io::{Error, ErrorKind},
time::Duration,
};

use async_stream::try_stream;
use bytes::Bytes;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_stream::Stream;
use tracing::{debug, instrument};

use crate::{
cmd::{Get, Publish, Set, Subscribe, Unsubscribe},
Connection, Frame,
};

/// Established connection with a Redis server.
///
/// Backed by a single `TcpStream`, `Client` provides basic network client
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{Connection, Frame, Parse, Db};

use bytes::Bytes;
use tracing::{debug, instrument};

use crate::{Connection, Db, Frame, Parse};

/// Get the value of key.
///
/// If the key does not exist the special value nil is returned. An error is
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ pub use subscribe::{Subscribe, Unsubscribe};
mod unknown;
pub use unknown::Unknown;

use crate::{Connection, Frame, Parse, ParseError, Shutdown, Db};
use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};

/// Enumeration of supported Redis commands.
///
/// Methods called on `Command` are delegated to the command implementation.
#[derive(Debug)]
pub enum Command {
Get(Get),
Publish(Publish),
Set(Set),
Publish(Publish),
Subscribe(Subscribe),
Unsubscribe(Unsubscribe),
Unknown(Unknown),
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/publish.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{Connection, Frame, Parse, Db};

use bytes::Bytes;

use crate::{Connection, Db, Frame, Parse};

/// Posts a message to the given channel.
///
/// Send a message into a channel without any knowledge of individual consumers.
Expand Down
9 changes: 6 additions & 3 deletions src/cmd/set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::cmd::{Parse, ParseError};
use crate::{Connection, Db, Frame};
use std::time::Duration;

use bytes::Bytes;
use std::time::Duration;
use tracing::{debug, instrument};

use crate::{
cmd::{Parse, ParseError},
Connection, Db, Frame,
};

/// Set `key` to hold the string `value`.
///
/// If `key` already holds a value, it is overwritten, regardless of its type.
Expand Down
12 changes: 7 additions & 5 deletions src/cmd/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::cmd::{Parse, ParseError, Unknown};
use crate::{Command, Connection, Db, Frame, Shutdown};
use std::pin::Pin;

use bytes::Bytes;
use std::pin::Pin;
use tokio::select;
use tokio::sync::broadcast;
use tokio::{select, sync::broadcast};
use tokio_stream::{Stream, StreamExt, StreamMap};

use crate::{
cmd::{Parse, ParseError, Unknown},
Command, Connection, Db, Frame, Shutdown,
};

/// Subscribes the client to one or more channels.
///
/// Once the client enters the subscribed state, it is not supposed to issue any
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/unknown.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{Connection, Frame};

use tracing::{debug, instrument};

use crate::{Connection, Frame};

/// Represents an "unknown" command. This is not a real `Redis` command.
#[derive(Debug)]
pub struct Unknown {
Expand Down
17 changes: 12 additions & 5 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::frame::{self, Frame};
use std::io::{self, Cursor};

use bytes::{Buf, BytesMut};
use std::io::{self, Cursor};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
net::TcpStream,
};
use tracing::debug;

use crate::frame::{self, Frame};

/// Send and receive `Frame` values from a remote peer.
///
Expand Down Expand Up @@ -92,7 +96,10 @@ impl Connection {
// which provides a number of helpful utilities for working
// with bytes.
let mut buf = Cursor::new(&self.buffer[..]);

debug!(
"parse_frame, raw buf: {}",
std::str::from_utf8(&self.buffer[..]).unwrap()
);
// The first step is to check if enough data has been buffered to parse
// a single frame. This step is usually much faster than doing a full
// parse of the frame, and allows us to skip allocating data structures
Expand Down
2 changes: 0 additions & 2 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ use std::{
};

use bytes::Bytes;

use tokio::sync::broadcast;

use self::slot::Slot;

mod shared;
mod slot;
mod state;

Expand Down
27 changes: 0 additions & 27 deletions src/db/shared.rs

This file was deleted.

41 changes: 34 additions & 7 deletions src/db/slot.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
use tokio::sync::{broadcast, Notify};
use tokio::time::{self, Duration, Instant};
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};

use bytes::Bytes;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use tokio::{
sync::{broadcast, Notify},
time::{self, Duration, Instant},
};
use tracing::debug;

use super::shared::Shared;
use super::state::{Entry, State};

#[derive(Debug)]
struct Shared {
/// The shared state is guarded by a mutex. This is a `std::sync::Mutex` and
/// not a Tokio mutex. This is because there are no asynchronous operations
/// being performed while holding the mutex. Additionally, the critical
/// sections are very small.
///
/// A Tokio mutex is mostly intended to be used when locks need to be held
/// across `.await` yield points. All other cases are **usually** best
/// served by a std mutex. If the critical section does not include any
/// async operations but is long (CPU intensive or performing blocking
/// operations), then the entire operation, including waiting for the mutex,
/// is considered a "blocking" operation and `tokio::task::spawn_blocking`
/// should be used.
state: Mutex<State>,

/// Notifies the background task handling entry expiration. The background
/// task waits on this to be notified, then checks for expired values or the
/// shutdown signal.
background_task: Notify,
}

/// Server state shared across all connections.
///
/// `Slot` contains a `HashMap` storing the key/value data and all
Expand All @@ -24,7 +50,7 @@ use super::state::{Entry, State};
pub(crate) struct Slot {
/// Handle to shared state. The background task will also have an
/// `Arc<Shared>`.
pub shared: Arc<Shared>,
shared: Arc<Shared>,
}

impl Slot {
Expand All @@ -43,7 +69,7 @@ impl Slot {
});

// Start the background task.
tokio::spawn(purge_expired_tasks(shared.clone()));
tokio::spawn(purge_expired_tasks(Arc::clone(&shared)));

Slot { shared }
}
Expand Down Expand Up @@ -232,6 +258,7 @@ impl Shared {
// expires. The worker task will wait until this instant.
return Some(when);
}
debug!("purge_expired_keys: {}", key);
// The key expired, remove it
state.entries.remove(key);
state.expirations.remove(&(when, id));
Expand Down
16 changes: 11 additions & 5 deletions src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Provides a type representing a Redis protocol frame as well as utilities for
//! parsing frames from a byte array.
//!
//! 目前使用的是 RESP2
//! todo 支持 RESP3
use std::{convert::TryInto, fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error};

use bytes::{Buf, Bytes};
use std::convert::TryInto;
use std::fmt;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;

/// A frame in the Redis protocol.
///
#[derive(Clone, Debug)]
pub enum Frame {
Simple(String),
Expand Down Expand Up @@ -105,6 +106,7 @@ impl Frame {
/// The message has already been validated with `check`.
pub fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
match get_u8(src)? {
// simple string
b'+' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
Expand All @@ -114,6 +116,7 @@ impl Frame {

Ok(Frame::Simple(string))
}
// simple error
b'-' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
Expand All @@ -123,10 +126,12 @@ impl Frame {

Ok(Frame::Error(string))
}
// number
b':' => {
let len = get_decimal(src)?;
Ok(Frame::Integer(len))
}
// blob string
b'$' => {
if b'-' == peek_u8(src)? {
let line = get_line(src)?;
Expand All @@ -153,6 +158,7 @@ impl Frame {
Ok(Frame::Bulk(data))
}
}
// array
b'*' => {
let len = get_decimal(src)?.try_into()?;
let mut out = Vec::with_capacity(len);
Expand Down
Loading

0 comments on commit 38191c6

Please sign in to comment.