Skip to content

Commit ff239d7

Browse files
Pani122 authored and copybara-github committed
Refactor profile processors for unified 1P/3P architecture.
PiperOrigin-RevId: 900545483
1 parent d0ec6f0 commit ff239d7

8 files changed

+666
-0
lines changed

xprof/convert/BUILD

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,33 @@ cc_library(
4747
],
4848
)
4949

50+
# Header-only target (it lists `hdrs` but no `srcs`) that exposes
# unified_profile_processor.h to the processor implementations below.
cc_library(
51+
name = "unified_profile_processor",
52+
hdrs = ["unified_profile_processor.h"],
53+
deps = [
54+
":repository",
55+
":tool_options",
56+
"@com_google_absl//absl/status",
57+
"@com_google_absl//absl/status:statusor",
58+
"@com_google_absl//absl/strings",
59+
"@tsl//tsl/profiler/protobuf:xplane_proto_cc",
60+
],
61+
)
62+
63+
# Builds unified_profile_processor_factory.{h,cc} on top of the
# :unified_profile_processor interface target.
cc_library(
64+
name = "unified_profile_processor_factory",
65+
srcs = ["unified_profile_processor_factory.cc"],
66+
hdrs = ["unified_profile_processor_factory.h"],
67+
deps = [
68+
":tool_options",
69+
":unified_profile_processor",
70+
"@com_google_absl//absl/container:flat_hash_map",
71+
"@com_google_absl//absl/functional:any_invocable",
72+
"@com_google_absl//absl/log",
73+
"@com_google_absl//absl/strings",
74+
],
75+
)
76+
5077
cc_test(
5178
name = "preprocess_single_host_xplane_test",
5279
srcs = ["preprocess_single_host_xplane_test.cc"],
@@ -122,6 +149,53 @@ cc_library(
122149
],
123150
)
124151

