Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Close tcp after sending a close frame #72

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,17 +523,20 @@ impl<'f, S> WebSocket<S> {
where
S: AsyncReadExt + AsyncWriteExt + Unpin,
{
eprintln!("read_frame call");
loop {
let (res, obligated_send) =
self.read_half.read_frame_inner(&mut self.stream).await;
let is_closed = self.write_half.closed;
if let Some(frame) = obligated_send {
if !is_closed {
eprintln!("writing obligated send");
self.write_half.write_frame(&mut self.stream, frame).await?;
}
}
if let Some(frame) = res? {
if is_closed && frame.opcode != OpCode::Close {
eprintln!("return connection closed");
return Err(WebSocketError::ConnectionClosed);
}
break Ok(frame);
Expand Down Expand Up @@ -572,6 +575,7 @@ impl ReadHalf {
where
S: AsyncReadExt + Unpin,
{
eprintln!("read_frame_inner call");
let mut frame = match self.parse_frame_header(stream).await {
Ok(frame) => frame,
Err(e) => return (Err(e), None),
Expand All @@ -581,8 +585,10 @@ impl ReadHalf {
frame.unmask()
};

eprintln!("here 1");
match frame.opcode {
OpCode::Close if self.auto_close => {
eprintln!("here 2");
match frame.payload.len() {
0 => {}
1 => return (Err(WebSocketError::InvalidCloseFrame), None),
Expand Down Expand Up @@ -614,9 +620,11 @@ impl ReadHalf {
(Ok(Some(frame)), Some(obligated_send))
}
OpCode::Ping if self.auto_pong => {
eprintln!("here 3");
(Ok(None), Some(Frame::pong(frame.payload)))
}
OpCode::Text => {
eprintln!("here 4");
if frame.fin && !frame.is_utf8() {
(Err(WebSocketError::InvalidUTF8), None)
} else {
Expand All @@ -634,6 +642,7 @@ impl ReadHalf {
where
S: AsyncReadExt + Unpin,
{
eprintln!("parse_frame_header call");
macro_rules! eof {
($n:expr) => {{
if $n == 0 {
Expand Down Expand Up @@ -741,23 +750,33 @@ impl WriteHalf {
where
S: AsyncWriteExt + Unpin,
{
eprintln!("write_frame");
if self.role == Role::Client && self.auto_apply_mask {
frame.mask();
}

if frame.opcode == OpCode::Close {
eprintln!("close opcode, setting self.closed");
self.closed = true;
} else if self.closed {
eprintln!("already closed, throw");
return Err(WebSocketError::ConnectionClosed);
}

if self.vectored && frame.payload.len() > self.writev_threshold {
eprintln!("writev");
frame.writev(stream).await?;
} else {
eprintln!("write_all text");
let text = frame.write(&mut self.write_buffer);
stream.write_all(text).await?;
}

if frame.opcode == OpCode::Close {
eprintln!("shutdown!");
stream.shutdown().await?;
}

Ok(())
}
}
Expand Down
16 changes: 16 additions & 0 deletions tests/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,28 @@ async fn hyper() {
assert!(message.opcode == fastwebsockets::OpCode::Text);
assert!(message.payload == b"Hello!");

eprintln!("saying goodbye");
let_assert!(
Ok(()) = stream
.write_frame(fastwebsockets::Frame::text(b"Goodbye!".to_vec().into()))
.await
);
let_assert!(Ok(close_frame) = stream.read_frame().await);
assert!(close_frame.opcode == fastwebsockets::OpCode::Close);
// let frame = stream.read_frame().await;
// // match frame {
// // Err(e) => {
// // eprintln!("frame is error {}", e);
// // },
// // Ok(result) => {
// // eprintln!("frame is ok");
// // }
// // }
// // let_assert!(Err(error) = frame);
// // let_assert!(error = WebSocketError::)
// // let_assert!(Ok(()) = Err({}));
// let_assert!(Ok(close_frame) = frame);
// assert!(close_frame.opcode == fastwebsockets::OpCode::Close);
}

async fn upgrade_websocket(
Expand All @@ -100,6 +115,7 @@ async fn upgrade_websocket(
assert!(reply.opcode == fastwebsockets::OpCode::Text);
assert!(reply.payload == b"Goodbye!");

eprintln!("server sending close frame");
assert!(let Ok(()) = stream.write_frame(fastwebsockets::Frame::close_raw(vec![].into())).await);
});

Expand Down
Loading