Skip to content

Commit d68a67a

Browse files
Record skipped files in prometheus (#642)
* Added Prometheus gauge to keep track of files skipped due to file transfer end time on a per RSyncer basis
* Added FastAPI server endpoint to register skipped files
* Added logic to reset skipped file counter when a flush is triggered for a given RSyncer

---------

Co-authored-by: Eu Pin Tien <[email protected]>
1 parent 41e0dfa commit d68a67a

File tree

9 files changed

+66
-19
lines changed

9 files changed

+66
-19
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,8 +667,22 @@ def _increment_transferred_files_prometheus(
667667
requests.post(url, json=data)
668668

669669
def _increment_transferred_files(
670-
self, updates: List[RSyncerUpdate], source: str, destination: str
670+
self,
671+
updates: List[RSyncerUpdate],
672+
num_skipped_files: int,
673+
source: str,
674+
destination: str,
671675
):
676+
skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}"
677+
requests.post(
678+
skip_url,
679+
json={
680+
"source": source,
681+
"session_id": self.session_id,
682+
"increment_count": num_skipped_files,
683+
},
684+
)
685+
672686
checked_updates = [
673687
update for update in updates if update.outcome is TransferResult.SUCCESS
674688
]

src/murfey/client/rsync.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def _fake_transfer(self, files: list[Path]) -> bool:
309309
self.notify(update)
310310
updates.append(update)
311311
time.sleep(0.01)
312-
self.notify([update], secondary=True)
312+
self.notify([update], num_skipped_files=0, secondary=True)
313313
# self.notify(updates, secondary=True)
314314

315315
return True
@@ -328,8 +328,10 @@ def _transfer(self, infiles: list[Path]) -> bool:
328328
if f.is_file() and f.stat().st_ctime < self._end_time.timestamp()
329329
]
330330
self._skipped_files.extend(set(infiles).difference(set(files)))
331+
num_skipped_files = len(set(infiles).difference(set(files)))
331332
else:
332333
files = [f for f in infiles if f.is_file()]
334+
num_skipped_files = 0
333335

334336
previously_transferred = self._files_transferred
335337
transfer_success: set[Path] = set()
@@ -528,7 +530,9 @@ def parse_stderr(line: str):
528530
if success:
529531
success = result.returncode == 0
530532

531-
self.notify(successful_updates, secondary=True)
533+
self.notify(
534+
successful_updates, num_skipped_files=num_skipped_files, secondary=True
535+
)
532536

533537
# Print out a summary message for each file transfer batch instead of individual messages
534538
# List out file paths as stored in memory to see if issue is due to file path mismatch

src/murfey/server/api/instrument.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from sqlmodel import select
1414
from werkzeug.utils import secure_filename
1515

