diff --git a/openc3-cosmos-cmd-tlm-api/app/controllers/script_autocomplete_controller.rb b/openc3-cosmos-cmd-tlm-api/app/controllers/script_autocomplete_controller.rb index 8c727fb81a..d730ec8bc5 100644 --- a/openc3-cosmos-cmd-tlm-api/app/controllers/script_autocomplete_controller.rb +++ b/openc3-cosmos-cmd-tlm-api/app/controllers/script_autocomplete_controller.rb @@ -92,6 +92,17 @@ def build_autocomplete_data(type, scope) OpenC3::TargetModel.packets(target_name, type: type.upcase.intern, scope: scope).each do |packet| packet_to_autocomplete_hashes(autocomplete_data, packet, target_info, type) end + if type.upcase.intern == :TLM + items = OpenC3::TargetModel.all_item_names(target_name, type: :TLM, scope: scope) + items.each do |item| + autocomplete_data << + { + :caption => "#{target_name} LATEST #{item}", + :snippet => "#{target_name} LATEST #{item}", + :meta => 'telemetry', + } + end + end end autocomplete_data.sort_by { |packet| packet[:caption] } end diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-limitsmonitor/src/tools/LimitsMonitor/LimitsControl.vue b/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-limitsmonitor/src/tools/LimitsMonitor/LimitsControl.vue index dda1dd1049..c421855e70 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-limitsmonitor/src/tools/LimitsMonitor/LimitsControl.vue +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-limitsmonitor/src/tools/LimitsMonitor/LimitsControl.vue @@ -212,6 +212,7 @@ export default { items: [], itemList: [], screenItems: [], + availableItems: [], screenValues: {}, updateCounter: 0, itemsPerPage: 25, @@ -508,8 +509,8 @@ export default { } }, update() { - if (this.screenItems.length !== 0) { - this.api.get_tlm_values(this.screenItems).then((data) => { + if (this.availableItems.length !== 0) { + this.api.get_tlm_values(this.availableItems).then((data) => { this.updateValues(data) }) } @@ -522,11 +523,15 @@ export default { } }, addItem: function (valueId) { - this.screenItems.push(valueId) - this.screenValues[valueId] = [null, null, 0] + this.api.get_tlm_available([valueId]).then((available) => { + this.screenItems.push(valueId) + this.availableItems.push(available[0]) + this.screenValues[available[0]] = [null, null, 0] + }) }, deleteItem: function (valueId) { let index = this.screenItems.indexOf(valueId) + this.availableItems.splice(index, 1) this.screenItems.splice(index, 1) }, @@ -596,5 +601,4 @@ export default { font-weight: bold; background-color: var(--color-background-base-default); } - diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-packetviewer/src/tools/PacketViewer/PacketViewer.vue b/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-packetviewer/src/tools/PacketViewer/PacketViewer.vue index 7938b33db0..3619c4e2df 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-packetviewer/src/tools/PacketViewer/PacketViewer.vue +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-tool-packetviewer/src/tools/PacketViewer/PacketViewer.vue @@ -28,6 +28,7 @@ @@ -252,6 +253,8 @@ export default { itemName: '', x: 0, y: 0, + latestAvailable: null, + latestItems: null, } }, computed: { @@ -503,19 +506,34 @@ export default { } }, packetChanged(event) { - this.api - .get_target(event.targetName) - .then((target) => { + // Handle LATEST packet specially + if (event.packetName === 'LATEST') { + this.api.get_target(event.targetName).then((target) => { if (target) { this.ignoredItems = target.ignored_items + // For LATEST, we can't get specific packet derived items + this.derivedItems = [] - return this.api.get_packet_derived_items( - event.targetName, - event.packetName, - ) + this.targetName = event.targetName + this.packetName = event.packetName + const currentTarget = this.$route.params.target?.toUpperCase() + const currentPacket = this.$route.params.packet?.toUpperCase() + if ( + currentTarget !== event.targetName || + currentPacket !== event.packetName + ) { + this.saveDefaultConfig(this.currentConfig) + this.$router.push({ + name: 'PackerViewer', + params: { + target: this.targetName, + packet: this.packetName, + }, + }) + } + this.changeUpdater(true) } else { // Probably got here from an old config or URL params that point to something that no longer exists - // (e.g. the plugin that defined this target was deleted). Unset these to avoid API errors. this.targetName = null this.packetName = null this.$router.push({ @@ -524,28 +542,93 @@ export default { }) } }) - .then((derived) => { - if (derived) { - this.derivedItems = derived + } else { + // Regular packet handling + this.api + .get_target(event.targetName) + .then((target) => { + if (target) { + this.ignoredItems = target.ignored_items - this.targetName = event.targetName - this.packetName = event.packetName - if ( - this.$route.params.target !== event.targetName || - this.$route.params.packet !== event.packetName - ) { - this.saveDefaultConfig(this.currentConfig) + return this.api.get_packet_derived_items( + event.targetName, + event.packetName, + ) + } else { + // Probably got here from an old config or URL params that point to something that no longer exists + // (e.g. the plugin that defined this target was deleted). Unset these to avoid API errors. + this.targetName = null + this.packetName = null this.$router.push({ name: 'PackerViewer', - params: { - target: this.targetName, - packet: this.packetName, - }, + params: {}, + }) + } + }) + .then((derived) => { + if (derived) { + this.derivedItems = derived + + this.targetName = event.targetName + this.packetName = event.packetName + const currentTarget = this.$route.params.target?.toUpperCase() + const currentPacket = this.$route.params.packet?.toUpperCase() + if ( + currentTarget !== event.targetName || + currentPacket !== event.packetName + ) { + this.saveDefaultConfig(this.currentConfig) + this.$router.push({ + name: 'PackerViewer', + params: { + target: this.targetName, + packet: this.packetName, + }, + }) + } + this.changeUpdater(true) + } + }) + } + }, + latestGetTlmValues(values) { + if (values != null && values.length > 0) { + this.counter += 1 + let derived = [] + let other = [] + this.latestItemNames.forEach((itemName, index) => { + if (!this.showIgnored && this.ignoredItems.includes(itemName)) { + return + } + const itemValue = values[index] + if (itemValue) { + if (this.derivedItems.includes(itemName)) { + derived.push({ + name: itemName, + value: itemValue[0], + limitsState: itemValue[1], + derived: true, + counter: this.counter, + pinned: this.isPinned(itemName), + }) + } else { + other.push({ + name: itemName, + value: itemValue[0], + limitsState: itemValue[1], + derived: false, + counter: this.counter, + pinned: this.isPinned(itemName), }) } - this.changeUpdater(true) } }) + if (this.derivedLast) { + this.rows = other.concat(derived) + } else { + this.rows = derived.concat(other) + } + } }, changeUpdater(clearExisting) { if (this.updater != null) { @@ -554,62 +637,107 @@ export default { } if (clearExisting) { this.rows = [] + this.latestAvailable = null + this.latestItems = null } this.updater = setInterval(() => { if (!this.targetName || !this.packetName) { return // noop if target/packet aren't set } - this.api - .get_tlm_packet( - this.targetName, - this.packetName, - this.valueType, - this.staleLimit, - ) - .then((data) => { - // Make sure data isn't null or undefined. Note this is the only valid use of == or != - if (data != null) { - this.counter += 1 - let derived = [] - let other = [] - data.forEach((value) => { - if (!this.showIgnored && this.ignoredItems.includes(value[0])) { - return - } - if (this.derivedItems.includes(value[0])) { - derived.push({ - name: value[0], - value: value[1], - limitsState: value[2], - derived: true, - counter: this.counter, - pinned: this.isPinned(value[0]), - }) + + // Handle LATEST packet using get_tlm_values + if (this.packetName === 'LATEST') { + if (this.latestAvailable) { + this.api + .get_tlm_values(this.latestAvailable, this.staleLimit) + .then((values) => { + this.latestGetTlmValues(values) + }) + .catch((error) => { + // eslint-disable-next-line + console.log(error) + }) + } else { + this.api + .get_all_tlm_item_names(this.targetName) + .then((itemNames) => { + this.latestItemNames = itemNames + // Build items array in format TGT__LATEST__ITEM__TYPE + const items = itemNames.map( + (item) => + `${this.targetName}__LATEST__${item}__${this.valueType}`, + ) + return this.api.get_tlm_available(items) + }) + .then((available) => { + this.latestAvailable = available + return this.api.get_tlm_values(available, this.staleLimit) + }) + .then((values) => { + this.latestGetTlmValues(values) + }) + .catch((error) => { + // eslint-disable-next-line + console.log(error) + }) + } + } else { + // Regular packet handling using get_tlm_packet + this.api + .get_tlm_packet( + this.targetName, + this.packetName, + this.valueType, + this.staleLimit, + ) + .then((data) => { + // Make sure data isn't null or undefined. Note this is the only valid use of == or != + if (data != null) { + this.counter += 1 + let derived = [] + let other = [] + data.forEach((value) => { + if ( + !this.showIgnored && + this.ignoredItems.includes(value[0]) + ) { + return + } + if (this.derivedItems.includes(value[0])) { + derived.push({ + name: value[0], + value: value[1], + limitsState: value[2], + derived: true, + counter: this.counter, + pinned: this.isPinned(value[0]), + }) + } else { + other.push({ + name: value[0], + value: value[1], + limitsState: value[2], + derived: false, + counter: this.counter, + pinned: this.isPinned(value[0]), + }) + } + }) + if (this.derivedLast) { + this.rows = other.concat(derived) } else { - other.push({ - name: value[0], - value: value[1], - limitsState: value[2], - derived: false, - counter: this.counter, - pinned: this.isPinned(value[0]), - }) + this.rows = derived.concat(other) } - }) - if (this.derivedLast) { - this.rows = other.concat(derived) - } else { - this.rows = derived.concat(other) } - } - }) - // Catch errors but just log to the console - // We don't clear the updater because errors can happen on upgrade - // and we want to continue updating once the new plugin comes online - .catch((error) => { - // eslint-disable-next-line - console.log(error) - }) + }) + // Catch errors but just log to the console + // We don't clear the updater because errors can happen on upgrade + // and we want to continue updating once the new plugin comes online + .catch((error) => { + // eslint-disable-next-line + console.log(error) + }) + } }, this.refreshInterval) }, resetConfig: function () { diff --git a/openc3-cosmos-init/plugins/packages/openc3-js-common/src/services/openc3Api.js b/openc3-cosmos-init/plugins/packages/openc3-js-common/src/services/openc3Api.js index 03bc29da82..97a358fc2f 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-js-common/src/services/openc3Api.js +++ b/openc3-cosmos-init/plugins/packages/openc3-js-common/src/services/openc3Api.js @@ -459,12 +459,12 @@ export default class OpenC3Api { 10000, // 10s timeout ... should never be this long ) if (data && data.length > 0) { - let len = data[0].length + let len = data.length let converted = null for (let i = 0; i < len; i++) { - converted = this.decode_openc3_type(data[0][i]) + converted = this.decode_openc3_type(data[i][0]) if (converted !== null) { - data[0][i] = converted + data[i][0] = converted } } } diff --git a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/DetailsDialog.vue b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/DetailsDialog.vue index 640d4e29b4..c55a1f98e2 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/DetailsDialog.vue +++ b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/DetailsDialog.vue @@ -213,6 +213,7 @@ export default { data() { return { details: Object, + available: null, updater: null, rawValue: null, convertedValue: null, @@ -239,14 +240,8 @@ export default { this.requestDetails() if (this.type === 'tlm') { this.updater = setInterval(() => { - this.api - .get_tlm_values([ - `${this.targetName}__${this.packetName}__${this.itemName}__RAW`, - `${this.targetName}__${this.packetName}__${this.itemName}__CONVERTED`, - `${this.targetName}__${this.packetName}__${this.itemName}__FORMATTED`, - `${this.targetName}__${this.packetName}__${this.itemName}__WITH_UNITS`, - ]) - .then((values) => { + if (this.available && this.details) { + this.api.get_tlm_values(this.available).then((values) => { for (let value of values) { let rawString = null // Check for raw encoded strings (non-ascii) @@ -304,6 +299,7 @@ export default { this.formattedValue = values[2][0] this.unitsValue = values[3][0] }) + } }, 1000) } } else { @@ -340,18 +336,30 @@ export default { }, async requestDetails() { if (this.type === 'tlm') { - await this.api - .get_item(this.targetName, this.packetName, this.itemName) - .then((details) => { - this.details = details - // If the item does not have limits explicitly null it - // to make the check in the template easier - if (!this.hasLimits(details)) { - this.details.limits = null + this.api + .get_tlm_available([ + `${this.targetName}__${this.packetName}__${this.itemName}__RAW`, + `${this.targetName}__${this.packetName}__${this.itemName}__CONVERTED`, + `${this.targetName}__${this.packetName}__${this.itemName}__FORMATTED`, + `${this.targetName}__${this.packetName}__${this.itemName}__WITH_UNITS`, + ]) + .then((available) => { + if (available && available.length > 0) { + this.available = available } + this.api + .get_item(this.targetName, this.packetName, this.itemName) + .then((details) => { + this.details = details + // If the item does not have limits explicitly null it + // to make the check in the template easier + if (!this.hasLimits(details)) { + this.details.limits = null + } + }) }) } else { - await this.api + this.api .get_parameter(this.targetName, this.packetName, this.itemName) .then((details) => { this.details = details diff --git a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/TargetPacketItemChooser.vue b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/TargetPacketItemChooser.vue index 32613454eb..641674059d 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/TargetPacketItemChooser.vue +++ b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/TargetPacketItemChooser.vue @@ -53,10 +53,6 @@ item-value="value" @update:model-value="packetNameChanged" > - 1 + ) { + this.selectedPacketName = this.packetNames[1].value + } } } this.updatePacketDetails(this.selectedPacketName) @@ -605,6 +615,8 @@ export default { } else if (value === 'LATEST') { this.itemsDisabled = false this.selectedPacketName = 'LATEST' + this.description = 'Latest values from all packets' + this.hazardous = false } else { this.itemsDisabled = false const packet = this.packetNames.find((packet) => { diff --git a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/autocomplete/screenCompleter.js b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/autocomplete/screenCompleter.js index 6437dfa301..e05ca167c8 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/autocomplete/screenCompleter.js +++ b/openc3-cosmos-init/plugins/packages/openc3-vue-common/src/components/autocomplete/screenCompleter.js @@ -88,14 +88,23 @@ export default class ScreenCompleter { (acc, pkt) => ((acc[pkt.packet_name] = pkt.description), acc), {}, ) + suggestions['LATEST'] = 'LATEST values from all packets' } else if (current['Item name']) { let target_name = parsedLine[parsedLine.length - 3] let packet_name = parsedLine[parsedLine.length - 2] - let packet = await this.api.get_tlm(target_name, packet_name) - suggestions = packet.items.reduce( - (acc, item) => ((acc[item.name] = item.description), acc), - {}, - ) + if (packet_name === 'LATEST') { + let items = await this.api.get_all_tlm_item_names(target_name) + suggestions = {} + items.forEach((item) => { + suggestions[item] = `LATEST ${item}` + }) + } else { + let packet = await this.api.get_tlm(target_name, packet_name) + suggestions = packet.items.reduce( + (acc, item) => ((acc[item.name] = item.description), acc), + {}, + ) + } } else { // Not a special case so just use the param as is suggestions = current diff --git a/openc3.code-workspace b/openc3.code-workspace index 0419c4170b..83bf9f7466 100644 --- a/openc3.code-workspace +++ b/openc3.code-workspace @@ -35,6 +35,15 @@ }, { "path": "../../Development/openc3-cosmos-vueflow" + }, + { + "path": "../../Development/fprime" + }, + { + "path": "../openc3-cosmos-fprime" + }, + { + "path": "../openc3-cosmos-script-engine-cstol" } ], "settings": { diff --git a/openc3/data/config/command_modifiers.yaml b/openc3/data/config/command_modifiers.yaml index 24a8a52e64..3449b99677 100644 --- a/openc3/data/config/command_modifiers.yaml +++ b/openc3/data/config/command_modifiers.yaml @@ -176,6 +176,19 @@ ACCESSOR: description: Additional argument passed to the accessor class constructor values: .+ since: 5.0.10 +SUBPACKETIZER: + summary: Defines a class used to break up the packet into subpackets before decom + description: Defines a class used to break up the packet into subpackets before decom. Defaults to nil/None. + parameters: + - name: Subpacketizer Class Name + required: true + description: The name of the Subpacketizer class + values: .+ + - name: Argument + required: false + description: Additional argument passed to the Subpacketizer class constructor + values: .+ + since: 6.10.0 TEMPLATE: summary: Defines a template string used to initialize the command before default values are filled in description: Generally the template string is formatted in JSON or HTML and then values are filled in with @@ -256,6 +269,10 @@ RESTRICTED: summary: Marks this packet as restricted and will require approval if critical commanding is enabled description: Used as one of the two types of critical commands (HAZARDOUS and RESTRICTED) since: 5.20.0 +SUBPACKET: + summary: Marks this packet as as a subpacket which will exclude it from Interface level identification + description: Used with a SUBPACKETIZER to breakup up packets into subpackets at decom time + since: 6.10.0 VALIDATOR: summary: Defines a validator class for a command description: Validator class is used to validate the command success or failure with both a pre_check and post_check method. diff --git a/openc3/data/config/telemetry_modifiers.yaml b/openc3/data/config/telemetry_modifiers.yaml index 44e877385d..7d4bbcb01c 100644 --- a/openc3/data/config/telemetry_modifiers.yaml +++ b/openc3/data/config/telemetry_modifiers.yaml @@ -173,6 +173,19 @@ ACCESSOR: description: The name of the accessor class values: .+ since: 5.0.10 +SUBPACKETIZER: + summary: Defines a class used to break up the packet into subpackets before decom + description: Defines a class used to break up the packet into subpackets before decom. Defaults to nil/None. + parameters: + - name: Subpacketizer Class Name + required: true + description: The name of the Subpacketizer class + values: .+ + - name: Argument + required: false + description: Additional argument passed to the Subpacketizer class constructor + values: .+ + since: 6.10.0 TEMPLATE: summary: Defines a template string used to pull telemetry values from a string buffer parameters: @@ -198,3 +211,7 @@ VIRTUAL: summary: Marks this packet as virtual and not participating in identification description: Used for packet definitions that can be used as structures for items with a given packet. since: 5.18.0 +SUBPACKET: + summary: Marks this packet as as a subpacket which will exclude it from Interface level identification + description: Used with a SUBPACKETIZER to breakup up packets into subpackets at decom time + since: 6.10.0 \ No newline at end of file diff --git a/openc3/ext/openc3/ext/packet/packet.c b/openc3/ext/openc3/ext/packet/packet.c index fa75f84eba..027e2ce4e2 100644 --- a/openc3/ext/openc3/ext/packet/packet.c +++ b/openc3/ext/openc3/ext/packet/packet.c @@ -64,6 +64,9 @@ static ID id_ivar_packet_time = 0; static ID id_ivar_ignore_overlap = 0; static ID id_ivar_virtual = 0; static ID id_ivar_restricted = 0; +static ID id_ivar_subpacket = 0; +static ID id_ivar_subpacketizer = 0; +static ID id_ivar_obfuscated_items = 0; /* Sets the target name this packet is associated with. Unidentified packets * will have target name set to nil. @@ -291,6 +294,9 @@ static VALUE packet_initialize(int argc, VALUE *argv, VALUE self) rb_ivar_set(self, id_ivar_ignore_overlap, Qfalse); rb_ivar_set(self, id_ivar_virtual, Qfalse); rb_ivar_set(self, id_ivar_restricted, Qfalse); + rb_ivar_set(self, id_ivar_subpacket, Qfalse); + rb_ivar_set(self, id_ivar_subpacketizer, Qnil); + rb_ivar_set(self, id_ivar_obfuscated_items, Qnil); return self; } @@ -335,6 +341,9 @@ void Init_packet(void) id_ivar_ignore_overlap = rb_intern("@ignore_overlap"); id_ivar_virtual = rb_intern("@virtual"); id_ivar_restricted = rb_intern("@restricted"); + id_ivar_subpacket = rb_intern("@subpacket"); + id_ivar_subpacketizer = rb_intern("@subpacketizer"); + id_ivar_obfuscated_items = rb_intern("@obfuscated_items"); cPacket = rb_define_class_under(mOpenC3, "Packet", cStructure); rb_define_method(cPacket, "initialize", packet_initialize, -1); diff --git a/openc3/lib/openc3/accessors/template_accessor.rb b/openc3/lib/openc3/accessors/template_accessor.rb index a38fd3b419..607d2c44a6 100644 --- a/openc3/lib/openc3/accessors/template_accessor.rb +++ b/openc3/lib/openc3/accessors/template_accessor.rb @@ -25,6 +25,7 @@ def initialize(packet, left_char = '<', right_char = '>') @left_char = left_char @right_char = right_char @configured = false + @args = [left_char, right_char] end def configure diff --git a/openc3/lib/openc3/api/tlm_api.rb b/openc3/lib/openc3/api/tlm_api.rb index 2f481de6c8..f866249904 100644 --- a/openc3/lib/openc3/api/tlm_api.rb +++ b/openc3/lib/openc3/api/tlm_api.rb @@ -221,15 +221,21 @@ def normalize_tlm(*args, type: :ALL, manual: false, scope: $openc3_scope, token: # @param target_name [String] Name of the target # @param packet_name [String] Name of the packet # @return [Hash] telemetry hash with last telemetry buffer - def get_tlm_buffer(*args, manual: false, scope: $openc3_scope, token: $openc3_token) + def get_tlm_buffer(*args, manual: false, scope: $openc3_scope, token: $openc3_token, timeout: 5) target_name, packet_name = _extract_target_packet_names('get_tlm_buffer', *args) authorize(permission: 'tlm', target_name: target_name, packet_name: packet_name, manual: manual, scope: scope, token: token) - TargetModel.packet(target_name, packet_name, scope: scope) - topic = "#{scope}__TELEMETRY__{#{target_name}}__#{packet_name}" - msg_id, msg_hash = Topic.get_newest_message(topic) - if msg_id + model = TargetModel.packet(target_name, packet_name, scope: scope) + if model["subpacket"] + msg_hash = DecomInterfaceTopic.get_tlm_buffer(target_name, packet_name, timeout: timeout, scope: scope) msg_hash['buffer'] = msg_hash['buffer'].b return msg_hash + else + topic = "#{scope}__TELEMETRY__{#{target_name}}__#{packet_name}" + msg_id, msg_hash = Topic.get_newest_message(topic) + if msg_id + msg_hash['buffer'] = msg_hash['buffer'].b + return msg_hash + end end return nil end @@ -437,8 +443,11 @@ def get_tlm(*args, manual: false, scope: $openc3_scope, token: $openc3_token) # @param packet_name [String] Name of the packet # @param item_name [String] Name of the packet # @return [Hash] Telemetry packet item hash - def get_item(*args, manual: false, scope: $openc3_scope, token: $openc3_token) + def get_item(*args, manual: false, scope: $openc3_scope, token: $openc3_token, cache_timeout: nil) target_name, packet_name, item_name = _extract_target_packet_item_names('get_item', *args) + if packet_name == 'LATEST' + packet_name = CvtModel.determine_latest_packet_for_item(target_name, item_name, cache_timeout: cache_timeout, scope: scope) + end authorize(permission: 'tlm', target_name: target_name, packet_name: packet_name, manual: manual, scope: scope, token: token) TargetModel.packet_item(target_name, packet_name, item_name, scope: scope) end diff --git a/openc3/lib/openc3/interfaces/protocols/fixed_protocol.rb b/openc3/lib/openc3/interfaces/protocols/fixed_protocol.rb index 69846b9505..edc26422df 100644 --- a/openc3/lib/openc3/interfaces/protocols/fixed_protocol.rb +++ b/openc3/lib/openc3/interfaces/protocols/fixed_protocol.rb @@ -89,12 +89,10 @@ def identify_and_finish_packet begin if @telemetry target_packets = System.telemetry.packets(target_name) - target = System.targets[target_name] - unique_id_mode = target.tlm_unique_id_mode if target + unique_id_mode = System.telemetry.tlm_unique_id_mode(target_name) else target_packets = System.commands.packets(target_name) - target = System.targets[target_name] - unique_id_mode = target.cmd_unique_id_mode if target + unique_id_mode = System.commands.cmd_unique_id_mode(target_name) end rescue RuntimeError # No commands/telemetry for this target @@ -103,7 +101,7 @@ def identify_and_finish_packet if unique_id_mode target_packets.each do |_packet_name, packet| - if packet.identify?(@data[@discard_leading_bytes..-1]) + if not packet.subpacket and packet.identify?(@data[@discard_leading_bytes..-1]) # identify? handles virtual identified_packet = packet break end @@ -111,15 +109,23 @@ def identify_and_finish_packet else # Do a hash lookup to quickly identify the packet if target_packets.length > 0 - packet = target_packets.first[1] - key = packet.read_id_values(@data[@discard_leading_bytes..-1]) - if @telemetry - hash = System.telemetry.config.tlm_id_value_hash[target_name] - else - hash = System.commands.config.cmd_id_value_hash[target_name] + packet = nil + target_packets.each do |_packet_name, target_packet| + next if target_packet.virtual + next if target_packet.subpacket + packet = target_packet + break + end + if packet + key = packet.read_id_values(@data[@discard_leading_bytes..-1]) + if @telemetry + hash = System.telemetry.config.tlm_id_value_hash[target_name] + else + hash = System.commands.config.cmd_id_value_hash[target_name] + end + identified_packet = hash[key] + identified_packet = hash['CATCHALL'.freeze] unless identified_packet end - identified_packet = hash[key] - identified_packet = hash['CATCHALL'.freeze] unless identified_packet end end diff --git a/openc3/lib/openc3/microservices/decom_microservice.rb b/openc3/lib/openc3/microservices/decom_microservice.rb index b71db2a223..1337ae7964 100644 --- a/openc3/lib/openc3/microservices/decom_microservice.rb +++ b/openc3/lib/openc3/microservices/decom_microservice.rb @@ -24,6 +24,7 @@ require 'thread' require 'openc3/microservices/microservice' require 'openc3/microservices/interface_decom_common' +require 'openc3/microservices/interface_microservice' require 'openc3/topics/telemetry_decom_topic' require 'openc3/topics/limits_event_topic' @@ -127,6 +128,10 @@ def run handle_build_cmd(msg_hash['build_cmd'], msg_id) next end + if msg_hash.key?('get_tlm_buffer') + handle_get_tlm_buffer(msg_hash['get_tlm_buffer'], msg_id) + next + end else decom_packet(topic, msg_id, msg_hash, redis) @metric.set(name: 'decom_total', value: @count, type: 'counter') @@ -154,9 +159,12 @@ def decom_packet(_topic, msg_id, msg_hash, _redis) @metric.set(name: 'decom_topic_delta_seconds', value: delta, type: 'gauge', unit: 'seconds', help: 'Delta time between data written to stream and decom start') start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + ####################################### + # Build packet object from topic data + ####################################### target_name = msg_hash["target_name"] packet_name = msg_hash["packet_name"] - packet = System.telemetry.packet(target_name, packet_name) packet.stored = ConfigParser.handle_true_false(msg_hash["stored"]) # Note: Packet time will be recalculated as part of decom so not setting @@ -168,29 +176,101 @@ def decom_packet(_topic, msg_id, msg_hash, _redis) packet.extra = extra end packet.buffer = msg_hash["buffer"] - # Processors are user code points which must be rescued - # so the TelemetryDecomTopic can write the packet - begin - packet.process # Run processors - rescue Exception => e - @error_count += 1 - @metric.set(name: 'decom_error_total', value: @error_count, type: 'counter') - @error = e - @logger.error e.message - end - # Process all the limits and call the limits_change_callback (as necessary) - # check_limits also can call user code in the limits response - # but that is rescued separately in the limits_change_callback - packet.check_limits(System.limits_set) - # This is what actually decommutates the packet and updates the CVT - TelemetryDecomTopic.write_packet(packet, scope: @scope) + ################################################################################ + # Break packet into subpackets (if necessary) + # Subpackets are typically channelized data + ################################################################################ + packet_and_subpackets = packet.subpacketize + + packet_and_subpackets.each do |packet_or_subpacket| + if packet_or_subpacket.subpacket + packet_or_subpacket = handle_subpacket(packet, packet_or_subpacket) + end + + ##################################################################################### + # Run Processors + # This must be before the full decom so that processor derived values are available + ##################################################################################### + begin + packet_or_subpacket.process # Run processors + rescue Exception => e + @error_count += 1 + @metric.set(name: 'decom_error_total', value: @error_count, type: 'counter') + @error = e + @logger.error e.message + end + + ############################################################################# + # Process all the limits and call the limits_change_callback (as necessary) + # This must be before the full decom so that limits states are available + ############################################################################# + packet_or_subpacket.check_limits(System.limits_set) + + # This is what actually decommutates the packet and updates the CVT + TelemetryDecomTopic.write_packet(packet_or_subpacket, scope: @scope) + end diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float @metric.set(name: 'decom_duration_seconds', value: diff, type: 'gauge', unit: 'seconds') end end + def handle_subpacket(packet, subpacket) + # Subpacket received time always = packet.received_time + # Use packet_time appropriately if another timestamp is needed + subpacket.received_time = packet.received_time + subpacket.stored = packet.stored + subpacket.extra = packet.extra + + if subpacket.stored + # Stored telemetry does not update the current value table + identified_subpacket = System.telemetry.identify_and_define_packet(subpacket, @target_names, subpackets: true) + else + # Identify and update subpacket + if subpacket.identified? + begin + # Preidentifed subpacket - place it into the current value table + identified_subpacket = System.telemetry.update!(subpacket.target_name, + subpacket.packet_name, + subpacket.buffer) + rescue RuntimeError + # Subpacket identified but we don't know about it + # Clear packet_name and target_name and try to identify + @logger.warn "#{@name}: Received unknown identified subpacket: #{subpacket.target_name} #{subpacket.packet_name}" + subpacket.target_name = nil + subpacket.packet_name = nil + identified_subpacket = System.telemetry.identify!(subpacket.buffer, + @target_names, subpackets: true) + end + else + # Packet needs to be identified + identified_subpacket = System.telemetry.identify!(subpacket.buffer, + @target_names, subpackets: true) + end + end + + if identified_subpacket + identified_subpacket.received_time = subpacket.received_time + identified_subpacket.stored = subpacket.stored + identified_subpacket.extra = subpacket.extra + subpacket = identified_subpacket + else + unknown_subpacket = System.telemetry.update!('UNKNOWN', 'UNKNOWN', subpacket.buffer) + unknown_subpacket.received_time = subpacket.received_time + unknown_subpacket.stored = subpacket.stored + unknown_subpacket.extra = subpacket.extra + subpacket = unknown_subpacket + num_bytes_to_print = [InterfaceMicroservice::UNKNOWN_BYTES_TO_PRINT, subpacket.length].min + data = subpacket.buffer(false)[0..(num_bytes_to_print - 1)] + prefix = data.each_byte.map { | byte | sprintf("%02X", byte) }.join() + @logger.warn "#{@name} #{subpacket.target_name} packet length: #{subpacket.length} starting with: #{prefix}" + end + + TargetModel.sync_tlm_packet_counts(subpacket, @target_names, scope: @scope) + return subpacket + end + # Called when an item in any packet changes limits states. # # @param packet [Packet] Packet which has had an item change limits state diff --git a/openc3/lib/openc3/microservices/interface_decom_common.rb b/openc3/lib/openc3/microservices/interface_decom_common.rb index c9ef86a230..fcd8dca29b 100644 --- a/openc3/lib/openc3/microservices/interface_decom_common.rb +++ b/openc3/lib/openc3/microservices/interface_decom_common.rb @@ -76,5 +76,37 @@ def handle_build_cmd(build_cmd_json, msg_id) end Topic.write_topic(ack_topic, msg_hash) end + + def handle_get_tlm_buffer(get_tlm_buffer_json, msg_id) + get_tlm_buffer_hash = JSON.parse(get_tlm_buffer_json, allow_nan: true, create_additions: true) + target_name = get_tlm_buffer_hash['target_name'] + packet_name = get_tlm_buffer_hash['packet_name'] + ack_topic = "{#{@scope}__ACKCMD}TARGET__#{target_name}" + begin + packet = System.telemetry.packet(target_name, packet_name) + msg_hash = { + id: msg_id, + result: 'SUCCESS', + time: packet.packet_time.to_nsec_from_epoch, + received_time: packet.received_time.to_nsec_from_epoch, + target_name: packet.target_name, + packet_name: packet.packet_name, + received_count: packet.received_count, + stored: packet.stored.to_s, + buffer: packet.buffer(false) + } + msg_hash[:extra] = JSON.generate(packet.extra.as_json, allow_nan: true) if packet.extra + + # If there is an error due to parameter out of range, etc, we rescue it so we can + # write the ACKCMD}TARGET topic and allow the source to return + rescue => error + msg_hash = { + id: msg_id, + result: 'ERROR', + message: error.message + } + end + Topic.write_topic(ack_topic, msg_hash) + end end end diff --git a/openc3/lib/openc3/microservices/interface_microservice.rb b/openc3/lib/openc3/microservices/interface_microservice.rb index a89992fb5b..dd8927d157 100644 --- a/openc3/lib/openc3/microservices/interface_microservice.rb +++ b/openc3/lib/openc3/microservices/interface_microservice.rb @@ -554,23 +554,7 @@ def initialize(name) target = System.targets[target_name] target.interface = @interface end - @interface.tlm_target_names.each do |target_name| - # Initialize the target's packet counters based on the Topic stream - # Prevents packet count resetting to 0 when interface restarts - begin - System.telemetry.packets(target_name).each do |packet_name, packet| - topic = "#{@scope}__TELEMETRY__{#{target_name}}__#{packet_name}" - msg_id, msg_hash = Topic.get_newest_message(topic) - if msg_id - packet.received_count = msg_hash['received_count'].to_i - else - packet.received_count = 0 - end - end - rescue - # Handle targets without telemetry - end - end + TargetModel.init_tlm_packet_counts(@interface.tlm_target_names, scope: @scope) if @interface.connect_on_startup @interface.state = 'ATTEMPTING' else @@ -583,9 +567,6 @@ def initialize(name) end @queued = false - @sync_packet_count_data = {} - @sync_packet_count_time = nil - @sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second @interface.options.each do |option_name, option_values| if option_name.upcase == 'OPTIMIZE_THROUGHPUT' @queued = true @@ -594,7 +575,7 @@ def initialize(name) StoreQueued.instance.set_update_interval(update_interval) end if option_name.upcase == 'SYNC_PACKET_COUNT_DELAY_SECONDS' - @sync_packet_count_delay_seconds = option_values[0].to_f + TargetModel.sync_packet_count_delay_seconds = option_values[0].to_f end end @@ -777,7 +758,7 @@ def handle_packet(packet) # Write to stream if @interface.tlm_target_enabled[packet.target_name] - sync_tlm_packet_counts(packet) + TargetModel.sync_tlm_packet_counts(packet, @interface.tlm_target_names, scope: @scope) TelemetryTopic.write_packet(packet, queued: @queued, scope: @scope) end end @@ -912,59 +893,6 @@ def shutdown(_sig = nil) def graceful_kill # Just to avoid warning end - - def sync_tlm_packet_counts(packet) - if @sync_packet_count_delay_seconds <= 0 or $openc3_redis_cluster - # Perfect but slow method - packet.received_count = TargetModel.increment_telemetry_count(packet.target_name, packet.packet_name, 1, scope: @scope) - else - # Eventually consistent method - # Only sync every period (default 1 second) to avoid hammering Redis - # This is a trade off between speed and accuracy - # The packet count is eventually consistent - @sync_packet_count_data[packet.target_name] ||= {} - @sync_packet_count_data[packet.target_name][packet.packet_name] ||= 0 - @sync_packet_count_data[packet.target_name][packet.packet_name] += 1 - - # Ensures counters change between syncs - packet.received_count += 1 - - # Check if we need to sync the packet counts - if @sync_packet_count_time.nil? or (Time.now - @sync_packet_count_time) > @sync_packet_count_delay_seconds - @sync_packet_count_time = Time.now - - inc_count = 0 - # Use pipeline to make this one transaction - result = Store.redis_pool.pipelined do - # Increment global counters for packets received - @sync_packet_count_data.each do |target_name, packet_data| - packet_data.each do |packet_name, count| - TargetModel.increment_telemetry_count(target_name, packet_name, count, scope: @scope) - inc_count += 1 - end - end - @sync_packet_count_data = {} - - # Get all the packet counts with the global counters - @interface.tlm_target_names.each do |target_name| - TargetModel.get_all_telemetry_counts(target_name, scope: @scope) - end - TargetModel.get_all_telemetry_counts('UNKNOWN', scope: @scope) - end - @interface.tlm_target_names.each do |target_name| - result[inc_count].each do |packet_name, count| - update_packet = System.telemetry.packet(target_name, packet_name) - update_packet.received_count = count.to_i - end - inc_count += 1 - end - result[inc_count].each do |packet_name, count| - update_packet = System.telemetry.packet('UNKNOWN', packet_name) - update_packet.received_count = count.to_i - end - end - end - end end end diff --git a/openc3/lib/openc3/migrations/20251022000000_remove_unique_id.rb b/openc3/lib/openc3/migrations/20251022000000_remove_unique_id.rb new file mode 100644 index 0000000000..1f3262f28e --- /dev/null +++ b/openc3/lib/openc3/migrations/20251022000000_remove_unique_id.rb @@ -0,0 +1,23 @@ +require 'openc3/utilities/migration' +require 'openc3/models/scope_model' +require 'openc3/models/microservice_model' + +module OpenC3 + class RemoveUniqueId < Migration + def self.run + ScopeModel.get_all_models(scope: nil).each do |scope, scope_model| + target_models = TargetModel.all(scope: scope) + target_models.each do |name, target_model| + target_model.delete("cmd_unique_id_mode") + target_model.delete("tlm_unique_id_mode") + model = TargetModel.from_json(target_model, scope: scope) + model.update() + end + end + end + end +end + +unless ENV['OPENC3_NO_MIGRATE'] + OpenC3::RemoveUniqueId.run +end diff --git a/openc3/lib/openc3/models/target_model.rb b/openc3/lib/openc3/models/target_model.rb index cef7ab969e..3c0e6b011f 100644 --- a/openc3/lib/openc3/models/target_model.rb +++ b/openc3/lib/openc3/models/target_model.rb @@ -54,6 +54,9 @@ class TargetModel < Model ERB_EXTENSIONS = %w(.txt .rb .py .json .yaml .yml) ITEM_MAP_CACHE_TIMEOUT = 10.0 @@item_map_cache = {} + @@sync_packet_count_data = {} + @@sync_packet_count_time = nil + @@sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second attr_accessor :folder_name attr_accessor :requires @@ -61,8 +64,6 @@ class TargetModel < Model attr_accessor :ignored_items attr_accessor :limits_groups attr_accessor :cmd_tlm_files - attr_accessor :cmd_unique_id_mode - attr_accessor :tlm_unique_id_mode attr_accessor :id attr_accessor :cmd_buffer_depth attr_accessor :cmd_log_cycle_time @@ -351,8 +352,6 @@ def initialize( ignored_items: [], limits_groups: [], cmd_tlm_files: [], - cmd_unique_id_mode: false, - tlm_unique_id_mode: false, id: nil, updated_at: nil, plugin: nil, @@ -402,8 +401,6 @@ def initialize( @ignored_items = ignored_items @limits_groups = limits_groups @cmd_tlm_files = cmd_tlm_files - @cmd_unique_id_mode = cmd_unique_id_mode - @tlm_unique_id_mode = tlm_unique_id_mode @id = id @cmd_buffer_depth = cmd_buffer_depth @cmd_log_cycle_time = cmd_log_cycle_time @@ -442,8 +439,6 @@ def as_json(*_a) 'ignored_items' => @ignored_items, 'limits_groups' => @limits_groups, 'cmd_tlm_files' => @cmd_tlm_files, - 'cmd_unique_id_mode' => @cmd_unique_id_mode, - 'tlm_unique_id_mode' => @tlm_unique_id_mode, 'id' => @id, 'updated_at' => @updated_at, 'plugin' => @plugin, @@ -790,8 +785,6 @@ def update_target_model(system) @ignored_parameters = target.ignored_parameters @ignored_items = target.ignored_items @cmd_tlm_files = target.cmd_tlm_files - @cmd_unique_id_mode = target.cmd_unique_id_mode - @tlm_unique_id_mode = target.tlm_unique_id_mode @limits_groups = system.limits.groups.keys update() end @@ -1365,6 +1358,81 @@ def self.get_telemetry_counts(target_packets, scope:) return counts end + def self.sync_packet_count_delay_seconds=(value) + @@sync_packet_count_delay_seconds = value + end + + def self.init_tlm_packet_counts(tlm_target_names, scope:) + @@sync_packet_count_time = Time.now + + # Get all the packet counts with the global counters + tlm_target_names.each do |target_name| + get_all_telemetry_counts(target_name, scope: scope).each do |packet_name, count| + update_packet = System.telemetry.packet(target_name, packet_name) + update_packet.received_count = count.to_i + end + end + get_all_telemetry_counts('UNKNOWN', scope: scope).each do |packet_name, count| + update_packet = System.telemetry.packet('UNKNOWN', packet_name) + update_packet.received_count = count.to_i + end + end + + def self.sync_tlm_packet_counts(packet, tlm_target_names, scope:) + if @@sync_packet_count_delay_seconds <= 0 or $openc3_redis_cluster + # Perfect but slow method + packet.received_count = increment_telemetry_count(packet.target_name, packet.packet_name, 1, scope: scope) + else + # Eventually consistent method + # Only sync every period (default 1 second) to avoid hammering Redis + # This is a trade off between speed and accuracy + # The packet count is eventually consistent + @@sync_packet_count_data[packet.target_name] ||= {} + @@sync_packet_count_data[packet.target_name][packet.packet_name] ||= 0 + @@sync_packet_count_data[packet.target_name][packet.packet_name] += 1 + + # Ensures counters change between syncs + update_packet = System.telemetry.packet(packet.target_name, packet.packet_name) + update_packet.received_count += 1 + packet.received_count = update_packet.received_count + + # Check if we need to sync the packet counts + if @@sync_packet_count_time.nil? or (Time.now - @@sync_packet_count_time) > @@sync_packet_count_delay_seconds + @@sync_packet_count_time = Time.now + + inc_count = 0 + # Use pipeline to make this one transaction + result = Store.redis_pool.pipelined do + # Increment global counters for packets received + @@sync_packet_count_data.each do |target_name, packet_data| + packet_data.each do |packet_name, count| + increment_telemetry_count(target_name, packet_name, count, scope: scope) + inc_count += 1 + end + end + @@sync_packet_count_data = {} + + # Get all the packet counts with the global counters + tlm_target_names.each do |target_name| + get_all_telemetry_counts(target_name, scope: scope) + end + get_all_telemetry_counts('UNKNOWN', scope: scope) + end + tlm_target_names.each do |target_name| + result[inc_count].each do |packet_name, count| + update_packet = System.telemetry.packet(target_name, packet_name) + update_packet.received_count = count.to_i + end + inc_count += 1 + end + result[inc_count].each do |packet_name, count| + update_packet = System.telemetry.packet('UNKNOWN', packet_name) + update_packet.received_count = count.to_i + end + end + end + end + def self.increment_command_count(target_name, packet_name, count, scope:) result = Store.hincrby("#{scope}__COMMANDCNTS__{#{target_name}}", packet_name, count) if String === result diff --git a/openc3/lib/openc3/packets/commands.rb b/openc3/lib/openc3/packets/commands.rb index 997ee6dfba..14a484114c 100644 --- a/openc3/lib/openc3/packets/commands.rb +++ b/openc3/lib/openc3/packets/commands.rb @@ -105,7 +105,7 @@ def params(target_name, packet_name) # # @param (see #identify_tlm!) # @return (see #identify_tlm!) - def identify(packet_data, target_names = nil) + def identify(packet_data, target_names = nil, subpackets: false) identified_packet = nil target_names = target_names() unless target_names @@ -120,21 +120,39 @@ def identify(packet_data, target_names = nil) next end - target = System.targets[target_name] - if target and target.cmd_unique_id_mode + if (not subpackets and System.commands.cmd_unique_id_mode(target_name)) or (subpackets and System.commands.cmd_subpacket_unique_id_mode(target_name)) # Iterate through the packets and see if any represent the buffer target_packets.each do |_packet_name, packet| - if packet.identify?(packet_data) + if subpackets + next unless packet.subpacket + else + next if packet.subpacket + end + if packet.identify?(packet_data) # Handles virtual identified_packet = packet break end end else # Do a hash lookup to quickly identify the packet - if target_packets.length > 0 - packet = target_packets.first[1] + packet = nil + target_packets.each do |_packet_name, target_packet| + next if target_packet.virtual + if subpackets + next unless target_packet.subpacket + else + next if target_packet.subpacket + end + packet = target_packet + break + end + if packet key = packet.read_id_values(packet_data) - hash = @config.cmd_id_value_hash[target_name] + if subpackets + hash = @config.cmd_subpacket_id_value_hash[target_name] + else + hash = @config.cmd_id_value_hash[target_name] + end identified_packet = hash[key] identified_packet = hash['CATCHALL'.freeze] unless identified_packet end @@ -264,6 +282,14 @@ def dynamic_add_packet(packet, affect_ids: false) @config.dynamic_add_packet(packet, :COMMAND, affect_ids: affect_ids) end + def cmd_unique_id_mode(target_name) + return @config.cmd_unique_id_mode[target_name.upcase] + end + + def cmd_subpacket_unique_id_mode(target_name) + return @config.cmd_subpacket_unique_id_mode[target_name.upcase] + end + protected def set_parameters(command, params, range_checking) diff --git a/openc3/lib/openc3/packets/packet.rb b/openc3/lib/openc3/packets/packet.rb index 762e16c44d..9e9f7cb64c 100644 --- a/openc3/lib/openc3/packets/packet.rb +++ b/openc3/lib/openc3/packets/packet.rb @@ -14,7 +14,7 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2024, OpenC3, Inc. +# All changes Copyright 2025, OpenC3, Inc. # All Rights Reserved # # This file may also be used under the terms of a commercial license @@ -113,6 +113,12 @@ class Packet < Structure # @return [Boolean] If this packet is marked as restricted use attr_accessor :restricted + # @return [Boolean] If this packet is marked as a subpacket + attr_accessor :subpacket + + # @return [Subpacketizer] Subpacketizer class (optional) + attr_accessor :subpacketizer + # Valid format types VALUE_TYPES = [:RAW, :CONVERTED, :FORMATTED, :WITH_UNITS] @@ -156,7 +162,9 @@ def initialize(target_name = nil, packet_name = nil, default_endianness = :BIG_E @virtual = false @restricted = false @validator = nil - @obfuscated_items = [] + @subpacket = false + @subpacketizer = nil + @obfuscated_items = nil end # Sets the target name this packet is associated with. Unidentified packets @@ -1087,6 +1095,9 @@ def to_config(cmd_or_tlm) else config << "COMMAND #{@target_name.to_s.quote_if_necessary} #{@packet_name.to_s.quote_if_necessary} #{@default_endianness} \"#{@description}\"\n" end + if @subpacketizer + config << " SUBPACKETIZER #{@subpacketizer.class} #{@subpacketizer.args.map { |a| a.to_s.quote_if_necessary }.join(" ")}\n" + end if @accessor.class.to_s != 'OpenC3::BinaryAccessor' config << " ACCESSOR #{@accessor.class} #{@accessor.args.map { |a| a.to_s.quote_if_necessary }.join(" ")}\n" end @@ -1107,6 +1118,9 @@ def to_config(cmd_or_tlm) elsif @hidden config << " HIDDEN\n" end + if @subpacket + config << " SUBPACKET\n" + end if @restricted config << " RESTRICTED\n" end @@ -1170,6 +1184,8 @@ def as_json(*a) config['disabled'] = true if @disabled config['hidden'] = true if @hidden config['virtual'] = true if @virtual + config['subpacket'] = true if @subpacket + config['subpacketizer'] = @subpacketizer.class.to_s if @subpacketizer config['restricted'] = true if @restricted config['accessor'] = @accessor.class.to_s config['accessor_args'] = @accessor.args @@ -1218,60 +1234,6 @@ def as_json(*a) config end - def self.from_json(hash) - endianness = hash['endianness'] ? hash['endianness'].intern : nil # Convert to symbol - packet = Packet.new(hash['target_name'], hash['packet_name'], endianness, hash['description']) - packet.short_buffer_allowed = hash['short_buffer_allowed'] - packet.hazardous = hash['hazardous'] - packet.hazardous_description = hash['hazardous_description'] - packet.messages_disabled = hash['messages_disabled'] - packet.disabled = hash['disabled'] - packet.hidden = hash['hidden'] - packet.virtual = hash['virtual'] - packet.restricted = hash['restricted'] - if hash['accessor'] - begin - accessor = OpenC3::const_get(hash['accessor']) - if hash['accessor_args'] and hash['accessor_args'].length > 0 - packet.accessor = accessor.new(packet, *hash['accessor_args']) - else - packet.accessor = accessor.new(packet) - end - rescue => e - Logger.instance.error "#{packet.target_name} #{packet.packet_name} accessor of #{hash['accessor']} could not be found due to #{e}" - end - end - if hash['validator'] - begin - validator = OpenC3::const_get(hash['validator']) - packet.validator = validator.new(packet) - rescue => e - Logger.instance.error "#{packet.target_name} #{packet.packet_name} validator of #{hash['validator']} could not be found due to #{e}" - end - end - packet.template = Base64.decode64(hash['template']) if hash['template'] - packet.meta = hash['meta'] - # Can't convert processors - hash['items'].each do |item| - packet.define(PacketItem.from_json(item)) - end - if hash['response'] - packet.response = hash['response'] - end - if hash['error_response'] - packet.error_response = hash['error_response'] - end - if hash['screen'] - packet.screen = hash['screen'] - end - if hash['related_items'] - packet.related_items = hash['related_items'] - end - packet.ignore_overlap = hash['ignore_overlap'] - - packet - end - def decom # Read all the RAW at once because this could be optimized by the accessor json_hash = read_items(@sorted_items) @@ -1306,6 +1268,14 @@ def process(buffer = @buffer) end end + def subpacketize + if @subpacketizer + return @subpacketizer.call(self) + else + return [self] + end + end + def obfuscate() return unless @buffer return unless @obfuscated_items diff --git a/openc3/lib/openc3/packets/packet_config.rb b/openc3/lib/openc3/packets/packet_config.rb index a376b456e3..6db2ceeabc 100644 --- a/openc3/lib/openc3/packets/packet_config.rb +++ b/openc3/lib/openc3/packets/packet_config.rb @@ -79,11 +79,21 @@ class PacketConfig # that returns a hash keyed by an array of id values. The id values resolve to the packet # defined by that identification. Command version attr_reader :cmd_id_value_hash + attr_reader :cmd_subpacket_id_value_hash + attr_reader :cmd_id_signature + attr_reader :cmd_subpacket_id_signature + attr_reader :cmd_unique_id_mode + attr_reader :cmd_subpacket_unique_id_mode # @return [Hash=>Hash=>Packet] Hash keyed by target name # that returns a hash keyed by an array of id values. The id values resolve to the packet # defined by that identification. Telemetry version attr_reader :tlm_id_value_hash + attr_reader :tlm_subpacket_id_value_hash + attr_reader :tlm_id_signature + attr_reader :tlm_subpacket_id_signature + attr_reader :tlm_unique_id_mode + attr_reader :tlm_subpacket_unique_id_mode # @return [String] Language of current target (ruby or python) attr_reader :language @@ -102,7 +112,17 @@ def initialize @latest_data = {} @warnings = [] @cmd_id_value_hash = {} + @cmd_subpacket_id_value_hash = {} + @cmd_id_signature = {} + @cmd_subpacket_id_signature = {} + @cmd_unique_id_mode = {} + @cmd_subpacket_unique_id_mode = {} @tlm_id_value_hash = {} + @tlm_subpacket_id_value_hash = {} + @tlm_id_signature = {} + @tlm_subpacket_id_signature = {} + @tlm_unique_id_mode = {} + @tlm_subpacket_unique_id_mode = {} # Create unknown packets @commands['UNKNOWN'] = {} @@ -222,7 +242,7 @@ def process_file(filename, process_target_name, language = 'ruby') 'APPEND_PARAMETER', 'APPEND_ID_ITEM', 'APPEND_ID_PARAMETER', 'APPEND_ARRAY_ITEM',\ 'APPEND_ARRAY_PARAMETER', 'ALLOW_SHORT', 'HAZARDOUS', 'PROCESSOR', 'META',\ 'DISABLE_MESSAGES', 'HIDDEN', 'DISABLED', 'VIRTUAL', 'RESTRICTED', 'ACCESSOR', 'TEMPLATE', 'TEMPLATE_FILE',\ - 'RESPONSE', 'ERROR_RESPONSE', 'SCREEN', 'RELATED_ITEM', 'IGNORE_OVERLAP', 'VALIDATOR' + 'RESPONSE', 'ERROR_RESPONSE', 'SCREEN', 'RELATED_ITEM', 'IGNORE_OVERLAP', 'VALIDATOR', 'SUBPACKET', 'SUBPACKETIZER' raise parser.error("No current packet for #{keyword}") unless @current_packet process_current_packet(parser, keyword, params) @@ -321,18 +341,20 @@ def finish_packet PacketParser.check_item_data_types(@current_packet) @commands[@current_packet.target_name][@current_packet.packet_name] = @current_packet unless @current_packet.virtual - hash = @cmd_id_value_hash[@current_packet.target_name] - hash = {} unless hash - @cmd_id_value_hash[@current_packet.target_name] = hash - update_id_value_hash(@current_packet, hash) + if @current_packet.subpacket + build_id_metadata(@current_packet, @cmd_subpacket_id_value_hash, @cmd_subpacket_id_signature, @cmd_subpacket_unique_id_mode) + else + build_id_metadata(@current_packet, @cmd_id_value_hash, @cmd_id_signature, @cmd_unique_id_mode) + end end else @telemetry[@current_packet.target_name][@current_packet.packet_name] = @current_packet unless @current_packet.virtual - hash = @tlm_id_value_hash[@current_packet.target_name] - hash = {} unless hash - @tlm_id_value_hash[@current_packet.target_name] = hash - update_id_value_hash(@current_packet, hash) + if @current_packet.subpacket + build_id_metadata(@current_packet, @tlm_subpacket_id_value_hash, @tlm_subpacket_id_signature, @tlm_subpacket_unique_id_mode) + else + build_id_metadata(@current_packet, @tlm_id_value_hash, @tlm_id_signature, @tlm_unique_id_mode) + end end end @current_packet = nil @@ -345,10 +367,11 @@ def dynamic_add_packet(packet, cmd_or_tlm = :TELEMETRY, affect_ids: false) @commands[packet.target_name][packet.packet_name] = packet if affect_ids and not packet.virtual - hash = @cmd_id_value_hash[packet.target_name] - hash = {} unless hash - @cmd_id_value_hash[packet.target_name] = hash - update_id_value_hash(packet, hash) + if packet.subpacket + build_id_metadata(packet, @cmd_subpacket_id_value_hash, @cmd_subpacket_id_signature, @cmd_subpacket_unique_id_mode) + else + build_id_metadata(packet, @cmd_id_value_hash, @cmd_id_signature, @cmd_unique_id_mode) + end end else @telemetry[packet.target_name][packet.packet_name] = packet @@ -362,10 +385,11 @@ def dynamic_add_packet(packet, cmd_or_tlm = :TELEMETRY, affect_ids: false) end if affect_ids and not packet.virtual - hash = @tlm_id_value_hash[packet.target_name] - hash = {} unless hash - @tlm_id_value_hash[packet.target_name] = hash - update_id_value_hash(packet, hash) + if packet.subpacket + build_id_metadata(packet, @tlm_subpacket_id_value_hash, @tlm_subpacket_id_signature, @tlm_subpacket_unique_id_mode) + else + build_id_metadata(packet, @tlm_id_value_hash, @tlm_id_signature, @tlm_unique_id_mode) + end end end end @@ -398,15 +422,32 @@ def self.from_config(config, process_target_name, language = 'ruby') protected - def update_id_value_hash(packet, hash) + def build_id_metadata(packet, id_value_hash, id_signature_hash, unique_id_mode_hash) + target_id_value_hash = id_value_hash[packet.target_name] + target_id_value_hash = {} unless target_id_value_hash + id_value_hash[packet.target_name] = target_id_value_hash + update_id_value_hash(packet, target_id_value_hash, id_signature_hash, unique_id_mode_hash) + end + + def update_id_value_hash(packet, target_id_value_hash, id_signature_hash, unique_id_mode_hash) if packet.id_items.length > 0 key = [] + id_signature = "" packet.id_items.each do |item| key << item.id_value + id_signature << "__#{item.key}__#{item.bit_offset}__#{item.bit_size}__#{item.data_type}" + end + target_id_value_hash[key] = packet + target_id_signature = id_signature_hash[packet.target_name] + if target_id_signature + if id_signature != target_id_signature + unique_id_mode_hash[packet.target_name] = true + end + else + id_signature_hash[packet.target_name] = id_signature end - hash[key] = packet else - hash['CATCHALL'.freeze] = packet + target_id_value_hash['CATCHALL'.freeze] = packet end end @@ -509,12 +550,17 @@ def process_current_packet(parser, keyword, params) @current_packet.disabled = true @current_packet.virtual = true + when 'SUBPACKET' + usage = "#{keyword}" + parser.verify_num_parameters(0, 0, usage) + @current_packet.subpacket = true + when 'RESTRICTED' usage = "#{keyword}" parser.verify_num_parameters(0, 0, usage) @current_packet.restricted = true - when 'ACCESSOR', 'VALIDATOR' + when 'ACCESSOR', 'VALIDATOR', 'SUBPACKETIZER' usage = "#{keyword} ..." parser.verify_num_parameters(1, nil, usage) begin diff --git a/openc3/lib/openc3/packets/telemetry.rb b/openc3/lib/openc3/packets/telemetry.rb index e3512fbb98..f8967d00c0 100644 --- a/openc3/lib/openc3/packets/telemetry.rb +++ b/openc3/lib/openc3/packets/telemetry.rb @@ -264,8 +264,8 @@ def newest_packet(target_name, item_name) # default value of nil means to search all known targets. # @return [Packet] The identified packet with its data set to the given # packet_data buffer. Returns nil if no packet could be identified. - def identify!(packet_data, target_names = nil) - identified_packet = identify(packet_data, target_names) + def identify!(packet_data, target_names = nil, subpackets: false) + identified_packet = identify(packet_data, target_names, subpackets: subpackets) identified_packet.buffer = packet_data if identified_packet return identified_packet end @@ -277,7 +277,7 @@ def identify!(packet_data, target_names = nil) # @param target_names [Array] List of target names to limit the search. The # default value of nil means to search all known targets. # @return [Packet] The identified packet, Returns nil if no packet could be identified. - def identify(packet_data, target_names = nil) + def identify(packet_data, target_names = nil, subpackets: false) target_names = target_names() unless target_names target_names.each do |target_name| @@ -292,18 +292,36 @@ def identify(packet_data, target_names = nil) next end - target = System.targets[target_name] - if target and target.tlm_unique_id_mode + if (not subpackets and System.telemetry.tlm_unique_id_mode(target_name)) or (subpackets and System.telemetry.tlm_subpacket_unique_id_mode(target_name)) # Iterate through the packets and see if any represent the buffer target_packets.each do |_packet_name, packet| - return packet if packet.identify?(packet_data) + if subpackets + next unless packet.subpacket + else + next if packet.subpacket + end + return packet if packet.identify?(packet_data) # Handles virtual end else # Do a hash lookup to quickly identify the packet - if target_packets.length > 0 - packet = target_packets.first[1] + packet = nil + target_packets.each do |_packet_name, target_packet| + next if target_packet.virtual + if subpackets + next unless target_packet.subpacket + else + next if target_packet.subpacket + end + packet = target_packet + break + end + if packet key = packet.read_id_values(packet_data) - hash = @config.tlm_id_value_hash[target_name] + if subpackets + hash = @config.tlm_subpacket_id_value_hash[target_name] + else + hash = @config.tlm_id_value_hash[target_name] + end identified_packet = hash[key] identified_packet = hash['CATCHALL'.freeze] unless identified_packet return identified_packet if identified_packet @@ -314,9 +332,9 @@ def identify(packet_data, target_names = nil) return nil end - def identify_and_define_packet(packet, target_names = nil) + def identify_and_define_packet(packet, target_names = nil, subpackets: false) if !packet.identified? - identified_packet = identify(packet.buffer(false), target_names) + identified_packet = identify(packet.buffer(false), target_names, subpackets: subpackets) return nil unless identified_packet identified_packet = identified_packet.clone @@ -425,5 +443,13 @@ def all def dynamic_add_packet(packet, affect_ids: false) @config.dynamic_add_packet(packet, :TELEMETRY, affect_ids: affect_ids) end + + def tlm_unique_id_mode(target_name) + return @config.tlm_unique_id_mode[target_name.upcase] + end + + def tlm_subpacket_unique_id_mode(target_name) + return @config.tlm_subpacket_unique_id_mode[target_name.upcase] + end end # class Telemetry end diff --git a/openc3/lib/openc3/subpacketizers/subpacketizer.rb b/openc3/lib/openc3/subpacketizers/subpacketizer.rb new file mode 100644 index 0000000000..f1fc193670 --- /dev/null +++ b/openc3/lib/openc3/subpacketizers/subpacketizer.rb @@ -0,0 +1,18 @@ +class Subpacketizer + attr_reader :args + + def initialize(packet=nil) + @packet = packet + @args = [] + end + + # Subclass and implement this method to break packet into array of subpackets + # Subpackets should be fully identified and defined + def call(packet) + return [packet] + end + + def as_json(*a) + { 'class' => self.class.name, 'args' => @args.as_json(*a) } + end +end \ No newline at end of file diff --git a/openc3/lib/openc3/system/target.rb b/openc3/lib/openc3/system/target.rb index 60a15289f3..4a38c2124d 100644 --- a/openc3/lib/openc3/system/target.rb +++ b/openc3/lib/openc3/system/target.rb @@ -14,7 +14,7 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2024, OpenC3, Inc. +# All changes Copyright 2025, OpenC3, Inc. # All Rights Reserved # # This file may also be used under the terms of a commercial license @@ -68,12 +68,6 @@ class Target # @return [Integer] The number of telemetry packets received from this target attr_accessor :tlm_cnt - # @return [Boolean] Indicates if all command packets identify using different fields - attr_accessor :cmd_unique_id_mode - - # @return [Boolean] Indicates if telemetry packets identify using different fields - attr_accessor :tlm_unique_id_mode - # @return [String] Id of the target configuration attr_accessor :id @@ -91,13 +85,10 @@ def initialize(target_name, path, gem_path = nil) @ignored_parameters = [] @ignored_items = [] @cmd_tlm_files = [] - # @auto_screen_substitute = false @interface = nil @routers = [] @cmd_cnt = 0 @tlm_cnt = 0 - @cmd_unique_id_mode = false - @tlm_unique_id_mode = false @name = target_name.clone.upcase.freeze get_target_dir(path, gem_path) process_target_config_file() @@ -174,20 +165,8 @@ def process_file(filename) @cmd_tlm_files << filename - # when 'AUTO_SCREEN_SUBSTITUTE' - # usage = "#{keyword}" - # parser.verify_num_parameters(0, 0, usage) - # @auto_screen_substitute = true - - when 'CMD_UNIQUE_ID_MODE' - usage = "#{keyword}" - parser.verify_num_parameters(0, 0, usage) - @cmd_unique_id_mode = true - - when 'TLM_UNIQUE_ID_MODE' - usage = "#{keyword}" - parser.verify_num_parameters(0, 0, usage) - @tlm_unique_id_mode = true + when 'CMD_UNIQUE_ID_MODE', 'TLM_UNIQUE_ID_MODE' + # Deprecated - Now autodetected else # blank lines will have a nil keyword and should not raise an exception @@ -202,15 +181,7 @@ def as_json(*_a) config['requires'] = @requires config['ignored_parameters'] = @ignored_parameters config['ignored_items'] = @ignored_items - # config['auto_screen_substitute'] = true if @auto_screen_substitute config['cmd_tlm_files'] = @cmd_tlm_files - # config['filename'] = @filename - # config['interface'] = @interface.name if @interface - # config['dir'] = @dir - # config['cmd_cnt'] = @cmd_cnt - # config['tlm_cnt'] = @tlm_cnt - config['cmd_unique_id_mode'] = true if @cmd_unique_id_mode - config['tlm_unique_id_mode'] = true if @tlm_unique_id_mode config['id'] = @id config end diff --git a/openc3/lib/openc3/topics/decom_interface_topic.rb b/openc3/lib/openc3/topics/decom_interface_topic.rb index 7785a40940..73f294055c 100644 --- a/openc3/lib/openc3/topics/decom_interface_topic.rb +++ b/openc3/lib/openc3/topics/decom_interface_topic.rb @@ -17,6 +17,7 @@ # if purchased from OpenC3, Inc. require 'openc3/topics/topic' +require 'openc3/config/config_parser' module OpenC3 class DecomInterfaceTopic < Topic @@ -58,5 +59,35 @@ def self.inject_tlm(target_name, packet_name, item_hash = nil, type: :CONVERTED, Topic.write_topic("#{scope}__DECOMINTERFACE__{#{target_name}}", { 'inject_tlm' => JSON.generate(data, allow_nan: true) }, '*', 100) end + + def self.get_tlm_buffer(target_name, packet_name, timeout: 5, scope:) + data = {} + data['target_name'] = target_name.to_s.upcase + data['packet_name'] = packet_name.to_s.upcase + # DecomMicroservice is listening to the DECOMINTERFACE topic and has + # the most recent decommed packets including subpackets + ack_topic = "{#{scope}__ACKCMD}TARGET__#{target_name}" + Topic.update_topic_offsets([ack_topic]) + decom_id = Topic.write_topic("#{scope}__DECOMINTERFACE__{#{target_name}}", + { 'get_tlm_buffer' => JSON.generate(data, allow_nan: true) }, '*', 100) + time = Time.now + while (Time.now - time) < timeout + Topic.read_topics([ack_topic]) do |_topic, _msg_id, msg_hash, _redis| + if msg_hash["id"] == decom_id + if msg_hash["result"] == "SUCCESS" + msg_hash["stored"] = ConfigParser.handle_true_false(msg_hash["stored"]) + extra = msg_hash["extra"] + if extra and extra.length > 0 + msg_hash["extra"] = JSON.parse(extra, allow_nan: true, create_additions: true) + end + return msg_hash + else + raise msg_hash["message"] + end + end + end + end + raise "Timeout of #{timeout}s waiting for ack. Does target '#{target_name}' exist?" + end end end diff --git a/openc3/python/openc3/accessors/template_accessor.py b/openc3/python/openc3/accessors/template_accessor.py index 052de7e038..0ddf3ef64d 100644 --- a/openc3/python/openc3/accessors/template_accessor.py +++ b/openc3/python/openc3/accessors/template_accessor.py @@ -24,6 +24,7 @@ def __init__(self, packet, left_char="<", right_char=">"): self.left_char = left_char self.right_char = right_char self.configured = False + self.args = [left_char, right_char] def configure(self): if self.configured: diff --git a/openc3/python/openc3/api/tlm_api.py b/openc3/python/openc3/api/tlm_api.py index 1054850c43..29e7991ea8 100644 --- a/openc3/python/openc3/api/tlm_api.py +++ b/openc3/python/openc3/api/tlm_api.py @@ -228,15 +228,19 @@ def normalize_tlm(*args, type="ALL", scope=OPENC3_SCOPE): # @param target_name [String] Name of the target # @param packet_name [String] Name of the packet # @return [Hash] telemetry hash with last telemetry buffer -def get_tlm_buffer(*args, scope=OPENC3_SCOPE): +def get_tlm_buffer(*args, scope=OPENC3_SCOPE, timeout=5): target_name, packet_name = _extract_target_packet_names("get_tlm_buffer", *args) authorize(permission="tlm", target_name=target_name, packet_name=packet_name, scope=scope) - TargetModel.packet(target_name, packet_name, scope=scope) - topic = f"{scope}__TELEMETRY__{{{target_name}}}__{packet_name}" - msg_id, msg_hash = Topic.get_newest_message(topic) - if msg_id: - # Decode the keys for user convenience + model = TargetModel.packet(target_name, packet_name, scope=scope) + if model.get("subpacket"): + msg_hash = DecomInterfaceTopic.get_tlm_buffer(target_name, packet_name, timeout=timeout, scope=scope) return {k.decode(): v for (k, v) in msg_hash.items()} + else: + topic = f"{scope}__TELEMETRY__{{{target_name}}}__{packet_name}" + msg_id, msg_hash = Topic.get_newest_message(topic) + if msg_id: + # Decode the keys for user convenience + return {k.decode(): v for (k, v) in msg_hash.items()} return None @@ -452,7 +456,7 @@ def get_tlm(*args, scope: str = OPENC3_SCOPE): get_telemetry = get_tlm -def get_item(*args, scope: str = OPENC3_SCOPE): +def get_item(*args, scope: str = OPENC3_SCOPE, cache_timeout=0.1): """Returns a telemetry packet item hash Args: @@ -462,6 +466,8 @@ def get_item(*args, scope: str = OPENC3_SCOPE): (dict) Telemetry packet hash """ target_name, packet_name, item_name = _extract_target_packet_item_names("get_item", *args) + if packet_name == 'LATEST': + packet_name = CvtModel.determine_latest_packet_for_item(target_name, item_name, cache_timeout=cache_timeout, scope=scope) authorize(permission="tlm", target_name=target_name, packet_name=packet_name, scope=scope) return TargetModel.packet_item(target_name, packet_name, item_name, scope=scope) diff --git a/openc3/python/openc3/config/config_parser.py b/openc3/python/openc3/config/config_parser.py index 93fd386ddb..cd84d73e1b 100644 --- a/openc3/python/openc3/config/config_parser.py +++ b/openc3/python/openc3/config/config_parser.py @@ -207,7 +207,7 @@ def verify_parameter_naming(self, index, usage=""): def handle_none(cls, value): if isinstance(value, str): match value.upper(): - case "" | "NONE" | "NULL": + case "" | "NIL" | "NONE" | "NULL": return None return value diff --git a/openc3/python/openc3/interfaces/protocols/fixed_protocol.py b/openc3/python/openc3/interfaces/protocols/fixed_protocol.py index e2e3fe1ee2..87412ed867 100644 --- a/openc3/python/openc3/interfaces/protocols/fixed_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/fixed_protocol.py @@ -82,14 +82,10 @@ def identify_and_finish_packet(self): try: if self.telemetry: target_packets = System.telemetry.packets(target_name) - target = System.targets[target_name] - if target: - unique_id_mode = target.tlm_unique_id_mode + unique_id_mode = System.telemetry.tlm_unique_id_mode(target_name) else: target_packets = System.commands.packets(target_name) - target = System.targets[target_name] - if target: - unique_id_mode = target.cmd_unique_id_mode + unique_id_mode = System.commands.cmd_unique_id_mode(target_name) except RuntimeError as error: if "does not exist" in traceback.format_exc(): # No commands/telemetry for this target @@ -99,21 +95,31 @@ def identify_and_finish_packet(self): if unique_id_mode: for _, packet in target_packets.items(): - if packet.identify(self.data[self.discard_leading_bytes :]): + if not packet.subpacket and packet.identify( + self.data[self.discard_leading_bytes :] + ): # identify handles virtual identified_packet = packet break else: # Do a lookup to quickly identify the packet if len(target_packets) > 0: - packet = next(iter(target_packets.values())) - key = packet.read_id_values(self.data[self.discard_leading_bytes :]) - if self.telemetry: - id_values = System.telemetry.config.tlm_id_value_hash[target_name] - else: - id_values = System.commands.config.cmd_id_value_hash[target_name] - identified_packet = id_values.get(repr(key)) - if identified_packet is None: - identified_packet = id_values.get("CATCHALL") + packet = None + for _, target_packet in target_packets.items(): + if target_packet.virtual: + continue + if target_packet.subpacket: + continue + packet = target_packet + break + if packet: + key = packet.read_id_values(self.data[self.discard_leading_bytes :]) + if self.telemetry: + id_values = System.telemetry.config.tlm_id_value_hash[target_name] + else: + id_values = System.commands.config.cmd_id_value_hash[target_name] + identified_packet = id_values.get(repr(key)) + if identified_packet is None: + identified_packet = id_values.get("CATCHALL") if identified_packet is not None: if identified_packet.defined_length + self.discard_leading_bytes > len(self.data): diff --git a/openc3/python/openc3/microservices/decom_microservice.py b/openc3/python/openc3/microservices/decom_microservice.py index 87fefe90aa..5502da1bf0 100644 --- a/openc3/python/openc3/microservices/decom_microservice.py +++ b/openc3/python/openc3/microservices/decom_microservice.py @@ -29,10 +29,13 @@ from openc3.config.config_parser import ConfigParser from openc3.utilities.time import to_nsec_from_epoch, from_nsec_from_epoch from openc3.utilities.thread_manager import ThreadManager +from openc3.microservices.interface_microservice import InterfaceMicroservice from openc3.microservices.interface_decom_common import ( handle_build_cmd, handle_inject_tlm, + handle_get_tlm_buffer ) +from openc3.models.target_model import TargetModel from openc3.top_level import kill_thread from datetime import datetime, timezone @@ -141,6 +144,9 @@ def run(self): if msg_hash.get(b"build_cmd"): handle_build_cmd(msg_hash[b"build_cmd"], msg_id, self.scope) continue + if msg_hash.get(b"get_tlm_buffer"): + handle_get_tlm_buffer(msg_hash[b"get_tlm_buffer"], msg_id, self.scope) + continue else: self.decom_packet(topic, msg_id, msg_hash, redis) self.metric.set(name="decom_total", value=self.count, type="counter") @@ -168,9 +174,12 @@ def decom_packet(self, topic, msg_id, msg_hash, _redis): ) start = time.time() + + ####################################### + # Build packet object from topic data + ####################################### target_name = msg_hash[b"target_name"].decode() packet_name = msg_hash[b"packet_name"].decode() - packet = System.telemetry.packet(target_name, packet_name) packet.stored = ConfigParser.handle_true_false(msg_hash[b"stored"].decode()) # Note: Packet time will be recalculated as part of decom so not setting @@ -180,25 +189,94 @@ def decom_packet(self, topic, msg_id, msg_hash, _redis): if extra is not None: packet.extra = json.loads(extra) packet.buffer = msg_hash[b"buffer"] - # Processors are user code points which must be rescued - # so the TelemetryDecomTopic can write the packet - try: - packet.process() # Run processors - except Exception as error: - self.error_count += 1 - self.metric.set(name="decom_error_total", value=self.error_count, type="counter") - self.error = error - self.logger.error(f"Processor error:\n{traceback.format_exc()}") - # Process all the limits and call the limits_change_callback (as necessary) - # check_limits also can call user code in the limits response - # but that is rescued separately in the limits_change_callback - packet.check_limits(System.limits_set()) - - # This is what updates the CVT - TelemetryDecomTopic.write_packet(packet, scope=self.scope) + + ################################################################################ + # Break packet into subpackets (if necessary) + # Subpackets are typically channelized data + ################################################################################ + packet_and_subpackets = packet.subpacketize() + + for packet_or_subpacket in packet_and_subpackets: + if packet_or_subpacket.subpacket: + packet_or_subpacket = self.handle_subpacket(packet, packet_or_subpacket) + + ##################################################################################### + # Run Processors + # This must be before the full decom so that processor derived values are available + ##################################################################################### + try: + packet_or_subpacket.process() # Run processors + except Exception as error: + self.error_count += 1 + self.metric.set(name="decom_error_total", value=self.error_count, type="counter") + self.error = error + self.logger.error(f"Processor error:\n{traceback.format_exc()}") + + ############################################################################# + # Process all the limits and call the limits_change_callback (as necessary) + # This must be before the full decom so that limits states are available + ############################################################################# + packet_or_subpacket.check_limits(System.limits_set()) + + # This is what actually decommutates the packet and updates the CVT + TelemetryDecomTopic.write_packet(packet_or_subpacket, scope=self.scope) diff = time.time() - start # seconds as a float self.metric.set(name="decom_duration_seconds", value=diff, type="gauge", unit="seconds") + def handle_subpacket(self, packet, subpacket): + # Subpacket received time always = packet.received_time + # Use packet_time appropriately if another timestamp is needed + subpacket.received_time = packet.received_time + subpacket.stored = packet.stored + subpacket.extra = packet.extra + + if subpacket.stored: + # Stored telemetry does not update the current value table + identified_subpacket = System.telemetry.identify_and_define_packet(subpacket, self.target_names, subpackets=True) + else: + # Identify and update subpacket + if subpacket.identified(): + try: + # Preidentifed subpacket - place it into the current value table + identified_subpacket = System.telemetry.update(subpacket.target_name, subpacket.packet_name, subpacket.buffer) + except RuntimeError: + # Subpacket identified but we don't know about it + # Clear packet_name and target_name and try to identify + self.logger.warn( + f"{self.name}: Received unknown identified subpacket: {subpacket.target_name} {subpacket.packet_name}" + ) + subpacket.target_name = None + subpacket.packet_name = None + identified_subpacket = System.telemetry.identify_and_set_buffer( + subpacket.buffer, self.target_names, subpackets=True + ) + else: + # Subpacket needs to be identified + identified_subpacket = System.telemetry.identify_and_set_buffer( + subpacket.buffer, self.target_names, subpackets=True + ) + + if identified_subpacket: + identified_subpacket.received_time = subpacket.received_time + identified_subpacket.stored = subpacket.stored + identified_subpacket.extra = subpacket.extra + subpacket = identified_subpacket + else: + unknown_subpacket = System.telemetry.update("UNKNOWN", "UNKNOWN", subpacket.buffer) + unknown_subpacket.received_time = subpacket.received_time + unknown_subpacket.stored = subpacket.stored + unknown_subpacket.extra = subpacket.extra + subpacket = unknown_subpacket + num_bytes_to_print = min(InterfaceMicroservice.UNKNOWN_BYTES_TO_PRINT, len(subpacket.buffer)) + data = subpacket.buffer_no_copy()[0:(num_bytes_to_print)] + prefix = "".join([format(x, "02x") for x in data]) + self.logger.warn( + f"{self.name} {subpacket.target_name} packet length: {len(subpacket.buffer)} starting with: {prefix}" + ) + + TargetModel.sync_tlm_packet_counts(subpacket, self.target_names, scope=self.scope) + return subpacket + # Called when an item in any packet changes limits states. # # @param packet [Packet] Packet which has had an item change limits state diff --git a/openc3/python/openc3/microservices/interface_decom_common.py b/openc3/python/openc3/microservices/interface_decom_common.py index 3a62039f16..a7da83bba0 100644 --- a/openc3/python/openc3/microservices/interface_decom_common.py +++ b/openc3/python/openc3/microservices/interface_decom_common.py @@ -67,3 +67,33 @@ def handle_build_cmd(build_cmd_json, msg_id, scope): except RuntimeError: msg_hash = {"id": msg_id, "result": "ERROR", "message": traceback.format_exc()} Topic.write_topic(ack_topic, msg_hash) + +def handle_get_tlm_buffer(get_tlm_buffer_json, msg_id, scope): + get_tlm_buffer_hash = json.loads(get_tlm_buffer_json, cls=JsonDecoder) + target_name = get_tlm_buffer_hash['target_name'] + packet_name = get_tlm_buffer_hash['packet_name'] + ack_topic = f"{{{scope}__ACKCMD}}TARGET__{target_name}" + try: + packet = System.telemetry.packet(target_name, packet_name) + msg_hash = { + "id": msg_id, + "result": 'SUCCESS', + "time": to_nsec_from_epoch(packet.packet_time), + "received_time": to_nsec_from_epoch(packet.received_time), + "target_name": packet.target_name, + "packet_name": packet.packet_name, + "received_count": packet.received_count, + "stored": str(packet.stored), + "buffer": json.dumps(packet.buffer_no_copy(), cls=JsonEncoder) + } + if packet.extra: + msg_hash["extra"] = json.dumps(packet.extra) + # If there is an error due to parameter out of range, etc, we rescue it so we can + # write the ACKCMD}TARGET topic and allow the source to return + except RuntimeError: + msg_hash = { + "id": msg_id, + "result": 'ERROR', + "message": traceback.format_exc() + } + Topic.write_topic(ack_topic, msg_hash) \ No newline at end of file diff --git a/openc3/python/openc3/microservices/interface_microservice.py b/openc3/python/openc3/microservices/interface_microservice.py index 87e01dd259..c2bdac0852 100644 --- a/openc3/python/openc3/microservices/interface_microservice.py +++ b/openc3/python/openc3/microservices/interface_microservice.py @@ -35,7 +35,6 @@ from openc3.models.router_status_model import RouterStatusModel from openc3.models.cvt_model import CvtModel from openc3.models.target_model import TargetModel -from openc3.topics.topic import Topic from openc3.topics.interface_topic import InterfaceTopic from openc3.topics.router_topic import RouterTopic from openc3.topics.command_topic import CommandTopic @@ -47,7 +46,6 @@ from openc3.utilities.sleeper import Sleeper from openc3.utilities.time import from_nsec_from_epoch from openc3.utilities.json import JsonDecoder, JsonEncoder -from openc3.utilities.store import Store, openc3_redis_cluster from openc3.utilities.store_queued import StoreQueued, EphemeralStoreQueued from openc3.utilities.thread_manager import ThreadManager from openc3.top_level import kill_thread @@ -608,19 +606,7 @@ def __init__(self, name): for target_name in self.interface.target_names: target = System.targets[target_name] target.interface = self.interface - for target_name in self.interface.tlm_target_names: - # Initialize the target's packet counters based on the Topic stream - # Prevents packet count resetting to 0 when interface restarts - try: - for packet_name, packet in System.telemetry.packets(target_name).items(): - topic = f"{self.scope}__TELEMETRY__{{target_name}}__{packet_name}" - msg_id, msg_hash = Topic.get_newest_message(topic) - if msg_id: - packet.received_count = int(msg_hash[b"received_count"]) - else: - packet.received_count = 0 - except RuntimeError: - pass # Handle targets without telemetry + TargetModel.init_tlm_packet_counts(self.interface.tlm_target_names, scope=self.scope) if self.interface.connect_on_startup: self.interface.state = "ATTEMPTING" else: @@ -631,17 +617,14 @@ def __init__(self, name): RouterStatusModel.set(self.interface.as_json(), scope=self.scope) self.queued = False - self.sync_packet_count_data = {} - self.sync_packet_count_time = None - self.sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second for option_name, option_values in self.interface.options.items(): if option_name.upper() == "OPTIMIZE_THROUGHPUT": self.queued = True update_interval = float(option_values[0]) EphemeralStoreQueued.instance().set_update_interval(update_interval) StoreQueued.instance().set_update_interval(update_interval) - if option_name.upper() == "SYNC_PACKET_COUNT_DELAY_SECONDS": - self.sync_packet_count_delay_seconds = float(option_values[0]) + if option_name.upper() == 'SYNC_PACKET_COUNT_DELAY_SECONDS': + TargetModel.sync_packet_count_delay_seconds = float(option_values[0]) if self.interface_or_router == "INTERFACE": self.handler_thread = InterfaceCmdHandlerThread( @@ -835,7 +818,7 @@ def handle_packet(self, packet): # Write to stream if self.interface.tlm_target_enabled.get(packet.target_name, False): - self.sync_tlm_packet_counts(packet) + TargetModel.sync_tlm_packet_counts(packet, self.interface.tlm_target_names, scope=self.scope) TelemetryTopic.write_packet(packet, queued=self.queued, scope=self.scope) def handle_connection_failed(self, connection, connect_error): @@ -959,66 +942,6 @@ def shutdown(self, sig=None): def graceful_kill(self): pass # Just to avoid warning - def sync_tlm_packet_counts(self, packet): - if self.sync_packet_count_delay_seconds <= 0 or openc3_redis_cluster: - # Perfect but slow method - packet.received_count = TargetModel.increment_telemetry_count( - packet.target_name, packet.packet_name, 1, scope=self.scope - ) - else: - # Eventually consistent method - # Only sync every period (default 1 second) to avoid hammering Redis - # This is a trade off between speed and accuracy - # The packet count is eventually consistent - if self.sync_packet_count_data.get(packet.target_name) is None: - self.sync_packet_count_data[packet.target_name] = {} - if self.sync_packet_count_data[packet.target_name].get(packet.packet_name) is None: - self.sync_packet_count_data[packet.target_name][packet.packet_name] = 0 - self.sync_packet_count_data[packet.target_name][packet.packet_name] += 1 - - # Ensures counters change between syncs - packet.received_count += 1 - - # Check if we need to sync the packet counts - if ( - self.sync_packet_count_time is None - or (time.time() - self.sync_packet_count_time) > self.sync_packet_count_delay_seconds - ): - self.sync_packet_count_time = time.time() - - result = [] - inc_count = 0 - # Use pipeline to make this one transaction - with Store.instance().redis_pool.get() as redis: - pipeline = redis.pipeline(transaction=False) - thread_id = threading.get_native_id() - Store.instance().redis_pool.pipelines[thread_id] = pipeline - try: - # Increment global counters for packets received - for target_name, packet_data in self.sync_packet_count_data.items(): - for packet_name, count in packet_data.items(): - TargetModel.increment_telemetry_count(target_name, packet_name, count, scope=self.scope) - inc_count += 1 - self.sync_packet_count_data = {} - - # Get all the packet counts with the global counters - for target_name in self.interface.tlm_target_names: - TargetModel.get_all_telemetry_counts(target_name, scope=self.scope) - TargetModel.get_all_telemetry_counts("UNKNOWN", scope=self.scope) - - result = pipeline.execute() - finally: - Store.instance().redis_pool.pipelines[thread_id] = None - for target_name in self.interface.tlm_target_names: - for packet_name, count in result[inc_count].items(): - update_packet = System.telemetry.packet(target_name, packet_name.decode()) - update_packet.received_count = int(count) - inc_count += 1 - for packet_name, count in result[inc_count].items(): - update_packet = System.telemetry.packet("UNKNOWN", packet_name.decode()) - update_packet.received_count = int(count) - - if os.path.basename(__file__) == os.path.basename(sys.argv[0]): InterfaceMicroservice.class_run() ThreadManager.instance().shutdown() diff --git a/openc3/python/openc3/models/target_model.py b/openc3/python/openc3/models/target_model.py index 0b4e6fec37..85ebf899ef 100644 --- a/openc3/python/openc3/models/target_model.py +++ b/openc3/python/openc3/models/target_model.py @@ -22,6 +22,7 @@ import json import time +import threading from typing import Any from openc3.environment import OPENC3_SCOPE from openc3.topics.topic import Topic @@ -31,6 +32,7 @@ from openc3.utilities.store import Store, openc3_redis_cluster from openc3.utilities.logger import Logger from openc3.utilities.bucket import Bucket +from openc3.system.system import System from openc3.environment import OPENC3_CONFIG_BUCKET @@ -46,6 +48,9 @@ class TargetModel(Model): VALID_TYPES = {"CMD", "TLM"} ITEM_MAP_CACHE_TIMEOUT = 10.0 item_map_cache = {} + sync_packet_count_data = {} + sync_packet_count_time = None + sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second # NOTE: The following three class methods are used by the ModelController # and are reimplemented to enable various Model class methods to work @@ -275,6 +280,76 @@ def get_telemetry_counts(cls, target_packets: list, scope: str = OPENC3_SCOPE): counts.append(int(count)) return counts + @classmethod + def init_tlm_packet_counts(cls, tlm_target_names, scope): + cls.sync_packet_count_time = time.time() + + # Get all the packet counts with the global counters + for target_name in tlm_target_names: + for packet_name, count in TargetModel.get_all_telemetry_counts(target_name, scope=scope).items(): + update_packet = System.telemetry.packet(target_name, packet_name.decode()) + update_packet.received_count = int(count) + for packet_name, count in TargetModel.get_all_telemetry_counts('UNKNOWN', scope=scope).items(): + update_packet = System.telemetry.packet('UNKNOWN', packet_name.decode()) + update_packet.received_count = int(count) + + @classmethod + def sync_tlm_packet_counts(cls, packet, tlm_target_names, scope): + if cls.sync_packet_count_delay_seconds <= 0 or openc3_redis_cluster: + # Perfect but slow method + packet.received_count = TargetModel.increment_telemetry_count(packet.target_name, packet.packet_name, 1, scope=scope) + else: + # Eventually consistent method + # Only sync every period (default 1 second) to avoid hammering Redis + # This is a trade off between speed and accuracy + # The packet count is eventually consistent + if cls.sync_packet_count_data.get(packet.target_name) is None: + cls.sync_packet_count_data[packet.target_name] = {} + if cls.sync_packet_count_data[packet.target_name].get(packet.packet_name) is None: + cls.sync_packet_count_data[packet.target_name][packet.packet_name] = 0 + cls.sync_packet_count_data[packet.target_name][packet.packet_name] += 1 + + # Ensures counters change between syncs + update_packet = System.telemetry.packet(packet.target_name, packet.packet_name) + update_packet.received_count += 1 + packet.received_count = update_packet.received_count + + # Check if we need to sync the packet counts + if cls.sync_packet_count_time is None or (time.time() - cls.sync_packet_count_time) > cls.sync_packet_count_delay_seconds: + cls.sync_packet_count_time = time.time() + + result = [] + inc_count = 0 + # Use pipeline to make this one transaction + with Store.instance().redis_pool.get() as redis: + pipeline = redis.pipeline(transaction=False) + thread_id = threading.get_native_id() + Store.instance().redis_pool.pipelines[thread_id] = pipeline + try: + # Increment global counters for packets received + for target_name, packet_data in cls.sync_packet_count_data.items(): + for packet_name, count in packet_data.items(): + TargetModel.increment_telemetry_count(target_name, packet_name, count, scope=scope) + inc_count += 1 + cls.sync_packet_count_data = {} + + # Get all the packet counts with the global counters + for target_name in tlm_target_names: + TargetModel.get_all_telemetry_counts(target_name, scope=scope) + TargetModel.get_all_telemetry_counts('UNKNOWN', scope=scope) + + result = pipeline.execute() + finally: + Store.instance().redis_pool.pipelines[thread_id] = None + for target_name in tlm_target_names: + for packet_name, count in result[inc_count].items(): + update_packet = System.telemetry.packet(target_name, packet_name.decode()) + update_packet.received_count = int(count) + inc_count += 1 + for packet_name, count in result[inc_count].items(): + update_packet = System.telemetry.packet('UNKNOWN', packet_name.decode()) + update_packet.received_count = int(count) + @classmethod def increment_command_count(cls, target_name: str, packet_name: str, count: int, scope: str = OPENC3_SCOPE): result = Store.hincrby(f"{scope}__COMMANDCNTS__{{{target_name}}}", packet_name, count) @@ -347,8 +422,6 @@ def __init__( ignored_items=[], limits_groups=[], cmd_tlm_files=[], - cmd_unique_id_mode=False, - tlm_unique_id_mode=False, id=None, updated_at=None, plugin=None, diff --git a/openc3/python/openc3/packets/command_validator.py b/openc3/python/openc3/packets/command_validator.py index 50153af742..0786459947 100644 --- a/openc3/python/openc3/packets/command_validator.py +++ b/openc3/python/openc3/packets/command_validator.py @@ -34,6 +34,3 @@ def post_check(self, command): # Return True to indicate Success, False to indicate Failure, # and None to indicate Unknown. The second value is the optional message. return [True, None] - - def args(self): - return self.args diff --git a/openc3/python/openc3/packets/commands.py b/openc3/python/openc3/packets/commands.py index 9596de0606..663e17cb07 100644 --- a/openc3/python/openc3/packets/commands.py +++ b/openc3/python/openc3/packets/commands.py @@ -92,7 +92,7 @@ def params(self, target_name, packet_name): # # @param (see #identify_tlm!) # @return (see #identify_tlm!) - def identify(self, packet_data, target_names=None): + def identify(self, packet_data, target_names=None, subpackets=False): identified_packet = None if target_names is None: @@ -107,20 +107,41 @@ def identify(self, packet_data, target_names=None): # No commands for this target continue - target = self.system.targets.get(target_name) - if target and target.cmd_unique_id_mode: + if (not subpackets and self.cmd_unique_id_mode(target_name)) or ( + subpackets and self.cmd_subpacket_unique_id_mode(target_name) + ): # Iterate through the packets and see if any represent the buffer for _, packet in target_packets.items(): - if packet.identify(packet_data): + if subpackets: + if not packet.subpacket: + continue + else: + if packet.subpacket: + continue + if packet.identify(packet_data): # Handles virtual identified_packet = packet break else: # Do a lookup to quickly identify the packet - if len(target_packets) > 0: - packet = next(iter(target_packets.values())) + packet = None + for _, target_packet in target_packets.items(): + if target_packet.virtual: + continue + if subpackets: + if not target_packet.subpacket: + continue + else: + if target_packet.subpacket: + continue + packet = target_packet + break + if packet: key = packet.read_id_values(packet_data) - id_values = self.config.cmd_id_value_hash[target_name] - identified_packet = id_values.get(str(key)) + if subpackets: + id_values = self.config.cmd_subpacket_id_value_hash[target_name] + else: + id_values = self.config.cmd_id_value_hash[target_name] + identified_packet = id_values.get(repr(key)) if identified_packet is None: identified_packet = id_values.get("CATCHALL") @@ -248,6 +269,12 @@ def all(self): def dynamic_add_packet(self, packet, affect_ids=False): self.config.dynamic_add_packet(packet, "COMMAND", affect_ids=affect_ids) + def cmd_unique_id_mode(self, target_name): + return self.config.cmd_unique_id_mode.get(target_name.upper()) + + def cmd_subpacket_unique_id_mode(self, target_name): + return self.config.cmd_subpacket_unique_id_mode.get(target_name.upper()) + def _set_parameters(self, command, params, range_checking): given_item_names = [] for item_name, value in params.items(): diff --git a/openc3/python/openc3/packets/packet.py b/openc3/python/openc3/packets/packet.py index 14bb833008..f4574e678c 100644 --- a/openc3/python/openc3/packets/packet.py +++ b/openc3/python/openc3/packets/packet.py @@ -42,9 +42,7 @@ from openc3.utilities.string import ( simple_formatted, quote_if_necessary, - class_name_to_filename, ) -from openc3.top_level import get_class_from_module class Packet(Structure): @@ -102,7 +100,9 @@ def __init__( self.ignore_overlap = False self.virtual = False self.restricted = False + self.subpacket = False self.validator = None + self.subpacketizer = None self.obfuscated_items = [] self.obfuscated_items_hash = {} @@ -1010,10 +1010,15 @@ def to_config(self, cmd_or_tlm): config += f'TELEMETRY {quote_if_necessary(self.target_name)} {quote_if_necessary(self.packet_name)} {self.default_endianness} "{self.description}"\n' else: config += f'COMMAND {quote_if_necessary(self.target_name)} {quote_if_necessary(self.packet_name)} {self.default_endianness} "{self.description}"\n' + if self.subpacketizer: + args_str = " ".join([quote_if_necessary(str(a)) for a in self.subpacketizer.args]) + config += f" SUBPACKETIZER {self.subpacketizer.__class__.__name__} {args_str}\n" if self.accessor.__class__.__name__ != "BinaryAccessor": - config += f" ACCESSOR {self.accessor.__class__.__name__}\n" + args_str = " ".join([quote_if_necessary(str(a)) for a in self.accessor.args]) + config += f" ACCESSOR {self.accessor.__class__.__name__} {args_str}\n" if self.validator: - config += f" VALIDATOR {self.validator.__class__.__name__}\n" + args_str = " ".join([quote_if_necessary(str(a)) for a in self.validator.args]) + config += f" VALIDATOR {self.validator.__class__.__name__} {args_str}\n" # TODO: Add TEMPLATE_ENCODED so this can always be done inline regardless of content if self.template: config += f" TEMPLATE '{self.template}'\n" @@ -1029,6 +1034,8 @@ def to_config(self, cmd_or_tlm): config += " DISABLED\n" elif self.hidden: config += " HIDDEN\n" + if self.subpacket: + config += " SUBPACKET\n" if self.restricted: config += " RESTRICTED\n" @@ -1088,6 +1095,10 @@ def as_json(self): config["hidden"] = True if self.virtual: config["virtual"] = True + if self.subpacket: + config["subpacket"] = True + if self.subpacketizer: + config["subpacketizer"] = self.subpacketizer.__class__.__name__ if self.restricted: config["restricted"] = True config["accessor"] = self.accessor.__class__.__name__ @@ -1134,59 +1145,6 @@ def as_json(self): return config - @classmethod - def from_json(cls, hash): - endianness = hash.get("endianness") - packet = Packet(hash["target_name"], hash["packet_name"], endianness, hash["description"]) - packet.short_buffer_allowed = hash.get("short_buffer_allowed") - packet.hazardous = hash.get("hazardous") - packet.hazardous_description = hash.get("hazardous_description") - packet.messages_disabled = hash.get("messages_disabled") - packet.disabled = hash.get("disabled") - packet.hidden = hash.get("hidden") - packet.virtual = hash.get("virtual") - packet.restricted = hash.get("restricted") - if "accessor" in hash: - try: - filename = class_name_to_filename(hash["accessor"]) - accessor = get_class_from_module(f"openc3.accessors.{filename}", hash["accessor"]) - if hash.get("accessor_args") and len(hash["accessor_args"]) > 0: - packet.accessor = accessor(packet, *hash["accessor_args"]) - else: - packet.accessor = accessor(packet) - except RuntimeError: - Logger.error( - f"{packet.target_name} {packet.packet_name} accessor of {hash['accessor']} could not be found due to {traceback.format_exc()}" - ) - if "validator" in hash: - try: - filename = class_name_to_filename(hash["validator"]) - validator = get_class_from_module(filename, hash["validator"]) - packet.validator = validator(packet) - except RuntimeError: - Logger.error( - f"{packet.target_name} {packet.packet_name} validator of {hash['validator']} could not be found due to {traceback.format_exc()}" - ) - if "template" in hash: - packet.template = base64.b64decode(hash["template"]) - packet.meta = hash.get("meta") - # Can't convert processors - for item in hash["items"]: - packet.define(PacketItem.from_json(item)) - - if "response" in hash: - packet.response = hash["response"] - if "error_response" in hash: - packet.error_response = hash["error_response"] - if "screen" in hash: - packet.screen = hash["screen"] - if "related_items" in hash: - packet.related_items = hash["related_items"] - if "ignore_overlap" in hash: - packet.ignore_overlap = hash["ignore_overlap"] - - return packet - def decom(self): # Read all the RAW at once because this could be optimized by the accessor json_hash = self.read_items(self.sorted_items) @@ -1370,3 +1328,14 @@ def obfuscate(self): except Exception as e: Logger.error(f"{item.name} obfuscation failed with error: {repr(e)}") continue + + def subpacketize(self): + """Break packet into subpackets using subpacketizer if defined. + + Returns: + list: List of packet objects (subpackets or [self] if no subpacketizer) + """ + if self.subpacketizer: + return self.subpacketizer.call(self) + else: + return [self] diff --git a/openc3/python/openc3/packets/packet_config.py b/openc3/python/openc3/packets/packet_config.py index bb81b90be8..86fa0cbf78 100644 --- a/openc3/python/openc3/packets/packet_config.py +++ b/openc3/python/openc3/packets/packet_config.py @@ -58,7 +58,17 @@ def __init__(self): self.latest_data = {} self.warnings = [] self.cmd_id_value_hash = {} + self.cmd_subpacket_id_value_hash = {} + self.cmd_id_signature = {} + self.cmd_subpacket_id_signature = {} + self.cmd_unique_id_mode = {} + self.cmd_subpacket_unique_id_mode = {} self.tlm_id_value_hash = {} + self.tlm_subpacket_id_value_hash = {} + self.tlm_id_signature = {} + self.tlm_subpacket_id_signature = {} + self.tlm_unique_id_mode = {} + self.tlm_subpacket_unique_id_mode = {} # Create unknown packets self.commands["UNKNOWN"] = {} @@ -211,9 +221,11 @@ def process_file(self, filename, process_target_name): | "DISABLE_MESSAGES" | "HIDDEN" | "DISABLED" + | "SUBPACKET" | "VIRTUAL" | "ACCESSOR" | "VALIDATOR" + | "SUBPACKETIZER" | "TEMPLATE" | "TEMPLATE_FILE" | "RESPONSE" @@ -329,19 +341,37 @@ def finish_packet(self): PacketParser.check_item_data_types(self.current_packet) self.commands[self.current_packet.target_name][self.current_packet.packet_name] = self.current_packet if not self.current_packet.virtual: - id_values = self.cmd_id_value_hash.get(self.current_packet.target_name) - if not id_values: - id_values = {} - self.cmd_id_value_hash[self.current_packet.target_name] = id_values - self.update_id_value_hash(self.current_packet, id_values) + if self.current_packet.subpacket: + self.build_id_metadata( + self.current_packet, + self.cmd_subpacket_id_value_hash, + self.cmd_subpacket_id_signature, + self.cmd_subpacket_unique_id_mode, + ) + else: + self.build_id_metadata( + self.current_packet, + self.cmd_id_value_hash, + self.cmd_id_signature, + self.cmd_unique_id_mode, + ) else: self.telemetry[self.current_packet.target_name][self.current_packet.packet_name] = self.current_packet if not self.current_packet.virtual: - id_values = self.tlm_id_value_hash.get(self.current_packet.target_name) - if not id_values: - id_values = {} - self.tlm_id_value_hash[self.current_packet.target_name] = id_values - self.update_id_value_hash(self.current_packet, id_values) + if self.current_packet.subpacket: + self.build_id_metadata( + self.current_packet, + self.tlm_subpacket_id_value_hash, + self.tlm_subpacket_id_signature, + self.tlm_subpacket_unique_id_mode, + ) + else: + self.build_id_metadata( + self.current_packet, + self.tlm_id_value_hash, + self.tlm_id_signature, + self.tlm_unique_id_mode, + ) self.current_packet = None self.current_item = None @@ -350,12 +380,18 @@ def dynamic_add_packet(self, packet, cmd_or_tlm="TELEMETRY", affect_ids=False): if cmd_or_tlm == "COMMAND": self.commands[packet.target_name][packet.packet_name] = packet - if affect_ids: - id_values = self.cmd_id_value_hash.get(packet.target_name, None) - if not id_values: - id_values = {} - self.cmd_id_value_hash[packet.target_name] = id_values - self.update_id_value_hash(packet, id_values) + if affect_ids and not packet.virtual: + if packet.subpacket: + self.build_id_metadata( + packet, + self.cmd_subpacket_id_value_hash, + self.cmd_subpacket_id_signature, + self.cmd_subpacket_unique_id_mode, + ) + else: + self.build_id_metadata( + packet, self.cmd_id_value_hash, self.cmd_id_signature, self.cmd_unique_id_mode + ) else: self.telemetry[packet.target_name][packet.packet_name] = packet @@ -368,12 +404,58 @@ def dynamic_add_packet(self, packet, cmd_or_tlm="TELEMETRY", affect_ids=False): if packet not in latest_data_packets: latest_data_packets.append(packet) - if affect_ids: - id_values = self.tlm_id_value_hash.get(packet.target_name, None) - if not id_values: - id_values = {} - self.tlm_id_value_hash[packet.target_name] = id_values - self.update_id_value_hash(packet, id_values) + if affect_ids and not packet.virtual: + if packet.subpacket: + self.build_id_metadata( + packet, + self.tlm_subpacket_id_value_hash, + self.tlm_subpacket_id_signature, + self.tlm_subpacket_unique_id_mode, + ) + else: + self.build_id_metadata( + packet, self.tlm_id_value_hash, self.tlm_id_signature, self.tlm_unique_id_mode + ) + + def build_id_metadata(self, packet, id_value_hash, id_signature_hash, unique_id_mode_hash): + """Build identification metadata for a packet. + + Args: + packet: The packet to build metadata for + id_value_hash: Hash mapping target names to ID value hashes + id_signature_hash: Hash mapping target names to ID signatures + unique_id_mode_hash: Hash mapping target names to unique ID mode flags + """ + target_id_value_hash = id_value_hash.get(packet.target_name) + if not target_id_value_hash: + target_id_value_hash = {} + id_value_hash[packet.target_name] = target_id_value_hash + self.update_id_value_hash(packet, target_id_value_hash, id_signature_hash, unique_id_mode_hash) + + def update_id_value_hash(self, packet, target_id_value_hash, id_signature_hash, unique_id_mode_hash): + """Update the ID value hash for a packet and track ID signatures. + + Args: + packet: The packet to update + target_id_value_hash: Hash mapping ID values to packets for this target + id_signature_hash: Hash mapping target names to ID signatures + unique_id_mode_hash: Hash mapping target names to unique ID mode flags + """ + if packet.id_items and len(packet.id_items) > 0: + key = [] + id_signature = "" + for item in packet.id_items: + key.append(item.id_value) + id_signature += f"__{item.name}___{item.bit_offset}__{item.bit_size}__{item.data_type}" + target_id_value_hash[repr(key)] = packet + target_id_signature = id_signature_hash.get(packet.target_name) + if target_id_signature: + if id_signature != target_id_signature: + unique_id_mode_hash[packet.target_name] = True + else: + id_signature_hash[packet.target_name] = id_signature + else: + target_id_value_hash["CATCHALL"] = packet # This method provides way to quickly test packet configs # @@ -397,16 +479,6 @@ def from_config(cls, config, process_target_name): pc.process_file(tf.name, process_target_name) return pc - def update_id_value_hash(self, packet, hash): - if packet.id_items and len(packet.id_items) > 0: - key = [] - for item in packet.id_items: - key.append(item.id_value) - - hash[repr(key)] = packet - else: - hash["CATCHALL"] = packet - def reset_processing_variables(self): self.current_cmd_or_tlm = None self.current_packet = None @@ -511,12 +583,17 @@ def process_current_packet(self, parser, keyword, params): self.current_packet.disabled = True self.current_packet.virtual = True + case "SUBPACKET": + usage = keyword + parser.verify_num_parameters(0, 0, usage) + self.current_packet.subpacket = True + case "RESTRICTED": usage = keyword parser.verify_num_parameters(0, 0, usage) self.current_packet.restricted = True - case "ACCESSOR": + case "ACCESSOR" | "VALIDATOR" | "SUBPACKETIZER": usage = f"{keyword} ..." parser.verify_num_parameters(1, None, usage) klass = None @@ -527,30 +604,22 @@ def process_current_packet(self, parser, keyword, params): ) except ModuleNotFoundError: try: - # Fall back to the deprecated behavior of passing the ClassName (only works with built-in accessors) + # Fall back to the deprecated behavior of passing the ClassName filename = class_name_to_filename(params[0]) - klass = get_class_from_module(f"openc3.accessors.{filename}", params[0]) + if keyword == "ACCESSOR": + klass = get_class_from_module(f"openc3.accessors.{filename}", params[0]) + elif keyword == "VALIDATOR": + klass = get_class_from_module(f"openc3.validators.{filename}", params[0]) + elif keyword == "SUBPACKETIZER": + klass = get_class_from_module(f"openc3.subpacketizers.{filename}", params[0]) except ModuleNotFoundError: raise parser.error(f"ModuleNotFoundError parsing {params[0]}. Usage: {usage}") + + keyword_attr = keyword.lower() if len(params) > 1: - self.current_packet.accessor = klass(self.current_packet, *params[1:]) + setattr(self.current_packet, keyword_attr, klass(self.current_packet, *params[1:])) else: - self.current_packet.accessor = klass(self.current_packet) - - case "VALIDATOR": - usage = f"{keyword} ..." - parser.verify_num_parameters(1, None, usage) - try: - klass = get_class_from_module( - filename_to_module(params[0]), - filename_to_class_name(params[0]), - ) - if len(params) > 1: - self.current_packet.validator = klass(self.current_packet, *params[1:]) - else: - self.current_packet.validator = klass(self.current_packet) - except ModuleNotFoundError as error: - raise parser.error(error) + setattr(self.current_packet, keyword_attr, klass(self.current_packet)) case "TEMPLATE": usage = f"{keyword}