Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poll_* methods to support custom futures implementations #78

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5f1d0e5
frame: Use abstract trait to write the header to
dgrr May 27, 2024
9ea19e6
Implement poll* methods to support future-based systems
dgrr May 27, 2024
3c779ff
Merge branch 'main' of github.com:dgrr/fastwebsockets
dgrr May 27, 2024
7fc1617
poll methods for WebSocketStream
dgrr May 28, 2024
f1696f1
WebSocket: start_send_frame function
dgrr May 29, 2024
30c1586
Cargo.toml: futures version
dgrr May 29, 2024
909cd67
Cargo.toml: tokio_util version
dgrr May 29, 2024
78f81dd
Set the waker before flushing
dgrr May 29, 2024
23a7101
Implement poll_read_frame & poll_write_frame for WebSocketRead and We…
dgrr May 29, 2024
59904ad
Bring back unstable-split feature
dgrr Jun 23, 2024
8285b55
write_frame: Check readiness of the underlying connection by flushing…
dgrr Jun 24, 2024
3f050bf
Function docs
dgrr Jun 24, 2024
b7b7696
Fix return type when not using SIMD for utf8 processing
dgrr Jun 24, 2024
b1b087c
Generate state machine for reading
dgrr Jun 30, 2024
3636e5b
WriteHalf: Advance buffer on poll_ready
dgrr Jun 30, 2024
3a537e3
Simplify loop by conradludgate
dgrr Jun 30, 2024
da4f6d7
use vec
conradludgate Jul 1, 2024
9abbf6c
better buffer manipulation
conradludgate Jul 1, 2024
e91e922
re-introduce vectored writes
conradludgate Jul 1, 2024
5d45077
refactor
conradludgate Jul 1, 2024
1d579a1
Use a Vec<u8> instead of BytesMut for the write buffer
dgrr Jul 1, 2024
988cbd5
Merge branch 'main' into dgrr/main
dgrr Jul 1, 2024
53b2b96
Merge pull request #1 from conradludgate/dgrr/main
dgrr Jul 6, 2024
28c331a
Remove underscore from used variable names
dgrr Jul 6, 2024
7656018
Added test to check that simple and vectored serialization do not con…
dgrr Jul 6, 2024
b960f15
WebSocket: Handle obligated send flushing with states to ensure delivery
dgrr Jul 7, 2024
9eebea5
Merge branch 'denoland:main' into main
dgrr Sep 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ path = "examples/axum.rs"
required-features = ["upgrade", "with_axum"]

[dependencies]
tokio = { version = "1.25.0", default-features = false, features = ["io-util"] }
tokio = { version = "1.25.0", default-features = false, features = ["io-util"] }
simdutf8 = { version = "0.1.4", optional = true }
hyper-util = { version = "0.1.0", features = ["tokio"], optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper = { version = "1", features = ["http1", "server", "client"], optional = true }
hyper = { version = "1", features = [
"http1",
"server",
"client",
], optional = true }
pin-project = { version = "1.0.8", optional = true }
base64 = { version = "0.21.0", optional = true }
sha1 = { version = "0.10.5", optional = true }
Expand All @@ -45,11 +49,20 @@ bytes = "1.5.0"
axum-core = { version = "0.4.3", optional = true }
http = { version = "1", optional = true }
async-trait = { version = "0.1", optional = true }
tokio-util = { version = "0.7", features = ["codec", "io"] }
futures = { version = "0.3", default-features = false, features = ["std"] }

[features]
default = ["simd"]
simd = ["simdutf8/aarch64_neon"]
upgrade = ["hyper", "pin-project", "base64", "sha1", "hyper-util", "http-body-util"]
upgrade = [
"hyper",
"pin-project",
"base64",
"sha1",
"hyper-util",
"http-body-util",
]
unstable-split = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional to remove unstable-split? I would like to keep it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can put it back in

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, the question is, why is it unstable?

# Axum integration
with_axum = ["axum-core", "http", "async-trait"]
Expand Down
25 changes: 13 additions & 12 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use tokio::io::AsyncWriteExt;

use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use core::ops::Deref;

use crate::WebSocketError;
Expand Down Expand Up @@ -259,26 +259,27 @@ impl<'f> Frame<'f> {
/// # Panics
///
/// This method panics if the head buffer is not at least n-bytes long, where n is the size of the length field (0, 2, 4, or 10)
pub fn fmt_head(&mut self, head: &mut [u8]) -> usize {
head[0] = (self.fin as u8) << 7 | (self.opcode as u8);
pub fn fmt_head(&mut self, mut head: impl BufMut) -> usize {
head.put_u8((self.fin as u8) << 7 | (self.opcode as u8));

let mask_bit = if self.mask.is_some() { 0x80 } else { 0x0 };

let len = self.payload.len();
let size = if len < 126 {
head[1] = len as u8;
head.put_u8(len as u8 | mask_bit);
2
} else if len < 65536 {
head[1] = 126;
head[2..4].copy_from_slice(&(len as u16).to_be_bytes());
head.put_u8(126u8 | mask_bit);
head.put_slice(&(len as u16).to_be_bytes());
4
} else {
head[1] = 127;
head[2..10].copy_from_slice(&(len as u64).to_be_bytes());
head.put_u8(127u8 | mask_bit);
head.put_slice(&(len as u64).to_be_bytes());
10
};

if let Some(mask) = self.mask {
head[1] |= 0x80;
head[size..size + 4].copy_from_slice(&mask);
head.put_slice(&mask);
size + 4
} else {
size
Expand All @@ -295,7 +296,7 @@ impl<'f> Frame<'f> {
use std::io::IoSlice;

let mut head = [0; MAX_HEAD_SIZE];
let size = self.fmt_head(&mut head);
let size = self.fmt_head(&mut head[..]);

let total = size + self.payload.len();

Expand Down Expand Up @@ -330,7 +331,7 @@ impl<'f> Frame<'f> {
let len = self.payload.len();
reserve_enough(buf, len + MAX_HEAD_SIZE);

let size = self.fmt_head(buf);
let size = self.fmt_head(&mut *buf);
buf[size..size + len].copy_from_slice(&self.payload);
&buf[..size + len]
}
Expand Down
Loading
Loading