Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 156 additions & 28 deletions xprof/convert/trace_viewer/trace_events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.

#include <stddef.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <functional>
Expand Down Expand Up @@ -47,6 +48,7 @@ limitations under the License.
#include "xla/tsl/platform/errors.h"
#include "xla/tsl/platform/file_system.h"
#include "xla/tsl/profiler/utils/timespan.h"
#include "tsl/platform/cpu_info.h"
#include "xprof/convert/trace_viewer/prefix_trie.h"
#include "xprof/convert/trace_viewer/trace_events_util.h"
#include "xprof/convert/trace_viewer/trace_viewer_visibility.h"
Expand All @@ -69,22 +71,6 @@ inline int32_t NumEvents(
return num_events;
}

// Gives events that repeat a timestamp distinct serial numbers so the front
// end can deduplicate events during streaming mode. Together with the
// existing fields, the tuple <device_id, timestamp_ps, serial_number> is what
// identifies an event.
// REQUIRES: events is sorted by timestamp_ps
void MaybeAddEventUniqueId(std::vector<TraceEvent*>& events) {
  uint64_t previous_timestamp_ps = UINT64_MAX;
  uint64_t repeat_index = 0;
  for (TraceEvent* current : events) {
    const uint64_t timestamp_ps = current->timestamp_ps();
    if (timestamp_ps != previous_timestamp_ps) {
      // First event seen at this timestamp: restart the numbering and leave
      // the event's serial untouched.
      repeat_index = 0;
      previous_timestamp_ps = timestamp_ps;
      continue;
    }
    // Second and later events at the same timestamp get serials 1, 2, ...
    current->set_serial(++repeat_index);
  }
}

