Skip to content
Draft
6 changes: 4 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ let package = Package(
targets: ["damus"]),
],
dependencies: [
.package(url: "https://github.com/jb55/secp256k1.swift.git", branch: "main")
.package(url: "https://github.com/jb55/secp256k1.swift.git", branch: "main"),
.package(url: "https://github.com/damus-io/negentropy-swift", from: "0.1.0")
],
targets: [
.target(
name: "damus",
dependencies: [
.product(name: "secp256k1", package: "secp256k1.swift")
.product(name: "secp256k1", package: "secp256k1.swift"),
.product(name: "Negentropy", package: "negentropy-swift")
],
path: "damus"),
.testTarget(
Expand Down
41 changes: 41 additions & 0 deletions damus.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,10 @@
D7DB1FE82D5A9F5300CF06DA /* CryptoSwift in Frameworks */ = {isa = PBXBuildFile; productRef = D7DB1FE72D5A9F5300CF06DA /* CryptoSwift */; };
D7DB1FEA2D5A9F5A00CF06DA /* CryptoSwift in Frameworks */ = {isa = PBXBuildFile; productRef = D7DB1FE92D5A9F5A00CF06DA /* CryptoSwift */; };
D7DB1FEC2D5A9F6500CF06DA /* CryptoSwift in Frameworks */ = {isa = PBXBuildFile; productRef = D7DB1FEB2D5A9F6500CF06DA /* CryptoSwift */; };
D7E5B2DB2EA080BE00CF47AC /* Negentropy in Frameworks */ = {isa = PBXBuildFile; productRef = D7E5B2DA2EA080BE00CF47AC /* Negentropy */; };
D7E5B2DF2EA0A68600CF47AC /* Negentropy in Frameworks */ = {isa = PBXBuildFile; productRef = D7E5B2DE2EA0A68600CF47AC /* Negentropy */; };
D7E5B2E12EA0A69200CF47AC /* Negentropy in Frameworks */ = {isa = PBXBuildFile; productRef = D7E5B2E02EA0A69200CF47AC /* Negentropy */; };
D7E5B2E32EA0A69900CF47AC /* Negentropy in Frameworks */ = {isa = PBXBuildFile; productRef = D7E5B2E22EA0A69900CF47AC /* Negentropy */; };
D7DB1FEE2D5AC51B00CF06DA /* NIP44v2EncryptionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D7DB1FED2D5AC50F00CF06DA /* NIP44v2EncryptionTests.swift */; };
D7DB1FF12D5AC5D700CF06DA /* nip44.vectors.json in Resources */ = {isa = PBXBuildFile; fileRef = D7DB1FF02D5AC5D700CF06DA /* nip44.vectors.json */; };
D7DB1FF32D5AC5EA00CF06DA /* LICENSES in Resources */ = {isa = PBXBuildFile; fileRef = D7DB1FF22D5AC5E400CF06DA /* LICENSES */; };
Expand Down Expand Up @@ -2876,6 +2880,7 @@
D78DB8592C1CE9CA00F0AB12 /* SwipeActions in Frameworks */,
4C649881286E0EE300EAE2B3 /* secp256k1 in Frameworks */,
4C27C9322A64766F007DBC75 /* MarkdownUI in Frameworks */,
D7E5B2DB2EA080BE00CF47AC /* Negentropy in Frameworks */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down Expand Up @@ -2908,6 +2913,7 @@
82D6FC882CD9A4DE00C925F4 /* EmojiPicker in Frameworks */,
82D6FC842CD9A48500C925F4 /* Kingfisher in Frameworks */,
82D6FC812CD99FC500C925F4 /* secp256k1 in Frameworks */,
D7E5B2E32EA0A69900CF47AC /* Negentropy in Frameworks */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand All @@ -2925,6 +2931,7 @@
D703D7492C6709B100A400EA /* secp256k1 in Frameworks */,
D70D909C2CDED7B200CD0534 /* CodeScanner in Frameworks */,
D73E5F9B2C6AA8B0007EB227 /* Kingfisher in Frameworks */,
D7E5B2E12EA0A69200CF47AC /* Negentropy in Frameworks */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand All @@ -2936,6 +2943,7 @@
D789D1202AFEFBF20083A7AB /* secp256k1 in Frameworks */,
D7EDED312B1290B80018B19C /* MarkdownUI in Frameworks */,
D7DB1FEA2D5A9F5A00CF06DA /* CryptoSwift in Frameworks */,
D7E5B2DF2EA0A68600CF47AC /* Negentropy in Frameworks */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down Expand Up @@ -5353,6 +5361,7 @@
D7C48C0A2D12DE0C00A3BACF /* SwiftyCrop */,
D7DB1FE32D5A9AC900CF06DA /* CryptoSwift */,
3ACF94372DA9A52F00971A4E /* FaviconFinder */,
D7E5B2DA2EA080BE00CF47AC /* Negentropy */,
);
productName = damus;
productReference = 4CE6DEE327F7A08100C66700 /* damus.app */;
Expand Down Expand Up @@ -5421,6 +5430,7 @@
D7C48C0C2D12E34900A3BACF /* SwiftyCrop */,
D7DB1FEB2D5A9F6500CF06DA /* CryptoSwift */,
3ACF943F2DA9B11200971A4E /* FaviconFinder */,
D7E5B2E22EA0A69900CF47AC /* Negentropy */,
);
productName = "share extension";
productReference = 82D6FA972CD9820500C925F4 /* ShareExtension.appex */;
Expand Down Expand Up @@ -5451,6 +5461,7 @@
D7C48C0E2D12E35600A3BACF /* SwiftyCrop */,
D7DB1FE72D5A9F5300CF06DA /* CryptoSwift */,
3ACF943D2DA9B10800971A4E /* FaviconFinder */,
D7E5B2E02EA0A69200CF47AC /* Negentropy */,
);
productName = "highlighter action extension";
productReference = D703D7172C66E47100A400EA /* HighlighterActionExtension.appex */;
Expand All @@ -5475,6 +5486,7 @@
D7EDED302B1290B80018B19C /* MarkdownUI */,
D7DB1FE92D5A9F5A00CF06DA /* CryptoSwift */,
4C5726B92D72C6FA00E7FF82 /* Kingfisher */,
D7E5B2DE2EA0A68600CF47AC /* Negentropy */,
);
productName = DamusNotificationService;
productReference = D79C4C142AFEB061003A41B4 /* DamusNotificationService.appex */;
Expand Down Expand Up @@ -5564,6 +5576,7 @@
D7C48C092D12DE0C00A3BACF /* XCRemoteSwiftPackageReference "SwiftyCrop" */,
D7DB1FE22D5A9AC900CF06DA /* XCRemoteSwiftPackageReference "CryptoSwift" */,
3ACF94362DA9A52F00971A4E /* XCRemoteSwiftPackageReference "FaviconFinder" */,
D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */,
);
productRefGroup = 4CE6DEE427F7A08100C66700 /* Products */;
projectDirPath = "";
Expand Down Expand Up @@ -8396,6 +8409,14 @@
revision = e74bbbfbef939224b242ae7c342a90e60b88b5ce;
};
};
D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */ = {
isa = XCRemoteSwiftPackageReference;
repositoryURL = "https://github.com/damus-io/negentropy-swift";
requirement = {
kind = upToNextMajorVersion;
minimumVersion = 0.1.0;
};
};
/* End XCRemoteSwiftPackageReference section */

