Skip to content

Commit 483bc90

Browse files
authored
Merge pull request #162 from mraspaud/fix-remote-segments
Ensure filesystem is passed for remote file checks
2 parents baf6385 + b74ab9f commit 483bc90

5 files changed

Lines changed: 79 additions & 24 deletions

File tree

pytroll_collectors/harvest_EUM_schedules.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
# You should have received a copy of the GNU General Public License
2222
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2323

24-
"""Harvest EUM schedules. Download schedules from EUM and parse to limit gatherer"""
24+
"""Harvest EUM schedules. Download schedules from EUM and parse to limit gatherer."""
2525

2626
import re
2727
import os
@@ -94,13 +94,14 @@ def _generate_pass_list_file_name(params, save_basename, eum_base_url):
9494
else:
9595
logger.error("sensor not given in params in granule_metadata. Can not continue.")
9696
return (None, None)
97-
eum_url = EUM_BASE_URL + pass_list_file
97+
eum_url = eum_base_url + pass_list_file
9898
save_file = os.path.join(save_basename, pass_list_file)
9999
logger.debug("Pass list save file, %s", save_file)
100100
return eum_url, save_file
101101

102102

103103
def harvest_schedules(params, save_basename=None, eum_base_url=EUM_BASE_URL):
104+
"""Harvest schedules."""
104105
if save_basename is None:
105106
save_basename = tempfile.gettempdir()
106107
logger.debug("harvest_schedules params: %s", params)

pytroll_collectors/region_collector.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@
2828

2929
from pyresample import parse_area_file
3030

31-
try:
32-
from trollsched.satpass import Pass
33-
except ImportError:
34-
Pass = None
31+
from trollsched.satpass import Pass
3532

3633
import logging
3734

pytroll_collectors/segments.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Status(Enum):
6666
SLOT_OBSOLETE_TIMEOUT = 4
6767

6868

69-
DO_NOT_COPY_KEYS = ("uid", "uri", "channel_name", "segment", "sensor")
69+
DO_NOT_COPY_KEYS = ("uid", "uri", "channel_name", "segment", "sensor", "filesystem", "path")
7070
REMOVE_TAGS = {'path', 'segment'}
7171

7272

@@ -397,7 +397,12 @@ def _add_message_to_metadata(self, message):
397397
def _add_file_info_to_metadata(self, metadata, message):
398398
msg_data = message.message_data
399399
if message.type == 'file':
400-
metadata['dataset'].append({'uri': msg_data['uri'], 'uid': msg_data['uid']})
400+
mda = {'uri': msg_data['uri'], 'uid': msg_data['uid']}
401+
if "filesystem" in msg_data:
402+
mda["filesystem"] = msg_data["filesystem"]
403+
if "path" in msg_data:
404+
mda["path"] = msg_data["path"]
405+
metadata['dataset'].append(mda)
401406
elif message.type == 'dataset':
402407
metadata['dataset'].extend(message.message_data['dataset'])
403408
else:
@@ -898,11 +903,11 @@ def _add_existing_files_to_slot(self, slot, fnames, message):
898903
def _get_existing_files_from_message(message):
899904
mask = message.pattern.parser.globify({})
900905
url_parts = urlparse(message.message_data["uri"])
906+
storage_options = message.message_data.get("filesystem")
907+
return _fsspec_glob(url_parts, mask, storage_options)
901908

902-
return _fsspec_glob(url_parts, mask)
903909

904-
905-
def _fsspec_glob(url_parts, mask):
910+
def _fsspec_glob(url_parts, mask, storage_options):
906911
import fsspec
907912

908913
pattern = urlunparse(
@@ -915,8 +920,8 @@ def _fsspec_glob(url_parts, mask):
915920
''
916921
)
917922
)
918-
919-
fs_ = fsspec.filesystem(url_parts.scheme)
923+
storage_options = storage_options or dict()
924+
fs_ = fsspec.filesystem(url_parts.scheme, **storage_options)
920925
files = fs_.glob(pattern)
921926
# There might be no scheme in the returned filenames, so add it if scheme is defined
922927
if url_parts.scheme:

pytroll_collectors/tests/test_region_collector.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
_granule_metadata_metop_b = {"platform_name": "Metop-B",
6868
"sensor": "avhrr"}
6969

