@@ -398,27 +398,18 @@ def field_segment(*decorator_args, **decorator_kwargs):
398398 set_as: The field name to append the result as
399399 """
400400 def decorator (func ):
class FieldSegment(AbstractFieldSegment):
    """Segment wrapping the decorated function as a field processor.

    Keyword arguments given to the decorator are combined with those
    given at construction time; on a key clash the construction-time
    value wins. The 'field' and 'set_as' entries are removed from the
    combined keywords and handed to the base class, so they are never
    forwarded to the wrapped function.
    """

    def __init__(self, *init_args, **init_kwargs):
        # Start from the decorator-level keywords, then let the
        # instance-level keywords override them.
        call_options = dict(decorator_kwargs)
        call_options.update(init_kwargs)

        # Strip the routing options before anything reaches func.
        super().__init__(
            field=call_options.pop('field', None),
            set_as=call_options.pop('set_as', None),
        )

        # The value being processed is always the first positional
        # argument; construction-time positionals follow it.
        self._func = lambda x: func(x, *init_args, **call_options)
        # Store reference to original function for documentation access
        self._original_func = func

    def process_value(self, value):
        """Apply the wrapped function to ``value`` and return its result."""
        return self._func(value)
422413
423414 FieldSegment .__name__ = f"{ func .__name__ } FieldSegment"
424415 # Preserve original function's docstring and metadata
@@ -430,6 +421,56 @@ def transform(self, input_iter):
430421 return decorator (decorator_args [0 ])
431422 return decorator
432423
class AbstractFieldSegment(AbstractSegment[T, U]):
    """Base class for segments that work on one field of each item.

    Subclasses implement :meth:`process_value`; this class takes care of
    pulling the input value out of each item and of deciding where the
    result goes, driven by the two constructor options below. Subclasses
    may define their own constructors as long as they pass ``field`` and
    ``set_as`` through to this one.

    Args:
        field: Property to extract from each item before processing.
            When falsy (the default ``None``), the whole item is
            processed instead.
        set_as: Key under which the result is stored on the item. When
            falsy (the default ``None``), the result itself is yielded
            instead of the item.
    """

    def __init__(self, field: str = None, set_as: str = None):
        super().__init__()
        self.field, self.set_as = field, set_as

    @abstractmethod
    def process_value(self, value: Any) -> Any:
        """Compute the result for a single extracted value.

        Args:
            value: The value extracted from the item via ``field``, or
                the item itself when no field is configured.

        Returns:
            Any: The processed result.
        """

    def transform(self, input_iter: Iterable[T]) -> Iterator[U]:
        """Lazily process every item of ``input_iter``.

        For each item the input value is selected (the configured field,
        or the item itself), passed to :meth:`process_value`, and then:

        * if ``set_as`` is set, the result is assigned to
          ``item[set_as]`` and the (mutated) item is yielded;
        * otherwise the result is yielded directly.
        """
        for item in input_iter:
            # Select what gets processed: one property or the whole item.
            if self.field:
                value = data_manipulation.extract_property(item, self.field)
            else:
                value = item

            result = self.process_value(value)

            if not self.set_as:
                yield result
                continue

            # Attach the result to the item in place and pass the item on.
            item[self.set_as] = result
            yield item
473+
433474class Pipeline (AbstractSegment ):
434475 """A pipeline is a sequence of operations. Each operation draws from the output of the previous operation
435476 and yields items to the next operation. The pipeline can be executed by calling it with an input iterator.
0 commit comments