Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import io.rebble.libpebblecommon.web.LockerModelWrapper
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.mapNotNull
Expand All @@ -78,6 +79,53 @@ import kotlin.uuid.Uuid
data class PhoneCapabilities(val capabilities: Set<ProtocolCapsFlag>)
data class PlatformFlags(val flags: UInt)

data class CustomDataLoggingEvent(
val sessionId: UByte,
val appUuid: Uuid,
val tag: UInt,
val data: ByteArray,
val itemSize: UShort,
val itemsLeft: UInt,
) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (other !is CustomDataLoggingEvent) {
return false
}
return sessionId == other.sessionId &&
appUuid == other.appUuid &&
tag == other.tag &&
data.contentEquals(other.data) &&
itemSize == other.itemSize &&
itemsLeft == other.itemsLeft
}

override fun hashCode(): Int {
var result = sessionId.hashCode()
result = 31 * result + appUuid.hashCode()
result = 31 * result + tag.hashCode()
result = 31 * result + data.contentHashCode()
result = 31 * result + itemSize.hashCode()
result = 31 * result + itemsLeft.hashCode()
return result
}
}

fun interface CustomDataLoggingSink {
suspend fun onData(event: CustomDataLoggingEvent)
}

interface CustomDataLogging {
val customData: SharedFlow<CustomDataLoggingEvent>
get() = _noopCustomData
fun setDataSink(sink: CustomDataLoggingSink?) {}
companion object {
private val _noopCustomData: SharedFlow<CustomDataLoggingEvent> = MutableSharedFlow()
}
}

typealias PebbleDevices = StateFlow<List<PebbleDevice>>

sealed class PebbleConnectionEvent {
Expand All @@ -88,7 +136,7 @@ sealed class PebbleConnectionEvent {
@Stable
interface LibPebble : Scanning, RequestSync, LockerApi, NotificationApps, CallManagement, Calendar,
OtherPebbleApps, PKJSToken, Watches, Errors, Contacts, AnalyticsEvents, HealthApi, WatchPrefs,
SystemGeolocation, Timeline, Vibrations, Weather, HealthDataApi {
SystemGeolocation, Timeline, Vibrations, Weather, HealthDataApi, CustomDataLogging {
fun init()

val config: StateFlow<LibPebbleConfig>
Expand Down Expand Up @@ -395,13 +443,14 @@ class LibPebble3(
private val vibePatternDao: VibePatternDao,
private val watchPreferences: WatchPrefs,
private val weatherManager: WeatherManager,
private val datalogging: CustomDataLogging,
) : LibPebble, Scanning by scanning, RequestSync by webSyncManager, LockerApi by locker,
NotificationApps by notificationApi, Calendar by phoneCalendarSyncer,
OtherPebbleApps by otherPebbleApps, PKJSToken by jsTokenUtil, Watches by watchManager,
Errors by errorTracker, Contacts by contacts, AnalyticsEvents by analytics,
HealthApi by health, SystemGeolocation by systemGeolocation, Timeline by timeline,
Vibrations by notificationApi, WatchPrefs by watchPreferences, Weather by weatherManager,
HealthDataApi by health {
HealthDataApi by health, CustomDataLogging by datalogging {
private val logger = Logger.withTag("LibPebble3")
private val initialized = AtomicBoolean(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,64 @@ package io.rebble.libpebblecommon.datalogging

import co.touchlab.kermit.Logger
import io.rebble.libpebblecommon.SystemAppIDs.SYSTEM_APP_UUID
import io.rebble.libpebblecommon.connection.CustomDataLogging
import io.rebble.libpebblecommon.connection.CustomDataLoggingEvent
import io.rebble.libpebblecommon.connection.CustomDataLoggingSink
import io.rebble.libpebblecommon.connection.WebServices
import io.rebble.libpebblecommon.di.LibPebbleCoroutineScope
import io.rebble.libpebblecommon.services.WatchInfo
import io.rebble.libpebblecommon.structmapper.SBytes
import io.rebble.libpebblecommon.structmapper.SUInt
import io.rebble.libpebblecommon.structmapper.StructMappable
import io.rebble.libpebblecommon.util.DataBuffer
import io.rebble.libpebblecommon.util.Endian
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import kotlin.concurrent.atomics.AtomicReference
import kotlin.uuid.Uuid

class Datalogging(
private val webServices: WebServices,
private val healthDataProcessor: HealthDataProcessor,
) {
libPebbleScope: LibPebbleCoroutineScope,
) : CustomDataLogging {
private val logger = Logger.withTag("Datalogging")

fun logData(
private val _customData =
MutableSharedFlow<CustomDataLoggingEvent>(
extraBufferCapacity = BUFFER_CAPACITY,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
override val customData: SharedFlow<CustomDataLoggingEvent> = _customData.asSharedFlow()
private val sinkRef = AtomicReference<CustomDataLoggingSink?>(null)
private val sinkChannel =
Channel<CustomDataLoggingEvent>(
capacity = SINK_CHANNEL_CAPACITY,
onBufferOverflow = BufferOverflow.SUSPEND,
)

init {
libPebbleScope.launch {
for (event in sinkChannel) {
val sink = sinkRef.load() ?: continue
try {
sink.onData(event)
} catch (e: Throwable) {
logger.e(e) { "Sink threw for tag=${event.tag} uuid=${event.appUuid}" }
}
}
}
}

override fun setDataSink(sink: CustomDataLoggingSink?) {
sinkRef.store(sink)
}

suspend fun logData(
sessionId: UByte,
uuid: Uuid,
tag: UInt,
Expand Down Expand Up @@ -48,6 +90,7 @@ class Datalogging(
offset += size
}
}

ANALYTICS_HEARTBEAT_TAG -> {
// Fixed-size native_heartbeat_record items (no inner length prefix).
val size = itemSize.toInt()
Expand All @@ -63,16 +106,39 @@ class Datalogging(
}
}
}
return
}
val event =
CustomDataLoggingEvent(
sessionId = sessionId,
appUuid = uuid,
tag = tag,
data = data,
itemSize = itemSize,
itemsLeft = itemsLeft,
)

if (sinkRef.load() != null) {
sinkChannel.send(event)
}
_customData.tryEmit(event)
}

fun openSession(sessionId: UByte, tag: UInt, applicationUuid: Uuid, itemSize: UShort) {
fun openSession(
sessionId: UByte,
tag: UInt,
applicationUuid: Uuid,
itemSize: UShort,
) {
if (tag in HealthDataProcessor.HEALTH_TAGS) {
healthDataProcessor.handleSessionOpen(sessionId, tag, applicationUuid, itemSize)
}
}

fun closeSession(sessionId: UByte, tag: UInt) {
fun closeSession(
sessionId: UByte,
tag: UInt,
) {
if (tag in HealthDataProcessor.HEALTH_TAGS) {
healthDataProcessor.handleSessionClose(sessionId)
}
Expand All @@ -81,6 +147,8 @@ class Datalogging(
companion object {
private val MEMFAULT_CHUNKS_TAG: UInt = 86u
private val ANALYTICS_HEARTBEAT_TAG: UInt = 87u
private const val SINK_CHANNEL_CAPACITY = 256
private const val BUFFER_CAPACITY = 256
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.rebble.libpebblecommon.connection.AppContext
import io.rebble.libpebblecommon.connection.ConnectionFailureHandler
import io.rebble.libpebblecommon.connection.Contacts
import io.rebble.libpebblecommon.connection.CreatePlatformIdentifier
import io.rebble.libpebblecommon.connection.CustomDataLogging
import io.rebble.libpebblecommon.connection.LegacyBtClassicMigrator
import io.rebble.libpebblecommon.connection.LibPebble
import io.rebble.libpebblecommon.connection.LibPebble3
Expand Down Expand Up @@ -405,6 +406,7 @@ fun initKoin(
get(),
get(),
get(),
get(),
)
} bind LibPebble::class
single { RealConnectionScopeFactory(koin) } bind ConnectionScopeFactory::class
Expand Down Expand Up @@ -434,7 +436,7 @@ fun initKoin(
singleOf(::MissedCallSyncer)
singleOf(::FirmwareDownloader)
singleOf(::JsTokenUtil)
singleOf(::Datalogging)
singleOf(::Datalogging) bind CustomDataLogging::class
singleOf(::Health)
singleOf(::ErrorTracker)
singleOf(::RealConnectionFailureHandler) bind ConnectionFailureHandler::class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,55 +43,56 @@ class DataLoggingService(
}

fun initialInit() {
protocolHandler.inboundMessages.onEach {
when (it) {
is DataLoggingIncomingPacket.OpenSession -> {
val id = it.sessionId.get()
val tag = it.tag.get()
val applicationUuid = it.applicationUUID.get()
val itemSize = it.dataItemSize.get()
logger.d { "Session opened: $id tag: $tag (accepted: $acceptSessions)" }
sessions[id] = DataLoggingSession(id, tag, applicationUuid, itemSize)
datalogging.openSession(id, tag, applicationUuid, itemSize)
sendAckNack(id)
}

is DataLoggingIncomingPacket.SendDataItems -> {
val id = it.sessionId.get()
val session = sessions[id]
if (session == null) {
logger.e { "Session not found: $id" }
return@onEach
protocolHandler.inboundMessages
.onEach {
when (it) {
is DataLoggingIncomingPacket.OpenSession -> {
val id = it.sessionId.get()
val tag = it.tag.get()
val applicationUuid = it.applicationUUID.get()
val itemSize = it.dataItemSize.get()
logger.d { "Session opened: $id tag: $tag (accepted: $acceptSessions)" }
sessions[id] = DataLoggingSession(id, tag, applicationUuid, itemSize)
datalogging.openSession(id, tag, applicationUuid, itemSize)
sendAckNack(id)
}
sendAckNack(id)
val info = watchInfo
if (info == null) {
logger.e { "watch info is null" }
return@onEach

is DataLoggingIncomingPacket.SendDataItems -> {
val id = it.sessionId.get()
val session = sessions[id]
if (session == null) {
logger.e { "Session not found: $id" }
return@onEach
}
val info = watchInfo
if (info == null) {
logger.e { "watch info is null" }
return@onEach
}
datalogging.logData(
sessionId = id,
uuid = session.uuid,
tag = session.tag,
data = it.payload.get().toByteArray(),
watchInfo = info,
itemSize = session.itemSize,
itemsLeft = it.itemsLeftAfterThis.get(),
)
sendAckNack(id)
}
datalogging.logData(
sessionId = id,
uuid = session.uuid,
tag = session.tag,
data = it.payload.get().toByteArray(),
watchInfo = info,
itemSize = session.itemSize,
itemsLeft = it.itemsLeftAfterThis.get(),
)
}

is DataLoggingIncomingPacket.CloseSession -> {
val id = it.sessionId.get()
val session = sessions[id]
logger.d { "Session closed: $id" }
if (session != null) {
datalogging.closeSession(id, session.tag)
is DataLoggingIncomingPacket.CloseSession -> {
val id = it.sessionId.get()
val session = sessions[id]
logger.d { "Session closed: $id" }
if (session != null) {
datalogging.closeSession(id, session.tag)
}
sessions.remove(id)
sendAckNack(id)
}
sessions.remove(id)
sendAckNack(id)
}
}
}.launchIn(scope)
}.launchIn(scope)
}
}

Expand All @@ -100,4 +101,5 @@ data class DataLoggingSession(
val tag: UInt,
val uuid: Uuid,
val itemSize: UShort,
)
)