This repository was archived by the owner on Jan 13, 2025. It is now read-only.

Design session summary #1

@iravid


Purpose and requirements

The purpose of this library is to provide a solution for creating distributed stream processing applications. These applications read data in parallel from data sources that (optionally) support partitioning, process it through collection-like operators, and write it out to data sinks.

The library’s representation for processing plans will be modeled in a reified manner from day 1, allowing for introspection and optimization. The library will also support performing stateful computations efficiently. State will be kept persistently per-partition or per-key (similar to how Flink handles state).

Deployment model, distribution and coordination

We started off by comparing two possible deployment models:

  • A master/worker model, in which the application jar is submitted to the cluster. The lifecycle of the cluster “outlives” the application: the cluster can host several applications, applications can be stopped and re-deployed, and so forth. This model is used by Flink and Spark.
  • A homogeneous model in which multiple JVMs execute the same application jar. This model is used by Kafka Streams and Akka Cluster.

I noted that the first model is notoriously hard to get right and operate in modern deployment environments, such as Kubernetes. The problem is that it creates a “platform-within-a-platform”: you’ve got a K8s cluster running your applications, and on top of that you create another cluster that runs more applications.

On the other hand, with the homogeneous model, multiple JVMs run the same application jar: they start up, parse their configuration, bind an HTTP server, and so on. They all run the same API calls for creating the analytics plan. Imagine something like this being run on multiple nodes:

def main =
  for {
    config <- parseConfig
    _      <- startHttpServer
    plan    = CassandraSource("table").map(...).fold(...).to(S3Destination("bucket", "prefix"))
    _      <- analytics.execute(plan)
  } yield 0

When the nodes get to analytics.execute, they start a coordination process that does some sort of leader election, analysis of the plan and work distribution. With this model, we no longer have to worry about transferring code or closures between the master and the workers: all classes required to run the application are already present on all nodes.
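
To make the intent concrete, here is a very rough sketch of what that coordination step could look like behind analytics.execute. None of these names exist yet; they are purely illustrative:

// Purely illustrative sketch - nothing here is part of the library yet.
final case class NodeId(value: String)
sealed trait WorkUnit // a slice of the plan assigned to a single node

trait Coordinator[Plan] {
  // Elect a single leader among the JVM nodes running the same jar.
  def electLeader(nodes: Set[NodeId]): NodeId
  // The leader analyzes the reified plan and assigns work units to nodes.
  def distribute(plan: Plan, nodes: Set[NodeId]): Map[NodeId, WorkUnit]
}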

So to summarize, we will be going for the homogeneous, embedded library model. To clarify - this does not preclude creating an application that can receive new plans online! There’s no reason why the JVM nodes cannot expose an HTTP interface that receives JSON representations of plans and executes them. In fact, this is considerably harder with the master/worker model, because the platform “runs us” vs. us running the cluster.
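
As a hedged sketch (assuming a JSON codec for plans and whatever HTTP library the application already uses), an online submission endpoint could be as simple as:

// Sketch only: parsePlanJson and the surrounding HTTP wiring are assumptions.
// The point is that a submitted plan goes through the exact same execute call
// as a plan that was built statically in main.
def handleSubmitPlan(requestBody: String) =
  for {
    plan <- parsePlanJson(requestBody) // hypothetical JSON => plan decoder
    _    <- analytics.execute(plan)    // same entry point as the embedded case
  } yield ()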

API concepts

We will follow up on the excellent work done on scalaz-analytics last year. The concept there is that users of the library do not embed arbitrary functions and closures into the plan, but rather write the plan in a specialized DSL. Following is a short excerpt; see github.com/scalaz/scalaz-analytics for the full details:

trait AnalyticsModule {
  // A description of a stream processing plan
  type DataStream[A]
  // An abstract arrow that represents a transformation between types in the plan
  type =>:[A, B]
  // A type that can be represented in the plan
  type Type[A]

  trait StdLib {
    def andThen[A, B, C](f: A =>: B, g: B =>: C): A =>: C
    // more operations
  }
}

If we limit all analytics plans to be written without the use of actual Scala functions, using only the abstract arrows for transformations, we guarantee full introspection and optimization opportunities for the plans.
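
For illustration, here is what a transformation expressed purely in the DSL could look like. Only andThen appears in the excerpt above; parseDecimal and multiplyBy are assumed additions to StdLib for the sake of the example:

// Illustration only: parseDecimal and multiplyBy are assumed StdLib members.
val parsePrice: String =>: BigDecimal     = stdLib.parseDecimal
val applyTax:   BigDecimal =>: BigDecimal = stdLib.multiplyBy(1.17)
// Composition uses the abstract arrow instead of a Scala function, so the
// result is a reified value the optimizer can inspect and rewrite.
val priced: String =>: BigDecimal = stdLib.andThen(parsePrice, applyTax)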

Extensibility

Three things cannot be expressed with abstract arrows and the standard library from the analytics module:

  • Lookup functions: an operator in a plan that performs external IO and enriches records with data from an external data store;
  • Data sources: operators that actually produce data by performing external IO;
  • Data sinks: operators that write data to outside data stores by performing external IO.

For these, we will need to devise open traits that users will implement, along with a strict specification of how they must behave. This remains to be fleshed out.
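
As a very rough, non-binding sketch of the shape these extension points could take (every signature below is an assumption):

// Rough sketch only - signatures are assumptions and will change.
trait DataSource[A] {
  def partitions: Int                   // how the source can be split for parallel reads
  def read(partition: Int): Iterator[A] // performs the external IO
}

trait DataSink[A] {
  def write(partition: Int, records: Iterator[A]): Unit // writes to the external store
}

trait Lookup[K, V] {
  def lookup(key: K): Option[V]         // enriches records from an external store
}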

State and Time

One extremely useful concept implemented in Flink is the pervasive use of state to build stateful operators. Every operator in Flink can define persistent state that is scoped either to a partition of the operator or to a key (in case the operator follows a groupBy operation). This state can then be accessed and mutated freely within the operator.

The state is not just some opaque value with get/set/modify operations; it is further reified into types such as ListState, MapState, AggregatingState and so forth. Reifying the state in this manner allows for efficient persistence and memory usage when storing the data in something like RocksDB. When accessing a MapState[K, V], for example, only the entry for the key being modified needs to be deserialized into memory.

How exactly this will be solved remains to be fleshed out, but most likely the standard library of operations will contain a state module with operations for state access and mutation.
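
A hedged sketch of what such reified state handles could look like, loosely mirroring Flink’s state types (nothing here is decided):

// Sketch only - loosely mirrors Flink's ValueState/MapState/ListState shape.
trait ValueState[A] {
  def get: Option[A]
  def set(value: A): Unit
}

trait MapState[K, V] {
  def get(key: K): Option[V]      // only this entry needs to be deserialized
  def put(key: K, value: V): Unit
}

trait ListState[A] {
  def append(value: A): Unit
  def all: Iterator[A]
}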

For time handling, we need to support installing timers in operators to perform periodic work, as well as accessing the current time - both in terms of event time and processing time.
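
A hedged sketch of the time-related operations an operator might be given (names and semantics are assumptions at this point):

// Sketch only - names and semantics are assumptions.
trait TimerService {
  def registerProcessingTimeTimer(at: Long): Unit // fires at the given wall-clock time
  def registerEventTimeTimer(at: Long): Unit      // fires when the watermark passes `at`
  def currentProcessingTime: Long                 // current wall-clock time
  def currentEventTime: Long                      // current watermark / event time
}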

Next steps

I will soon(tm) work on a plan that will hopefully lead us to an MVP. Stay tuned for updates!