16+
import murfey.server.prometheus as prom
1617
from murfey.server.api.auth import MurfeyInstrumentNameFrontend as MurfeyInstrumentName
1718
from murfey.server.api.auth import MurfeySessionIDFrontend as MurfeySessionID
1819
from murfey.server.api.auth import (
@@ -555,8 +556,8 @@ async def flush_skipped_rsyncer(
555556
db.commit()
556557

557558
# Send request to flush rsyncer
558-
data: dict = {}
559559
update_result: dict = {}
560+
flush_result: dict = {}
560561
machine_config = get_machine_config(instrument_name=instrument_name)[
561562
instrument_name
562563
]
@@ -583,8 +584,14 @@ async def flush_skipped_rsyncer(
583584
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
584585
},
585586
) as resp:
586-
data = await resp.json()
587-
return data
587+
flush_result = await resp.json()
588+
if not flush_result.get("success", False):
589+
return {"success": False}
590+
# Reset the skipped file count for the specific Prometheus gauge to 0
591+
prom.skipped_files.labels(
592+
rsync_source=rsyncer_source.source, visit=session_entry.visit
593+
).set(0)
594+
return flush_result
588595

589596

590597
class RSyncerInfo(BaseModel):

src/murfey/server/api/prometheus.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from murfey.server.murfey_db import murfey_db
1313
from murfey.util import sanitise
1414
from murfey.util.db import RsyncInstance
15-
from murfey.util.models import RsyncerInfo
15+
from murfey.util.models import RsyncerInfo, RsyncerSkippedFiles
1616

1717
logger = getLogger("murfey.server.api.prometheus")
1818

@@ -90,6 +90,15 @@ def increment_rsync_transferred_files_prometheus(
9090
).inc(rsyncer_info.data_bytes)
9191

9292

93+
@router.post("/visits/{visit_name}/increment_rsync_skipped_files_prometheus")
94+
def increment_rsync_skipped_files_prometheus(
95+
visit_name: str, rsyncer_skipped_files: RsyncerSkippedFiles, db=murfey_db
96+
):
97+
prom.skipped_files.labels(
98+
rsync_source=rsyncer_skipped_files.source, visit=visit_name
99+
).inc(rsyncer_skipped_files.increment_count)
100+
101+
93102
@router.post("/visits/{visit_name}/monitoring/{on}")
94103
def change_monitoring_status(visit_name: str, on: int):
95104
prom.monitoring_switch.labels(visit=visit_name)

src/murfey/server/api/session_control.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -224,18 +224,6 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db):
224224
db.add(rsync_instance)
225225
db.commit()
226226
db.close()
227-
prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
228-
prom.seen_data_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
229-
prom.transferred_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
230-
prom.transferred_files_bytes.labels(
231-
rsync_source=rsyncer_info.source, visit=visit_name
232-
)
233-
prom.transferred_data_files.labels(
234-
rsync_source=rsyncer_info.source, visit=visit_name
235-
)
236-
prom.transferred_data_files_bytes.labels(
237-
rsync_source=rsyncer_info.source, visit=visit_name
238-
)
239227
prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0)
240228
prom.transferred_files.labels(
241229
rsync_source=rsyncer_info.source, visit=visit_name
@@ -252,6 +240,7 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db):
252240
prom.transferred_data_files_bytes.labels(
253241
rsync_source=rsyncer_info.source, visit=visit_name
254242
).set(0)
243+
prom.skipped_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0)
255244
return rsyncer_info
256245

257246

src/murfey/server/api/shared.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ def remove_session_by_id(session_id: int, db):
7777
args=(ri.source, session.visit),
7878
label="transferred_data_file_bytes",
7979
)
80+
safe_run(
81+
prom.skipped_files.remove,
82+
args=(ri.source, session.visit),
83+
label="skipped_files",
84+
)
8085
collected_ids = db.exec(
8186
select(DataCollectionGroup, DataCollection, ProcessingJob)
8287
.where(DataCollectionGroup.session_id == session_id)

src/murfey/server/prometheus.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@
2424
["rsync_source", "visit"],
2525
)
2626

27+
skipped_files = Gauge(
28+
"skipped_files",
29+
"Number of files not transferred due to end time",
30+
["rsync_source", "visit"],
31+
)
32+
2733
preprocessed_movies = Counter(
2834
"preprocessed_movies",
2935
"Number of movies that have been preprocessed",

src/murfey/util/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ class RsyncerInfo(BaseModel):
7878
tag: str = ""
7979

8080

81+
class RsyncerSkippedFiles(BaseModel):
82+
source: str
83+
session_id: int
84+
increment_count: int = 1
85+
86+
8187
"""
8288
Single Particle Analysis
8389
========================

src/murfey/util/route_manifest.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,13 @@ murfey.server.api.prometheus.router:
642642
type: str
643643
methods:
644644
- POST
645+
- path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus
646+
function: increment_rsync_skipped_files_prometheus
647+
path_params:
648+
- name: visit_name
649+
type: str
650+
methods:
651+
- POST
645652
- path: /prometheus/visits/{visit_name}/monitoring/{on}
646653
function: change_monitoring_status
647654
path_params:

0 commit comments

Comments
 (0)