| author | mat <27899617+mat-1@users.noreply.github.com> | 2025-12-11 21:00:37 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-11 21:00:37 -0600 |
| commit | ff1e28f88e93ba83cf76569b5613445b841efd45 (patch) | |
| tree | 2dc6a20bbd0fa3d038fe0e655d1cf96f0e3bb838 | |
| parent | 9bfb1705afb8a48ceace712bc4ee8c0b4d507f49 (diff) | |
| download | azalea-drasl-ff1e28f88e93ba83cf76569b5613445b841efd45.tar.xz | |
Run handler function in a Tokio LocalSet (#295)
* Run handler function in a Tokio LocalSet
* remove tokio flavor=current_thread from examples
* update changelog
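For bot authors, the practical effect is that the entry point no longer needs the single-threaded runtime flavor. Below is a minimal sketch of the new pattern, modeled on the shape of the `echo` example updated in this commit — the `State` struct and `handle` signature follow the usual Azalea example conventions and may differ slightly between versions:

```rust
use azalea::prelude::*;

// The default multi-threaded runtime is fine now: Azalea drives the handler
// and game ticks inside a Tokio LocalSet on a single thread internally.
#[tokio::main]
async fn main() {
    let account = Account::offline("bot");

    ClientBuilder::new()
        .set_handler(handle)
        .start(account, "localhost")
        .await
        .unwrap();
}

#[derive(Default, Clone, Component)]
struct State;

async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
    if let Event::Login = event {
        println!("{} joined the server", bot.username());
    }
    Ok(())
}
```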
| -rw-r--r-- | CHANGELOG.md | 1 |
|---|---|---|
| -rw-r--r-- | azalea-client/src/client.rs | 10 |
| -rw-r--r-- | azalea-client/src/plugins/tick_counter.rs | 2 |
| -rw-r--r-- | azalea/README.md | 12 |
| -rw-r--r-- | azalea/examples/echo.rs | 2 |
| -rw-r--r-- | azalea/examples/nearest_entity.rs | 2 |
| -rw-r--r-- | azalea/examples/steal.rs | 2 |
| -rw-r--r-- | azalea/examples/testbot/main.rs | 2 |
| -rw-r--r-- | azalea/examples/todo/craft_dig_straight_down.rs | 2 |
| -rw-r--r-- | azalea/examples/todo/mine_a_chunk.rs | 2 |
| -rw-r--r-- | azalea/examples/todo/pvp.rs | 2 |
| -rw-r--r-- | azalea/src/lib.rs | 2 |
| -rw-r--r-- | azalea/src/swarm/mod.rs | 269 |
13 files changed, 163 insertions, 147 deletions
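The heart of the change is in `azalea/src/swarm/mod.rs` below: startup gets wrapped in `LocalSet::run_until`, and the internal `tokio::spawn` calls become `task::spawn_local`. As a reminder of how these Tokio primitives behave on their own (plain Tokio, not Azalea code):

```rust
use tokio::task;

#[tokio::main]
async fn main() {
    // A LocalSet runs !Send futures on the thread that polls it; tasks spawned
    // with spawn_local only make progress while the LocalSet is being driven.
    let local_set = task::LocalSet::new();

    local_set
        .run_until(async {
            // spawn_local panics if called outside a LocalSet (or LocalRuntime),
            // which is exactly the panic documented on start_ecs_runner below.
            let handle = task::spawn_local(async { 42 });
            assert_eq!(handle.await.unwrap(), 42);
        })
        .await;
}
```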
````diff
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5a6c323..6953a9c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@ is breaking anyways, semantic versioning is not followed.
 - Rename `ResourceLocation` to `Identifier` to match Minecraft's new internal naming.
 - Rename `azalea_protocol::resolver` to `resolve` and `ResolverError` to `ResolveError`.
 - Refactor `RegistryHolder` to pre-deserialize some registries.
+- The handler function is now automatically single-threaded, making `#[tokio::main(flavor = "current_thread")]` unnecessary.
 
 ### Fixed
 
diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs
index 439f4d29..18a54125 100644
--- a/azalea-client/src/client.rs
+++ b/azalea-client/src/client.rs
@@ -158,7 +158,7 @@ impl Client {
     /// ```rust,no_run
     /// use azalea_client::{Account, Client};
     ///
-    /// #[tokio::main(flavor = "current_thread")]
+    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
     ///     let account = Account::offline("bot");
     ///     let (client, rx) = Client::join(account, "localhost").await?;
@@ -587,6 +587,12 @@ impl Plugin for AzaleaPlugin {
 ///
 /// You can create your app with `App::new()`, but don't forget to add
 /// [`DefaultPlugins`].
+///
+/// # Panics
+///
+/// This function panics if it's called outside of a Tokio `LocalSet` (or
+/// `LocalRuntime`). This exists so Azalea doesn't unexpectedly run game ticks
+/// in the middle of blocking user code.
 #[doc(hidden)]
 pub fn start_ecs_runner(
     app: &mut SubApp,
@@ -615,7 +621,7 @@ pub fn start_ecs_runner(
     let (appexit_tx, appexit_rx) = oneshot::channel();
 
     let start_running_systems = move || {
-        tokio::spawn(async move {
+        tokio::task::spawn_local(async move {
             let appexit = run_schedule_loop(ecs_clone, outer_schedule_label).await;
             appexit_tx.send(appexit)
         });
diff --git a/azalea-client/src/plugins/tick_counter.rs b/azalea-client/src/plugins/tick_counter.rs
index 9d07991a..43100bba 100644
--- a/azalea-client/src/plugins/tick_counter.rs
+++ b/azalea-client/src/plugins/tick_counter.rs
@@ -8,7 +8,7 @@ use crate::{mining::MiningSystems, movement::send_position, tick_broadcast::send
 
 /// Counts the number of game ticks elapsed on the **local client** since the
 /// `login` packet was received.
-#[derive(Component, Clone, Debug, Default)]
+#[derive(Component, Clone, Debug, Default, Eq, PartialEq)]
 pub struct TicksConnected(pub u64);
 
 /// Inserts the counter-increment system into the `GameTick` schedule **before**
diff --git a/azalea/README.md b/azalea/README.md
index 7860741a..3055b0c0 100644
--- a/azalea/README.md
+++ b/azalea/README.md
@@ -42,7 +42,7 @@ use std::sync::Arc;
 use azalea::prelude::*;
 use parking_lot::Mutex;
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let account = Account::offline("bot");
     // or Account::microsoft("example@example.com").await.unwrap();
@@ -97,10 +97,6 @@ Also note that just because something is an entity in the ECS doesn't mean that
 
 See the [Bevy Cheatbook](https://bevy-cheatbook.github.io/programming/ecs-intro.html) to learn more about Bevy ECS (and the ECS paradigm in general).
 
-# Using a single-threaded Tokio runtime
-
-Due to the fact that Azalea clients store the ECS in a Mutex that's frequently locked and unlocked, bots that rely on the `Client` or `Swarm` types may run into race condition bugs (like out-of-order events and ticks happening at suboptimal moments) if they do not set Tokio to use a single thread with `#[tokio::main(flavor = "current_thread")]`. This may change in a future version of Azalea. Setting this option will usually not result in a performance hit, and Azalea internally will keep using multiple threads for running the ECS itself (because Tokio is not used for this).
-
 # Debugging
 
 Azalea uses several relatively complex features of Rust, which may make debugging certain issues more tricky if you're not familiar with them.
@@ -121,4 +117,10 @@ If your code is simply hanging, it might be a deadlock. Enable `parking_lot`'s `
 
 Backtraces are also useful, though they're sometimes hard to read and don't always contain the actual location of the error. Run your code with `RUST_BACKTRACE=1` to enable full backtraces. If it's very long, often searching for the keyword "azalea" will help you filter out unrelated things and find the actual source of the issue.
 
+# Other common problems
+
+## Using `tokio::task::spawn_local` instead of `tokio::spawn`
+
+If you spawn a task with `tokio::spawn` and move your bot into it, it's possible for Tokio to run the handler function or schedule a Minecraft tick at an unexpected moment. For instance, `bot.component::<TicksConnected>() == bot.component::<TicksConnected>()` is not guaranteed to be true inside of a `tokio::spawn`. Azalea already mitigates this in the handler function by using a Tokio [LocalSet](https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html), but that mitigation does not apply if you call `tokio::spawn` yourself. To avoid this, you must call `tokio::task::spawn_local` in place of `tokio::spawn`. Alternatively, you could also mark your main function with `#[tokio::main(flavor = "current_thread")]`.
+
 [`bevy_log`]: https://docs.rs/bevy_log
diff --git a/azalea/examples/echo.rs b/azalea/examples/echo.rs
index 0f59be2a..a2219008 100644
--- a/azalea/examples/echo.rs
+++ b/azalea/examples/echo.rs
@@ -2,7 +2,7 @@
 
 use azalea::prelude::*;
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let account = Account::offline("bot");
     // or let account = Account::microsoft("email").await.unwrap();
diff --git a/azalea/examples/nearest_entity.rs b/azalea/examples/nearest_entity.rs
index 51aa26f6..19223589 100644
--- a/azalea/examples/nearest_entity.rs
+++ b/azalea/examples/nearest_entity.rs
@@ -17,7 +17,7 @@ use bevy_ecs::{
     system::Query,
 };
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let account = Account::offline("bot");
 
diff --git a/azalea/examples/steal.rs b/azalea/examples/steal.rs
index 87a1561b..899c2568 100644
--- a/azalea/examples/steal.rs
+++ b/azalea/examples/steal.rs
@@ -6,7 +6,7 @@ use azalea::{BlockPos, pathfinder::goals::RadiusGoal, prelude::*};
 use azalea_inventory::{ItemStack, operations::QuickMoveClick};
 use parking_lot::Mutex;
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let account = Account::offline("bot");
     // or let bot = Account::microsoft("email").await.unwrap();
diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs
index b4ef20ba..6adb782c 100644
--- a/azalea/examples/testbot/main.rs
+++ b/azalea/examples/testbot/main.rs
@@ -32,7 +32,7 @@ use azalea::{
 use commands::{CommandSource, register_commands};
 use parking_lot::Mutex;
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let args = parse_args();
 
diff --git a/azalea/examples/todo/craft_dig_straight_down.rs b/azalea/examples/todo/craft_dig_straight_down.rs
index bf312331..951c3de2 100644
--- a/azalea/examples/todo/craft_dig_straight_down.rs
+++ b/azalea/examples/todo/craft_dig_straight_down.rs
@@ -8,7 +8,7 @@ struct State {
     pub started: Arc<Mutex<bool>>,
 }
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let account = Account::offline("bot");
     // or let bot = Account::microsoft("email").await;
diff --git a/azalea/examples/todo/mine_a_chunk.rs b/azalea/examples/todo/mine_a_chunk.rs
index eb7fafd4..0c439f26 100644
--- a/azalea/examples/todo/mine_a_chunk.rs
+++ b/azalea/examples/todo/mine_a_chunk.rs
@@ -1,6 +1,6 @@
 use azalea::{prelude::*, swarm::prelude::*};
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let mut accounts = Vec::new();
     let mut states = Vec::new();
diff --git a/azalea/examples/todo/pvp.rs b/azalea/examples/todo/pvp.rs
index 0639d86b..d85278e8 100644
--- a/azalea/examples/todo/pvp.rs
+++ b/azalea/examples/todo/pvp.rs
@@ -5,7 +5,7 @@ use azalea::{
     pathfinder, prelude::*, swarm::prelude::*,
 };
 
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     let mut accounts = Vec::new();
     let mut states = Vec::new();
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs
index 0fec9fc0..daed8451 100644
--- a/azalea/src/lib.rs
+++ b/azalea/src/lib.rs
@@ -63,7 +63,7 @@ pub enum StartError {
 ///
 /// ```no_run
 /// # use azalea::prelude::*;
-/// # #[tokio::main(flavor = "current_thread")]
+/// # #[tokio::main]
 /// # async fn main() {
 /// ClientBuilder::new()
 ///     .set_handler(handle)
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
index d49a5190..a744e0bc 100644
--- a/azalea/src/swarm/mod.rs
+++ b/azalea/src/swarm/mod.rs
@@ -32,7 +32,7 @@ use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
 use bevy_ecs::prelude::*;
 use futures::future::{BoxFuture, join_all};
 use parking_lot::{Mutex, RwLock};
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, task};
 use tracing::{debug, error, warn};
 
 use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
@@ -458,153 +458,160 @@ where
         let main_schedule_label = self.app.update_schedule.unwrap();
 
-        let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
-
-        let swarm = Swarm {
-            ecs_lock: ecs_lock.clone(),
-
-            resolved_address: Arc::new(RwLock::new(resolved_address)),
-            address: Arc::new(RwLock::new(address)),
-            instance_container,
-
-            bots_tx,
-
-            swarm_tx: swarm_tx.clone(),
-        };
-
-        // run the main schedule so the startup systems run
-        {
-            let mut ecs = ecs_lock.lock();
-            ecs.insert_resource(swarm.clone());
-            ecs.insert_resource(self.swarm_state.clone());
-            if let Some(reconnect_after) = self.reconnect_after {
-                ecs.insert_resource(AutoReconnectDelay {
-                    delay: reconnect_after,
-                });
-            } else {
-                ecs.remove_resource::<AutoReconnectDelay>();
-            }
-            ecs.run_schedule(main_schedule_label);
-            ecs.clear_trackers();
-        }
-
-        // only do this after we inserted the Swarm and state resources to avoid errors
-        // where Res<Swarm> is inaccessible
-        start_running_systems();
-
-        // SwarmBuilder (self) isn't Send so we have to take all the things we need out
-        // of it
-        let swarm_clone = swarm.clone();
-        let join_delay = self.join_delay;
-        let accounts = self.accounts.clone();
-        let states = self.states.clone();
-
-        tokio::spawn(async move {
-            if let Some(join_delay) = join_delay {
-                // if there's a join delay, then join one by one
-                for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
-                    let mut join_opts = default_join_opts.clone();
-                    join_opts.update(bot_join_opts);
-                    let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
-                    tokio::time::sleep(join_delay).await;
-                }
-            } else {
-                // otherwise, join all at once
-                let swarm_borrow = &swarm_clone;
-                join_all(accounts.iter().zip(states).map(
-                    |((account, bot_join_opts), state)| async {
-                        let mut join_opts = default_join_opts.clone();
-                        join_opts.update(bot_join_opts);
-                        let _ = swarm_borrow
-                            .clone()
-                            .add_with_opts(account, state, &join_opts)
-                            .await;
-                    },
-                ))
-                .await;
-            }
-            swarm_tx.send(SwarmEvent::Login).unwrap();
-        });
-
-        let swarm_state = self.swarm_state;
-
-        // Watch swarm_rx and send those events to the swarm_handle.
-        let swarm_clone = swarm.clone();
-        let swarm_handler_task = tokio::spawn(async move {
-            while let Some(event) = swarm_rx.recv().await {
-                if let Some(swarm_handler) = &self.swarm_handler {
-                    tokio::spawn((swarm_handler)(
-                        swarm_clone.clone(),
-                        event,
-                        swarm_state.clone(),
-                    ));
-                }
-            }
-            unreachable!(
-                "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
-            );
-        });
-
-        // bot events
-        let client_handler_task = tokio::spawn(async move {
-            while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
-                if bots_rx.len() > 1_000 {
-                    static WARNED: AtomicBool = AtomicBool::new(false);
-                    if !WARNED.swap(true, atomic::Ordering::Relaxed) {
-                        warn!("the Client Event channel has more than 1000 items!")
-                    }
-                }
-
-                if let Some(handler) = &self.handler {
-                    let ecs_mutex = first_bot.ecs.clone();
-                    let mut ecs = ecs_mutex.lock();
-                    let mut query = ecs.query::<Option<&S>>();
-                    let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
-                        error!(
-                            "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
-                            first_bot.username(),
-                            first_bot.entity
-                        );
-                        continue;
-                    };
-                    let first_bot_entity = first_bot.entity;
-                    let first_bot_state = first_bot_state.clone();
-
-                    tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
-
-                    // this makes it not have to keep locking the ecs
-                    let mut states = HashMap::new();
-                    states.insert(first_bot_entity, first_bot_state);
-                    while let Ok((Some(event), bot)) = bots_rx.try_recv() {
-                        let state = match states.entry(bot.entity) {
-                            hash_map::Entry::Occupied(e) => e.into_mut(),
-                            hash_map::Entry::Vacant(e) => {
-                                let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
-                                    error!(
-                                        "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
-                                        bot.username(),
-                                        bot.entity
-                                    );
-                                    continue;
-                                };
-                                let state = state.clone();
-                                e.insert(state)
-                            }
-                        };
-                        tokio::spawn((handler)(bot, event, state.clone()));
-                    }
-                }
-            }
-        });
-
-        let appexit = appexit_rx
-            .await
-            .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
-
-        swarm_handler_task.abort();
-        client_handler_task.abort();
+        let local_set = task::LocalSet::new();
+
+        let appexit = local_set.run_until(async move {
+            // start_ecs_runner must be run inside of the LocalSet
+            let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
+
+            let swarm = Swarm {
+                ecs_lock: ecs_lock.clone(),
+
+                resolved_address: Arc::new(RwLock::new(resolved_address)),
+                address: Arc::new(RwLock::new(address)),
+                instance_container,
+
+                bots_tx,
+
+                swarm_tx: swarm_tx.clone(),
+            };
+
+            // run the main schedule so the startup systems run
+            {
+                let mut ecs = ecs_lock.lock();
+                ecs.insert_resource(swarm.clone());
+                ecs.insert_resource(self.swarm_state.clone());
+                if let Some(reconnect_after) = self.reconnect_after {
+                    ecs.insert_resource(AutoReconnectDelay {
+                        delay: reconnect_after,
+                    });
+                } else {
+                    ecs.remove_resource::<AutoReconnectDelay>();
+                }
+                ecs.run_schedule(main_schedule_label);
+                ecs.clear_trackers();
+            }
+
+            // only do this after we inserted the Swarm and state resources to avoid errors
+            // where Res<Swarm> is inaccessible
+            start_running_systems();
+
+            // SwarmBuilder (self) isn't Send so we have to take all the things we need out
+            // of it
+            let swarm_clone = swarm.clone();
+            let join_delay = self.join_delay;
+            let accounts = self.accounts.clone();
+            let states = self.states.clone();
+
+            task::spawn_local(async move {
+                if let Some(join_delay) = join_delay {
+                    // if there's a join delay, then join one by one
+                    for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
+                        let mut join_opts = default_join_opts.clone();
+                        join_opts.update(bot_join_opts);
+                        let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
+                        tokio::time::sleep(join_delay).await;
+                    }
+                } else {
+                    // otherwise, join all at once
+                    let swarm_borrow = &swarm_clone;
+                    join_all(accounts.iter().zip(states).map(
+                        |((account, bot_join_opts), state)| async {
+                            let mut join_opts = default_join_opts.clone();
+                            join_opts.update(bot_join_opts);
+                            let _ = swarm_borrow
+                                .clone()
+                                .add_with_opts(account, state, &join_opts)
+                                .await;
+                        },
+                    ))
+                    .await;
+                }
+                swarm_tx.send(SwarmEvent::Login).unwrap();
+            });
+
+            let swarm_state = self.swarm_state;
+
+            // Watch swarm_rx and send those events to the swarm_handle.
+            let swarm_clone = swarm.clone();
+            let swarm_handler_task = task::spawn_local(async move {
+                while let Some(event) = swarm_rx.recv().await {
+                    if let Some(swarm_handler) = &self.swarm_handler {
+                        task::spawn_local((swarm_handler)(
+                            swarm_clone.clone(),
+                            event,
+                            swarm_state.clone(),
+                        ));
+                    }
+                }
+                unreachable!(
+                    "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
+                );
+            });
+
+            // bot events
+            let client_handler_task = task::spawn_local(async move {
+                while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
+                    if bots_rx.len() > 1_000 {
+                        static WARNED: AtomicBool = AtomicBool::new(false);
+                        if !WARNED.swap(true, atomic::Ordering::Relaxed) {
+                            warn!("the Client Event channel has more than 1000 items!")
+                        }
+                    }
+
+                    if let Some(handler) = &self.handler {
+                        let ecs_mutex = first_bot.ecs.clone();
+                        let mut ecs = ecs_mutex.lock();
+                        let mut query = ecs.query::<Option<&S>>();
+                        let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
+                            error!(
+                                "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
+                                first_bot.username(),
+                                first_bot.entity
+                            );
+                            continue;
+                        };
+                        let first_bot_entity = first_bot.entity;
+                        let first_bot_state = first_bot_state.clone();
+
+                        task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
+
+                        // this makes it not have to keep locking the ecs
+                        let mut states = HashMap::new();
+                        states.insert(first_bot_entity, first_bot_state);
+                        while let Ok((Some(event), bot)) = bots_rx.try_recv() {
+                            let state = match states.entry(bot.entity) {
+                                hash_map::Entry::Occupied(e) => e.into_mut(),
+                                hash_map::Entry::Vacant(e) => {
+                                    let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
+                                        error!(
+                                            "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
+                                            bot.username(),
+                                            bot.entity
+                                        );
+                                        continue;
+                                    };
+                                    let state = state.clone();
+                                    e.insert(state)
+                                }
+                            };
+                            task::spawn_local((handler)(bot, event, state.clone()));
+                        }
+                    }
+                }
+            });
+
+            let appexit = appexit_rx
+                .await
+                .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
+
+            swarm_handler_task.abort();
+            client_handler_task.abort();
+
+            appexit
+        }).await;
 
         Ok(appexit)
     }
@@ -659,7 +666,7 @@ pub type BoxSwarmHandleFn<SS, R> =
 /// #[derive(Default, Clone, Resource)]
 /// struct SwarmState {}
 ///
-/// #[tokio::main(flavor = "current_thread")]
+/// #[tokio::main]
 /// async fn main() {
 ///     let mut accounts = Vec::new();
 ///     let mut states = Vec::new();
@@ -767,7 +774,7 @@ impl Swarm {
         let bots_tx = self.bots_tx.clone();
 
         let join_opts = join_opts.clone();
-        tokio::spawn(Self::event_copying_task(
+        task::spawn_local(Self::event_copying_task(
             rx, swarm_tx, bots_tx, cloned_bot, join_opts,
         ));
````
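The new "Other common problems" section added to the README above is the user-facing half of this change. A hypothetical handler fragment illustrating that guidance — the `TicksConnected` import path and the `State` type here are assumptions for illustration, not part of this commit:

```rust
use azalea::prelude::*;
// Assumed import path for the component used in the README's example; it may
// be re-exported elsewhere depending on the Azalea version.
use azalea_client::tick_counter::TicksConnected;

#[derive(Default, Clone, Component)]
struct State;

async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
    if let Event::Tick = event {
        let bot = bot.clone();
        // OK: this stays on the LocalSet that Azalea drives, so a game tick
        // can't run in the middle of this task and reads stay consistent.
        tokio::task::spawn_local(async move {
            let ticks = bot.component::<TicksConnected>();
            println!("connected for {} ticks", ticks.0);
        });
        // Risky: tokio::spawn would move the future to another worker thread,
        // where ticks may interleave at any await point.
    }
    Ok(())
}
```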
