Skip to content

Commit

Permalink
Add support for async/streams/futures
Browse files Browse the repository at this point in the history
This adds support for loading, compiling, linking, and running components which
use the [Async
ABI](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
along with the [`stream`, `future`, and
`error-context`](WebAssembly/component-model#405) types.
It also adds support for generating host bindings such that multiple host
functions can be run concurrently with guest tasks -- without monopolizing the
`Store`.

See the [implementation RFC](bytecodealliance/rfcs#38)
for details, as well as [this
repo](https://github.com/dicej/component-async-demo) containing end-to-end smoke
tests.

Signed-off-by: Joel Dice <[email protected]>

fix clippy warnings and bench/fuzzing errors

Signed-off-by: Joel Dice <[email protected]>

revert atomic.wit whitespace change

Signed-off-by: Joel Dice <[email protected]>

fix build when component-model disabled

Signed-off-by: Joel Dice <[email protected]>

bless component-macro expected output

Signed-off-by: Joel Dice <[email protected]>

fix no-std build error

Signed-off-by: Joel Dice <[email protected]>

fix build with --no-default-features --features runtime,component-model

Signed-off-by: Joel Dice <[email protected]>

partly fix no-std build

It's still broken due to the use of `std::collections::HashMap` in
crates/wasmtime/src/runtime/vm/component.rs.  I'll address that as part of the
work to avoid exposing global task/future/stream/error-context handles to
guests.

Signed-off-by: Joel Dice <[email protected]>

maintain per-instance tables for futures, streams, and error-contexts

Signed-off-by: Joel Dice <[email protected]>

refactor task/stream/future handle lifting/lowering

This addresses a couple of issues:

- Previously, we were passing task/stream/future/error-context reps directly to
  instances while keeping track of which instance had access to which rep.  That
  worked fine in that there was no way to forge access to inaccessible reps, but
  it leaked information about what other instances were doing.  Now we maintain
  per-instance waitable and error-context tables which map the reps to and from
  the handles which the instance sees.

- The `no_std` build was broken due to use of `HashMap` in
  `runtime::vm::component`, which is now fixed.

Note that we use one single table per instance for all tasks, streams, and
futures.  This is partly necessary because, when async events are delivered to
the guest, it wouldn't have enough context to know which stream or future we're
talking about if each unique stream and future type had its own table.  So at
minimum, we need to use the same table for all streams (regardless of payload
type), and likewise for futures.  Also, per
WebAssembly/component-model#395 (comment),
the plan is to move towards a shared table for all resource types as well, so
this moves us in that direction.

Signed-off-by: Joel Dice <[email protected]>

fix wave breakage due to new stream/future/error-context types

Signed-off-by: Joel Dice <[email protected]>

switch wasm-tools to v1.220.0-based branch

Signed-off-by: Joel Dice <[email protected]>

check `task.return` type at runtime

We can't statically verify a given call to `task.return` corresponds to the
expected core signature appropriate for the currently running task, so we must
do so at runtime.  In order to make that check efficient, we intern the types.

My initial plan was to use `ModuleInternedTypeIndex` and/or `VMSharedTypeIndex`
for interning, but that got hairy with WasmGC considerations, so instead I added
new fields to `ComponentTypes` and `ComponentTypesBuilder`.

Signed-off-by: Joel Dice <[email protected]>

add `TypedFunc::call_concurrent` and refine stream/future APIs

This implements what I proposed in https://github.com/dicej/rfcs/blob/component-async/accepted/component-model-async.md#wasmtime.  Specifically, it adds:

- A new `Promise` type, useful for working with concurrent operations that require access to a `Store` to make progress.
- A new `PromisesUnordered` type for `await`ing multiple promises concurrently
-`TypedFunc::call_concurrent` (which returns a `Promise`), allowing multiple host->guest calls to run concurrently on the same instance.
- Updated `{Stream|Future}{Writer|Reader}` APIs which use `Promise`

The upshot is that the embedder can now ergonomically manage arbitrary numbers
of concurrent operations.  Previously, this was a lot more difficult to do
without accidentally starving some of the operations due to another one
monopolizing the `Store`.

Finally, this includes various refactorings and fixes for bugs exposed by the
newer, more versatile APIs.

Signed-off-by: Joel Dice <[email protected]>

clean up verbosity in component/func.rs

Signed-off-by: Joel Dice <[email protected]>

snapshot

Signed-off-by: Joel Dice <[email protected]>

implement stream/future read/write cancellation

This required a somewhat viral addition of `Send` and `Sync` bounds for async
host function closure types, unfortunately.

Signed-off-by: Joel Dice <[email protected]>

add `Func::call_concurrent` and `LinkerInstance::func_new_concurrent`

Signed-off-by: Joel Dice <[email protected]>

dynamic API support for streams/futures/error-contexts

Signed-off-by: Joel Dice <[email protected]>

support callback-less (AKA stackful) async lifts

Signed-off-by: Joel Dice <[email protected]>

fix `call_host` regression

Signed-off-by: Joel Dice <[email protected]>

add component model async end-to-end tests

I've ported these over from https://github.com/dicej/component-async-demo

Signed-off-by: Joel Dice <[email protected]>

fix test regressions and clippy warnings

Signed-off-by: Joel Dice <[email protected]>

satisfy clippy

Signed-off-by: Joel Dice <[email protected]>

fix async tests when `component-model-async` enabled

Enabling this feature for all tests revealed various missing pieces in the new
`concurrent.rs` fiber mechanism, which I've addressed.

This adds a bunch of ugly `#[cfg(feature = "component-model-async")]` guards,
but those will all go away once I unify the two async fiber implementations.

Signed-off-by: Joel Dice <[email protected]>
  • Loading branch information
dicej committed Jan 2, 2025
1 parent b93e1bc commit 401a195
Show file tree
Hide file tree
Showing 199 changed files with 17,675 additions and 2,325 deletions.
369 changes: 218 additions & 151 deletions Cargo.lock

Large diffs are not rendered by default.

30 changes: 17 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ rustix = { workspace = true, features = ["mm", "param", "process"] }

[dev-dependencies]
# depend again on wasmtime to activate its default features for tests
wasmtime = { workspace = true, features = ['default', 'winch', 'pulley', 'all-arch', 'call-hook', 'memory-protection-keys', 'signals-based-traps'] }
wasmtime = { workspace = true, features = ['default', 'winch', 'pulley', 'all-arch', 'call-hook', 'memory-protection-keys', 'signals-based-traps', 'component-model-async'] }
env_logger = { workspace = true }
log = { workspace = true }
filecheck = { workspace = true }
Expand Down Expand Up @@ -146,6 +146,7 @@ members = [
"crates/bench-api",
"crates/c-api/artifact",
"crates/environ/fuzz",
"crates/misc/component-async-tests",
"crates/test-programs",
"crates/wasi-preview1-component-adapter",
"crates/wasi-preview1-component-adapter/verify",
Expand Down Expand Up @@ -229,6 +230,7 @@ wasmtime-versioned-export-macros = { path = "crates/versioned-export-macros", ve
wasmtime-slab = { path = "crates/slab", version = "=29.0.0" }
component-test-util = { path = "crates/misc/component-test-util" }
component-fuzz-util = { path = "crates/misc/component-fuzz-util" }
component-async-tests = { path = "crates/misc/component-async-tests" }
wiggle = { path = "crates/wiggle", version = "=29.0.0", default-features = false }
wiggle-macro = { path = "crates/wiggle/macro", version = "=29.0.0" }
wiggle-generate = { path = "crates/wiggle/generate", version = "=29.0.0" }
Expand Down Expand Up @@ -281,20 +283,22 @@ io-lifetimes = { version = "2.0.3", default-features = false }
io-extras = "0.18.1"
rustix = "0.38.31"
# wit-bindgen:
wit-bindgen = { version = "0.35.0", default-features = false }
wit-bindgen-rust-macro = { version = "0.35.0", default-features = false }
wit-bindgen = { git = "https://github.com/dicej/wit-bindgen", branch = "async-v1.221.2", default-features = false }
wit-bindgen-rt = { git = "https://github.com/dicej/wit-bindgen", branch = "async-v1.221.2", default-features = false }
wit-bindgen-rust-macro = { git = "https://github.com/dicej/wit-bindgen", branch = "async-v1.221.2", default-features = false }

# wasm-tools family:
wasmparser = { version = "0.221.2", default-features = false, features = ['simd'] }
wat = "1.221.2"
wast = "221.0.2"
wasmprinter = "0.221.2"
wasm-encoder = "0.221.2"
wasm-smith = "0.221.2"
wasm-mutate = "0.221.2"
wit-parser = "0.221.2"
wit-component = "0.221.2"
wasm-wave = "0.221.2"
wasmparser = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2", default-features = false, features = ['simd'] }
wat = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wast = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasmprinter = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasm-encoder = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasm-smith = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasm-mutate = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wit-parser = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wit-component = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasm-wave = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }
wasm-compose = { git = "https://github.com/dicej/wasm-tools", branch = "async-v1.221.2" }

# Non-Bytecode Alliance maintained dependencies:
# --------------------------
Expand Down
5 changes: 3 additions & 2 deletions benches/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn bench_host_to_wasm<Params, Results>(
typed_results: Results,
) where
Params: WasmParams + ToVals + Copy,
Results: WasmResults + ToVals + Copy + PartialEq + Debug,
Results: WasmResults + ToVals + Copy + PartialEq + Debug + Sync + 'static,
{
// Benchmark the "typed" version, which should be faster than the versions
// below.
Expand Down Expand Up @@ -628,7 +628,8 @@ mod component {
+ PartialEq
+ Debug
+ Send
+ Sync,
+ Sync
+ 'static,
{
// Benchmark the "typed" version.
c.bench_function(&format!("component - host-to-wasm - typed - {name}"), |b| {
Expand Down
1 change: 1 addition & 0 deletions crates/component-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ similar = { workspace = true }
[features]
async = []
std = ['wasmtime-wit-bindgen/std']
component-model-async = ['std', 'async', 'wasmtime-wit-bindgen/component-model-async']
48 changes: 41 additions & 7 deletions crates/component-macro/src/bindgen.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use proc_macro2::{Span, TokenStream};
use quote::ToTokens;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::env;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use syn::parse::{Error, Parse, ParseStream, Result};
use syn::punctuated::Punctuated;
use syn::{braced, token, Token};
use wasmtime_wit_bindgen::{AsyncConfig, Opts, Ownership, TrappableError, TrappableImports};
use wasmtime_wit_bindgen::{
AsyncConfig, CallStyle, Opts, Ownership, TrappableError, TrappableImports,
};
use wit_parser::{PackageId, Resolve, UnresolvedPackageGroup, WorldId};

pub struct Config {
Expand All @@ -20,13 +21,22 @@ pub struct Config {
}

pub fn expand(input: &Config) -> Result<TokenStream> {
if !cfg!(feature = "async") && input.opts.async_.maybe_async() {
if let (CallStyle::Async | CallStyle::Concurrent, false) =
(input.opts.call_style(), cfg!(feature = "async"))
{
return Err(Error::new(
Span::call_site(),
"cannot enable async bindings unless `async` crate feature is active",
));
}

if input.opts.concurrent_imports && !cfg!(feature = "component-model-async") {
return Err(Error::new(
Span::call_site(),
"cannot enable `concurrent_imports` option unless `component-model-async` crate feature is active",
));
}

let mut src = match input.opts.generate(&input.resolve, input.world) {
Ok(s) => s,
Err(e) => return Err(Error::new(Span::call_site(), e.to_string())),
Expand All @@ -40,7 +50,10 @@ pub fn expand(input: &Config) -> Result<TokenStream> {
// place a formatted version of the expanded code into a file. This file
// will then show up in rustc error messages for any codegen issues and can
// be inspected manually.
if input.include_generated_code_from_file || std::env::var("WASMTIME_DEBUG_BINDGEN").is_ok() {
if input.include_generated_code_from_file
|| input.opts.debug
|| std::env::var("WASMTIME_DEBUG_BINDGEN").is_ok()
{
static INVOCATION: AtomicUsize = AtomicUsize::new(0);
let root = Path::new(env!("DEBUG_OUTPUT_DIR"));
let world_name = &input.resolve.worlds[input.world].name;
Expand Down Expand Up @@ -107,13 +120,16 @@ impl Parse for Config {
}
Opt::Tracing(val) => opts.tracing = val,
Opt::VerboseTracing(val) => opts.verbose_tracing = val,
Opt::Debug(val) => opts.debug = val,
Opt::Async(val, span) => {
if async_configured {
return Err(Error::new(span, "cannot specify second async config"));
}
async_configured = true;
opts.async_ = val;
}
Opt::ConcurrentImports(val) => opts.concurrent_imports = val,
Opt::ConcurrentExports(val) => opts.concurrent_exports = val,
Opt::TrappableErrorType(val) => opts.trappable_error_type = val,
Opt::TrappableImports(val) => opts.trappable_imports = val,
Opt::Ownership(val) => opts.ownership = val,
Expand All @@ -138,7 +154,7 @@ impl Parse for Config {
"cannot specify a world with `interfaces`",
));
}
world = Some("interfaces".to_string());
world = Some("wasmtime:component-macro-synthesized/interfaces".to_string());

opts.only_interfaces = true;
}
Expand Down Expand Up @@ -281,6 +297,9 @@ mod kw {
syn::custom_keyword!(require_store_data_send);
syn::custom_keyword!(wasmtime_crate);
syn::custom_keyword!(include_generated_code_from_file);
syn::custom_keyword!(concurrent_imports);
syn::custom_keyword!(concurrent_exports);
syn::custom_keyword!(debug);
}

enum Opt {
Expand All @@ -301,12 +320,19 @@ enum Opt {
RequireStoreDataSend(bool),
WasmtimeCrate(syn::Path),
IncludeGeneratedCodeFromFile(bool),
ConcurrentImports(bool),
ConcurrentExports(bool),
Debug(bool),
}

impl Parse for Opt {
fn parse(input: ParseStream<'_>) -> Result<Self> {
let l = input.lookahead1();
if l.peek(kw::path) {
if l.peek(kw::debug) {
input.parse::<kw::debug>()?;
input.parse::<Token![:]>()?;
Ok(Opt::Debug(input.parse::<syn::LitBool>()?.value))
} else if l.peek(kw::path) {
input.parse::<kw::path>()?;
input.parse::<Token![:]>()?;

Expand Down Expand Up @@ -380,6 +406,14 @@ impl Parse for Opt {
span,
))
}
} else if l.peek(kw::concurrent_imports) {
input.parse::<kw::concurrent_imports>()?;
input.parse::<Token![:]>()?;
Ok(Opt::ConcurrentImports(input.parse::<syn::LitBool>()?.value))
} else if l.peek(kw::concurrent_exports) {
input.parse::<kw::concurrent_exports>()?;
input.parse::<Token![:]>()?;
Ok(Opt::ConcurrentExports(input.parse::<syn::LitBool>()?.value))
} else if l.peek(kw::ownership) {
input.parse::<kw::ownership>()?;
input.parse::<Token![:]>()?;
Expand Down
29 changes: 21 additions & 8 deletions crates/component-macro/tests/expanded/char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T> Clone for TheWorldPre<T> {
}
}
}
impl<_T> TheWorldPre<_T> {
impl<_T: 'static> TheWorldPre<_T> {
/// Creates a new copy of `TheWorldPre` bindings which can then
/// be used to instantiate into a particular store.
///
Expand Down Expand Up @@ -152,7 +152,10 @@ const _: () = {
mut store: impl wasmtime::AsContextMut<Data = _T>,
component: &wasmtime::component::Component,
linker: &wasmtime::component::Linker<_T>,
) -> wasmtime::Result<TheWorld> {
) -> wasmtime::Result<TheWorld>
where
_T: 'static,
{
let pre = linker.instantiate_pre(component)?;
TheWorldPre::new(pre)?.instantiate(store)
}
Expand Down Expand Up @@ -194,19 +197,23 @@ pub mod foo {
}
pub trait GetHost<
T,
>: Fn(T) -> <Self as GetHost<T>>::Host + Send + Sync + Copy + 'static {
D,
>: Fn(T) -> <Self as GetHost<T, D>>::Host + Send + Sync + Copy + 'static {
type Host: Host;
}
impl<F, T, O> GetHost<T> for F
impl<F, T, D, O> GetHost<T, D> for F
where
F: Fn(T) -> O + Send + Sync + Copy + 'static,
O: Host,
{
type Host = O;
}
pub fn add_to_linker_get_host<T>(
pub fn add_to_linker_get_host<
T,
G: for<'a> GetHost<&'a mut T, T, Host: Host>,
>(
linker: &mut wasmtime::component::Linker<T>,
host_getter: impl for<'a> GetHost<&'a mut T>,
host_getter: G,
) -> wasmtime::Result<()> {
let mut inst = linker.instance("foo:foo/chars")?;
inst.func_wrap(
Expand Down Expand Up @@ -354,7 +361,10 @@ pub mod exports {
&self,
mut store: S,
arg0: char,
) -> wasmtime::Result<()> {
) -> wasmtime::Result<()>
where
<S as wasmtime::AsContext>::Data: Send + 'static,
{
let callee = unsafe {
wasmtime::component::TypedFunc::<
(char,),
Expand All @@ -369,7 +379,10 @@ pub mod exports {
pub fn call_return_char<S: wasmtime::AsContextMut>(
&self,
mut store: S,
) -> wasmtime::Result<char> {
) -> wasmtime::Result<char>
where
<S as wasmtime::AsContext>::Data: Send + 'static,
{
let callee = unsafe {
wasmtime::component::TypedFunc::<
(),
Expand Down
25 changes: 13 additions & 12 deletions crates/component-macro/tests/expanded/char_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T> Clone for TheWorldPre<T> {
}
}
}
impl<_T> TheWorldPre<_T> {
impl<_T: Send + 'static> TheWorldPre<_T> {
/// Creates a new copy of `TheWorldPre` bindings which can then
/// be used to instantiate into a particular store.
///
Expand Down Expand Up @@ -46,10 +46,7 @@ impl<_T> TheWorldPre<_T> {
pub async fn instantiate_async(
&self,
mut store: impl wasmtime::AsContextMut<Data = _T>,
) -> wasmtime::Result<TheWorld>
where
_T: Send,
{
) -> wasmtime::Result<TheWorld> {
let mut store = store.as_context_mut();
let instance = self.instance_pre.instantiate_async(&mut store).await?;
self.indices.load(&mut store, &instance)
Expand Down Expand Up @@ -157,7 +154,7 @@ const _: () = {
linker: &wasmtime::component::Linker<_T>,
) -> wasmtime::Result<TheWorld>
where
_T: Send,
_T: Send + 'static,
{
let pre = linker.instantiate_pre(component)?;
TheWorldPre::new(pre)?.instantiate_async(store).await
Expand Down Expand Up @@ -202,19 +199,23 @@ pub mod foo {
}
pub trait GetHost<
T,
>: Fn(T) -> <Self as GetHost<T>>::Host + Send + Sync + Copy + 'static {
D,
>: Fn(T) -> <Self as GetHost<T, D>>::Host + Send + Sync + Copy + 'static {
type Host: Host + Send;
}
impl<F, T, O> GetHost<T> for F
impl<F, T, D, O> GetHost<T, D> for F
where
F: Fn(T) -> O + Send + Sync + Copy + 'static,
O: Host + Send,
{
type Host = O;
}
pub fn add_to_linker_get_host<T>(
pub fn add_to_linker_get_host<
T,
G: for<'a> GetHost<&'a mut T, T, Host: Host + Send>,
>(
linker: &mut wasmtime::component::Linker<T>,
host_getter: impl for<'a> GetHost<&'a mut T>,
host_getter: G,
) -> wasmtime::Result<()>
where
T: Send,
Expand Down Expand Up @@ -372,7 +373,7 @@ pub mod exports {
arg0: char,
) -> wasmtime::Result<()>
where
<S as wasmtime::AsContext>::Data: Send,
<S as wasmtime::AsContext>::Data: Send + 'static,
{
let callee = unsafe {
wasmtime::component::TypedFunc::<
Expand All @@ -392,7 +393,7 @@ pub mod exports {
mut store: S,
) -> wasmtime::Result<char>
where
<S as wasmtime::AsContext>::Data: Send,
<S as wasmtime::AsContext>::Data: Send + 'static,
{
let callee = unsafe {
wasmtime::component::TypedFunc::<
Expand Down
Loading

0 comments on commit 401a195

Please sign in to comment.