Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ object Dependencies {

val logback = "ch.qos.logback" % "logback-classic" % logbackVersion

val jspecify = "org.jspecify" % "jspecify" % "1.0.0" % Optional

object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % Test
val gson = "com.google.code.gson" % "gson" % "2.13.2" % Test
Expand Down Expand Up @@ -352,7 +354,7 @@ object Dependencies {

// pekko stream

lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, jspecify, TestDependencies.scalatest)

lazy val streamTestkit = l ++= Seq(
TestDependencies.scalatest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Java API for Pekko Streams.
*
* <p>This package contains the Java DSL for Pekko Streams. For the Scala DSL see
* [[org.apache.pekko.stream.scaladsl]].
*/
@org.jspecify.annotations.NullMarked
package org.apache.pekko.stream.javadsl;
14 changes: 8 additions & 6 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
import pekko.util.Timeout

import org.jspecify.annotations.Nullable
import org.reactivestreams.Processor

object Flow {
Expand Down Expand Up @@ -3492,7 +3493,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
import pekko.util.Collections._
Expand Down Expand Up @@ -3576,7 +3577,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
Expand Down Expand Up @@ -4318,7 +4319,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
: javadsl.Flow[In, Out, Mat] =
new Flow(delegate.log(name, e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -4359,7 +4361,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down Expand Up @@ -4406,7 +4408,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -4453,7 +4455,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.util.ConstantFun

import org.jspecify.annotations.Nullable

object FlowWithContext {

def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, pekko.NotUsed] =
Expand Down Expand Up @@ -304,7 +306,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def log(
name: String,
extract: function.Function[Out, Any],
log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
@Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.log(name, e => extract.apply(e))(log))

/**
Expand All @@ -320,7 +322,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
*
* @see [[pekko.stream.javadsl.Flow.log]]
*/
def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
def log(name: String, @Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand All @@ -340,7 +342,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
name: String,
marker: function.Function2[Out, CtxOut, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))

/**
Expand All @@ -362,7 +364,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def logWithMarker(
name: String,
marker: function.Function2[Out, CtxOut, LogMarker],
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import pekko.stream.impl.LinearTraversalBuilder
import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.ConstantFun.scalaAnyToUnit

import org.jspecify.annotations.Nullable
import org.reactivestreams.{ Publisher, Subscriber }

/** Java API */
Expand Down Expand Up @@ -435,7 +436,7 @@ object Sink {
def combine[T, U](
output1: Sink[U, _],
output2: Sink[U, _],
rest: java.util.List[Sink[U, _]],
@Nullable rest: java.util.List[Sink[U, _]],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, NotUsed] = {
import scala.jdk.CollectionConverters._
Expand All @@ -462,7 +463,7 @@ object Sink {
* @since 1.1.0
*/
def combine[T, U, M](
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
@Nullable sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
import pekko.util.Collections._
Expand Down
23 changes: 12 additions & 11 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util._

import org.jspecify.annotations.Nullable
import org.reactivestreams.{ Publisher, Subscriber }

/** Java API */
Expand Down Expand Up @@ -547,7 +548,7 @@ object Source {
def combine[T, U](
first: Source[T, _ <: Any],
second: Source[T, _ <: Any],
rest: java.util.List[Source[T, _ <: Any]],
@Nullable rest: java.util.List[Source[T, _ <: Any]],
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
import scala.jdk.CollectionConverters._
Expand All @@ -573,7 +574,7 @@ object Source {
* @since 1.1.0
*/
def combine[T, U, M](
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
@Nullable sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
import pekko.util.Collections._
Expand All @@ -588,7 +589,7 @@ object Source {
/**
* Combine the elements of multiple streams into a stream of lists.
*/
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
def zipN[T](@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
import scala.jdk.CollectionConverters._
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
Expand All @@ -599,7 +600,7 @@ object Source {
*/
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
import scala.jdk.CollectionConverters._
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
Expand Down Expand Up @@ -847,7 +848,7 @@ object Source {
* '''Cancels when''' downstream cancels
*/
def mergePrioritizedN[T](
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
@Nullable sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
import scala.jdk.CollectionConverters._
val seq =
Expand Down Expand Up @@ -1628,7 +1629,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
import pekko.util.Collections._
Expand Down Expand Up @@ -1710,7 +1711,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
Expand Down Expand Up @@ -4811,7 +4812,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] =
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.log(name, e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -4852,7 +4853,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down Expand Up @@ -4899,7 +4900,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -4946,7 +4947,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import pekko.japi.function
import pekko.stream._
import pekko.util.ConstantFun

import org.jspecify.annotations.Nullable

object SourceWithContext {

/**
Expand Down Expand Up @@ -290,7 +292,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
*
* @see [[pekko.stream.javadsl.Source.log]]
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
: SourceWithContext[Out, Ctx, Mat] =
viaScala(_.log(name, e => extract.apply(e))(log))

/**
Expand All @@ -306,7 +309,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
*
* @see [[pekko.stream.javadsl.Flow.log]]
*/
def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
def log(name: String, @Nullable log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand All @@ -326,7 +329,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
name: String,
marker: function.Function2[Out, Ctx, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))

/**
Expand All @@ -348,7 +351,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def logWithMarker(
name: String,
marker: function.Function2[Out, Ctx, LogMarker],
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun

import org.jspecify.annotations.Nullable

object SubFlow {

/**
Expand Down Expand Up @@ -2366,7 +2368,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
*/
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
Expand Down Expand Up @@ -2424,7 +2426,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
import pekko.util.Collections._
Expand Down Expand Up @@ -2916,7 +2918,7 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SubFlow[In, Out, Mat] =
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
new SubFlow(delegate.log(name, e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -2957,7 +2959,7 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def log(name: String, log: LoggingAdapter): SubFlow[In, Out, Mat] =
def log(name: String, @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down Expand Up @@ -3004,7 +3006,7 @@ final class SubFlow[In, Out, Mat](
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
@Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
new SubFlow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))

/**
Expand Down Expand Up @@ -3051,7 +3053,7 @@ final class SubFlow[In, Out, Mat](
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
@Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)

/**
Expand Down
Loading