diff --git a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/connection/LibPebble.kt b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/connection/LibPebble.kt index afa2f2021..03bd58cc0 100644 --- a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/connection/LibPebble.kt +++ b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/connection/LibPebble.kt @@ -65,6 +65,7 @@ import io.rebble.libpebblecommon.web.LockerModelWrapper import kotlinx.coroutines.Deferred import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.mapNotNull @@ -78,6 +79,53 @@ import kotlin.uuid.Uuid data class PhoneCapabilities(val capabilities: Set) data class PlatformFlags(val flags: UInt) +data class CustomDataLoggingEvent( + val sessionId: UByte, + val appUuid: Uuid, + val tag: UInt, + val data: ByteArray, + val itemSize: UShort, + val itemsLeft: UInt, +) { + override fun equals(other: Any?): Boolean { + if (this === other) { + return true + } + if (other !is CustomDataLoggingEvent) { + return false + } + return sessionId == other.sessionId && + appUuid == other.appUuid && + tag == other.tag && + data.contentEquals(other.data) && + itemSize == other.itemSize && + itemsLeft == other.itemsLeft + } + + override fun hashCode(): Int { + var result = sessionId.hashCode() + result = 31 * result + appUuid.hashCode() + result = 31 * result + tag.hashCode() + result = 31 * result + data.contentHashCode() + result = 31 * result + itemSize.hashCode() + result = 31 * result + itemsLeft.hashCode() + return result + } +} + +fun interface CustomDataLoggingSink { + suspend fun onData(event: CustomDataLoggingEvent) +} + +interface CustomDataLogging { + val customData: SharedFlow + get() = _noopCustomData + fun setDataSink(sink: CustomDataLoggingSink?) {} + companion object { + private val _noopCustomData: SharedFlow = MutableSharedFlow() + } +} + typealias PebbleDevices = StateFlow> sealed class PebbleConnectionEvent { @@ -88,7 +136,7 @@ sealed class PebbleConnectionEvent { @Stable interface LibPebble : Scanning, RequestSync, LockerApi, NotificationApps, CallManagement, Calendar, OtherPebbleApps, PKJSToken, Watches, Errors, Contacts, AnalyticsEvents, HealthApi, WatchPrefs, - SystemGeolocation, Timeline, Vibrations, Weather, HealthDataApi { + SystemGeolocation, Timeline, Vibrations, Weather, HealthDataApi, CustomDataLogging { fun init() val config: StateFlow @@ -395,13 +443,14 @@ class LibPebble3( private val vibePatternDao: VibePatternDao, private val watchPreferences: WatchPrefs, private val weatherManager: WeatherManager, + private val datalogging: CustomDataLogging, ) : LibPebble, Scanning by scanning, RequestSync by webSyncManager, LockerApi by locker, NotificationApps by notificationApi, Calendar by phoneCalendarSyncer, OtherPebbleApps by otherPebbleApps, PKJSToken by jsTokenUtil, Watches by watchManager, Errors by errorTracker, Contacts by contacts, AnalyticsEvents by analytics, HealthApi by health, SystemGeolocation by systemGeolocation, Timeline by timeline, Vibrations by notificationApi, WatchPrefs by watchPreferences, Weather by weatherManager, - HealthDataApi by health { + HealthDataApi by health, CustomDataLogging by datalogging { private val logger = Logger.withTag("LibPebble3") private val initialized = AtomicBoolean(false) diff --git a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/datalogging/Datalogging.kt b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/datalogging/Datalogging.kt index 2206a9606..716df6801 100644 --- a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/datalogging/Datalogging.kt +++ b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/datalogging/Datalogging.kt @@ -2,22 +2,64 @@ package io.rebble.libpebblecommon.datalogging import co.touchlab.kermit.Logger import io.rebble.libpebblecommon.SystemAppIDs.SYSTEM_APP_UUID +import io.rebble.libpebblecommon.connection.CustomDataLogging +import io.rebble.libpebblecommon.connection.CustomDataLoggingEvent +import io.rebble.libpebblecommon.connection.CustomDataLoggingSink import io.rebble.libpebblecommon.connection.WebServices +import io.rebble.libpebblecommon.di.LibPebbleCoroutineScope import io.rebble.libpebblecommon.services.WatchInfo import io.rebble.libpebblecommon.structmapper.SBytes import io.rebble.libpebblecommon.structmapper.SUInt import io.rebble.libpebblecommon.structmapper.StructMappable import io.rebble.libpebblecommon.util.DataBuffer import io.rebble.libpebblecommon.util.Endian +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch +import kotlin.concurrent.atomics.AtomicReference import kotlin.uuid.Uuid class Datalogging( private val webServices: WebServices, private val healthDataProcessor: HealthDataProcessor, -) { + libPebbleScope: LibPebbleCoroutineScope, +) : CustomDataLogging { private val logger = Logger.withTag("Datalogging") - fun logData( + private val _customData = + MutableSharedFlow( + extraBufferCapacity = BUFFER_CAPACITY, + onBufferOverflow = BufferOverflow.DROP_OLDEST, + ) + override val customData: SharedFlow = _customData.asSharedFlow() + private val sinkRef = AtomicReference(null) + private val sinkChannel = + Channel( + capacity = SINK_CHANNEL_CAPACITY, + onBufferOverflow = BufferOverflow.SUSPEND, + ) + + init { + libPebbleScope.launch { + for (event in sinkChannel) { + val sink = sinkRef.load() ?: continue + try { + sink.onData(event) + } catch (e: Throwable) { + logger.e(e) { "Sink threw for tag=${event.tag} uuid=${event.appUuid}" } + } + } + } + } + + override fun setDataSink(sink: CustomDataLoggingSink?) { + sinkRef.store(sink) + } + + suspend fun logData( sessionId: UByte, uuid: Uuid, tag: UInt, @@ -48,6 +90,7 @@ class Datalogging( offset += size } } + ANALYTICS_HEARTBEAT_TAG -> { // Fixed-size native_heartbeat_record items (no inner length prefix). val size = itemSize.toInt() @@ -63,16 +106,39 @@ class Datalogging( } } } + return } + val event = + CustomDataLoggingEvent( + sessionId = sessionId, + appUuid = uuid, + tag = tag, + data = data, + itemSize = itemSize, + itemsLeft = itemsLeft, + ) + + if (sinkRef.load() != null) { + sinkChannel.send(event) + } + _customData.tryEmit(event) } - fun openSession(sessionId: UByte, tag: UInt, applicationUuid: Uuid, itemSize: UShort) { + fun openSession( + sessionId: UByte, + tag: UInt, + applicationUuid: Uuid, + itemSize: UShort, + ) { if (tag in HealthDataProcessor.HEALTH_TAGS) { healthDataProcessor.handleSessionOpen(sessionId, tag, applicationUuid, itemSize) } } - fun closeSession(sessionId: UByte, tag: UInt) { + fun closeSession( + sessionId: UByte, + tag: UInt, + ) { if (tag in HealthDataProcessor.HEALTH_TAGS) { healthDataProcessor.handleSessionClose(sessionId) } @@ -81,6 +147,8 @@ class Datalogging( companion object { private val MEMFAULT_CHUNKS_TAG: UInt = 86u private val ANALYTICS_HEARTBEAT_TAG: UInt = 87u + private const val SINK_CHANNEL_CAPACITY = 256 + private const val BUFFER_CAPACITY = 256 } } diff --git a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/di/LibPebbleModule.kt b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/di/LibPebbleModule.kt index cde0d0643..f8b35e1ff 100644 --- a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/di/LibPebbleModule.kt +++ b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/di/LibPebbleModule.kt @@ -22,6 +22,7 @@ import io.rebble.libpebblecommon.connection.AppContext import io.rebble.libpebblecommon.connection.ConnectionFailureHandler import io.rebble.libpebblecommon.connection.Contacts import io.rebble.libpebblecommon.connection.CreatePlatformIdentifier +import io.rebble.libpebblecommon.connection.CustomDataLogging import io.rebble.libpebblecommon.connection.LegacyBtClassicMigrator import io.rebble.libpebblecommon.connection.LibPebble import io.rebble.libpebblecommon.connection.LibPebble3 @@ -405,6 +406,7 @@ fun initKoin( get(), get(), get(), + get(), ) } bind LibPebble::class single { RealConnectionScopeFactory(koin) } bind ConnectionScopeFactory::class @@ -434,7 +436,7 @@ fun initKoin( singleOf(::MissedCallSyncer) singleOf(::FirmwareDownloader) singleOf(::JsTokenUtil) - singleOf(::Datalogging) + singleOf(::Datalogging) bind CustomDataLogging::class singleOf(::Health) singleOf(::ErrorTracker) singleOf(::RealConnectionFailureHandler) bind ConnectionFailureHandler::class diff --git a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/services/DataLoggingService.kt b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/services/DataLoggingService.kt index 55b28085f..7a163dec7 100644 --- a/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/services/DataLoggingService.kt +++ b/libpebble3/src/commonMain/kotlin/io/rebble/libpebblecommon/services/DataLoggingService.kt @@ -43,55 +43,56 @@ class DataLoggingService( } fun initialInit() { - protocolHandler.inboundMessages.onEach { - when (it) { - is DataLoggingIncomingPacket.OpenSession -> { - val id = it.sessionId.get() - val tag = it.tag.get() - val applicationUuid = it.applicationUUID.get() - val itemSize = it.dataItemSize.get() - logger.d { "Session opened: $id tag: $tag (accepted: $acceptSessions)" } - sessions[id] = DataLoggingSession(id, tag, applicationUuid, itemSize) - datalogging.openSession(id, tag, applicationUuid, itemSize) - sendAckNack(id) - } - - is DataLoggingIncomingPacket.SendDataItems -> { - val id = it.sessionId.get() - val session = sessions[id] - if (session == null) { - logger.e { "Session not found: $id" } - return@onEach + protocolHandler.inboundMessages + .onEach { + when (it) { + is DataLoggingIncomingPacket.OpenSession -> { + val id = it.sessionId.get() + val tag = it.tag.get() + val applicationUuid = it.applicationUUID.get() + val itemSize = it.dataItemSize.get() + logger.d { "Session opened: $id tag: $tag (accepted: $acceptSessions)" } + sessions[id] = DataLoggingSession(id, tag, applicationUuid, itemSize) + datalogging.openSession(id, tag, applicationUuid, itemSize) + sendAckNack(id) } - sendAckNack(id) - val info = watchInfo - if (info == null) { - logger.e { "watch info is null" } - return@onEach + + is DataLoggingIncomingPacket.SendDataItems -> { + val id = it.sessionId.get() + val session = sessions[id] + if (session == null) { + logger.e { "Session not found: $id" } + return@onEach + } + val info = watchInfo + if (info == null) { + logger.e { "watch info is null" } + return@onEach + } + datalogging.logData( + sessionId = id, + uuid = session.uuid, + tag = session.tag, + data = it.payload.get().toByteArray(), + watchInfo = info, + itemSize = session.itemSize, + itemsLeft = it.itemsLeftAfterThis.get(), + ) + sendAckNack(id) } - datalogging.logData( - sessionId = id, - uuid = session.uuid, - tag = session.tag, - data = it.payload.get().toByteArray(), - watchInfo = info, - itemSize = session.itemSize, - itemsLeft = it.itemsLeftAfterThis.get(), - ) - } - is DataLoggingIncomingPacket.CloseSession -> { - val id = it.sessionId.get() - val session = sessions[id] - logger.d { "Session closed: $id" } - if (session != null) { - datalogging.closeSession(id, session.tag) + is DataLoggingIncomingPacket.CloseSession -> { + val id = it.sessionId.get() + val session = sessions[id] + logger.d { "Session closed: $id" } + if (session != null) { + datalogging.closeSession(id, session.tag) + } + sessions.remove(id) + sendAckNack(id) } - sessions.remove(id) - sendAckNack(id) } - } - }.launchIn(scope) + }.launchIn(scope) } } @@ -100,4 +101,5 @@ data class DataLoggingSession( val tag: UInt, val uuid: Uuid, val itemSize: UShort, -) \ No newline at end of file +) +