Skip to content

Commit 392c71c

Browse files
thedavekwonfacebook-github-bot
authored andcommitted
Revisit mesh telemetry IDs
Summary: ActorMeshId values are only unique within a ProcMesh, so actor mesh telemetry IDs must include both `ProcMeshId` and `ActorMeshId`. This showed up with the sidecar design because entity memtables are now per host: singleton actor mesh IDs from multiple proc meshes on the same host land in the same table, so hashing only the actor mesh name makes those singleton rows collide. Previously, this inconsistency was hidden. Add `telemetry_actor_mesh_id` as the shared derivation helper, and use it for actor mesh creation, sent-message telemetry, actor rows, and the bootstrap client mesh so `sent_messages.actor_mesh_id`, `meshes.id`, and `actors.mesh_id` stay joinable. Also align host and proc mesh row IDs to hash typed `HostMeshId` and `ProcMeshId` values, so `meshes.parent_mesh_id` uses the same derivation as the parent `meshes.id` row. Caveat: this does not fully address potential collisions for `ProcMeshId::singleton` values themselves; proc mesh telemetry IDs may also need parent scoping if multiple singleton proc meshes can share a host-local memtable. Differential Revision: D107178870
1 parent 7205785 commit 392c71c

6 files changed

Lines changed: 41 additions & 14 deletions

