|
4 | 4 | from .dataclass import NonPositionalField
|
5 | 5 | from .operator import SourceOperator
|
6 | 6 | from .random_utils import new_random_generator
|
| 7 | +from .settings_utils import get_settings |
7 | 8 | from .stream import DynamicStream, MultiStream
|
8 | 9 | from .type_utils import isoftype
|
9 | 10 |
|
| 11 | +settings = get_settings() |
| 12 | + |
10 | 13 |
|
11 | 14 | class BaseFusion(SourceOperator):
|
12 | 15 | """BaseFusion operator that combines multiple multistreams into one.
|
@@ -75,26 +78,30 @@ def prepare(self):
|
75 | 78 |
|
76 | 79 | # flake8: noqa: C901
|
def fusion_generator(self, split) -> Generator:
    """Yield the instances of ``split`` from every subset, one subset at a time.

    Subsets are visited in the iteration order of ``self.named_subsets``.
    Each subset is called to obtain its multi-stream; subsets whose
    multi-stream does not contain ``split`` are skipped.  When the subset
    name is a string, it is inserted at the front of the instance's
    ``"subset"`` list (the list is created if the key is missing) before
    the instance is yielded.

    Args:
        split: Key of the stream to read from each subset's multi-stream.

    Yields:
        The instances of each subset, at most
        ``self.max_instances_per_subset`` per subset when that attribute
        is not ``None``.

    Raises:
        RuntimeError: If any ``Exception`` is raised while iterating a
            subset's stream; the original exception is chained as the cause.
    """
    limit = self.max_instances_per_subset

    # NOTE(review): this ``with`` block encloses ``yield``, so the overridden
    # settings presumably stay active while the generator is suspended and
    # are only restored once it finishes or is closed -- confirm this is the
    # intended scope of ``allow_unverified_code=True``.
    with settings.context(
        disable_hf_datasets_cache=False,
        allow_unverified_code=True,
    ):
        for origin_name, origin in self.named_subsets.items():
            multi_stream = origin()
            if split not in multi_stream:
                continue
            # A non-positive cap yields nothing, so do not touch the stream.
            if limit is not None and limit <= 0:
                continue
            emitted_from_this_split = 0
            try:
                for instance in multi_stream[split]:
                    if isinstance(origin_name, str):
                        if "subset" not in instance:
                            instance["subset"] = []
                        # Outermost fusion goes first in the subset path.
                        instance["subset"].insert(0, origin_name)
                    emitted_from_this_split += 1
                    yield instance
                    # Test the cap *before* pulling the next instance, so no
                    # upstream instance is computed only to be discarded.
                    if limit is not None and emitted_from_this_split >= limit:
                        break
            except Exception as e:
                raise RuntimeError(f"Exception in subset: {origin_name}") from e
98 | 105 |
|
99 | 106 |
|
100 | 107 | class WeightedFusion(BaseFusion):
|
|
0 commit comments