aboutsummaryrefslogtreecommitdiff
path: root/azalea/src/swarm
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2025-12-11 21:00:37 -0600
committerGitHub <noreply@github.com>2025-12-11 21:00:37 -0600
commitff1e28f88e93ba83cf76569b5613445b841efd45 (patch)
tree2dc6a20bbd0fa3d038fe0e655d1cf96f0e3bb838 /azalea/src/swarm
parent9bfb1705afb8a48ceace712bc4ee8c0b4d507f49 (diff)
downloadazalea-drasl-ff1e28f88e93ba83cf76569b5613445b841efd45.tar.xz
Run handler function in a Tokio LocalSet (#295)
* Run handler function in a Tokio LocalSet * remove tokio flavor=current_thread from examples * update changelog
Diffstat (limited to 'azalea/src/swarm')
-rw-r--r--azalea/src/swarm/mod.rs269
1 files changed, 138 insertions, 131 deletions
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
index d49a5190..a744e0bc 100644
--- a/azalea/src/swarm/mod.rs
+++ b/azalea/src/swarm/mod.rs
@@ -32,7 +32,7 @@ use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
use bevy_ecs::prelude::*;
use futures::future::{BoxFuture, join_all};
use parking_lot::{Mutex, RwLock};
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, task};
use tracing::{debug, error, warn};
use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
@@ -458,153 +458,160 @@ where
let main_schedule_label = self.app.update_schedule.unwrap();
- let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
+ let local_set = task::LocalSet::new();
- let swarm = Swarm {
- ecs_lock: ecs_lock.clone(),
+ let appexit = local_set.run_until(async move {
+ // start_ecs_runner must be run inside of the LocalSet
+ let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
- resolved_address: Arc::new(RwLock::new(resolved_address)),
- address: Arc::new(RwLock::new(address)),
- instance_container,
+ let swarm = Swarm {
+ ecs_lock: ecs_lock.clone(),
- bots_tx,
+ resolved_address: Arc::new(RwLock::new(resolved_address)),
+ address: Arc::new(RwLock::new(address)),
+ instance_container,
- swarm_tx: swarm_tx.clone(),
- };
+ bots_tx,
- // run the main schedule so the startup systems run
- {
- let mut ecs = ecs_lock.lock();
- ecs.insert_resource(swarm.clone());
- ecs.insert_resource(self.swarm_state.clone());
- if let Some(reconnect_after) = self.reconnect_after {
- ecs.insert_resource(AutoReconnectDelay {
- delay: reconnect_after,
- });
- } else {
- ecs.remove_resource::<AutoReconnectDelay>();
- }
- ecs.run_schedule(main_schedule_label);
- ecs.clear_trackers();
- }
+ swarm_tx: swarm_tx.clone(),
+ };
- // only do this after we inserted the Swarm and state resources to avoid errors
- // where Res<Swarm> is inaccessible
- start_running_systems();
-
- // SwarmBuilder (self) isn't Send so we have to take all the things we need out
- // of it
- let swarm_clone = swarm.clone();
- let join_delay = self.join_delay;
- let accounts = self.accounts.clone();
- let states = self.states.clone();
-
- tokio::spawn(async move {
- if let Some(join_delay) = join_delay {
- // if there's a join delay, then join one by one
- for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
- let mut join_opts = default_join_opts.clone();
- join_opts.update(bot_join_opts);
- let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
- tokio::time::sleep(join_delay).await;
+ // run the main schedule so the startup systems run
+ {
+ let mut ecs = ecs_lock.lock();
+ ecs.insert_resource(swarm.clone());
+ ecs.insert_resource(self.swarm_state.clone());
+ if let Some(reconnect_after) = self.reconnect_after {
+ ecs.insert_resource(AutoReconnectDelay {
+ delay: reconnect_after,
+ });
+ } else {
+ ecs.remove_resource::<AutoReconnectDelay>();
}
- } else {
- // otherwise, join all at once
- let swarm_borrow = &swarm_clone;
- join_all(accounts.iter().zip(states).map(
- |((account, bot_join_opts), state)| async {
- let mut join_opts = default_join_opts.clone();
- join_opts.update(bot_join_opts);
- let _ = swarm_borrow
- .clone()
- .add_with_opts(account, state, &join_opts)
- .await;
- },
- ))
- .await;
+ ecs.run_schedule(main_schedule_label);
+ ecs.clear_trackers();
}
- swarm_tx.send(SwarmEvent::Login).unwrap();
- });
-
- let swarm_state = self.swarm_state;
-
- // Watch swarm_rx and send those events to the swarm_handle.
- let swarm_clone = swarm.clone();
- let swarm_handler_task = tokio::spawn(async move {
- while let Some(event) = swarm_rx.recv().await {
- if let Some(swarm_handler) = &self.swarm_handler {
- tokio::spawn((swarm_handler)(
- swarm_clone.clone(),
- event,
- swarm_state.clone(),
- ));
+ // only do this after we inserted the Swarm and state resources to avoid errors
+ // where Res<Swarm> is inaccessible
+ start_running_systems();
+
+ // SwarmBuilder (self) isn't Send so we have to take all the things we need out
+ // of it
+ let swarm_clone = swarm.clone();
+ let join_delay = self.join_delay;
+ let accounts = self.accounts.clone();
+ let states = self.states.clone();
+
+ task::spawn_local(async move {
+ if let Some(join_delay) = join_delay {
+ // if there's a join delay, then join one by one
+ for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
+ let mut join_opts = default_join_opts.clone();
+ join_opts.update(bot_join_opts);
+ let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
+ tokio::time::sleep(join_delay).await;
+ }
+ } else {
+ // otherwise, join all at once
+ let swarm_borrow = &swarm_clone;
+ join_all(accounts.iter().zip(states).map(
+ |((account, bot_join_opts), state)| async {
+ let mut join_opts = default_join_opts.clone();
+ join_opts.update(bot_join_opts);
+ let _ = swarm_borrow
+ .clone()
+ .add_with_opts(account, state, &join_opts)
+ .await;
+ },
+ ))
+ .await;
}
- }
- unreachable!(
- "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
- );
- });
-
- // bot events
- let client_handler_task = tokio::spawn(async move {
- while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
- if bots_rx.len() > 1_000 {
- static WARNED: AtomicBool = AtomicBool::new(false);
- if !WARNED.swap(true, atomic::Ordering::Relaxed) {
- warn!("the Client Event channel has more than 1000 items!")
+ swarm_tx.send(SwarmEvent::Login).unwrap();
+ });
+
+ let swarm_state = self.swarm_state;
+
+ // Watch swarm_rx and send those events to the swarm_handle.
+ let swarm_clone = swarm.clone();
+ let swarm_handler_task = task::spawn_local(async move {
+ while let Some(event) = swarm_rx.recv().await {
+ if let Some(swarm_handler) = &self.swarm_handler {
+ task::spawn_local((swarm_handler)(
+ swarm_clone.clone(),
+ event,
+ swarm_state.clone(),
+ ));
}
}
- if let Some(handler) = &self.handler {
- let ecs_mutex = first_bot.ecs.clone();
- let mut ecs = ecs_mutex.lock();
- let mut query = ecs.query::<Option<&S>>();
- let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
- error!(
- "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
- first_bot.username(),
- first_bot.entity
- );
- continue;
- };
- let first_bot_entity = first_bot.entity;
- let first_bot_state = first_bot_state.clone();
-
- tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
-
- // this makes it not have to keep locking the ecs
- let mut states = HashMap::new();
- states.insert(first_bot_entity, first_bot_state);
- while let Ok((Some(event), bot)) = bots_rx.try_recv() {
- let state = match states.entry(bot.entity) {
- hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => {
- let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
- error!(
- "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
- bot.username(),
- bot.entity
- );
- continue;
- };
- let state = state.clone();
- e.insert(state)
- }
+ unreachable!(
+ "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
+ );
+ });
+
+ // bot events
+ let client_handler_task = task::spawn_local(async move {
+ while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
+ if bots_rx.len() > 1_000 {
+ static WARNED: AtomicBool = AtomicBool::new(false);
+ if !WARNED.swap(true, atomic::Ordering::Relaxed) {
+ warn!("the Client Event channel has more than 1000 items!")
+ }
+ }
+
+ if let Some(handler) = &self.handler {
+ let ecs_mutex = first_bot.ecs.clone();
+ let mut ecs = ecs_mutex.lock();
+ let mut query = ecs.query::<Option<&S>>();
+ let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
+ error!(
+ "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
+ first_bot.username(),
+ first_bot.entity
+ );
+ continue;
};
- tokio::spawn((handler)(bot, event, state.clone()));
+ let first_bot_entity = first_bot.entity;
+ let first_bot_state = first_bot_state.clone();
+
+ task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
+
+ // this makes it not have to keep locking the ecs
+ let mut states = HashMap::new();
+ states.insert(first_bot_entity, first_bot_state);
+ while let Ok((Some(event), bot)) = bots_rx.try_recv() {
+ let state = match states.entry(bot.entity) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => {
+ let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
+ error!(
+ "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
+ bot.username(),
+ bot.entity
+ );
+ continue;
+ };
+ let state = state.clone();
+ e.insert(state)
+ }
+ };
+ task::spawn_local((handler)(bot, event, state.clone()));
+ }
}
}
- }
- });
+ });
- let appexit = appexit_rx
- .await
- .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
+ let appexit = appexit_rx
+ .await
+ .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
+
+ swarm_handler_task.abort();
+ client_handler_task.abort();
- swarm_handler_task.abort();
- client_handler_task.abort();
+ appexit
+ }).await;
Ok(appexit)
}
@@ -659,7 +666,7 @@ pub type BoxSwarmHandleFn<SS, R> =
/// #[derive(Default, Clone, Resource)]
/// struct SwarmState {}
///
-/// #[tokio::main(flavor = "current_thread")]
+/// #[tokio::main]
/// async fn main() {
/// let mut accounts = Vec::new();
/// let mut states = Vec::new();
@@ -767,7 +774,7 @@ impl Swarm {
let bots_tx = self.bots_tx.clone();
let join_opts = join_opts.clone();
- tokio::spawn(Self::event_copying_task(
+ task::spawn_local(Self::event_copying_task(
rx, swarm_tx, bots_tx, cloned_bot, join_opts,
));