feat: implement support for volatile events with new VolatileSocket w…#744
feat: implement support for volatile events with new VolatileSocket w…#744LiamCarPer wants to merge 3 commits into
Conversation
…rapper and operator flags
|
Please check this comment regarding the implementation: you are missing a way to bypass the mpsc channel in engineio. Volatile packets will still be buffered, it is not what we want. I might be wrong but, before Claude-ing something please try to understand the issue details before implementing something. Feel free to ask more details here or on the issue. |
|
Thanks for the Review. You are right, my current approach still goes through the mpcs channel so volatile packets can buffer. Does that sound good? Want to make sure i am in the right track before spending more time on it |
|
Is this what you were looking for? |
|
Yes! One thing that we are missing with this solution is that volatile packets are out of order with classic packets. It might be ok though. We can simply document this specific case. |
|
Perfect, let me look into it |
|
I checked js docs and they dont specify ordering constraints for volatile vs regular event. |
| // Because the watch channel only retains the most recent value, | ||
| // any volatile messages that were overwritten since the last poll | ||
| // are automatically discarded. | ||
| let volatile_packets: Vec<_> = { |
There was a problem hiding this comment.
I don't think this is a correct approach, you are yielding the watch packet here. Meaning that if there is any .await point between now and the actual payload encoding and that in the meantime another volatile packet is sent by the user it will still be this one that is sent.
A better approach would be to refactor the mpsc receiver to a Stream and use .chain. With this solution the payload encoding is not complexified.
I made this implementation in the current socketioxide client draft so you can use it as a base: https://github.com/Totodore/socketioxide/pull/555/changes#diff-e4fc706cfac0a21273c6a254bd1a3a2685541b63eca50bbd724afce0de9269a6R128
| serde_json.workspace = true | ||
| thiserror.workspace = true | ||
| tokio = { workspace = true, features = ["rt", "time"] } | ||
| tokio = { workspace = true, features = ["rt", "time", "macros"] } |
| /// Because volatile messages bypass the main mpsc buffer queue, they may | ||
| /// arrive out of order relative to regular messages. | ||
| #[inline] | ||
| pub fn emit_volatile(&self, msg: impl Into<Str>) -> bool { |
|
|
||
| /// Sets the volatile flag for the emit. When set, the event may be dropped | ||
| /// if the client is not ready to receive it (e.g. the connection is buffering or not connected). | ||
| /// This is useful for events that are not critical, such as position updates in a game. | ||
| /// | ||
| /// See [socket.io volatile events](https://socket.io/docs/v4/emitting-events/#volatile-events). |
There was a problem hiding this comment.
Could we move this to external doc to avoid duplication?
Like the other methods.
| /// A wrapper around a [`Socket`] that marks emitted events as volatile. | ||
| /// | ||
| /// Volatile events may be dropped if the client is not ready to receive them | ||
| /// (e.g. the underlying connection is buffering or the socket is not connected). | ||
| /// This is useful for events that are not critical, such as position updates in a game. | ||
| /// | ||
| /// Because volatile events use a separate channel that bypasses the main | ||
| /// mpsc buffer, they may arrive **out of order** relative to regular events | ||
| /// emitted around the same time. Only use volatile when ordering relative to | ||
| /// regular events is not important. | ||
| /// | ||
| /// See [socket.io volatile events](https://socket.io/docs/v4/emitting-events/#volatile-events). | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// # use socketioxide::{SocketIo, extract::*}; | ||
| /// # use serde::Serialize; | ||
| /// #[derive(Serialize)] | ||
| /// struct GameState { x: f64, y: f64 } | ||
| /// | ||
| /// let (_, io) = SocketIo::new_svc(); | ||
| /// io.ns("/", async |socket: SocketRef| { | ||
| /// // Position updates may be dropped if the connection is slow | ||
| /// socket.volatile().emit("position", &GameState { x: 1.0, y: 2.0 }); | ||
| /// }); | ||
| /// ``` | ||
| pub struct VolatileSocket<'a, A: Adapter = LocalAdapter> { | ||
| socket: &'a Socket<A>, | ||
| } | ||
|
|
||
| impl<A: Adapter> VolatileSocket<'_, A> { | ||
| /// Emit a volatile event to the client. If the socket is not connected or | ||
| /// the internal buffer is full, the event is silently dropped. | ||
| pub fn emit<T: ?Sized + Serialize>(&self, event: impl AsRef<str>, data: &T) { | ||
| if !self.socket.connected() { | ||
| #[cfg(feature = "tracing")] | ||
| tracing::debug!( | ||
| ?self.socket.id, | ||
| "dropping volatile event: socket not connected" | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| let ns = self.socket.ns.path.clone(); | ||
| let Ok(data) = self.socket.parser.encode_value(data, Some(event.as_ref())) else { | ||
| return; | ||
| }; | ||
|
|
||
| let packet = Packet::event(ns, data); | ||
| self.socket | ||
| .send_raw_volatile(self.socket.parser.encode(packet)); | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/to.md")] | ||
| pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> { | ||
| self.socket.to(rooms).volatile() | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/within.md")] | ||
| pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> { | ||
| self.socket.within(rooms).volatile() | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/except.md")] | ||
| pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> { | ||
| self.socket.except(rooms).volatile() | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/local.md")] | ||
| pub fn local(self) -> BroadcastOperators<A> { | ||
| self.socket.local().volatile() | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/broadcast.md")] | ||
| pub fn broadcast(self) -> BroadcastOperators<A> { | ||
| self.socket.broadcast().volatile() | ||
| } | ||
|
|
||
| #[doc = include_str!("../docs/operators/timeout.md")] | ||
| pub fn timeout(self, timeout: Duration) -> BroadcastOperators<A> { | ||
| BroadcastOperators::from(ConfOperators::new(self.socket).timeout(timeout)).volatile() | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Whats the purpose of all of this?
Why not just having a volatile operator? Like the broadcast operator. Imagine if we would have implemented a new wrapper for each kind of send...
| /// ``` | ||
| pub fn volatile(&self) -> VolatileSocket<'_, A> { | ||
| VolatileSocket { socket: self } | ||
| } |
There was a problem hiding this comment.
Just return a ConfOperator like broadcast.
Motivation
Socket.io's Node.js implementation supports a
volatileflag on emits that drops events when the underlying connection is not ready. This is highly useful for high-frequency, non-critical data like game position updates or telemetry, as it prevents buffer buildup on unstable connections.This feature was requested in #602 and is documented in the [Socket.io volatile events spec](https://www.google.com/search?q=https://socket.io/docs/v4/emitting-events/%23volatile-events). Currently,
socketioxidehas no equivalent, forcing users to either accept buffer pressure or implement their own messaging layer.Solution
Added a
Volatilevariant toBroadcastFlags(0x04) that flows through the existing adapter broadcast pipeline. On the local adapter, whenvolatileis set, errors fromsend_manyare silently discarded rather than propagated, perfectly matching fire-and-forget semantics.User-Facing API
Three entry points are provided to mirror the Node.js patterns:
Socket::volatile()Returns aVolatileSocket<'a, A>wrapper with a directemit()that silently drops events when the socket is disconnected, the internal buffer is full, or encoding fails. The wrapper also exposes chain methods (to,within,except,local,broadcast,timeout) that delegate toBroadcastOperatorswith the volatile flag pre-set.BroadcastOperators::volatile()andConfOperators::volatile()**Sets the flag on the operator chain, enabling room-based broadcasting patterns like:SocketIo::volatile()A convenience alias on the default namespace for global emits:Adapter Propagation
The flag propagates directly through the
BroadcastOptionsstruct. This ensures remote adapters (Redis, Postgres, MongoDB) receive it and can handle volatile semantics on their own nodes out of the box, without requiring any adapter-specific changes.