Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable stream/future payload lift/lower for non-Wasm platforms #1120

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions crates/guest-rust/rt/src/async_support/future_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use {
collections::hash_map::Entry,
fmt,
future::{Future, IntoFuture},
mem::ManuallyDrop,
pin::Pin,
sync::atomic::{AtomicU32, Ordering::Relaxed},
task::{Context, Poll},
},
};
Expand Down Expand Up @@ -199,7 +199,8 @@ impl<T> CancelableRead<T> {

fn cancel_mut(&mut self) -> FutureReader<T> {
let reader = self.reader.take().unwrap();
super::with_entry(reader.handle, |entry| match entry {
let handle = reader.handle.load(Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen
Expand All @@ -209,7 +210,7 @@ impl<T> CancelableRead<T> {
Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalOpen);
}
Handle::Read => (reader.vtable.cancel_read)(reader.handle),
Handle::Read => (reader.vtable.cancel_read)(handle),
},
});
reader
Expand All @@ -226,7 +227,7 @@ impl<T> Drop for CancelableRead<T> {

/// Represents the readable end of a Component Model `future`.
pub struct FutureReader<T: 'static> {
handle: u32,
handle: AtomicU32,
vtable: &'static FutureVtable<T>,
}

Expand All @@ -241,7 +242,10 @@ impl<T> fmt::Debug for FutureReader<T> {
impl<T> FutureReader<T> {
#[doc(hidden)]
pub fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
Self { handle, vtable }
Self {
handle: AtomicU32::new(handle),
vtable,
}
}

#[doc(hidden)]
Expand All @@ -264,12 +268,16 @@ impl<T> FutureReader<T> {
},
});

Self { handle, vtable }
Self {
handle: AtomicU32::new(handle),
vtable,
}
}

