Skip to content

Commit

Permalink
Scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
caiocamatta-stripe committed Feb 27, 2024
1 parent c366012 commit aab3496
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
10 changes: 5 additions & 5 deletions flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
/**
* A Flink function that is responsible for converting an array of pre-aggregates (aka a tile) to a form
* that can be written out to the KV store (PutRequest object).
*
*
* @param groupByServingInfoParsed The GroupBy we are working with
* @tparam T The input data type
*/
Expand All @@ -146,7 +146,7 @@ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParse
eventProcessingErrorCounter.inc()
avroConversionErrorCounter.inc()
}

def avroConvertTileToPutRequest(in: TimestampedTile): PutRequest = {
val tsMills = in.latestTsMillis

Expand All @@ -157,13 +157,13 @@ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParse
val valueBytes = in.tileBytes

logger.debug(
s"""
s"""
|Avro converting tile to PutRequest - tile=${in}
|groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys
|keyBytes=${java.util. Base64.getEncoder.encodeToString(keyBytes)}
|keyBytes=${java.util.Base64.getEncoder.encodeToString(keyBytes)}
|valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)}
|streamingDataset=$streamingDataset""".stripMargin
)
)

PutRequest(keyBytes, valueBytes, streamingDataset, Some(tsMills))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ class FlinkRowAggregationFunction(
// Given that the rowAggregator is transient, it may be null when a job is restored from a checkpoint
if (rowAggregator == null) {
logger.debug(
f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills"
)
f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills"
)
initializeRowAggregator()
}

logger.debug(
f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir
.mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)
f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir
.mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)

val partialAggregates = Try {
rowAggregator.update(accumulatorIr.ir, row)
Expand All @@ -89,9 +89,9 @@ class FlinkRowAggregationFunction(
partialAggregates match {
case Success(v) => {
logger.debug(
f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " +
f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)
f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " +
f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element"
)
TimestampedIR(v, Some(tsMills))
}
case Failure(e) =>
Expand Down Expand Up @@ -186,12 +186,12 @@ class FlinkRowAggProcessFunction(
tileBytes match {
case Success(v) => {
logger.debug(
s"""
s"""
|Flink aggregator processed element irEntry=$irEntry
|tileBytes=${java.util.Base64.getEncoder.encodeToString(v)}
|windowEnd=$windowEnd groupBy=${groupBy.getMetaData.getName}
|keys=$keys isComplete=$isComplete tileAvroSchema=${tileCodec.tileAvroSchema}"""
)
)
// The timestamp should never be None here.
out.collect(TimestampedTile(keys, v, irEntry.latestTsMillis.get))
}
Expand Down

0 comments on commit aab3496

Please sign in to comment.