/* Begin XCSwiftPackageProductDependency section */
Expand Down Expand Up @@ -8579,6 +8600,26 @@
package = D70D90962CDED61800CD0534 /* XCRemoteSwiftPackageReference "CodeScanner" */;
productName = CodeScanner;
};
D7E5B2DA2EA080BE00CF47AC /* Negentropy */ = {
isa = XCSwiftPackageProductDependency;
package = D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */;
productName = Negentropy;
};
D7E5B2DE2EA0A68600CF47AC /* Negentropy */ = {
isa = XCSwiftPackageProductDependency;
package = D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */;
productName = Negentropy;
};
D7E5B2E02EA0A69200CF47AC /* Negentropy */ = {
isa = XCSwiftPackageProductDependency;
package = D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */;
productName = Negentropy;
};
D7E5B2E22EA0A69900CF47AC /* Negentropy */ = {
isa = XCSwiftPackageProductDependency;
package = D7E5B2D92EA080BE00CF47AC /* XCRemoteSwiftPackageReference "negentropy-swift" */;
productName = Negentropy;
};
/* End XCSwiftPackageProductDependency section */
};
rootObject = 4CE6DEDB27F7A08100C66700 /* Project object */;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion damus/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ struct ContentView: View {
NotificationsView(state: damus, notifications: home.notifications, subtitle: $menu_subtitle)

case .dms:
DirectMessagesView(damus_state: damus_state!, model: damus_state!.dms, settings: damus_state!.settings, subtitle: $menu_subtitle)
DirectMessagesView(damus_state: damus_state!, home: home, model: damus_state!.dms)
}
}
.background(DamusColors.adaptableWhite)
Expand Down
56 changes: 39 additions & 17 deletions damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//
import Foundation
import os
import Negentropy