#[doc(hidden)]
pub fn into_handle(self) -> u32 {
super::with_entry(self.handle, |entry| match entry {
pub fn take_handle(&self) -> u32 {
let handle = self.handle.swap(u32::MAX, Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen => {
Expand All @@ -282,7 +290,7 @@ impl<T> FutureReader<T> {
},
});

ManuallyDrop::new(self).handle
handle
}
}

Expand All @@ -294,7 +302,7 @@ impl<T> IntoFuture for FutureReader<T> {
/// written to the writable end of this `future` (yielding a `Some` result)
/// or when the writable end is dropped (yielding a `None` result).
fn into_future(self) -> Self::IntoFuture {
let handle = self.handle;
let handle = self.handle.load(Relaxed);
let vtable = self.vtable;
CancelableRead {
reader: Some(self),
Expand Down Expand Up @@ -325,24 +333,30 @@ impl<T> IntoFuture for FutureReader<T> {

impl<T> Drop for FutureReader<T> {
fn drop(&mut self) {
super::with_entry(self.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed) else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(self.handle);
}
Handle::Write => unreachable!(),
},
});
match self.handle.load(Relaxed) {
u32::MAX => {}
handle => {
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed)
else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
},
});
}
}
}
}
182 changes: 94 additions & 88 deletions crates/guest-rust/rt/src/async_support/stream_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use {
fmt,
future::Future,
iter,
mem::{self, ManuallyDrop, MaybeUninit},
mem::{self, MaybeUninit},
pin::Pin,
sync::atomic::{AtomicU32, Ordering::Relaxed},
task::{Context, Poll},
vec::Vec,
},
Expand All @@ -28,7 +29,7 @@ fn ceiling(x: usize, y: usize) -> usize {

#[doc(hidden)]
pub struct StreamVtable<T> {
pub write: fn(future: u32, values: &[T]) -> Pin<Box<dyn Future<Output = Option<usize>> + '_>>,
pub write: fn(future: u32, values: &[T]) -> Pin<Box<dyn Future<Output = usize> + '_>>,
pub read: fn(
future: u32,
values: &mut [MaybeUninit<T>],
Expand Down Expand Up @@ -173,14 +174,7 @@ impl<T> Sink<Vec<T>> for StreamWriter<T> {
vtable,
};
self.get_mut().future = Some(Box::pin(async move {
let mut offset = 0;
while offset < item.len() {
if let Some(count) = (vtable.write)(handle, &item[offset..]).await {
offset += count;
} else {
break;
}
}
(vtable.write)(handle, &item).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
}));
Expand Down Expand Up @@ -246,7 +240,7 @@ impl<T> Drop for CancelReadOnDrop<T> {

/// Represents the readable end of a Component Model `stream`.
pub struct StreamReader<T: 'static> {
handle: u32,
handle: AtomicU32,
future: Option<Pin<Box<dyn Future<Output = Option<Vec<T>>> + 'static>>>,
vtable: &'static StreamVtable<T>,
}
Expand All @@ -273,7 +267,7 @@ impl<T> StreamReader<T> {
#[doc(hidden)]
pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
Self {
handle,
handle: AtomicU32::new(handle),
future: None,
vtable,
}
Expand All @@ -300,15 +294,16 @@ impl<T> StreamReader<T> {
});

Self {
handle,
handle: AtomicU32::new(handle),
future: None,
vtable,
}
}

#[doc(hidden)]
pub fn into_handle(self) -> u32 {
super::with_entry(self.handle, |entry| match entry {
pub fn take_handle(&self) -> u32 {
let handle = self.handle.swap(u32::MAX, Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen => {
Expand All @@ -321,7 +316,7 @@ impl<T> StreamReader<T> {
},
});

ManuallyDrop::new(self).handle
handle
}
}

Expand All @@ -332,60 +327,65 @@ impl<T> Stream for StreamReader<T> {
let me = self.get_mut();

if me.future.is_none() {
me.future = Some(super::with_entry(me.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Handle::Read => {
let handle = me.handle;
let vtable = me.vtable;
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(handle),
vtable,
};
Box::pin(async move {
let mut buffer = iter::repeat_with(MaybeUninit::uninit)
.take(ceiling(64 * 1024, mem::size_of::<T>()))
.collect::<Vec<_>>();

let result =
if let Some(count) = (vtable.read)(handle, &mut buffer).await {
buffer.truncate(count);
Some(unsafe {
mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer)
})
} else {
None
};
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
}) as Pin<Box<dyn Future<Output = _>>>
}
Handle::LocalOpen => {
let (tx, rx) = oneshot::channel();
entry.insert(Handle::LocalWaiting(tx));
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(me.handle),
vtable: me.vtable,
};
Box::pin(async move {
let result = rx.map(|v| v.ok().map(|v| *v.downcast().unwrap())).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
})
}
Handle::LocalClosed => Box::pin(future::ready(None)),
Handle::LocalReady(..) => {
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalOpen) else {
unreachable!()
};
waker.wake();
Box::pin(future::ready(Some(*v.downcast().unwrap())))
}
me.future = Some(super::with_entry(
me.handle.load(Relaxed),
|entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Handle::Read => {
let handle = me.handle.load(Relaxed);
let vtable = me.vtable;
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(handle),
vtable,
};
Box::pin(async move {
let mut buffer = iter::repeat_with(MaybeUninit::uninit)
.take(ceiling(64 * 1024, mem::size_of::<T>()))
.collect::<Vec<_>>();

let result =
if let Some(count) = (vtable.read)(handle, &mut buffer).await {
buffer.truncate(count);
Some(unsafe {
mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer)
})
} else {
None
};
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
}) as Pin<Box<dyn Future<Output = _>>>
}
Handle::LocalOpen => {
let (tx, rx) = oneshot::channel();
entry.insert(Handle::LocalWaiting(tx));
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(me.handle.load(Relaxed)),
vtable: me.vtable,
};
Box::pin(async move {
let result =
rx.map(|v| v.ok().map(|v| *v.downcast().unwrap())).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
})
}
Handle::LocalClosed => Box::pin(future::ready(None)),
Handle::LocalReady(..) => {
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalOpen)
else {
unreachable!()
};
waker.wake();
Box::pin(future::ready(Some(*v.downcast().unwrap())))
}
},
},
}));
));
}

match me.future.as_mut().unwrap().as_mut().poll(cx) {
Expand All @@ -402,24 +402,30 @@ impl<T> Drop for StreamReader<T> {
fn drop(&mut self) {
self.future = None;

super::with_entry(self.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed) else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(self.handle);
}
Handle::Write => unreachable!(),
},
});
match self.handle.load(Relaxed) {
u32::MAX => {}
handle => {
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed)
else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
},
});
}
}
}
}
Loading
Loading