wit-bindgen
Advanced tools
| use super::FutureState; | ||
| #[derive(Default)] | ||
| pub struct State; | ||
| impl FutureState<'_> { | ||
| pub(super) fn read_inter_task_stream(&mut self) { | ||
| assert!( | ||
| self.remaining_work(), | ||
| " | ||
| Rust task cannot sleep waiting only on Rust-originating events unless the | ||
| `wit-bindgen` crate is compiled with the `inter-task-wakeup` feature | ||
| enabled. | ||
| " | ||
| ); | ||
| } | ||
| pub(super) fn cancel_inter_task_stream_read(&mut self) { | ||
| // nothing to do | ||
| } | ||
| } | ||
| impl State { | ||
| pub fn consume_waitable_event(&mut self, _waitable: u32, _code: u32) -> bool { | ||
| false | ||
| } | ||
| } | ||
| #[derive(Default)] | ||
| pub struct WakerState; | ||
| impl WakerState { | ||
| pub fn wake(&self) { | ||
| panic!( | ||
| " | ||
| Cannot support cross-component-model-task wakeup unlses the `wit-bindgen` | ||
| crate is compiled with the `inter-task-wakeup` feature enabled. | ||
| " | ||
| ); | ||
| } | ||
| } |
| use super::FutureState; | ||
| use crate::rt::async_support::{BLOCKED, COMPLETED}; | ||
| use crate::{RawStreamReader, RawStreamWriter, StreamOps, UnitStreamOps}; | ||
| use std::ptr; | ||
| use std::sync::Mutex; | ||
| #[derive(Default)] | ||
| pub struct State { | ||
| /// A lazily-initialized stream used to signal inter-task notifications. | ||
| /// | ||
| /// This stream is used when one component-model task is used to wake up | ||
| /// another component-model task. This can happen when the async event being | ||
| /// waited on is defined purely in Rust, for example, and doesn't rely on | ||
| /// any component-model primitives. | ||
| stream: Option<RawStreamReader<UnitStreamOps>>, | ||
| /// Boolean if there's an active read of `inter_task_stream`. Used to handle | ||
| /// cancellation/deduplication of the read. | ||
| stream_reading: bool, | ||
| } | ||
| impl FutureState<'_> { | ||
| pub(super) fn read_inter_task_stream(&mut self) { | ||
| // Lazily allocate the inter-task stream now that we're actually going | ||
| // to sleep. We don't know where the wakeup notification will come from | ||
| // so it's required to allocate one here. | ||
| if self.inter_task_wakeup.stream.is_none() { | ||
| assert!(!self.inter_task_wakeup.stream_reading); | ||
| let (writer, reader) = UnitStreamOps::new(); | ||
| self.inter_task_wakeup.stream = Some(reader); | ||
| let mut waker_stream = self.waker.inter_task_stream.lock.lock().unwrap(); | ||
| assert!(waker_stream.is_none()); | ||
| *waker_stream = Some(writer); | ||
| } | ||
| // If there's not already a pending read then schedule a new read here. | ||
| // | ||
| // Note that this should always return `BLOCKED` since as the only task | ||
| // running it's not possible for a read to be anywhere else in the | ||
| // system. Additionally we keep the read end alive, so this shouldn't | ||
| // ever returned dropped/closed either. | ||
| if !self.inter_task_wakeup.stream_reading { | ||
| let stream = self.inter_task_wakeup.stream.as_mut().unwrap(); | ||
| let handle = stream.handle(); | ||
| let rc = unsafe { UnitStreamOps.start_read(handle, ptr::null_mut(), 1) }; | ||
| assert_eq!(rc, BLOCKED); | ||
| self.inter_task_wakeup.stream_reading = true; | ||
| self.add_waitable(handle); | ||
| } | ||
| } | ||
| /// Cancels the active read of the inter-task stream, if any. | ||
| /// | ||
| /// Has no effect if there is no active read. | ||
| pub(super) fn cancel_inter_task_stream_read(&mut self) { | ||
| if !self.inter_task_wakeup.stream_reading { | ||
| return; | ||
| } | ||
| self.inter_task_wakeup.stream_reading = false; | ||
| let handle = self.inter_task_wakeup.stream.as_mut().unwrap().handle(); | ||
| // Note that the return code here is discarded. No matter what the read | ||
| // is cancelled, and whether we actually read something or whether we | ||
| // cancelled doesn't matter. | ||
| unsafe { | ||
| UnitStreamOps.cancel_read(handle); | ||
| } | ||
| self.remove_waitable(handle); | ||
| } | ||
| } | ||
| impl State { | ||
| pub fn consume_waitable_event(&mut self, waitable: u32, _code: u32) -> bool { | ||
| if let Some(reader) = self.stream.as_mut() { | ||
| if reader.handle() == waitable { | ||
| self.stream_reading = false; | ||
| return true; | ||
| } | ||
| } | ||
| false | ||
| } | ||
| } | ||
| #[derive(Default)] | ||
| pub struct WakerState { | ||
| lock: Mutex<Option<RawStreamWriter<UnitStreamOps>>>, | ||
| } | ||
| impl WakerState { | ||
| pub fn wake(&self) { | ||
| // Here the wakeup stream should already have been filled in by the | ||
| // original future itself. The stream should also have an active read | ||
| // while the future is sleeping. This means that this write should | ||
| // succeed immediately. | ||
| let mut inter_task_stream = self.lock.lock().unwrap(); | ||
| let stream = inter_task_stream.as_mut().unwrap(); | ||
| let rc = unsafe { UnitStreamOps.start_write(stream.handle(), ptr::null_mut(), 1) }; | ||
| assert_eq!(rc, COMPLETED | (1 << 4)); | ||
| } | ||
| } |
| use crate::rt::async_support::raw_stream_new; | ||
| use crate::{RawStreamReader, RawStreamWriter, StreamOps}; | ||
| use std::alloc::Layout; | ||
| /// Operations for `stream<()>`. | ||
| /// | ||
| /// Can be combined with [`RawStreamWriter`] and [`RawStreamReader`] as created | ||
| /// through [`UnitStreamOps::new`] | ||
| #[derive(Copy, Clone)] | ||
| pub struct UnitStreamOps; | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[stream-new-unit]"] | ||
| fn unit_new() -> u64; | ||
| #[link_name = "[async-lower][stream-write-unit]"] | ||
| fn unit_write(stream: u32, val: *const u8, amt: usize) -> u32; | ||
| #[link_name = "[async-lower][stream-read-unit]"] | ||
| fn unit_read(stream: u32, val: *mut u8, amt: usize) -> u32; | ||
| #[link_name = "[stream-cancel-read-unit]"] | ||
| fn unit_cancel_read(stream: u32) -> u32; | ||
| #[link_name = "[stream-cancel-write-unit]"] | ||
| fn unit_cancel_write(stream: u32) -> u32; | ||
| #[link_name = "[stream-drop-readable-unit]"] | ||
| fn unit_drop_readable(stream: u32) ; | ||
| #[link_name = "[stream-drop-writable-unit]"] | ||
| fn unit_drop_writable(stream: u32) ; | ||
| } | ||
| } | ||
| impl UnitStreamOps { | ||
| /// Creates a new unit stream read/write pair. | ||
| pub fn new() -> (RawStreamWriter<Self>, RawStreamReader<Self>) { | ||
| unsafe { raw_stream_new(UnitStreamOps) } | ||
| } | ||
| } | ||
| unsafe impl StreamOps for UnitStreamOps { | ||
| type Payload = (); | ||
| fn new(&mut self) -> u64 { | ||
| unsafe { unit_new() } | ||
| } | ||
| fn elem_layout(&self) -> Layout { | ||
| Layout::new::<()>() | ||
| } | ||
| fn native_abi_matches_canonical_abi(&self) -> bool { | ||
| true | ||
| } | ||
| fn contains_lists(&self) -> bool { | ||
| false | ||
| } | ||
| unsafe fn lower(&mut self, (): (), _dst: *mut u8) { | ||
| unreachable!() | ||
| } | ||
| unsafe fn dealloc_lists(&mut self, _dst: *mut u8) { | ||
| unreachable!() | ||
| } | ||
| unsafe fn lift(&mut self, _dst: *mut u8) -> Self::Payload { | ||
| unreachable!() | ||
| } | ||
| unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 { | ||
| unsafe { unit_write(stream, val, amt) } | ||
| } | ||
| unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 { | ||
| unsafe { unit_read(stream, val, amt) } | ||
| } | ||
| unsafe fn cancel_read(&mut self, stream: u32) -> u32 { | ||
| unsafe { unit_cancel_read(stream) } | ||
| } | ||
| unsafe fn cancel_write(&mut self, stream: u32) -> u32 { | ||
| unsafe { unit_cancel_write(stream) } | ||
| } | ||
| unsafe fn drop_readable(&mut self, stream: u32) { | ||
| unsafe { unit_drop_readable(stream) } | ||
| } | ||
| unsafe fn drop_writable(&mut self, stream: u32) { | ||
| unsafe { unit_drop_writable(stream) } | ||
| } | ||
| } |
| { | ||
| "git": { | ||
| "sha1": "d44d953b9bf9841d1bf5d1744fa7400cbecbefd7" | ||
| "sha1": "ba933bda98214ffe2ae16c63b2b91d0932021bbc" | ||
| }, | ||
| "path_in_vcs": "crates/guest-rust" | ||
| } |
+32
-32
@@ -13,5 +13,5 @@ # This file is automatically @generated by Cargo. | ||
| name = "bitflags" | ||
| version = "2.9.4" | ||
| version = "2.10.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" | ||
| checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" | ||
@@ -130,5 +130,5 @@ [[package]] | ||
| name = "hashbrown" | ||
| version = "0.16.0" | ||
| version = "0.16.1" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" | ||
| checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" | ||
@@ -149,8 +149,8 @@ [[package]] | ||
| name = "indexmap" | ||
| version = "2.11.4" | ||
| version = "2.12.1" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" | ||
| checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" | ||
| dependencies = [ | ||
| "equivalent", | ||
| "hashbrown 0.16.0", | ||
| "hashbrown 0.16.1", | ||
| "serde", | ||
@@ -208,5 +208,5 @@ "serde_core", | ||
| name = "proc-macro2" | ||
| version = "1.0.101" | ||
| version = "1.0.103" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" | ||
| checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" | ||
| dependencies = [ | ||
@@ -218,5 +218,5 @@ "unicode-ident", | ||
| name = "quote" | ||
| version = "1.0.41" | ||
| version = "1.0.42" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" | ||
| checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" | ||
| dependencies = [ | ||
@@ -300,5 +300,5 @@ "proc-macro2", | ||
| name = "syn" | ||
| version = "2.0.106" | ||
| version = "2.0.111" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" | ||
| checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" | ||
| dependencies = [ | ||
@@ -312,5 +312,5 @@ "proc-macro2", | ||
| name = "unicode-ident" | ||
| version = "1.0.19" | ||
| version = "1.0.22" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" | ||
| checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" | ||
@@ -325,5 +325,5 @@ [[package]] | ||
| name = "wasm-encoder" | ||
| version = "0.241.2" | ||
| version = "0.243.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "e01164c9dda68301e34fdae536c23ed6fe90ce6d97213ccc171eebbd3d02d6b8" | ||
| checksum = "c55db9c896d70bd9fa535ce83cd4e1f2ec3726b0edd2142079f594fc3be1cb35" | ||
| dependencies = [ | ||
@@ -336,5 +336,5 @@ "leb128fmt", | ||
| name = "wasm-metadata" | ||
| version = "0.241.2" | ||
| version = "0.243.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "876fe286f2fa416386deedebe8407e6f19e0b5aeaef3d03161e77a15fa80f167" | ||
| checksum = "eae05bf9579f45a62e8d0a4e3f52eaa8da518883ac5afa482ec8256c329ecd56" | ||
| dependencies = [ | ||
@@ -349,5 +349,5 @@ "anyhow", | ||
| name = "wasmparser" | ||
| version = "0.241.2" | ||
| version = "0.243.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "46d90019b1afd4b808c263e428de644f3003691f243387d30d673211ee0cb8e8" | ||
| checksum = "f6d8db401b0528ec316dfbe579e6ab4152d61739cfe076706d2009127970159d" | ||
| dependencies = [ | ||
@@ -362,3 +362,3 @@ "bitflags", | ||
| name = "wit-bindgen" | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| dependencies = [ | ||
@@ -374,5 +374,5 @@ "bitflags", | ||
| name = "wit-bindgen-core" | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "9b881a098cae03686d7a0587f8f306f8a58102ad8da8b5599100fbe0e7f5800b" | ||
| checksum = "886e8e938e4e9fe54143c080cbb99d7db5d19242b62ef225dbb28e17b3223bd8" | ||
| dependencies = [ | ||
@@ -386,5 +386,5 @@ "anyhow", | ||
| name = "wit-bindgen-rust" | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "69667efa439a453e1d50dac939c6cab6d2c3ac724a9d232b6631dad2472a5b70" | ||
| checksum = "145cac8fb12d99aea13a3f9e0d07463fa030edeebab2c03805eda0e1cc229bba" | ||
| dependencies = [ | ||
@@ -403,5 +403,5 @@ "anyhow", | ||
| name = "wit-bindgen-rust-macro" | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "eae2e22cceb5d105d52326c07e3e67603a861cc7add70fc467f7cc7ec5265017" | ||
| checksum = "6042452ac4e58891cdb6321bb98aabb9827dbaf6f4e971734d8dd86813319aea" | ||
| dependencies = [ | ||
@@ -419,5 +419,5 @@ "anyhow", | ||
| name = "wit-component" | ||
| version = "0.241.2" | ||
| version = "0.243.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "1fd0c57df25e7ee612d946d3b7646c1ddb2310f8280aa2c17e543b66e0812241" | ||
| checksum = "36f9fc53513e461ce51dcf17a3e331752cb829f1d187069e54af5608fc998fe4" | ||
| dependencies = [ | ||
@@ -439,5 +439,5 @@ "anyhow", | ||
| name = "wit-parser" | ||
| version = "0.241.2" | ||
| version = "0.243.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "09ef1c6ad67f35c831abd4039c02894de97034100899614d1c44e2268ad01c91" | ||
| checksum = "df983a8608e513d8997f435bb74207bf0933d0e49ca97aa9d8a6157164b9b7fc" | ||
| dependencies = [ | ||
@@ -444,0 +444,0 @@ "anyhow", |
+3
-2
@@ -16,3 +16,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO | ||
| name = "wit-bindgen" | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| authors = ["Alex Crichton <alex@alexcrichton.com>"] | ||
@@ -54,2 +54,3 @@ build = "build.rs" | ||
| ] | ||
| inter-task-wakeup = ["async"] | ||
| macros = ["dep:wit-bindgen-rust-macro"] | ||
@@ -86,3 +87,3 @@ realloc = [] | ||
| [dependencies.wit-bindgen-rust-macro] | ||
| version = "0.48.1" | ||
| version = "0.49.0" | ||
| optional = true |
+6
-2
@@ -881,7 +881,11 @@ //! Bindings generation support for Rust with the Component Model. | ||
| pub use rt::async_support::spawn; | ||
| #[cfg(feature = "inter-task-wakeup")] | ||
| pub use rt::async_support::UnitStreamOps; | ||
| #[cfg(feature = "async")] | ||
| pub use rt::async_support::{ | ||
| backpressure_dec, backpressure_inc, block_on, yield_async, yield_blocking, AbiBuffer, | ||
| FutureRead, FutureReader, FutureWrite, FutureWriteCancel, FutureWriteError, FutureWriter, | ||
| StreamRead, StreamReader, StreamResult, StreamWrite, StreamWriter, | ||
| FutureOps, FutureRead, FutureReader, FutureWrite, FutureWriteCancel, FutureWriteError, | ||
| FutureWriter, RawFutureRead, RawFutureReader, RawFutureWrite, RawFutureWriter, RawStreamRead, | ||
| RawStreamReader, RawStreamWrite, RawStreamWriter, StreamOps, StreamRead, StreamReader, | ||
| StreamResult, StreamWrite, StreamWriter, | ||
| }; |
+213
-116
| #![deny(missing_docs)] | ||
| extern crate std; | ||
| use core::sync::atomic::{AtomicBool, Ordering}; | ||
| use core::sync::atomic::{AtomicU32, Ordering}; | ||
| use std::boxed::Box; | ||
@@ -24,3 +24,36 @@ use std::collections::BTreeMap; | ||
| } | ||
| } | ||
| /// Helper macro to deduplicate foreign definitions of wasm functions. | ||
| /// | ||
| /// This automatically imports when on wasm targets and then defines a dummy | ||
| /// panicking shim for native targets to support native compilation but fail at | ||
| /// runtime. | ||
| macro_rules! extern_wasm { | ||
| ( | ||
| $(#[$extern_attr:meta])* | ||
| extern "C" { | ||
| $( | ||
| $(#[$func_attr:meta])* | ||
| $vis:vis fn $func_name:ident ( $($args:tt)* ) $(-> $ret:ty)?; | ||
| )* | ||
| } | ||
| ) => { | ||
| $( | ||
| #[cfg(not(target_family = "wasm"))] | ||
| #[allow(unused)] | ||
| $vis unsafe fn $func_name($($args)*) $(-> $ret)? { | ||
| unreachable!(); | ||
| } | ||
| )* | ||
| #[cfg(target_family = "wasm")] | ||
| $(#[$extern_attr])* | ||
| extern "C" { | ||
| $( | ||
| $(#[$func_attr])* | ||
| $vis fn $func_name($($args)*) $(-> $ret)?; | ||
| )* | ||
| } | ||
| }; | ||
| } | ||
@@ -32,7 +65,16 @@ | ||
| mod future_support; | ||
| #[cfg(feature = "inter-task-wakeup")] | ||
| mod inter_task_wakeup; | ||
| mod stream_support; | ||
| mod subtask; | ||
| #[cfg(feature = "inter-task-wakeup")] | ||
| mod unit_stream; | ||
| mod waitable; | ||
| mod waitable_set; | ||
| #[cfg(not(feature = "inter-task-wakeup"))] | ||
| use inter_task_wakeup_disabled as inter_task_wakeup; | ||
| #[cfg(not(feature = "inter-task-wakeup"))] | ||
| mod inter_task_wakeup_disabled; | ||
| use self::waitable_set::WaitableSet; | ||
@@ -45,2 +87,4 @@ pub use abi_buffer::*; | ||
| pub use subtask::Subtask; | ||
| #[cfg(feature = "inter-task-wakeup")] | ||
| pub use unit_stream::*; | ||
@@ -90,2 +134,5 @@ type BoxFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>; | ||
| waker_clone: Waker, | ||
| /// State related to supporting inter-task wakeup scenarios. | ||
| inter_task_wakeup: inter_task_wakeup::State, | ||
| } | ||
@@ -109,2 +156,3 @@ | ||
| }, | ||
| inter_task_wakeup: Default::default(), | ||
| } | ||
@@ -131,3 +179,3 @@ } | ||
| /// return code along with a flag whether this future is "done" or not. | ||
| fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) { | ||
| fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode { | ||
| match event0 { | ||
@@ -147,37 +195,40 @@ EVENT_NONE => rtdebug!("EVENT_NONE"), | ||
| // transitively run all destructors. | ||
| return (CALLBACK_CODE_EXIT, true); | ||
| return CallbackCode::Exit; | ||
| } | ||
| _ => unreachable!(), | ||
| } | ||
| if event0 != EVENT_NONE { | ||
| self.deliver_waitable_event(event1, event2) | ||
| } | ||
| self.poll() | ||
| } | ||
| self.with_p3_task_set(|me| { | ||
| // Transition our sleep state to ensure that the inter-task stream | ||
| // isn't used since there's no need to use that here. | ||
| me.waker | ||
| .sleep_state | ||
| .store(SLEEP_STATE_WOKEN, Ordering::Relaxed); | ||
| /// Deliver the `code` event to the `waitable` store within our map. This | ||
| /// waitable should be present because it's part of the waitable set which | ||
| /// is kept in-sync with our map. | ||
| fn deliver_waitable_event(&mut self, waitable: u32, code: u32) { | ||
| self.remove_waitable(waitable); | ||
| let (ptr, callback) = self.waitables.remove(&waitable).unwrap(); | ||
| unsafe { | ||
| callback(ptr, code); | ||
| } | ||
| } | ||
| // With all of our context now configured, deliver the event | ||
| // notification this callback corresponds to. | ||
| // | ||
| // Note that this should happen under the reset of | ||
| // `waker.sleep_state` above to ensure that if a waker is woken it | ||
| // won't actually signal our inter-task stream since we're already | ||
| // in the process of handling the future. | ||
| if event0 != EVENT_NONE { | ||
| me.deliver_waitable_event(event1, event2) | ||
| } | ||
| /// Poll this task until it either completes or can't make immediate | ||
| /// progress. | ||
| /// | ||
| /// Returns the code representing what happened along with a boolean as to | ||
| /// whether this execution is done. | ||
| fn poll(&mut self) -> (u32, bool) { | ||
| self.with_p3_task_set(|me| { | ||
| // If there's still an in-progress read (e.g. `event{1,2}`) wasn't | ||
| // ourselves getting woken up, then cancel the read since we're | ||
| // processing the future here anyway. | ||
| me.cancel_inter_task_stream_read(); | ||
| let mut context = Context::from_waker(&me.waker_clone); | ||
| loop { | ||
| // Reset the waker before polling to clear out any pending | ||
| // notification, if any. | ||
| me.waker.0.store(false, Ordering::Relaxed); | ||
| // On each turn of this loop reset the state to "polling" | ||
| // which clears out any pending wakeup if one was sent. This | ||
| // in theory helps minimize wakeups from previous iterations | ||
| // happening in this iteration. | ||
| me.waker | ||
| .sleep_state | ||
| .store(SLEEP_STATE_POLLING, Ordering::Relaxed); | ||
@@ -200,5 +251,5 @@ // Poll our future, seeing if it was able to make progress. | ||
| let waitable = me.waitable_set.as_ref().unwrap().as_raw(); | ||
| break (CALLBACK_CODE_WAIT | (waitable << 4), false); | ||
| break CallbackCode::Wait(waitable); | ||
| } else { | ||
| break (CALLBACK_CODE_EXIT, true); | ||
| break CallbackCode::Exit; | ||
| } | ||
@@ -213,9 +264,19 @@ } | ||
| assert!(!me.tasks.is_empty()); | ||
| if me.waker.0.load(Ordering::Relaxed) { | ||
| break (CALLBACK_CODE_YIELD, false); | ||
| if me.waker.sleep_state.load(Ordering::Relaxed) == SLEEP_STATE_WOKEN { | ||
| if me.remaining_work() { | ||
| let waitable = me.waitable_set.as_ref().unwrap().as_raw(); | ||
| break CallbackCode::Poll(waitable); | ||
| } | ||
| break CallbackCode::Yield; | ||
| } | ||
| assert!(me.remaining_work()); | ||
| // Transition our state to "sleeping" so wakeup | ||
| // notifications know that they need to signal the | ||
| // inter-task stream. | ||
| me.waker | ||
| .sleep_state | ||
| .store(SLEEP_STATE_SLEEPING, Ordering::Relaxed); | ||
| me.read_inter_task_stream(); | ||
| let waitable = me.waitable_set.as_ref().unwrap().as_raw(); | ||
| break (CALLBACK_CODE_WAIT | (waitable << 4), false); | ||
| break CallbackCode::Wait(waitable); | ||
| } | ||
@@ -227,2 +288,21 @@ } | ||
| /// Deliver the `code` event to the `waitable` store within our map. This | ||
| /// waitable should be present because it's part of the waitable set which | ||
| /// is kept in-sync with our map. | ||
| fn deliver_waitable_event(&mut self, waitable: u32, code: u32) { | ||
| self.remove_waitable(waitable); | ||
| if self | ||
| .inter_task_wakeup | ||
| .consume_waitable_event(waitable, code) | ||
| { | ||
| return; | ||
| } | ||
| let (ptr, callback) = self.waitables.remove(&waitable).unwrap(); | ||
| unsafe { | ||
| callback(ptr, code); | ||
| } | ||
| } | ||
| fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R { | ||
@@ -252,2 +332,6 @@ // Finish our `wasip3_task` by initializing its self-referential pointer, | ||
| fn drop(&mut self) { | ||
| // If there's an active read of the inter-task stream, go ahead and | ||
| // cancel it, since we're about to drop the stream anyway. | ||
| self.cancel_inter_task_stream_read(); | ||
| // If this state has active tasks then they need to be dropped which may | ||
@@ -291,4 +375,17 @@ // execute arbitrary code. This arbitrary code might require the p3 APIs | ||
| /// Status for "this task is actively being polled" | ||
| const SLEEP_STATE_POLLING: u32 = 0; | ||
| /// Status for "this task has a wakeup scheduled, no more action need be taken". | ||
| const SLEEP_STATE_WOKEN: u32 = 1; | ||
| /// Status for "this task is not being polled and has not been woken" | ||
| /// | ||
| /// Wakeups on this status signal the inter-task stream. | ||
| const SLEEP_STATE_SLEEPING: u32 = 2; | ||
| #[derive(Default)] | ||
| struct FutureWaker(AtomicBool); | ||
| struct FutureWaker { | ||
| /// One of `SLEEP_STATE_*` indicating the current status. | ||
| sleep_state: AtomicU32, | ||
| inter_task_stream: inter_task_wakeup::WakerState, | ||
| } | ||
@@ -301,3 +398,14 @@ impl Wake for FutureWaker { | ||
| fn wake_by_ref(self: &Arc<Self>) { | ||
| self.0.store(true, Ordering::Relaxed) | ||
| match self.sleep_state.swap(SLEEP_STATE_WOKEN, Ordering::Relaxed) { | ||
| // If this future was currently being polled, or if someone else | ||
| // already woke it up, then there's nothing to do. | ||
| SLEEP_STATE_POLLING | SLEEP_STATE_WOKEN => {} | ||
| // If this future is sleeping, however, then this is a cross-task | ||
| // wakeup meaning that we need to write to its wakeup stream. | ||
| other => { | ||
| assert_eq!(other, SLEEP_STATE_SLEEPING); | ||
| self.inter_task_stream.wake(); | ||
| } | ||
| } | ||
| } | ||
@@ -314,7 +422,21 @@ } | ||
| const CALLBACK_CODE_EXIT: u32 = 0; | ||
| const CALLBACK_CODE_YIELD: u32 = 1; | ||
| const CALLBACK_CODE_WAIT: u32 = 2; | ||
| const _CALLBACK_CODE_POLL: u32 = 3; | ||
| #[derive(PartialEq, Debug)] | ||
| enum CallbackCode { | ||
| Exit, | ||
| Yield, | ||
| Wait(u32), | ||
| Poll(u32), | ||
| } | ||
| impl CallbackCode { | ||
| fn encode(self) -> u32 { | ||
| match self { | ||
| CallbackCode::Exit => 0, | ||
| CallbackCode::Yield => 1, | ||
| CallbackCode::Wait(waitable) => 2 | (waitable << 4), | ||
| CallbackCode::Poll(waitable) => 3 | (waitable << 4), | ||
| } | ||
| } | ||
| } | ||
| const STATUS_STARTING: u32 = 0; | ||
@@ -409,4 +531,4 @@ const STATUS_STARTED: u32 = 1; | ||
| unsafe { | ||
| let (rc, done) = (*state).callback(event0, event1, event2); | ||
| if done { | ||
| let rc = (*state).callback(event0, event1, event2); | ||
| if rc == CallbackCode::Exit { | ||
| drop(Box::from_raw(state)); | ||
@@ -416,4 +538,4 @@ } else { | ||
| } | ||
| rtdebug!(" => (cb) {rc:#x}"); | ||
| rc | ||
| rtdebug!(" => (cb) {rc:?}"); | ||
| rc.encode() | ||
| } | ||
@@ -435,8 +557,10 @@ } | ||
| match state.callback(event.0, event.1, event.2) { | ||
| (_, true) => { | ||
| CallbackCode::Exit => { | ||
| drop(state); | ||
| break result.unwrap(); | ||
| } | ||
| (CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(), | ||
| _ => event = state.waitable_set.as_ref().unwrap().wait(), | ||
| CallbackCode::Yield | CallbackCode::Poll(_) => { | ||
| event = state.waitable_set.as_ref().unwrap().poll() | ||
| } | ||
| CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(), | ||
| } | ||
@@ -467,13 +591,10 @@ } | ||
| pub fn yield_blocking() -> bool { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn yield_() -> bool { | ||
| unreachable!(); | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[thread-yield]"] | ||
| fn yield_() -> bool; | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[thread-yield]"] | ||
| fn yield_() -> bool; | ||
| } | ||
| // Note that the return value from the raw intrinsic is inverted, the | ||
@@ -529,14 +650,10 @@ // canonical ABI returns "did this task get cancelled" while this function | ||
| pub fn backpressure_set(enabled: bool) { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn backpressure_set(_: i32) { | ||
| unreachable!(); | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-set]"] | ||
| fn backpressure_set(_: i32); | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-set]"] | ||
| fn backpressure_set(_: i32); | ||
| } | ||
| unsafe { backpressure_set(if enabled { 1 } else { 0 }) } | ||
@@ -547,14 +664,10 @@ } | ||
| pub fn backpressure_inc() { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn backpressure_inc() { | ||
| unreachable!(); | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-inc]"] | ||
| fn backpressure_inc(); | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-inc]"] | ||
| fn backpressure_inc(); | ||
| } | ||
| unsafe { backpressure_inc() } | ||
@@ -565,14 +678,10 @@ } | ||
| pub fn backpressure_dec() { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn backpressure_dec() { | ||
| unreachable!(); | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-dec]"] | ||
| fn backpressure_dec(); | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[backpressure-dec]"] | ||
| fn backpressure_dec(); | ||
| } | ||
| unsafe { backpressure_dec() } | ||
@@ -582,14 +691,10 @@ } | ||
| fn context_get() -> *mut u8 { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn get() -> *mut u8 { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[context-get-0]"] | ||
| fn get() -> *mut u8; | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[context-get-0]"] | ||
| fn get() -> *mut u8; | ||
| } | ||
| unsafe { get() } | ||
@@ -599,14 +704,10 @@ } | ||
| unsafe fn context_set(value: *mut u8) { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn set(_: *mut u8) { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[context-set-0]"] | ||
| fn set(value: *mut u8); | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[context-set-0]"] | ||
| fn set(value: *mut u8); | ||
| } | ||
| unsafe { set(value) } | ||
@@ -634,16 +735,12 @@ } | ||
| fn drop(&mut self) { | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn cancel() { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "[export]$root")] | ||
| extern "C" { | ||
| #[link_name = "[task-cancel]"] | ||
| fn cancel(); | ||
| } | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "[export]$root")] | ||
| extern "C" { | ||
| #[link_name = "[task-cancel]"] | ||
| fn cancel(); | ||
| } | ||
| unsafe { cancel() } | ||
| } | ||
| } |
@@ -1,2 +0,2 @@ | ||
| use crate::rt::async_support::StreamVtable; | ||
| use crate::rt::async_support::StreamOps; | ||
| use crate::rt::Cleanup; | ||
@@ -19,5 +19,5 @@ use std::alloc::Layout; | ||
| /// future's return value. | ||
| pub struct AbiBuffer<T: 'static> { | ||
| rust_storage: Vec<MaybeUninit<T>>, | ||
| vtable: &'static StreamVtable<T>, | ||
| pub struct AbiBuffer<O: StreamOps> { | ||
| rust_storage: Vec<MaybeUninit<O::Payload>>, | ||
| ops: O, | ||
| alloc: Option<Cleanup>, | ||
@@ -27,6 +27,4 @@ cursor: usize, | ||
| impl<T: 'static> AbiBuffer<T> { | ||
| pub(crate) fn new(mut vec: Vec<T>, vtable: &'static StreamVtable<T>) -> AbiBuffer<T> { | ||
| assert_eq!(vtable.lower.is_some(), vtable.lift.is_some()); | ||
| impl<O: StreamOps> AbiBuffer<O> { | ||
| pub(crate) fn new(mut vec: Vec<O::Payload>, mut ops: O) -> AbiBuffer<O> { | ||
| // SAFETY: We're converting `Vec<T>` to `Vec<MaybeUninit<T>>`, which | ||
@@ -39,3 +37,3 @@ // should be safe. | ||
| mem::forget(vec); | ||
| Vec::<MaybeUninit<T>>::from_raw_parts(ptr.cast(), len, cap) | ||
| Vec::<MaybeUninit<O::Payload>>::from_raw_parts(ptr.cast(), len, cap) | ||
| }; | ||
@@ -49,10 +47,12 @@ | ||
| // skip this entirely. | ||
| let alloc = vtable.lower.and_then(|lower| { | ||
| let alloc = if ops.native_abi_matches_canonical_abi() { | ||
| None | ||
| } else { | ||
| let elem_layout = ops.elem_layout(); | ||
| let layout = Layout::from_size_align( | ||
| vtable.layout.size() * rust_storage.len(), | ||
| vtable.layout.align(), | ||
| elem_layout.size() * rust_storage.len(), | ||
| elem_layout.align(), | ||
| ) | ||
| .unwrap(); | ||
| let (mut ptr, cleanup) = Cleanup::new(layout); | ||
| let cleanup = cleanup?; | ||
| // SAFETY: All items in `rust_storage` are already initialized so | ||
@@ -64,13 +64,12 @@ // it should be safe to read them and move ownership into the | ||
| let item = item.assume_init_read(); | ||
| lower(item, ptr); | ||
| ptr = ptr.add(vtable.layout.size()); | ||
| ops.lower(item, ptr); | ||
| ptr = ptr.add(elem_layout.size()); | ||
| } | ||
| } | ||
| Some(cleanup) | ||
| }); | ||
| cleanup | ||
| }; | ||
| AbiBuffer { | ||
| rust_storage, | ||
| alloc, | ||
| vtable, | ||
| ops, | ||
| cursor: 0, | ||
@@ -86,3 +85,3 @@ } | ||
| // situation the list would have been un-tampered with above. | ||
| if self.vtable.lower.is_none() { | ||
| if self.ops.native_abi_matches_canonical_abi() { | ||
| // SAFETY: this should be in-bounds, so it should be safe. | ||
@@ -103,3 +102,3 @@ let ptr = unsafe { self.rust_storage.as_ptr().add(self.cursor).cast() }; | ||
| // SAFETY: this should be in-bounds, so it should be safe. | ||
| unsafe { ptr.add(self.cursor * self.vtable.layout.size()) }, | ||
| unsafe { ptr.add(self.cursor * self.ops.elem_layout().size()) }, | ||
| self.rust_storage.len() - self.cursor, | ||
@@ -121,3 +120,3 @@ ) | ||
| /// to the start of the vector. | ||
| pub fn into_vec(mut self) -> Vec<T> { | ||
| pub fn into_vec(mut self) -> Vec<O::Payload> { | ||
| self.take_vec() | ||
@@ -138,6 +137,6 @@ } | ||
| assert!(amt + self.cursor <= self.rust_storage.len()); | ||
| let Some(dealloc_lists) = self.vtable.dealloc_lists else { | ||
| if !self.ops.contains_lists() { | ||
| self.cursor += amt; | ||
| return; | ||
| }; | ||
| } | ||
| let (mut ptr, len) = self.abi_ptr_and_len(); | ||
@@ -150,4 +149,4 @@ assert!(amt <= len); | ||
| unsafe { | ||
| dealloc_lists(ptr.cast_mut()); | ||
| ptr = ptr.add(self.vtable.layout.size()); | ||
| self.ops.dealloc_lists(ptr.cast_mut()); | ||
| ptr = ptr.add(self.ops.elem_layout().size()); | ||
| } | ||
@@ -158,3 +157,3 @@ } | ||
| fn take_vec(&mut self) -> Vec<T> { | ||
| fn take_vec(&mut self) -> Vec<O::Payload> { | ||
| // First, if necessary, convert remaining values within `self.alloc` | ||
@@ -169,3 +168,3 @@ // back into `self.rust_storage`. This is necessary when a lift | ||
| // operation, moving all the values back into the vector. | ||
| if let Some(lift) = self.vtable.lift { | ||
| if !self.ops.native_abi_matches_canonical_abi() { | ||
| let (mut ptr, mut len) = self.abi_ptr_and_len(); | ||
@@ -177,4 +176,4 @@ // SAFETY: this should be safe as `lift` is operating on values that | ||
| for dst in self.rust_storage[self.cursor..].iter_mut() { | ||
| dst.write(lift(ptr.cast_mut())); | ||
| ptr = ptr.add(self.vtable.layout.size()); | ||
| dst.write(self.ops.lift(ptr.cast_mut())); | ||
| ptr = ptr.add(self.ops.elem_layout().size()); | ||
| len -= 1; | ||
@@ -203,3 +202,3 @@ } | ||
| mem::forget(storage); | ||
| Vec::<T>::from_raw_parts(ptr.cast(), len, cap) | ||
| Vec::<O::Payload>::from_raw_parts(ptr.cast(), len, cap) | ||
| } | ||
@@ -209,3 +208,6 @@ } | ||
| impl<T> Drop for AbiBuffer<T> { | ||
| impl<O> Drop for AbiBuffer<O> | ||
| where | ||
| O: StreamOps, | ||
| { | ||
| fn drop(&mut self) { | ||
@@ -219,2 +221,3 @@ let _ = self.take_vec(); | ||
| use super::*; | ||
| use crate::rt::async_support::StreamVtable; | ||
| use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; | ||
@@ -221,0 +224,0 @@ use std::vec; |
@@ -49,24 +49,19 @@ //! Definition of the "C ABI" of how imported functions interact with exported | ||
| #[cfg(target_family = "wasm")] | ||
| extern "C" { | ||
| /// Sets the global task pointer to `ptr` provided. Returns the previous | ||
| /// value. | ||
| /// | ||
| /// This function acts as a dual getter and a setter. To get the | ||
| /// current task pointer a dummy `ptr` can be provided (e.g. NULL) and then | ||
| /// it's passed back when you're done working with it. When setting the | ||
| /// current task pointer it's recommended to call this and then call it | ||
| /// again with the previous value when the tasks's work is done. | ||
| /// | ||
| /// For executors they need to ensure that the `ptr` passed in lives for | ||
| /// the entire lifetime of the component model task. | ||
| pub fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task; | ||
| extern_wasm! { | ||
| extern "C" { | ||
| /// Sets the global task pointer to `ptr` provided. Returns the previous | ||
| /// value. | ||
| /// | ||
| /// This function acts as a dual getter and a setter. To get the | ||
| /// current task pointer a dummy `ptr` can be provided (e.g. NULL) and then | ||
| /// it's passed back when you're done working with it. When setting the | ||
| /// current task pointer it's recommended to call this and then call it | ||
| /// again with the previous value when the tasks's work is done. | ||
| /// | ||
| /// For executors they need to ensure that the `ptr` passed in lives for | ||
| /// the entire lifetime of the component model task. | ||
| pub fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task; | ||
| } | ||
| } | ||
| #[cfg(not(target_family = "wasm"))] | ||
| pub unsafe extern "C" fn wasip3_task_set(ptr: *mut wasip3_task) -> *mut wasip3_task { | ||
| let _ = ptr; | ||
| unreachable!(); | ||
| } | ||
| /// The first version of `wasip3_task` which implies the existence of the | ||
@@ -73,0 +68,0 @@ /// fields `ptr`, `waitable_register`, and `waitable_unregister`. |
@@ -63,6 +63,3 @@ //! Raw bindings to `error-context` in the canonical ABI. | ||
| fn drop(&mut self) { | ||
| #[cfg(target_arch = "wasm32")] | ||
| unsafe { | ||
| drop(self.handle) | ||
| } | ||
| unsafe { drop(self.handle) } | ||
| } | ||
@@ -77,20 +74,12 @@ } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn new(_: *const u8, _: usize) -> u32 { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[error-context-new-utf8]"] | ||
| fn new(_: *const u8, _: usize) -> u32; | ||
| #[link_name = "[error-context-drop]"] | ||
| fn drop(_: u32); | ||
| #[link_name = "[error-context-debug-message-utf8]"] | ||
| fn debug_message(_: u32, _: &mut RetPtr); | ||
| } | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| fn debug_message(_: u32, _: &mut RetPtr) { | ||
| unreachable!() | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[error-context-new-utf8]"] | ||
| fn new(_: *const u8, _: usize) -> u32; | ||
| #[link_name = "[error-context-drop]"] | ||
| fn drop(_: u32); | ||
| #[link_name = "[error-context-debug-message-utf8]"] | ||
| fn debug_message(_: u32, _: &mut RetPtr); | ||
| } |
@@ -12,3 +12,2 @@ //! For a high-level overview of how this module is implemented see the | ||
| future::Future, | ||
| marker, | ||
| pin::Pin, | ||
@@ -26,2 +25,48 @@ ptr, | ||
| #[doc(hidden)] | ||
| pub unsafe trait StreamOps: Clone { | ||
| /// The Rust type that's sent or received on this stream. | ||
| type Payload: 'static; | ||
| /// The `stream.new` intrinsic. | ||
| fn new(&mut self) -> u64; | ||
| /// The canonical ABI layout of the type that this stream is | ||
| /// sending/receiving. | ||
| fn elem_layout(&self) -> Layout; | ||
| /// Returns whether `lift` or `lower` is required to create `Self::Payload`. | ||
| /// | ||
| /// If this returns `false` then `Self::Payload` is natively in its | ||
| /// canonical ABI representation. | ||
| fn native_abi_matches_canonical_abi(&self) -> bool; | ||
| /// Returns whether `O::Payload` has lists that need to be deallocated with | ||
| /// `dealloc_lists`. | ||
| fn contains_lists(&self) -> bool; | ||
| /// Converts a Rust type to its canonical ABI representation. | ||
| unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8); | ||
| /// Used to deallocate any Rust-owned lists in the canonical ABI | ||
| /// representation for when a value is successfully sent but needs to be | ||
| /// cleaned up. | ||
| unsafe fn dealloc_lists(&mut self, dst: *mut u8); | ||
| /// Converts from the canonical ABI representation to a Rust value. | ||
| unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload; | ||
| /// The `stream.write` intrinsic | ||
| unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32; | ||
| /// The `stream.read` intrinsic | ||
| unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32; | ||
| /// The `stream.cancel-read` intrinsic | ||
| unsafe fn cancel_read(&mut self, stream: u32) -> u32; | ||
| /// The `stream.cancel-write` intrinsic | ||
| unsafe fn cancel_write(&mut self, stream: u32) -> u32; | ||
| /// The `stream.drop-readable` intrinsic | ||
| unsafe fn drop_readable(&mut self, stream: u32); | ||
| /// The `stream.drop-writable` intrinsic | ||
| unsafe fn drop_writable(&mut self, stream: u32); | ||
| } | ||
| /// Operations that a stream requires throughout the implementation. | ||
| /// | ||
| /// This is generated by `wit_bindgen::generate!` primarily. | ||
| #[doc(hidden)] | ||
| pub struct StreamVtable<T> { | ||
@@ -69,2 +114,50 @@ /// The in-memory canonical ABI layout of a single value of `T`. | ||
| unsafe impl<T: 'static> StreamOps for &StreamVtable<T> { | ||
| type Payload = T; | ||
| fn new(&mut self) -> u64 { | ||
| unsafe { (self.new)() } | ||
| } | ||
| fn elem_layout(&self) -> Layout { | ||
| self.layout | ||
| } | ||
| fn native_abi_matches_canonical_abi(&self) -> bool { | ||
| self.lift.is_none() | ||
| } | ||
| fn contains_lists(&self) -> bool { | ||
| self.dealloc_lists.is_some() | ||
| } | ||
| unsafe fn lower(&mut self, payload: Self::Payload, dst: *mut u8) { | ||
| if let Some(f) = self.lower { | ||
| f(payload, dst) | ||
| } | ||
| } | ||
| unsafe fn dealloc_lists(&mut self, dst: *mut u8) { | ||
| if let Some(f) = self.dealloc_lists { | ||
| f(dst) | ||
| } | ||
| } | ||
| unsafe fn lift(&mut self, dst: *mut u8) -> Self::Payload { | ||
| (self.lift.unwrap())(dst) | ||
| } | ||
| unsafe fn start_write(&mut self, stream: u32, val: *const u8, amt: usize) -> u32 { | ||
| (self.start_write)(stream, val, amt) | ||
| } | ||
| unsafe fn start_read(&mut self, stream: u32, val: *mut u8, amt: usize) -> u32 { | ||
| (self.start_read)(stream, val, amt) | ||
| } | ||
| unsafe fn cancel_read(&mut self, stream: u32) -> u32 { | ||
| (self.cancel_read)(stream) | ||
| } | ||
| unsafe fn cancel_write(&mut self, stream: u32) -> u32 { | ||
| (self.cancel_write)(stream) | ||
| } | ||
| unsafe fn drop_readable(&mut self, stream: u32) { | ||
| (self.drop_readable)(stream) | ||
| } | ||
| unsafe fn drop_writable(&mut self, stream: u32) { | ||
| (self.drop_writable)(stream) | ||
| } | ||
| } | ||
| /// Helper function to create a new read/write pair for a component model | ||
@@ -75,4 +168,13 @@ /// stream. | ||
| ) -> (StreamWriter<T>, StreamReader<T>) { | ||
| unsafe { raw_stream_new(vtable) } | ||
| } | ||
| /// Helper function to create a new read/write pair for a component model | ||
| /// stream. | ||
| pub unsafe fn raw_stream_new<O>(mut ops: O) -> (RawStreamWriter<O>, RawStreamReader<O>) | ||
| where | ||
| O: StreamOps + Clone, | ||
| { | ||
| unsafe { | ||
| let handles = (vtable.new)(); | ||
| let handles = ops.new(); | ||
| let reader = handles as u32; | ||
@@ -82,4 +184,4 @@ let writer = (handles >> 32) as u32; | ||
| ( | ||
| StreamWriter::new(writer, vtable), | ||
| StreamReader::new(reader, vtable), | ||
| RawStreamWriter::new(writer, ops.clone()), | ||
| RawStreamReader::new(reader, ops), | ||
| ) | ||
@@ -90,14 +192,20 @@ } | ||
| /// Represents the writable end of a Component Model `stream`. | ||
| pub struct StreamWriter<T: 'static> { | ||
| pub type StreamWriter<T> = RawStreamWriter<&'static StreamVtable<T>>; | ||
| /// Represents the writable end of a Component Model `stream`. | ||
| pub struct RawStreamWriter<O: StreamOps> { | ||
| handle: u32, | ||
| vtable: &'static StreamVtable<T>, | ||
| ops: O, | ||
| done: bool, | ||
| } | ||
| impl<T> StreamWriter<T> { | ||
| impl<O> RawStreamWriter<O> | ||
| where | ||
| O: StreamOps, | ||
| { | ||
| #[doc(hidden)] | ||
| pub unsafe fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self { | ||
| pub unsafe fn new(handle: u32, ops: O) -> Self { | ||
| Self { | ||
| handle, | ||
| vtable, | ||
| ops, | ||
| done: false, | ||
@@ -107,2 +215,8 @@ } | ||
| /// Returns the index of the component-model handle that this stream is | ||
| /// using. | ||
| pub fn handle(&self) -> u32 { | ||
| self.handle | ||
| } | ||
| /// Initiate a write of the `values` provided into this stream. | ||
@@ -145,4 +259,4 @@ /// | ||
| /// happened must be done with [`StreamWrite::cancel`]. | ||
| pub fn write(&mut self, values: Vec<T>) -> StreamWrite<'_, T> { | ||
| self.write_buf(AbiBuffer::new(values, self.vtable)) | ||
| pub fn write(&mut self, values: Vec<O::Payload>) -> RawStreamWrite<'_, O> { | ||
| self.write_buf(AbiBuffer::new(values, self.ops.clone())) | ||
| } | ||
@@ -152,5 +266,5 @@ | ||
| /// instead of `Vec<T>`. | ||
| pub fn write_buf(&mut self, values: AbiBuffer<T>) -> StreamWrite<'_, T> { | ||
| StreamWrite { | ||
| op: WaitableOperation::new(StreamWriteOp(marker::PhantomData), (self, values)), | ||
| pub fn write_buf(&mut self, values: AbiBuffer<O>) -> RawStreamWrite<'_, O> { | ||
| RawStreamWrite { | ||
| op: WaitableOperation::new(StreamWriteOp { writer: self }, values), | ||
| } | ||
@@ -166,3 +280,3 @@ } | ||
| /// not sent because the stream was dropped. | ||
| pub async fn write_all(&mut self, values: Vec<T>) -> Vec<T> { | ||
| pub async fn write_all(&mut self, values: Vec<O::Payload>) -> Vec<O::Payload> { | ||
| // Perform an initial write which converts `values` into `AbiBuffer`. | ||
@@ -200,3 +314,3 @@ let (mut status, mut buf) = self.write(values).await; | ||
| /// sent. | ||
| pub async fn write_one(&mut self, value: T) -> Option<T> { | ||
| pub async fn write_one(&mut self, value: O::Payload) -> Option<O::Payload> { | ||
| // TODO: can probably be a bit more efficient about this and avoid | ||
@@ -209,3 +323,6 @@ // moving `value` onto the heap in some situations, but that's left as | ||
| impl<T> fmt::Debug for StreamWriter<T> { | ||
| impl<O> fmt::Debug for RawStreamWriter<O> | ||
| where | ||
| O: StreamOps, | ||
| { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
@@ -218,7 +335,10 @@ f.debug_struct("StreamWriter") | ||
| impl<T> Drop for StreamWriter<T> { | ||
| impl<O> Drop for RawStreamWriter<O> | ||
| where | ||
| O: StreamOps, | ||
| { | ||
| fn drop(&mut self) { | ||
| rtdebug!("stream.drop-writable({})", self.handle); | ||
| unsafe { | ||
| (self.vtable.drop_writable)(self.handle); | ||
| self.ops.drop_writable(self.handle); | ||
| } | ||
@@ -229,7 +349,12 @@ } | ||
| /// Represents a write operation which may be cancelled prior to completion. | ||
| pub struct StreamWrite<'a, T: 'static> { | ||
| op: WaitableOperation<StreamWriteOp<'a, T>>, | ||
| pub type StreamWrite<'a, T> = RawStreamWrite<'a, &'static StreamVtable<T>>; | ||
| /// Represents a write operation which may be cancelled prior to completion. | ||
| pub struct RawStreamWrite<'a, O: StreamOps> { | ||
| op: WaitableOperation<StreamWriteOp<'a, O>>, | ||
| } | ||
| struct StreamWriteOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamWriter<T>, T)>); | ||
| struct StreamWriteOp<'a, O: StreamOps> { | ||
| writer: &'a mut RawStreamWriter<O>, | ||
| } | ||
@@ -251,14 +376,14 @@ /// Result of a [`StreamWriter::write`] or [`StreamReader::read`] operation, | ||
| unsafe impl<'a, T> WaitableOp for StreamWriteOp<'a, T> | ||
| unsafe impl<'a, O> WaitableOp for StreamWriteOp<'a, O> | ||
| where | ||
| T: 'static, | ||
| O: StreamOps, | ||
| { | ||
| type Start = (&'a mut StreamWriter<T>, AbiBuffer<T>); | ||
| type InProgress = (&'a mut StreamWriter<T>, AbiBuffer<T>); | ||
| type Result = (StreamResult, AbiBuffer<T>); | ||
| type Cancel = (StreamResult, AbiBuffer<T>); | ||
| type Start = AbiBuffer<O>; | ||
| type InProgress = AbiBuffer<O>; | ||
| type Result = (StreamResult, AbiBuffer<O>); | ||
| type Cancel = (StreamResult, AbiBuffer<O>); | ||
| fn start(&mut self, (writer, buf): Self::Start) -> (u32, Self::InProgress) { | ||
| if writer.done { | ||
| return (DROPPED, (writer, buf)); | ||
| fn start(&mut self, buf: Self::Start) -> (u32, Self::InProgress) { | ||
| if self.writer.done { | ||
| return (DROPPED, buf); | ||
| } | ||
@@ -269,11 +394,11 @@ | ||
| // `AbiBuffer` is trying to make this safe. | ||
| let code = unsafe { (writer.vtable.start_write)(writer.handle, ptr, len) }; | ||
| let code = unsafe { self.writer.ops.start_write(self.writer.handle, ptr, len) }; | ||
| rtdebug!( | ||
| "stream.write({}, {ptr:?}, {len}) = {code:#x}", | ||
| writer.handle | ||
| self.writer.handle | ||
| ); | ||
| (code, (writer, buf)) | ||
| (code, buf) | ||
| } | ||
| fn start_cancelled(&mut self, (_writer, buf): Self::Start) -> Self::Cancel { | ||
| fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel { | ||
| (StreamResult::Cancelled, buf) | ||
@@ -284,7 +409,7 @@ } | ||
| &mut self, | ||
| (writer, mut buf): Self::InProgress, | ||
| mut buf: Self::InProgress, | ||
| code: u32, | ||
| ) -> Result<Self::Result, Self::InProgress> { | ||
| match ReturnCode::decode(code) { | ||
| ReturnCode::Blocked => Err((writer, buf)), | ||
| ReturnCode::Blocked => Err(buf), | ||
| ReturnCode::Dropped(0) => Ok((StreamResult::Dropped, buf)), | ||
@@ -298,3 +423,3 @@ ReturnCode::Cancelled(0) => Ok((StreamResult::Cancelled, buf)), | ||
| if let ReturnCode::Dropped(_) = code { | ||
| writer.done = true; | ||
| self.writer.done = true; | ||
| } | ||
@@ -306,11 +431,11 @@ Ok((StreamResult::Complete(amt), buf)) | ||
| fn in_progress_waitable(&mut self, (writer, _): &Self::InProgress) -> u32 { | ||
| writer.handle | ||
| fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 { | ||
| self.writer.handle | ||
| } | ||
| fn in_progress_cancel(&mut self, (writer, _): &mut Self::InProgress) -> u32 { | ||
| fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 { | ||
| // SAFETY: we're managing `writer` and all the various operational bits, | ||
| // so this relies on `WaitableOperation` being safe. | ||
| let code = unsafe { (writer.vtable.cancel_write)(writer.handle) }; | ||
| rtdebug!("stream.cancel-write({}) = {code:#x}", writer.handle); | ||
| let code = unsafe { self.writer.ops.cancel_write(self.writer.handle) }; | ||
| rtdebug!("stream.cancel-write({}) = {code:#x}", self.writer.handle); | ||
| code | ||
@@ -324,4 +449,4 @@ } | ||
| impl<T: 'static> Future for StreamWrite<'_, T> { | ||
| type Output = (StreamResult, AbiBuffer<T>); | ||
| impl<O: StreamOps> Future for RawStreamWrite<'_, O> { | ||
| type Output = (StreamResult, AbiBuffer<O>); | ||
@@ -333,4 +458,4 @@ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| impl<'a, T: 'static> StreamWrite<'a, T> { | ||
| fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, T>>> { | ||
| impl<'a, O: StreamOps> RawStreamWrite<'a, O> { | ||
| fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamWriteOp<'a, O>>> { | ||
| // SAFETY: we've chosen that when `Self` is pinned that it translates to | ||
@@ -351,3 +476,3 @@ // always pinning the inner field, so that's codified here. | ||
| /// or if this method is called twice. | ||
| pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<T>) { | ||
| pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, AbiBuffer<O>) { | ||
| self.pin_project().cancel() | ||
@@ -358,9 +483,12 @@ } | ||
| /// Represents the readable end of a Component Model `stream`. | ||
| pub struct StreamReader<T: 'static> { | ||
| pub type StreamReader<T> = RawStreamReader<&'static StreamVtable<T>>; | ||
| /// Represents the readable end of a Component Model `stream`. | ||
| pub struct RawStreamReader<O: StreamOps> { | ||
| handle: AtomicU32, | ||
| vtable: &'static StreamVtable<T>, | ||
| ops: O, | ||
| done: bool, | ||
| } | ||
| impl<T> fmt::Debug for StreamReader<T> { | ||
| impl<O: StreamOps> fmt::Debug for RawStreamReader<O> { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
@@ -373,8 +501,8 @@ f.debug_struct("StreamReader") | ||
| impl<T> StreamReader<T> { | ||
| impl<O: StreamOps> RawStreamReader<O> { | ||
| #[doc(hidden)] | ||
| pub fn new(handle: u32, vtable: &'static StreamVtable<T>) -> Self { | ||
| pub fn new(handle: u32, ops: O) -> Self { | ||
| Self { | ||
| handle: AtomicU32::new(handle), | ||
| vtable, | ||
| ops, | ||
| done: false, | ||
@@ -391,3 +519,5 @@ } | ||
| fn handle(&self) -> u32 { | ||
| /// Returns the index of the component-model handle that this stream is | ||
| /// using. | ||
| pub fn handle(&self) -> u32 { | ||
| self.opt_handle().unwrap() | ||
@@ -418,5 +548,5 @@ } | ||
| /// used. | ||
| pub fn read(&mut self, buf: Vec<T>) -> StreamRead<'_, T> { | ||
| StreamRead { | ||
| op: WaitableOperation::new(StreamReadOp(marker::PhantomData), (self, buf)), | ||
| pub fn read(&mut self, buf: Vec<O::Payload>) -> RawStreamRead<'_, O> { | ||
| RawStreamRead { | ||
| op: WaitableOperation::new(StreamReadOp { reader: self }, buf), | ||
| } | ||
@@ -429,3 +559,3 @@ } | ||
| /// reads only a single item and does not expose control over cancellation. | ||
| pub async fn next(&mut self) -> Option<T> { | ||
| pub async fn next(&mut self) -> Option<O::Payload> { | ||
| // TODO: should amortize this allocation and avoid doing it every time. | ||
@@ -441,3 +571,3 @@ // Or somehow perhaps make this more optimal. | ||
| /// and await the stream to be dropped. | ||
| pub async fn collect(mut self) -> Vec<T> { | ||
| pub async fn collect(mut self) -> Vec<O::Payload> { | ||
| let mut ret = Vec::new(); | ||
@@ -463,3 +593,3 @@ loop { | ||
| impl<T> Drop for StreamReader<T> { | ||
| impl<O: StreamOps> Drop for RawStreamReader<O> { | ||
| fn drop(&mut self) { | ||
@@ -471,3 +601,3 @@ let Some(handle) = self.opt_handle() else { | ||
| rtdebug!("stream.drop-readable({})", handle); | ||
| (self.vtable.drop_readable)(handle); | ||
| self.ops.drop_readable(handle); | ||
| } | ||
@@ -478,20 +608,22 @@ } | ||
| /// Represents a read operation which may be cancelled prior to completion. | ||
| pub struct StreamRead<'a, T: 'static> { | ||
| op: WaitableOperation<StreamReadOp<'a, T>>, | ||
| pub type StreamRead<'a, T> = RawStreamRead<'a, &'static StreamVtable<T>>; | ||
| /// Represents a read operation which may be cancelled prior to completion. | ||
| pub struct RawStreamRead<'a, O: StreamOps> { | ||
| op: WaitableOperation<StreamReadOp<'a, O>>, | ||
| } | ||
| struct StreamReadOp<'a, T: 'static>(marker::PhantomData<(&'a mut StreamReader<T>, T)>); | ||
| struct StreamReadOp<'a, O: StreamOps> { | ||
| reader: &'a mut RawStreamReader<O>, | ||
| } | ||
| unsafe impl<'a, T> WaitableOp for StreamReadOp<'a, T> | ||
| where | ||
| T: 'static, | ||
| { | ||
| type Start = (&'a mut StreamReader<T>, Vec<T>); | ||
| type InProgress = (&'a mut StreamReader<T>, Vec<T>, Option<Cleanup>); | ||
| type Result = (StreamResult, Vec<T>); | ||
| type Cancel = (StreamResult, Vec<T>); | ||
| unsafe impl<'a, O: StreamOps> WaitableOp for StreamReadOp<'a, O> { | ||
| type Start = Vec<O::Payload>; | ||
| type InProgress = (Vec<O::Payload>, Option<Cleanup>); | ||
| type Result = (StreamResult, Vec<O::Payload>); | ||
| type Cancel = (StreamResult, Vec<O::Payload>); | ||
| fn start(&mut self, (reader, mut buf): Self::Start) -> (u32, Self::InProgress) { | ||
| if reader.done { | ||
| return (DROPPED, (reader, buf, None)); | ||
| fn start(&mut self, mut buf: Self::Start) -> (u32, Self::InProgress) { | ||
| if self.reader.done { | ||
| return (DROPPED, (buf, None)); | ||
| } | ||
@@ -505,25 +637,28 @@ | ||
| // raw capacity in `buf` itself. | ||
| if reader.vtable.lift.is_some() { | ||
| let layout = Layout::from_size_align( | ||
| reader.vtable.layout.size() * cap.len(), | ||
| reader.vtable.layout.align(), | ||
| ) | ||
| .unwrap(); | ||
| (ptr, cleanup) = Cleanup::new(layout); | ||
| } else { | ||
| if self.reader.ops.native_abi_matches_canonical_abi() { | ||
| ptr = cap.as_mut_ptr().cast(); | ||
| cleanup = None; | ||
| } else { | ||
| let elem_layout = self.reader.ops.elem_layout(); | ||
| let layout = | ||
| Layout::from_size_align(elem_layout.size() * cap.len(), elem_layout.align()) | ||
| .unwrap(); | ||
| (ptr, cleanup) = Cleanup::new(layout); | ||
| } | ||
| // SAFETY: `ptr` is either in `buf` or in `cleanup`, both of which will | ||
| // persist with this async operation itself. | ||
| let code = unsafe { (reader.vtable.start_read)(reader.handle(), ptr, cap.len()) }; | ||
| let code = unsafe { | ||
| self.reader | ||
| .ops | ||
| .start_read(self.reader.handle(), ptr, cap.len()) | ||
| }; | ||
| rtdebug!( | ||
| "stream.read({}, {ptr:?}, {}) = {code:#x}", | ||
| reader.handle(), | ||
| self.reader.handle(), | ||
| cap.len() | ||
| ); | ||
| (code, (reader, buf, cleanup)) | ||
| (code, (buf, cleanup)) | ||
| } | ||
| fn start_cancelled(&mut self, (_, buf): Self::Start) -> Self::Cancel { | ||
| fn start_cancelled(&mut self, buf: Self::Start) -> Self::Cancel { | ||
| (StreamResult::Cancelled, buf) | ||
@@ -534,7 +669,7 @@ } | ||
| &mut self, | ||
| (reader, mut buf, cleanup): Self::InProgress, | ||
| (mut buf, cleanup): Self::InProgress, | ||
| code: u32, | ||
| ) -> Result<Self::Result, Self::InProgress> { | ||
| match ReturnCode::decode(code) { | ||
| ReturnCode::Blocked => Err((reader, buf, cleanup)), | ||
| ReturnCode::Blocked => Err((buf, cleanup)), | ||
@@ -558,22 +693,22 @@ // Note that the `cleanup`, if any, is discarded here. | ||
| match reader.vtable.lift { | ||
| if self.reader.ops.native_abi_matches_canonical_abi() { | ||
| // If no `lift` was necessary, then the results of this operation | ||
| // were read directly into `buf`, so just update its length now that | ||
| // values have been initialized. | ||
| unsafe { | ||
| buf.set_len(cur_len + amt); | ||
| } | ||
| } else { | ||
| // With a `lift` operation this now requires reading `amt` items | ||
| // from `cleanup` and pushing them into `buf`. | ||
| Some(lift) => { | ||
| let mut ptr = cleanup | ||
| .as_ref() | ||
| .map(|c| c.ptr.as_ptr()) | ||
| .unwrap_or(ptr::null_mut()); | ||
| for _ in 0..amt { | ||
| unsafe { | ||
| buf.push(lift(ptr)); | ||
| ptr = ptr.add(reader.vtable.layout.size()); | ||
| } | ||
| let mut ptr = cleanup | ||
| .as_ref() | ||
| .map(|c| c.ptr.as_ptr()) | ||
| .unwrap_or(ptr::null_mut()); | ||
| for _ in 0..amt { | ||
| unsafe { | ||
| buf.push(self.reader.ops.lift(ptr)); | ||
| ptr = ptr.add(self.reader.ops.elem_layout().size()); | ||
| } | ||
| } | ||
| // If no `lift` was necessary, then the results of this operation | ||
| // were read directly into `buf`, so just update its length now that | ||
| // values have been initialized. | ||
| None => unsafe { buf.set_len(cur_len + amt) }, | ||
| } | ||
@@ -585,3 +720,3 @@ | ||
| if let ReturnCode::Dropped(_) = code { | ||
| reader.done = true; | ||
| self.reader.done = true; | ||
| } | ||
@@ -593,11 +728,11 @@ Ok((StreamResult::Complete(amt), buf)) | ||
| fn in_progress_waitable(&mut self, (reader, ..): &Self::InProgress) -> u32 { | ||
| reader.handle() | ||
| fn in_progress_waitable(&mut self, _: &Self::InProgress) -> u32 { | ||
| self.reader.handle() | ||
| } | ||
| fn in_progress_cancel(&mut self, (reader, ..): &mut Self::InProgress) -> u32 { | ||
| fn in_progress_cancel(&mut self, _: &mut Self::InProgress) -> u32 { | ||
| // SAFETY: we're managing `reader` and all the various operational bits, | ||
| // so this relies on `WaitableOperation` being safe. | ||
| let code = unsafe { (reader.vtable.cancel_read)(reader.handle()) }; | ||
| rtdebug!("stream.cancel-read({}) = {code:#x}", reader.handle()); | ||
| let code = unsafe { self.reader.ops.cancel_read(self.reader.handle()) }; | ||
| rtdebug!("stream.cancel-read({}) = {code:#x}", self.reader.handle()); | ||
| code | ||
@@ -611,4 +746,4 @@ } | ||
| impl<T: 'static> Future for StreamRead<'_, T> { | ||
| type Output = (StreamResult, Vec<T>); | ||
| impl<O: StreamOps> Future for RawStreamRead<'_, O> { | ||
| type Output = (StreamResult, Vec<O::Payload>); | ||
@@ -620,4 +755,7 @@ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| impl<'a, T> StreamRead<'a, T> { | ||
| fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, T>>> { | ||
| impl<'a, O> RawStreamRead<'a, O> | ||
| where | ||
| O: StreamOps, | ||
| { | ||
| fn pin_project(self: Pin<&mut Self>) -> Pin<&mut WaitableOperation<StreamReadOp<'a, O>>> { | ||
| // SAFETY: we've chosen that when `Self` is pinned that it translates to | ||
@@ -641,5 +779,5 @@ // always pinning the inner field, so that's codified here. | ||
| /// or if this method is called twice. | ||
| pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<T>) { | ||
| pub fn cancel(self: Pin<&mut Self>) -> (StreamResult, Vec<O::Payload>) { | ||
| self.pin_project().cancel() | ||
| } | ||
| } |
@@ -276,19 +276,10 @@ //! Bindings used to manage subtasks, or invocations of imported functions. | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn drop(_: u32) { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[subtask-cancel]"] | ||
| fn cancel(handle: u32) -> u32; | ||
| #[link_name = "[subtask-drop]"] | ||
| fn drop(handle: u32); | ||
| } | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn cancel(_: u32) -> u32 { | ||
| unreachable!() | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[subtask-cancel]"] | ||
| fn cancel(handle: u32) -> u32; | ||
| #[link_name = "[subtask-drop]"] | ||
| fn drop(handle: u32); | ||
| } |
@@ -20,3 +20,3 @@ //! Low-level FFI-like bindings around `waitable-set` in the canonical ABI. | ||
| pub fn remove_waitable_from_all_sets(waitable: u32) { | ||
| rtdebug!("waitable-set.join({waitable}, 0)"); | ||
| rtdebug!("waitable.join({waitable}, 0)"); | ||
| unsafe { join(waitable, 0) } | ||
@@ -28,2 +28,3 @@ } | ||
| let mut payload = [0; 2]; | ||
| rtdebug!("waitable-set.wait({}) = ...", self.0.get()); | ||
| let event0 = wait(self.0.get(), &mut payload); | ||
@@ -43,2 +44,3 @@ rtdebug!( | ||
| let mut payload = [0; 2]; | ||
| rtdebug!("waitable-set.poll({}) = ...", self.0.get()); | ||
| let event0 = poll(self.0.get(), &mut payload); | ||
@@ -69,36 +71,16 @@ rtdebug!( | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn new() -> u32 { | ||
| unreachable!() | ||
| extern_wasm! { | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[waitable-set-new]"] | ||
| fn new() -> u32; | ||
| #[link_name = "[waitable-set-drop]"] | ||
| fn drop(set: u32); | ||
| #[link_name = "[waitable-join]"] | ||
| fn join(waitable: u32, set: u32); | ||
| #[link_name = "[waitable-set-wait]"] | ||
| fn wait(_: u32, _: *mut [u32; 2]) -> u32; | ||
| #[link_name = "[waitable-set-poll]"] | ||
| fn poll(_: u32, _: *mut [u32; 2]) -> u32; | ||
| } | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn drop(_: u32) { | ||
| unreachable!() | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn join(_: u32, _: u32) { | ||
| unreachable!() | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn wait(_: u32, _: *mut [u32; 2]) -> u32 { | ||
| unreachable!(); | ||
| } | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| unsafe fn poll(_: u32, _: *mut [u32; 2]) -> u32 { | ||
| unreachable!(); | ||
| } | ||
| #[cfg(target_arch = "wasm32")] | ||
| #[link(wasm_import_module = "$root")] | ||
| extern "C" { | ||
| #[link_name = "[waitable-set-new]"] | ||
| fn new() -> u32; | ||
| #[link_name = "[waitable-set-drop]"] | ||
| fn drop(set: u32); | ||
| #[link_name = "[waitable-join]"] | ||
| fn join(waitable: u32, set: u32); | ||
| #[link_name = "[waitable-set-wait]"] | ||
| fn wait(_: u32, _: *mut [u32; 2]) -> u32; | ||
| #[link_name = "[waitable-set-poll]"] | ||
| fn poll(_: u32, _: *mut [u32; 2]) -> u32; | ||
| } |
@@ -5,7 +5,7 @@ // This file is generated by ./ci/rebuild-libwit-bindgen-cabi.sh | ||
| extern void *cabi_realloc_wit_bindgen_0_48_1(void *ptr, size_t old_size, size_t align, size_t new_size); | ||
| extern void *cabi_realloc_wit_bindgen_0_49_0(void *ptr, size_t old_size, size_t align, size_t new_size); | ||
| __attribute__((__weak__, __export_name__("cabi_realloc"))) | ||
| void *cabi_realloc(void *ptr, size_t old_size, size_t align, size_t new_size) { | ||
| return cabi_realloc_wit_bindgen_0_48_1(ptr, old_size, align, new_size); | ||
| return cabi_realloc_wit_bindgen_0_49_0(ptr, old_size, align, new_size); | ||
| } |
| // This file is generated by ./ci/rebuild-libwit-bindgen-cabi.sh | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn cabi_realloc_wit_bindgen_0_48_1( | ||
| pub unsafe extern "C" fn cabi_realloc_wit_bindgen_0_49_0( | ||
| old_ptr: *mut u8, | ||
@@ -6,0 +6,0 @@ old_len: usize, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet