Skip to content

feat: implement support for volatile events with new VolatileSocket w…#744

Open
LiamCarPer wants to merge 3 commits into
Totodore:mainfrom
LiamCarPer:feat/volatile-flag
Open

feat: implement support for volatile events with new VolatileSocket w…#744
LiamCarPer wants to merge 3 commits into
Totodore:mainfrom
LiamCarPer:feat/volatile-flag

Conversation

@LiamCarPer

Copy link
Copy Markdown

Motivation

Socket.io's Node.js implementation supports a volatile flag on emits that drops events when the underlying connection is not ready. This is highly useful for high-frequency, non-critical data like game position updates or telemetry, as it prevents buffer buildup on unstable connections.

This feature was requested in #602 and is documented in the [Socket.io volatile events spec](https://www.google.com/search?q=https://socket.io/docs/v4/emitting-events/%23volatile-events). Currently, socketioxide has no equivalent, forcing users to either accept buffer pressure or implement their own messaging layer.


Solution

Added a Volatile variant to BroadcastFlags (0x04) that flows through the existing adapter broadcast pipeline. On the local adapter, when volatile is set, errors from send_many are silently discarded rather than propagated, perfectly matching fire-and-forget semantics.

User-Facing API

Three entry points are provided to mirror the Node.js patterns:

  • Socket::volatile() Returns a VolatileSocket<'a, A> wrapper with a direct emit() that silently drops events when the socket is disconnected, the internal buffer is full, or encoding fails. The wrapper also exposes chain methods (to, within, except, local, broadcast, timeout) that delegate to BroadcastOperators with the volatile flag pre-set.
  • **BroadcastOperators::volatile() and ConfOperators::volatile()** Sets the flag on the operator chain, enabling room-based broadcasting patterns like:
io.to("room").volatile().emit(...);
  • SocketIo::volatile() A convenience alias on the default namespace for global emits:
io.volatile().emit(...);

Adapter Propagation

The flag propagates directly through the BroadcastOptions struct. This ensures remote adapters (Redis, Postgres, MongoDB) receive it and can handle volatile semantics on their own nodes out of the box, without requiring any adapter-specific changes.

@codspeed-hq

codspeed-hq Bot commented Jun 20, 2026

Copy link
Copy Markdown

Merging this PR will not alter performance

✅ 87 untouched benchmarks


Comparing LiamCarPer:feat/volatile-flag (aafc372) with main (b61d522)

Open in CodSpeed

@Totodore

Copy link
Copy Markdown
Owner

Please check this comment regarding the implementation:
#602 (comment)

you are missing a way to bypass the mpsc channel in engineio. Volatile packets will still be buffered, it is not what we want.

I might be wrong but, before Claude-ing something please try to understand the issue details before implementing something. Feel free to ask more details here or on the issue.

@LiamCarPer

Copy link
Copy Markdown
Author

Thanks for the Review. You are right, my current approach still goes through the mpcs channel so volatile packets can buffer.
i am planning to add a separate volatile send path in engineio, a second mpsc channel on Socket with capacity 1, paired with a send_volatile() method that does try _send iton it (drops if full). Then update both transporters to drain it with priority. Then thread the volatile flag BroadcastOptions through send_many so individual socket sends in broadcast flows also use the volatile engine.io path.

Does that sound good? Want to make sure i am in the right track before spending more time on it

@LiamCarPer

Copy link
Copy Markdown
Author

Is this what you were looking for?

@Totodore Totodore added A-socketioxide Area related to socketioxide A-engineioxide Area related to engineioxide C-Feature-request Request for a feature labels Jun 21, 2026
@Totodore

Copy link
Copy Markdown
Owner

Yes! One thing that we are missing with this solution is that volatile packets are out of order with classic packets. It might be ok though. We can simply document this specific case.
It would be nice to check what are the ordering constraints in the js implementation though and explore if there is any solution for this. I'll review this whenever I can.

@LiamCarPer

Copy link
Copy Markdown
Author

Perfect, let me look into it

@LiamCarPer

Copy link
Copy Markdown
Author

I checked js docs and they dont specify ordering constraints for volatile vs regular event.
I have added doc comments on both VolatileSocket and the engine.io emit.volatile() methods.

@Totodore Totodore left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the good direction!
We need to rethink how the polling payload encoding is done though. You can pick some changes from #555 regarding the use of Stream in the encoder.

// Because the watch channel only retains the most recent value,
// any volatile messages that were overwritten since the last poll
// are automatically discarded.
let volatile_packets: Vec<_> = {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a correct approach, you are yielding the watch packet here. Meaning that if there is any .await point between now and the actual payload encoding and that in the meantime another volatile packet is sent by the user it will still be this one that is sent.

A better approach would be to refactor the mpsc receiver to a Stream and use .chain. With this solution the payload encoding is not complexified.

I made this implementation in the current socketioxide client draft so you can use it as a base: https://github.com/Totodore/socketioxide/pull/555/changes#diff-e4fc706cfac0a21273c6a254bd1a3a2685541b63eca50bbd724afce0de9269a6R128

serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
tokio = { workspace = true, features = ["rt", "time", "macros"] }

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the macros feature?

/// Because volatile messages bypass the main mpsc buffer queue, they may
/// arrive out of order relative to regular messages.
#[inline]
pub fn emit_volatile(&self, msg: impl Into<Str>) -> bool {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls document the return value.

Comment on lines +240 to +245

/// Sets the volatile flag for the emit. When set, the event may be dropped
/// if the client is not ready to receive it (e.g. the connection is buffering or not connected).
/// This is useful for events that are not critical, such as position updates in a game.
///
/// See [socket.io volatile events](https://socket.io/docs/v4/emitting-events/#volatile-events).

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this to external doc to avoid duplication?
Like the other methods.

Comment on lines +281 to +364
/// A wrapper around a [`Socket`] that marks emitted events as volatile.
///
/// Volatile events may be dropped if the client is not ready to receive them
/// (e.g. the underlying connection is buffering or the socket is not connected).
/// This is useful for events that are not critical, such as position updates in a game.
///
/// Because volatile events use a separate channel that bypasses the main
/// mpsc buffer, they may arrive **out of order** relative to regular events
/// emitted around the same time. Only use volatile when ordering relative to
/// regular events is not important.
///
/// See [socket.io volatile events](https://socket.io/docs/v4/emitting-events/#volatile-events).
///
/// # Example
/// ```
/// # use socketioxide::{SocketIo, extract::*};
/// # use serde::Serialize;
/// #[derive(Serialize)]
/// struct GameState { x: f64, y: f64 }
///
/// let (_, io) = SocketIo::new_svc();
/// io.ns("/", async |socket: SocketRef| {
/// // Position updates may be dropped if the connection is slow
/// socket.volatile().emit("position", &GameState { x: 1.0, y: 2.0 });
/// });
/// ```
pub struct VolatileSocket<'a, A: Adapter = LocalAdapter> {
socket: &'a Socket<A>,
}

impl<A: Adapter> VolatileSocket<'_, A> {
/// Emit a volatile event to the client. If the socket is not connected or
/// the internal buffer is full, the event is silently dropped.
pub fn emit<T: ?Sized + Serialize>(&self, event: impl AsRef<str>, data: &T) {
if !self.socket.connected() {
#[cfg(feature = "tracing")]
tracing::debug!(
?self.socket.id,
"dropping volatile event: socket not connected"
);
return;
}

let ns = self.socket.ns.path.clone();
let Ok(data) = self.socket.parser.encode_value(data, Some(event.as_ref())) else {
return;
};

let packet = Packet::event(ns, data);
self.socket
.send_raw_volatile(self.socket.parser.encode(packet));
}

#[doc = include_str!("../docs/operators/to.md")]
pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
self.socket.to(rooms).volatile()
}

#[doc = include_str!("../docs/operators/within.md")]
pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
self.socket.within(rooms).volatile()
}

#[doc = include_str!("../docs/operators/except.md")]
pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
self.socket.except(rooms).volatile()
}

#[doc = include_str!("../docs/operators/local.md")]
pub fn local(self) -> BroadcastOperators<A> {
self.socket.local().volatile()
}

#[doc = include_str!("../docs/operators/broadcast.md")]
pub fn broadcast(self) -> BroadcastOperators<A> {
self.socket.broadcast().volatile()
}

#[doc = include_str!("../docs/operators/timeout.md")]
pub fn timeout(self, timeout: Duration) -> BroadcastOperators<A> {
BroadcastOperators::from(ConfOperators::new(self.socket).timeout(timeout)).volatile()
}
}

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the purpose of all of this?
Why not just having a volatile operator? Like the broadcast operator. Imagine if we would have implemented a new wrapper for each kind of send...

/// ```
pub fn volatile(&self) -> VolatileSocket<'_, A> {
VolatileSocket { socket: self }
}

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return a ConfOperator like broadcast.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-engineioxide Area related to engineioxide A-socketioxide Area related to socketioxide C-Feature-request Request for a feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants