Skip to content

Commit

Permalink
enable stream/future payload lift/lower for non-Wasm platforms (#1120)
Browse files Browse the repository at this point in the history
* enable stream/future payload lift/lower for non-Wasm platforms

Previously, we generated no code for non-Wasm platforms; which meant our codegen
tests weren't really testing much as far as streams and futures go.  Now that
the tests actually do something, they uncovered a few issues which I've fixed:

- Invalid code generation when using `duplicate_if_necessary: true`
- Invalid code generation for stream or futures whose payloads contain one or more a streams or futures

For the latter, I mimicked what we do for resources: use interior mutability to
provide `take_handle` methods for `StreamReader` and `FutureReader`.

Signed-off-by: Joel Dice <[email protected]>

* avoid lowering same values more than once in `StreamVtable::write`

Previously, we would optimistically lower all the values in the input array and
then re-lower the subset which wasn't accepted the first time.  Aside from being
inefficient, that was also incorrect since re-lowering would fail in the cases
of any resource handles, futures, or streams in the payload since we would have
already taken the handles using `take_handle`.

Signed-off-by: Joel Dice <[email protected]>

---------

Signed-off-by: Joel Dice <[email protected]>
  • Loading branch information
dicej authored Jan 10, 2025
1 parent 51e1ef5 commit 11b04d7
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 162 deletions.
72 changes: 43 additions & 29 deletions crates/guest-rust/rt/src/async_support/future_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use {
collections::hash_map::Entry,
fmt,
future::{Future, IntoFuture},
mem::ManuallyDrop,
pin::Pin,
sync::atomic::{AtomicU32, Ordering::Relaxed},
task::{Context, Poll},
},
};
Expand Down Expand Up @@ -199,7 +199,8 @@ impl<T> CancelableRead<T> {

fn cancel_mut(&mut self) -> FutureReader<T> {
let reader = self.reader.take().unwrap();
super::with_entry(reader.handle, |entry| match entry {
let handle = reader.handle.load(Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen
Expand All @@ -209,7 +210,7 @@ impl<T> CancelableRead<T> {
Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalOpen);
}
Handle::Read => (reader.vtable.cancel_read)(reader.handle),
Handle::Read => (reader.vtable.cancel_read)(handle),
},
});
reader
Expand All @@ -226,7 +227,7 @@ impl<T> Drop for CancelableRead<T> {

/// Represents the readable end of a Component Model `future`.
pub struct FutureReader<T: 'static> {
handle: u32,
handle: AtomicU32,
vtable: &'static FutureVtable<T>,
}

Expand All @@ -241,7 +242,10 @@ impl<T> fmt::Debug for FutureReader<T> {
impl<T> FutureReader<T> {
#[doc(hidden)]
pub fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
Self { handle, vtable }
Self {
handle: AtomicU32::new(handle),
vtable,
}
}

#[doc(hidden)]
Expand All @@ -264,12 +268,16 @@ impl<T> FutureReader<T> {
},
});

Self { handle, vtable }
Self {
handle: AtomicU32::new(handle),
vtable,
}
}

