Skip to content

Commit 7e6e823

Browse files
authored
Retain directories after post-processing (lensesio#197)
* Retain directories after post-processing. Customers have noticed that when the Lenses S3 Source Connector deletes files on S3, the entire path disappears. Although this is a consequence of how S3 works, the connector can compensate for it. This adds a new boolean KCQL property to the S3 source: `post.process.action.retain.dirs` (default value: `false`). If it is set to `true`, then before a file is moved or deleted during source post-processing, a zero-byte object is first created so that the path remains represented on S3.
1 parent c3920a3 commit 7e6e823

File tree

13 files changed

+240
-21
lines changed

13 files changed

+240
-21
lines changed

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class AwsS3StorageInterface(
7676
.contents()
7777
.asScala
7878
.filterNot(AwsS3StorageFilter.filterOut)
79+
.filter(_.size() > 0)
7980
.map(o => S3FileMetadata(o.key(), o.lastModified()))
8081
.filter(md => extensionFilter.forall(_.filter(md)))
8182

@@ -123,9 +124,11 @@ class AwsS3StorageInterface(
123124
bucket,
124125
prefix,
125126
pagReq.iterator().asScala.flatMap(
126-
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut).toSeq.map(o =>
127-
S3FileMetadata(o.key(), o.lastModified()),
128-
).filter(md => extensionFilter.forall(_.filter(md))),
127+
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut)
128+
.filter(_.size() > 0)
129+
.toSeq.map(o => S3FileMetadata(o.key(), o.lastModified())).filter(md =>
130+
extensionFilter.forall(_.filter(md)),
131+
),
129132
).toSeq,
130133
)
131134
}.toEither.leftMap {
@@ -325,4 +328,35 @@ class AwsS3StorageInterface(
325328
}
326329
}
327330

331+
/**
 * Creates a zero-byte "directory" marker object in the specified S3 bucket if it does not already exist.
 *
 * The object key is `path` with a trailing `/` appended when it is missing. The put is issued with
 * `ifNoneMatch("*")`, so when the key is already present the request is rejected and that rejection
 * is treated as success.
 *
 * @param bucket The name of the S3 bucket.
 * @param path   The path of the directory to create.
 * @return Either a FileCreateError if the directory could not be created,
 *         or Unit if the directory was created successfully or already exists.
 */
override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = {
  def ensureEndsWithSlash(input: String): String =
    if (input.endsWith("/")) input else input + "/"

  // Guard against a missing error-details object: dereferencing it directly inside the
  // recover guard would raise a NullPointerException that escapes the Either.
  def alreadyExists(ex: S3Exception): Boolean =
    Option(ex.awsErrorDetails()).exists(details => "PreconditionFailed".equals(details.errorCode()))

  Try {
    val putObjectRequest = PutObjectRequest
      .builder()
      .ifNoneMatch("*")
      .bucket(bucket)
      .key(ensureEndsWithSlash(path))
      .contentLength(0)
      .build()

    s3Client.putObject(putObjectRequest, RequestBody.empty())
  }
    .toEither
    .void
    // If the object already exists, the "ifNoneMatch" condition will fail, triggering this recovery clause
    .recover {
      case ex: S3Exception if alreadyExists(ex) => ()
    }
    .leftMap(ex => FileCreateError(ex, "empty object file"))
}
361+
328362
}

kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,6 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
237237
Try(client.getFileSystemClient(oldBucket).getFileClient(oldPath).rename(newBucket, newPath)).toEither.leftMap(
238238
FileMoveError(_, oldPath, newPath),
239239
).void
240+
241+
// No-op implementation: reports success without calling the Data Lake client, so nothing is created.
// NOTE(review): presumably no marker is needed because Data Lake directories exist independently of their files — confirm.
override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = ().asRight
240242
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,6 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] {
6868

6969
case object PostProcessActionPrefix extends PropsKeyEntry("post.process.action.prefix")
7070

71+
// KCQL property key `post.process.action.retain.dirs`: a boolean flag, read with a default of `false`
// when the delete/move post-process actions are built.
case object PostProcessActionRetain extends PropsKeyEntry("post.process.action.retain.dirs")
72+
7173
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ case class CloudLocation(
5959

6060
def prefixOrDefault(): String = prefix.getOrElse("")
6161

62+
/**
 * Returns the directory portion of `path`, i.e. everything before its last `/`.
 *
 * A path that contains no `/` (an object at the bucket root) yields an empty string instead of
 * failing with `StringIndexOutOfBoundsException` from `substring(0, -1)`.
 *
 * @return `None` when no path is set, otherwise the parent directory without a trailing slash.
 */
def pathToLowestDirectory(): Option[String] =
  path.map { p =>
    // lastIndexOf returns -1 when there is no separator; clamp to 0 so the slice is empty rather than invalid.
    p.substring(0, math.max(p.lastIndexOf("/"), 0))
  }
63+
6264
private def validate(): Validated[Throwable, CloudLocation] =
6365
cloudLocationValidator.validate(this)
6466

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2017-2025 Lenses.io Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lenses.streamreactor.connect.cloud.common.source.config
17+
18+
import cats.implicits.catsSyntaxEitherId
19+
import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
20+
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
21+
22+
import scala.collection.mutable
23+
24+
/**
25+
* A cache for tracking directories that have been created.
26+
*
27+
* @param storageInterface The storage interface used to create directories.
28+
*/
29+
class DirectoryCache(storageInterface: StorageInterface[_]) {
30+
31+
// A mutable set to keep track of created directories.
32+
private val directoriesCreated = mutable.Set[(String, String)]()
33+
34+
/**
35+
* Ensures that a directory exists in the specified bucket and path.
36+
*
37+
* @param bucket The bucket in which the directory should exist.
38+
* @param path The path of the directory to check or create.
39+
* @return Either a FileCreateError if the directory creation failed, or Unit if the directory exists or was created successfully.
40+
*/
41+
def ensureDirectoryExists(bucket: String, path: String): Either[FileCreateError, Unit] =
42+
if (directoriesCreated.contains((bucket, path))) {
43+
().asRight
44+
} else {
45+
storageInterface.createDirectoryIfNotExists(bucket, path) match {
46+
case Left(value) => value.asLeft
47+
case Right(_) =>
48+
directoriesCreated.add((bucket, path))
49+
().asRight
50+
}
51+
}
52+
53+
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessAction.scala

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.lenses.streamreactor.connect.cloud.common.source.config
1717
import cats.effect.IO
18-
import cats.implicits.catsSyntaxEitherId
1918
import cats.implicits.toBifunctorOps
2019
import cats.implicits.toTraverseOps
2120
import com.typesafe.scalalogging.LazyLogging
@@ -24,6 +23,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnt
2423
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum
2524
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionBucket
2625
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionPrefix
26+
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionRetain
2727
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
2828
import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEntry
2929
import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum
@@ -32,11 +32,50 @@ import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.Post
3232
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
3333
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties
3434

35+
/**
36+
* Trait representing a post-process action to be performed on cloud storage.
37+
*/
3538
trait PostProcessAction {
39+
40+
/**
41+
* Runs the post-process action.
42+
*
43+
* @param storageInterface The storage interface used to interact with the cloud storage.
44+
* @param directoryCache The cache used to track created directories.
45+
* @param cloudLocation The location in the cloud storage where the action should be performed.
46+
* @return An IO effect representing the completion of the action.
47+
*/
3648
def run(
37-
cloudLocation: CloudLocation,
3849
storageInterface: StorageInterface[_],
50+
directoryCache: DirectoryCache,
51+
cloudLocation: CloudLocation,
3952
): IO[Unit]
53+
54+
/**
 * Prepares the paths needed for the post-process action.
 *
 * When `retainDirs` is set, the file's parent directory is ensured through the directory cache
 * before the paths are returned, so it is in place before the caller moves or deletes the file.
 *
 * @param retainDirs     Whether the parent directory of the processed file must be kept.
 * @param cloudLocation  The location in the cloud storage.
 * @param directoryCache The cache used to track created directories.
 * @return An IO effect containing a tuple with the path and the directory path.
 */
protected def preparePaths(
  retainDirs:     Boolean,
  cloudLocation:  CloudLocation,
  directoryCache: DirectoryCache,
): IO[(String, String)] =
  for {
    path <- IO.fromOption(cloudLocation.path)(
      new IllegalArgumentException("Cannot proceed without a path, this is probably a logic error"),
    )
    dirPath <- IO.fromOption(cloudLocation.pathToLowestDirectory())(
      new IllegalArgumentException("Cannot proceed without a path, this is probably a logic error"),
    )
    // An empty directory path denotes the bucket root: there is no directory to retain, and
    // asking storage to create it would only produce a stray marker for an empty name.
    _ <- if (retainDirs && dirPath.nonEmpty) {
      IO.fromEither(directoryCache.ensureDirectoryExists(cloudLocation.bucket, dirPath).leftMap(_.exception))
    } else {
      IO.unit
    }
  } yield (path, dirPath)
4079
}
4180

4281
object PostProcessAction {
@@ -49,12 +88,16 @@ object PostProcessAction {
4988
)
5089
.map {
5190
case Delete =>
52-
new DeletePostProcessAction().asRight
91+
for {
92+
retainDirs: Boolean <- kcqlProperties.getBooleanOrDefault(PostProcessActionRetain, default = false)
93+
} yield new DeletePostProcessAction(retainDirs)
94+
5395
case Move => {
5496
for {
5597
destBucket <- kcqlProperties.getString(PostProcessActionBucket)
5698
destPrefix <- kcqlProperties.getString(PostProcessActionPrefix)
57-
} yield MovePostProcessAction(prefix, dropEndSlash(destBucket), dropEndSlash(destPrefix))
99+
retainDirs <- kcqlProperties.getBooleanOrDefault(PostProcessActionRetain, default = false).toOption
100+
} yield MovePostProcessAction(retainDirs, prefix, dropEndSlash(destBucket), dropEndSlash(destPrefix))
58101
}
59102
.toRight(new IllegalArgumentException("A bucket and a path must be specified for moving files to."))
60103
}
@@ -65,31 +108,37 @@ object PostProcessAction {
65108
def dropLastCharacterIfPresent(s: String, char: Char): String = if (s.lastOption.contains(char)) s.dropRight(1) else s
66109
}
67110

68-
class DeletePostProcessAction extends PostProcessAction with LazyLogging {
111+
class DeletePostProcessAction(retainDirs: Boolean) extends PostProcessAction with LazyLogging {
112+
69113
def run(
70-
cloudLocation: CloudLocation,
71114
storageInterface: StorageInterface[_],
115+
directoryCache: DirectoryCache,
116+
cloudLocation: CloudLocation,
72117
): IO[Unit] =
73118
for {
74-
_ <- IO.delay(logger.debug("Running delete for {}", cloudLocation))
75-
path <- IO.fromOption(cloudLocation.path)(
76-
new IllegalArgumentException("Cannot delete without a path, this is probably a logic error"),
77-
)
119+
_ <- IO.delay(logger.debug("Running delete for {}", cloudLocation))
120+
(path, _) <- preparePaths(retainDirs, cloudLocation, directoryCache)
121+
78122
del <- IO.fromEither(storageInterface.deleteFiles(cloudLocation.bucket, Seq(path)).leftMap(_.exception))
79123
} yield del
80124
}
81125

82-
case class MovePostProcessAction(originalPrefix: Option[String], newBucket: String, newPrefix: String)
83-
extends PostProcessAction
126+
case class MovePostProcessAction(
127+
retainDirs: Boolean,
128+
originalPrefix: Option[String],
129+
newBucket: String,
130+
newPrefix: String,
131+
) extends PostProcessAction
84132
with StrictLogging {
133+
85134
override def run(
86-
cloudLocation: CloudLocation,
87135
storageInterface: StorageInterface[_],
136+
directoryCache: DirectoryCache,
137+
cloudLocation: CloudLocation,
88138
): IO[Unit] =
89139
for {
90-
path <- IO.fromOption(cloudLocation.path)(
91-
new IllegalArgumentException("Cannot move without a path, this is probably a logic error"),
92-
)
140+
(path, _) <- preparePaths(retainDirs, cloudLocation, directoryCache)
141+
93142
newPath = originalPrefix.map(o => path.replace(o, newPrefix)).getOrElse(path)
94143
_ = logger.info(s"Moving file from ${cloudLocation.bucket}/$path to $newBucket/$newPath newPrefix: $newPrefix")
95144
mov <- IO.fromEither(

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ object CloudSourcePropsSchema {
3939
PostProcessAction -> EnumPropsSchema(PostProcessActionEnum),
4040
PostProcessActionBucket -> StringPropsSchema,
4141
PostProcessActionPrefix -> StringPropsSchema,
42+
PostProcessActionRetain -> BooleanPropsSchema,
4243
)
4344

4445
val schema = KcqlPropsSchema(PropsKeyEnum, keys)

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/ReaderManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.typesafe.scalalogging.LazyLogging
2323
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
2424
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
2525
import io.lenses.streamreactor.connect.cloud.common.source.CommitWatermark
26+
import io.lenses.streamreactor.connect.cloud.common.source.config.DirectoryCache
2627
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
2728
import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue
2829
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
@@ -45,6 +46,8 @@ class ReaderManager(
4546
maybePostProcessAction: Option[PostProcessAction],
4647
) extends LazyLogging {
4748

49+
val directoryCache = new DirectoryCache(storageInterface)
50+
4851
def poll(): IO[Vector[SourceRecord]] = {
4952
def fromNexFile(pollResults: Vector[SourceRecord], allLimit: Int): IO[Vector[SourceRecord]] =
5053
for {
@@ -135,7 +138,7 @@ class ReaderManager(
135138
maybePostProcessAction match {
136139
case Some(action) =>
137140
logger.info("PostProcess for {}", commitWatermark)
138-
action.run(commitWatermark.cloudLocation, storageInterface)
141+
action.run(storageInterface, directoryCache, commitWatermark.cloudLocation)
139142
case None =>
140143
logger.info("No PostProcess for {}", commitWatermark)
141144
IO.unit

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,13 @@ trait StorageInterface[SM <: FileMetadata] extends ResultProcessors {
7070
def deleteFiles(bucket: String, files: Seq[String]): Either[FileDeleteError, Unit]
7171

7272
def mvFile(oldBucket: String, oldPath: String, newBucket: String, newPath: String): Either[FileMoveError, Unit]
73+
74+
/**
75+
* Creates a directory if it does not already exist.
76+
*
77+
* @param bucket The name of the bucket where the directory should be created.
78+
* @param path The path of the directory to create.
79+
* @return Either a FileCreateError if the directory creation failed, or Unit if the directory was created successfully or already exists.
80+
*/
81+
def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit]
7382
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2017-2025 Lenses.io Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lenses.streamreactor.connect.cloud.common.source.config
17+
18+
import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
19+
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
20+
import org.mockito.MockitoSugar
21+
import org.scalatest.funsuite.AnyFunSuiteLike
22+
import org.scalatest.matchers.should.Matchers
23+
24+
/**
 * Unit tests for DirectoryCache: a successful creation is cached, a failed creation is retried.
 */
class DirectoryCacheTest extends AnyFunSuiteLike with Matchers with MockitoSugar {

  // Builds a mocked storage interface whose create call for ("bucket", "path") returns `outcome`.
  private def storageReturning(outcome: Either[FileCreateError, Unit]): StorageInterface[_] = {
    val storage = mock[StorageInterface[_]]
    when(storage.createDirectoryIfNotExists("bucket", "path")).thenReturn(outcome)
    storage
  }

  test("ensureDirectoryExists should return Right(Unit) and add the directory to the cache if it does not exist") {
    val storage        = storageReturning(Right(()))
    val directoryCache = new DirectoryCache(storage)

    // The second call must be served from the cache, hence exactly one storage call.
    (1 to 2).foreach(_ => directoryCache.ensureDirectoryExists("bucket", "path") should be(Right(())))
    verify(storage, times(1)).createDirectoryIfNotExists("bucket", "path")
  }

  test(
    "ensureDirectoryExists should return Left(FileCreateError) and not add to the cache if creating the directory fails",
  ) {
    val failure        = FileCreateError(new IllegalStateException("Bad"), "data")
    val storage        = storageReturning(Left(failure))
    val directoryCache = new DirectoryCache(storage)

    // Failures are not cached, so each call reaches the storage interface again.
    (1 to 2).foreach(_ => directoryCache.ensureDirectoryExists("bucket", "path") should be(Left(failure)))
    verify(storage, times(2)).createDirectoryIfNotExists("bucket", "path")
  }

  test("ensureDirectoryExists should not add the directory to the cache if it already exists") {
    val storage        = storageReturning(Right(()))
    val directoryCache = new DirectoryCache(storage)

    (1 to 2).foreach(_ => directoryCache.ensureDirectoryExists("bucket", "path") should be(Right(())))
    verify(storage, times(1)).createDirectoryIfNotExists("bucket", "path")

    // A further call still leaves the storage call count at one.
    directoryCache.ensureDirectoryExists("bucket", "path") should be(Right(()))
    verify(storage, times(1)).createDirectoryIfNotExists("bucket", "path")
  }
}

0 commit comments

Comments
 (0)