Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(protocol): Brotli Compression #14

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 98 additions & 43 deletions crates/protocol/src/compression/brotli/compress.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Brotli Compression

use crate::{BrotliLevel, ChannelCompressor, CompressorError, CompressorResult, CompressorWriter};
use std::vec::Vec;
use std::{cell::RefCell, io::Write, rc::Rc, vec::Vec};

const DEFAULT_BROTLI_LGWIN: u32 = 22;

/// A Brotli Compression Error.
#[derive(thiserror::Error, Debug)]
Expand All @@ -14,24 +16,100 @@ pub enum BrotliCompressionError {
CompressionError(#[from] std::io::Error),
}

/// The brotli compressor.
/// A buffer wrapped in an Rc<RefCell<>>
#[derive(Debug, Clone)]
struct BrotliBuffer(Rc<RefCell<Vec<u8>>>);

impl BrotliBuffer {
/// Create a new BrotliBuffer.
pub(crate) fn new() -> Self {
Self(Rc::new(RefCell::new(Vec::new())))
}

/// Get the buffer.
pub(crate) fn get(&self) -> Rc<RefCell<Vec<u8>>> {
self.0.clone()
}

/// Returns the length of the buffer.
pub(crate) fn len(&self) -> usize {
self.0.borrow().len()
}
}

impl From<Vec<u8>> for BrotliBuffer {
fn from(vec: Vec<u8>) -> Self {
Self(Rc::new(RefCell::new(vec)))
}
}

impl Write for BrotliBuffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.borrow_mut().write(buf)
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

/// The brotli compressor.
pub struct BrotliCompressor {
/// The compressed bytes.
compressed: Vec<u8>,
/// The raw bytes (need to store on reset).
raw: Vec<u8>,
/// The writer.
writer: brotli::CompressorWriter<BrotliBuffer>,
/// The buffer to write to.
buffer: BrotliBuffer,
/// Marks that the compressor is closed.
closed: bool,
/// The compression level.
pub level: BrotliLevel,
}

impl std::fmt::Debug for BrotliCompressor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{buffer:{:?},closed:{},level:{:?}}}",
self.buffer.get().borrow(),
self.closed,
self.level
)
}
}

impl Clone for BrotliCompressor {
fn clone(&self) -> Self {
let buffer = self.buffer.get().borrow().clone();
Self {
closed: self.closed,
buffer: buffer.into(),
writer: brotli::CompressorWriter::new(
BrotliBuffer::new(),
0,
self.level.into(),
DEFAULT_BROTLI_LGWIN,
),
level: self.level,
}
}
}

impl BrotliCompressor {
/// Creates a new brotli compressor with the given compression level.
pub fn new(level: impl Into<BrotliLevel>) -> Self {
let level = level.into();
Self { compressed: Vec::new(), raw: Vec::new(), closed: false, level }
let buffer = BrotliBuffer::new();
Self {
closed: false,
writer: brotli::CompressorWriter::new(
buffer.clone(),
0,
level.into(),
DEFAULT_BROTLI_LGWIN,
),
buffer,
level,
}
}
}

Expand All @@ -41,44 +119,17 @@ impl From<BrotliLevel> for BrotliCompressor {
}
}

/// Compresses the given bytes data using the Brotli compressor implemented
/// in the [`brotli`](https://crates.io/crates/brotli) crate.
///
/// Note: The level must be between 0 and 11. In Optimism, the levels 9, 10, and 11 are used.
/// By default, [BrotliLevel::Brotli10] is used.
#[allow(unused_variables)]
#[allow(unused_mut)]
fn compress_brotli(
mut input: &[u8],
level: BrotliLevel,
) -> Result<Vec<u8>, BrotliCompressionError> {
use brotli::enc::{BrotliCompress, BrotliEncoderParams};
let mut output = alloc::vec![];
BrotliCompress(
&mut input,
&mut output,
&BrotliEncoderParams { quality: level as i32, ..Default::default() },
)?;
Ok(output)
}

impl CompressorWriter for BrotliCompressor {
fn write(&mut self, data: &[u8]) -> CompressorResult<usize> {
if self.closed {
return Err(CompressorError::Brotli);
}

// First append the new data to the raw buffer.
self.raw.extend_from_slice(data);

// Compress the raw buffer.
self.compressed =
compress_brotli(&self.raw, self.level).map_err(|_| CompressorError::Brotli)?;

Ok(data.len())
let written = self.writer.write(data).map_err(|_| CompressorError::Brotli)?;
Ok(written)
}
Comment on lines 123 to 129
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is broken and it's not clear to me why.


fn flush(&mut self) -> CompressorResult<()> {
self.writer.flush().map_err(|_| CompressorError::Brotli)?;
Ok(())
}

Expand All @@ -90,24 +141,28 @@ impl CompressorWriter for BrotliCompressor {

fn reset(&mut self) {
self.closed = false;
self.raw.clear();
self.compressed.clear();
self.writer = brotli::CompressorWriter::new(
BrotliBuffer::new(),
0,
self.level.into(),
DEFAULT_BROTLI_LGWIN,
);
}

fn read(&mut self, buf: &mut [u8]) -> CompressorResult<usize> {
let len = self.compressed.len().min(buf.len());
buf[..len].copy_from_slice(&self.compressed[..len]);
let len = self.buffer.get().borrow().len().min(buf.len());
buf[..len].copy_from_slice(&self.buffer.get().borrow()[..len]);
Ok(len)
}

fn len(&self) -> usize {
self.compressed.len()
self.writer.get_ref().len()
}
}

impl ChannelCompressor for BrotliCompressor {
fn get_compressed(&self) -> Vec<u8> {
self.compressed.clone()
self.buffer.get().borrow().clone()
}
}

Expand Down
10 changes: 5 additions & 5 deletions crates/protocol/src/compression/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use op_alloy_genesis::RollupConfig;
#[derive(Debug, Clone)]
pub enum VariantCompressor {
/// The brotli compressor.
Brotli(BrotliCompressor),
Brotli(Box<BrotliCompressor>),
/// The zlib compressor.
Zlib(ZlibCompressor),
}
Expand All @@ -20,7 +20,7 @@ impl VariantCompressor {
/// Constructs a [VariantCompressor] using the given [RollupConfig] and timestamp.
pub fn from_timestamp(config: &RollupConfig, timestamp: u64) -> Self {
if config.is_fjord_active(timestamp) {
Self::Brotli(BrotliCompressor::new(CompressionAlgo::Brotli10))
Self::Brotli(Box::new(BrotliCompressor::new(CompressionAlgo::Brotli10)))
} else {
Self::Zlib(ZlibCompressor::new())
}
Expand Down Expand Up @@ -83,9 +83,9 @@ impl ChannelCompressor for VariantCompressor {
impl From<CompressionAlgo> for VariantCompressor {
fn from(algo: CompressionAlgo) -> Self {
match algo {
lvl @ CompressionAlgo::Brotli9 => Self::Brotli(BrotliCompressor::new(lvl)),
lvl @ CompressionAlgo::Brotli10 => Self::Brotli(BrotliCompressor::new(lvl)),
lvl @ CompressionAlgo::Brotli11 => Self::Brotli(BrotliCompressor::new(lvl)),
lvl @ CompressionAlgo::Brotli9 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))),
lvl @ CompressionAlgo::Brotli10 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))),
lvl @ CompressionAlgo::Brotli11 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))),
CompressionAlgo::Zlib => Self::Zlib(ZlibCompressor::new()),
}
}
Expand Down
Loading