extension NostrNetworkManager {
Expand Down Expand Up @@ -137,21 +138,16 @@ extension NostrNetworkManager {
}

var networkStreamTask: Task<Void, any Error>? = nil
var latestNoteTimestampSeen: UInt32? = nil
// Collect IDs seen locally so negentropy can reconcile deltas against relays.
var negentropyStorageVector = NegentropyStorageVector()
var negentropyInsertCount = 0
let negentropyResetLimit = 10_000 // Bound memory for long-lived streams.

let startNetworkStreamTask = {
guard streamMode.shouldStreamFromNetwork else { return }
networkStreamTask = Task {
while !Task.isCancelled {
let optimizedFilters = filters.map {
var optimizedFilter = $0
// Shift the since filter 2 minutes (120 seconds) before the last note timestamp
if let latestTimestamp = latestNoteTimestampSeen {
optimizedFilter.since = latestTimestamp > 120 ? latestTimestamp - 120 : 0
}
return optimizedFilter
}
for await item in self.multiSessionNetworkStream(filters: optimizedFilters, to: desiredRelays, streamMode: streamMode, id: id) {
for await item in self.multiSessionNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, negentropyVector: negentropyStorageVector, id: id) {
try Task.checkCancellation()
logStreamPipelineStats("SubscriptionManager_Network_Stream_\(id)", "SubscriptionManager_Advanced_Stream_\(id)")
switch item {
Expand Down Expand Up @@ -187,11 +183,11 @@ extension NostrNetworkManager {
case .event(let lender):
logStreamPipelineStats("SubscriptionManager_Advanced_Stream_\(id)", "Consumer_\(id)")
try? lender.borrow({ event in
if let latestTimestamp = latestNoteTimestampSeen {
latestNoteTimestampSeen = max(latestTimestamp, event.createdAt)
}
else {
latestNoteTimestampSeen = event.createdAt
try negentropyStorageVector.insert(timestamp: UInt64(event.createdAt), id: try Id(data: event.id.id))
negentropyInsertCount += 1
if negentropyInsertCount >= negentropyResetLimit {
negentropyStorageVector = NegentropyStorageVector()
negentropyInsertCount = 0
}
})
continuation.yield(item)
Expand Down Expand Up @@ -219,7 +215,7 @@ extension NostrNetworkManager {
}
}

private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, negentropyVector: NegentropyStorageVector?, id: UUID? = nil) -> AsyncStream<StreamItem> {
let id = id ?? UUID()
let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<StreamItem> { continuation in
Expand All @@ -234,7 +230,7 @@ extension NostrNetworkManager {
}

do {
for await item in await self.pool.subscribe(filters: filters, to: desiredRelays, id: id) {
for await item in await self.baseNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, negentropyVector: negentropyVector, id: id) {
try Task.checkCancellation()
logStreamPipelineStats("RelayPool_Handler_\(id)", "SubscriptionManager_Network_Stream_\(id)")
switch item {
Expand Down Expand Up @@ -264,6 +260,32 @@ extension NostrNetworkManager {
}
}

private func baseNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, negentropyVector: NegentropyStorageVector?, id: UUID? = nil) -> AsyncStream<RelayPool.StreamItem> {
let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<RelayPool.StreamItem> { continuation in
let streamTask = Task {
do {
if let negentropyVector, streamMode.optimizeNetworkFilter {
for await item in try await self.pool.negentropySubscribe(filters: filters, to: desiredRelays, negentropyVector: negentropyVector, id: id) {
try Task.checkCancellation()
continuation.yield(item)
}
} else {
for await item in await self.pool.subscribe(filters: filters, to: desiredRelays, id: id) {
try Task.checkCancellation()
continuation.yield(item)
}
}
} catch {
Self.logger.error("Network subscription \(id?.uuidString ?? "unknown"): Streaming error: \(error.localizedDescription)")
}
}
continuation.onTermination = { @Sendable _ in
streamTask.cancel()
}
}
}

private func multiSessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let subscriptionId = id ?? UUID()
Expand Down
4 changes: 4 additions & 0 deletions damus/Core/Nostr/NostrEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ func verify_nostr_response(response: borrowing NostrResponse) -> Bool {
return true
case .auth(_):
return true
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: let reasonCodeString):
return true
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: let hexEncodedData):
return true
}
}

Expand Down
12 changes: 12 additions & 0 deletions damus/Core/Nostr/NostrRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ enum NostrRequest {
case event(NostrEvent)
/// Authenticate with the relay
case auth(NostrEvent)
/// Negentropy open
case negentropyOpen(subscriptionId: String, filters: [NostrFilter], initialMessage: [UInt8])
/// Negentropy message
case negentropyMessage(subscriptionId: String, message: [UInt8])
/// Close negentropy communication
case negentropyClose(subscriptionId: String)

/// Whether this request is meant to write data to a relay
var is_write: Bool {
Expand All @@ -60,6 +66,12 @@ enum NostrRequest {
return true
case .auth:
return false
case .negentropyOpen:
return false
case .negentropyMessage:
return false
case .negentropyClose:
return false
}
}

Expand Down
53 changes: 51 additions & 2 deletions damus/Core/Nostr/NostrResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ enum MaybeResponse {
case ok(NostrResponse)
}

enum NegentropyResponse {
/// Negentropy error
case error(subscriptionId: String, reasonCodeString: String)
/// Negentropy message
case message(subscriptionId: String, data: [UInt8])
/// Invalid negentropy message
case invalidResponse(subscriptionId: String)

var subscriptionId: String {
switch self {
case .error(subscriptionId: let subscriptionId, reasonCodeString: _):
return subscriptionId
case .message(subscriptionId: let subscriptionId, data: _):
return subscriptionId
case .invalidResponse(subscriptionId: let subscriptionId):
return subscriptionId
}
}
}

enum NostrResponse {
case event(String, NostrEvent)
case notice(String)
Expand All @@ -27,6 +47,10 @@ enum NostrResponse {
///
/// The associated type of this case is the challenge string sent by the server.
case auth(String)
/// Negentropy error
case negentropyError(subscriptionId: String, reasonCodeString: String)
/// Negentropy message
case negentropyMessage(subscriptionId: String, hexEncodedData: String)

var subid: String? {
switch self {
Expand All @@ -36,10 +60,36 @@ enum NostrResponse {
return sub_id
case .eose(let sub_id):
return sub_id
case .notice:
case .notice(_):
return nil
case .auth(let challenge_string):
return challenge_string
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: _):
return subscriptionId
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: _):
return subscriptionId
}
}

var negentropyResponse: NegentropyResponse? {
switch self {
case .event(_, _):
return nil
case .notice(_):
return nil
case .eose(_):
return nil
case .ok(_):
return nil
case .auth(_):
return nil
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: let reasonCodeString):
return .error(subscriptionId: subscriptionId, reasonCodeString: reasonCodeString)
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: let hexData):
if let bytes = hex_decode(hexData) {
return .message(subscriptionId: subscriptionId, data: bytes)
}
return .invalidResponse(subscriptionId: subscriptionId)
}
}

Expand Down Expand Up @@ -119,4 +169,3 @@ func sized_cstr(cstr: UnsafePointer<CChar>, len: Int32) -> String? {
let msgbuf = Data(bytes: cstr, count: Int(len))
return String(data: msgbuf, encoding: .utf8)
}

Loading