Dataflow Listener (bigger project) #55

Open
@davidpablocohn

Description

This would be an alternative to the Listener class in logger/listener/listener.py. The current listener implements a parallel-reader, serial-transform, parallel-writer processing model, which is inefficient in that a value computed by one transform cannot be reused by a different processing stream. A pure dataflow model would instead consist of a set of nodes, each of which defines an input queue, a function (whether reader, transform, writer or something else) and a list of nodes into whose queues it should place its results.

In addition to a stand-alone dataflow runner paralleling listen.py, we would extend the config file syntax so that run_logger.py could recognize, parse and execute dataflow configs alongside the existing listener configs.
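To make the idea concrete, a dataflow config might look something like the fragment below. The syntax here is purely hypothetical and only for illustration; the actual format would be settled during design. The component classes (SerialReader, TimestampTransform, PrefixTransform, UDPWriter, LogfileWriter) are existing OpenRVDAS classes, but the node/successors structure, key names and values are my assumptions:

```yaml
# HYPOTHETICAL syntax - for illustration only
gyr1->net:
  nodes:
    read_gyr1:
      class: SerialReader
      kwargs: {port: /dev/ttyr15, baudrate: 9600}
      successors: [timestamp]
    timestamp:
      class: TimestampTransform
      # one computed value fans out to two downstream streams,
      # which the current serial-transform model can't express
      successors: [prefix, write_file]
    prefix:
      class: PrefixTransform
      kwargs: {prefix: gyr1}
      successors: [write_net]
    write_net:
      class: UDPWriter
      kwargs: {port: 6224}
    write_file:
      class: LogfileWriter
      kwargs: {filebase: /var/tmp/log/gyr1}
```

The fan-out at the timestamp node is the point of the exercise: the timestamped record is computed once and reused by both the network and file branches.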

Depending on design (or invocation), the processing at each dataflow node could be a separate thread, or even a separate process (the multiprocessing module supports interprocess Queues), with an abstract wrapper that would look something like what's below:

# A very-initial first sketch of dataflow
class DataflowNode:

  def __init__(self,
               name='',               # convenient display name
               component=None,        # the reader/transform/writer itself
               method=None,           # the method we call to process input
               use_input_queue=False, # does method take input from queue?  
               extra_args=None,       # additional args the method might need
               threadsafe=False,      # is the method threadsafe?
               max_queue_size=0):     # how big to let input queue get
    pass

  def successors(self):
    pass

  def add_successor(self, node):
    pass

  def push_queue(self, record):
    """Externally-visible method to add a record to this node's input queue."""
    pass

  def run(self):
    """Start the dataflow loop."""
    pass
  
  def quit(self):
    """Shut down nicely."""
    pass

  ############################
  def _generate(self):
    """Get next record from queue (if necessary) and do whatever we need
    to generate our next output record."""
    pass
  
  def _pop_queue(self):
    """Internal method to get the next record from our input queue."""
    pass

  def _push_to_successors(self, record):
    pass
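For reference, here is a minimal runnable sketch of the wiring described above, using threading and queue.Queue from the standard library. The `fn` parameter, the `_QUIT` sentinel, and the upper/sink example nodes are illustrative assumptions, not part of the interface sketched above:

```python
# Minimal thread-per-node dataflow sketch (illustrative, not a design)
import queue
import threading

_QUIT = object()  # sentinel record used to shut a node down

class DataflowNode:
    def __init__(self, name='', fn=None, max_queue_size=0):
        self.name = name
        self.fn = fn  # callable: record -> record, or None to drop it
        self.input_queue = queue.Queue(maxsize=max_queue_size)
        self._successors = []
        self._thread = None

    def add_successor(self, node):
        self._successors.append(node)

    def push_queue(self, record):
        """Externally-visible method to add a record to this node's input queue."""
        self.input_queue.put(record)

    def run(self):
        """Start the dataflow loop in its own thread."""
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def quit(self):
        """Shut down nicely: queue the sentinel and wait for the loop to drain."""
        self.input_queue.put(_QUIT)
        if self._thread:
            self._thread.join()

    def _loop(self):
        while True:
            record = self.input_queue.get()
            if record is _QUIT:
                return
            result = self.fn(record) if self.fn else record
            if result is not None:
                self._push_to_successors(result)

    def _push_to_successors(self, record):
        for node in self._successors:
            node.push_queue(record)

# Wire a two-node graph: an uppercasing transform feeding a collector sink
results = []
upper = DataflowNode('upper', fn=lambda r: r.upper())
sink = DataflowNode('sink', fn=lambda r: results.append(r))
upper.add_successor(sink)
sink.run()
upper.run()
for rec in ['one', 'two']:
    upper.push_queue(rec)
upper.quit()   # drains upper's queue, then stops its thread
sink.quit()    # sentinel lands after the forwarded records
# results == ['ONE', 'TWO']
```

Swapping threading/queue for multiprocessing's Process and Queue would give the per-node-process variant with essentially the same structure, at the cost of pickling records across process boundaries.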
