Make everything async

Signed-off-by: Marcel Müller <neikos@neikos.email>
This commit is contained in:
Marcel Müller 2025-11-04 09:42:03 +01:00
parent 9ff1cf1bac
commit f3f816acd9
5 changed files with 497 additions and 36 deletions

View file

@ -13,5 +13,8 @@ categories = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
[dev-dependencies]
macro_rules_attribute = "0.2"
smol-macros = "0.1"

View file

@ -4,6 +4,8 @@
use std::any::Any;
use std::any::TypeId;
pub use anyhow;
pub trait Message: Send + Any {
type Reply: Send + Any;
}
@ -16,28 +18,44 @@ impl<M: Message> MessageIdentifier for M {
const IDENT: BundleChain = BundleChain::of::<M>();
}
pub struct InternalMessage(Box<dyn std::any::Any>);
pub struct InternalMessage {
value: Box<dyn std::any::Any>,
name: &'static str,
}
impl InternalMessage {
pub fn new(message: impl Message) -> InternalMessage {
InternalMessage(Box::new(message))
pub fn new<M: Any>(message: M) -> InternalMessage {
InternalMessage {
value: Box::new(message),
name: std::any::type_name::<M>(),
}
}
pub fn into_inner<M: Message>(self) -> Result<M, InternalMessage> {
self.0.downcast().map(|v| *v).map_err(InternalMessage)
pub fn into_inner<M: Any>(self) -> Result<M, InternalMessage> {
self.value
.downcast()
.map(|v| *v)
.map_err(|value| InternalMessage {
value,
name: self.name,
})
}
pub fn type_name(&self) -> &'static str {
self.name
}
}
pub trait Address<MB>
where
MB: MessageBundle,
{
fn send<M: Message>(&mut self, message: M);
pub trait Address<MB> {
fn send<M: Message>(&mut self, message: M) -> impl Future<Output = anyhow::Result<M::Reply>>;
}
pub trait InternalMessageHandler {
type HandledMessages: MessageBundle;
fn handle_message(&mut self, msg: InternalMessage);
fn handle_message(
&mut self,
msg: InternalMessage,
) -> impl Future<Output = anyhow::Result<InternalMessage>>;
}
impl<MB, IMH> Address<MB> for IMH
@ -45,14 +63,26 @@ where
MB: MessageBundle,
IMH: InternalMessageHandler<HandledMessages = MB>,
{
fn send<M: Message>(&mut self, message: M) {
fn send<M: Message>(&mut self, message: M) -> impl Future<Output = anyhow::Result<M::Reply>> {
const {
let true = <M as IsContainedInBundle<MB>>::IS_CONTAINED else {
panic!("Message is not contained in MessageBundle",);
};
}
self.handle_message(InternalMessage(Box::new(message)));
let message = self.handle_message(InternalMessage::new(message));
async {
message.await.and_then(|msg| {
msg.into_inner::<M::Reply>().map_err(|e| {
anyhow::anyhow!(
"Expected a {}, but got a {}",
std::any::type_name::<M::Reply>(),
e.type_name()
)
})
})
}
}
}
@ -173,6 +203,9 @@ const fn remove_from_chain(prev: &'static BundleChain, to_remove: TypeId) -> Bun
#[cfg(test)]
mod tests {
use macro_rules_attribute::apply;
use smol_macros::test;
use super::*;
struct Foo;
@ -218,18 +251,23 @@ mod tests {
));
}
#[test]
fn check_sending_messages() {
#[apply(test!)]
async fn check_sending_messages() {
struct Sender;
impl InternalMessageHandler for Sender {
type HandledMessages = (Foo, Bar);
fn handle_message(&mut self, _msg: InternalMessage) {}
async fn handle_message(
&mut self,
_msg: InternalMessage,
) -> anyhow::Result<InternalMessage> {
Err(anyhow::anyhow!("Not implemented!"))
}
}
let mut s = Sender;
s.send(Foo);
s.send(Foo).await.unwrap_err();
}
}

View file

@ -7,6 +7,9 @@ authors.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.31"
tytix-core = { path = "../tytix-core/" }
[dev-dependencies]
macro_rules_attribute = "0.2.2"
smol-macros = "0.1.1"

View file

@ -1,26 +1,31 @@
use std::marker::PhantomData;
use futures::FutureExt;
use tytix_core::Address;
use tytix_core::InternalMessage;
use tytix_core::InternalMessageHandler;
use tytix_core::IsContainedInBundle;
use tytix_core::Message;
use tytix_core::MessageBundle;
use tytix_core::anyhow;
pub trait AddressExt<MB> {
fn map_before<F, M, R>(self, f: F) -> MappedBeforeAddress<Self, F, M, R>
fn map_message<F, M, R, U>(self, f: F) -> MappedBeforeAddress<Self, F, M, R>
where
MB: MessageBundle,
F: Fn(M) -> R + 'static,
F: Fn(M) -> U + 'static,
U: Future<Output = R>,
M: Message,
R: Message + IsContainedInBundle<MB>,
Self: Sized;
}
impl<MB: MessageBundle, A: Address<MB>> AddressExt<MB> for A {
fn map_before<F, M, R>(self, f: F) -> MappedBeforeAddress<Self, F, M, R>
fn map_message<F, M, R, U>(self, f: F) -> MappedBeforeAddress<Self, F, M, R>
where
MB: MessageBundle,
F: Fn(M) -> R + 'static,
F: Fn(M) -> U + 'static,
U: Future<Output = R>,
M: Message,
R: Message + IsContainedInBundle<MB>,
Self: Sized,
@ -58,23 +63,32 @@ where
const IDS: tytix_core::BundleChain = MB::IDS.without::<R>().with::<M>();
}
impl<A, F, M, R> InternalMessageHandler for MappedBeforeAddress<A, F, M, R>
impl<A, F, M, R, U> InternalMessageHandler for MappedBeforeAddress<A, F, M, R>
where
A: InternalMessageHandler,
M: Message,
R: Message + IsContainedInBundle<A::HandledMessages>,
F: FnMut(M) -> R,
F: FnMut(M) -> U,
U: Future<Output = R>,
{
type HandledMessages = MappedHandledMessages<A::HandledMessages, M, R>;
fn handle_message(&mut self, msg: tytix_core::InternalMessage) {
match msg.into_inner::<M>() {
Ok(removed_message) => {
let new_message = (self.func)(removed_message);
self.address.send(new_message);
fn handle_message(
&mut self,
msg: tytix_core::InternalMessage,
) -> impl Future<Output = anyhow::Result<InternalMessage>> {
let msg = match msg.into_inner::<M>() {
Ok(incoming) => {
let map_fut = (self.func)(incoming);
async { InternalMessage::new(map_fut.await) }.left_future()
}
Err(other_message) => self.address.handle_message(other_message),
Err(other) => async { other }.right_future(),
};
async {
let msg = msg.await;
self.address.handle_message(msg).await
}
}
}
@ -83,6 +97,9 @@ where
mod tests {
use std::sync::OnceLock;
use macro_rules_attribute::apply;
use smol_macros::test;
use super::*;
struct Foo;
@ -102,21 +119,26 @@ mod tests {
impl InternalMessageHandler for SimpleAddress {
type HandledMessages = (Foo,);
fn handle_message(&mut self, msg: tytix_core::InternalMessage) {
fn handle_message(
&mut self,
msg: tytix_core::InternalMessage,
) -> impl Future<Output = anyhow::Result<InternalMessage>> {
drop(msg);
async { Ok(InternalMessage::new(())) }
}
}
#[test]
fn check_mapping() {
#[apply(test!)]
async fn check_mapping() {
static MSG: OnceLock<bool> = OnceLock::new();
let mut sa = SimpleAddress.map_before(|_b: Bar| {
let mut sa = SimpleAddress.map_message(|_b: Bar| {
let _ = MSG.set(true);
Foo
async { Foo }
});
sa.send(Bar);
sa.send(Bar).await.unwrap();
MSG.get().expect("The message was mapped!");
}