diff --git a/Examples/Examples.xcodeproj/project.pbxproj b/Examples/Examples.xcodeproj/project.pbxproj index 9eaca694..1290c151 100644 --- a/Examples/Examples.xcodeproj/project.pbxproj +++ b/Examples/Examples.xcodeproj/project.pbxproj @@ -28,6 +28,7 @@ 7956406A2955AFBD0088A06F /* ErrorText.swift in Sources */ = {isa = PBXBuildFile; fileRef = 795640692955AFBD0088A06F /* ErrorText.swift */; }; 7956406D2955B3500088A06F /* SwiftUINavigation in Frameworks */ = {isa = PBXBuildFile; productRef = 7956406C2955B3500088A06F /* SwiftUINavigation */; }; 795640702955B5190088A06F /* IdentifiedCollections in Frameworks */ = {isa = PBXBuildFile; productRef = 7956406F2955B5190088A06F /* IdentifiedCollections */; }; + 795E90A12DE87AA3009F8C11 /* AsyncAlgorithms in Frameworks */ = {isa = PBXBuildFile; productRef = 795E90A02DE87AA3009F8C11 /* AsyncAlgorithms */; }; 796298992AEBBA77000AA957 /* MFAFlow.swift in Sources */ = {isa = PBXBuildFile; fileRef = 796298982AEBBA77000AA957 /* MFAFlow.swift */; }; 7962989D2AEBC6F9000AA957 /* SVGView in Frameworks */ = {isa = PBXBuildFile; productRef = 7962989C2AEBC6F9000AA957 /* SVGView */; }; 79719ECE2ADF26C400737804 /* Supabase in Frameworks */ = {isa = PBXBuildFile; productRef = 79719ECD2ADF26C400737804 /* Supabase */; }; @@ -174,6 +175,7 @@ files = ( 79D884D92B3C18E90009EA4A /* Supabase in Frameworks */, 79B8F4242B5FED7C0000E839 /* IdentifiedCollections in Frameworks */, + 795E90A12DE87AA3009F8C11 /* AsyncAlgorithms in Frameworks */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -397,6 +399,7 @@ packageProductDependencies = ( 79D884D82B3C18E90009EA4A /* Supabase */, 79B8F4232B5FED7C0000E839 /* IdentifiedCollections */, + 795E90A02DE87AA3009F8C11 /* AsyncAlgorithms */, ); productName = SlackClone; productReference = 79D884C72B3C18830009EA4A /* SlackClone.app */; @@ -458,6 +461,7 @@ 7962989B2AEBC6F9000AA957 /* XCRemoteSwiftPackageReference "SVGView" */, 79E2B5562B97890F0042CD21 /* XCRemoteSwiftPackageReference "GoogleSignIn-iOS" */, 79BE429F2D942EFD00B9DDF4 /* XCRemoteSwiftPackageReference "clerk-ios" */, + 795E909F2DE87AA3009F8C11 /* XCRemoteSwiftPackageReference "swift-async-algorithms" */, ); productRefGroup = 793895C72954ABFF0044F2B8 /* Products */; projectDirPath = ""; @@ -991,6 +995,14 @@ minimumVersion = 1.0.0; }; }; + 795E909F2DE87AA3009F8C11 /* XCRemoteSwiftPackageReference "swift-async-algorithms" */ = { + isa = XCRemoteSwiftPackageReference; + repositoryURL = "https://github.com/apple/swift-async-algorithms.git"; + requirement = { + kind = upToNextMajorVersion; + minimumVersion = 1.0.4; + }; + }; 7962989B2AEBC6F9000AA957 /* XCRemoteSwiftPackageReference "SVGView" */ = { isa = XCRemoteSwiftPackageReference; repositoryURL = "https://github.com/exyte/SVGView"; @@ -1028,6 +1040,11 @@ package = 7956406E2955B5190088A06F /* XCRemoteSwiftPackageReference "swift-identified-collections" */; productName = IdentifiedCollections; }; + 795E90A02DE87AA3009F8C11 /* AsyncAlgorithms */ = { + isa = XCSwiftPackageProductDependency; + package = 795E909F2DE87AA3009F8C11 /* XCRemoteSwiftPackageReference "swift-async-algorithms" */; + productName = AsyncAlgorithms; + }; 7962989C2AEBC6F9000AA957 /* SVGView */ = { isa = XCSwiftPackageProductDependency; package = 7962989B2AEBC6F9000AA957 /* XCRemoteSwiftPackageReference "SVGView" */; diff --git a/Examples/SlackClone/.vscode/extensions.json b/Examples/SlackClone/.vscode/extensions.json new file mode 100644 index 00000000..74baffcc --- /dev/null +++ b/Examples/SlackClone/.vscode/extensions.json @@ -0,0 +1,3 @@ +{ + "recommendations": 
["denoland.vscode-deno"] +} diff --git a/Examples/SlackClone/.vscode/settings.json b/Examples/SlackClone/.vscode/settings.json new file mode 100644 index 00000000..af62c23f --- /dev/null +++ b/Examples/SlackClone/.vscode/settings.json @@ -0,0 +1,24 @@ +{ + "deno.enablePaths": [ + "supabase/functions" + ], + "deno.lint": true, + "deno.unstable": [ + "bare-node-builtins", + "byonm", + "sloppy-imports", + "unsafe-proto", + "webgpu", + "broadcast-channel", + "worker-options", + "cron", + "kv", + "ffi", + "fs", + "http", + "net" + ], + "[typescript]": { + "editor.defaultFormatter": "denoland.vscode-deno" + } +} diff --git a/Examples/SlackClone/ChannelStore.swift b/Examples/SlackClone/ChannelStore.swift index e954cf93..69e96589 100644 --- a/Examples/SlackClone/ChannelStore.swift +++ b/Examples/SlackClone/ChannelStore.swift @@ -5,7 +5,9 @@ // Created by Guilherme Souza on 18/01/24. // +import AsyncAlgorithms import Foundation +import OSLog import Supabase @MainActor @@ -22,22 +24,21 @@ final class ChannelStore { Task { channels = await fetchChannels() - let channel = supabase.channel("public:channels") + await supabase.realtimeV2.setAuth() - let insertions = channel.postgresChange(InsertAction.self, table: "channels") - let deletions = channel.postgresChange(DeleteAction.self, table: "channels") + let realtimeChannel = supabase.channel("channel:*") { + $0.isPrivate = true + } - await channel.subscribe() + let insertions = realtimeChannel.broadcastStream(event: "INSERT") + let updates = realtimeChannel.broadcastStream(event: "UPDATE") + let deletions = realtimeChannel.broadcastStream(event: "DELETE") - Task { - for await insertion in insertions { - handleInsertedChannel(insertion) - } - } + await realtimeChannel.subscribe() Task { - for await delete in deletions { - handleDeletedChannel(delete) + for await event in merge(insertions, updates, deletions) { + handleBroadcastEvent(event) } } } @@ -52,7 +53,7 @@ final class ChannelStore { .insert(channel) .execute() } catch { - dump(error) + Logger.main.error("Failed to add channel: \(error.localizedDescription)") toast = .init(status: .error, title: "Error", description: error.localizedDescription) } } @@ -62,7 +63,8 @@ final class ChannelStore { return channel } - let channel: Channel = try await supabase + let channel: Channel = + try await supabase .from("channels") .select() .eq("id", value: id) @@ -72,27 +74,40 @@ final class ChannelStore { return channel } - private func handleInsertedChannel(_ action: InsertAction) { + private func handleBroadcastEvent(_ event: BroadcastEvent) { do { - let channel = try action.decodeRecord(decoder: decoder) as Channel - channels.append(channel) + let change = try event.broadcastChange() + switch change.operation { + case .insert(let channel): + channels.append(try channel.decode(decoder: decoder)) + + case .update(let new, _): + let channel = try new.decode(decoder: decoder) as Channel + if let index = channels.firstIndex(where: { $0.id == channel.id }) { + channels[index] = channel + } else { + Logger.main.warning("Channel with ID \(channel.id) not found for update") + } + + case .delete(let old): + guard let id = old["id"]?.intValue else { + Logger.main.error("Missing channel ID in delete operation") + return + } + channels.removeAll { $0.id == id } + messages.removeMessages(for: id) + } } catch { - dump(error) + Logger.main.error("Failed to handle broadcast event: \(error.localizedDescription)") toast = .init(status: .error, title: "Error", description: error.localizedDescription) } } - private func 
handleDeletedChannel(_ action: DeleteAction) { - guard let id = action.oldRecord["id"]?.intValue else { return } - channels.removeAll { $0.id == id } - messages.removeMessages(for: id) - } - private func fetchChannels() async -> [Channel] { do { return try await supabase.from("channels").select().execute().value } catch { - dump(error) + Logger.main.error("Failed to fetch channels: \(error.localizedDescription)") toast = .init(status: .error, title: "Error", description: error.localizedDescription) return [] } diff --git a/Examples/SlackClone/supabase/.gitignore b/Examples/SlackClone/supabase/.gitignore index a3ad8805..a735017e 100644 --- a/Examples/SlackClone/supabase/.gitignore +++ b/Examples/SlackClone/supabase/.gitignore @@ -2,3 +2,12 @@ .branches .temp .env + +# Supabase +.branches +.temp + +# dotenvx +.env.keys +.env.local +.env.*.local diff --git a/Examples/SlackClone/supabase/config.toml b/Examples/SlackClone/supabase/config.toml index a4ee105f..44ad0e38 100644 --- a/Examples/SlackClone/supabase/config.toml +++ b/Examples/SlackClone/supabase/config.toml @@ -1,3 +1,5 @@ +# For detailed configuration reference documentation, visit: +# https://supabase.com/docs/guides/local-development/cli/config # A string used to distinguish different Supabase projects on the same host. Defaults to the # working directory name when running `supabase init`. project_id = "SlackClone" @@ -7,14 +9,18 @@ enabled = true # Port to use for the API URL. port = 54321 # Schemas to expose in your API. Tables, views and stored procedures in this schema will get API -# endpoints. public and storage are always included. -schemas = ["public", "storage", "graphql_public"] -# Extra schemas to add to the search_path of every request. public is always included. +# endpoints. `public` and `graphql_public` schemas are included by default. +schemas = ["public", "graphql_public"] +# Extra schemas to add to the search_path of every request. extra_search_path = ["public", "extensions"] # The maximum number of rows returns from a view, table, or stored procedure. Limits payload size # for accidental or malicious requests. max_rows = 1000 +[api.tls] +# Enable HTTPS endpoints locally using a self-signed certificate. +enabled = false + [db] # Port to use for the local database URL. port = 54322 @@ -36,9 +42,24 @@ default_pool_size = 20 # Maximum number of client connections allowed. max_client_conn = 100 +# [db.vault] +# secret_key = "env(SECRET_VALUE)" + +[db.migrations] +# Specifies an ordered list of schema files that describe your database. +# Supports glob patterns relative to supabase directory: "./schemas/*.sql" +schema_paths = [] + +[db.seed] +# If enabled, seeds the database after migrations during a db reset. +enabled = true +# Specifies an ordered list of seed files to load during db reset. +# Supports glob patterns relative to supabase directory: "./seeds/*.sql" +sql_paths = ["./seed.sql"] + [realtime] enabled = true -# Bind realtime via either IPv4 or IPv6. (default: IPv6) +# Bind realtime via either IPv4 or IPv6. (default: IPv4) # ip_version = "IPv6" # The maximum length in bytes of HTTP request headers. (default: 4096) # max_header_length = 4096 @@ -49,6 +70,8 @@ enabled = true port = 54323 # External URL of the API server that frontend connects to. api_url = "http://127.0.0.1" +# OpenAI API Key to use for Supabase AI in the Supabase Studio. +openai_api_key = "env(OPENAI_API_KEY)" # Email testing server. 
Emails sent with the local dev setup are not actually sent - rather, they # are monitored, and you can view the emails that would have been sent from the web interface. @@ -59,12 +82,25 @@ port = 54324 # Uncomment to expose additional ports for testing user applications that send emails. # smtp_port = 54325 # pop3_port = 54326 +# admin_email = "admin@email.com" +# sender_name = "Admin" [storage] enabled = true # The maximum file size allowed (e.g. "5MB", "500KB"). file_size_limit = "50MiB" +# Image transformation API is available to Supabase Pro plan. +# [storage.image_transformation] +# enabled = true + +# Uncomment to configure local storage buckets +# [storage.buckets.images] +# public = false +# file_size_limit = "50MiB" +# allowed_mime_types = ["image/png", "image/jpeg"] +# objects_path = "./images" + [auth] enabled = true # The base URL of your website. Used as an allow-list for redirects and for constructing URLs used @@ -81,6 +117,35 @@ enable_refresh_token_rotation = true refresh_token_reuse_interval = 10 # Allow/disallow new user signups to your project. enable_signup = true +# Allow/disallow anonymous sign-ins to your project. +enable_anonymous_sign_ins = false +# Allow/disallow testing manual linking of accounts +enable_manual_linking = false +# Passwords shorter than this value will be rejected as weak. Minimum 6, recommended 8 or more. +minimum_password_length = 6 +# Passwords that do not meet the following requirements will be rejected as weak. Supported values +# are: `letters_digits`, `lower_upper_letters_digits`, `lower_upper_letters_digits_symbols` +password_requirements = "" + +[auth.rate_limit] +# Number of emails that can be sent per hour. Requires auth.email.smtp to be enabled. +email_sent = 2 +# Number of SMS messages that can be sent per hour. Requires auth.sms to be enabled. +sms_sent = 30 +# Number of anonymous sign-ins that can be made per hour per IP address. Requires enable_anonymous_sign_ins = true. +anonymous_users = 30 +# Number of sessions that can be refreshed in a 5 minute interval per IP address. +token_refresh = 150 +# Number of sign up and sign-in requests that can be made in a 5 minute interval per IP address (excludes anonymous users). +sign_in_sign_ups = 30 +# Number of OTP / Magic link verifications that can be made in a 5 minute interval per IP address. +token_verifications = 30 + +# Configure one of the supported captcha providers: `hcaptcha`, `turnstile`. +# [auth.captcha] +# enabled = true +# provider = "hcaptcha" +# secret = "" [auth.email] # Allow/disallow new user signups via email to your project. @@ -90,6 +155,24 @@ enable_signup = true double_confirm_changes = true # If enabled, users need to confirm their email address before signing in. enable_confirmations = false +# If enabled, users will need to reauthenticate or have logged in recently to change their password. +secure_password_change = false +# Controls the minimum amount of time that must pass before sending another signup confirmation or password reset email. +max_frequency = "1s" +# Number of characters used in the email OTP. +otp_length = 6 +# Number of seconds before the email OTP expires (defaults to 1 hour). 
+otp_expiry = 3600 + +# Use a production-ready SMTP server +# [auth.email.smtp] +# enabled = true +# host = "smtp.sendgrid.net" +# port = 587 +# user = "apikey" +# pass = "env(SENDGRID_API_KEY)" +# admin_email = "admin@email.com" +# sender_name = "Admin" # Uncomment to customize email template # [auth.email.template.invite] @@ -98,16 +181,30 @@ enable_confirmations = false [auth.sms] # Allow/disallow new user signups via SMS to your project. -enable_signup = true +enable_signup = false # If enabled, users need to confirm their phone number before signing in. enable_confirmations = false # Template for sending OTP to users -template = "Your code is {{ .Code }} ." +template = "Your code is {{ .Code }}" +# Controls the minimum amount of time that must pass before sending another sms otp. +max_frequency = "5s" # Use pre-defined map of phone number to OTP for testing. -[auth.sms.test_otp] +# [auth.sms.test_otp] # 4152127777 = "123456" +# Configure logged in session timeouts. +# [auth.sessions] +# Force log out after the specified duration. +# timebox = "24h" +# Force log out if the user has been inactive longer than the specified duration. +# inactivity_timeout = "8h" + +# This hook runs before a token is issued and allows you to add additional claims based on the authentication method used. +# [auth.hook.custom_access_token] +# enabled = true +# uri = "pg-functions:////" + # Configure one of the supported SMS providers: `twilio`, `twilio_verify`, `messagebird`, `textlocal`, `vonage`. [auth.sms.twilio] enabled = false @@ -116,8 +213,31 @@ message_service_sid = "" # DO NOT commit your Twilio auth token to git. Use environment variable substitution instead: auth_token = "env(SUPABASE_AUTH_SMS_TWILIO_AUTH_TOKEN)" +# Multi-factor-authentication is available to Supabase Pro plan. +[auth.mfa] +# Control how many MFA factors can be enrolled at once per user. +max_enrolled_factors = 10 + +# Control MFA via App Authenticator (TOTP) +[auth.mfa.totp] +enroll_enabled = false +verify_enabled = false + +# Configure MFA via Phone Messaging +[auth.mfa.phone] +enroll_enabled = false +verify_enabled = false +otp_length = 6 +template = "Your code is {{ .Code }}" +max_frequency = "5s" + +# Configure MFA via WebAuthn +# [auth.mfa.web_authn] +# enroll_enabled = true +# verify_enabled = true + # Use an external OAuth provider. The full list of providers are: `apple`, `azure`, `bitbucket`, -# `discord`, `facebook`, `github`, `gitlab`, `google`, `keycloak`, `linkedin`, `notion`, `twitch`, +# `discord`, `facebook`, `github`, `gitlab`, `google`, `keycloak`, `linkedin_oidc`, `notion`, `twitch`, # `twitter`, `slack`, `spotify`, `workos`, `zoom`. [auth.external.apple] enabled = false @@ -129,11 +249,48 @@ redirect_uri = "" # Overrides the default auth provider URL. Used to support self-hosted gitlab, single-tenant Azure, # or any other third-party OIDC providers. url = "" +# If enabled, the nonce check will be skipped. Required for local sign in with Google auth. +skip_nonce_check = false -[analytics] +# Use Firebase Auth as a third-party provider alongside Supabase Auth. +[auth.third_party.firebase] +enabled = false +# project_id = "my-firebase-project" + +# Use Auth0 as a third-party provider alongside Supabase Auth. +[auth.third_party.auth0] +enabled = false +# tenant = "my-auth0-tenant" +# tenant_region = "us" + +# Use AWS Cognito (Amplify) as a third-party provider alongside Supabase Auth. 
+[auth.third_party.aws_cognito] enabled = false +# user_pool_id = "my-user-pool-id" +# user_pool_region = "us-east-1" + +# Use Clerk as a third-party provider alongside Supabase Auth. +[auth.third_party.clerk] +enabled = false +# Obtain from https://clerk.com/setup/supabase +# domain = "example.clerk.accounts.dev" + +[edge_runtime] +enabled = true +# Configure one of the supported request policies: `oneshot`, `per_worker`. +# Use `oneshot` for hot reload, or `per_worker` for load testing. +policy = "oneshot" +# Port to attach the Chrome inspector for debugging edge functions. +inspector_port = 8083 +# The Deno major version to use. +deno_version = 1 + +# [edge_runtime.secrets] +# secret_key = "env(SECRET_VALUE)" + +[analytics] +enabled = true port = 54327 -vector_port = 54328 # Configure one of the supported backends: `postgres`, `bigquery`. backend = "postgres" diff --git a/Examples/SlackClone/supabase/migrations/20240121113535_init.sql b/Examples/SlackClone/supabase/migrations/20240121113535_init.sql index d23091ce..5874df46 100644 --- a/Examples/SlackClone/supabase/migrations/20240121113535_init.sql +++ b/Examples/SlackClone/supabase/migrations/20240121113535_init.sql @@ -160,4 +160,50 @@ insert into public.role_permissions (role, permission) values ('admin', 'channels.delete'), ('admin', 'messages.delete'), - ('moderator', 'messages.delete'); \ No newline at end of file + ('moderator', 'messages.delete'); + +-- allow authenticated users to receive broadcasts +create policy "Authenticated users can receive broadcasts" +on "realtime"."messages" +for select +to authenticated +using ( true ); + +-- trigger function for channel changes +create or replace function public.channel_changes() +returns trigger +security definer +language plpgsql +as $$ +begin + -- broadcast to the specific channel + perform realtime.broadcast_changes( + 'channel:' || coalesce(NEW.id, OLD.id) ::text, -- topic - the topic to which we're broadcasting + TG_OP, -- event - the event that triggered the function + TG_OP, -- operation - the operation that triggered the function + TG_TABLE_NAME, -- table - the table that caused the trigger + TG_TABLE_SCHEMA, -- schema - the schema of the table that caused the trigger + NEW, -- new record - the record after the change + OLD -- old record - the record before the change + ); + + -- broadcast to all channels + perform realtime.broadcast_changes( + 'channel:*', + TG_OP, + TG_OP, + TG_TABLE_NAME, + TG_TABLE_SCHEMA, + NEW, + OLD + ); + + return null; +end; +$$; + +create trigger handle_channel_changes +after insert or update or delete +on public.channels +for each row +execute function channel_changes (); \ No newline at end of file diff --git a/Sources/Helpers/AnyJSON/AnyJSON.swift b/Sources/Helpers/AnyJSON/AnyJSON.swift index afe6ae73..9b6bbf5a 100644 --- a/Sources/Helpers/AnyJSON/AnyJSON.swift +++ b/Sources/Helpers/AnyJSON/AnyJSON.swift @@ -3,6 +3,27 @@ import Foundation public typealias JSONObject = [String: AnyJSON] public typealias JSONArray = [AnyJSON] +/// A type for decoding arbitrary JSON keys. +package struct AnyCodingKey: CodingKey { + package var stringValue: String + package var intValue: Int?
+ + package init(stringValue: String) { + self.stringValue = stringValue + self.intValue = nil + } + + package init(intValue: Int) { + self.stringValue = "\(intValue)" + self.intValue = intValue + } +} +extension AnyCodingKey: ExpressibleByStringLiteral { + package init(stringLiteral value: String) { + self.init(stringValue: value) + } +} + /// An enumeration that represents JSON-compatible values of various types. public enum AnyJSON: Sendable, Codable, Hashable { /// Represents a `null` JSON value. diff --git a/Sources/Realtime/CallbackManager.swift b/Sources/Realtime/CallbackManager.swift index 3d92e184..4eb0c102 100644 --- a/Sources/Realtime/CallbackManager.swift +++ b/Sources/Realtime/CallbackManager.swift @@ -26,7 +26,7 @@ final class CallbackManager: Sendable { @discardableResult func addBroadcastCallback( event: String, - callback: @escaping @Sendable (JSONObject) -> Void + callback: @escaping @Sendable (BroadcastEvent) -> Void ) -> Int { mutableState.withValue { $0.id += 1 @@ -118,7 +118,7 @@ } } - func triggerBroadcast(event: String, json: JSONObject) { + func triggerBroadcast(event: String, json: BroadcastEvent) { let broadcastCallbacks = mutableState.callbacks.compactMap { if case let .broadcast(callback) = $0 { return callback @@ -178,7 +178,7 @@ struct PostgresCallback { struct BroadcastCallback { var id: Int var event: String - var callback: @Sendable (JSONObject) -> Void + var callback: @Sendable (BroadcastEvent) -> Void } struct PresenceCallback { diff --git a/Sources/Realtime/Deprecated/Deprecated.swift b/Sources/Realtime/Deprecated/Deprecated.swift index 27d32f91..4734c298 100644 --- a/Sources/Realtime/Deprecated/Deprecated.swift +++ b/Sources/Realtime/Deprecated/Deprecated.swift @@ -79,3 +79,26 @@ extension RealtimeChannelV2 { @available(*, deprecated, renamed: "RealtimeChannelStatus") public typealias Status = RealtimeChannelStatus } + +extension RealtimeChannelV2 { + @_disfavoredOverload + @available(*, deprecated, message: "Use `onBroadcast(event:callback:)` with `BroadcastEvent` instead.") + public func onBroadcast( + event: String, + callback: @escaping @Sendable (JSONObject) -> Void + ) -> RealtimeSubscription { + self.onBroadcast(event: event) { (payload: BroadcastEvent) in + callback(try! JSONObject(payload)) + } + } + + @_disfavoredOverload + @available(*, deprecated, message: "Use `broadcastStream(event:)` with `BroadcastEvent` instead.") + public func broadcastStream(event: String) -> AsyncStream<JSONObject> { + let stream = self.broadcastStream(event: event).map { (payload: BroadcastEvent) in + try! JSONObject(payload) + } + + return AsyncStream(stream) + } +} diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift index 8a12a4d9..8b4359f2 100644 --- a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -155,8 +155,8 @@ extension RealtimeChannelV2 { } /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
- public func broadcastStream(event: String) -> AsyncStream<JSONObject> { - let (stream, continuation) = AsyncStream<JSONObject>.makeStream() + public func broadcastStream(event: String) -> AsyncStream<BroadcastEvent> { + let (stream, continuation) = AsyncStream<BroadcastEvent>.makeStream() let subscription = onBroadcast(event: event) { continuation.yield($0) @@ -192,7 +192,7 @@ } // Helper to work around type ambiguity in macOS 13 -fileprivate extension AsyncStream { +extension AsyncStream { func compactErase<T: Sendable>() -> AsyncStream<T> { AsyncStream(compactMap { $0.wrappedAction as? T } as AsyncCompactMapSequence<Self, T>) } diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 384d84af..503b298b 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -361,13 +361,8 @@ public final class RealtimeChannelV2: Sendable { callbackManager.triggerPostgresChanges(ids: ids, data: action) case .broadcast: - let payload = message.payload - - guard let event = payload["event"]?.stringValue else { - throw RealtimeError("Expected 'event' key in 'payload' for broadcast event.") - } - - callbackManager.triggerBroadcast(event: event, json: payload) + let payload = try message.payload.decode(as: BroadcastEvent.self) + callbackManager.triggerBroadcast(event: payload.event, json: payload) case .close: socket._remove(self) @@ -514,7 +509,7 @@ /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. public func onBroadcast( event: String, - callback: @escaping @Sendable (JSONObject) -> Void + callback: @escaping @Sendable (BroadcastEvent) -> Void ) -> RealtimeSubscription { let id = callbackManager.addBroadcastCallback(event: event, callback: callback) return RealtimeSubscription { [weak callbackManager, logger] in diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index 5f45a248..21af8437 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -107,3 +107,110 @@ extension HTTPField.Name { public enum LogLevel: String, Sendable { case info, warn, error } + +/// A broadcast event from the server. +/// +/// If the broadcast event was triggered using [`realtime.broadcast_changes`](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes#using-broadcast), +/// use ``BroadcastEvent/broadcastChange()`` to decode the payload into a specific type. +public struct BroadcastEvent: Codable, Hashable, Sendable { + /// The type of the event, e.g. `broadcast`. + public let type: String + /// The event that triggered the broadcast. + public let event: String + /// The payload of the event. + public let payload: JSONObject + + /// Decodes the payload into a specific type. + /// + /// If the broadcast event was triggered using [`realtime.broadcast_changes`](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes#using-broadcast), + /// use this method to decode the payload into a specific type. + public func broadcastChange() throws -> BroadcastChange { + try payload.decode() + } +} + +/// A Postgres change event sent through broadcast. +/// +/// More info in [Subscribing to Database Changes](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes) +public struct BroadcastChange: Codable, Sendable { + /// The schema of the table that was changed. + public var schema: String + /// The table that was changed. + public var table: String + /// The operation that was performed on the table.
+ public var operation: Operation + + /// The operation that was performed on the table. + public enum Operation: Codable, Sendable { + /// A new record was inserted. + case insert(new: JSONObject) + /// A record was updated. + case update(new: JSONObject, old: JSONObject) + /// A record was deleted. + case delete(old: JSONObject) + } +} + +extension BroadcastChange { + public init(from decoder: any Decoder) throws { + let container = try decoder.container(keyedBy: AnyCodingKey.self) + + self.schema = try container.decode(String.self, forKey: "schema") + self.table = try container.decode(String.self, forKey: "table") + self.operation = try BroadcastChange.Operation(from: decoder) + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.container(keyedBy: AnyCodingKey.self) + + try container.encode(schema, forKey: "schema") + try container.encode(table, forKey: "table") + try operation.encode(to: encoder) + } +} + +extension BroadcastChange.Operation { + public init(from decoder: any Decoder) throws { + let container = try decoder.container(keyedBy: AnyCodingKey.self) + + let operation = try container.decode(String.self, forKey: "operation") + switch operation { + case "INSERT": + let new = try container.decode(JSONObject.self, forKey: "record") + self = .insert(new: new) + case "UPDATE": + let new = try container.decode(JSONObject.self, forKey: "record") + let old = try container.decode(JSONObject.self, forKey: "old_record") + self = .update(new: new, old: old) + case "DELETE": + let old = try container.decode(JSONObject.self, forKey: "old_record") + self = .delete(old: old) + default: + throw DecodingError.dataCorruptedError( + forKey: "operation", + in: container, + debugDescription: "Unknown operation type: \(operation)" + ) + } + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.container(keyedBy: AnyCodingKey.self) + + switch self { + case .insert(let new): + try container.encode("INSERT", forKey: "operation") + try container.encode(new, forKey: "record") + try container.encodeNil(forKey: "old_record") + case .update(let new, let old): + try container.encode("UPDATE", forKey: "operation") + try container.encode(new, forKey: "record") + try container.encode(old, forKey: "old_record") + case .delete(let old): + try container.encode("DELETE", forKey: "operation") + try container.encode(old, forKey: "old_record") + try container.encodeNil(forKey: "record") + } + + } +} diff --git a/Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved index d0804396..eb978204 100644 --- a/Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -99,6 +99,15 @@ "version" : "1.3.1" } }, + { + "identity" : "swift-async-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-async-algorithms.git", + "state" : { + "revision" : "042e1c4d9d19748c9c228f8d4ebc97bb1e339b0b", + "version" : "1.0.4" + } + }, { "identity" : "swift-case-paths", "kind" : "remoteSourceControl", diff --git a/Tests/RealtimeTests/BroadcastEventTests.swift b/Tests/RealtimeTests/BroadcastEventTests.swift new file mode 100644 index 00000000..2497fce1 --- /dev/null +++ b/Tests/RealtimeTests/BroadcastEventTests.swift @@ -0,0 +1,244 @@ +import InlineSnapshotTesting +import SnapshotTestingCustomDump +import XCTest + +@testable import Realtime + +final class BroadcastEventTests: XCTestCase { + func 
testBroadcastChangeDecoding() throws { + // Test INSERT operation + let insertJSON = Data( + """ + { + "schema": "public", + "table": "users", + "operation": "INSERT", + "record": { + "id": 1, + "name": "John Doe", + "email": "john@example.com" + }, + "old_record": null + } + """.utf8 + ) + + let insertChange = try JSONDecoder().decode(BroadcastChange.self, from: insertJSON) + + assertInlineSnapshot(of: insertChange, as: .customDump) { + """ + BroadcastChange( + schema: "public", + table: "users", + operation: .insert( + new: [ + "email": .string("john@example.com"), + "id": .integer(1), + "name": .string("John Doe") + ] + ) + ) + """ + } + + // Test UPDATE operation + let updateJSON = Data( + """ + { + "schema": "public", + "table": "users", + "operation": "UPDATE", + "record": { + "id": 1, + "name": "John Updated", + "email": "john@example.com" + }, + "old_record": { + "id": 1, + "name": "John Doe", + "email": "john@example.com" + } + } + """.utf8 + ) + + let updateChange = try JSONDecoder().decode(BroadcastChange.self, from: updateJSON) + assertInlineSnapshot(of: updateChange, as: .customDump) { + """ + BroadcastChange( + schema: "public", + table: "users", + operation: .update( + new: [ + "email": .string("john@example.com"), + "id": .integer(1), + "name": .string("John Updated") + ], + old: [ + "email": .string("john@example.com"), + "id": .integer(1), + "name": .string("John Doe") + ] + ) + ) + """ + } + + // Test DELETE operation + let deleteJSON = Data( + """ + { + "schema": "public", + "table": "users", + "operation": "DELETE", + "record": null, + "old_record": { + "id": 1, + "name": "John Doe", + "email": "john@example.com" + } + } + """.utf8 + ) + + let deleteChange = try JSONDecoder().decode(BroadcastChange.self, from: deleteJSON) + + assertInlineSnapshot(of: deleteChange, as: .customDump) { + """ + BroadcastChange( + schema: "public", + table: "users", + operation: .delete( + old: [ + "email": .string("john@example.com"), + "id": .integer(1), + "name": .string("John Doe") + ] + ) + ) + """ + } + } + + func testBroadcastChangeEncoding() throws { + // Test INSERT operation encoding + let insertChange = BroadcastChange( + schema: "public", + table: "users", + operation: .insert(new: [ + "id": 1, + "name": "John Doe", + "email": "john@example.com", + ]) + ) + + let insertJSON = try JSONObject(insertChange) + assertInlineSnapshot(of: insertJSON, as: .json) { + """ + { + "old_record" : null, + "operation" : "INSERT", + "record" : { + "email" : "john@example.com", + "id" : 1, + "name" : "John Doe" + }, + "schema" : "public", + "table" : "users" + } + """ + } + + // Test UPDATE operation encoding + let updateChange = BroadcastChange( + schema: "public", + table: "users", + operation: .update( + new: ["id": 1, "name": "John Updated"], + old: ["id": 1, "name": "John Doe"] + ) + ) + + let updateJSON = try JSONObject(updateChange) + assertInlineSnapshot(of: updateJSON, as: .json) { + """ + { + "old_record" : { + "id" : 1, + "name" : "John Doe" + }, + "operation" : "UPDATE", + "record" : { + "id" : 1, + "name" : "John Updated" + }, + "schema" : "public", + "table" : "users" + } + """ + } + + // Test DELETE operation encoding + let deleteChange = BroadcastChange( + schema: "public", + table: "users", + operation: .delete(old: [ + "id": 1, + "name": "John Doe", + ]) + ) + + let deleteJSON = try JSONObject(deleteChange) + assertInlineSnapshot(of: deleteJSON, as: .json) { + """ + { + "old_record" : { + "id" : 1, + "name" : "John Doe" + }, + "operation" : "DELETE", + "record" : null, + "schema" 
: "public", + "table" : "users" + } + """ + } + } + + func testBroadcastEvent() throws { + let eventJSON = Data( + """ + { + "type": "broadcast", + "event": "test_event", + "payload": { + "schema": "public", + "table": "users", + "operation": "INSERT", + "record": { + "id": 1, + "name": "John Doe" + }, + "old_record": null + } + } + """.utf8 + ) + + let event = try JSONDecoder().decode(BroadcastEvent.self, from: eventJSON) + + XCTAssertEqual(event.type, "broadcast") + XCTAssertEqual(event.event, "test_event") + + let change = try event.broadcastChange() + XCTAssertEqual(change.schema, "public") + XCTAssertEqual(change.table, "users") + + if case .insert(let new) = change.operation { + XCTAssertEqual(new["id"]?.intValue, 1) + XCTAssertEqual(new["name"]?.stringValue, "John Doe") + } else { + XCTFail("Expected INSERT operation") + } + } +} diff --git a/Tests/RealtimeTests/CallbackManagerTests.swift b/Tests/RealtimeTests/CallbackManagerTests.swift index 779993b2..bc1bdf2c 100644 --- a/Tests/RealtimeTests/CallbackManagerTests.swift +++ b/Tests/RealtimeTests/CallbackManagerTests.swift @@ -178,24 +178,20 @@ final class CallbackManagerTests: XCTestCase { XCTAssertNoLeak(callbackManager) let event = "new_user" - let message = RealtimeMessageV2( - joinRef: nil, - ref: nil, - topic: "realtime:users", + let message = BroadcastEvent( + type: "broadcast", event: event, payload: ["email": "mail@example.com"] ) - let jsonObject = try JSONObject(message) - - let receivedMessage = LockIsolated<JSONObject?>(nil) + let receivedMessage = LockIsolated<BroadcastEvent?>(nil) callbackManager.addBroadcastCallback(event: event) { receivedMessage.setValue($0) } - callbackManager.triggerBroadcast(event: event, json: jsonObject) + callbackManager.triggerBroadcast(event: event, json: message) - XCTAssertEqual(receivedMessage.value, jsonObject) + XCTAssertEqual(receivedMessage.value, message) } func testTriggerPresenceDiffs() {
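For reference, a minimal usage sketch of the typed broadcast API introduced in this change, mirroring the SlackClone ChannelStore above. It assumes a configured `SupabaseClient` named `supabase`; the `ChannelRow` model and the `observeChannelBroadcasts` function are hypothetical names used only for illustration, not part of this diff.

import AsyncAlgorithms
import Supabase

// Hypothetical row model for the `channels` table.
struct ChannelRow: Decodable {
  let id: Int
}

func observeChannelBroadcasts(supabase: SupabaseClient) async {
  // Private channels require a Realtime auth token before subscribing.
  await supabase.realtimeV2.setAuth()

  // Same topic and privacy setting used by the SlackClone example and the
  // `channel_changes` trigger ("channel:*").
  let channel = supabase.channel("channel:*") {
    $0.isPrivate = true
  }

  // Each stream now yields typed `BroadcastEvent` values instead of raw JSON.
  let inserts = channel.broadcastStream(event: "INSERT")
  let updates = channel.broadcastStream(event: "UPDATE")
  let deletes = channel.broadcastStream(event: "DELETE")

  await channel.subscribe()

  for await event in merge(inserts, updates, deletes) {
    do {
      // Decodes payloads produced by `realtime.broadcast_changes` on the server.
      let change = try event.broadcastChange()
      switch change.operation {
      case .insert(let new):
        let row: ChannelRow = try new.decode()
        print("inserted channel", row.id)
      case .update(let new, _):
        let row: ChannelRow = try new.decode()
        print("updated channel", row.id)
      case .delete(let old):
        print("deleted channel id", old["id"]?.intValue as Any)
      }
    } catch {
      print("failed to handle broadcast event:", error)
    }
  }
}

Merging the three event streams keeps a single loop handling all change types, which is why the example app adds the swift-async-algorithms dependency in the project file above.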