Skip to content

Commit d9eb1ac

Browse files
committed
Multi-threaded patches
1 parent 66935f4 commit d9eb1ac

File tree

2 files changed

+146
-136
lines changed

2 files changed

+146
-136
lines changed

synapseclient/core/download/download_functions.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os
88
import shutil
99
import sys
10+
import threading
1011
import urllib.parse as urllib_urlparse
1112
import urllib.request as urllib_request
1213
from typing import TYPE_CHECKING, Dict, Optional, Union
@@ -533,16 +534,35 @@ def download_fn(
533534
concrete_type=concrete_type
534535
) as monitor:
535536
try:
537+
# Create a thread-safe progress callback that properly tracks delta
538+
def multi_thread_progress_callback(bytes_transferred, total_bytes):
    """
    Forward download progress to ``monitor`` as an incremental byte count.

    ``bytes_transferred`` is treated as a running total. The last total seen
    from each calling thread is remembered in ``monitor._thread_bytes`` (an
    ordinary dict keyed by ``threading.get_ident()``, not ``threading.local``
    storage), and only a positive difference from that value is passed to
    ``monitor.update``.

    Args:
        bytes_transferred: Running byte total reported by the caller.
        total_bytes: Accepted to match the callback signature; unused here.
    """
    # NOTE(review): this assumes every thread reports its own running total.
    # If the downloader reports one total shared by all threads, or a thread
    # restarts its count for a new part, the deltas will be wrong - confirm
    # against download_from_url_multi_threaded.
    caller = threading.get_ident()

    # monitor.update() is invoked while monitor._lock is still held, so this
    # relies on that lock being re-entrant.
    with monitor._lock:
        if not hasattr(monitor, '_thread_bytes'):
            monitor._thread_bytes = {}
        seen_totals = monitor._thread_bytes

        increment = bytes_transferred - seen_totals.get(caller, 0)
        if increment > 0:
            # Remember the new total before reporting the increment.
            seen_totals[caller] = bytes_transferred
            monitor.update(increment)
557+
536558
downloaded_path = await download_from_url_multi_threaded(
537559
file_handle_id=file_handle_id,
538560
object_id=synapse_id,
539561
object_type=entity_type,
540562
destination=destination,
541563
expected_md5=actual_md5,
542564
synapse_client=syn,
543-
progress_callback=lambda bytes_transferred, total_bytes: monitor.update(
544-
bytes_transferred - getattr(monitor, '_last_transferred_bytes', 0)
545-
) if hasattr(monitor, '_last_transferred_bytes') else setattr(monitor, '_last_transferred_bytes', bytes_transferred)
565+
progress_callback=multi_thread_progress_callback
546566
)
547567
# Update the monitor with the result
548568
monitor.transferred_bytes = actual_file_size

synapseclient/core/telemetry_integration.py

Lines changed: 123 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ class TransferMonitor:
525525
"""
526526
Helper class to track progress during file transfers.
527527
Separates metrics collection from trace data.
528+
Thread-safe for multi-threaded environments.
528529
"""
529530

