diff options
Diffstat (limited to 'azalea/src')
| -rw-r--r-- | azalea/src/lib.rs | 2 | ||||
| -rw-r--r-- | azalea/src/swarm/mod.rs | 269 |
2 files changed, 139 insertions, 132 deletions
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index 0fec9fc0..daed8451 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -63,7 +63,7 @@ pub enum StartError { /// /// ```no_run /// # use azalea::prelude::*; -/// # #[tokio::main(flavor = "current_thread")] +/// # #[tokio::main] /// # async fn main() { /// ClientBuilder::new() /// .set_handler(handle) diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index d49a5190..a744e0bc 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -32,7 +32,7 @@ use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp}; use bevy_ecs::prelude::*; use futures::future::{BoxFuture, join_all}; use parking_lot::{Mutex, RwLock}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; use tracing::{debug, error, warn}; use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError}; @@ -458,153 +458,160 @@ where let main_schedule_label = self.app.update_schedule.unwrap(); - let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app); + let local_set = task::LocalSet::new(); - let swarm = Swarm { - ecs_lock: ecs_lock.clone(), + let appexit = local_set.run_until(async move { + // start_ecs_runner must be run inside of the LocalSet + let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app); - resolved_address: Arc::new(RwLock::new(resolved_address)), - address: Arc::new(RwLock::new(address)), - instance_container, + let swarm = Swarm { + ecs_lock: ecs_lock.clone(), - bots_tx, + resolved_address: Arc::new(RwLock::new(resolved_address)), + address: Arc::new(RwLock::new(address)), + instance_container, - swarm_tx: swarm_tx.clone(), - }; + bots_tx, - // run the main schedule so the startup systems run - { - let mut ecs = ecs_lock.lock(); - ecs.insert_resource(swarm.clone()); - ecs.insert_resource(self.swarm_state.clone()); - if let Some(reconnect_after) = self.reconnect_after { - ecs.insert_resource(AutoReconnectDelay { - delay: reconnect_after, - }); - } else { - ecs.remove_resource::<AutoReconnectDelay>(); - } - ecs.run_schedule(main_schedule_label); - ecs.clear_trackers(); - } + swarm_tx: swarm_tx.clone(), + }; - // only do this after we inserted the Swarm and state resources to avoid errors - // where Res<Swarm> is inaccessible - start_running_systems(); - - // SwarmBuilder (self) isn't Send so we have to take all the things we need out - // of it - let swarm_clone = swarm.clone(); - let join_delay = self.join_delay; - let accounts = self.accounts.clone(); - let states = self.states.clone(); - - tokio::spawn(async move { - if let Some(join_delay) = join_delay { - // if there's a join delay, then join one by one - for ((account, bot_join_opts), state) in accounts.iter().zip(states) { - let mut join_opts = default_join_opts.clone(); - join_opts.update(bot_join_opts); - let _ = swarm_clone.add_with_opts(account, state, &join_opts).await; - tokio::time::sleep(join_delay).await; + // run the main schedule so the startup systems run + { + let mut ecs = ecs_lock.lock(); + ecs.insert_resource(swarm.clone()); + ecs.insert_resource(self.swarm_state.clone()); + if let Some(reconnect_after) = self.reconnect_after { + ecs.insert_resource(AutoReconnectDelay { + delay: reconnect_after, + }); + } else { + ecs.remove_resource::<AutoReconnectDelay>(); } - } else { - // otherwise, join all at once - let swarm_borrow = &swarm_clone; - join_all(accounts.iter().zip(states).map( - |((account, bot_join_opts), state)| async { - let mut join_opts = default_join_opts.clone(); - join_opts.update(bot_join_opts); - let _ = swarm_borrow - .clone() - .add_with_opts(account, state, &join_opts) - .await; - }, - )) - .await; + ecs.run_schedule(main_schedule_label); + ecs.clear_trackers(); } - swarm_tx.send(SwarmEvent::Login).unwrap(); - }); - - let swarm_state = self.swarm_state; - - // Watch swarm_rx and send those events to the swarm_handle. - let swarm_clone = swarm.clone(); - let swarm_handler_task = tokio::spawn(async move { - while let Some(event) = swarm_rx.recv().await { - if let Some(swarm_handler) = &self.swarm_handler { - tokio::spawn((swarm_handler)( - swarm_clone.clone(), - event, - swarm_state.clone(), - )); + // only do this after we inserted the Swarm and state resources to avoid errors + // where Res<Swarm> is inaccessible + start_running_systems(); + + // SwarmBuilder (self) isn't Send so we have to take all the things we need out + // of it + let swarm_clone = swarm.clone(); + let join_delay = self.join_delay; + let accounts = self.accounts.clone(); + let states = self.states.clone(); + + task::spawn_local(async move { + if let Some(join_delay) = join_delay { + // if there's a join delay, then join one by one + for ((account, bot_join_opts), state) in accounts.iter().zip(states) { + let mut join_opts = default_join_opts.clone(); + join_opts.update(bot_join_opts); + let _ = swarm_clone.add_with_opts(account, state, &join_opts).await; + tokio::time::sleep(join_delay).await; + } + } else { + // otherwise, join all at once + let swarm_borrow = &swarm_clone; + join_all(accounts.iter().zip(states).map( + |((account, bot_join_opts), state)| async { + let mut join_opts = default_join_opts.clone(); + join_opts.update(bot_join_opts); + let _ = swarm_borrow + .clone() + .add_with_opts(account, state, &join_opts) + .await; + }, + )) + .await; } - } - unreachable!( - "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail" - ); - }); - - // bot events - let client_handler_task = tokio::spawn(async move { - while let Some((Some(first_event), first_bot)) = bots_rx.recv().await { - if bots_rx.len() > 1_000 { - static WARNED: AtomicBool = AtomicBool::new(false); - if !WARNED.swap(true, atomic::Ordering::Relaxed) { - warn!("the Client Event channel has more than 1000 items!") + swarm_tx.send(SwarmEvent::Login).unwrap(); + }); + + let swarm_state = self.swarm_state; + + // Watch swarm_rx and send those events to the swarm_handle. + let swarm_clone = swarm.clone(); + let swarm_handler_task = task::spawn_local(async move { + while let Some(event) = swarm_rx.recv().await { + if let Some(swarm_handler) = &self.swarm_handler { + task::spawn_local((swarm_handler)( + swarm_clone.clone(), + event, + swarm_state.clone(), + )); } } - if let Some(handler) = &self.handler { - let ecs_mutex = first_bot.ecs.clone(); - let mut ecs = ecs_mutex.lock(); - let mut query = ecs.query::<Option<&S>>(); - let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else { - error!( - "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", - first_bot.username(), - first_bot.entity - ); - continue; - }; - let first_bot_entity = first_bot.entity; - let first_bot_state = first_bot_state.clone(); - - tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone())); - - // this makes it not have to keep locking the ecs - let mut states = HashMap::new(); - states.insert(first_bot_entity, first_bot_state); - while let Ok((Some(event), bot)) = bots_rx.try_recv() { - let state = match states.entry(bot.entity) { - hash_map::Entry::Occupied(e) => e.into_mut(), - hash_map::Entry::Vacant(e) => { - let Ok(Some(state)) = query.get(&ecs, bot.entity) else { - error!( - "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", - bot.username(), - bot.entity - ); - continue; - }; - let state = state.clone(); - e.insert(state) - } + unreachable!( + "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail" + ); + }); + + // bot events + let client_handler_task = task::spawn_local(async move { + while let Some((Some(first_event), first_bot)) = bots_rx.recv().await { + if bots_rx.len() > 1_000 { + static WARNED: AtomicBool = AtomicBool::new(false); + if !WARNED.swap(true, atomic::Ordering::Relaxed) { + warn!("the Client Event channel has more than 1000 items!") + } + } + + if let Some(handler) = &self.handler { + let ecs_mutex = first_bot.ecs.clone(); + let mut ecs = ecs_mutex.lock(); + let mut query = ecs.query::<Option<&S>>(); + let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else { + error!( + "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", + first_bot.username(), + first_bot.entity + ); + continue; }; - tokio::spawn((handler)(bot, event, state.clone())); + let first_bot_entity = first_bot.entity; + let first_bot_state = first_bot_state.clone(); + + task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone())); + + // this makes it not have to keep locking the ecs + let mut states = HashMap::new(); + states.insert(first_bot_entity, first_bot_state); + while let Ok((Some(event), bot)) = bots_rx.try_recv() { + let state = match states.entry(bot.entity) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + let Ok(Some(state)) = query.get(&ecs, bot.entity) else { + error!( + "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", + bot.username(), + bot.entity + ); + continue; + }; + let state = state.clone(); + e.insert(state) + } + }; + task::spawn_local((handler)(bot, event, state.clone())); + } } } - } - }); + }); - let appexit = appexit_rx - .await - .expect("appexit_tx shouldn't be dropped by the ECS runner before sending"); + let appexit = appexit_rx + .await + .expect("appexit_tx shouldn't be dropped by the ECS runner before sending"); + + swarm_handler_task.abort(); + client_handler_task.abort(); - swarm_handler_task.abort(); - client_handler_task.abort(); + appexit + }).await; Ok(appexit) } @@ -659,7 +666,7 @@ pub type BoxSwarmHandleFn<SS, R> = /// #[derive(Default, Clone, Resource)] /// struct SwarmState {} /// -/// #[tokio::main(flavor = "current_thread")] +/// #[tokio::main] /// async fn main() { /// let mut accounts = Vec::new(); /// let mut states = Vec::new(); @@ -767,7 +774,7 @@ impl Swarm { let bots_tx = self.bots_tx.clone(); let join_opts = join_opts.clone(); - tokio::spawn(Self::event_copying_task( + task::spawn_local(Self::event_copying_task( rx, swarm_tx, bots_tx, cloned_bot, join_opts, )); |
