Skip to content
Merged
144 changes: 124 additions & 20 deletions docs/modules/pipelines/pages/serialization.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,118 @@ src.mapUsingService(serviceFactory,
(formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
```

== Compact Serialization

xref:serialization:compact-serialization.adoc[Compact serialization] provides an efficient,
schema-based serialization mechanism for your data objects. While Compact serialization
has the highest priority in Hazelcast's serialization service, there are important caveats
to understand when using it with Jet pipelines.

=== Java Serializable Takes Precedence for Lambdas and Functions

Pipeline definitions, including lambda expressions and function objects, are serialized
using standard Java serialization (`java.io.Serializable`). This is because the pipeline
definition itself must be sent to cluster members before the Hazelcast serialization
service is involved.

When an object implements `Serializable`, Java serialization is used directly and does
not delegate to Hazelcast's serialization service. This means:

* All fields of a `Serializable` object must also be `Serializable`.
* Any Hazelcast serializers registered for field types are not used during Java serialization.
* This applies for Compact serialization even if the field's class has a registered `CompactSerializer`.

=== Entry Processors and Captured Variables

This behavior is particularly relevant when using `Sinks.mapWithEntryProcessor()`.
This sink accepts `toEntryProcessorFn`, a function that creates an `EntryProcessor`.
Because `toEntryProcessorFn` is part of the pipeline definition, it is serialized with Java serialization.
Only variables captured by `toEntryProcessorFn` must be Java-serializable.

Other `IMap` sinks that accept lambdas follow the same captured variable rule,
but `Sinks.mapWithEntryProcessor()` adds one extra level of indirection because you pass
a factory for `EntryProcessor` instances.

The following examples omit unrelated method parameters for brevity:

```java
// No captured state
Sinks.mapWithEntryProcessor(MergeEntryProcessor::new);

// No captured state: OrderStatus is created inside the lambda
Sinks.mapWithEntryProcessor(() -> new MergeEntryProcessor(new OrderStatus("SHIPPED")));

// Captures mep, so MergeEntryProcessor must be Serializable
MergeEntryProcessor mep = new MergeEntryProcessor();
Sinks.mapWithEntryProcessor(() -> mep);

// Captures status, so OrderStatus must be Serializable
OrderStatus status = new OrderStatus("SHIPPED");
Sinks.mapWithEntryProcessor(() -> new MergeEntryProcessor(status));
```

This capture behavior is standard Java serialization behavior and is not Jet-specific.
The same rule applies when you use an `EntryProcessor` directly with the `IMap` API.

After `toEntryProcessorFn` runs, the created `EntryProcessor` instance is serialized for `IMap` execution.
At this stage, requirements depend on the `EntryProcessor` serialization strategy:

* If `MergeEntryProcessor` is serialized with Java serialization (for example by relying on `Serializable`), all its fields must be Java-serializable.
* If `MergeEntryProcessor` is serialized with Compact serialization, its fields can use Compact serializers.

Target members must also be able to deserialize `MergeEntryProcessor`.
Make this class available where an `IMap` deserializes it (for example member classpath or User Code Namespace), not only in Jet job resources.

The same applies to lambdas that capture Compact-serializable variables:

```java
// orderStatus has a CompactSerializer, but capturing it in a lambda
// requires it to also implement Serializable
OrderStatus status = new OrderStatus("SHIPPED");
pipeline.readFrom(source)
.filter(order -> order.getStatus().equals(status));
```

=== Using Compact Serialization with Pipelines

Compact and Java serialization can both be involved in the same pipeline, but at different points.

* For pipeline definition objects (for example lambdas and captured variables), Java serialization rules apply.
* For runtime data movement and storage, Compact serialization rules apply.
* If a class is used in both contexts, it may need both a Java serialization compatible form and a Compact serializer.

=== Compact Serializers and Jet Jobs

When using Compact serialization with Jet, keep the following limits and recommendations in mind:

* Unlike `StreamSerializer`, `CompactSerializer` is currently not supported for single-job registration with `JobConfig.registerSerializer()`.
* Explicit `CompactSerializer` implementations must be registered in member configuration.
* Avoid Zero Config Compact Serialization for classes attached to jobs, because repeated job classloading can lead to classloader leaks and `ClassCastException`.
* Prefer keeping Compact-serializable DTOs and related classes on the member classpath.

== Serialization of Data Types

The objects you store in xref:data-structures:distributed-data-structures.adoc[Hazelcast data structures] must be serializable.

Another case that requires serializable objects is sending computation
results between members, for example when grouping by key. To catch
results between members, for example, when grouping by key. To catch
serialization issues early on, we recommend using a 2-member local
cluster for development and testing.

Currently, Hazelcast supports 4 interfaces to serialize custom
Currently, Hazelcast supports five interfaces to serialize custom
types:

- link:https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html[java.io.Serializable]
- link:https://docs.oracle.com/javase/8/docs/api/java/io/Externalizable.html[java.io.Externalizable]
- link:https://docs.hazelcast.org/docs/{os-version}/javadoc/com/hazelcast/nio/serialization/Portable.html[com.hazelcast.nio.serialization.Portable]
- link:https://docs.hazelcast.org/docs/{os-version}/javadoc/com/hazelcast/nio/serialization/StreamSerializer.html[com.hazelcast.nio.serialization.StreamSerializer]
- link:https://docs.hazelcast.org/docs/{os-version}/javadoc/com/hazelcast/nio/serialization/compact/CompactSerializer.html[com.hazelcast.nio.serialization.compact.CompactSerializer]

[CAUTION]
.Deprecation Notice for Portable Serialization
====
Portable Serialization has been deprecated. We recommend you use Compact Serialization as Portable Serialization will be removed as of version 7.0.
====

The following table provides a comparison between them to help you in
deciding which interface to use in your applications.
Expand All @@ -173,11 +269,15 @@ deciding which interface to use in your applications.
|StreamSerializer
|Fastest and lightest
|Requires implementation and registration

|CompactSerializer
|Faster and more space efficient than Serializable. Supports versioning and partial deserialization. Does not require implementation or registration
|Not as fast or lightweight as StreamSerializer
|===

Below you can find rough performance numbers you can expect when
employing each of those strategies. A straightforward benchmark that
continuously serializes and then deserializes this simple object:
serializes and then deserializes unique, pooled instances of this simple object:

```java
class Person {
Expand All @@ -191,29 +291,35 @@ class Person {
yields following throughputs:

```
# Processor: Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz
# VM version: JDK 13, OpenJDK 64-Bit Server VM, 13+33

Benchmark Mode Cnt Score Error Units
SerializationBenchmark.serializable thrpt 3 0.259 ± 0.087 ops/us
SerializationBenchmark.externalizable thrpt 3 0.846 ± 0.057 ops/us
SerializationBenchmark.portable thrpt 3 1.171 ± 0.539 ops/us
SerializationBenchmark.stream thrpt 3 4.828 ± 1.227 ops/us
# Processor: Intel(R) Core(TM) i9-12900H CPU @ 2.50GHz
# VM version: JDK 17.0.6, OpenJDK 64-Bit Server VM, 17.0.6+10-LTS

Benchmark Mode Cnt Score Error Units
SerializationBenchmark.serializable thrpt 10 2.080 ± 0.056 ops/us
SerializationBenchmark.externalizable thrpt 10 3.171 ± 0.056 ops/us
SerializationBenchmark.portable thrpt 10 3.330 ± 0.051 ops/us
SerializationBenchmark.compact_zeroConfig thrpt 10 5.337 ± 0.108 ops/us
SerializationBenchmark.compact_registered thrpt 10 8.108 ± 0.160 ops/us
SerializationBenchmark.stream thrpt 10 18.035 ± 0.322 ops/us
```

Here are the sizes of the serialized form by each serializer:
NOTE: `compact_zeroConfig` represents serialization using Zero Config Compact Serialization, while `compact_registered`
represents serialization using a serializer defined and registered with Compact.

Here are the sizes of the serialized form for the same data by each serializer:

```
Strategy Number of Bytes Overhead %
java.io.Serializable 162 523
java.io.Externalizable 87 234
com.hazelcast.nio.serialization.Portable 104 300
com.hazelcast.nio.serialization.StreamSerializer 26 0
java.io.Serializable 154 327.8
java.io.Externalizable 93 158.3
com.hazelcast.nio.serialization.Portable 114 216.7
com.hazelcast.nio.serialization.compact.CompactSerializer 50 38.9
com.hazelcast.nio.serialization.StreamSerializer 36 0.0
```

You can see that using plain `Serializable` can easily become a
bottleneck in your application, as even with this simple data type it's
more than an order of magnitude slower than other serialization options,
significantly slower than some of the other serialization options,
not to mention very wasteful with memory.

== Write a Custom Serializer
Expand Down Expand Up @@ -273,9 +379,7 @@ travel through the pipeline (over distributed DAG edges) and get
saved to snapshots.

Job-level serializers can also be used with xref:sources-sinks.adoc[sources and sinks] that use in-memory data structures. You can read from/write to a local
`Observable`, `IList`, `IMap` or `ICache`. We are working on adding the
ability to read from an `IMap` using a user-defined predicate and
projections, update an `IMap`, and read from `EventJournal`.
`Observable`, `IList`, `IMap` or `ICache`.

== Register a Serializer with the Hazelcast cluster

Expand Down
7 changes: 7 additions & 0 deletions docs/modules/serialization/pages/compact-serialization.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,13 @@ public class Employee implements Serializable {
}
----


NOTE: When using Jet pipelines, lambda expressions and certain pipeline components (such as
`EntryProcessor` implementations) are serialized using Java serialization, which does not
delegate to Hazelcast's serialization service. In these cases, captured objects must
implement `Serializable` even if they have Compact serializers registered. For details,
see xref:pipelines:serialization.adoc#using-compact-serialization-with-pipelines[Using Compact Serialization with Pipelines].

== Compact Serialization Binary Specification

The binary specification of compact serialization is publicly available at xref:ROOT:compact-binary-specification.adoc[this page].