|
20 | 20 | import json |
21 | 21 | from pathlib import Path |
22 | 22 | from typing import Any, Literal, TypeAlias |
| 23 | +from unittest.mock import mock_open, patch |
23 | 24 |
|
24 | 25 | from networkx.classes.digraph import DiGraph |
25 | 26 | import pytest |
26 | 27 |
|
27 | 28 | from CPAC.pipeline import nipype_pipeline_engine as pe |
28 | | -from CPAC.utils.test_resources import setup_test_wf |
29 | | -from CPAC.utils.utils import PE_DIRECTION, get_fmap_type |
30 | 29 | from CPAC.utils.datasource import ( |
31 | 30 | match_epi_fmaps, |
32 | 31 | match_epi_fmaps_function_node, |
33 | 32 | ) |
| 33 | +from CPAC.utils.test_resources import setup_test_wf |
| 34 | +from CPAC.utils.utils import ( |
| 35 | + get_fmap_build_info, |
| 36 | + get_fmap_metadata_at_build_time, |
| 37 | + get_fmap_type, |
| 38 | + PE_DIRECTION, |
| 39 | +) |
34 | 40 |
|
35 | 41 |
|
36 | 42 | @dataclass |
@@ -484,3 +490,192 @@ def test_get_fmap_type_real_world_examples() -> None: |
484 | 490 | # Real-world phase example (only required fields) |
485 | 491 | phase_metadata = {"EchoTime": 0.00746} |
486 | 492 | assert get_fmap_type(phase_metadata) == "phase" |
| 493 | + |
| 494 | + |
| 495 | +class TestGetFmapMetadataAtBuildTime: |
| 496 | + """Test get_fmap_metadata_at_build_time function.""" |
| 497 | + |
| 498 | + def test_missing_fmap_key(self): |
| 499 | + """Test when fieldmap key doesn't exist in sub_dict.""" |
| 500 | + sub_dict = {"fmap": {"other_key": {}}} |
| 501 | + result = get_fmap_metadata_at_build_time(sub_dict, "missing_key", "", "") |
| 502 | + assert result is None |
| 503 | + |
| 504 | + def test_missing_scan_parameters(self): |
| 505 | + """Test when scan_parameters field is missing.""" |
| 506 | + sub_dict = {"fmap": {"test_key": {"scan": "path/to/scan.nii.gz"}}} |
| 507 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 508 | + assert result is None |
| 509 | + |
| 510 | + def test_direct_dict_metadata(self): |
| 511 | + """Test when metadata is provided as a direct dictionary.""" |
| 512 | + metadata = {"EchoTime1": 0.006, "EchoTime2": 0.007} |
| 513 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": metadata}}} |
| 514 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 515 | + assert result == metadata |
| 516 | + |
| 517 | + @patch("builtins.open", new_callable=mock_open, read_data='{"EchoTime": 0.006}') |
| 518 | + @patch("os.path.exists", return_value=True) |
| 519 | + def test_json_file_metadata(self, mock_exists, mock_file): |
| 520 | + """Test loading metadata from JSON file.""" |
| 521 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": "/path/to/metadata.json"}}} |
| 522 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 523 | + assert result == {"EchoTime": 0.006} |
| 524 | + mock_file.assert_called_once_with( |
| 525 | + "/path/to/metadata.json", "r", encoding="utf-8" |
| 526 | + ) |
| 527 | + |
| 528 | + @patch("os.path.exists", return_value=False) |
| 529 | + def test_nonexistent_file(self, mock_exists): |
| 530 | + """Test when JSON file doesn't exist.""" |
| 531 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": "/nonexistent/file.json"}}} |
| 532 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 533 | + assert result is None |
| 534 | + |
| 535 | + @patch("builtins.open", side_effect=json.JSONDecodeError("Invalid JSON", "", 0)) |
| 536 | + @patch("os.path.exists", return_value=True) |
| 537 | + def test_invalid_json(self, mock_exists, mock_file): |
| 538 | + """Test when JSON file contains invalid JSON.""" |
| 539 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": "/path/to/invalid.json"}}} |
| 540 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 541 | + assert result is None |
| 542 | + |
| 543 | + def test_non_json_file(self): |
| 544 | + """Test when file path doesn't end with .json.""" |
| 545 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": "/path/to/file.txt"}}} |
| 546 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 547 | + assert result is None |
| 548 | + |
| 549 | + def test_exception_handling(self): |
| 550 | + """Test general exception handling.""" |
| 551 | + sub_dict = {"fmap": {"test_key": {"scan_parameters": 123}}} # Invalid type |
| 552 | + result = get_fmap_metadata_at_build_time(sub_dict, "test_key", "", "") |
| 553 | + assert result is None |
| 554 | + |
| 555 | + |
| 556 | +class TestGetFmapBuildInfo: |
| 557 | + """Test get_fmap_build_info function.""" |
| 558 | + |
| 559 | + def test_none_metadata_raises_error(self): |
| 560 | + """Test that None metadata raises ValueError.""" |
| 561 | + with pytest.raises( |
| 562 | + ValueError, match="Fieldmap metadata dictionary is required" |
| 563 | + ): |
| 564 | + get_fmap_build_info(None) |
| 565 | + |
| 566 | + def test_empty_metadata_raises_error(self): |
| 567 | + """Test that empty metadata raises ValueError.""" |
| 568 | + with pytest.raises( |
| 569 | + ValueError, match="Fieldmap metadata dictionary is required" |
| 570 | + ): |
| 571 | + get_fmap_build_info({}) |
| 572 | + |
| 573 | + def test_unknown_fmap_type_raises_error(self): |
| 574 | + """Test that unknown fieldmap type raises ValueError.""" |
| 575 | + metadata = {"SomeUnknownField": "value"} |
| 576 | + with pytest.raises(ValueError, match="Could not determine fieldmap type"): |
| 577 | + get_fmap_build_info(metadata) |
| 578 | + |
| 579 | + def test_phase_fieldmap_info(self): |
| 580 | + """Test phase fieldmap build info.""" |
| 581 | + metadata = {"EchoTime": 0.006} |
| 582 | + result = get_fmap_build_info(metadata) |
| 583 | + expected = { |
| 584 | + "fmap_type": "phase", |
| 585 | + "needs_echo_times": True, |
| 586 | + "needs_phasediff_processing": True, |
| 587 | + "is_epi": False, |
| 588 | + } |
| 589 | + assert result == expected |
| 590 | + |
| 591 | + def test_phasediff_fieldmap_info(self): |
| 592 | + """Test phasediff fieldmap build info.""" |
| 593 | + metadata = {"EchoTime1": 0.006, "EchoTime2": 0.007} |
| 594 | + result = get_fmap_build_info(metadata) |
| 595 | + expected = { |
| 596 | + "fmap_type": "phasediff", |
| 597 | + "needs_echo_times": True, |
| 598 | + "needs_phasediff_processing": True, |
| 599 | + "is_epi": False, |
| 600 | + } |
| 601 | + assert result == expected |
| 602 | + |
| 603 | + def test_epi_fieldmap_info(self): |
| 604 | + """Test EPI fieldmap build info.""" |
| 605 | + metadata = {"PhaseEncodingDirection": "j-"} |
| 606 | + result = get_fmap_build_info(metadata) |
| 607 | + expected = { |
| 608 | + "fmap_type": "epi", |
| 609 | + "needs_echo_times": True, |
| 610 | + "needs_phasediff_processing": False, |
| 611 | + "is_epi": True, |
| 612 | + } |
| 613 | + assert result == expected |
| 614 | + |
| 615 | + @pytest.mark.parametrize( |
| 616 | + "metadata,expected_fmap_type", |
| 617 | + [ |
| 618 | + ({"EchoTime": 0.006}, "phase"), |
| 619 | + ({"EchoTime1": 0.006, "EchoTime2": 0.007}, "phasediff"), |
| 620 | + ({"PhaseEncodingDirection": "j-"}, "epi"), |
| 621 | + ], |
| 622 | + ) |
| 623 | + def test_various_fieldmap_types(self, metadata, expected_fmap_type): |
| 624 | + """Test that various fieldmap types are correctly identified.""" |
| 625 | + result = get_fmap_build_info(metadata) |
| 626 | + assert result["fmap_type"] == expected_fmap_type |
| 627 | + |
| 628 | + def test_real_world_metadata_examples(self): |
| 629 | + """Test with realistic metadata examples from the existing tests.""" |
| 630 | + # Use some of the test data from the existing test_get_fmap_type tests |
| 631 | + |
| 632 | + # Phasediff example |
| 633 | + phasediff_metadata = { |
| 634 | + "EchoTime1": 0.00600, |
| 635 | + "EchoTime2": 0.00746, |
| 636 | + "IntendedFor": ["bids::sub-01/func/sub-01_task-motor_bold.nii.gz"], |
| 637 | + } |
| 638 | + result = get_fmap_build_info(phasediff_metadata) |
| 639 | + assert result["fmap_type"] == "phasediff" |
| 640 | + assert result["needs_echo_times"] is True |
| 641 | + assert result["needs_phasediff_processing"] is True |
| 642 | + assert result["is_epi"] is False |
| 643 | + |
| 644 | + # EPI example |
| 645 | + epi_metadata = { |
| 646 | + "PhaseEncodingDirection": "j-", |
| 647 | + "TotalReadoutTime": 0.095, |
| 648 | + "IntendedFor": "bids::sub-01/func/sub-01_task-motor_bold.nii.gz", |
| 649 | + } |
| 650 | + result = get_fmap_build_info(epi_metadata) |
| 651 | + assert result["fmap_type"] == "epi" |
| 652 | + assert result["needs_echo_times"] is True |
| 653 | + assert result["needs_phasediff_processing"] is False |
| 654 | + assert result["is_epi"] is True |
| 655 | + |
| 656 | + def test_phase_fieldmap_with_extra_fields(self): |
| 657 | + """Test phase fieldmap with additional optional fields.""" |
| 658 | + metadata = { |
| 659 | + "EchoTime": 0.006, |
| 660 | + "IntendedFor": "bids::sub-01/func/sub-01_task-motor_bold.nii.gz", |
| 661 | + "B0FieldIdentifier": "my_fieldmap", |
| 662 | + } |
| 663 | + result = get_fmap_build_info(metadata) |
| 664 | + assert result["fmap_type"] == "phase" |
| 665 | + assert result["needs_echo_times"] is True |
| 666 | + assert result["needs_phasediff_processing"] is True |
| 667 | + assert result["is_epi"] is False |
| 668 | + |
| 669 | + def test_phasediff_fieldmap_with_extra_fields(self): |
| 670 | + """Test phasediff fieldmap with additional optional fields.""" |
| 671 | + metadata = { |
| 672 | + "EchoTime1": 0.006, |
| 673 | + "EchoTime2": 0.007, |
| 674 | + "IntendedFor": ["bids::sub-01/func/sub-01_task-motor_bold.nii.gz"], |
| 675 | + "B0FieldIdentifier": "my_phasediff", |
| 676 | + } |
| 677 | + result = get_fmap_build_info(metadata) |
| 678 | + assert result["fmap_type"] == "phasediff" |
| 679 | + assert result["needs_echo_times"] is True |
| 680 | + assert result["needs_phasediff_processing"] is True |
| 681 | + assert result["is_epi"] is False |
0 commit comments