70+
7071
def granule_metadata(s_min):
7172
"""Return common granule_metadata dictionary."""
7273
return {**_granule_metadata,
@@ -125,17 +126,29 @@ def europe_collector_schedule_cut_custom_method_failed(europe, schedule_cut=True
125126
return RegionCollector(europe, schedule_cut=schedule_cut, schedule_cut_method=schedule_cut_method)
126127

127128

128-
def _fakeopen(url):
129+
def _fakeopen_celestrak(url):
129130
return io.BytesIO(tles)
130131

131132

133+
ears_avhrr_pass_predictions = b"""EARS-AVHRR Pass Predictions
134+
2025/06/19 05:01:28
135+
regionBeginEumetsat,regionEndEumetsat,Satellite
136+
2025-06-19 00:20,2025-06-19 00:37,metopc
137+
2025-06-19 01:10,2025-06-19 01:28,metopb
138+
"""
139+
140+
141+
def _fakeopen_eum(url):
142+
return io.BytesIO(ears_avhrr_pass_predictions)
143+
144+
132145
def test_init(europe):
133146
"""Test that initialisation appears to work."""
134147
from pytroll_collectors.region_collector import RegionCollector
135148
RegionCollector(europe)
136149

137150

138-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
151+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
139152
def test_collect(europe_collector, caplog):
140153
"""Test that granules can be collected."""
141154
with caplog.at_level(logging.DEBUG):
@@ -150,7 +163,7 @@ def test_collect(europe_collector, caplog):
150163
assert "Granule file://18 is not overlapping euro_ma"
151164

152165

153-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
166+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
154167
def test_collect_duration(europe):
155168
"""Test with tle_platform_name, without end_time, using call syntax."""
156169
from pytroll_collectors.region_collector import RegionCollector
@@ -165,7 +178,8 @@ def test_collect_duration(europe):
165178
alt_europe_collector(granule_metadata)
166179

167180

168-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
181+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
182+
@unittest.mock.patch("pytroll_collectors.harvest_EUM_schedules.urlopen", new=_fakeopen_eum)
169183
def test_collect_check_schedules(europe_collector_schedule_cut, caplog):
170184
"""Test default schedule cut method."""
171185
with caplog.at_level(logging.DEBUG):
@@ -180,7 +194,7 @@ def test_collect_check_schedules(europe_collector_schedule_cut, caplog):
180194
assert "harvest_EUM_schedules.py'>, with type <class 'module'>" in caplog.text
181195

182196

183-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
197+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
184198
def test_collect_check_schedules_custom_method(europe_collector_schedule_cut_custom_method, caplog):
185199
"""Test custom schedule cut method."""
186200
with caplog.at_level(logging.DEBUG):
@@ -196,7 +210,7 @@ def test_collect_check_schedules_custom_method(europe_collector_schedule_cut_cus
196210
assert "test_region_collector.py'>, with type <class 'module'>" in caplog.text
197211

198212

199-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
213+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
200214
def test_collect_check_schedules_custom_method_failed(europe_collector_schedule_cut_custom_method_failed, caplog):
201215
"""Test custom schedule cut method failed import."""
202216
with caplog.at_level(logging.DEBUG):
@@ -208,16 +222,16 @@ def test_collect_check_schedules_custom_method_failed(europe_collector_schedule_
208222
assert test_string in caplog.text
209223

210224

211-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
225+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
212226
def test_collect_missing_tle_from_file(europe_collector, caplog):
213-
"""Test that granules can be collected, but missing TLE raises and exception"""
227+
"""Test that granules can be collected, but missing TLE raises and exception."""
214228
with caplog.at_level(logging.DEBUG):
215229
for s_min in (0, 3, 6, 9, 12, 15, 18):
216230
with pytest.raises(KeyError):
217231
europe_collector.collect({**granule_metadata_metop_b(s_min)})
218232

219233

220-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
234+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
221235
def test_adjust_timeout(europe, caplog):
222236
"""Test timeout adjustment."""
223237
from pytroll_collectors.region_collector import RegionCollector
@@ -241,7 +255,7 @@ def test_adjust_timeout(europe, caplog):
241255

242256

243257
@pytest.mark.skip(reason="test never finishes")
244-
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen)
258+
@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen_celestrak)
245259
def test_faulty_end_time(europe_collector, caplog):
246260
"""Test adapting if end_time before start_time."""
247261
granule_metadata = {

pytroll_collectors/tests/test_segments.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1455,7 +1455,7 @@ def test_floor_grouping_does_not_affect_other_pattern(self):
14551455
FCI_CONFIG = """patterns:
14561456
fci_nc:
14571457
pattern:
1458-
"W_XX-EUMETSAT-Darmstadt,IMG+SAT,{platform_name:4s}+FCI-1C-RRAD-FDHSI-FD--CHK-{segment_type}---NC4E_C_EUMT_{processing_time}_GTT_DEV_{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}_N_{special_compression}_T_{repeat_cycle_in_day:>04d}_{segment:0>4s}.nc"
1458+
"W_XX-EUMETSAT-Darmstadt,IMG+SAT,{platform_name:4s}+FCI-1C-RRAD-FDHSI-FD--CHK-{segment_type}--{locality}-NC4E_C_EUMT_{processing_time}_{processor}_{processor_status}_{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}_N_{special_compression}_{service_status}_{repeat_cycle_in_day:>04d}_{segment:0>4s}.nc"
14591459
critical_files:
14601460
wanted_files: :0001-0040
14611461
all_files: :0001-0040
@@ -1503,3 +1503,41 @@ def test_process_message_fci_with_nonzero_seconds(self, caplog):
15031503
assert len(slot.output_metadata["dataset"]) == 2
15041504
uids = set(info["uid"] for info in slot.output_metadata["dataset"])
15051505
assert uids == expected_uids
1506+
1507+
1508+
fci_remote = ('pytroll://1b/FCI-segment/FDHSI file user@collecting.host.se 2025-06-26T07:39:40.483302+00:00 v1.2 application/json {"sensor": "fci", "filesystem": {"cls": "fsspec.implementations.sftp:SFTPFileSystem", "protocol": "sftp", "args": [], "host": "receiving_host"}, "uri": "ssh:///local_disk/tellicast/received/TER-1/T01-MTG-1/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20250626073728_IDPFI_OPE_20250626073332_20250626073411_N_JLS_O_0046_0017.nc", "path": "/local_disk/tellicast/received/TER-1/T01-MTG-1/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20250626073728_IDPFI_OPE_20250626073332_20250626073411_N_JLS_O_0046_0017.nc", "uid": "W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20250626073728_IDPFI_OPE_20250626073332_20250626073411_N_JLS_O_0046_0017.nc", "pflag": "W", "location_indicator": "XX-EUMETSAT-Darmstadt", "data_designator": "IMG+SAT", "spacecraft_id": 1, "data_source": "FCI", "coverage": "FD", "subsetting": "", "component1": "CHK", "component3": "", "purpose": "DIS", "format": "NC4E", "oflag": "C", "originator": "EUMT", "processing_time": "2025-06-26T07:37:28", "facility_or_tool": "IDPFI", "environment": "OPE", "start_time": "2025-06-26T07:33:32", "end_time": "2025-06-26T07:34:11", "processing_mode": "N", "special_compression": "JLS", "disposition_mode": "O", "repeat_cycle_in_day": 46, "count_in_repeat_cycle": 17}') # noqa
1509+
1510+
1511+
FCI_CONFIG_WITH_CHECK_ON_START = FCI_CONFIG + "\ncheck_existing_files_after_start: true\n"
1512+
1513+
1514+
@patch("fsspec.filesystem")
1515+
def test_remote_file_with_filesystem_passes_filesystem_info(filesystem):
1516+
"""Test that remote filesystem info is passed from message when available."""
1517+
from posttroll.message import Message as Message_p
1518+
import yaml
1519+
1520+
segment_gatherer = SegmentGatherer(yaml.safe_load(FCI_CONFIG_WITH_CHECK_ON_START))
1521+
1522+
expected_uids = set()
1523+
1524+
for fci_message in [fci_remote]:
1525+
fci_msg = Message_p(rawstr=fci_message)
1526+
expected_uids.add(fci_msg.data['uid'])
1527+
msg = FakeMessage(fci_msg.data)
1528+
segment_gatherer.process(msg)
1529+
1530+
timestamp = '2025-06-26 07:30:00'
1531+
print(segment_gatherer.slots)
1532+
assert timestamp in segment_gatherer.slots
1533+
storage_options = dict(cls="fsspec.implementations.sftp:SFTPFileSystem",
1534+
protocol="sftp",
1535+
args=[],
1536+
host="receiving_host")
1537+
filesystem.assert_called_once_with("ssh", **storage_options)
1538+
1539+
assert segment_gatherer.slots[timestamp].output_metadata["dataset"][0]["filesystem"] == storage_options
1540+
assert "filesystem" not in segment_gatherer.slots[timestamp].output_metadata
1541+
path = "/local_disk/tellicast/received/TER-1/T01-MTG-1/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20250626073728_IDPFI_OPE_20250626073332_20250626073411_N_JLS_O_0046_0017.nc" # noqa
1542+
assert segment_gatherer.slots[timestamp].output_metadata["dataset"][0]["path"] == path
1543+
assert "path" not in segment_gatherer.slots[timestamp].output_metadata

0 commit comments

Comments
 (0)