Listen to events on a channel
Listen for postgres_changes, broadcast, and presence events on a Realtime channel.
Listener registration on a Supabase::Realtime::Channel is split into three families, one per event class:
on_postgres_changes(event, schema:, table:, filter:)— database row events from the Realtime replication slot.on_broadcast(event)— custom messages sent from another client viachannel.send_broadcast(...).on_presence_sync/on_presence_join/on_presence_leave— Phoenix Presence state changes.
Listeners must be attached before subscribe — they're serialized into the join payload so the server knows which postgres_changes bindings to honor and whether to enable Presence. Listeners attached after a channel is already JOINED trigger a resubscribe so the server config catches up.
postgres_changes
Signature
channel.on_postgres_changes(event, schema: nil, table: nil, filter: nil) do |payload|# payload["data"]["type"] => "INSERT" | "UPDATE" | "DELETE"# payload["data"]["record"] => Hash of new row (or empty for DELETE)# payload["data"]["old_record"] => Hash of old row (for UPDATE / DELETE)# payload["data"]["schema"], payload["data"]["table"], ...endParameters
| Name | Type | Required | Description |
|---|---|---|---|
event | String | Required | One of "INSERT", "UPDATE", "DELETE", or "*". Anything else raises ArgumentError immediately. |
schema | String | Optional | Filter to a single schema (e.g. "public"). nil matches every schema. |
table | String | Optional | Filter to a single table. nil matches every table in the chosen schema. |
filter | String | Optional | PostgREST-style row filter (e.g. "status=eq.active") evaluated server-side before forwarding. |
Example — listen for all changes on a table
supabase
.channel("public:countries")
.on_postgres_changes("*", schema: "public", table: "countries") do |payload|
case payload["data"]["type"]
when "INSERT" then puts "added: #{payload['data']['record']}"
when "UPDATE" then puts "updated: #{payload['data']['record']}"
when "DELETE" then puts "deleted: #{payload['data']['old_record']}"
end
end
.subscribeExample — server-side row filter
supabase
.channel("orders:pending")
.on_postgres_changes(
"UPDATE",
schema: "public",
table: "orders",
filter: "status=eq.pending"
) do |payload|
notify_dispatcher(payload["data"]["record"])
end
.subscribebroadcast
Signature
channel.on_broadcast(event) do |payload|# payload["event"] => the event name# payload["payload"] => the body the sender passed to send_broadcastendParameters
| Name | Type | Required | Description |
|---|---|---|---|
event | String | Required | Event name. Only frames whose payload["event"] == event reach this block. Use distinct names per logical channel (e.g. "cursor", "chat:message"). |
Example — broadcast send + receive
The send_broadcast(event, payload) call on the same channel object emits a broadcast frame to every other subscriber of the topic. By default the server does not echo a broadcast back to its sender (config.broadcast.self = false); flip it via params (see channel) if you want a confirmation loop.
chat = supabase.channel("room:42")
chat.on_broadcast("chat:message") do |payload|
msg = payload["payload"]
puts "<#{msg['user']}> #{msg['text']}"
end
chat.subscribe do |state, _|
next unless state == "SUBSCRIBED"
chat.send_broadcast("chat:message", {
user: "ada",
text: "shipping in five"
})
endpresence
Presence listeners get attached via on_presence_sync / on_presence_join / on_presence_leave. The first attachment automatically flips config.presence.enabled = true in the join payload so the server starts streaming presence frames.
Signatures
channel.on_presence_sync { current_state = channel.presence_state }channel.on_presence_join do |key, current_presences, new_presences|# someone with presence_key = key just joinedendchannel.on_presence_leave do |key, remaining_presences, left_presences|# someone with presence_key = key just leftendPresence payload keys are normalized — Phoenix's wire-level metas array and phx_ref field are rewritten to a flat { key => [{ "presence_ref" => ..., ... }] } shape before reaching your block.
Example — track / untrack the local user
channel.track(payload) stores payload under this client's presence key (set via params.config.presence.key — see channel). channel.untrack removes it. Track from inside the subscribe block — calling it before the join handshake completes is buffered, but reading presence_state won't return anything until the first server presence_state frame arrives.
room = supabase.channel("room:42", params: {
"config" => {
"broadcast" => { "ack" => false, "self" => false },
"presence" => { "key" => "user-#{current_user.id}", "enabled" => true },
"private" => false
}
})
room.on_presence_sync do
puts "snapshot: #{room.presence_state}"
end
room.on_presence_join do |key, _current, new_presences|
puts "join: #{key} = #{new_presences.first}"
end
room.on_presence_leave do |key, _remaining, _left|
puts "leave: #{key}"
end
room.subscribe do |state, _|
next unless state == "SUBSCRIBED"
room.track({
online_at: Time.now.utc.iso8601,
typing: false
})
end
# Later, when the user closes the page:
room.untrackpresence_state returns a snapshot Hash — safe to iterate while the read-thread continues applying inbound diffs (US-007 thread safety).
Listener ordering and the join payload
When the channel joins, inject_postgres_changes_bindings serializes every registered on_postgres_changes listener into config.postgres_changes on the join payload — the server filters before forwarding, instead of shipping every row event for the topic. The server echoes back its registered bindings in the phx_reply; if the echo doesn't match the order/shape you registered, the channel raises CHANNEL_ERROR and unsubscribes (otherwise the listener would silently miss events). Adding a postgres_changes listener after subscribe is not supported — re-create the channel.
By contrast, broadcast and presence listeners are demuxed client-side, so they can be added at any time. Adding a presence listener after subscribe triggers an automatic resubscribe so the server starts emitting presence_state / diff frames.
Callbacks run on the realtime read-thread
All three listener families dispatch on the realtime read-thread — the same thread that parses inbound WebSocket frames. A slow callback blocks delivery of every subsequent frame on the channel. Push events to a Queue (or Concurrent::Async-style worker) if your handler does anything I/O-bound. Exceptions raised inside a callback are caught by Supabase::Realtime::CallbackSafety and forwarded to the client's logger: (falling back to Kernel#warn), so the read-loop survives — but your handler still saw a failed delivery.