Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/apis/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ To use Kafka Streams, add the following Maven dependency to your project:
When using Scala you may optionally include the `kafka-streams-scala` library. Additional documentation on using the Kafka Streams DSL for Scala is available [in the developer guide](/43/documentation/streams/developer-guide/dsl-api.html#scala-dsl).

To use Kafka Streams DSL for Scala 2.13, add the following Maven dependency to your project:


> **⚠️ DEPRECATION NOTICE**: The `kafka-streams-scala` library is deprecated as of Kafka 4.3
> and will be removed in Kafka 5.0. Please migrate to using the Java Streams API directly from Scala.
> See the [migration guide](/{version}/streams/developer-guide/scala-migration) for details.

<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
1 change: 1 addition & 0 deletions docs/getting-started/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type: docs
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
* The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
* The new config prefix `remote.log.metadata.admin.` has been introduced. It allows independent configuration of the admin client used by `TopicBasedRemoteLogMetadataManager`. For further details, please refer to [KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
* The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will be removed in Kafka 5.0. For further details, please refer to the [migration guide](/{version}/streams/developer-guide/scala-migration).

## Upgrading to 4.2.0

Expand Down
6 changes: 6 additions & 0 deletions docs/streams/developer-guide/dsl-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -5738,6 +5738,12 @@ Kafka Streams comes with a `test-utils` module to help you test your application

# Kafka Streams DSL for Scala

> **⚠️ DEPRECATION NOTICE**: The Kafka Streams DSL for Scala library (`kafka-streams-scala`) is
> **deprecated as of Kafka 4.3** and will be **removed in Kafka 5.0**.
>
> **See the [migration guide](/{version}/streams/developer-guide/scala-migration)** for instructions
> and code examples showing how to migrate from the Scala wrapper to the Java API.

The Kafka Streams DSL Java APIs are based on the Builder design pattern, which allows users to incrementally build the target functionality using lower level compositional fluent APIs. These APIs can be called from Scala, but there are several issues:

1. **Additional type annotations** \- The Java APIs use Java generics in a way that are not fully compatible with the type inferencer of the Scala compiler. Hence the user has to add type annotations to the Scala code, which seems rather non-idiomatic in Scala.
Expand Down
104 changes: 104 additions & 0 deletions docs/streams/developer-guide/scala-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
---
title: Migrating from Streams Scala to Java API
description:
weight: 16
tags: ['kafka', 'docs']
aliases:
keywords:
type: docs
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

> **⚠️ DEPRECATION NOTICE**: The `kafka-streams-scala` library is deprecated as of Kafka 4.3
> and will be removed in Kafka 5.0. This guide will help you migrate your Scala applications
> to use the Java Streams API directly.
> For more information, see [KIP-1244](https://cwiki.apache.org/confluence/x/r4LMFw).

## Migration Overview

The Java Streams API works well from Scala with minimal adjustments. The main differences are:

1. **Use Java types directly** instead of Scala wrapper classes
2. **Configure Serdes explicitly** via `StreamsConfig` or pass them to methods

### Example: Word Count Application

#### Scala Wrapper Approach (Deprecated)

```scala
import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.serialization.Serdes._

object WordCountScala extends App {
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new StreamsBuilder // Scala wrapper
val textLines: KStream[String, String] = builder.stream[String, String]("input-topic")

val wordCounts: KTable[String, Long] = textLines
.flatMapValues(line => line.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count()

wordCounts.toStream.to("output-topic")

val streams = new KafkaStreams(builder.build(), props)
streams.start()
}
```

#### Java API Approach

```scala
import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KTable, Produced}
import org.apache.kafka.common.serialization.Serdes
import scala.jdk.CollectionConverters._

object WordCountJava extends App {
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Configure default serdes
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)

val builder = new StreamsBuilder // Java StreamsBuilder
val textLines = builder.stream[String, String]("input-topic")

val wordCounts = textLines
.flatMapValues(_.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count()

wordCounts.toStream.to("output-topic", Produced.`with`(Serdes.String(), Serdes.Long()))

val streams = new KafkaStreams(builder.build(), props)
streams.start()
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.jdk.CollectionConverters._
/**
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
*/
@deprecated("Use `org.apache.kafka.streams.StreamsBuilder` instead", "4.3.0")
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream

import org.apache.kafka.streams.kstream.{Branched => BranchedJ, KStream => KStreamJ}

@deprecated("Use `org.apache.kafka.streams.kstream.Branched` instead", "4.3.0")
object Branched {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import scala.jdk.CollectionConverters._
* @tparam K Type of keys
* @tparam V Type of values
*/
@deprecated("Use `org.apache.kafka.streams.kstream.BranchedKStream` instead", "4.3.0")
class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{AggregatorFrom
* @param inner The underlying Java abstraction for CogroupedKStream
* @see `org.apache.kafka.streams.kstream.CogroupedKStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.CogroupedKStream` instead", "4.3.0")
class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.{AutoOffsetReset, Topology}
import org.apache.kafka.streams.processor.TimestampExtractor

@deprecated("Use `org.apache.kafka.streams.kstream.Consumed` instead", "4.3.0")
object Consumed {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Grouped => GroupedJ}

@deprecated("Use `org.apache.kafka.streams.kstream.Grouped` instead", "4.3.0")
object Grouped {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Joined => JoinedJ}

@deprecated("Use `org.apache.kafka.streams.kstream.Joined` instead", "4.3.0")
object Joined {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for KGroupedStream
* @see `org.apache.kafka.streams.kstream.KGroupedStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.KGroupedStream` instead", "4.3.0")
class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for KGroupedTable
* @see `org.apache.kafka.streams.kstream.KGroupedTable`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.KGroupedTable` instead", "4.3.0")
class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import scala.jdk.CollectionConverters._
* @see `org.apache.kafka.streams.kstream.KStream`
*/
//noinspection ScalaDeprecation
@deprecated("Use `org.apache.kafka.streams.kstream.KStream` instead", "4.3.0")
class KStream[K, V](val inner: KStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.streams.state.KeyValueStore
* @param inner The underlying Java abstraction for KTable
* @see `org.apache.kafka.streams.kstream.KTable`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.KTable` instead", "4.0.0")
class KTable[K, V](val inner: KTableJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, ByteArraySessionStore, ByteArrayWindowStore}
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, SessionBytesStoreSupplier, WindowBytesStoreSupplier}

@deprecated("Use `org.apache.kafka.streams.kstream.Materialized` instead", "4.3.0")
object Materialized {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Produced => ProducedJ}
import org.apache.kafka.streams.processor.StreamPartitioner

@deprecated("Use `org.apache.kafka.streams.kstream.Produced` instead", "4.3.0")
object Produced {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
import org.apache.kafka.streams.processor.StreamPartitioner

@deprecated("Use `org.apache.kafka.streams.kstream.Repartitioned` instead", "4.3.0")
object Repartitioned {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{InitializerFro
* @param inner The underlying Java abstraction for SessionWindowedCogroupedKStream
* @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream` instead", "4.3.0")
class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedKStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for SessionWindowedKStream
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.SessionWindowedKStream` instead", "4.3.0")
class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{StreamJoined => StreamJoinedJ}
import org.apache.kafka.streams.state.WindowBytesStoreSupplier

@deprecated("Use `org.apache.kafka.streams.kstream.StreamJoined` instead", "4.3.0")
object StreamJoined {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.InitializerFrom
* @param inner The underlying Java abstraction for TimeWindowedCogroupedKStream
* @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream` instead", "4.3.0")
class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for TimeWindowedKStream
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream`
*/
@deprecated("Use `org.apache.kafka.streams.kstream.TimeWindowedKStream` instead", "4.3.0")
class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.UUID
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes => JSerdes, Serializer}
import org.apache.kafka.streams.kstream.WindowedSerdes

@deprecated("Use org.apache.kafka.common.serialization.Serdes instead", "3.0.0")
object Serdes extends LowPrioritySerdes {
implicit def stringSerde: Serde[String] = JSerdes.String()
implicit def longSerde: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
Expand Down