Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

wit-bindgen

Package Overview
Dependencies
Maintainers
0
Versions
61
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

wit-bindgen - npm Package Compare versions

Comparing version
0.48.1
to
0.49.0
+45
src/rt/async_support/inter_task_wakeup_disabled.rs
use super::FutureState;
#[derive(Default)]
pub struct State;
impl FutureState<'_> {
pub(super) fn read_inter_task_stream(&mut self) {
assert!(
self.remaining_work(),
"
Rust task cannot sleep waiting only on Rust-originating events unless the
`wit-bindgen` crate is compiled with the `inter-task-wakeup` feature
enabled.
"
);
}
pub(super) fn cancel_inter_task_stream_read(&mut self) {
// nothing to do
}
}
impl State {
pub fn consume_waitable_event(&mut self, _waitable: u32, _code: u32) -> bool {
false
}
}
#[derive(Default)]
pub struct WakerState;
impl WakerState {
pub fn wake(&self) {
panic!(
"
Cannot support cross-component-model-task wakeup unlses the `wit-bindgen`
crate is compiled with the `inter-task-wakeup` feature enabled.
"
);
}
}
use super::FutureState;
use crate::rt::async_support::{BLOCKED, COMPLETED};
use crate::{RawStreamReader, RawStreamWriter, StreamOps, UnitStreamOps};
use std::ptr;
use std::sync::Mutex;
#[derive(Default)]
pub struct State {
/// A lazily-initialized stream used to signal inter-task notifications.
///
/// This stream is used when one component-model task is used to wake up
/// another component-model task. This can happen when the async event being
/// waited on is defined purely in Rust, for example, and doesn't rely on
/// any component-model primitives.
stream: Option<RawStreamReader<UnitStreamOps>>,
/// Boolean if there's an active read of `inter_task_stream`. Used to handle
/// cancellation/deduplication of the read.
stream_reading: bool,
}
impl FutureState<'_> {
pub(super) fn read_inter_task_stream(&mut self) {
// Lazily allocate the inter-task stream now that we're actually going
// to sleep. We don't know where the wakeup notification will come from
// so it's required to allocate one here.
if self.inter_task_wakeup.stream.is_none() {
assert!(!self.inter_task_wakeup.stream_reading);
let (writer, reader) = UnitStreamOps::new();
self.inter_task_wakeup.stream = Some(reader);
let mut waker_stream = self.waker.inter_task_stream.lock.lock().unwrap();
assert!(waker_stream.is_none());
*waker_stream = Some(writer);
}
// If there's not already a pending read then schedule a new read here.
//
// Note that this should always return `BLOCKED` since as the only task
// running it's not possible for a read to be anywhere else in the
// system. Additionally we keep the read end alive, so this shouldn't
// ever returned dropped/closed either.
if !self.inter_task_wakeup.stream_reading {
let stream = self.inter_task_wakeup.stream.as_mut().unwrap();
let handle = stream.handle();
let rc = unsafe { UnitStreamOps.start_read(handle, ptr::null_mut(), 1) };
assert_eq!(rc, BLOCKED);
self.inter_task_wakeup.stream_reading = true;
self.add_waitable(handle);
}
}
/// Cancels the active read of the inter-task stream, if any.
///
/// Has no effect if there is no active read.
pub(super) fn cancel_inter_task_stream_read(&mut self) {
if !self.inter_task_wakeup.stream_reading {
return;
}
self.inter_task_wakeup.stream_reading = false;
let handle = self.inter_task_wakeup.stream.as_mut().unwrap().handle();
// Note that the return code here is discarded. No matter what the read
// is cancelled, and whether we actually read something or whether we
// cancelled doesn't matter.
unsafe {
UnitStreamOps.cancel_read(handle);
}
self.remove_waitable(handle);
}
}
impl State {
pub fn consume_waitable_event(&mut self, waitable: u32, _code: u32) -> bool {
if let Some(reader) = self.stream.as_mut() {
if reader.handle() == waitable {
self.stream_reading = false;
return true;
}
}
false
}
}
#[derive(Default)]
pub struct WakerState {
lock: Mutex<Option<RawStreamWriter<UnitStreamOps>>>,
}
impl WakerState {
pub fn wake(&self) {
// Here the wakeup stream should already have been filled in by the
// original future itself. The stream should also have an active read
// while the future is sleeping. This means that this write should
// succeed immediately.
let mut inter_task_stream = self.lock.lock().unwrap();
let stream = inter_task_stream.as_mut().unwrap();
let rc = unsafe { UnitStreamOps.start_write(stream.handle(), ptr::null_mut(), 1) };
assert_eq!(rc, COMPLETED | (1 << 4));
}
}
use crate::rt::async_support::raw_stream_new;
use crate::{RawStreamReader, RawStreamWriter, StreamOps};
use std::alloc::Layout;
/// Operations for `stream<()>`.
///
/// Can be combined with [`RawStreamWriter`] and [`RawStreamReader`] as created
/// through [`UnitStreamOps::new`]
#[derive(Copy, Clone)]
pub struct UnitStreamOps;
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[stream-new-unit]"]
fn unit_new() -> u64;
#[link_name = "[async-lower][stream-write-unit]"]
fn unit_write(stream: u32, val: *const u8, amt: usize) -> u32;
#[link_name = "[async-lower][stream-read-unit]"]
fn unit_read(stream: u32, val: *mut u8, amt: usize) -> u32;
#[link_name = "[stream-cancel-read-unit]"]
fn unit_cancel_read(stream: u32) -> u32;
#[link_name = "[stream-cancel-write-unit]"]
fn unit_cancel_write(stream: u32) -> u32;
#[link_name = "[stream-drop-readable-unit]"]
fn unit_drop_readable(stream: u32) ;
#[link_name = "[stream-drop-writable-unit]"]
fn unit_drop_writable(stream: u32) ;
}
}
impl UnitStreamOps {
/// Creates a new unit stream read/write pair.
pub fn new() -> (RawStreamWriter<Self>, RawStreamReader<Self>) {
unsafe { raw_stream_new(UnitStreamOps) }
}
}
unsafe impl StreamOps for UnitStreamOps {
type Payload = ();
fn new(&mut self) -> u64 {
unsafe { unit_new() }
}
fn elem_layout(&self) -> Layout {
Layout::new::<()>()
}
fn native_abi_matches_canonical_abi(&self) -> bool {
true
}
fn contains_lists(&self) -> bool {
false
}
unsafe fn lower(&mut self, (): (), _dst: *mut u8) {
unreachable!()
}
unsafe fn dealloc_lists(&mut self, _dst: *mut u8) {
unreachable!()
}
unsafe fn lift(&mut self, _dst: *mut u8) -> Self::Payload {
unreachable!()
}
unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
unsafe { unit_write(stream, val, amt) }
}
unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
unsafe { unit_read(stream, val, amt) }
}
unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
unsafe { unit_cancel_read(stream) }
}
unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
unsafe { unit_cancel_write(stream) }
}
unsafe fn drop_readable(&mut self, stream: u32) {
unsafe { unit_drop_readable(stream) }
}
unsafe fn drop_writable(&mut self, stream: u32) {
unsafe { unit_drop_writable(stream) }
}
}
+1
-1
{
"git": {
"sha1": "d44d953b9bf9841d1bf5d1744fa7400cbecbefd7"
"sha1": "ba933bda98214ffe2ae16c63b2b91d0932021bbc"
},
"path_in_vcs": "crates/guest-rust"
}