530531
def __init__(
@@ -553,6 +554,11 @@ def __init__(
553554
self.metric_attributes = metric_attributes or {}
554555
self._events = []
555556

557+
# Thread lock for safe updates in multi-threaded environments
558+
self._lock = threading.RLock()
559+
# Dictionary to track per-thread byte progress
560+
self._thread_bytes = {}
561+
556562
# For tracking throughput statistics
557563
self._throughput_window_size = 10 # Number of recent updates to track
558564
self._throughput_samples = deque(maxlen=self._throughput_window_size)
@@ -570,31 +576,32 @@ def update(self, bytes_count: int):
570576
if bytes_count <= 0:
571577
return
572578

573-
# Update internal counter
574-
self.transferred_bytes += bytes_count
579+
with self._lock:
580+
# Update internal counter
581+
self.transferred_bytes += bytes_count
575582

576-
# Update progress bar if available
577-
if self.progress_bar:
578-
self.progress_bar.update(bytes_count)
583+
# Update progress bar if available
584+
if self.progress_bar:
585+
self.progress_bar.update(bytes_count)
579586

580-
# Calculate throughput for this update
581-
current_time = time.time()
582-
time_delta = current_time - self._last_update_time
587+
# Calculate throughput for this update
588+
current_time = time.time()
589+
time_delta = current_time - self._last_update_time
583590

584-
if time_delta > 0:
585-
# Only record throughput if time has passed
586-
throughput_mbps = (bytes_count / 1_000_000) / time_delta
587-
self._throughput_samples.append(throughput_mbps)
591+
if time_delta > 0:
592+
# Only record throughput if time has passed
593+
throughput_mbps = (bytes_count / 1_000_000) / time_delta
594+
self._throughput_samples.append(throughput_mbps)
588595

589-
# Update metrics in real-time
590-
transfer_bytes_counter.add(bytes_count, self.metric_attributes)
596+
# Update metrics in real-time
597+
transfer_bytes_counter.add(bytes_count, self.metric_attributes)
591598

592-
# Record trace data
593-
self._record_trace_progress(bytes_count)
599+
# Record trace data
600+
self._record_trace_progress(bytes_count)
594601

595-
# Update state for next calculation
596-
self._last_update_time = current_time
597-
self._last_bytes = bytes_count
602+
# Update state for next calculation
603+
self._last_update_time = current_time
604+
self._last_bytes = bytes_count
598605

599606
def _record_trace_progress(self, bytes_count: int):
600607
"""
@@ -616,9 +623,10 @@ def update_progress_percent(self):
616623
"""
617624
Update the progress percentage attribute in the span.
618625
"""
619-
if self.total_size and self.total_size > 0:
620-
progress_percent = min(100.0, (self.transferred_bytes / self.total_size) * 100)
621-
self.span.set_attribute("synapse.transfer.progress_percent", progress_percent)
626+
with self._lock:
627+
if self.total_size and self.total_size > 0:
628+
progress_percent = min(100.0, (self.transferred_bytes / self.total_size) * 100)
629+
self.span.set_attribute("synapse.transfer.progress_percent", progress_percent)
622630

623631
def set_chunks_info(self, total_chunks: int):
624632
"""
@@ -627,28 +635,30 @@ def set_chunks_info(self, total_chunks: int):
627635
Args:
628636
total_chunks: Total number of chunks to transfer
629637
"""
630-
self.chunks_total = total_chunks
631-
self.span.set_attribute("synapse.transfer.chunks_total", total_chunks)
638+
with self._lock:
639+
self.chunks_total = total_chunks
640+
self.span.set_attribute("synapse.transfer.chunks_total", total_chunks)
632641

633642
def chunk_completed(self):
    """
    Count one more finished chunk of a multipart transfer and report it.

    While holding ``self._lock`` this increments ``self.chunks_completed``,
    adds 1 to ``chunk_counter``, sets the
    ``synapse.transfer.chunks_completed`` span attribute, sets
    ``synapse.transfer.chunks_progress_percent`` when ``self.chunks_total``
    is positive, and emits a ``chunk_completed`` event.
    """
    with self._lock:
        self.chunks_completed += 1

        # Metric first, then trace attributes, then the event.
        chunk_counter.add(1, self.metric_attributes)

        span = self.span
        completed = self.chunks_completed
        total = self.chunks_total
        span.set_attribute("synapse.transfer.chunks_completed", completed)

        # Chunk-based progress is only meaningful once the total is known.
        if total > 0:
            span.set_attribute(
                "synapse.transfer.chunks_progress_percent",
                (completed / total) * 100,
            )

        event_attrs = {"chunk_number": completed, "chunks_total": total}
        self._add_event("chunk_completed", event_attrs)
652662

653663
def chunk_retry(self, chunk_number: int, error: Optional[Exception] = None):
654664
"""
@@ -658,18 +668,19 @@ def chunk_retry(self, chunk_number: int, error: Optional[Exception] = None):
658668
chunk_number: The chunk number that failed
659669
error: Optional exception that caused the retry
660670
"""
661-
# Update metrics
662-
retry_attributes = dict(self.metric_attributes)
663-
if error:
664-
retry_attributes["error_type"] = type(error).__name__
665-
chunk_retry_counter.add(1, retry_attributes)
671+
with self._lock:
672+
# Update metrics
673+
retry_attributes = dict(self.metric_attributes)
674+
if error:
675+
retry_attributes["error_type"] = type(error).__name__
676+
chunk_retry_counter.add(1, retry_attributes)
666677

667-
event_attrs = {"chunk_number": chunk_number}
668-
if error:
669-
event_attrs["error"] = str(error)
670-
event_attrs["error_type"] = type(error).__name__
678+
event_attrs = {"chunk_number": chunk_number}
679+
if error:
680+
event_attrs["error"] = str(error)
681+
event_attrs["error_type"] = type(error).__name__
671682

672-
self._add_event("chunk_retry", event_attrs)
683+
self._add_event("chunk_retry", event_attrs)
673684

674685
def record_retry(self, error: Optional[Exception] = None):
675686
"""
@@ -678,23 +689,24 @@ def record_retry(self, error: Optional[Exception] = None):
678689
Args:
679690
error: Optional exception that caused the retry
680691
"""
681-
self.retry_count += 1
692+
with self._lock:
693+
self.retry_count += 1
682694

683-
# Update metrics
684-
retry_attributes = dict(self.metric_attributes)
685-
if error:
686-
retry_attributes["error_type"] = type(error).__name__
687-
retry_counter.add(1, retry_attributes)
695+
# Update metrics
696+
retry_attributes = dict(self.metric_attributes)
697+
if error:
698+
retry_attributes["error_type"] = type(error).__name__
699+
retry_counter.add(1, retry_attributes)
688700

689-
# Update trace attributes
690-
self.span.set_attribute("synapse.operation.retry_count", self.retry_count)
701+
# Update trace attributes
702+
self.span.set_attribute("synapse.operation.retry_count", self.retry_count)
691703

692-
event_attrs = {"retry_number": self.retry_count}
693-
if error:
694-
event_attrs["error"] = str(error)
695-
event_attrs["error_type"] = type(error).__name__
704+
event_attrs = {"retry_number": self.retry_count}
705+
if error:
706+
event_attrs["error"] = str(error)
707+
event_attrs["error_type"] = type(error).__name__
696708

697-
self._add_event("transfer_retry", event_attrs)
709+
self._add_event("transfer_retry", event_attrs)
698710

699711
def record_cache_hit(self, hit: bool = True):
700712
"""
@@ -703,90 +715,68 @@ def record_cache_hit(self, hit: bool = True):
703715
Args:
704716
hit: Whether the cache was hit (True) or missed (False)
705717
"""
706-
self.span.set_attribute("synapse.cache.hit", hit)
718+
with self._lock:
719+
self.span.set_attribute("synapse.cache.hit", hit)
707720

708-
# Record in metrics
709-
cache_attributes = dict(self.metric_attributes)
710-
cache_attributes["cache_hit"] = hit
711-
cache_counter.add(1, cache_attributes)
721+
# Record in metrics
722+
cache_attributes = dict(self.metric_attributes)
723+
cache_attributes["cache_hit"] = hit
724+
cache_counter.add(1, cache_attributes)
712725

713-
self._add_event("cache_access", {"hit": hit})
726+
self._add_event("cache_access", {"hit": hit})
714727

715728
def record_final_statistics(self):
    """
    Record final throughput statistics at the end of the transfer.

    Summarises the samples in ``self._throughput_samples`` and writes them to
    the span as ``synapse.transfer.throughput.*`` attributes:

    - ``avg_mbps``, ``max_mbps``, ``min_mbps`` and ``median_mbps`` whenever
      at least one sample exists;
    - ``stdev_mbps`` and ``jitter`` (stdev divided by the average, or 0 when
      the average is not positive) when there are at least two samples;
    - ``p95_mbps`` when there are at least five samples.

    Does nothing when no samples were collected. This is best effort: any
    failure while computing or recording is logged at debug level and
    suppressed so that telemetry can never fail the transfer itself.
    """
    # Local import: the module's import block is not visible from this
    # section, so the dependency is brought into scope here.
    import logging

    with self._lock:
        samples = list(self._throughput_samples)
        if not samples:
            return

        try:
            ordered = sorted(samples)
            count = len(samples)
            avg_throughput = sum(samples) / count
            set_attribute = self.span.set_attribute

            set_attribute("synapse.transfer.throughput.avg_mbps", avg_throughput)
            set_attribute("synapse.transfer.throughput.max_mbps", ordered[-1])
            set_attribute("synapse.transfer.throughput.min_mbps", ordered[0])

            if count >= 2:
                # The sample standard deviation needs at least two points.
                stdev = statistics.stdev(samples)
                set_attribute("synapse.transfer.throughput.stdev_mbps", stdev)

                # Jitter expresses variability relative to the average.
                jitter = stdev / avg_throughput if avg_throughput > 0 else 0
                set_attribute("synapse.transfer.throughput.jitter", jitter)

            # statistics.median averages the two middle values for an even
            # number of samples; indexing at count // 2 would report the
            # upper-middle sample instead of the median.
            set_attribute(
                "synapse.transfer.throughput.median_mbps",
                statistics.median(ordered),
            )

            if count >= 5:
                # Too few samples make a 95th percentile meaningless.
                p95_index = int(count * 0.95)
                set_attribute(
                    "synapse.transfer.throughput.p95_mbps", ordered[p95_index]
                )
        except Exception:
            # Deliberately best effort, but leave a trace for debugging
            # rather than discarding the error silently.
            logging.getLogger(__name__).debug(
                "Failed to record final throughput statistics", exc_info=True
            )


# NOTE(review): set_http_attributes and set_storage_provider were deleted by
# this commit although it only concerns thread safety. They are kept here so
# existing callers do not break, and now take the lock like their siblings.
def set_http_attributes(self, method: str, status_code: int, route: Optional[str] = None):
    """
    Set HTTP method and status code attributes.

    Args:
        method: HTTP method (GET, POST, PUT, DELETE, etc.)
        status_code: HTTP status code (200, 404, 500, etc.)
        route: API endpoint pattern (not full URL); skipped when falsy
    """
    with self._lock:
        self.span.set_attribute("http.method", method)
        self.span.set_attribute("http.status_code", status_code)
        if route:
            self.span.set_attribute("http.route", route)


def set_storage_provider(self, provider: str):
    """
    Set the storage provider attribute.

    Args:
        provider: Storage provider (S3, SFTP, Azure, GCS, Local)
    """
    with self._lock:
        self.span.set_attribute("synapse.storage.provider", provider)

        # Also stored in metric_attributes so later metrics carry it.
        self.metric_attributes["storage_provider"] = provider
782769

783770
def _add_event(self, name: str, attributes: Dict[str, Any]):
784771
"""
785-
Add an event to the span with the given name and attributes.
772+
Add a timestamped event to the span.
786773
787774
Args:
788-
name: Name of the event
789-
attributes: Attributes to add to the event
775+
name: Event name
776+
attributes: Event attributes dictionary
790777
"""
778+
# Store events for potential later analysis
779+
self._events.append((name, attributes))
780+
781+
# Add to trace
791782
self.span.add_event(name, attributes)
792-
self._events.append({"name": name, "attributes": attributes})

0 commit comments

Comments
 (0)