Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managed + Iceberg IO #5494

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft

Managed + Iceberg IO #5494

wants to merge 14 commits into from

Conversation

kellen
Copy link
Contributor

@kellen kellen commented Sep 17, 2024

Adds support for Beam's managed transforms and for Iceberg, which is implemented as a managed transform.

Note this is on a snapshot of magnolify, will need a release before merge.

Copy link

codecov bot commented Sep 17, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 61.32%. Comparing base (a464955) to head (d3b67dc).
Report is 8 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5494      +/-   ##
==========================================
- Coverage   61.32%   61.32%   -0.01%     
==========================================
  Files         312      312              
  Lines       11080    11082       +2     
  Branches      770      728      -42     
==========================================
+ Hits         6795     6796       +1     
- Misses       4285     4286       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@RustedBones RustedBones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the new module in the README ?

Comment on lines +69 to +72
new Schema(
NestedField.required(0, "a", IntegerType.get()),
NestedField.required(1, "b", StringType.get())
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this be given by the RowType ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is an Iceberg schema rather than the Beam schema. The lack of create-on-write does raise the question of whether we also need to derive the iceberg schemas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe RowType could also offer a def icebergSchema? (Similar to how magnolify-parquet has both def schema and def avroSchema...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean pulling in iceberg deps into the beam module fyi

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could make it provided, I guess, but point taken

Seems like Beam should have a utility function for converting between Beam/Icerberg Schemas. They have similar stuff for BQ/Avro/BeamSchema interop. Maybe we could contribute there

Copy link
Contributor Author

@kellen kellen Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beam does have this but it introduces a dep on the iceberg part of the sdk that in theory should be managed.

I could use it in the integration test directly but that wouldn't help users at all.

IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)

OTOH ... the class has this comment so 🤷

  // This is made public for users convenience, as many may have more experience working with
  // Iceberg types.

Comment on lines +32 to +45
private lazy val _config: java.util.Map[String, Object] = {
// recursively convert this yaml-compatible nested scala map to java map
// we either do this or the user has to create nested java maps in scala code
// both are bad
def _convert(a: Object): Object = {
a match {
case m: Map[_, _] =>
m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava
case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava
case _ => a
}
}
config.map { case (k, v) => k -> _convert(v) }.asJava
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should introduce either a config AST to ensure what is passed is YAML compatible, or maybe use lightbend config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API can aslo take a yaml file location, e.g. classpath://foo.yaml if we wanted to support that.

@RustedBones RustedBones marked this pull request as draft September 18, 2024 08:42
@RustedBones
Copy link
Contributor

Converted to draft until magnolify gets released

Copy link
Contributor

@clairemcginty clairemcginty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall looks good to me! the amount of testing+examples are impressive

import org.apache.beam.sdk.values.{PCollectionRowTuple, Row}
import scala.jdk.CollectionConverters._

final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess once this is merged, we can add a ManagedTypedIO to the 0.15.x branch? maybe let's file a ticket to track the Magnolify API work...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to support managed very extensively, so I decided to just skip any typed variant for Managed in favor of doing it downstream in our io-specific APIs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants