Skip to content

Commit

Permalink
permessage-deflate
Browse files Browse the repository at this point in the history
  • Loading branch information
willrnch committed Aug 2, 2023
1 parent 875e6b7 commit 5955126
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 21 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ required-features = ["upgrade"]


[dependencies]
tokio = { version = "1.25.0", default-features = false, features = ["io-util"] }
tokio = { version = "1", default-features = false, features = ["io-util"] }
simdutf8 = { version = "0.1.4", optional = true }
hyper = { version = "0.14.26", features = ["http1", "server", "client"], optional = true }
pin-project = { version = "1.0.8", optional = true }
Expand All @@ -33,6 +33,7 @@ sha1 = { version = "0.10.5", optional = true }
utf-8 = "0.7.5"
rand = "0.8.4"
thiserror = "1.0.40"
miniz_oxide = "0.7.1"

[features]
default = ["simd"]
Expand Down
6 changes: 3 additions & 3 deletions autobahn/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
AUTOBAHN_TESTSUITE_DOCKER := crossbario/autobahn-testsuite:0.8.2@sha256:5d4ba3aa7d6ab2fdbf6606f3f4ecbe4b66f205ce1cbc176d6cdf650157e52242

build-server:
sudo cargo build --release --example echo_server --features "upgrade"
cargo build --release --example echo_server --features "upgrade"

run-server: build-server
echo ${PWD}
Expand All @@ -18,7 +18,7 @@ run-server: build-server
../target/release/examples/echo_server

build-client:
sudo cargo build --release --example autobahn_client --features "upgrade"
cargo build --release --example autobahn_client --features "upgrade"

run-client: build-client
echo ${PWD}
Expand All @@ -34,4 +34,4 @@ run-client: build-client
sleep 5
../target/release/examples/autobahn_client

.PHONY: build-server run-server build-client run-client
.PHONY: build-server run-server build-client run-client
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly
1.71
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum WebSocketError {
InvalidSecWebsocketVersion,
#[error("Invalid value")]
InvalidValue,
#[error("Invalid encoding")]
InvalidEncoding,
#[error("Sec-WebSocket-Key header is missing")]
MissingSecWebSocketKey,
#[error(transparent)]
Expand Down
2 changes: 2 additions & 0 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,15 @@ impl<'a, const N: usize> PartialEq<&'_ [u8; N]> for Payload<'a> {
}

/// Represents a WebSocket frame.
#[derive(Debug)]
pub struct Frame<'f> {
/// Indicates if this is the final frame in a message.
pub fin: bool,
/// The opcode of the frame.
pub opcode: OpCode,
/// The masking key of the frame, if any.
mask: Option<[u8; 4]>,

/// The payload of the frame.
pub payload: Payload<'f>,
}
Expand Down
63 changes: 47 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ mod recv;
#[cfg_attr(docsrs, doc(cfg(feature = "upgrade")))]
pub mod upgrade;

use miniz_oxide::{DataFormat, MZFlush};
use miniz_oxide::inflate::stream::{InflateState, inflate};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

Expand Down Expand Up @@ -474,7 +476,11 @@ impl<'f, S> WebSocket<S> {
let rsv2 = head[0] & 0b00100000 != 0;
let rsv3 = head[0] & 0b00010000 != 0;

if rsv1 || rsv2 || rsv3 {
let mut compressed = false;

if rsv1 && !rsv2 && !rsv3 {
compressed = true;
} else if rsv1 || rsv2 || rsv3 {
return Err(WebSocketError::ReservedBitsNotZero);
}

Expand Down Expand Up @@ -526,29 +532,32 @@ impl<'f, S> WebSocket<S> {
}

let required = 2 + extra + mask.map(|_| 4).unwrap_or(0) + length;
if required > nread {
let mut payload = if required > nread {
// Allocate more space
let mut new_head = head.to_vec();
new_head.resize(required, 0);

stream.read_exact(&mut new_head[nread..]).await?;
return Ok(Frame::new(
fin,
opcode,
mask,
Payload::Owned(new_head[required - length..].to_vec()),
));
} else if nread > required {
// We read too much
self.spill = Some(head[required..nread].to_vec());
}

let payload = &mut head[required - length..required];
let payload = if payload.len() > self.writev_threshold {
Payload::BorrowedMut(payload)
Payload::Owned(new_head[required - length..].to_vec())
} else {
Payload::Owned(payload.to_vec())
if nread > required {
// We read too much
self.spill = Some(head[required..nread].to_vec());
}

let buff = &mut head[required - length..required];
if buff.len() > self.writev_threshold {
Payload::BorrowedMut(buff)
} else {
Payload::Owned(buff.to_vec())
}
};

if compressed {
payload = Payload::Owned(inflate_payload(&payload.to_vec())?);
}

let frame = Frame::new(fin, opcode, mask, payload);
Ok(frame)
}
Expand Down Expand Up @@ -583,3 +592,25 @@ mod tests {
assert_unsync::<WebSocket<tokio::net::TcpStream>>();
};
}

fn inflate_payload(
payload: &Vec<u8>
) -> Result<Vec<u8>, WebSocketError>
{
let max_output_size = usize::max_value();
let mut out: Vec<u8> = vec![0; payload.len().saturating_mul(2).min(max_output_size)];
let mut state = InflateState::new_boxed(DataFormat::Raw);

let payload = [payload.as_slice(), [0x00, 0x00, 0xff, 0xff].as_slice()].concat();
let res = inflate(&mut state, &payload, &mut out, MZFlush::Partial);

match res.status {
Ok(_) => {
out.truncate(res.bytes_written);
Ok(out)
}
Err(_) => {
Err(WebSocketError::InvalidEncoding)
}
}
}

0 comments on commit 5955126

Please sign in to comment.