// Appends all events from src into dst.
inline void AppendEvents(TraceEventTrack&& src, TraceEventTrack* dst) {
Expand All @@ -111,6 +97,34 @@ absl::Status SerializeWithReusableEvent(const TraceEvent& event,

} // namespace

// Marks events that share a timestamp with distinct serial numbers. This is
// to help the front end deduplicate events during streaming mode. The
// uniqueness is provided by the tuple <device_id, timestamp_ps,
// serial_number>.
//
// All non-empty tracks are merged into a single sequence using
// TraceEventsComparator. Every event whose timestamp equals that of the event
// immediately before it in the merged sequence receives serial 1, 2, ...; the
// first event at each timestamp is left untouched. Events are modified in
// place through the TraceEvent pointers stored in the tracks, even though the
// tracks themselves are passed as const.
// NOTE(review): presumably nway_merge requires every track to already be
// sorted by TraceEventsComparator -- confirm against its definition.
void MaybeAddEventUniqueId(
    const std::vector<const TraceEventTrack*>& event_tracks) {
  std::vector<const std::vector<TraceEvent*>*> tracks_ptrs;
  tracks_ptrs.reserve(event_tracks.size());
  size_t total_events = 0;
  for (const auto* track : event_tracks) {
    if (!track->empty()) {
      tracks_ptrs.push_back(track);
      total_events += track->size();
    }
  }

  // The size of the merged sequence is known up front; reserving once avoids
  // repeated reallocation while back_inserter appends every event.
  std::vector<TraceEvent*> all_events;
  all_events.reserve(total_events);
  nway_merge(tracks_ptrs, std::back_inserter(all_events),
             TraceEventsComparator());

  // UINT64_MAX acts as the "no previous event" sentinel.
  uint64_t last_ts = UINT64_MAX;
  uint64_t serial = 0;
  for (TraceEvent* event : all_events) {
    if (event->timestamp_ps() == last_ts) {
      event->set_serial(++serial);
    } else {
      serial = 0;
    }
    last_ts = event->timestamp_ps();
  }
}

TraceEvent::EventType GetTraceEventType(const TraceEvent& event) {
return event.has_resource_id() ? TraceEvent::EVENT_TYPE_COMPLETE
: event.has_flow_id() ? TraceEvent::EVENT_TYPE_ASYNC
Expand Down Expand Up @@ -186,35 +200,147 @@ std::vector<TraceEvent*> MergeEventTracks(
}

// Assigns every event in `event_tracks` to a zoom level and returns the
// events grouped by level: the result has NumLevels() entries, and the events
// of each level are produced by an n-way merge (TraceEventsComparator) of the
// per-track results.
//
// The work is done in three steps:
//   1. Flow events are collected from every track (in parallel), merged, and
//      walked once to record, per zoom level, whether each flow id is
//      visible.
//   2. Every track is processed independently (in parallel). A flow event is
//      placed on the first level whose flow table marks its flow visible; any
//      other event is placed on the first level at which a per-track
//      TraceViewerVisibility reports it visible. An event that is visible on
//      no lower level ends up on the last level (NumLevels() - 1).
//   3. The per-track, per-level vectors are merged level by level.
std::vector<std::vector<const TraceEvent*>> GetEventsByLevel(
    const Trace& trace,
    const std::vector<const TraceEventTrack*>& event_tracks) {
  // Never use more workers than there are tracks, but always at least one so
  // the slice arithmetic below does not divide by zero.
  int num_threads = std::min(tsl::port::MaxParallelism(),
                             static_cast<int>(event_tracks.size()));
  if (num_threads <= 0) num_threads = 1;

  // Pass 1: Extract flows in parallel.
  // Worker i owns the contiguous slice [start, end) of tracks and writes only
  // to track_flow_events[j] for j in that slice, so workers never share an
  // output vector.
  std::vector<std::vector<const TraceEvent*>> track_flow_events(
      event_tracks.size());
  {
    // NOTE(review): the results are read right after this scope, which relies
    // on the executor waiting for all scheduled work when it is destroyed --
    // confirm XprofThreadPoolExecutor joins in its destructor.
    auto executor = std::make_unique<XprofThreadPoolExecutor>(
        "EventsByLevelParallel_Pass1", num_threads);
    for (int i = 0; i < num_threads; ++i) {
      executor->Execute([&, i] {
        size_t start = (event_tracks.size() * i) / num_threads;
        size_t end = (event_tracks.size() * (i + 1)) / num_threads;
        for (size_t j = start; j < end; ++j) {
          const TraceEventTrack* track = event_tracks[j];
          for (const TraceEvent* event : *track) {
            if (event->has_flow_id()) {
              track_flow_events[j].push_back(event);
            }
          }
        }
      });
    }
  }

  // Merge and sort flows using N-way merge.
  std::vector<const TraceEvent*> flow_events;
  std::vector<const std::vector<const TraceEvent*>*> track_flow_events_ptrs;
  track_flow_events_ptrs.reserve(track_flow_events.size());
  for (const auto& vec : track_flow_events) {
    if (!vec.empty()) {
      track_flow_events_ptrs.push_back(&vec);
    }
  }
  nway_merge(track_flow_events_ptrs, std::back_inserter(flow_events),
             TraceEventsComparator());

  // Calculate flow visibility.
  // The last level (kNumLevels - 1) has no visibility tracker: it receives
  // whatever was not placed on a lower level.
  constexpr int kNumLevels = NumLevels();
  tsl::profiler::Timespan trace_span = TraceSpan(trace);
  std::vector<TraceViewerVisibility> visibility_by_level;
  visibility_by_level.reserve(kNumLevels);
  for (int zoom_level = 0; zoom_level < kNumLevels - 1; ++zoom_level) {
    visibility_by_level.emplace_back(trace_span, LayerResolutionPs(zoom_level));
  }

  // flow_visibility_by_level[level][flow_id] is the visibility recorded for
  // that flow at that level. try_emplace keeps the first value recorded for a
  // flow id, so later events of the same flow do not overwrite it.
  std::vector<absl::flat_hash_map<uint64_t, bool>> flow_visibility_by_level(
      kNumLevels);

  for (const TraceEvent* event : flow_events) {
    int zoom_level = 0;
    // Find the smallest zoom level on which we can distinguish this event.
    for (; zoom_level < kNumLevels - 1; ++zoom_level) {
      bool visible =
          visibility_by_level[zoom_level].VisibleAtResolution(*event);
      flow_visibility_by_level[zoom_level].try_emplace(event->flow_id(),
                                                       visible);
      if (visible) {
        break;
      }
    }
    // Record the visibility of this event in all higher zoom levels.
    // An event on zoom level N can make events at zoom levels >N invisible.
    for (++zoom_level; zoom_level < kNumLevels - 1; ++zoom_level) {
      visibility_by_level[zoom_level].SetVisibleAtResolution(*event);
      flow_visibility_by_level[zoom_level].try_emplace(event->flow_id(), true);
    }
  }

  // Pass 2: Parallel track processing.
  // track_events_by_level[track_index][zoom_level]
  // The flow tables are only read from here on; each worker writes only to
  // track_events_by_level[j] for the tracks in its own slice.
  std::vector<std::vector<std::vector<const TraceEvent*>>>
      track_events_by_level(
          event_tracks.size(),
          std::vector<std::vector<const TraceEvent*>>(kNumLevels));
  {
    auto executor = std::make_unique<XprofThreadPoolExecutor>(
        "EventsByLevelParallel_Pass2", num_threads);
    for (int i = 0; i < num_threads; ++i) {
      executor->Execute([&, i] {
        size_t start = (event_tracks.size() * i) / num_threads;
        size_t end = (event_tracks.size() * (i + 1)) / num_threads;

        for (size_t j = start; j < end; ++j) {
          const TraceEventTrack* track = event_tracks[j];

          // Visibility state private to this track.
          std::vector<TraceViewerVisibility> track_visibility_by_level;
          track_visibility_by_level.reserve(kNumLevels);
          for (int zoom_level = 0; zoom_level < kNumLevels - 1; ++zoom_level) {
            track_visibility_by_level.emplace_back(
                trace_span, LayerResolutionPs(zoom_level));
          }

          for (const TraceEvent* event : *track) {
            int zoom_level = 0;

            if (event->has_flow_id()) {
              // Flow events follow the flow tables computed above.
              for (; zoom_level < kNumLevels - 1; ++zoom_level) {
                auto it =
                    flow_visibility_by_level[zoom_level].find(event->flow_id());
                if (it != flow_visibility_by_level[zoom_level].end() &&
                    it->second) {
                  break;
                }
              }
            } else {
              for (; zoom_level < kNumLevels - 1; ++zoom_level) {
                if (track_visibility_by_level[zoom_level].VisibleAtResolution(
                        *event)) {
                  break;
                }
              }
            }

            track_events_by_level[j][zoom_level].push_back(event);

            // Record the event on every higher level of this track.
            for (++zoom_level; zoom_level < kNumLevels - 1; ++zoom_level) {
              track_visibility_by_level[zoom_level].SetVisibleAtResolution(
                  *event);
            }
          }
        }
      });
    }
  }

  // Final Merge using N-way merge per level.
  std::vector<std::vector<const TraceEvent*>> events_by_level(kNumLevels);
  for (int zoom_level = 0; zoom_level < kNumLevels; ++zoom_level) {
    std::vector<const std::vector<const TraceEvent*>*> level_events_ptrs;
    level_events_ptrs.reserve(event_tracks.size());
    for (size_t j = 0; j < event_tracks.size(); ++j) {
      if (!track_events_by_level[j][zoom_level].empty()) {
        level_events_ptrs.push_back(&track_events_by_level[j][zoom_level]);
      }
    }
    nway_merge(level_events_ptrs,
               std::back_inserter(events_by_level[zoom_level]),
               TraceEventsComparator());
  }

  return events_by_level;
}

Expand Down Expand Up @@ -255,8 +381,9 @@ absl::Status CreateAndSavePrefixTrie(
PrefixTrie prefix_trie;
for (int zoom_level = 0; zoom_level < events_by_level.size(); ++zoom_level) {
for (const TraceEvent* event : events_by_level[zoom_level]) {
uint64_t timestamp = event->timestamp_ps();
std::string event_id =
LevelDbTableKey(zoom_level, event->timestamp_ps(), event->serial());
LevelDbTableKey(zoom_level, timestamp, event->serial());
if (!event_id.empty()) {
prefix_trie.Insert(event->name(), event_id);
}
Expand Down Expand Up @@ -498,6 +625,7 @@ void TraceEventsContainerBase<EventFactory, RawData, Hash>::Merge(
other.trace_.Clear();
}


// Explicit instantiations for the common case.
template class TraceEventsContainerBase<EventFactory, RawData>;

Expand Down
35 changes: 32 additions & 3 deletions xprof/convert/trace_viewer/trace_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,19 @@ absl::Status DoReadFullEventFromLevelDbTable(
// Reads the trace metadata from a file with given path
absl::Status ReadFileTraceMetadata(std::string& filepath, Trace* trace);

// Returns all events of `event_tracks` grouped by visibility level.
// Events are assigned to the smallest zoom level on which they can be
// distinguished based on resolution. Visibility of an event at level N
// makes it visible at all higher levels (>N) as well, and can make other
// events at those levels invisible due to occlusion/downsampling.
// Flow events are handled specially to ensure consistency across tracks.
std::vector<std::vector<const TraceEvent*>> GetEventsByLevel(
    const Trace& trace,
    const std::vector<const TraceEventTrack*>& event_tracks);

// Assigns serials to events with duplicate timestamps globally, i.e. across
// all of the given tracks taken together rather than within each track.
// The events are modified in place (their serial is set) even though the
// tracks are passed as pointers to const.
void MaybeAddEventUniqueId(
const std::vector<const TraceEventTrack*>& event_tracks);

// Return the minimum duration an event can have in `level`.
uint64_t LayerResolutionPs(unsigned level);
Expand Down Expand Up @@ -991,8 +1002,26 @@ class TraceEventsContainerBase {

// Returns all events grouped by visibility level.
//
// Before grouping, events that share a timestamp are given distinct serial
// numbers (MaybeAddEventUniqueId), which modifies the stored events in place.
std::vector<std::vector<const TraceEvent*>> EventsByLevel() const {
  std::vector<const TraceEventTrack*> event_tracks;
  event_tracks.reserve(NumTracks());

  ForAllMutableTracks([&](uint32_t device_id, ResourceValue resource_id,
                          TraceEventTrack* events) {
    event_tracks.push_back(events);
  });

  // Run the two steps one after the other rather than on separate threads:
  // MaybeAddEventUniqueId() writes the serial of the very TraceEvent objects
  // that GetEventsByLevel() reads, and a protobuf message must not be
  // modified while another thread is accessing it. Serials are assigned
  // first so the events are no longer modified once grouping begins.
  MaybeAddEventUniqueId(event_tracks);
  return GetEventsByLevel(trace_, event_tracks);
}

// Returns all events sorted using TraceEventsComparator.
Expand Down
18 changes: 15 additions & 3 deletions xprof/convert/trace_viewer/trace_events_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,21 @@ inline absl::string_view ResourceName(const Trace& trace,
// (descending) so nested events are sorted from outer to innermost.
struct TraceEventsComparator {
  // Returns true if `a` must be ordered before `b`. Keys, in priority order:
  //   1. timestamp_ps, ascending;
  //   2. duration_ps, descending (outer events before the events they nest);
  //   3. device_id, ascending;
  //   4. events that have a resource_id before events that do not;
  //   5. resource_id, ascending, when both events have one;
  //   6. name, ascending, when neither event has a resource_id.
  // Two events with equal timestamp, duration, device_id and resource_id
  // compare as equivalent (their names are not consulted).
  bool operator()(const TraceEvent* a, const TraceEvent* b) const {
    if (a->timestamp_ps() != b->timestamp_ps()) {
      return a->timestamp_ps() < b->timestamp_ps();
    }
    if (a->duration_ps() != b->duration_ps()) {
      return a->duration_ps() > b->duration_ps();
    }
    if (a->device_id() != b->device_id()) {
      return a->device_id() < b->device_id();
    }
    if (a->has_resource_id() && !b->has_resource_id()) return true;
    if (!a->has_resource_id() && b->has_resource_id()) return false;
    if (a->has_resource_id()) {
      return a->resource_id() < b->resource_id();
    }
    return a->name() < b->name();
  }
};

Expand Down
2 changes: 2 additions & 0 deletions xprof/convert/trace_viewer/trace_viewer_visibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class TraceViewerVisibility {
// self-explanatory (eg. MinDurationPs)
uint64_t ResolutionPs() const { return resolution_ps_; }

// Read-only access to the internal flows_ map (uint64_t key -> bool).
// NOTE(review): presumably keyed by flow id with the recorded visibility of
// that flow as the value -- confirm against where flows_ is populated.
const absl::flat_hash_map<uint64_t, bool>& Flows() const {
  return flows_;
}

private:
// Identifier for one Trace Viewer row.
using RowId = std::pair<uint32_t /*device_id*/, uint32_t /*resource_id*/>;
Expand Down
Loading