Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 1 addition & 250 deletions Plugins/DynamoDBDriverPlugin/DynamoDBPluginDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
// Table description cache to avoid repeated DescribeTable calls
private var _tableDescriptionCache: [String: TableDescription] = [:]

// Pagination state per query
private var _paginationStates: [String: PaginationState] = [:]

private var connection: DynamoDBConnection? {
lock.withLock { _connection }
}
Expand Down Expand Up @@ -82,7 +79,6 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
_connection?.disconnect()
_connection = nil
_tableDescriptionCache.removeAll()
_paginationStates.removeAll()
}
}

Expand Down Expand Up @@ -490,21 +486,7 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
extractKeySchema(from: _tableDescriptionCache[table])
}

let typeNames: [String] = lock.withLock {
let matchingState = _paginationStates.values.first { state in
if let parsed = DynamoDBQueryBuilder.parseScanQuery(state.queryKey) {
return parsed.tableName == table
}
if let parsed = DynamoDBQueryBuilder.parseQueryQuery(state.queryKey) {
return parsed.tableName == table
}
return false
}
if let state = matchingState {
return state.columnTypeNames
}
return columns.map { _ in "S" }
}
let typeNames: [String] = columns.map { _ in "S" }

let generator = DynamoDBStatementGenerator(
tableName: table,
Expand Down Expand Up @@ -817,60 +799,6 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
throw DynamoDBError.serverError("Invalid tagged query format")
}

private func executePaginatedTaggedQuery(
_ query: String, offset: Int, limit: Int,
conn: DynamoDBConnection, startTime: Date
) async throws -> PluginQueryResult {
let queryKey = query

// If offset is 0, start a fresh scan/query
if offset == 0 {
lock.withLock { _paginationStates.removeValue(forKey: queryKey) }
}

var state = lock.withLock { _paginationStates[queryKey] }

// If we have cached items that cover the requested range, serve from cache
if let existingState = state {
let end = min(offset + limit, existingState.allItems.count)
if offset < existingState.allItems.count {
let pageItems = Array(existingState.allItems[offset..<end])
let rows = DynamoDBItemFlattener.flatten(items: pageItems, columns: existingState.discoveredColumns)
return PluginQueryResult(
columns: existingState.discoveredColumns,
columnTypeNames: existingState.columnTypeNames,
rows: rows,
rowsAffected: 0,
executionTime: Date().timeIntervalSince(startTime)
)
}

// Need more items but exhausted
if existingState.isExhausted {
return emptyResult(columns: existingState.discoveredColumns,
typeNames: existingState.columnTypeNames,
startTime: startTime)
}
}

// Fetch more items from DynamoDB
if let parsed = DynamoDBQueryBuilder.parseScanQuery(query) {
return try await fetchMoreScanItems(
parsed: parsed, queryKey: queryKey, offset: offset, limit: limit,
existingState: state, conn: conn, startTime: startTime
)
}

if let parsed = DynamoDBQueryBuilder.parseQueryQuery(query) {
return try await fetchMoreQueryItems(
parsed: parsed, queryKey: queryKey, offset: offset, limit: limit,
existingState: state, conn: conn, startTime: startTime
)
}

return try await executeTaggedQuery(query, conn: conn, startTime: startTime)
}

// MARK: - Scan Execution

private func executeScan(
Expand Down Expand Up @@ -989,161 +917,6 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
)
}

// MARK: - Paginated Fetch Helpers

private func fetchMoreScanItems(
parsed: DynamoDBParsedScanQuery,
queryKey: String,
offset: Int,
limit: Int,
existingState: PaginationState?,
conn: DynamoDBConnection,
startTime: Date
) async throws -> PluginQueryResult {
let keySchema = try await cachedKeySchema(parsed.tableName, conn: conn)
var state = existingState ?? PaginationState(
queryKey: queryKey,
allItems: [],
lastEvaluatedKey: nil,
isExhausted: false,
discoveredColumns: [],
columnTypeNames: [],
keySchema: keySchema
)

// Fetch until we have enough items or exhaust the table
let needed = offset + limit
while state.allItems.count < needed && !state.isExhausted {
let batchSize = min(needed - state.allItems.count, 1000)
let response = try await conn.scan(
tableName: parsed.tableName,
limit: batchSize,
exclusiveStartKey: state.lastEvaluatedKey
)

var newItems = response.Items ?? []

if !parsed.filters.isEmpty {
newItems = applyClientFilters(
items: newItems, filters: parsed.filters, logicMode: parsed.logicMode
)
}

state.allItems.append(contentsOf: newItems)
state.lastEvaluatedKey = response.LastEvaluatedKey

if response.LastEvaluatedKey == nil {
state.isExhausted = true
}

if state.allItems.count >= Self.maxItems {
state.isExhausted = true
state.allItems = Array(state.allItems.prefix(Self.maxItems))
}
}

// Update discovered columns from all items
state.discoveredColumns = DynamoDBItemFlattener.unionColumns(from: state.allItems, keySchema: keySchema)
state.columnTypeNames = DynamoDBItemFlattener.columnTypeNames(
for: state.discoveredColumns, items: state.allItems
)

lock.withLock { _paginationStates[queryKey] = state }

// Serve the requested page
let end = min(offset + limit, state.allItems.count)
guard offset < state.allItems.count else {
return emptyResult(columns: state.discoveredColumns, typeNames: state.columnTypeNames, startTime: startTime)
}
let pageItems = Array(state.allItems[offset..<end])
let rows = DynamoDBItemFlattener.flatten(items: pageItems, columns: state.discoveredColumns)

return PluginQueryResult(
columns: state.discoveredColumns,
columnTypeNames: state.columnTypeNames,
rows: rows,
rowsAffected: 0,
executionTime: Date().timeIntervalSince(startTime)
)
}