File tree

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ use crate::mesh_id::ActorMeshId;
7575
use crate::metrics;
7676
use crate::proc_agent::ActorState;
7777
use crate::proc_mesh::GET_ACTOR_STATE_MAX_IDLE;
78+
use crate::proc_mesh::telemetry_actor_mesh_id;
7879
use crate::resource;
7980
use crate::supervision::MeshFailure;
8081
use crate::supervision::Unhealthy;
@@ -538,7 +539,7 @@ impl<A: Referable> ActorMeshRef<A> {
538539
hyperactor_telemetry::notify_sent_message(hyperactor_telemetry::SentMessageEvent {
539540
timestamp: std::time::SystemTime::now(),
540541
sender_actor_id: hyperactor_telemetry::hash_to_u64(cx.mailbox().actor_addr().id()),
541-
actor_mesh_id: hyperactor_telemetry::hash_to_u64(&self.id.to_string()),
542+
actor_mesh_id: telemetry_actor_mesh_id(self.proc_mesh.id(), &self.id),
542543
view_json: serde_json::to_string(view::Ranked::region(self)).unwrap_or_default(),
543544
shape_json: {
544545
let shape: ndslice::Shape = view::Ranked::region(self).into();

hyperactor_mesh/src/host_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ impl HostMesh {
381381
/// Emit a telemetry event for this host mesh creation.
382382
fn notify_created(&self) {
383383
let name_str = self.id.to_string();
384-
let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
384+
let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&self.id);
385385

386386
hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
387387
id: mesh_id_hash,

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use hyperactor::supervision::ActorSupervisionEvent;
3232
use hyperactor_config::CONFIG;
3333
use hyperactor_config::ConfigAttr;
3434
use hyperactor_config::attrs::declare_attrs;
35+
use hyperactor_telemetry::hash_to_u64;
3536
use ndslice::Extent;
3637
use ndslice::ViewExt as _;
3738
use ndslice::view;
@@ -86,6 +87,11 @@ declare_attrs! {
8687
/// present as a system actor (`system_children`) on every proc mesh member.
8788
pub const COMM_ACTOR_NAME: &str = "comm";
8889

90+
/// Returns the telemetry `meshes.id` value for an actor mesh.
91+
pub fn telemetry_actor_mesh_id(proc_mesh_id: &ProcMeshId, actor_mesh_id: &ActorMeshId) -> u64 {
92+
hash_to_u64(&(proc_mesh_id, actor_mesh_id))
93+
}
94+
8995
/// A reference to a single [`hyperactor::Proc`].
9096
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
9197
pub struct ProcRef {
@@ -176,13 +182,13 @@ impl ProcMesh {
176182
// Notify telemetry that the ProcAgent mesh was created.
177183
{
178184
let name_str = id.to_string();
179-
let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&name_str);
185+
let mesh_id_hash = hash_to_u64(&id);
180186

181187
let hm = current_ref
182188
.host_mesh
183189
.as_ref()
184190
.expect("ProcMesh always has a host mesh");
185-
let parent_mesh_id = hyperactor_telemetry::hash_to_u64(&hm.id().to_string());
191+
let parent_mesh_id = hash_to_u64(hm.id());
186192
let parent_view_json = serde_json::to_string(hm.region())
187193
.unwrap_or_else(|e| format!("encountered error when serializing region: {}", e));
188194

@@ -848,12 +854,9 @@ impl ProcMeshRef {
848854
{
849855
let id_str = mesh.id().to_string();
850856

851-
// Hash the actor mesh id. This is used as mesh_id for both
852-
// the MeshEvent and the per-actor ActorEvents below.
853-
let mesh_id_hash = hyperactor_telemetry::hash_to_u64(&id_str);
854-
855857
// Hash the proc mesh id for parent_mesh_id.
856-
let parent_mesh_id_hash = hyperactor_telemetry::hash_to_u64(&self.id().to_string());
858+
let parent_mesh_id_hash = hash_to_u64(self.id());
859+
let mesh_id_hash = telemetry_actor_mesh_id(self.id(), mesh.id());
857860

858861
hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
859862
id: mesh_id_hash,

hyperactor_telemetry/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ pub struct SentMessageEvent {
410410
pub timestamp: SystemTime,
411411
/// Hash of the sending actor's ActorId.
412412
pub sender_actor_id: u64,
413-
/// Hash of the target actor mesh's name.
413+
/// Hash of the target actor mesh's `(ProcMeshId, ActorMeshId)`.
414414
pub actor_mesh_id: u64,
415415
/// The view (slice) of the actor mesh that was targeted, serialized from
416416
/// [`ndslice::Region`]. For full-mesh sends (call, broadcast) this covers

monarch_hyperactor/src/host_mesh.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@ use hyperactor_mesh::host_mesh::host_agent::GetLocalProcClient;
2929
use hyperactor_mesh::host_mesh::host_agent::HostAgent;
3030
use hyperactor_mesh::host_mesh::host_agent::ShutdownHost;
3131
use hyperactor_mesh::mesh_admin::MeshAdminMessageClient;
32+
use hyperactor_mesh::mesh_id::ActorMeshId;
3233
use hyperactor_mesh::mesh_id::HostMeshId;
3334
use hyperactor_mesh::mesh_id::ProcMeshId;
3435
use hyperactor_mesh::proc_agent::GetProcClient;
3536
use hyperactor_mesh::proc_mesh::ProcRef;
37+
use hyperactor_mesh::proc_mesh::telemetry_actor_mesh_id;
3638
use hyperactor_mesh::shared_cell::SharedCell;
3739
use hyperactor_mesh::transport::default_bind_spec;
40+
use hyperactor_telemetry::hash_to_u64;
3841
use ndslice::View;
3942
use ndslice::view::RankedSliceable;
4043
use pyo3::IntoPyObjectExt;
@@ -408,7 +411,7 @@ fn bootstrap_host(bootstrap_cmd: Option<PyBootstrapCommand>) -> PyResult<PyPytho
408411
let now = std::time::SystemTime::now();
409412

410413
let host_name_str = host_mesh.id().to_string();
411-
let host_mesh_id = hyperactor_telemetry::hash_to_u64(&host_name_str);
414+
let host_mesh_id = hash_to_u64(host_mesh.id());
412415
hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
413416
id: host_mesh_id,
414417
timestamp: now,
@@ -436,7 +439,7 @@ fn bootstrap_host(bootstrap_cmd: Option<PyBootstrapCommand>) -> PyResult<PyPytho
436439
});
437440

438441
let proc_id_str = proc_mesh.id().to_string();
439-
let proc_mesh_id = hyperactor_telemetry::hash_to_u64(&proc_id_str);
442+
let proc_mesh_id = hash_to_u64(proc_mesh.id());
440443
hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
441444
id: proc_mesh_id,
442445
timestamp: now,
@@ -463,8 +466,9 @@ fn bootstrap_host(bootstrap_cmd: Option<PyBootstrapCommand>) -> PyResult<PyPytho
463466
display_name: None,
464467
});
465468

469+
let client_mesh_actor_id = ActorMeshId::singleton(Label::new("client").unwrap());
466470
let client_mesh_name = format!("{}/client", proc_mesh.id());
467-
let client_mesh_id = hyperactor_telemetry::hash_to_u64(&client_mesh_name);
471+
let client_mesh_id = telemetry_actor_mesh_id(proc_mesh.id(), &client_mesh_actor_id);
468472
hyperactor_telemetry::notify_mesh_created(hyperactor_telemetry::MeshEvent {
469473
id: client_mesh_id,
470474
timestamp: now,

python/tests/test_distributed_telemetry.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ def test_sent_messages_table(
807807
cast_with_selection in actor_mesh.rs, which calls notify_sent_message
808808
with a SentMessageEvent containing:
809809
- sender_actor_id: hash of the sending actor's ActorId
810-
- actor_mesh_id: hash of the target actor mesh name
810+
- actor_mesh_id: hash of the target (ProcMeshId, ActorMeshId)
811811
- view_json: serialized ndslice::Region of the current view
812812
- shape_json: serialized ndslice::Shape (converted from the Region)
813813
"""
@@ -860,6 +860,25 @@ def test_sent_messages_table(
860860
f"Expected 42 sent_messages via {send_path}, got {joined_count}"
861861
)
862862

863+
actor_joined = engine.query(
864+
"SELECT COUNT(DISTINCT sm.id) AS message_count, "
865+
"COUNT(DISTINCT a.id) AS actor_count "
866+
"FROM sent_messages sm "
867+
"JOIN actors a ON sm.actor_mesh_id = a.mesh_id "
868+
"JOIN meshes m ON a.mesh_id = m.id "
869+
f"WHERE m.given_name = '{mesh_name}'"
870+
)
871+
actor_joined_dict = actor_joined.to_pydict()
872+
joined_message_count = actor_joined_dict["message_count"][0]
873+
joined_actor_count = actor_joined_dict["actor_count"][0]
874+
assert joined_message_count == 42, (
875+
"Expected sent_messages.actor_mesh_id to join actors.mesh_id for "
876+
f"{send_path}, got {joined_message_count} messages"
877+
)
878+
assert joined_actor_count == 2, (
879+
f"Expected 2 target actors for {send_path}, got {joined_actor_count}"
880+
)
881+
863882
# Verify view_json (ndslice Region) and shape_json (ndslice Shape).
864883
# Region serializes as {"labels": [...], "slice": {"offset": ..., "sizes": [...], "strides": [...]}}.
865884
# Shape is Region converted via Region::into::<Shape>, same serialization format.

0 commit comments

Comments
 (0)