Skip to content

Commit 5be2a5e

Browse files
committed
Add S3 module with utilities
1 parent 98176c8 commit 5be2a5e

4 files changed

Lines changed: 224 additions & 0 deletions

File tree

build.sbt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,36 @@ lazy val `aws-lambda` = project
8787
}
8888
)
8989

90+
lazy val S3 = project
91+
.in(file("s3"))
92+
.dependsOn(core)
93+
.settings(commonSettings)
94+
.settings(
95+
name := "latis3-s3",
96+
libraryDependencies ++= Seq(
97+
"com.amazonaws" % "aws-java-sdk-core" % "1.12.785" % Test,
98+
"ch.qos.logback" % "logback-classic" % logbackVersion % Test,
99+
"com.github.fs2-blobstore" %% "s3" % "0.9.15",
100+
"software.amazon.awssdk" % "s3" % "2.31.59",
101+
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % "0.43.0" % Test
102+
)
103+
)
104+
105+
lazy val awsS3 = project
106+
.in(file("aws-s3"))
107+
.dependsOn(core)
108+
.settings(commonSettings)
109+
.settings(
110+
name := "latis3-aws-s3",
111+
libraryDependencies ++= Seq(
112+
"com.amazonaws" % "aws-java-sdk-core" % "1.12.746" % Test,
113+
"ch.qos.logback" % "logback-classic" % logbackVersion % Test,
114+
"com.github.fs2-blobstore" %% "s3" % "0.9.14",
115+
"software.amazon.awssdk" % "s3" % "2.26.5",
116+
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % "0.41.4" % Test
117+
)
118+
)
119+
90120
lazy val core = project
91121
.dependsOn(`dap2-parser`)
92122
.dependsOn(macros)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package latis.util
2+
3+
import scala.jdk.CollectionConverters.*
4+
5+
import blobstore.s3.S3Store
6+
import blobstore.url.*
7+
import cats.effect.Async
8+
import cats.syntax.all.*
9+
import fs2.*
10+
import software.amazon.awssdk.regions.Region
11+
import software.amazon.awssdk.services.s3.S3AsyncClient
12+
import software.amazon.awssdk.services.s3.S3Client
13+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
14+
15+
object S3Utils {
16+
//TODO: generalize for any supported blobstore: azure, box, gcs (google), sftp
17+
//TODO: special concerns for directory buckets?
18+
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
19+
// "For general purpose buckets, ListObjectsV2 returns objects in lexicographical order based on their key names."
20+
// "For directory buckets, ListObjectsV2 does not return objects in lexicographical order."
21+
// Do we need to traverse directories and order as we go?
22+
23+
/** Makes an S3 client. */
24+
def makeClient(region: Region = Region.US_EAST_1): S3Client =
25+
S3Client.builder.region(region).build
26+
27+
/** Makes an effectful S3 client. */
28+
//TODO: Async here? delay?
29+
def makeClientF[F[_] : Async](region: Region = Region.US_EAST_1): F[S3AsyncClient] =
30+
Async[F].delay(S3AsyncClient.builder.region(region).build)
31+
32+
/** Returns an Iterator of keys for the objects in an S3 bucket. */
33+
def getKeys(client: S3Client, bucket: String, prefix: String): Iterator[String] = {
34+
val request = ListObjectsV2Request.builder
35+
.bucket(bucket)
36+
.prefix(prefix)
37+
.build
38+
39+
client.listObjectsV2Paginator(request).iterator.asScala
40+
.flatMap(resp => resp.contents.asScala.map(_.key))
41+
}
42+
43+
/** Effectfully gets a Stream of keys from an S3 bucket. */
44+
//TODO: take advantage of S3Client's startAfter (e.g. polling for updates), not in S3Store?
45+
//TODO: optional prefix? default = "/"?
46+
def getKeysF[F[_] : Async](client: S3AsyncClient, bucketName: String, prefix: String): F[Stream[F, String]] = {
47+
for {
48+
// Just keeping the first error out of possibly many. //TODO: risk missing useful error?
49+
store <- S3Store.builder[F](client).build.leftMap(_.head).liftTo[F]
50+
host <- Hostname.parseF[F](bucketName)
51+
} yield {
52+
val url = Url("s3", Authority(host), Path(prefix))
53+
store.list(url, true).map(_.representation.key)
54+
}
55+
}
56+
57+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!DOCTYPE configuration>
3+
4+
<configuration>
5+
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
6+
<import class="ch.qos.logback.core.ConsoleAppender"/>
7+
8+
<appender name="STDOUT" class="ConsoleAppender">
9+
<encoder class="PatternLayoutEncoder">
10+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
11+
</encoder>
12+
</appender>
13+
14+
<root level="error">
15+
<appender-ref ref="STDOUT"/>
16+
</root>
17+
</configuration>
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package latis.util
2+
3+
import scala.concurrent.duration.Duration
4+
5+
import cats.effect.IO
6+
import cats.effect.Resource
7+
import com.dimafeng.testcontainers.LocalStackV2Container
8+
import munit.catseffect.IOFixture
9+
import org.testcontainers.containers.localstack.LocalStackContainer.Service
10+
import software.amazon.awssdk.core.async.AsyncRequestBody
11+
import software.amazon.awssdk.services.s3.S3AsyncClient
12+
import software.amazon.awssdk.services.s3.model.CreateBucketRequest
13+
import software.amazon.awssdk.services.s3.model.PutObjectRequest
14+
15+
import latis.util.S3Utils.*
16+
17+
class S3UtilsSuite extends munit.CatsEffectSuite {
18+
19+
//Starting the test container is slow.
20+
override val munitIOTimeout: Duration = Duration(1, "minute")
21+
22+
private def makeBucket(client: S3AsyncClient, bucket: String) = {
23+
IO.fromCompletableFuture {
24+
IO {
25+
client.createBucket(
26+
CreateBucketRequest.builder().bucket(bucket).build()
27+
)
28+
}
29+
}
30+
}
31+
32+
private def putEmpty(client: S3AsyncClient, bucket: String, key: String) =
33+
IO.fromCompletableFuture {
34+
IO {
35+
client.putObject(
36+
PutObjectRequest.builder().bucket(bucket).key(key).build(),
37+
AsyncRequestBody.empty()
38+
)
39+
}
40+
}
41+
42+
val s3Client: IOFixture[S3AsyncClient] = {
43+
val localStack: Resource[IO, LocalStackV2Container] = {
44+
val mkContainer = IO {
45+
LocalStackV2Container(services = List(Service.S3))
46+
}.flatTap(container => IO(container.start()))
47+
Resource.make(mkContainer)(container => IO(container.stop()))
48+
}
49+
50+
val client: Resource[IO, S3AsyncClient] = localStack.flatMap { ls =>
51+
val mkClient = IO {
52+
S3AsyncClient
53+
.builder()
54+
.endpointOverride(ls.endpointOverride(Service.S3))
55+
.credentialsProvider(ls.staticCredentialsProvider)
56+
.region(ls.region)
57+
.build()
58+
}
59+
Resource.make(mkClient)(client => IO(client.close()))
60+
}.evalTap { client =>
61+
makeBucket(client, "foobar")
62+
>> putEmpty(client, "foobar", "foo")
63+
>> putEmpty(client, "foobar", "bar")
64+
>> makeBucket(client, "barfoo")
65+
>> putEmpty(client, "foobar", "bar")
66+
>> putEmpty(client, "foobar", "foo")
67+
}
68+
69+
ResourceSuiteLocalFixture("s3-client", client)
70+
}
71+
72+
override val munitFixtures = List(s3Client)
73+
74+
test("ordering") {
75+
IO(s3Client()).flatTap { client =>
76+
getKeysF[IO](client, "foobar", "/").flatMap { stream =>
77+
stream.compile.toList.map {
78+
case "bar" :: "foo" :: Nil => ()
79+
case _ => fail("Objects not ordered as expected.")
80+
}
81+
}
82+
}
83+
}
84+
85+
//=== Test with NOAA data ===//
86+
87+
val bucket = "noaa-nesdis-swfo-ccor-1-pds"
88+
val prefix = "SWFO/GOES-19/CCOR-1/ccor1-l3_science/"
89+
val first = "SWFO/GOES-19/CCOR-1/ccor1-l3_science/2025/02/25/" +
90+
"sci_ccor1-l3_g19_s20250225T000020Z_e20250225T000048Z_p20250401T234952Z_pub.fits"
91+
92+
test("pure client") {
93+
val client = makeClient()
94+
val key = getKeys(client, bucket, prefix).toList.head
95+
assertEquals(first, key)
96+
}
97+
98+
test("effectful client") {
99+
for {
100+
client <- makeClientF[IO]()
101+
keys <- getKeysF[IO](client, bucket, prefix)
102+
last <- keys.head.compile.last
103+
key <- IO.fromOption(last)(fail("Empty list of keys")) //TODO: does this failure work?
104+
} yield {
105+
assertEquals(first, key)
106+
}
107+
}
108+
109+
test("chunk size") {
110+
for {
111+
client <- makeClientF[IO]()
112+
keys <- getKeysF[IO](client, bucket, prefix)
113+
last <- keys.chunks.head.compile.last
114+
chunk <- IO.fromOption(last)(fail("Empty list of keys")) //TODO: does this failure work?
115+
} yield {
116+
assertEquals(1000, chunk.size)
117+
}
118+
}
119+
120+
}

0 commit comments

Comments
 (0)