[FLINK-37984] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1)#38
Open
Poorvankbhatia wants to merge 1 commit into
Open
[FLINK-37984] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1)#38Poorvankbhatia wants to merge 1 commit into
Poorvankbhatia wants to merge 1 commit into
Conversation
Contributor
Author
|
Hey @dannycranmer @MartijnVisser can you help with this review? Thanks. cc: @echauchot for visibility. |
5386e16 to
f9e0c15
Compare
f9e0c15 to
0fbdea6
Compare
Contributor
Author
|
Hey @vahmed-hamdy PTAL :) Thank you. |
Contributor
Author
|
cc: @Samrat002 for visibility. |
|
I think this PR could be broken down to smaller PR, it might be easier to get it reviewed and merged in that case. |
Contributor
Author
Thanks @vahmed-hamdy . Let me break it down. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Cassandra Sink V2
This PR is a detailed Phase-1 implementation of FLIP-533.
It adds a new Cassandra sink for Flink that cleanly separates planning (CQL generation/binding) from writing
POJO writing via DataStax Mapper; Row/Tuple & Scala.Product via static & dynamic CQL
What this unlocks
Static CQL (
Row,Tuple,Scala.Product)UPDATE ... SET ... WHERE ...(incl.IFLWT,USING TTL/TIMESTAMP).UNSETto avoid tombstones.Dynamic CQL (
Row,Tuple)SET, skip nulls.USINGandIFgenerated from the record.Capabilities (what the sink supports)
Static CQL (user-provided):
Dynamic CQL (planned per record):
TableResolverColumnValueResolverCqlClauseResolverStatementCustomizer(consistency/timeout)PreparedStatementCacheInput formats:
Writer semantics:
flush()waits for all in-flight operationsMetrics (representative):
numRecordsOut,numRecordsOutErrors,retriesFeature matrix
.save()upsert)IF,IF EXISTS/NOT EXISTS)saveNullFields=false)How users use it
POJO TYPE
CQL TYPES (STATIC QUERIES)
CQL TYPES (DYNAMIC)
What’s included (by area)
Architecture Overview
Sink entrypoints
CassandraSink.java- Main sink implementation with Sink V2 APICassandraSinkWriter.java- Async write coordinator with backpressureCassandraSinkBuilder.java- Fluent builder for sink configurationConfiguration Layer
CassandraSinkConfig.java- Base configuration interfacePojoSinkConfig.java- POJO-specific with DataStax Mapper supportCqlSinkConfig.java- CQL query-based configurationRowDataSinkConfig.java- Table API RowData configurationRequestConfiguration.java- Write operation parametersWriter runtime & failure handling
RecordWriterFactoryCassandraFailureHandlerCassandraFatalExceptionClassifierMaxRetriesExceededExceptionQuery Planning
StatementPlanner.java- Core planning interfaceStaticPlannerAssembler.java- Pre-compiled statement generationDynamicPlannerAssembler.java- Runtime query constructionPreparedStatementCache.java- Statement caching layerStrategy Pattern Implementation
InsertStrategy.java/StaticInsertStrategy.java- INSERT operationsUpdateStrategy.java/StaticUpdateStrategy.java- UPDATE operationsNullUnsettingCustomizer.java- Null field handling customizationResolvers & Helpers
TableResolver,FixedTableResolver,TableRefColumnValueResolver,FixedColumnValueResolver,ResolvedWriteCqlClauseResolver,NoOpClauseResolver,ClauseBindingsStatementCustomizer,NoOpCustomizer,NullUnsettingCustomizerQueryParser,CqlStatementHelperTesting in this PR
A. Unit tests (no Cassandra; fast & deterministic)
DynamicPlannerAssemblerTest,StaticPlannerAssemblerTest— assembleStatementPlannerfor dynamic/static paths; verify routing, clause resolution, strategy selection.RequestConfigurationTest— concurrency/retry/timeout and builder validation.CqlSinkConfigTest,PojoSinkConfigTest— static vs dynamic toggles, null-unsetting, customizers, format selection.InsertStrategyTest,StaticInsertStrategyTest,UpdateStrategyTest,StaticUpdateStrategyTest— CQL generation for INSERT/UPDATE, TTL/TIMESTAMP/IF.QueryParserTest— user CQL parsing, placeholder counts, safety checks.ColumnValueResolverTest,FixedColumnValueResolverTest— value mapping, null semantics.ResolvedWriteTest— field extraction, composite PK binding.TableRefTest— keyspace/table/quoted identifiers.PreparedStatementCacheTest— reuse/eviction behavior.CqlStatementHelperTest— binding helpers & UNSET handling.StatementPlannerFactoryTest,StatementPlannerTest— end-to-end plan → bind (incl. clause/customizer/multi-keyspace).RecordWriterFactoryTest— picks correct low-level writer per format.CassandraSinkBuilderTest— public builder validation and option wiring.CassandraSinkWriterTest— permits/backpressure, permit release (success/error), retries & max-retries, fatal short-circuit,flush()/close()semantics, mailbox callback threading, metrics, no deadlocks.B. Integration tests (real Cassandra; smoke coverage)
CassandraCqlSinkITCase— static CQL path: INSERT/UPDATE variants, USING TTL/TIMESTAMP/IF, quoted identifiers, composite keys, ignore-null (UNSET), fatal validation messages. (Dynamic Table & Column Resolvers.)CassandraPojoSinkITCase— POJO path: mapper options (TTL, consistency, saveNullFields), upsert semantics, null-PK failure, missing table, TTL expiry.CassandraSinkWriterITCase— end-to-end writer loop with mailbox & permits: backpressure atmaxConcurrentRequests=1,flush()waits,close()idempotent, fatal error (bad table) surfaces once.