#[doc(hidden)]
pub fn into_handle(self) -> u32 {
super::with_entry(self.handle, |entry| match entry {
pub fn take_handle(&self) -> u32 {
let handle = self.handle.swap(u32::MAX, Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen => {
Expand All @@ -282,7 +290,7 @@ impl<T> FutureReader<T> {
},
});

ManuallyDrop::new(self).handle
handle
}
}

Expand All @@ -294,7 +302,7 @@ impl<T> IntoFuture for FutureReader<T> {
/// written to the writable end of this `future` (yielding a `Some` result)
/// or when the writable end is dropped (yielding a `None` result).
fn into_future(self) -> Self::IntoFuture {
let handle = self.handle;
let handle = self.handle.load(Relaxed);
let vtable = self.vtable;
CancelableRead {
reader: Some(self),
Expand Down Expand Up @@ -325,24 +333,30 @@ impl<T> IntoFuture for FutureReader<T> {

impl<T> Drop for FutureReader<T> {
fn drop(&mut self) {
super::with_entry(self.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed) else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(self.handle);
}
Handle::Write => unreachable!(),
},
});
match self.handle.load(Relaxed) {
u32::MAX => {}
handle => {
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed)
else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
},
});
}
}
}
}
182 changes: 94 additions & 88 deletions crates/guest-rust/rt/src/async_support/stream_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use {
fmt,
future::Future,
iter,
mem::{self, ManuallyDrop, MaybeUninit},
mem::{self, MaybeUninit},
pin::Pin,
sync::atomic::{AtomicU32, Ordering::Relaxed},
task::{Context, Poll},
vec::Vec,
},
Expand All @@ -28,7 +29,7 @@ fn ceiling(x: usize, y: usize) -> usize {

#[doc(hidden)]
pub struct StreamVtable<T> {
pub write: fn(future: u32, values: &[T]) -> Pin<Box<dyn Future<Output = Option<usize>> + '_>>,
pub write: fn(future: u32, values: &[T]) -> Pin<Box<dyn Future<Output = usize> + '_>>,
pub read: fn(
future: u32,
values: &mut [MaybeUninit<T>],
Expand Down Expand Up @@ -173,14 +174,7 @@ impl<T> Sink<Vec<T>> for StreamWriter<T> {
vtable,
};
self.get_mut().future = Some(Box::pin(async move {
let mut offset = 0;
while offset < item.len() {
if let Some(count) = (vtable.write)(handle, &item[offset..]).await {
offset += count;
} else {
break;
}
}
(vtable.write)(handle, &item).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
}));
Expand Down Expand Up @@ -246,7 +240,7 @@ impl<T> Drop for CancelReadOnDrop<T> {

/// Represents the readable end of a Component Model `stream`.
pub struct StreamReader<T: 'static> {
handle: u32,
handle: AtomicU32,
future: Option<Pin<Box<dyn Future<Output = Option<Vec<T>>> + 'static>>>,
vtable: &'static StreamVtable<T>,
}
Expand All @@ -273,7 +267,7 @@ impl<T> StreamReader<T> {
#[doc(hidden)]
pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
Self {
handle,
handle: AtomicU32::new(handle),
future: None,
vtable,
}
Expand All @@ -300,15 +294,16 @@ impl<T> StreamReader<T> {
});

Self {
handle,
handle: AtomicU32::new(handle),
future: None,
vtable,
}
}

#[doc(hidden)]
pub fn into_handle(self) -> u32 {
super::with_entry(self.handle, |entry| match entry {
pub fn take_handle(&self) -> u32 {
let handle = self.handle.swap(u32::MAX, Relaxed);
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::LocalOpen => {
Expand All @@ -321,7 +316,7 @@ impl<T> StreamReader<T> {
},
});

ManuallyDrop::new(self).handle
handle
}
}

Expand All @@ -332,60 +327,65 @@ impl<T> Stream for StreamReader<T> {
let me = self.get_mut();

if me.future.is_none() {
me.future = Some(super::with_entry(me.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Handle::Read => {
let handle = me.handle;
let vtable = me.vtable;
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(handle),
vtable,
};
Box::pin(async move {
let mut buffer = iter::repeat_with(MaybeUninit::uninit)
.take(ceiling(64 * 1024, mem::size_of::<T>()))
.collect::<Vec<_>>();

let result =
if let Some(count) = (vtable.read)(handle, &mut buffer).await {
buffer.truncate(count);
Some(unsafe {
mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer)
})
} else {
None
};
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
}) as Pin<Box<dyn Future<Output = _>>>
}
Handle::LocalOpen => {
let (tx, rx) = oneshot::channel();
entry.insert(Handle::LocalWaiting(tx));
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(me.handle),
vtable: me.vtable,
};
Box::pin(async move {
let result = rx.map(|v| v.ok().map(|v| *v.downcast().unwrap())).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
})
}
Handle::LocalClosed => Box::pin(future::ready(None)),
Handle::LocalReady(..) => {
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalOpen) else {
unreachable!()
};
waker.wake();
Box::pin(future::ready(Some(*v.downcast().unwrap())))
}
me.future = Some(super::with_entry(
me.handle.load(Relaxed),
|entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Handle::Read => {
let handle = me.handle.load(Relaxed);
let vtable = me.vtable;
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(handle),
vtable,
};
Box::pin(async move {
let mut buffer = iter::repeat_with(MaybeUninit::uninit)
.take(ceiling(64 * 1024, mem::size_of::<T>()))
.collect::<Vec<_>>();

let result =
if let Some(count) = (vtable.read)(handle, &mut buffer).await {
buffer.truncate(count);
Some(unsafe {
mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer)
})
} else {
None
};
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
}) as Pin<Box<dyn Future<Output = _>>>
}
Handle::LocalOpen => {
let (tx, rx) = oneshot::channel();
entry.insert(Handle::LocalWaiting(tx));
let mut cancel_on_drop = CancelReadOnDrop::<T> {
handle: Some(me.handle.load(Relaxed)),
vtable: me.vtable,
};
Box::pin(async move {
let result =
rx.map(|v| v.ok().map(|v| *v.downcast().unwrap())).await;
cancel_on_drop.handle = None;
drop(cancel_on_drop);
result
})
}
Handle::LocalClosed => Box::pin(future::ready(None)),
Handle::LocalReady(..) => {
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalOpen)
else {
unreachable!()
};
waker.wake();
Box::pin(future::ready(Some(*v.downcast().unwrap())))
}
},
},
}));
));
}

match me.future.as_mut().unwrap().as_mut().poll(cx) {
Expand All @@ -402,24 +402,30 @@ impl<T> Drop for StreamReader<T> {
fn drop(&mut self) {
self.future = None;

super::with_entry(self.handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed) else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(self.handle);
}
Handle::Write => unreachable!(),
},
});
match self.handle.load(Relaxed) {
u32::MAX => {}
handle => {
super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::LocalReady(..) => {
let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed)
else {
unreachable!()
};
waker.wake();
}
Handle::LocalOpen | Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalClosed);
}
Handle::Read | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
},
});
}
}
}
}
Loading

0 comments on commit 11b04d7

Please sign in to comment.