From fe9ce29ac082397c01775d65e1fdd8cfa253e332 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Tue, 16 Jul 2024 16:37:16 +0800 Subject: [PATCH 01/10] Macro-ify StreamPack Signed-off-by: Luca Della Vedova --- src/buffer/bufferable.rs | 45 +++--- src/buffer/buffered.rs | 140 +++++++---------- src/stream.rs | 327 ++++++++++++++++++--------------------- 3 files changed, 218 insertions(+), 294 deletions(-) diff --git a/src/buffer/bufferable.rs b/src/buffer/bufferable.rs index 7564e752..deca6e44 100644 --- a/src/buffer/bufferable.rs +++ b/src/buffer/bufferable.rs @@ -15,6 +15,7 @@ * */ +use bevy::utils::all_tuples; use smallvec::SmallVec; use crate::{ @@ -76,36 +77,26 @@ impl Bufferable for Output { } } -impl Bufferable for (T0, T1) -where - T0: Bufferable, - T1: Bufferable, -{ - type BufferType = (T0::BufferType, T1::BufferType); - fn as_buffer(self, builder: &mut Builder) -> Self::BufferType { - ( - self.0.as_buffer(builder), - self.1.as_buffer(builder), - ) - } -} - -impl Bufferable for (T0, T1, T2) -where - T0: Bufferable, - T1: Bufferable, - T2: Bufferable, -{ - type BufferType = (T0::BufferType, T1::BufferType, T2::BufferType); - fn as_buffer(self, builder: &mut Builder) -> Self::BufferType { - ( - self.0.as_buffer(builder), - self.1.as_buffer(builder), - self.2.as_buffer(builder), - ) +macro_rules! impl_bufferable_for_tuple { + ($($T:ident),*) => { + #[allow(non_snake_case)] + impl<$($T: Bufferable),*> Bufferable for ($($T,)*) + { + type BufferType = ($($T::BufferType,)*); + fn as_buffer(self, builder: &mut Builder) -> Self::BufferType { + let ($($T,)*) = self; + ($( + $T.as_buffer(builder), + )*) + } + + } } } +// Implements the `Bufferable` trait for all tuples between size 2 and 15 +// (inclusive) made of types that implement `Bufferable` +all_tuples!(impl_bufferable_for_tuple, 2, 15, T); impl Bufferable for [T; N] { type BufferType = [T::BufferType; N]; diff --git a/src/buffer/buffered.rs b/src/buffer/buffered.rs index 06d5e4c4..e8a22606 100644 --- a/src/buffer/buffered.rs +++ b/src/buffer/buffered.rs @@ -16,6 +16,7 @@ */ use bevy::prelude::{Entity, World}; +use bevy::utils::all_tuples; use smallvec::SmallVec; @@ -121,100 +122,63 @@ impl Buffered for CloneFromBuffer { } } -impl Buffered for (T0, T1) -where - T0: Buffered, - T1: Buffered, -{ - fn buffered_count( - &self, - session: Entity, - world: &World, - ) -> Result { - Ok([ - self.0.buffered_count(session, world)?, - self.1.buffered_count(session, world)?, - ].iter().copied().min().unwrap_or(0)) - } +macro_rules! 
impl_buffered_for_tuple { + ($($T:ident),*) => { + #[allow(non_snake_case)] + impl<$($T: Buffered),*> Buffered for ($($T,)*) + { + fn buffered_count( + &self, + session: Entity, + world: &World, + ) -> Result { + let ($($T,)*) = self; + Ok([ + $( + $T.buffered_count(session, world)?, + )* + ].iter().copied().min().unwrap_or(0)) + } - type Item = (T0::Item, T1::Item); - fn pull( - &self, - session: Entity, - world: &mut World, - ) -> Result { - let t0 = self.0.pull(session, world)?; - let t1 = self.1.pull(session, world)?; - Ok((t0, t1)) - } + type Item = ($($T::Item),*); + fn pull( + &self, + session: Entity, + world: &mut World, + ) -> Result { + let ($($T,)*) = self; + Ok(($( + $T.pull(session, world)?, + )*)) + } - fn listen( - &self, - listener: Entity, - world: &mut World, - ) -> OperationResult { - self.0.listen(listener, world)?; - self.1.listen(listener, world)?; - Ok(()) - } + fn listen( + &self, + listener: Entity, + world: &mut World, + ) -> OperationResult { + let ($($T,)*) = self; + $( + $T.listen(listener, world)?; + )* + Ok(()) + } - fn as_input(&self) -> SmallVec<[Entity; 8]> { - let mut inputs = SmallVec::new(); - inputs.extend(self.0.as_input()); - inputs.extend(self.1.as_input()); - inputs + fn as_input(&self) -> SmallVec<[Entity; 8]> { + let mut inputs = SmallVec::new(); + let ($($T,)*) = self; + $( + inputs.extend($T.as_input()); + )* + inputs + } + } } } -impl Buffered for (T0, T1, T2) -where - T0: Buffered, - T1: Buffered, - T2: Buffered, -{ - fn buffered_count( - &self, - session: Entity, - world: &World, - ) -> Result { - Ok([ - self.0.buffered_count(session, world)?, - self.1.buffered_count(session, world)?, - self.2.buffered_count(session, world)?, - ].iter().copied().min().unwrap_or(0)) - } - - type Item = (T0::Item, T1::Item, T2::Item); - fn pull( - &self, - session: Entity, - world: &mut World, - ) -> Result { - let t0 = self.0.pull(session, world)?; - let t1 = self.1.pull(session, world)?; - let t2 = self.2.pull(session, world)?; - Ok((t0, t1, t2)) - } - - fn listen( - &self, - listener: Entity, - world: &mut World, - ) -> OperationResult { - self.0.listen(listener, world)?; - self.1.listen(listener, world)?; - self.2.listen(listener, world)?; - Ok(()) - } - - fn as_input(&self) -> SmallVec<[Entity; 8]> { - let mut inputs = SmallVec::new(); - inputs.extend(self.0.as_input()); - inputs.extend(self.1.as_input()); - inputs.extend(self.2.as_input()); - inputs - } -} +// Implements the `Buffered` trait for all tuples between size 2 and 15 +// (inclusive) made of types that implement `Buffered` +all_tuples!(impl_buffered_for_tuple, 2, 15, T); impl Buffered for [T; N] { fn buffered_count( diff --git a/src/stream.rs b/src/stream.rs index 1d0b9787..b93d866b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -16,6 +16,7 @@ */ use bevy::prelude::{Component, Bundle, Entity, Commands, World, BuildChildren}; +use bevy::utils::all_tuples; use crossbeam::channel::{Receiver, unbounded}; @@ -518,186 +519,154 @@ impl StreamPack for (T1,) { } } -impl StreamPack for (T1, T2) { - type StreamAvailableBundle = (T1::StreamAvailableBundle, T2::StreamAvailableBundle); - type StreamStorageBundle = (T1::StreamStorageBundle, T2::StreamStorageBundle); - type StreamInputPack = (T1::StreamInputPack, T2::StreamInputPack); - type StreamOutputPack = (T1::StreamOutputPack, T2::StreamOutputPack); - type Receiver = (T1::Receiver, T2::Receiver); - type Channel = (T1::Channel, T2::Channel); - type Buffer = (T1::Buffer, T2::Buffer); - fn spawn_scope_streams( - in_scope: Entity, - out_scope: Entity, - 
commands: &mut Commands, - ) -> ( - Self::StreamInputPack, - Self::StreamOutputPack, - ) { - let t1 = T1::spawn_scope_streams(in_scope, out_scope, commands); - let t2 = T2::spawn_scope_streams(in_scope, out_scope, commands); - ((t1.0, t2.0), (t1.1, t2.1)) - } - - fn spawn_workflow_streams(builder: &mut Builder) -> Self::StreamInputPack { - let t1 = T1::spawn_workflow_streams(builder); - let t2 = T2::spawn_workflow_streams(builder); - (t1, t2) - } - - fn spawn_node_streams( - map: &mut StreamTargetMap, - builder: &mut Builder, - ) -> ( - Self::StreamStorageBundle, - Self::StreamOutputPack, - ) { - let t1 = T1::spawn_node_streams(map, builder); - let t2 = T2::spawn_node_streams(map, builder); - ((t1.0, t2.0), (t1.1, t2.1)) - } - - fn take_streams(source: Entity, map: &mut StreamTargetMap, builder: &mut Commands) -> ( - Self::StreamStorageBundle, - Self::Receiver, - ) { - let t1 = T1::take_streams(source, map, builder); - let t2 = T2::take_streams(source, map, builder); - ((t1.0, t2.0), (t1.1, t2.1)) - } - - fn collect_streams( - source: Entity, - target: Entity, - map: &mut StreamTargetMap, - commands: &mut Commands, - ) -> Self::StreamStorageBundle { - let t1 = T1::collect_streams(source, target, map, commands); - let t2 = T2::collect_streams(source, target, map, commands); - (t1, t2) - } - - fn make_channel( - inner: &Arc, - world: &World, - ) -> Self::Channel { - let t1 = T1::make_channel(inner, world); - let t2 = T2::make_channel(inner, world); - (t1, t2) - } - - fn make_buffer(source: Entity, world: &World) -> Self::Buffer { - let t1 = T1::make_buffer(source, world); - let t2 = T2::make_buffer(source, world); - (t1, t2) - } - - fn process_buffer( - buffer: Self::Buffer, - source: Entity, - session: Entity, - world: &mut World, - roster: &mut OperationRoster, - ) -> OperationResult { - T1::process_buffer(buffer.0, source, session, world, roster)?; - T2::process_buffer(buffer.1, source, session, world, roster)?; - Ok(()) +macro_rules! 
impl_streampack_for_tuple { + ($($T:ident),*) => { + #[allow(non_snake_case)] + impl<$($T: StreamPack),*> StreamPack for ($($T,)*) { + type StreamAvailableBundle = ($($T::StreamAvailableBundle,)*); + type StreamStorageBundle = ($($T::StreamStorageBundle,)*); + type StreamInputPack = ($($T::StreamInputPack,)*); + type StreamOutputPack = ($($T::StreamOutputPack,)*); + type Receiver = ($($T::Receiver,)*); + type Channel = ($($T::Channel,)*); + type Buffer = ($($T::Buffer,)*); + + fn spawn_scope_streams( + in_scope: Entity, + out_scope: Entity, + commands: &mut Commands, + ) -> ( + Self::StreamInputPack, + Self::StreamOutputPack, + ) { + let ($($T,)*) = ( + $( + $T::spawn_scope_streams(in_scope, out_scope, commands), + )* + ); + // Now unpack the tuples + ( + ( + $( + $T.0, + )* + ), + ( + $( + $T.1, + )* + ) + ) + } + + fn spawn_workflow_streams(builder: &mut Builder) -> Self::StreamInputPack { + ( + $($T::spawn_workflow_streams(builder), + )* + ) + } + + fn spawn_node_streams( + map: &mut StreamTargetMap, + builder: &mut Builder, + ) -> ( + Self::StreamStorageBundle, + Self::StreamOutputPack, + ) { + let ($($T,)*) = ( + $( + $T::spawn_node_streams(map, builder), + )* + ); + // Now unpack the tuples + ( + ( + $( + $T.0, + )* + ), + ( + $( + $T.1, + )* + ) + ) + } + + fn take_streams(source: Entity, map: &mut StreamTargetMap, builder: &mut Commands) -> ( + Self::StreamStorageBundle, + Self::Receiver, + ) { + let ($($T,)*) = ( + $( + $T::take_streams(source, map, builder), + )* + ); + // Now unpack the tuples + ( + ( + $( + $T.0, + )* + ), + ( + $( + $T.1, + )* + ) + ) + } + + fn collect_streams( + source: Entity, + target: Entity, + map: &mut StreamTargetMap, + commands: &mut Commands, + ) -> Self::StreamStorageBundle { + ( + $( + $T::collect_streams(source, target, map, commands), + )* + ) + } + + fn make_channel( + inner: &Arc, + world: &World, + ) -> Self::Channel { + ( + $( + $T::make_channel(inner, world), + )* + ) + } + + fn make_buffer(source: Entity, world: &World) -> Self::Buffer { + ( + $( + $T::make_buffer(source, world), + )* + ) + } + + fn process_buffer( + buffer: Self::Buffer, + source: Entity, + session: Entity, + world: &mut World, + roster: &mut OperationRoster, + ) -> OperationResult { + let ($($T,)*) = buffer; + $( + $T::process_buffer($T, source, session, world, roster)?; + )* + Ok(()) + } + } } } -impl StreamPack for (T1, T2, T3) { - type StreamAvailableBundle = (T1::StreamAvailableBundle, T2::StreamAvailableBundle, T3::StreamAvailableBundle); - type StreamStorageBundle = (T1::StreamStorageBundle, T2::StreamStorageBundle, T3::StreamStorageBundle); - type StreamInputPack = (T1::StreamInputPack, T2::StreamInputPack, T3::StreamInputPack); - type StreamOutputPack = (T1::StreamOutputPack, T2::StreamOutputPack, T3::StreamOutputPack); - type Receiver = (T1::Receiver, T2::Receiver, T3::Receiver); - type Channel = (T1::Channel, T2::Channel, T3::Channel); - type Buffer = (T1::Buffer, T2::Buffer, T3::Buffer); - - fn spawn_scope_streams( - in_scope: Entity, - out_scope: Entity, - commands: &mut Commands, - ) -> ( - Self::StreamInputPack, - Self::StreamOutputPack, - ) { - let t1 = T1::spawn_scope_streams(in_scope, out_scope, commands); - let t2 = T2::spawn_scope_streams(in_scope, out_scope, commands); - let t3 = T3::spawn_scope_streams(in_scope, out_scope, commands); - ((t1.0, t2.0, t3.0), (t1.1, t2.1, t3.1)) - } - - fn spawn_workflow_streams(builder: &mut Builder) -> Self::StreamInputPack { - let t1 = T1::spawn_workflow_streams(builder); - let t2 = 
T2::spawn_workflow_streams(builder); - let t3 = T3::spawn_workflow_streams(builder); - (t1, t2, t3) - } - - fn spawn_node_streams( - map: &mut StreamTargetMap, - builder: &mut Builder, - ) -> ( - Self::StreamStorageBundle, - Self::StreamOutputPack, - ) { - let t1 = T1::spawn_node_streams(map, builder); - let t2 = T2::spawn_node_streams(map, builder); - let t3 = T3::spawn_node_streams(map, builder); - ((t1.0, t2.0, t3.0), (t1.1, t2.1, t3.1)) - } - - fn take_streams(source: Entity, map: &mut StreamTargetMap, builder: &mut Commands) -> ( - Self::StreamStorageBundle, - Self::Receiver, - ) { - let t1 = T1::take_streams(source, map, builder); - let t2 = T2::take_streams(source, map, builder); - let t3 = T3::take_streams(source, map, builder); - ((t1.0, t2.0, t3.0), (t1.1, t2.1, t3.1)) - } - - fn collect_streams( - source: Entity, - target: Entity, - map: &mut StreamTargetMap, - commands: &mut Commands, - ) -> Self::StreamStorageBundle { - let t1 = T1::collect_streams(source, target, map, commands); - let t2 = T2::collect_streams(source, target, map, commands); - let t3 = T3::collect_streams(source, target, map, commands); - (t1, t2, t3) - } - - fn make_channel( - inner: &Arc, - world: &World, - ) -> Self::Channel { - let t1 = T1::make_channel(inner, world); - let t2 = T2::make_channel(inner, world); - let t3 = T3::make_channel(inner, world); - (t1, t2, t3) - } - - fn make_buffer(source: Entity, world: &World) -> Self::Buffer { - let t1 = T1::make_buffer(source, world); - let t2 = T2::make_buffer(source, world); - let t3 = T3::make_buffer(source, world); - (t1, t2, t3) - } - - fn process_buffer( - buffer: Self::Buffer, - source: Entity, - session: Entity, - world: &mut World, - roster: &mut OperationRoster, - ) -> OperationResult { - T1::process_buffer(buffer.0, source, session, world, roster)?; - T2::process_buffer(buffer.1, source, session, world, roster)?; - T3::process_buffer(buffer.2, source, session, world, roster)?; - Ok(()) - } -} +// Implements the `StreamPack` trait for all tuples between size 2 and 12 +// (inclusive) made of types that implement `StreamPack` +all_tuples!(impl_streampack_for_tuple, 2, 12, T); From 441bc7cb149a4643b537773f7b8d049ffe70c9b0 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Wed, 17 Jul 2024 14:07:47 +0800 Subject: [PATCH 02/10] Consistency for implementation depth Signed-off-by: Luca Della Vedova --- src/buffer/buffered.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/buffer/buffered.rs b/src/buffer/buffered.rs index e8a22606..e1eaa80e 100644 --- a/src/buffer/buffered.rs +++ b/src/buffer/buffered.rs @@ -176,9 +176,9 @@ macro_rules! impl_buffered_for_tuple { } } -// Implements the `Buffered` trait for all tuples between size 2 and 15 +// Implements the `Buffered` trait for all tuples between size 2 and 12 // (inclusive) made of types that implement `Buffered` -all_tuples!(impl_buffered_for_tuple, 2, 15, T); +all_tuples!(impl_buffered_for_tuple, 2, 12, T); impl Buffered for [T; N] { fn buffered_count( From bbfb1ad1c5b0b7112744305069614069670ae62b Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Wed, 17 Jul 2024 14:17:52 +0800 Subject: [PATCH 03/10] Fix logic error in has_streams function Signed-off-by: Luca Della Vedova --- src/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream.rs b/src/stream.rs index 1a4d74ac..74f34770 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -710,7 +710,7 @@ macro_rules! 
impl_streampack_for_tuple { } fn has_streams() -> bool { - let mut has_streams = true; + let mut has_streams = false; $( has_streams = has_streams || $T::has_streams(); )* From 507573bee5ff9dfbedc2189765ab1b1f1a0b26d5 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Wed, 17 Jul 2024 14:27:35 +0800 Subject: [PATCH 04/10] Fix size for tuple impl Signed-off-by: Luca Della Vedova --- src/buffer/bufferable.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/buffer/bufferable.rs b/src/buffer/bufferable.rs index deca6e44..aa49c834 100644 --- a/src/buffer/bufferable.rs +++ b/src/buffer/bufferable.rs @@ -94,9 +94,9 @@ macro_rules! impl_bufferable_for_tuple { } } -// Implements the `Bufferable` trait for all tuples between size 2 and 15 +// Implements the `Bufferable` trait for all tuples between size 2 and 12 // (inclusive) made of types that implement `Bufferable` -all_tuples!(impl_bufferable_for_tuple, 2, 15, T); +all_tuples!(impl_bufferable_for_tuple, 2, 12, T); impl Bufferable for [T; N] { type BufferType = [T::BufferType; N]; From 5b09a590391a354bf435c5242bfe92a670e5856e Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Thu, 18 Jul 2024 15:09:05 +0800 Subject: [PATCH 05/10] Implement unzip Signed-off-by: Luca Della Vedova --- src/chain/unzip.rs | 276 ++++++++++++++------------------------------- 1 file changed, 86 insertions(+), 190 deletions(-) diff --git a/src/chain/unzip.rs b/src/chain/unzip.rs index 6a31453f..9806e463 100644 --- a/src/chain/unzip.rs +++ b/src/chain/unzip.rs @@ -15,7 +15,7 @@ * */ -use bevy::prelude::{Entity, Commands}; +use bevy::utils::all_tuples; use smallvec::SmallVec; @@ -29,174 +29,82 @@ pub trait Unzippable: Sized { type Unzipped; fn unzip_output(output: Output, builder: &mut Builder) -> Self::Unzipped; - fn make_targets(commands: &mut Commands) -> SmallVec<[Entity; 8]>; - fn distribute_values(request: OperationRequest) -> OperationResult; type Prepended; fn prepend(self, value: T) -> Self::Prepended; } -impl Unzippable for (A,) { - type Unzipped = Output; - fn unzip_output(output: Output, builder: &mut Builder) -> Self::Unzipped { - assert_eq!(output.scope(), builder.scope()); - let targets = Self::make_targets(builder.commands); - - let result = Output::new(builder.scope, targets[0]); - - builder.commands.add(AddOperation::new( - Some(output.scope()), - output.id(), - ForkUnzip::::new(ForkTargetStorage(targets)), - )); - result - } - - fn make_targets(commands: &mut Commands) -> SmallVec<[Entity; 8]> { - SmallVec::from_iter([commands.spawn(UnusedTarget).id()]) - } - - fn distribute_values( - OperationRequest { source, world, roster }: OperationRequest - ) -> OperationResult { - let Input { session, data: inputs } = world - .get_entity_mut(source).or_broken()? 
- .take_input::()?; - - let targets = world.get::(source).or_broken()?; - let target = (targets.0)[0]; - if let Some(mut t_mut) = world.get_entity_mut(target) { - t_mut.give_input(session, inputs.0, roster)?; - } - Ok(()) - } - - type Prepended = (T, A); - fn prepend(self, value: T) -> Self::Prepended { - (value, self.0) - } -} - -impl Unzippable for (A, B) { - type Unzipped = (Output, Output); - fn unzip_output(output: Output, builder: &mut Builder) -> Self::Unzipped { - assert_eq!(output.scope(), builder.scope()); - let targets = Self::make_targets(builder.commands); - - let result = ( - Output::new(builder.scope, targets[0]), - Output::new(builder.scope, targets[1]), - ); - - builder.commands.add(AddOperation::new( - Some(output.scope()), - output.id(), - ForkUnzip::::new(ForkTargetStorage(targets)), - )); - result - } - - fn make_targets(commands: &mut Commands) -> SmallVec<[Entity; 8]> { - SmallVec::from_iter([ - commands.spawn(UnusedTarget).id(), - commands.spawn(UnusedTarget).id(), - ]) - } - - fn distribute_values( - OperationRequest { source, world, roster }: OperationRequest, - ) -> OperationResult { - let Input { session, data: inputs } = world - .get_entity_mut(source).or_broken()? - .take_input::()?; - - let targets = world.get::(source).or_broken()?; - let target_0 = *targets.0.get(0).or_broken()?; - let target_1 = *targets.0.get(1).or_broken()?; - - if let Some(mut t_mut) = world.get_entity_mut(target_0) { - t_mut.give_input(session, inputs.0, roster)?; - } - - if let Some(mut t_mut) = world.get_entity_mut(target_1) { - t_mut.give_input(session, inputs.1, roster)?; +macro_rules! impl_unzippable_for_tuple { + ($($T:ident),*) => { + #[allow(non_snake_case)] + impl<$($T: 'static + Send + Sync),*> Unzippable for ($($T,)*) + { + type Unzipped = ($(Output<$T>,)*); + fn unzip_output(output: Output, builder: &mut Builder) -> Self::Unzipped { + assert_eq!(output.scope(), builder.scope()); + let mut targets = SmallVec::new(); + let result = + ( + $( + { + // Variable is only used to make sure this cycle is repeated once + // for each instance of the $T type, but the type itself is not + // used. + #[allow(unused)] + let $T = std::marker::PhantomData::<$T>; + let target = builder.commands.spawn(UnusedTarget).id(); + targets.push(target); + Output::new(builder.scope, target) + }, + )* + ); + + builder.commands.add(AddOperation::new( + Some(output.scope()), + output.id(), + ForkUnzip::::new(ForkTargetStorage(targets)), + )); + result + } + + fn distribute_values( + OperationRequest { source, world, roster }: OperationRequest, + ) -> OperationResult { + let Input { session, data: inputs } = world + .get_entity_mut(source).or_broken()? 
+ .take_input::()?; + + // Targets is cloned to avoid borrow checker issues when + // doing a mutable borrow of the world later + let targets = world.get::(source).or_broken()?.clone(); + // The compiler throws a warning when implementing this for + // tuple sizes that wouldn't use the result of the first _idx = _idx + 1 + // so we add a leading underscore to suppress the warning + let mut _idx = 0; + let ($($T,)*) = inputs; + $( + let target = *targets.0.get(_idx).or_broken()?; + if let Some(mut t_mut) = world.get_entity_mut(target) { + t_mut.give_input(session, $T, roster)?; + } + _idx = _idx + 1; + )* + Ok(()) + } + + type Prepended = (T, $($T,)*); + fn prepend(self, value: T) -> Self::Prepended { + let ($($T,)*) = self; + (value, $($T,)*) + } } - - Ok(()) - } - - type Prepended = (T, A, B); - fn prepend(self, value: T) -> Self::Prepended { - (value, self.0, self.1) } } -impl Unzippable for (A, B, C) -where - A: 'static + Send + Sync, - B: 'static + Send + Sync, - C: 'static + Send + Sync, -{ - type Unzipped = (Output, Output, Output); - fn unzip_output(output: Output, builder: &mut Builder) -> Self::Unzipped { - assert_eq!(output.scope(), builder.scope()); - let targets = Self::make_targets(builder.commands); - - let result = ( - Output::new(builder.scope, targets[0]), - Output::new(builder.scope, targets[1]), - Output::new(builder.scope, targets[2]), - ); - - builder.commands.add(AddOperation::new( - Some(output.scope()), - output.id(), - ForkUnzip::::new(ForkTargetStorage(targets)), - )); - result - } - - fn make_targets(commands: &mut Commands) -> SmallVec<[Entity; 8]> { - SmallVec::from_iter([ - commands.spawn(UnusedTarget).id(), - commands.spawn(UnusedTarget).id(), - commands.spawn(UnusedTarget).id(), - ]) - } - - fn distribute_values( - OperationRequest { source, world, roster }: OperationRequest, - ) -> OperationResult { - let Input { session, data: inputs } = world - .get_entity_mut(source).or_broken()? - .take_input::()?; - - let targets = world.get::(source).or_broken()?; - let target_0 = *targets.0.get(0).or_broken()?; - let target_1 = *targets.0.get(1).or_broken()?; - let target_2 = *targets.0.get(2).or_broken()?; - - if let Some(mut t_mut) = world.get_entity_mut(target_0) { - t_mut.give_input(session, inputs.0, roster)?; - } - - if let Some(mut t_mut) = world.get_entity_mut(target_1) { - t_mut.give_input(session, inputs.1, roster)?; - } - - if let Some(mut t_mut) = world.get_entity_mut(target_2) { - t_mut.give_input(session, inputs.2, roster)?; - } - - Ok(()) - } - - type Prepended = (T, A, B, C); - fn prepend(self, value: T) -> Self::Prepended { - (value, self.0, self.1, self.2) - } -} +// Implements the `Unzippable` trait for all tuples between size 1 and 15 +// (inclusive) made of 'static lifetime types that are `Send` and `Sync` +all_tuples!(impl_unzippable_for_tuple, 1, 15, T); /// A trait for constructs that are able to perform a forking unzip of an /// unzippable chain. 
An unzippable chain is one whose response type contains a @@ -206,37 +114,25 @@ pub trait UnzipBuilder { fn unzip_build(self, output: Output, builder: &mut Builder) -> Self::ReturnType; } -impl UnzipBuilder<(A, B)> for (Fa, Fb) -where - A: 'static + Send + Sync, - B: 'static + Send + Sync, - Fa: FnOnce(Chain) -> Ua, - Fb: FnOnce(Chain) -> Ub, -{ - type ReturnType = (Ua, Ub); - fn unzip_build(self, output: Output<(A, B)>, builder: &mut Builder) -> Self::ReturnType { - let outputs = <(A, B)>::unzip_output(output, builder); - let u_a = (self.0)(outputs.0.chain(builder)); - let u_b = (self.1)(outputs.1.chain(builder)); - (u_a, u_b) +macro_rules! impl_unzipbuilder_for_tuple { + ($(($A:ident, $F:ident, $U:ident)),*) => { + #[allow(non_snake_case)] + impl<$($A: 'static + Send + Sync),*, $($F: FnOnce(Chain<$A>) -> $U),*, $($U),*> UnzipBuilder<($($A,)*)> for ($($F,)*) + { + type ReturnType = ($($U),*); + fn unzip_build(self, output: Output<($($A,)*)>, builder: &mut Builder) -> Self::ReturnType { + let outputs = <($($A),*)>::unzip_output(output, builder); + let ($($A,)*) = outputs; + let ($($F,)*) = self; + ( + $( + ($F)($A.chain(builder)), + )* + ) + } + } } } -impl UnzipBuilder<(A, B, C)> for (Fa, Fb, Fc) -where - A: 'static + Send + Sync, - B: 'static + Send + Sync, - C: 'static + Send + Sync, - Fa: FnOnce(Chain) -> Ua, - Fb: FnOnce(Chain) -> Ub, - Fc: FnOnce(Chain) -> Uc, -{ - type ReturnType = (Ua, Ub, Uc); - fn unzip_build(self, output: Output<(A, B, C)>, builder: &mut Builder) -> Self::ReturnType { - let outputs = <(A, B, C)>::unzip_output(output, builder); - let u_a = (self.0)(outputs.0.chain(builder)); - let u_b = (self.1)(outputs.1.chain(builder)); - let u_c = (self.2)(outputs.2.chain(builder)); - (u_a, u_b, u_c) - } -} +// Implements the `UnzipBuilder` trait for all tuples between size 1 and 15 +all_tuples!(impl_unzipbuilder_for_tuple, 2, 15, A, F, U); From af7b7688b1741d22a09dcf13dd60e7f774c106b5 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Thu, 18 Jul 2024 15:46:53 +0800 Subject: [PATCH 06/10] Implement ForkCloneBuilder Signed-off-by: Luca Della Vedova --- src/chain/fork_clone_builder.rs | 98 ++++++++++++++------------------- 1 file changed, 40 insertions(+), 58 deletions(-) diff --git a/src/chain/fork_clone_builder.rs b/src/chain/fork_clone_builder.rs index dce6d767..ef3ec167 100644 --- a/src/chain/fork_clone_builder.rs +++ b/src/chain/fork_clone_builder.rs @@ -15,6 +15,10 @@ * */ +use bevy::prelude::Entity; +use bevy::utils::all_tuples; +use smallvec::SmallVec; + use crate::{ Chain, UnusedTarget, AddOperation, ForkClone, ForkTargetStorage, Builder, Output, @@ -30,65 +34,43 @@ pub trait ForkCloneBuilder { ) -> Self::Outputs; } -impl ForkCloneBuilder for (F0, F1) -where - R: 'static + Send + Sync + Clone, - F0: FnOnce(Chain) -> U0, - F1: FnOnce(Chain) -> U1, -{ - type Outputs = (U0, U1); +macro_rules! 
impl_forkclonebuilder_for_tuple { + ($(($F:ident, $U:ident)),*) => { + #[allow(non_snake_case)] + impl) -> $U),*, $($U),*> ForkCloneBuilder for ($($F,)*) + { + type Outputs = ($($U,)*); + fn build_fork_clone( + self, + source: Output, + builder: &mut Builder, + ) -> Self::Outputs { + let mut targets = SmallVec::<[Entity; 8]>::new(); + let ($($F,)*) = self; + let u = + ( + $( + { + let target = builder.commands.spawn(UnusedTarget).id(); + targets.push(target); + ($F)(Chain::new(target, builder)) + }, + )* + ); - fn build_fork_clone( - self, - source: Output, - builder: &mut Builder, - ) -> Self::Outputs { - let target_0 = builder.commands.spawn(UnusedTarget).id(); - let target_1 = builder.commands.spawn(UnusedTarget).id(); - - builder.commands.add(AddOperation::new( - Some(source.scope()), - source.id(), - ForkClone::::new( - ForkTargetStorage::from_iter([target_0, target_1]) - ) - )); - - let u_0 = (self.0)(Chain::new(target_0, builder)); - let u_1 = (self.1)(Chain::new(target_1, builder)); - (u_0, u_1) + builder.commands.add(AddOperation::new( + Some(source.scope()), + source.id(), + ForkClone::::new( + ForkTargetStorage::from_iter(targets) + ) + )); + u + } + } } } -impl ForkCloneBuilder for (F0, F1, F2) -where - R: 'static + Send + Sync + Clone, - F0: FnOnce(Chain) -> U0, - F1: FnOnce(Chain) -> U1, - F2: FnOnce(Chain) -> U2, -{ - type Outputs = (U0, U1, U2); - - fn build_fork_clone( - self, - source: Output, - builder: &mut Builder, - ) -> Self::Outputs { - let target_0 = builder.commands.spawn(UnusedTarget).id(); - let target_1 = builder.commands.spawn(UnusedTarget).id(); - let target_2 = builder.commands.spawn(UnusedTarget).id(); - - builder.commands.add(AddOperation::new( - Some(source.scope()), - source.id(), - ForkClone::::new( - ForkTargetStorage::from_iter([target_0, target_1, target_2]) - ) - )); - - let u_0 = (self.0)(Chain::new(target_0, builder)); - let u_1 = (self.1)(Chain::new(target_1, builder)); - let u_2 = (self.2)(Chain::new(target_2, builder)); - (u_0, u_1, u_2) - } -} +// Implements the `ForkCloneBUilder` trait for all tuples between size 2 and 15 +// (inclusive) +all_tuples!(impl_forkclonebuilder_for_tuple, 2, 15, F, U); From 2c2f0baef05a399755893b2b362cc1de4affe449 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Thu, 18 Jul 2024 18:23:27 +0800 Subject: [PATCH 07/10] Remove refactor that broke test Signed-off-by: Luca Della Vedova --- src/chain/fork_clone_builder.rs | 34 ++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/chain/fork_clone_builder.rs b/src/chain/fork_clone_builder.rs index ef3ec167..04513393 100644 --- a/src/chain/fork_clone_builder.rs +++ b/src/chain/fork_clone_builder.rs @@ -15,9 +15,7 @@ * */ -use bevy::prelude::Entity; use bevy::utils::all_tuples; -use smallvec::SmallVec; use crate::{ Chain, UnusedTarget, AddOperation, ForkClone, ForkTargetStorage, Builder, @@ -45,18 +43,19 @@ macro_rules! impl_forkclonebuilder_for_tuple { source: Output, builder: &mut Builder, ) -> Self::Outputs { - let mut targets = SmallVec::<[Entity; 8]>::new(); - let ($($F,)*) = self; - let u = - ( + let targets = + [ $( { - let target = builder.commands.spawn(UnusedTarget).id(); - targets.push(target); - ($F)(Chain::new(target, builder)) + // Variable is only used to make sure this cycle is repeated once + // for each instance of the $T type, but the type itself is not + // used. 
+ #[allow(unused)] + let $F = std::marker::PhantomData::<$F>; + builder.commands.spawn(UnusedTarget).id() }, )* - ); + ]; builder.commands.add(AddOperation::new( Some(source.scope()), @@ -65,7 +64,20 @@ macro_rules! impl_forkclonebuilder_for_tuple { ForkTargetStorage::from_iter(targets) ) )); - u + let ($($F,)*) = self; + // The compiler throws a warning when implementing this for + // tuple sizes that wouldn't use the result of the first _idx = _idx + 1 + // so we add a leading underscore to suppress the warning + let mut _idx = 0; + ( + $( + { + let res = ($F)(Chain::new(targets[_idx], builder)); + _idx = _idx + 1; + res + }, + )* + ) } } } From dc95e402ded1e812b0132ece141e038a7b0a3103 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Mon, 22 Jul 2024 12:18:38 +0800 Subject: [PATCH 08/10] Avoid vector clone Signed-off-by: Luca Della Vedova --- Cargo.toml | 1 + src/chain/unzip.rs | 24 +++++++++--------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8de60a48..977422c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ authors = ["Grey "] [dependencies] bevy = "0.11" arrayvec = "*" +itertools = "*" smallvec = "*" crossbeam = "*" futures = "0.3" diff --git a/src/chain/unzip.rs b/src/chain/unzip.rs index 9806e463..0cde1612 100644 --- a/src/chain/unzip.rs +++ b/src/chain/unzip.rs @@ -17,6 +17,7 @@ use bevy::utils::all_tuples; +use itertools::Itertools; use smallvec::SmallVec; use crate::{ @@ -36,7 +37,7 @@ pub trait Unzippable: Sized { } macro_rules! impl_unzippable_for_tuple { - ($($T:ident),*) => { + ($(($T:ident, $D:ident)),*) => { #[allow(non_snake_case)] impl<$($T: 'static + Send + Sync),*> Unzippable for ($($T,)*) { @@ -75,20 +76,12 @@ macro_rules! impl_unzippable_for_tuple { .get_entity_mut(source).or_broken()? .take_input::()?; - // Targets is cloned to avoid borrow checker issues when - // doing a mutable borrow of the world later - let targets = world.get::(source).or_broken()?.clone(); - // The compiler throws a warning when implementing this for - // tuple sizes that wouldn't use the result of the first _idx = _idx + 1 - // so we add a leading underscore to suppress the warning - let mut _idx = 0; + let ($($D,)*) = world.get::(source).or_broken()?.0.iter().copied().next_tuple().or_broken()?; let ($($T,)*) = inputs; $( - let target = *targets.0.get(_idx).or_broken()?; - if let Some(mut t_mut) = world.get_entity_mut(target) { + if let Some(mut t_mut) = world.get_entity_mut($D) { t_mut.give_input(session, $T, roster)?; } - _idx = _idx + 1; )* Ok(()) } @@ -102,9 +95,10 @@ macro_rules! impl_unzippable_for_tuple { } } -// Implements the `Unzippable` trait for all tuples between size 1 and 15 +// Implements the `Unzippable` trait for all tuples between size 1 and 12 // (inclusive) made of 'static lifetime types that are `Send` and `Sync` -all_tuples!(impl_unzippable_for_tuple, 1, 15, T); +// D is a dummy type +all_tuples!(impl_unzippable_for_tuple, 1, 12, T, D); /// A trait for constructs that are able to perform a forking unzip of an /// unzippable chain. An unzippable chain is one whose response type contains a @@ -134,5 +128,5 @@ macro_rules! 
impl_unzipbuilder_for_tuple { } } -// Implements the `UnzipBuilder` trait for all tuples between size 1 and 15 -all_tuples!(impl_unzipbuilder_for_tuple, 2, 15, A, F, U); +// Implements the `UnzipBuilder` trait for all tuples between size 1 and 12 +all_tuples!(impl_unzipbuilder_for_tuple, 2, 12, A, F, U); From 903663c7d31ef50da5b69198011cd001c3e3dad7 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Mon, 22 Jul 2024 14:24:40 +0800 Subject: [PATCH 09/10] Consistency for tuple size implementation Signed-off-by: Luca Della Vedova --- src/chain/fork_clone_builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/chain/fork_clone_builder.rs b/src/chain/fork_clone_builder.rs index 04513393..acb6ae84 100644 --- a/src/chain/fork_clone_builder.rs +++ b/src/chain/fork_clone_builder.rs @@ -83,6 +83,6 @@ macro_rules! impl_forkclonebuilder_for_tuple { } } -// Implements the `ForkCloneBUilder` trait for all tuples between size 2 and 15 +// Implements the `ForkCloneBUilder` trait for all tuples between size 2 and 12 // (inclusive) -all_tuples!(impl_forkclonebuilder_for_tuple, 2, 15, F, U); +all_tuples!(impl_forkclonebuilder_for_tuple, 2, 12, F, U); From c97a7fcd82c5a355c44514616a89399c916c65c3 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Mon, 22 Jul 2024 14:32:00 +0800 Subject: [PATCH 10/10] Use macro implementation for tuple of size 1 Signed-off-by: Luca Della Vedova --- src/stream.rs | 83 ++------------------------------------------------- 1 file changed, 2 insertions(+), 81 deletions(-) diff --git a/src/stream.rs b/src/stream.rs index 74f34770..d3e9a919 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -484,85 +484,6 @@ impl StreamPack for () { } } -impl StreamPack for (T1,) { - type StreamAvailableBundle = T1::StreamAvailableBundle; - type StreamFilter = T1::StreamFilter; - type StreamStorageBundle = T1::StreamStorageBundle; - type StreamInputPack = T1::StreamInputPack; - type StreamOutputPack = T1::StreamOutputPack; - type Receiver = T1::Receiver; - type Channel = T1::Channel; - type Buffer = T1::Buffer; - - fn spawn_scope_streams( - in_scope: Entity, - out_scope: Entity, - commands: &mut Commands, - ) -> ( - Self::StreamInputPack, - Self::StreamOutputPack, - ) { - T1::spawn_scope_streams(in_scope, out_scope, commands) - } - - fn spawn_workflow_streams(builder: &mut Builder) -> Self::StreamInputPack { - T1::spawn_workflow_streams(builder) - } - - fn spawn_node_streams( - map: &mut StreamTargetMap, - builder: &mut Builder, - ) -> ( - Self::StreamStorageBundle, - Self::StreamOutputPack, - ) { - T1::spawn_node_streams(map, builder) - } - - fn take_streams(source: Entity, map: &mut StreamTargetMap, builder: &mut Commands) -> ( - Self::StreamStorageBundle, - Self::Receiver, - ) { - T1::take_streams(source, map, builder) - } - - fn collect_streams( - source: Entity, - target: Entity, - map: &mut StreamTargetMap, - commands: &mut Commands, - ) -> Self::StreamStorageBundle { - T1::collect_streams(source, target, map, commands) - } - - fn make_channel( - inner: &Arc, - world: &World, - ) -> Self::Channel { - T1::make_channel(inner, world) - } - - fn make_buffer(source: Entity, world: &World) -> Self::Buffer { - T1::make_buffer(source, world) - } - - fn process_buffer( - buffer: Self::Buffer, - source: Entity, - session: Entity, - unused: &mut UnusedStreams, - world: &mut World, - roster: &mut OperationRoster, - ) -> OperationResult { - T1::process_buffer(buffer, source, session, unused, world, roster)?; - Ok(()) - } - - fn has_streams() -> bool { - T1::has_streams() - 
} -} - macro_rules! impl_streampack_for_tuple { ($($T:ident),*) => { #[allow(non_snake_case)] @@ -720,9 +641,9 @@ macro_rules! impl_streampack_for_tuple { } } -// Implements the `StreamPack` trait for all tuples between size 2 and 12 +// Implements the `StreamPack` trait for all tuples between size 1 and 12 // (inclusive) made of types that implement `StreamPack` -all_tuples!(impl_streampack_for_tuple, 2, 12, T); +all_tuples!(impl_streampack_for_tuple, 1, 12, T); /// Used by [`ServiceDiscovery`](crate::ServiceDiscovery) to filter services /// based on what streams they provide. If a stream is required, you should wrap
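
A note on the pattern used throughout this series: each commit replaces hand-written
per-arity trait impls with a single declarative macro plus one `all_tuples!`
invocation that instantiates that macro for every tuple arity in a range. The snippet
below is a minimal, self-contained sketch of that shape; the `Describe` trait and the
`impl_describe_for_tuple` macro are invented purely for illustration and are not part
of this crate, while `bevy::utils::all_tuples` is the same helper imported by the
patches above (bevy 0.11).

use bevy::utils::all_tuples;

trait Describe {
    fn describe(&self) -> String;
}

macro_rules! impl_describe_for_tuple {
    ($($T:ident),*) => {
        #[allow(non_snake_case)]
        impl<$($T: Describe),*> Describe for ($($T,)*) {
            fn describe(&self) -> String {
                // Destructure the tuple into one binding per element; the
                // bindings reuse the type-parameter names, which is why the
                // #[allow(non_snake_case)] attribute is needed above.
                let ($($T,)*) = self;
                let mut parts = Vec::new();
                $(
                    // This repetition runs once per element of the tuple.
                    parts.push($T.describe());
                )*
                parts.join(", ")
            }
        }
    }
}

// Expands to impl_describe_for_tuple!(T0, T1); impl_describe_for_tuple!(T0, T1, T2);
// ... up to twelve type parameters, mirroring the 2..=12 range settled on in this
// series.
all_tuples!(impl_describe_for_tuple, 2, 12, T);

When the helper macro needs several identifiers per tuple element (as in the
`Unzippable` and `UnzipBuilder` impls), `all_tuples!` groups them in parentheses:
`all_tuples!(impl_unzippable_for_tuple, 1, 12, T, D)` invokes the macro as
`impl_unzippable_for_tuple!((T0, D0))`, then `impl_unzippable_for_tuple!((T0, D0), (T1, D1))`,
and so on, which is why those matchers take the form `$(($T:ident, $D:ident)),*`.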