152+
# Builds base_op_stats_processor.{h,cc}, the shared OpStats Map/Reduce base
# class; it depends on the :unified_profile_processor interface target.
cc_library(
153+
name = "base_op_stats_processor",
154+
srcs = ["base_op_stats_processor.cc"],
155+
hdrs = ["base_op_stats_processor.h"],
156+
deps = [
157+
":file_utils",
158+
":op_stats_combiner",
159+
":preprocess_single_host_xplane",
160+
":repository",
161+
":tool_options",
162+
":unified_profile_processor",
163+
":xplane_to_op_stats",
164+
"@com_google_absl//absl/log",
165+
"@com_google_absl//absl/status",
166+
"@com_google_absl//absl/status:statusor",
167+
"@com_google_absl//absl/time",
168+
"@com_google_protobuf//:protobuf",
169+
"@org_xprof//plugin/xprof/protobuf:op_stats_proto_cc",
170+
"@org_xprof//xprof/utils:hardware_type_utils",
171+
"@org_xprof//xprof/utils:step_intersection",
172+
"@tsl//tsl/platform:path",
173+
"@tsl//tsl/profiler/protobuf:xplane_proto_cc",
174+
"@xla//xla/tsl/platform:env",
175+
"@xla//xla/tsl/platform:errors",
176+
"@xla//xla/tsl/platform:statusor",
177+
],
178+
)
179+
180+
# Builds unified_hlo_stats_processor.{h,cc} on top of :base_op_stats_processor
# and :unified_profile_processor_factory.
cc_library(
181+
name = "unified_hlo_stats_processor",
182+
srcs = ["unified_hlo_stats_processor.cc"],
183+
hdrs = ["unified_hlo_stats_processor.h"],
184+
deps = [
185+
":base_op_stats_processor",
186+
":multi_xplanes_to_op_stats",
187+
":op_stats_to_hlo_stats",
188+
":tool_options",
189+
":unified_profile_processor_factory",
190+
"@com_google_absl//absl/status",
191+
"@org_xprof//plugin/xprof/protobuf:hlo_stats_proto_cc",
192+
"@org_xprof//plugin/xprof/protobuf:op_stats_proto_cc",
193+
"@tsl//tsl/profiler/protobuf:xplane_proto_cc",
194+
"@xla//xla/tsl/platform:errors",
195+
],
196+
# alwayslink keeps this library's object files in any binary that depends on
# it even when none of its symbols are referenced.
# NOTE(review): presumably required so a static self-registration with the
# processor factory is not dropped by the linker - confirm in the .cc file.
alwayslink = 1,
197+
)
198+
125199
cc_library(
126200
name = "overview_page_processor",
127201
srcs = ["overview_page_processor.cc"],
xprof/convert/base_op_stats_processor.cc

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2026 The OpenXLA Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// ==============================================================================
15+
16+
#include "xprof/convert/base_op_stats_processor.h"
17+
18+
#include <cstdint>
19+
#include <limits>
20+
#include <optional>
21+
#include <string>
22+
#include <variant>
23+
#include <vector>
24+
25+
#include "absl/log/log.h"
26+
#include "absl/status/status.h"
27+
#include "absl/time/clock.h"
28+
#include "absl/time/time.h"
29+
#include "google/protobuf/arena.h"
30+
#include "xla/tsl/platform/env.h"
31+
#include "xla/tsl/platform/errors.h"
32+
#include "xla/tsl/platform/statusor.h"
33+
#include "tsl/platform/path.h"
34+
#include "tsl/profiler/protobuf/xplane.pb.h"
35+
#include "xprof/convert/file_utils.h"
36+
#include "xprof/convert/op_stats_combiner.h"
37+
#include "xprof/convert/preprocess_single_host_xplane.h"
38+
#include "xprof/convert/repository.h"
39+
#include "xprof/convert/tool_options.h"
40+
#include "xprof/convert/xplane_to_op_stats.h"
41+
#include "plugin/xprof/protobuf/op_stats.pb.h"
42+
#include "xprof/utils/hardware_type_utils.h"
43+
#include "xprof/utils/step_intersection.h"
44+
45+
namespace xprof {
46+
namespace {
47+
48+
using ::tensorflow::profiler::CombineAllOpStats;
49+
using ::tensorflow::profiler::ComputeStepIntersectionToMergeOpStats;
50+
using ::tensorflow::profiler::ConvertXSpaceToOpStats;
51+
using ::tensorflow::profiler::OpStats;
52+
using ::tensorflow::profiler::OpStatsInfo;
53+
using ::tensorflow::profiler::OpStatsOptions;
54+
using ::tensorflow::profiler::ParseHardwareType;
55+
using ::tensorflow::profiler::PreprocessSingleHostXSpace;
56+
using ::tensorflow::profiler::SessionSnapshot;
57+
using ::tensorflow::profiler::StepIntersection;
58+
using ::tensorflow::profiler::StoredDataType;
59+
using ::tensorflow::profiler::WriteBinaryProto;
60+
using ::tensorflow::profiler::XSpace;
61+
62+
std::string GetCacheFilePath(const SessionSnapshot& session_snapshot,
63+
const std::string& hostname) {
64+
StoredDataType cache_type = StoredDataType::OP_STATS;
65+
std::string filename =
66+
session_snapshot.GetHostDataFileName(cache_type, hostname).value_or("");
67+
return tsl::io::JoinPath(session_snapshot.GetSessionRunDir(), filename);
68+
}
69+
70+
// Reads the boolean "use_saved_result" tool option.
//
// Returns false when the option is missing or when it holds a non-bool
// alternative; otherwise returns the stored bool.
bool GetUseSavedResult(const tensorflow::profiler::ToolOptions& options) {
  const auto entry = options.find("use_saved_result");
  if (entry == options.end()) {
    return false;
  }
  // get_if yields nullptr when the variant currently holds another type.
  const bool* saved_result_flag = std::get_if<bool>(&entry->second);
  return saved_result_flag != nullptr && *saved_result_flag;
}
78+
79+
bool AreAllOpStatsCached(const SessionSnapshot& session_snapshot) {
80+
for (int i = 0; i < session_snapshot.XSpaceSize(); ++i) {
81+
std::string hostname = session_snapshot.GetHostname(i);
82+
std::string cache_file_path = GetCacheFilePath(session_snapshot, hostname);
83+
if (!tsl::Env::Default()->FileExists(cache_file_path).ok()) {
84+
return false;
85+
}
86+
}
87+
return true;
88+
}
89+
90+
} // namespace
91+
92+
// Map entry point for callers that only have the path of one XSpace file.
//
// Wraps the path in a single-host SessionSnapshot, loads that host's XSpace
// into a local arena and forwards to the (snapshot, hostname, xspace)
// overload. Errors from snapshot creation or XSpace loading are returned
// unchanged.
absl::StatusOr<std::string> BaseOpStatsProcessor::Map(
    const std::string& xspace_path) {
  std::vector<std::string> single_host_paths(1, xspace_path);
  TF_ASSIGN_OR_RETURN(
      SessionSnapshot session_snapshot,
      SessionSnapshot::Create(single_host_paths, /*xspaces=*/std::nullopt));
  const std::string host = session_snapshot.GetHostname(0);

  // The arena owns the loaded XSpace, so it must stay alive until the
  // delegated Map() call below has returned.
  google::protobuf::Arena arena;
  TF_ASSIGN_OR_RETURN(XSpace * host_xspace,
                      session_snapshot.GetXSpace(0, &arena));

  return Map(session_snapshot, host, *host_xspace);
}
104+
105+
// Map step for one host: makes sure the host's OpStats cache file exists and
// returns its path.
//
// If the cache file is already present it is reused as-is. Otherwise the
// XSpace is preprocessed (on a private copy, because the input is const),
// converted to OpStats with the op-metrics, step and kernel-stats databases
// enabled, and written out as this host's OP_STATS data. Conversion and write
// errors are propagated to the caller.
absl::StatusOr<std::string> BaseOpStatsProcessor::Map(
    const SessionSnapshot& session_snapshot, const std::string& hostname,
    const XSpace& xspace) {
  std::string cache_file_path = GetCacheFilePath(session_snapshot, hostname);

  const bool already_cached =
      tsl::Env::Default()->FileExists(cache_file_path).ok();
  if (already_cached) {
    return cache_file_path;
  }

  // Preprocessing mutates its argument, so work on a copy of the input.
  XSpace preprocessed_xspace = xspace;
  PreprocessSingleHostXSpace(&preprocessed_xspace, /*step_grouping=*/true,
                             /*derived_timeline=*/true);

  OpStatsOptions conversion_options;
  conversion_options.generate_op_metrics_db = true;
  conversion_options.generate_step_db = true;
  conversion_options.generate_kernel_stats_db = true;
  TF_ASSIGN_OR_RETURN(
      OpStats host_op_stats,
      ConvertXSpaceToOpStats(preprocessed_xspace, conversion_options));

  TF_RETURN_IF_ERROR(WriteBinaryProto(
      session_snapshot, StoredDataType::OP_STATS, hostname, host_op_stats));
  return cache_file_path;
}
128+
129+
// Reduce step: merges the per-host OpStats files produced by Map() and hands
// the merged result to the tool-specific ProcessCombinedOpStats().
//
// Args:
//   session_snapshot: session whose run directory receives the combined
//     OpStats file.
//   map_output_files: paths of the per-host OpStats files; the position of a
//     path in this list is used as that host's source id.
//
// Returns:
//   InvalidArgument if `map_output_files` is empty, any error from reading an
//   input file or writing the combined file, otherwise the status returned by
//   ProcessCombinedOpStats().
absl::Status BaseOpStatsProcessor::Reduce(
    const SessionSnapshot& session_snapshot,
    const std::vector<std::string>& map_output_files) {
  if (map_output_files.empty()) {
    return absl::InvalidArgumentError("map_output_files cannot be empty");
  }

  // Deserialize each file straight into its final vector slot so the
  // potentially large protos are never copied after being read.
  std::vector<OpStats> all_op_stats;
  all_op_stats.reserve(map_output_files.size());
  for (const std::string& map_output_file : map_output_files) {
    TF_RETURN_IF_ERROR(
        xprof::ReadBinaryProto(map_output_file, &all_op_stats.emplace_back()));
  }

  // `all_op_stats` is not resized from here on, so the element addresses
  // stored in the OpStatsInfo entries stay valid for the rest of the function.
  std::vector<OpStatsInfo> all_op_stats_info;
  all_op_stats_info.reserve(all_op_stats.size());
  const int num_hosts = static_cast<int>(all_op_stats.size());
  for (int host_id = 0; host_id < num_hosts; ++host_id) {
    all_op_stats_info.emplace_back(
        &all_op_stats[host_id],
        ParseHardwareType(
            all_op_stats[host_id].run_environment().device_type()),
        host_id);
  }

  // NOTE(review): the uint32 max argument is presumably an "unlimited" cap on
  // the number of steps considered per host - confirm against
  // ComputeStepIntersectionToMergeOpStats.
  StepIntersection step_intersection = ComputeStepIntersectionToMergeOpStats(
      all_op_stats_info, std::numeric_limits<uint32_t>::max());

  OpStats combined_op_stats;
  CombineAllOpStats(all_op_stats_info, step_intersection, &combined_op_stats);

  // Persist the merged result under the all-hosts identifier before running
  // the tool-specific processing.
  TF_RETURN_IF_ERROR(WriteBinaryProto(
      session_snapshot, StoredDataType::OP_STATS,
      tensorflow::profiler::kAllHostsIdentifier, combined_op_stats));

  return ProcessCombinedOpStats(session_snapshot, combined_op_stats, options_);
}
166+
167+
bool BaseOpStatsProcessor::ShouldUseWorkerService(
168+
const SessionSnapshot& session_snapshot,
169+
const tensorflow::profiler::ToolOptions& options) const {
170+
if (session_snapshot.XSpaceSize() == 1) {
171+
return false;
172+
}
173+
174+
bool use_saved_result = GetUseSavedResult(options);
175+
if (!use_saved_result) {
176+
return true;
177+
}
178+
179+
return !AreAllOpStatsCached(session_snapshot);
180+
}
181+
182+
} // namespace xprof
xprof/convert/base_op_stats_processor.h

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright 2026 The OpenXLA Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// ==============================================================================
15+
16+
#ifndef THIRD_PARTY_XPROF_CONVERT_BASE_OP_STATS_PROCESSOR_H_
17+
#define THIRD_PARTY_XPROF_CONVERT_BASE_OP_STATS_PROCESSOR_H_
18+
19+
#include <string>
20+
#include <vector>
21+
22+
#include "absl/status/status.h"
23+
#include "absl/status/statusor.h"
24+
#include "tsl/profiler/protobuf/xplane.pb.h"
25+
#include "xprof/convert/tool_options.h"
26+
#include "xprof/convert/unified_profile_processor.h"
27+
#include "plugin/xprof/protobuf/op_stats.pb.h"
28+
29+
namespace xprof {
30+
31+
// Unified base class for OpStats processors across 1P and 3P environments.
32+
// Inherits virtually from UnifiedProfileProcessor to support combination with
33+
// other intermediate structural classes.
34+
class BaseOpStatsProcessor : public virtual UnifiedProfileProcessor {
35+
public:
36+
explicit BaseOpStatsProcessor(
37+
const tensorflow::profiler::ToolOptions& options)
38+
: options_(options) {}
39+
40+
virtual ~BaseOpStatsProcessor() = default;
41+
42+
// Converts XSpace to serialized OpStats.
43+
absl::StatusOr<std::string> Map(
44+
const tensorflow::profiler::SessionSnapshot& session_snapshot,
45+
const std::string& hostname,
46+
const tensorflow::profiler::XSpace& xspace) override;
47+
48+
absl::StatusOr<std::string> Map(const std::string& xspace_path) override;
49+
50+
// Deserializes map_outputs, combines OpStats, and delegates to
51+
// ProcessCombinedOpStats.
52+
absl::Status Reduce(
53+
const tensorflow::profiler::SessionSnapshot& session_snapshot,
54+
const std::vector<std::string>& map_output_files) override;
55+
56+
// Base implementation for tools not needing distributed mapping.
57+
absl::Status ProcessSession(
58+
const tensorflow::profiler::SessionSnapshot& session_snapshot,
59+
const tensorflow::profiler::ToolOptions& options) override {
60+
return absl::UnimplementedError("ProcessSession not implemented");
61+
}
62+
63+
bool ShouldUseWorkerService(
64+
const tensorflow::profiler::SessionSnapshot& session_snapshot,
65+
const tensorflow::profiler::ToolOptions& options) const override;
66+
67+
// Extension point for subclasses to process the merged OpStats.
68+
virtual absl::Status ProcessCombinedOpStats(
69+
const tensorflow::profiler::SessionSnapshot& session_snapshot,
70+
const tensorflow::profiler::OpStats& combined_op_stats,
71+
const tensorflow::profiler::ToolOptions& options) = 0;
72+
73+
protected:
74+
tensorflow::profiler::ToolOptions options_;
75+
};
76+
77+
} // namespace xprof
78+
79+
#endif // THIRD_PARTY_XPROF_CONVERT_BASE_OP_STATS_PROCESSOR_H_

0 commit comments

Comments
 (0)