aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormat <27899617+mat-1@users.noreply.github.com>2025-12-11 21:00:37 -0600
committerGitHub <noreply@github.com>2025-12-11 21:00:37 -0600
commitff1e28f88e93ba83cf76569b5613445b841efd45 (patch)
tree2dc6a20bbd0fa3d038fe0e655d1cf96f0e3bb838
parent9bfb1705afb8a48ceace712bc4ee8c0b4d507f49 (diff)
downloadazalea-drasl-ff1e28f88e93ba83cf76569b5613445b841efd45.tar.xz
Run handler function in a Tokio LocalSet (#295)
* Run handler function in a Tokio LocalSet * remove tokio flavor=current_thread from examples * update changelog
-rw-r--r--CHANGELOG.md1
-rw-r--r--azalea-client/src/client.rs10
-rw-r--r--azalea-client/src/plugins/tick_counter.rs2
-rw-r--r--azalea/README.md12
-rw-r--r--azalea/examples/echo.rs2
-rw-r--r--azalea/examples/nearest_entity.rs2
-rw-r--r--azalea/examples/steal.rs2
-rw-r--r--azalea/examples/testbot/main.rs2
-rw-r--r--azalea/examples/todo/craft_dig_straight_down.rs2
-rw-r--r--azalea/examples/todo/mine_a_chunk.rs2
-rw-r--r--azalea/examples/todo/pvp.rs2
-rw-r--r--azalea/src/lib.rs2
-rw-r--r--azalea/src/swarm/mod.rs269
13 files changed, 163 insertions, 147 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5a6c323..6953a9c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@ is breaking anyways, semantic versioning is not followed.
- Rename `ResourceLocation` to `Identifier` to match Minecraft's new internal naming.
- Rename `azalea_protocol::resolver` to `resolve` and `ResolverError` to `ResolveError`.
- Refactor `RegistryHolder` to pre-deserialize some registries.
+- The handler function is now automatically single-threaded, making `#[tokio::main(flavor = "current_thread")]` unnecessary.
### Fixed
diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs
index 439f4d29..18a54125 100644
--- a/azalea-client/src/client.rs
+++ b/azalea-client/src/client.rs
@@ -158,7 +158,7 @@ impl Client {
/// ```rust,no_run
/// use azalea_client::{Account, Client};
///
- /// #[tokio::main(flavor = "current_thread")]
+ /// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let account = Account::offline("bot");
/// let (client, rx) = Client::join(account, "localhost").await?;
@@ -587,6 +587,12 @@ impl Plugin for AzaleaPlugin {
///
/// You can create your app with `App::new()`, but don't forget to add
/// [`DefaultPlugins`].
+///
+/// # Panics
+///
+/// This function panics if it's called outside of a Tokio `LocalSet` (or
+/// `LocalRuntime`). This exists so Azalea doesn't unexpectedly run game ticks
+/// in the middle of blocking user code.
#[doc(hidden)]
pub fn start_ecs_runner(
app: &mut SubApp,
@@ -615,7 +621,7 @@ pub fn start_ecs_runner(
let (appexit_tx, appexit_rx) = oneshot::channel();
let start_running_systems = move || {
- tokio::spawn(async move {
+ tokio::task::spawn_local(async move {
let appexit = run_schedule_loop(ecs_clone, outer_schedule_label).await;
appexit_tx.send(appexit)
});
diff --git a/azalea-client/src/plugins/tick_counter.rs b/azalea-client/src/plugins/tick_counter.rs
index 9d07991a..43100bba 100644
--- a/azalea-client/src/plugins/tick_counter.rs
+++ b/azalea-client/src/plugins/tick_counter.rs
@@ -8,7 +8,7 @@ use crate::{mining::MiningSystems, movement::send_position, tick_broadcast::send
/// Counts the number of game ticks elapsed on the **local client** since the
/// `login` packet was received.
-#[derive(Component, Clone, Debug, Default)]
+#[derive(Component, Clone, Debug, Default, Eq, PartialEq)]
pub struct TicksConnected(pub u64);
/// Inserts the counter-increment system into the `GameTick` schedule **before**
diff --git a/azalea/README.md b/azalea/README.md
index 7860741a..3055b0c0 100644
--- a/azalea/README.md
+++ b/azalea/README.md
@@ -42,7 +42,7 @@ use std::sync::Arc;
use azalea::prelude::*;
use parking_lot::Mutex;
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let account = Account::offline("bot");
// or Account::microsoft("example@example.com").await.unwrap();
@@ -97,10 +97,6 @@ Also note that just because something is an entity in the ECS doesn't mean that
See the [Bevy Cheatbook](https://bevy-cheatbook.github.io/programming/ecs-intro.html) to learn more about Bevy ECS (and the ECS paradigm in general).
-# Using a single-threaded Tokio runtime
-
-Due to the fact that Azalea clients store the ECS in a Mutex that's frequently locked and unlocked, bots that rely on the `Client` or `Swarm` types may run into race condition bugs (like out-of-order events and ticks happening at suboptimal moments) if they do not set Tokio to use a single thread with `#[tokio::main(flavor = "current_thread")]`. This may change in a future version of Azalea. Setting this option will usually not result in a performance hit, and Azalea internally will keep using multiple threads for running the ECS itself (because Tokio is not used for this).
-
# Debugging
Azalea uses several relatively complex features of Rust, which may make debugging certain issues more tricky if you're not familiar with them.
@@ -121,4 +117,10 @@ If your code is simply hanging, it might be a deadlock. Enable `parking_lot`'s `
Backtraces are also useful, though they're sometimes hard to read and don't always contain the actual location of the error. Run your code with `RUST_BACKTRACE=1` to enable full backtraces. If it's very long, often searching for the keyword "azalea" will help you filter out unrelated things and find the actual source of the issue.
+# Other common problems
+
+## Using `tokio::task::spawn_local` instead of `tokio::spawn`
+
+If you spawn a task with `tokio::spawn` and move your bot into it, it's possible for Tokio to run the handler function or schedule a Minecraft tick at an unexpected moment. For instance, `bot.component::<TicksConnected>() == bot.component::<TicksConnected>()` is not guaranteed to be true inside of a `tokio::spawn`. Azalea already mitigates this in the handler function by using a Tokio [LocalSet](https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html), but that mitigation does not apply if you call `tokio::spawn` yourself. To avoid this, you must call `tokio::task::spawn_local` in place of `tokio::spawn`. Alternatively, you could also mark your main function with `#[tokio::main(flavor = "current_thread")]`.
+
[`bevy_log`]: https://docs.rs/bevy_log
diff --git a/azalea/examples/echo.rs b/azalea/examples/echo.rs
index 0f59be2a..a2219008 100644
--- a/azalea/examples/echo.rs
+++ b/azalea/examples/echo.rs
@@ -2,7 +2,7 @@
use azalea::prelude::*;
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let account = Account::offline("bot");
// or let account = Account::microsoft("email").await.unwrap();
diff --git a/azalea/examples/nearest_entity.rs b/azalea/examples/nearest_entity.rs
index 51aa26f6..19223589 100644
--- a/azalea/examples/nearest_entity.rs
+++ b/azalea/examples/nearest_entity.rs
@@ -17,7 +17,7 @@ use bevy_ecs::{
system::Query,
};
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let account = Account::offline("bot");
diff --git a/azalea/examples/steal.rs b/azalea/examples/steal.rs
index 87a1561b..899c2568 100644
--- a/azalea/examples/steal.rs
+++ b/azalea/examples/steal.rs
@@ -6,7 +6,7 @@ use azalea::{BlockPos, pathfinder::goals::RadiusGoal, prelude::*};
use azalea_inventory::{ItemStack, operations::QuickMoveClick};
use parking_lot::Mutex;
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let account = Account::offline("bot");
// or let bot = Account::microsoft("email").await.unwrap();
diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs
index b4ef20ba..6adb782c 100644
--- a/azalea/examples/testbot/main.rs
+++ b/azalea/examples/testbot/main.rs
@@ -32,7 +32,7 @@ use azalea::{
use commands::{CommandSource, register_commands};
use parking_lot::Mutex;
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let args = parse_args();
diff --git a/azalea/examples/todo/craft_dig_straight_down.rs b/azalea/examples/todo/craft_dig_straight_down.rs
index bf312331..951c3de2 100644
--- a/azalea/examples/todo/craft_dig_straight_down.rs
+++ b/azalea/examples/todo/craft_dig_straight_down.rs
@@ -8,7 +8,7 @@ struct State {
pub started: Arc<Mutex<bool>>,
}
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let account = Account::offline("bot");
// or let bot = Account::microsoft("email").await;
diff --git a/azalea/examples/todo/mine_a_chunk.rs b/azalea/examples/todo/mine_a_chunk.rs
index eb7fafd4..0c439f26 100644
--- a/azalea/examples/todo/mine_a_chunk.rs
+++ b/azalea/examples/todo/mine_a_chunk.rs
@@ -1,6 +1,6 @@
use azalea::{prelude::*, swarm::prelude::*};
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let mut accounts = Vec::new();
let mut states = Vec::new();
diff --git a/azalea/examples/todo/pvp.rs b/azalea/examples/todo/pvp.rs
index 0639d86b..d85278e8 100644
--- a/azalea/examples/todo/pvp.rs
+++ b/azalea/examples/todo/pvp.rs
@@ -5,7 +5,7 @@ use azalea::{
pathfinder, prelude::*, swarm::prelude::*,
};
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
async fn main() {
let mut accounts = Vec::new();
let mut states = Vec::new();
diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs
index 0fec9fc0..daed8451 100644
--- a/azalea/src/lib.rs
+++ b/azalea/src/lib.rs
@@ -63,7 +63,7 @@ pub enum StartError {
///
/// ```no_run
/// # use azalea::prelude::*;
-/// # #[tokio::main(flavor = "current_thread")]
+/// # #[tokio::main]
/// # async fn main() {
/// ClientBuilder::new()
/// .set_handler(handle)
diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs
index d49a5190..a744e0bc 100644
--- a/azalea/src/swarm/mod.rs
+++ b/azalea/src/swarm/mod.rs
@@ -32,7 +32,7 @@ use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
use bevy_ecs::prelude::*;
use futures::future::{BoxFuture, join_all};
use parking_lot::{Mutex, RwLock};
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, task};
use tracing::{debug, error, warn};
use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
@@ -458,153 +458,160 @@ where
let main_schedule_label = self.app.update_schedule.unwrap();
- let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
+ let local_set = task::LocalSet::new();
- let swarm = Swarm {
- ecs_lock: ecs_lock.clone(),
+ let appexit = local_set.run_until(async move {
+ // start_ecs_runner must be run inside of the LocalSet
+ let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
- resolved_address: Arc::new(RwLock::new(resolved_address)),
- address: Arc::new(RwLock::new(address)),
- instance_container,
+ let swarm = Swarm {
+ ecs_lock: ecs_lock.clone(),
- bots_tx,
+ resolved_address: Arc::new(RwLock::new(resolved_address)),
+ address: Arc::new(RwLock::new(address)),
+ instance_container,
- swarm_tx: swarm_tx.clone(),
- };
+ bots_tx,
- // run the main schedule so the startup systems run
- {
- let mut ecs = ecs_lock.lock();
- ecs.insert_resource(swarm.clone());
- ecs.insert_resource(self.swarm_state.clone());
- if let Some(reconnect_after) = self.reconnect_after {
- ecs.insert_resource(AutoReconnectDelay {
- delay: reconnect_after,
- });
- } else {
- ecs.remove_resource::<AutoReconnectDelay>();
- }
- ecs.run_schedule(main_schedule_label);
- ecs.clear_trackers();
- }
+ swarm_tx: swarm_tx.clone(),
+ };
- // only do this after we inserted the Swarm and state resources to avoid errors
- // where Res<Swarm> is inaccessible
- start_running_systems();
-
- // SwarmBuilder (self) isn't Send so we have to take all the things we need out
- // of it
- let swarm_clone = swarm.clone();
- let join_delay = self.join_delay;
- let accounts = self.accounts.clone();
- let states = self.states.clone();
-
- tokio::spawn(async move {
- if let Some(join_delay) = join_delay {
- // if there's a join delay, then join one by one
- for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
- let mut join_opts = default_join_opts.clone();
- join_opts.update(bot_join_opts);
- let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
- tokio::time::sleep(join_delay).await;
+ // run the main schedule so the startup systems run
+ {
+ let mut ecs = ecs_lock.lock();
+ ecs.insert_resource(swarm.clone());
+ ecs.insert_resource(self.swarm_state.clone());
+ if let Some(reconnect_after) = self.reconnect_after {
+ ecs.insert_resource(AutoReconnectDelay {
+ delay: reconnect_after,
+ });
+ } else {
+ ecs.remove_resource::<AutoReconnectDelay>();
}
- } else {
- // otherwise, join all at once
- let swarm_borrow = &swarm_clone;
- join_all(accounts.iter().zip(states).map(
- |((account, bot_join_opts), state)| async {
- let mut join_opts = default_join_opts.clone();
- join_opts.update(bot_join_opts);
- let _ = swarm_borrow
- .clone()
- .add_with_opts(account, state, &join_opts)
- .await;
- },
- ))
- .await;
+ ecs.run_schedule(main_schedule_label);
+ ecs.clear_trackers();
}
- swarm_tx.send(SwarmEvent::Login).unwrap();
- });
-
- let swarm_state = self.swarm_state;
-
- // Watch swarm_rx and send those events to the swarm_handle.
- let swarm_clone = swarm.clone();
- let swarm_handler_task = tokio::spawn(async move {
- while let Some(event) = swarm_rx.recv().await {
- if let Some(swarm_handler) = &self.swarm_handler {
- tokio::spawn((swarm_handler)(
- swarm_clone.clone(),
- event,
- swarm_state.clone(),
- ));
+ // only do this after we inserted the Swarm and state resources to avoid errors
+ // where Res<Swarm> is inaccessible
+ start_running_systems();
+
+ // SwarmBuilder (self) isn't Send so we have to take all the things we need out
+ // of it
+ let swarm_clone = swarm.clone();
+ let join_delay = self.join_delay;
+ let accounts = self.accounts.clone();
+ let states = self.states.clone();
+
+ task::spawn_local(async move {
+ if let Some(join_delay) = join_delay {
+ // if there's a join delay, then join one by one
+ for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
+ let mut join_opts = default_join_opts.clone();
+ join_opts.update(bot_join_opts);
+ let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
+ tokio::time::sleep(join_delay).await;
+ }
+ } else {
+ // otherwise, join all at once
+ let swarm_borrow = &swarm_clone;
+ join_all(accounts.iter().zip(states).map(
+ |((account, bot_join_opts), state)| async {
+ let mut join_opts = default_join_opts.clone();
+ join_opts.update(bot_join_opts);
+ let _ = swarm_borrow
+ .clone()
+ .add_with_opts(account, state, &join_opts)
+ .await;
+ },
+ ))
+ .await;
}
- }
- unreachable!(
- "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
- );
- });
-
- // bot events
- let client_handler_task = tokio::spawn(async move {
- while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
- if bots_rx.len() > 1_000 {
- static WARNED: AtomicBool = AtomicBool::new(false);
- if !WARNED.swap(true, atomic::Ordering::Relaxed) {
- warn!("the Client Event channel has more than 1000 items!")
+ swarm_tx.send(SwarmEvent::Login).unwrap();
+ });
+
+ let swarm_state = self.swarm_state;
+
+ // Watch swarm_rx and send those events to the swarm_handle.
+ let swarm_clone = swarm.clone();
+ let swarm_handler_task = task::spawn_local(async move {
+ while let Some(event) = swarm_rx.recv().await {
+ if let Some(swarm_handler) = &self.swarm_handler {
+ task::spawn_local((swarm_handler)(
+ swarm_clone.clone(),
+ event,
+ swarm_state.clone(),
+ ));
}
}
- if let Some(handler) = &self.handler {
- let ecs_mutex = first_bot.ecs.clone();
- let mut ecs = ecs_mutex.lock();
- let mut query = ecs.query::<Option<&S>>();
- let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
- error!(
- "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
- first_bot.username(),
- first_bot.entity
- );
- continue;
- };
- let first_bot_entity = first_bot.entity;
- let first_bot_state = first_bot_state.clone();
-
- tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
-
- // this makes it not have to keep locking the ecs
- let mut states = HashMap::new();
- states.insert(first_bot_entity, first_bot_state);
- while let Ok((Some(event), bot)) = bots_rx.try_recv() {
- let state = match states.entry(bot.entity) {
- hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => {
- let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
- error!(
- "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
- bot.username(),
- bot.entity
- );
- continue;
- };
- let state = state.clone();
- e.insert(state)
- }
+ unreachable!(
+ "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
+ );
+ });
+
+ // bot events
+ let client_handler_task = task::spawn_local(async move {
+ while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
+ if bots_rx.len() > 1_000 {
+ static WARNED: AtomicBool = AtomicBool::new(false);
+ if !WARNED.swap(true, atomic::Ordering::Relaxed) {
+ warn!("the Client Event channel has more than 1000 items!")
+ }
+ }
+
+ if let Some(handler) = &self.handler {
+ let ecs_mutex = first_bot.ecs.clone();
+ let mut ecs = ecs_mutex.lock();
+ let mut query = ecs.query::<Option<&S>>();
+ let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
+ error!(
+ "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
+ first_bot.username(),
+ first_bot.entity
+ );
+ continue;
};
- tokio::spawn((handler)(bot, event, state.clone()));
+ let first_bot_entity = first_bot.entity;
+ let first_bot_state = first_bot_state.clone();
+
+ task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
+
+ // this makes it not have to keep locking the ecs
+ let mut states = HashMap::new();
+ states.insert(first_bot_entity, first_bot_state);
+ while let Ok((Some(event), bot)) = bots_rx.try_recv() {
+ let state = match states.entry(bot.entity) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => {
+ let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
+ error!(
+ "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
+ bot.username(),
+ bot.entity
+ );
+ continue;
+ };
+ let state = state.clone();
+ e.insert(state)
+ }
+ };
+ task::spawn_local((handler)(bot, event, state.clone()));
+ }
}
}
- }
- });
+ });
- let appexit = appexit_rx
- .await
- .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
+ let appexit = appexit_rx
+ .await
+ .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
+
+ swarm_handler_task.abort();
+ client_handler_task.abort();
- swarm_handler_task.abort();
- client_handler_task.abort();
+ appexit
+ }).await;
Ok(appexit)
}
@@ -659,7 +666,7 @@ pub type BoxSwarmHandleFn<SS, R> =
/// #[derive(Default, Clone, Resource)]
/// struct SwarmState {}
///
-/// #[tokio::main(flavor = "current_thread")]
+/// #[tokio::main]
/// async fn main() {
/// let mut accounts = Vec::new();
/// let mut states = Vec::new();
@@ -767,7 +774,7 @@ impl Swarm {
let bots_tx = self.bots_tx.clone();
let join_opts = join_opts.clone();
- tokio::spawn(Self::event_copying_task(
+ task::spawn_local(Self::event_copying_task(
rx, swarm_tx, bots_tx, cloned_bot, join_opts,
));