@@ -13,5 +13,5 @@ # This file is automatically @generated by Cargo.

name = "bitflags"
version = "2.9.4"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"

@@ -130,5 +130,5 @@ [[package]]

name = "hashbrown"
version = "0.16.0"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"

@@ -149,8 +149,8 @@ [[package]]

name = "indexmap"
version = "2.11.4"
version = "2.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5"
checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2"
dependencies = [
"equivalent",
"hashbrown 0.16.0",
"hashbrown 0.16.1",
"serde",

@@ -208,5 +208,5 @@ "serde_core",

name = "proc-macro2"
version = "1.0.101"
version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [

@@ -218,5 +218,5 @@ "unicode-ident",

name = "quote"
version = "1.0.41"
version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1"
checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
dependencies = [

@@ -300,5 +300,5 @@ "proc-macro2",

name = "syn"
version = "2.0.106"
version = "2.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6"
checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
dependencies = [

@@ -312,5 +312,5 @@ "proc-macro2",

name = "unicode-ident"
version = "1.0.19"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"

@@ -325,5 +325,5 @@ [[package]]

name = "wasm-encoder"
version = "0.241.2"
version = "0.243.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01164c9dda68301e34fdae536c23ed6fe90ce6d97213ccc171eebbd3d02d6b8"
checksum = "c55db9c896d70bd9fa535ce83cd4e1f2ec3726b0edd2142079f594fc3be1cb35"
dependencies = [

@@ -336,5 +336,5 @@ "leb128fmt",

name = "wasm-metadata"
version = "0.241.2"
version = "0.243.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876fe286f2fa416386deedebe8407e6f19e0b5aeaef3d03161e77a15fa80f167"
checksum = "eae05bf9579f45a62e8d0a4e3f52eaa8da518883ac5afa482ec8256c329ecd56"
dependencies = [

@@ -349,5 +349,5 @@ "anyhow",

name = "wasmparser"
version = "0.241.2"
version = "0.243.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46d90019b1afd4b808c263e428de644f3003691f243387d30d673211ee0cb8e8"
checksum = "f6d8db401b0528ec316dfbe579e6ab4152d61739cfe076706d2009127970159d"
dependencies = [

@@ -362,3 +362,3 @@ "bitflags",

name = "wit-bindgen"
version = "0.48.1"
version = "0.49.0"
dependencies = [

@@ -374,5 +374,5 @@ "bitflags",

name = "wit-bindgen-core"
version = "0.48.1"
version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b881a098cae03686d7a0587f8f306f8a58102ad8da8b5599100fbe0e7f5800b"
checksum = "886e8e938e4e9fe54143c080cbb99d7db5d19242b62ef225dbb28e17b3223bd8"
dependencies = [

@@ -386,5 +386,5 @@ "anyhow",

name = "wit-bindgen-rust"
version = "0.48.1"
version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69667efa439a453e1d50dac939c6cab6d2c3ac724a9d232b6631dad2472a5b70"
checksum = "145cac8fb12d99aea13a3f9e0d07463fa030edeebab2c03805eda0e1cc229bba"
dependencies = [

@@ -403,5 +403,5 @@ "anyhow",

name = "wit-bindgen-rust-macro"
version = "0.48.1"
version = "0.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae2e22cceb5d105d52326c07e3e67603a861cc7add70fc467f7cc7ec5265017"
checksum = "6042452ac4e58891cdb6321bb98aabb9827dbaf6f4e971734d8dd86813319aea"
dependencies = [

@@ -419,5 +419,5 @@ "anyhow",

name = "wit-component"
version = "0.241.2"
version = "0.243.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0c57df25e7ee612d946d3b7646c1ddb2310f8280aa2c17e543b66e0812241"
checksum = "36f9fc53513e461ce51dcf17a3e331752cb829f1d187069e54af5608fc998fe4"
dependencies = [

@@ -439,5 +439,5 @@ "anyhow",

name = "wit-parser"
version = "0.241.2"
version = "0.243.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ef1c6ad67f35c831abd4039c02894de97034100899614d1c44e2268ad01c91"
checksum = "df983a8608e513d8997f435bb74207bf0933d0e49ca97aa9d8a6157164b9b7fc"
dependencies = [

@@ -444,0 +444,0 @@ "anyhow",

@@ -16,3 +16,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO

name = "wit-bindgen"
version = "0.48.1"
version = "0.49.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]

@@ -54,2 +54,3 @@ build = "build.rs"

]
inter-task-wakeup = ["async"]
macros = ["dep:wit-bindgen-rust-macro"]

@@ -86,3 +87,3 @@ realloc = []

[dependencies.wit-bindgen-rust-macro]
version = "0.48.1"
version = "0.49.0"
optional = true

@@ -881,7 +881,11 @@ //! Bindings generation support for Rust with the Component Model.

pub use rt::async_support::spawn;
#[cfg(feature = "inter-task-wakeup")]
pub use rt::async_support::UnitStreamOps;
#[cfg(feature = "async")]
pub use rt::async_support::{
backpressure_dec, backpressure_inc, block_on, yield_async, yield_blocking, AbiBuffer,
FutureRead, FutureReader, FutureWrite, FutureWriteCancel, FutureWriteError, FutureWriter,
StreamRead, StreamReader, StreamResult, StreamWrite, StreamWriter,
FutureOps, FutureRead, FutureReader, FutureWrite, FutureWriteCancel, FutureWriteError,
FutureWriter, RawFutureRead, RawFutureReader, RawFutureWrite, RawFutureWriter, RawStreamRead,
RawStreamReader, RawStreamWrite, RawStreamWriter, StreamOps, StreamRead, StreamReader,
StreamResult, StreamWrite, StreamWriter,
};
#![deny(missing_docs)]
extern crate std;
use core::sync::atomic::{AtomicBool, Ordering};
use core::sync::atomic::{AtomicU32, Ordering};
use std::boxed::Box;

@@ -24,3 +24,36 @@ use std::collections::BTreeMap;

}
}
/// Helper macro to deduplicate foreign definitions of wasm functions.
///
/// This automatically imports when on wasm targets and then defines a dummy
/// panicking shim for native targets to support native compilation but fail at
/// runtime.
macro_rules! extern_wasm {
(
$(#[$extern_attr:meta])*
extern "C" {
$(
$(#[$func_attr:meta])*
$vis:vis fn $func_name:ident ( $($args:tt)* ) $(-> $ret:ty)?;
)*
}
) => {
$(
#[cfg(not(target_family = "wasm"))]
#[allow(unused)]
$vis unsafe fn $func_name($($args)*) $(-> $ret)? {
unreachable!();
}
)*
#[cfg(target_family = "wasm")]
$(#[$extern_attr])*
extern "C" {
$(
$(#[$func_attr])*
$vis fn $func_name($($args)*) $(-> $ret)?;
)*
}
};
}

@@ -32,7 +65,16 @@

mod future_support;
#[cfg(feature = "inter-task-wakeup")]
mod inter_task_wakeup;
mod stream_support;
mod subtask;
#[cfg(feature = "inter-task-wakeup")]
mod unit_stream;
mod waitable;
mod waitable_set;
#[cfg(not(feature = "inter-task-wakeup"))]
use inter_task_wakeup_disabled as inter_task_wakeup;
#[cfg(not(feature = "inter-task-wakeup"))]
mod inter_task_wakeup_disabled;
use self::waitable_set::WaitableSet;

@@ -45,2 +87,4 @@ pub use abi_buffer::*;

pub use subtask::Subtask;
#[cfg(feature = "inter-task-wakeup")]
pub use unit_stream::*;

@@ -90,2 +134,5 @@ type BoxFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;

waker_clone: Waker,
/// State related to supporting inter-task wakeup scenarios.
inter_task_wakeup: inter_task_wakeup::State,
}

@@ -109,2 +156,3 @@

},
inter_task_wakeup: Default::default(),
}

@@ -131,3 +179,3 @@ }

/// return code along with a flag whether this future is "done" or not.
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) {
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode {
match event0 {

@@ -147,37 +195,40 @@ EVENT_NONE => rtdebug!("EVENT_NONE"),

// transitively run all destructors.
return (CALLBACK_CODE_EXIT, true);
return CallbackCode::Exit;
}
_ => unreachable!(),
}
if event0 != EVENT_NONE {
self.deliver_waitable_event(event1, event2)
}
self.poll()
}
self.with_p3_task_set(|me| {
// Transition our sleep state to ensure that the inter-task stream
// isn't used since there's no need to use that here.
me.waker
.sleep_state
.store(SLEEP_STATE_WOKEN, Ordering::Relaxed);
/// Deliver the `code` event to the `waitable` store within our map. This
/// waitable should be present because it's part of the waitable set which
/// is kept in-sync with our map.
fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
self.remove_waitable(waitable);
let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
unsafe {
callback(ptr, code);
}
}
// With all of our context now configured, deliver the event
// notification this callback corresponds to.
//
// Note that this should happen under the reset of
// `waker.sleep_state` above to ensure that if a waker is woken it
// won't actually signal our inter-task stream since we're already
// in the process of handling the future.
if event0 != EVENT_NONE {
me.deliver_waitable_event(event1, event2)
}
/// Poll this task until it either completes or can't make immediate
/// progress.
///
/// Returns the code representing what happened along with a boolean as to
/// whether this execution is done.
fn poll(&mut self) -> (u32, bool) {
self.with_p3_task_set(|me| {
// If there's still an in-progress read (e.g. `event{1,2}`) wasn't
// ourselves getting woken up, then cancel the read since we're
// processing the future here anyway.
me.cancel_inter_task_stream_read();
let mut context = Context::from_waker(&me.waker_clone);
loop {
// Reset the waker before polling to clear out any pending
// notification, if any.
me.waker.0.store(false, Ordering::Relaxed);
// On each turn of this loop reset the state to "polling"
// which clears out any pending wakeup if one was sent. This
// in theory helps minimize wakeups from previous iterations
// happening in this iteration.
me.waker
.sleep_state
.store(SLEEP_STATE_POLLING, Ordering::Relaxed);

@@ -200,5 +251,5 @@ // Poll our future, seeing if it was able to make progress.

let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break (CALLBACK_CODE_WAIT | (waitable << 4), false);
break CallbackCode::Wait(waitable);
} else {
break (CALLBACK_CODE_EXIT, true);
break CallbackCode::Exit;
}

@@ -213,9 +264,19 @@ }

assert!(!me.tasks.is_empty());
if me.waker.0.load(Ordering::Relaxed) {
break (CALLBACK_CODE_YIELD, false);
if me.waker.sleep_state.load(Ordering::Relaxed) == SLEEP_STATE_WOKEN {
if me.remaining_work() {
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break CallbackCode::Poll(waitable);
}
break CallbackCode::Yield;
}
assert!(me.remaining_work());
// Transition our state to "sleeping" so wakeup
// notifications know that they need to signal the
// inter-task stream.
me.waker
.sleep_state
.store(SLEEP_STATE_SLEEPING, Ordering::Relaxed);
me.read_inter_task_stream();
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break (CALLBACK_CODE_WAIT | (waitable << 4), false);
break CallbackCode::Wait(waitable);
}

@@ -227,2 +288,21 @@ }

/// Deliver the `code` event to the `waitable` store within our map. This
/// waitable should be present because it's part of the waitable set which
/// is kept in-sync with our map.
fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
self.remove_waitable(waitable);
if self
.inter_task_wakeup
.consume_waitable_event(waitable, code)
{
return;
}
let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
unsafe {
callback(ptr, code);
}
}
fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {

@@ -252,2 +332,6 @@ // Finish our `wasip3_task` by initializing its self-referential pointer,

fn drop(&mut self) {
// If there's an active read of the inter-task stream, go ahead and
// cancel it, since we're about to drop the stream anyway.
self.cancel_inter_task_stream_read();
// If this state has active tasks then they need to be dropped which may

@@ -291,4 +375,17 @@ // execute arbitrary code. This arbitrary code might require the p3 APIs

/// Status for "this task is actively being polled"
const SLEEP_STATE_POLLING: u32 = 0;
/// Status for "this task has a wakeup scheduled, no more action need be taken".
const SLEEP_STATE_WOKEN: u32 = 1;
/// Status for "this task is not being polled and has not been woken"
///
/// Wakeups on this status signal the inter-task stream.
const SLEEP_STATE_SLEEPING: u32 = 2;
#[derive(Default)]
struct FutureWaker(AtomicBool);
struct FutureWaker {
/// One of `SLEEP_STATE_*` indicating the current status.
sleep_state: AtomicU32,
inter_task_stream: inter_task_wakeup::WakerState,
}

@@ -301,3 +398,14 @@ impl Wake for FutureWaker {

fn wake_by_ref(self: &Arc<Self>) {
self.0.store(true, Ordering::Relaxed)
match self.sleep_state.swap(SLEEP_STATE_WOKEN, Ordering::Relaxed) {
// If this future was currently being polled, or if someone else
// already woke it up, then there's nothing to do.
SLEEP_STATE_POLLING | SLEEP_STATE_WOKEN => {}
// If this future is sleeping, however, then this is a cross-task
// wakeup meaning that we need to write to its wakeup stream.
other => {
assert_eq!(other, SLEEP_STATE_SLEEPING);
self.inter_task_stream.wake();
}
}
}

@@ -314,7 +422,21 @@ }

const CALLBACK_CODE_EXIT: u32 = 0;
const CALLBACK_CODE_YIELD: u32 = 1;
const CALLBACK_CODE_WAIT: u32 = 2;
const _CALLBACK_CODE_POLL: u32 = 3;
#[derive(PartialEq, Debug)]
enum CallbackCode {
Exit,
Yield,
Wait(u32),
Poll(u32),
}
impl CallbackCode {
fn encode(self) -> u32 {
match self {
CallbackCode::Exit => 0,
CallbackCode::Yield => 1,
CallbackCode::Wait(waitable) => 2 | (waitable << 4),
CallbackCode::Poll(waitable) => 3 | (waitable << 4),
}
}
}
const STATUS_STARTING: u32 = 0;

@@ -409,4 +531,4 @@ const STATUS_STARTED: u32 = 1;

unsafe {
let (rc, done) = (*state).callback(event0, event1, event2);
if done {
let rc = (*state).callback(event0, event1, event2);
if rc == CallbackCode::Exit {
drop(Box::from_raw(state));

@@ -416,4 +538,4 @@ } else {

}
rtdebug!(" => (cb) {rc:#x}");
rc
rtdebug!(" => (cb) {rc:?}");
rc.encode()
}

@@ -435,8 +557,10 @@ }

match state.callback(event.0, event.1, event.2) {
(_, true) => {
CallbackCode::Exit => {
drop(state);
break result.unwrap();
}
(CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(),
_ => event = state.waitable_set.as_ref().unwrap().wait(),
CallbackCode::Yield | CallbackCode::Poll(_) => {
event = state.waitable_set.as_ref().unwrap().poll()
}
CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(),
}

@@ -467,13 +591,10 @@ }

pub fn yield_blocking() -> bool {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn yield_() -> bool {
unreachable!();
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[thread-yield]"]
fn yield_() -> bool;
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[thread-yield]"]
fn yield_() -> bool;
}
// Note that the return value from the raw intrinsic is inverted, the

@@ -529,14 +650,10 @@ // canonical ABI returns "did this task get cancelled" while this function

pub fn backpressure_set(enabled: bool) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_set(_: i32) {
unreachable!();
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-set]"]
fn backpressure_set(_: i32);
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-set]"]
fn backpressure_set(_: i32);
}
unsafe { backpressure_set(if enabled { 1 } else { 0 }) }

@@ -547,14 +664,10 @@ }

pub fn backpressure_inc() {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_inc() {
unreachable!();
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-inc]"]
fn backpressure_inc();
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-inc]"]
fn backpressure_inc();
}
unsafe { backpressure_inc() }

@@ -565,14 +678,10 @@ }

pub fn backpressure_dec() {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_dec() {
unreachable!();
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-dec]"]
fn backpressure_dec();
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-dec]"]
fn backpressure_dec();
}
unsafe { backpressure_dec() }

@@ -582,14 +691,10 @@ }

fn context_get() -> *mut u8 {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn get() -> *mut u8 {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-get-0]"]
fn get() -> *mut u8;
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-get-0]"]
fn get() -> *mut u8;
}
unsafe { get() }

@@ -599,14 +704,10 @@ }

unsafe fn context_set(value: *mut u8) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn set(_: *mut u8) {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-set-0]"]
fn set(value: *mut u8);
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-set-0]"]
fn set(value: *mut u8);
}
unsafe { set(value) }

@@ -634,16 +735,12 @@ }

fn drop(&mut self) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn cancel() {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "[export]$root")]
extern "C" {
#[link_name = "[task-cancel]"]
fn cancel();
}
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]$root")]
extern "C" {
#[link_name = "[task-cancel]"]
fn cancel();
}
unsafe { cancel() }
}
}