private func fetchMoreQueryItems(
parsed: DynamoDBParsedQueryQuery,
queryKey: String,
offset: Int,
limit: Int,
existingState: PaginationState?,
conn: DynamoDBConnection,
startTime: Date
) async throws -> PluginQueryResult {
let keySchema = try await cachedKeySchema(parsed.tableName, conn: conn)
var state = existingState ?? PaginationState(
queryKey: queryKey,
allItems: [],
lastEvaluatedKey: nil,
isExhausted: false,
discoveredColumns: [],
columnTypeNames: [],
keySchema: keySchema
)

var expressionValues: [String: DynamoDBAttributeValue] = [:]
switch parsed.partitionKeyType {
case "N":
expressionValues[":pkval"] = .number(parsed.partitionKeyValue)
default:
expressionValues[":pkval"] = .string(parsed.partitionKeyValue)
}
let keyCondition = "\(parsed.partitionKeyName) = :pkval"

let needed = offset + limit
while state.allItems.count < needed && !state.isExhausted {
let batchSize = min(needed - state.allItems.count, 1000)
let response = try await conn.query(
tableName: parsed.tableName,
keyConditionExpression: keyCondition,
expressionAttributeValues: expressionValues,
limit: batchSize,
exclusiveStartKey: state.lastEvaluatedKey
)

let fetchedItems = response.Items ?? []
state.allItems.append(contentsOf: fetchedItems)
state.lastEvaluatedKey = response.LastEvaluatedKey

if response.LastEvaluatedKey == nil {
state.isExhausted = true
}

if state.allItems.count >= Self.maxItems {
state.isExhausted = true
state.allItems = Array(state.allItems.prefix(Self.maxItems))
}
}

state.discoveredColumns = DynamoDBItemFlattener.unionColumns(from: state.allItems, keySchema: keySchema)
state.columnTypeNames = DynamoDBItemFlattener.columnTypeNames(
for: state.discoveredColumns, items: state.allItems
)

lock.withLock { _paginationStates[queryKey] = state }

let end = min(offset + limit, state.allItems.count)
guard offset < state.allItems.count else {
return emptyResult(columns: state.discoveredColumns, typeNames: state.columnTypeNames, startTime: startTime)
}
let pageItems = Array(state.allItems[offset..<end])
let rows = DynamoDBItemFlattener.flatten(items: pageItems, columns: state.discoveredColumns)

return PluginQueryResult(
columns: state.discoveredColumns,
columnTypeNames: state.columnTypeNames,
rows: rows,
rowsAffected: 0,
executionTime: Date().timeIntervalSince(startTime)
)
}

// MARK: - PartiQL Execution

private func executePartiQL(
Expand Down Expand Up @@ -1444,31 +1217,9 @@ internal final class DynamoDBPluginDriver: PluginDatabaseDriver, @unchecked Send
}
}

private func emptyResult(columns: [String], typeNames: [String], startTime: Date) -> PluginQueryResult {
PluginQueryResult(
columns: columns.isEmpty ? ["Result"] : columns,
columnTypeNames: typeNames.isEmpty ? ["String"] : typeNames,
rows: [],
rowsAffected: 0,
executionTime: Date().timeIntervalSince(startTime)
)
}

private func formatBytes(_ bytes: Int64) -> String {
let formatter = ByteCountFormatter()
formatter.countStyle = .binary
return formatter.string(fromByteCount: bytes)
}
}

// MARK: - Pagination State

private struct PaginationState {
let queryKey: String
var allItems: [[String: DynamoDBAttributeValue]]
var lastEvaluatedKey: [String: DynamoDBAttributeValue]?
var isExhausted: Bool
var discoveredColumns: [String]
var columnTypeNames: [String]
var keySchema: [(name: String, keyType: String)]
}
Loading
Loading