Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -113,8 +114,14 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) {

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
// Multiple threads can try to create the namespace concurrently.
// If the namespace was already created in the meantime, catching AlreadyExistsException avoids failing with an "already exists" error.
try {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be logged at info level instead of warn?

} catch (AlreadyExistsException e) {
LOGGER.debug("Namespace '{}' already exists", tableIdentifier.namespace());
}
}
return icebergCatalog.createTable(tableIdentifier, schema);
}
Expand All @@ -131,8 +138,12 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
schema.identifierFieldNames());

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
try {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
} catch (AlreadyExistsException e) {
LOGGER.debug("Namespace '{}' already exists", tableIdentifier.namespace());
}
}

// If we have partition transforms, create a PartitionSpec
Expand Down
34 changes: 19 additions & 15 deletions drivers/abstract/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,27 @@ func (a *AbstractDriver) Incremental(ctx context.Context, pool *destination.Writ
logger.Infof("Backfill skipped for stream[%s], already completed", stream.ID())
backfillWaitChannel <- stream.ID()
return nil
}
// Reset only mentioned cursor state while preserving other state values
a.state.ResetCursor(stream.Self())
} else if chunks := a.state.GetChunks(stream.Self()); chunks == nil || chunks.Len() == 0 {
// This else-if branch ensures that the cursor reset and the fetch of new max cursor values happen only when there are no pending chunks;
// otherwise data could be lost if some chunks were already processed with the old max cursor values.

maxPrimaryCursorValue, maxSecondaryCursorValue, err := a.driver.FetchMaxCursorValues(ctx, stream)
if err != nil {
return fmt.Errorf("failed to fetch max cursor values: %s", err)
}
// Reset only mentioned cursor state while preserving other state values
a.state.ResetCursor(stream.Self())

a.state.SetCursor(stream.Self(), primaryCursor, a.reformatCursorValue(maxPrimaryCursorValue))
if maxPrimaryCursorValue == nil {
logger.Warnf("max primary cursor value is nil for stream: %s", stream.ID())
}
if secondaryCursor != "" {
a.state.SetCursor(stream.Self(), secondaryCursor, a.reformatCursorValue(maxSecondaryCursorValue))
if maxSecondaryCursorValue == nil {
logger.Warnf("max secondary cursor value is nil for stream: %s", stream.ID())
maxPrimaryCursorValue, maxSecondaryCursorValue, err := a.driver.FetchMaxCursorValues(ctx, stream)
if err != nil {
return fmt.Errorf("failed to fetch max cursor values: %s", err)
}

a.state.SetCursor(stream.Self(), primaryCursor, a.reformatCursorValue(maxPrimaryCursorValue))
if maxPrimaryCursorValue == nil {
logger.Warnf("max primary cursor value is nil for stream: %s", stream.ID())
}
if secondaryCursor != "" {
a.state.SetCursor(stream.Self(), secondaryCursor, a.reformatCursorValue(maxSecondaryCursorValue))
if maxSecondaryCursorValue == nil {
logger.Warnf("max secondary cursor value is nil for stream: %s", stream.ID())
}
}
}

Expand Down
Loading