@@ -1,2 +0,2 @@

use crate::rt::async_support::StreamVtable;
use crate::rt::async_support::StreamOps;
use crate::rt::Cleanup;

@@ -19,5 +19,5 @@ use std::alloc::Layout;

/// future's return value.
pub struct AbiBuffer<T: 'static> {
rust_storage: Vec<MaybeUninit<T>>,
vtable: &'static StreamVtable<T>,
pub struct AbiBuffer<O: StreamOps> {
rust_storage: Vec<MaybeUninit<O::Payload>>,
ops: O,
alloc: Option<Cleanup>,

@@ -27,6 +27,4 @@ cursor: usize,

impl<T: 'static> AbiBuffer<T> {
pub(crate) fn new(mut vec: Vec<T>, vtable: &'static StreamVtable<T>) -> AbiBuffer<T> {
assert_eq!(vtable.lower.is_some(), vtable.lift.is_some());
impl<O: StreamOps> AbiBuffer<O> {
pub(crate) fn new(mut vec: Vec<O::Payload>, mut ops: O) -> AbiBuffer<O> {
// SAFETY: We're converting `Vec<T>` to `Vec<MaybeUninit<T>>`, which

@@ -39,3 +37,3 @@ // should be safe.

mem::forget(vec);
Vec::<MaybeUninit<T>>::from_raw_parts(ptr.cast(), len, cap)
Vec::<MaybeUninit<O::Payload>>::from_raw_parts(ptr.cast(), len, cap)
};

@@ -49,10 +47,12 @@

// skip this entirely.
let alloc = vtable.lower.and_then(|lower| {
let alloc = if ops.native_abi_matches_canonical_abi() {
None
} else {
let elem_layout = ops.elem_layout();
let layout = Layout::from_size_align(
vtable.layout.size() * rust_storage.len(),
vtable.layout.align(),
elem_layout.size() * rust_storage.len(),
elem_layout.align(),
)
.unwrap();
let (mut ptr, cleanup) = Cleanup::new(layout);
let cleanup = cleanup?;
// SAFETY: All items in `rust_storage` are already initialized so

@@ -64,13 +64,12 @@ // it should be safe to read them and move ownership into the

let item = item.assume_init_read();
lower(item, ptr);
ptr = ptr.add(vtable.layout.size());
ops.lower(item, ptr);
ptr = ptr.add(elem_layout.size());
}
}
Some(cleanup)
});
cleanup
};
AbiBuffer {
rust_storage,
alloc,
vtable,
ops,
cursor: 0,

@@ -86,3 +85,3 @@ }

// situation the list would have been un-tampered with above.
if self.vtable.lower.is_none() {
if self.ops.native_abi_matches_canonical_abi() {
// SAFETY: this should be in-bounds, so it should be safe.

@@ -103,3 +102,3 @@ let ptr = unsafe { self.rust_storage.as_ptr().add(self.cursor).cast() };

// SAFETY: this should be in-bounds, so it should be safe.
unsafe { ptr.add(self.cursor * self.vtable.layout.size()) },
unsafe { ptr.add(self.cursor * self.ops.elem_layout().size()) },
self.rust_storage.len() - self.cursor,

@@ -121,3 +120,3 @@ )

/// to the start of the vector.
pub fn into_vec(mut self) -> Vec<T> {
pub fn into_vec(mut self) -> Vec<O::Payload> {
self.take_vec()

@@ -138,6 +137,6 @@ }

assert!(amt + self.cursor <= self.rust_storage.len());
let Some(dealloc_lists) = self.vtable.dealloc_lists else {
if !self.ops.contains_lists() {
self.cursor += amt;
return;
};
}
let (mut ptr, len) = self.abi_ptr_and_len();

@@ -150,4 +149,4 @@ assert!(amt <= len);

unsafe {
dealloc_lists(ptr.cast_mut());
ptr = ptr.add(self.vtable.layout.size());
self.ops.dealloc_lists(ptr.cast_mut());
ptr = ptr.add(self.ops.elem_layout().size());
}

@@ -158,3 +157,3 @@ }

fn take_vec(&mut self) -> Vec<T> {
fn take_vec(&mut self) -> Vec<O::Payload> {
// First, if necessary, convert remaining values within `self.alloc`

@@ -169,3 +168,3 @@ // back into `self.rust_storage`. This is necessary when a lift

// operation, moving all the values back into the vector.
if let Some(lift) = self.vtable.lift {
if !self.ops.native_abi_matches_canonical_abi() {
let (mut ptr, mut len) = self.abi_ptr_and_len();

@@ -177,4 +176,4 @@ // SAFETY: this should be safe as `lift` is operating on values that

for dst in self.rust_storage[self.cursor..].iter_mut() {
dst.write(lift(ptr.cast_mut()));
ptr = ptr.add(self.vtable.layout.size());
dst.write(self.ops.lift(ptr.cast_mut()));
ptr = ptr.add(self.ops.elem_layout().size());
len -= 1;

@@ -203,3 +202,3 @@ }

mem::forget(storage);
Vec::<T>::from_raw_parts(ptr.cast(), len, cap)
Vec::<O::Payload>::from_raw_parts(ptr.cast(), len, cap)
}

@@ -209,3 +208,6 @@ }

impl<T> Drop for AbiBuffer<T> {
impl<O> Drop for AbiBuffer<O>
where
O: StreamOps,
{
fn drop(&mut self) {

@@ -219,2 +221,3 @@ let _ = self.take_vec();

use super::*;
use crate::rt::async_support::StreamVtable;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

@@ -221,0 +224,0 @@ use std::vec;

@@ -49,24 +49,19 @@ //! Definition of the "C ABI" of how imported functions interact with exported

#[cfg(target_family = "wasm")]
extern "C" {
/// Sets the global task pointer to `ptr` provided. Returns the previous
/// value.
///
/// This function acts as a dual getter and a setter. To get the
/// current task pointer a dummy `ptr` can be provided (e.g. NULL) and then
/// it's passed back when you're done working with it. When setting the
/// current task pointer it's recommended to call this and then call it
/// again with the previous value when the tasks's work is done.
///
/// For executors they need to ensure that the `ptr` passed in lives for
/// the entire lifetime of the component model task.
pub fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task;
extern_wasm! {
extern "C" {
/// Sets the global task pointer to `ptr` provided. Returns the previous
/// value.
///
/// This function acts as a dual getter and a setter. To get the
/// current task pointer a dummy `ptr` can be provided (e.g. NULL) and then
/// it's passed back when you're done working with it. When setting the
/// current task pointer it's recommended to call this and then call it
/// again with the previous value when the tasks's work is done.
///
/// For executors they need to ensure that the `ptr` passed in lives for
/// the entire lifetime of the component model task.
pub fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task;
}
}
#[cfg(not(target_family = "wasm"))]
pub unsafe extern "C" fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task {
let _ = ptr;
unreachable!();
}
/// The first version of `wasip3_task` which implies the existence of the

@@ -73,0 +68,0 @@ /// fields `ptr`, `waitable_register`, and `waitable_unregister`.

@@ -63,6 +63,3 @@ //! Raw bindings to `error-context` in the canonical ABI.

fn drop(&mut self) {
#[cfg(target_arch = "wasm32")]
unsafe {
drop(self.handle)
}
unsafe { drop(self.handle) }
}

@@ -77,20 +74,12 @@ }

#[cfg(not(target_arch = "wasm32"))]
unsafe fn new(_: *const u8, _: usize) -> u32 {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[error-context-new-utf8]"]
fn new(_: *const u8, _: usize) -> u32;
#[link_name = "[error-context-drop]"]
fn drop(_: u32);
#[link_name = "[error-context-debug-message-utf8]"]
fn debug_message(_: u32, _: &mut RetPtr);
}
}
#[cfg(not(target_arch = "wasm32"))]
fn debug_message(_: u32, _: &mut RetPtr) {
unreachable!()
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[error-context-new-utf8]"]
fn new(_: *const u8, _: usize) -> u32;
#[link_name = "[error-context-drop]"]
fn drop(_: u32);
#[link_name = "[error-context-debug-message-utf8]"]
fn debug_message(_: u32, _: &mut RetPtr);
}

@@ -12,3 +12,2 @@ //! For a high-level overview of how this module is implemented see the

future::Future,
marker,
pin::Pin,

@@ -26,2 +25,48 @@ ptr,

#[doc(hidden)]
pub unsafe trait StreamOps: Clone {
/// The Rust type that's sent or received on this stream.
type Payload: 'static;
/// The `stream.new` intrinsic.
fn new(&mut self) -> u64;
/// The canonical ABI layout of the type that this stream is
/// sending/receiving.
fn elem_layout(&self) -> Layout;
/// Returns whether `lift` or `lower` is required to create `Self::Payload`.
///
/// If this returns `false` then `Self::Payload` is natively in its
/// canonical ABI representation.
fn native_abi_matches_canonical_abi(&self) -> bool;
/// Returns whether `O::Payload` has lists that need to be deallocated with
/// `dealloc_lists`.
fn contains_lists(&self) -> bool;
/// Converts a Rust type to its canonical ABI representation.
unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8);
/// Used to deallocate any Rust-owned lists in the canonical ABI
/// representation for when a value is successfully sent but needs to be
/// cleaned up.
unsafe fn dealloc_lists(&mut self, dst: *mut u8);
/// Converts from the canonical ABI representation to a Rust value.
unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload;
/// The `stream.write` intrinsic
unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32;
/// The `stream.read` intrinsic
unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32;
/// The `stream.cancel-read` intrinsic
unsafe fn cancel_read(&mut self, stream: u32) -> u32;
/// The `stream.cancel-write` intrinsic
unsafe fn cancel_write(&mut self, stream: u32) -> u32;
/// The `stream.drop-readable` intrinsic
unsafe fn drop_readable(&mut self, stream: u32);
/// The `stream.drop-writable` intrinsic
unsafe fn drop_writable(&mut self, stream: u32);
}
/// Operations that a stream requires throughout the implementation.
///
/// This is generated by `wit_bindgen::generate!` primarily.
#[doc(hidden)]
pub struct StreamVtable<T> {

@@ -69,2 +114,50 @@ /// The in-memory canonical ABI layout of a single value of `T`.

unsafe impl<T: 'static> StreamOps for &StreamVtable<T> {
type Payload = T;
fn new(&mut self) -> u64 {
unsafe { (self.new)() }
}
fn elem_layout(&self) -> Layout {
self.layout
}
fn native_abi_matches_canonical_abi(&self) -> bool {
self.lift.is_none()
}
fn contains_lists(&self) -> bool {
self.dealloc_lists.is_some()
}
unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) {
if let Some(f) = self.lower {
f(payload, dst)
}
}
unsafe fn dealloc_lists(&mut self, dst: *mut u8) {
if let Some(f) = self.dealloc_lists {
f(dst)
}
}
unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload {
(self.lift.unwrap())(dst)
}
unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 {
(self.start_write)(stream, val, amt)
}
unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 {
(self.start_read)(stream, val, amt)
}
unsafe fn cancel_read(&mut self, stream: u32) -> u32 {
(self.cancel_read)(stream)
}
unsafe fn cancel_write(&mut self, stream: u32) -> u32 {
(self.cancel_write)(stream)
}
unsafe fn drop_readable(&mut self, stream: u32) {
(self.drop_readable)(stream)
}
unsafe fn drop_writable(&mut self, stream: u32) {
(self.drop_writable)(stream)
}
}
/// Helper function to create a new read/write pair for a component model

@@ -75,4 +168,13 @@ /// stream.

) -> (StreamWriter<T>, StreamReader<T>) {
unsafe { raw_stream_new(vtable) }
}
/// Helper function to create a new read/write pair for a component model
/// stream.
pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>)
where
O: StreamOps + Clone,
{
unsafe {
let handles = (vtable.new)();
let handles = ops.new();
let reader = handles as u32;

@@ -82,4 +184,4 @@ let writer = (handles >> 32) as u32;

(
StreamWriter::new(writer, vtable),
StreamReader::new(reader, vtable),
RawStreamWriter::new(writer, ops.clone()),
RawStreamReader::new(reader, ops),
)

@@ -90,14 +192,20 @@ }

/// Represents the writable end of a Component Model `stream`.
pub struct StreamWriter<T: 'static> {
pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>;
/// Represents the writable end of a Component Model `stream`.
pub struct RawStreamWriter<O: StreamOps> {
handle: u32,
vtable: &'static StreamVtable<T>,
ops: O,
done: bool,
}
impl<T> StreamWriter<T> {
impl<O> RawStreamWriter<O>
where
O: StreamOps,
{
#[doc(hidden)]
pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
pub unsafe fn new(handle: u32, ops: O) -> Self {
Self {
handle,
vtable,
ops,
done: false,

@@ -107,2 +215,8 @@ }

/// Returns the index of the component-model handle that this stream is
/// using.
pub fn handle(&self) -> u32 {
self.handle
}
/// Initiate a write of the `values` provided into this stream.

@@ -145,4 +259,4 @@ ///

/// happened must be done with [`StreamWrite::cancel`].
pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> {
self.write_buf(AbiBuffer::new(values, self.vtable))
pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> {
self.write_buf(AbiBuffer::new(values, self.ops.clone()))
}

@@ -152,5 +266,5 @@

/// instead of `Vec<T>`.
pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> {
StreamWrite {
op: WaitableOperation::new(StreamWriteOp(marker::PhantomData), (self, values)),
pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> {
RawStreamWrite {
op: WaitableOperation::new(StreamWriteOp { writer: self }, values),
}

@@ -166,3 +280,3 @@ }

/// not sent because the stream was dropped.
pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> {
pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> {
// Perform an initial write which converts `values` into `AbiBuffer`.

@@ -200,3 +314,3 @@ let (mut status, mut buf) = self.write(values).await;

/// sent.
pub async fn write_one(&mut self, value: T) -> Option<T> {
pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> {
// TODO: can probably be a bit more efficient about this and avoid

@@ -209,3 +323,6 @@ // moving `value` onto the heap in some situations, but that's left as

impl<T> fmt::Debug for StreamWriter<T> {
impl<O> fmt::Debug for RawStreamWriter<O>
where
O: StreamOps,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

@@ -218,7 +335,10 @@ f.debug_struct("StreamWriter")

impl<T> Drop for StreamWriter<T> {
impl<O> Drop for RawStreamWriter<O>
where
O: StreamOps,
{
fn drop(&mut self) {
rtdebug!("stream.drop-writable({})", self.handle);
unsafe {
(self.vtable.drop_writable)(self.handle);
self.ops.drop_writable(self.handle);
}

@@ -229,7 +349,12 @@ }

/// Represents a write operation which may be cancelled prior to completion.
pub struct StreamWrite<'a, T: 'static> {
op: WaitableOperation<StreamWriteOp<'a, T>>,
pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>;
/// Represents a write operation which may be cancelled prior to completion.
pub struct RawStreamWrite<'a, O: StreamOps> {
op: WaitableOperation<StreamWriteOp<'a, O>>,
}
struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>);
struct StreamWriteOp<'a, O: StreamOps> {
writer: &'a mut RawStreamWriter<O>,
}

@@ -251,14 +376,14 @@ /// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation,

unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T>
unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O>
where
T: 'static,
O: StreamOps,
{
type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>);
type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>);
type Result = (StreamResult, AbiBuffer<T>);
type Cancel = (StreamResult, AbiBuffer<T>);
type Start = AbiBuffer<O>;
type InProgress = AbiBuffer<O>;
type Result = (StreamResult, AbiBuffer<O>);
type Cancel = (StreamResult, AbiBuffer<O>);
fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) {
if writer.done {
return (DROPPED, (writer, buf));
fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) {
if self.writer.done {
return (DROPPED, buf);
}

@@ -269,11 +394,11 @@

// `AbiBuffer` is trying to make this safe.
let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) };
let code = unsafe { self.writer.ops.start_write(self.writer.handle, ptr, len) };
rtdebug!(
"stream.write({}, {ptr:?}, {len}) = {code:#x}",
writer.handle
self.writer.handle
);
(code, (writer, buf))
(code, buf)
}
fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel {
fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
(StreamResult::Cancelled, buf)

@@ -284,7 +409,7 @@ }

&mut self,
(writer, mut buf): Self::InProgress,
mut buf: Self::InProgress,
code: u32,
) -> Result<Self::Result, Self::InProgress> {
match ReturnCode::decode(code) {
ReturnCode::Blocked => Err((writer, buf)),
ReturnCode::Blocked => Err(buf),
ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)),

@@ -298,3 +423,3 @@ ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)),

if let ReturnCode::Dropped(_) = code {
writer.done = true;
self.writer.done = true;
}

@@ -306,11 +431,11 @@ Ok((StreamResult::Complete(amt), buf))

fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 {
writer.handle
fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
self.writer.handle
}
fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 {
fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
// SAFETY: we're managing `writer` and all the various operational bits,
// so this relies on `WaitableOperation` being safe.
let code = unsafe { (writer.vtable.cancel_write)(writer.handle) };
rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle);
let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) };
rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle);
code

@@ -324,4 +449,4 @@ }

impl<T: 'static> Future for StreamWrite<'_, T> {
type Output = (StreamResult, AbiBuffer<T>);
impl<O: StreamOps> Future for RawStreamWrite<'_, O> {
type Output = (StreamResult, AbiBuffer<O>);

@@ -333,4 +458,4 @@ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

impl<'a, T: 'static> StreamWrite<'a, T> {
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> {
impl<'a, O: StreamOps> RawStreamWrite<'a, O> {
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> {
// SAFETY: we've chosen that when `Self` is pinned that it translates to

@@ -351,3 +476,3 @@ // always pinning the inner field, so that's codified here.

/// or if this method is called twice.
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) {
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) {
self.pin_project().cancel()

@@ -358,9 +483,12 @@ }

/// Represents the readable end of a Component Model `stream`.
pub struct StreamReader<T: 'static> {
pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>;
/// Represents the readable end of a Component Model `stream`.
pub struct RawStreamReader<O: StreamOps> {
handle: AtomicU32,
vtable: &'static StreamVtable<T>,
ops: O,
done: bool,
}
impl<T> fmt::Debug for StreamReader<T> {
impl<O: StreamOps> fmt::Debug for RawStreamReader<O> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

@@ -373,8 +501,8 @@ f.debug_struct("StreamReader")

impl<T> StreamReader<T> {
impl<O: StreamOps> RawStreamReader<O> {
#[doc(hidden)]
pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self {
pub fn new(handle: u32, ops: O) -> Self {
Self {
handle: AtomicU32::new(handle),
vtable,
ops,
done: false,

@@ -391,3 +519,5 @@ }

fn handle(&self) -> u32 {
/// Returns the index of the component-model handle that this stream is
/// using.
pub fn handle(&self) -> u32 {
self.opt_handle().unwrap()

@@ -418,5 +548,5 @@ }

/// used.
pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> {
StreamRead {
op: WaitableOperation::new(StreamReadOp(marker::PhantomData), (self, buf)),
pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> {
RawStreamRead {
op: WaitableOperation::new(StreamReadOp { reader: self }, buf),
}

@@ -429,3 +559,3 @@ }

/// reads only a single item and does not expose control over cancellation.
pub async fn next(&mut self) -> Option<T> {
pub async fn next(&mut self) -> Option<O::Payload> {
// TODO: should amortize this allocation and avoid doing it every time.

@@ -441,3 +571,3 @@ // Or somehow perhaps make this more optimal.

/// and await the stream to be dropped.
pub async fn collect(mut self) -> Vec<T> {
pub async fn collect(mut self) -> Vec<O::Payload> {
let mut ret = Vec::new();

@@ -463,3 +593,3 @@ loop {

impl<T> Drop for StreamReader<T> {
impl<O: StreamOps> Drop for RawStreamReader<O> {
fn drop(&mut self) {

@@ -471,3 +601,3 @@ let Some(handle) = self.opt_handle() else {

rtdebug!("stream.drop-readable({})", handle);
(self.vtable.drop_readable)(handle);
self.ops.drop_readable(handle);
}

@@ -478,20 +608,22 @@ }

/// Represents a read operation which may be cancelled prior to completion.
pub struct StreamRead<'a, T: 'static> {
op: WaitableOperation<StreamReadOp<'a, T>>,
pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>;
/// Represents a read operation which may be cancelled prior to completion.
pub struct RawStreamRead<'a, O: StreamOps> {
op: WaitableOperation<StreamReadOp<'a, O>>,
}
struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>);
struct StreamReadOp<'a, O: StreamOps> {
reader: &'a mut RawStreamReader<O>,
}
unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T>
where
T: 'static,
{
type Start = (&'a mut StreamReader<T>, Vec<T>);
type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>);
type Result = (StreamResult, Vec<T>);
type Cancel = (StreamResult, Vec<T>);
unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> {
type Start = Vec<O::Payload>;
type InProgress = (Vec<O::Payload>, Option<Cleanup>);
type Result = (StreamResult, Vec<O::Payload>);
type Cancel = (StreamResult, Vec<O::Payload>);
fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) {
if reader.done {
return (DROPPED, (reader, buf, None));
fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) {
if self.reader.done {
return (DROPPED, (buf, None));
}

@@ -505,25 +637,28 @@

// raw capacity in `buf` itself.
if reader.vtable.lift.is_some() {
let layout = Layout::from_size_align(
reader.vtable.layout.size() * cap.len(),
reader.vtable.layout.align(),
)
.unwrap();
(ptr, cleanup) = Cleanup::new(layout);
} else {
if self.reader.ops.native_abi_matches_canonical_abi() {
ptr = cap.as_mut_ptr().cast();
cleanup = None;
} else {
let elem_layout = self.reader.ops.elem_layout();
let layout =
Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align())
.unwrap();
(ptr, cleanup) = Cleanup::new(layout);
}
// SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will
// persist with this async operation itself.
let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) };
let code = unsafe {
self.reader
.ops
.start_read(self.reader.handle(), ptr, cap.len())
};
rtdebug!(
"stream.read({}, {ptr:?}, {}) = {code:#x}",
reader.handle(),
self.reader.handle(),
cap.len()
);
(code, (reader, buf, cleanup))
(code, (buf, cleanup))
}
fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel {
fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel {
(StreamResult::Cancelled, buf)

@@ -534,7 +669,7 @@ }

&mut self,
(reader, mut buf, cleanup): Self::InProgress,
(mut buf, cleanup): Self::InProgress,
code: u32,
) -> Result<Self::Result, Self::InProgress> {
match ReturnCode::decode(code) {
ReturnCode::Blocked => Err((reader, buf, cleanup)),
ReturnCode::Blocked => Err((buf, cleanup)),

@@ -558,22 +693,22 @@ // Note that the `cleanup`, if any, is discarded here.

match reader.vtable.lift {
if self.reader.ops.native_abi_matches_canonical_abi() {
// If no `lift` was necessary, then the results of this operation
// were read directly into `buf`, so just update its length now that
// values have been initialized.
unsafe {
buf.set_len(cur_len + amt);
}
} else {
// With a `lift` operation this now requires reading `amt` items
// from `cleanup` and pushing them into `buf`.
Some(lift) => {
let mut ptr = cleanup
.as_ref()
.map(|c| c.ptr.as_ptr())
.unwrap_or(ptr::null_mut());
for _ in 0..amt {
unsafe {
buf.push(lift(ptr));
ptr = ptr.add(reader.vtable.layout.size());
}
let mut ptr = cleanup
.as_ref()
.map(|c| c.ptr.as_ptr())
.unwrap_or(ptr::null_mut());
for _ in 0..amt {
unsafe {
buf.push(self.reader.ops.lift(ptr));
ptr = ptr.add(self.reader.ops.elem_layout().size());
}
}
// If no `lift` was necessary, then the results of this operation
// were read directly into `buf`, so just update its length now that
// values have been initialized.
None => unsafe { buf.set_len(cur_len + amt) },
}

@@ -585,3 +720,3 @@

if let ReturnCode::Dropped(_) = code {
reader.done = true;
self.reader.done = true;
}

@@ -593,11 +728,11 @@ Ok((StreamResult::Complete(amt), buf))

fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 {
reader.handle()
fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 {
self.reader.handle()
}
fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 {
fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 {
// SAFETY: we're managing `reader` and all the various operational bits,
// so this relies on `WaitableOperation` being safe.
let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) };
rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle());
let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) };
rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle());
code

@@ -611,4 +746,4 @@ }

impl<T: 'static> Future for StreamRead<'_, T> {
type Output = (StreamResult, Vec<T>);
impl<O: StreamOps> Future for RawStreamRead<'_, O> {
type Output = (StreamResult, Vec<O::Payload>);

@@ -620,4 +755,7 @@ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

impl<'a, T> StreamRead<'a, T> {
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> {
impl<'a, O> RawStreamRead<'a, O>
where
O: StreamOps,
{
fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> {
// SAFETY: we've chosen that when `Self` is pinned that it translates to

@@ -641,5 +779,5 @@ // always pinning the inner field, so that's codified here.

/// or if this method is called twice.
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) {
pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) {
self.pin_project().cancel()
}
}

@@ -276,19 +276,10 @@ //! Bindings used to manage subtasks, or invocations of imported functions.

#[cfg(not(target_arch = "wasm32"))]
unsafe fn drop(_: u32) {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[subtask-cancel]"]
fn cancel(handle: u32) -> u32;
#[link_name = "[subtask-drop]"]
fn drop(handle: u32);
}
}
#[cfg(not(target_arch = "wasm32"))]
unsafe fn cancel(_: u32) -> u32 {
unreachable!()
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[subtask-cancel]"]
fn cancel(handle: u32) -> u32;
#[link_name = "[subtask-drop]"]
fn drop(handle: u32);
}

@@ -20,3 +20,3 @@ //! Low-level FFI-like bindings around `waitable-set` in the canonical ABI.

pub fn remove_waitable_from_all_sets(waitable: u32) {
rtdebug!("waitable-set.join({waitable}, 0)");
rtdebug!("waitable.join({waitable}, 0)");
unsafe { join(waitable, 0) }

@@ -28,2 +28,3 @@ }

let mut payload = [0; 2];
rtdebug!("waitable-set.wait({}) = ...", self.0.get());
let event0 = wait(self.0.get(), &mut payload);

@@ -43,2 +44,3 @@ rtdebug!(

let mut payload = [0; 2];
rtdebug!("waitable-set.poll({}) = ...", self.0.get());
let event0 = poll(self.0.get(), &mut payload);

@@ -69,36 +71,16 @@ rtdebug!(

#[cfg(not(target_arch = "wasm32"))]
unsafe fn new() -> u32 {
unreachable!()
extern_wasm! {
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[waitable-set-new]"]
fn new() -> u32;
#[link_name = "[waitable-set-drop]"]
fn drop(set: u32);
#[link_name = "[waitable-join]"]
fn join(waitable: u32, set: u32);
#[link_name = "[waitable-set-wait]"]
fn wait(_: u32, _: *mut [u32; 2]) -> u32;
#[link_name = "[waitable-set-poll]"]
fn poll(_: u32, _: *mut [u32; 2]) -> u32;
}
}
#[cfg(not(target_arch = "wasm32"))]
unsafe fn drop(_: u32) {
unreachable!()
}
#[cfg(not(target_arch = "wasm32"))]
unsafe fn join(_: u32, _: u32) {
unreachable!()
}
#[cfg(not(target_arch = "wasm32"))]
unsafe fn wait(_: u32, _: *mut [u32; 2]) -> u32 {
unreachable!();
}
#[cfg(not(target_arch = "wasm32"))]
unsafe fn poll(_: u32, _: *mut [u32; 2]) -> u32 {
unreachable!();
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[waitable-set-new]"]
fn new() -> u32;
#[link_name = "[waitable-set-drop]"]
fn drop(set: u32);
#[link_name = "[waitable-join]"]
fn join(waitable: u32, set: u32);
#[link_name = "[waitable-set-wait]"]
fn wait(_: u32, _: *mut [u32; 2]) -> u32;
#[link_name = "[waitable-set-poll]"]
fn poll(_: u32, _: *mut [u32; 2]) -> u32;
}

@@ -5,7 +5,7 @@ // This file is generated by ./ci/rebuild-libwit-bindgen-cabi.sh

extern void *cabi_realloc_wit_bindgen_0_48_1(void *ptr, size_t old_size, size_t align, size_t new_size);
extern void *cabi_realloc_wit_bindgen_0_49_0(void *ptr, size_t old_size, size_t align, size_t new_size);
__attribute__((__weak__, __export_name__("cabi_realloc")))
void *cabi_realloc(void *ptr, size_t old_size, size_t align, size_t new_size) {
return cabi_realloc_wit_bindgen_0_48_1(ptr, old_size, align, new_size);
return cabi_realloc_wit_bindgen_0_49_0(ptr, old_size, align, new_size);
}
// This file is generated by ./ci/rebuild-libwit-bindgen-cabi.sh
#[unsafe(no_mangle)]
pub unsafe extern "C" fn cabi_realloc_wit_bindgen_0_48_1(
pub unsafe extern "C" fn cabi_realloc_wit_bindgen_0_49_0(
old_ptr: *mut u8,

@@ -6,0 +6,0 @@ old_len: usize,

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet