Skip to content

Commit c71656e

Browse files
committed
tp: introduce ForgedTracePacketWriter for writing TracePackets efficiently
This CL introduces a new class for writing TracePacket protos efficiently and getting TraceBlobViews out without creating lots of copies. There were already N instances of this pattern in the codebase, and another one is being introduced in #4768. Instead, we now have a single class which works with TraceBlob + TraceBlobView and is stored in TraceProcessorContext so everyone can use it. It works in 4MB chunks and reuses them across TracePacket calls, which makes creating these packets really efficient: 99% of cases are completely zero-copy.
1 parent a21ceb5 commit c71656e

16 files changed

+426
-98
lines changed

Android.bp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15707,6 +15707,7 @@ filegroup {
1570715707
"src/trace_processor/importers/proto/chrome_system_probes_module.cc",
1570815708
"src/trace_processor/importers/proto/chrome_system_probes_parser.cc",
1570915709
"src/trace_processor/importers/proto/default_modules.cc",
15710+
"src/trace_processor/importers/proto/forged_packet_writer.cc",
1571015711
"src/trace_processor/importers/proto/memory_tracker_snapshot_module.cc",
1571115712
"src/trace_processor/importers/proto/memory_tracker_snapshot_parser.cc",
1571215713
"src/trace_processor/importers/proto/metadata_minimal_module.cc",
@@ -15747,6 +15748,7 @@ filegroup {
1574715748
srcs: [
1574815749
"src/trace_processor/importers/proto/active_chrome_processes_tracker_unittest.cc",
1574915750
"src/trace_processor/importers/proto/chrome_string_lookup_unittest.cc",
15751+
"src/trace_processor/importers/proto/forged_packet_writer_unittest.cc",
1575015752
"src/trace_processor/importers/proto/heap_graph_tracker_unittest.cc",
1575115753
"src/trace_processor/importers/proto/jit_tracker_unittest.cc",
1575215754
"src/trace_processor/importers/proto/network_trace_module_unittest.cc",

BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2810,6 +2810,8 @@ perfetto_filegroup(
28102810
"src/trace_processor/importers/proto/chrome_system_probes_parser.h",
28112811
"src/trace_processor/importers/proto/default_modules.cc",
28122812
"src/trace_processor/importers/proto/default_modules.h",
2813+
"src/trace_processor/importers/proto/forged_packet_writer.cc",
2814+
"src/trace_processor/importers/proto/forged_packet_writer.h",
28132815
"src/trace_processor/importers/proto/memory_tracker_snapshot_module.cc",
28142816
"src/trace_processor/importers/proto/memory_tracker_snapshot_module.h",
28152817
"src/trace_processor/importers/proto/memory_tracker_snapshot_parser.cc",

src/trace_processor/importers/proto/BUILD.gn

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ source_set("minimal") {
3030
"chrome_system_probes_parser.h",
3131
"default_modules.cc",
3232
"default_modules.h",
33+
"forged_packet_writer.cc",
34+
"forged_packet_writer.h",
3335
"memory_tracker_snapshot_module.cc",
3436
"memory_tracker_snapshot_module.h",
3537
"memory_tracker_snapshot_parser.cc",
@@ -299,6 +301,7 @@ source_set("unittests") {
299301
sources = [
300302
"active_chrome_processes_tracker_unittest.cc",
301303
"chrome_string_lookup_unittest.cc",
304+
"forged_packet_writer_unittest.cc",
302305
"heap_graph_tracker_unittest.cc",
303306
"jit_tracker_unittest.cc",
304307
"network_trace_module_unittest.cc",

src/trace_processor/importers/proto/android_probes_module.cc

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@
2626
#include "perfetto/base/logging.h"
2727
#include "perfetto/ext/base/string_view.h"
2828
#include "perfetto/protozero/field.h"
29-
#include "perfetto/protozero/scattered_heap_buffer.h"
3029
#include "perfetto/trace_processor/ref_counted.h"
31-
#include "perfetto/trace_processor/trace_blob.h"
3230
#include "protos/perfetto/common/builtin_clock.pbzero.h"
3331
#include "src/trace_processor/importers/common/clock_tracker.h"
3432
#include "src/trace_processor/importers/common/event_tracker.h"
3533
#include "src/trace_processor/importers/common/parser_types.h"
3634
#include "src/trace_processor/importers/proto/android_probes_parser.h"
3735
#include "src/trace_processor/importers/proto/android_probes_tracker.h"
36+
#include "src/trace_processor/importers/proto/forged_packet_writer.h"
3837
#include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
3938
#include "src/trace_processor/importers/proto/proto_importer_module.h"
4039
#include "src/trace_processor/sorter/trace_sorter.h"
@@ -139,20 +138,20 @@ ModuleResult AndroidProbesModule::TokenizePacket(
139138
actual_ts = packet_timestamp;
140139
}
141140

142-
protozero::HeapBuffered<protos::pbzero::TracePacket> data_packet;
143-
// Keep the original timestamp to later extract as an arg; the sorter does
144-
// not read this.
145-
data_packet->set_timestamp(static_cast<uint64_t>(packet_timestamp));
146-
147-
auto* power_rails_proto = data_packet->set_power_rails();
148-
power_rails_proto->set_session_uuid(evt.session_uuid());
149-
auto* energy = power_rails_proto->add_energy_data();
150-
energy->set_energy(data.energy());
151-
energy->set_index(data.index());
152-
energy->set_timestamp_ms(static_cast<uint64_t>(actual_ts / 1000000));
153-
154-
auto [vec, size] = data_packet.SerializeAsUniquePtr();
155-
TraceBlobView tbv(TraceBlob::TakeOwnership(std::move(vec), size));
141+
TraceBlobView tbv =
142+
context_->forged_packet_writer->WritePacket([&](auto* data_packet) {
143+
// Keep the original timestamp to later extract as an arg; the
144+
// sorter does not read this.
145+
data_packet->set_timestamp(static_cast<uint64_t>(packet_timestamp));
146+
147+
auto* power_rails_proto = data_packet->set_power_rails();
148+
power_rails_proto->set_session_uuid(evt.session_uuid());
149+
auto* energy = power_rails_proto->add_energy_data();
150+
energy->set_energy(data.energy());
151+
energy->set_index(data.index());
152+
energy->set_timestamp_ms(
153+
static_cast<uint64_t>(actual_ts / 1000000));
154+
});
156155
module_context_->trace_packet_stream->Push(
157156
actual_ts, TracePacketData{std::move(tbv), state});
158157
}
@@ -176,36 +175,36 @@ ModuleResult AndroidProbesModule::TokenizePacket(
176175
}
177176
int64_t actual_ts = *trace_ts;
178177

179-
protozero::HeapBuffered<protos::pbzero::TracePacket> data_packet;
180-
data_packet->set_timestamp(static_cast<uint64_t>(actual_ts));
181-
182-
auto* log_pkt = data_packet->set_android_log();
183-
auto* log_evt = log_pkt->add_events();
184-
log_evt->set_log_id(
185-
static_cast<protos::pbzero::AndroidLogId>(evt.log_id()));
186-
log_evt->set_pid(evt.pid());
187-
log_evt->set_tid(evt.tid());
188-
log_evt->set_uid(evt.uid());
189-
log_evt->set_timestamp(evt.timestamp());
190-
log_evt->set_tag(evt.tag());
191-
log_evt->set_prio(
192-
static_cast<protos::pbzero::AndroidLogPriority>(evt.prio()));
193-
log_evt->set_message(evt.message());
194-
for (auto arg_it = evt.args(); arg_it; ++arg_it) {
195-
protos::pbzero::AndroidLogPacket::LogEvent::Arg::Decoder arg(*arg_it);
196-
auto* new_arg = log_evt->add_args();
197-
new_arg->set_name(arg.name());
198-
if (arg.has_int_value()) {
199-
new_arg->set_int_value(arg.int_value());
200-
} else if (arg.has_float_value()) {
201-
new_arg->set_float_value(arg.float_value());
202-
} else if (arg.has_string_value()) {
203-
new_arg->set_string_value(arg.string_value());
204-
}
205-
}
206-
207-
auto [vec, size] = data_packet.SerializeAsUniquePtr();
208-
TraceBlobView tbv(TraceBlob::TakeOwnership(std::move(vec), size));
178+
TraceBlobView tbv =
179+
context_->forged_packet_writer->WritePacket([&](auto* data_packet) {
180+
data_packet->set_timestamp(static_cast<uint64_t>(actual_ts));
181+
182+
auto* log_pkt = data_packet->set_android_log();
183+
auto* log_evt = log_pkt->add_events();
184+
log_evt->set_log_id(
185+
static_cast<protos::pbzero::AndroidLogId>(evt.log_id()));
186+
log_evt->set_pid(evt.pid());
187+
log_evt->set_tid(evt.tid());
188+
log_evt->set_uid(evt.uid());
189+
log_evt->set_timestamp(evt.timestamp());
190+
log_evt->set_tag(evt.tag());
191+
log_evt->set_prio(
192+
static_cast<protos::pbzero::AndroidLogPriority>(evt.prio()));
193+
log_evt->set_message(evt.message());
194+
for (auto arg_it = evt.args(); arg_it; ++arg_it) {
195+
protos::pbzero::AndroidLogPacket::LogEvent::Arg::Decoder arg(
196+
*arg_it);
197+
auto* new_arg = log_evt->add_args();
198+
new_arg->set_name(arg.name());
199+
if (arg.has_int_value()) {
200+
new_arg->set_int_value(arg.int_value());
201+
} else if (arg.has_float_value()) {
202+
new_arg->set_float_value(arg.float_value());
203+
} else if (arg.has_string_value()) {
204+
new_arg->set_string_value(arg.string_value());
205+
}
206+
}
207+
});
209208
module_context_->trace_packet_stream->Push(
210209
actual_ts, TracePacketData{std::move(tbv), state});
211210
}

src/trace_processor/importers/proto/app_wakelock_module.cc

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525
#include "perfetto/ext/base/murmur_hash.h"
2626
#include "perfetto/protozero/field.h"
2727
#include "perfetto/trace_processor/ref_counted.h"
28-
#include "perfetto/trace_processor/trace_blob.h"
2928
#include "protos/perfetto/trace/android/app_wakelock_data.pbzero.h"
3029
#include "protos/perfetto/trace/interned_data/interned_data.pbzero.h"
3130
#include "protos/perfetto/trace/trace_packet.pbzero.h"
3231
#include "src/trace_processor/importers/common/parser_types.h"
3332
#include "src/trace_processor/importers/common/slice_tracker.h"
3433
#include "src/trace_processor/importers/common/track_compressor.h"
3534
#include "src/trace_processor/importers/common/tracks.h"
35+
#include "src/trace_processor/importers/proto/forged_packet_writer.h"
3636
#include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
3737
#include "src/trace_processor/importers/proto/proto_importer_module.h"
3838
#include "src/trace_processor/sorter/trace_sorter.h"
@@ -93,12 +93,16 @@ ModuleResult AppWakelockModule::TokenizePacket(
9393
continue;
9494
}
9595

96-
packet_buffer_->set_timestamp(static_cast<uint64_t>(real_ts));
97-
auto* event = packet_buffer_->set_app_wakelock_bundle();
98-
size_t length = static_cast<size_t>(interned->end() - interned->begin());
99-
event->set_info()->AppendRawProtoBytes(interned->begin(), length);
100-
event->set_acquired(acquired);
101-
PushPacketBufferForSort(real_ts, state);
96+
TraceBlobView tbv =
97+
context_->forged_packet_writer->WritePacket([&](auto* pkt) {
98+
pkt->set_timestamp(static_cast<uint64_t>(real_ts));
99+
auto* event = pkt->set_app_wakelock_bundle();
100+
auto length =
101+
static_cast<size_t>(interned->end() - interned->begin());
102+
event->set_info()->AppendRawProtoBytes(interned->begin(), length);
103+
event->set_acquired(acquired);
104+
});
105+
PushPacketBufferForSort(real_ts, std::move(tbv), state);
102106
}
103107

104108
return ModuleResult::Handled();
@@ -159,12 +163,10 @@ void AppWakelockModule::ParseWakelockBundle(int64_t ts, ConstBytes blob) {
159163

160164
void AppWakelockModule::PushPacketBufferForSort(
161165
int64_t timestamp,
166+
TraceBlobView tbv,
162167
RefPtr<PacketSequenceStateGeneration> state) {
163-
auto [vec, size] = packet_buffer_.SerializeAsUniquePtr();
164-
TraceBlobView tbv(TraceBlob::TakeOwnership(std::move(vec), size));
165168
module_context_->trace_packet_stream->Push(
166169
timestamp, TracePacketData{std::move(tbv), std::move(state)});
167-
packet_buffer_.Reset();
168170
}
169171

170172
} // namespace perfetto::trace_processor

src/trace_processor/importers/proto/app_wakelock_module.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
#include <cstdint>
2121

2222
#include "perfetto/protozero/field.h"
23-
#include "perfetto/protozero/scattered_heap_buffer.h"
2423
#include "perfetto/trace_processor/ref_counted.h"
24+
#include "perfetto/trace_processor/trace_blob_view.h"
2525
#include "protos/perfetto/trace/trace_packet.pbzero.h"
2626
#include "src/trace_processor/importers/common/parser_types.h"
2727
#include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
@@ -54,13 +54,12 @@ class AppWakelockModule : public ProtoImporterModule {
5454
private:
5555
void ParseWakelockBundle(int64_t ts, protozero::ConstBytes blob);
5656

57-
// Helper to simplify pushing a TracePacket to the sorter. The caller fills in
58-
// the packet buffer and uses this to push for sorting and reset the buffer.
57+
// Pushes a serialized TracePacket to the sorter.
5958
void PushPacketBufferForSort(int64_t timestamp,
59+
TraceBlobView tbv,
6060
RefPtr<PacketSequenceStateGeneration> state);
6161

6262
TraceProcessorContext* context_;
63-
protozero::HeapBuffered<protos::pbzero::TracePacket> packet_buffer_;
6463

6564
const StringId arg_flags_;
6665
const StringId arg_owner_pid_;
src/trace_processor/importers/proto/forged_packet_writer.cc

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (C) 2025 The Android Open Source Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "src/trace_processor/importers/proto/forged_packet_writer.h"
18+
19+
#include <cstddef>
20+
#include <cstdint>
21+
#include <cstring>
22+
#include <utility>
23+
24+
#include "perfetto/base/logging.h"
25+
#include "perfetto/protozero/contiguous_memory_range.h"
26+
#include "perfetto/protozero/scattered_stream_writer.h"
27+
#include "perfetto/public/compiler.h"
28+
#include "perfetto/trace_processor/ref_counted.h"
29+
#include "perfetto/trace_processor/trace_blob.h"
30+
#include "perfetto/trace_processor/trace_blob_view.h"
31+
32+
#include "protos/perfetto/trace/trace_packet.pbzero.h"
33+
34+
namespace perfetto::trace_processor {
35+
36+
ForgedTracePacketWriter::ForgedTracePacketWriter() : writer_(this) {}
37+
38+
ForgedTracePacketWriter::~ForgedTracePacketWriter() = default;
39+
40+
protos::pbzero::TracePacket* ForgedTracePacketWriter::BeginPacket() {
41+
if (!slab_ || packet_start_ptr_ >= slab_->data() + slab_->size()) {
42+
slab_.reset(new TraceBlob(TraceBlob::Allocate(kSlabSize)));
43+
packet_start_ptr_ = slab_->data();
44+
}
45+
46+
protozero::ContiguousMemoryRange range{packet_start_ptr_,
47+
slab_->data() + slab_->size()};
48+
msg_.Reset(&writer_);
49+
writer_.Reset(range);
50+
slices_.push_back(range);
51+
return &msg_;
52+
}
53+
54+
TraceBlobView ForgedTracePacketWriter::EndPacket() {
55+
PERFETTO_CHECK(slices_.size() > 0);
56+
msg_.Finalize();
57+
58+
// Close the last slice with the actual end position of the packet and
59+
// keep track of the current offset for the next packet.
60+
{
61+
auto& s = slices_.back();
62+
s.end = writer_.write_ptr();
63+
packet_start_ptr_ = s.end;
64+
}
65+
66+
// Common case: packet fits in the single slab. Zero copies.
67+
if (PERFETTO_LIKELY(slices_.size() == 1)) {
68+
auto s = slices_.back();
69+
slices_.clear();
70+
return {slab_, static_cast<size_t>(s.begin - slab_->data()), s.size()};
71+
}
72+
73+
PERFETTO_CHECK(overflow_slabs_.size() > 0);
74+
75+
// Rare: packet spans multiple slabs. Stitch into one contiguous blob.
76+
size_t total = 0;
77+
for (const auto& s : slices_) {
78+
total += s.size();
79+
}
80+
81+
TraceBlob stitched = TraceBlob::Allocate(total);
82+
{
83+
uint8_t* dst = stitched.data();
84+
for (const auto& s : slices_) {
85+
memcpy(dst, s.begin, s.size());
86+
dst += s.size();
87+
}
88+
}
89+
90+
slab_ = std::move(overflow_slabs_.back());
91+
overflow_slabs_.clear();
92+
slices_.clear();
93+
return TraceBlobView(std::move(stitched), 0, total);
94+
}
95+
96+
protozero::ContiguousMemoryRange ForgedTracePacketWriter::GetNewBuffer() {
97+
PERFETTO_CHECK(slices_.size() > 0);
98+
99+
// Close the current slice and start a new one for the new slab.
100+
slices_.back().end = writer_.write_ptr();
101+
102+
// Allocate a new slab and add it to the overflow slabs.
103+
overflow_slabs_.emplace_back(new TraceBlob(TraceBlob::Allocate(kSlabSize)));
104+
105+
auto& blob = *overflow_slabs_.back();
106+
protozero::ContiguousMemoryRange range{blob.data(),
107+
blob.data() + blob.size()};
108+
slices_.push_back(range);
109+
return range;
110+
}
111+
112+
} // namespace perfetto::trace_processor

0 commit comments

Comments
 (0)