Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions server/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ var (
OptPathTerminalExecute = "/api/ams/v1/terminal/catalogs/%s/execute"
OptPathTerminalLogs = "/api/ams/v1/terminal/%s/logs"
// others
OptMaxTimeout = 30 * time.Second
OptQueryResultPollTime = 1500 * time.Millisecond
OptMinorCron = "self-optimizing.minor.trigger.cron"
OptMajorCron = "self-optimizing.major.trigger.cron"
OptFullCron = "self-optimizing.full.trigger.cron"
OptTargetFileSize = "self-optimizing.target-size"
OptEnableOptimization = "self-optimizing.enabled"
OptSQLCommand = "ALTER TABLE %s.%s SET TBLPROPERTIES (%s)"
OptMaxTimeout = 30 * time.Second
OptSessionTimeout = 5 * time.Minute // used for fusion poll (terminal query execution)
OptMinorCron = "self-optimizing.minor.trigger.cron"
OptMajorCron = "self-optimizing.major.trigger.cron"
OptFullCron = "self-optimizing.full.trigger.cron"
OptTargetFileSize = "self-optimizing.target-size"
OptEnableOptimization = "self-optimizing.enabled"
OptSQLCommand = "ALTER TABLE %s.%s SET TBLPROPERTIES (%s)"
// properties
OptCreatedAt = "created-at"
OptCacheEnabled = "cache-enabled"
Expand Down
27 changes: 7 additions & 20 deletions server/internal/handlers/optimization/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,26 @@ func (h *Handler) GetTablesWithDetails(c *gin.Context) {
utils.SuccessResponse(c, "Successfully fetched tables with details", tables)
}

// SetProperties configures table level properties for optimization
// SetProperties configures the same optimization properties on multiple tables (one terminal batch).
func (h *Handler) SetProperties(c *gin.Context) {
catalog, database, table, ok := h.requiredCatalogDatabaseTable(c)
catalog, database, ok := h.requiredCatalogAndDatabase(c)
if !ok {
return
}

var req dto.SQLInput
if err := utils.BindAndValidate(c, &req); err != nil {
utils.ErrorResponse(c, utils.StatusFromBindError(err), "invalid request body for setting config in table properties", err)
var tableConfigs dto.OptimizationTableConfig
if err := utils.BindAndValidate(c, &tableConfigs); err != nil {
utils.ErrorResponse(c, utils.StatusFromBindError(err), "invalid request body for optimization table config", err)
return
}

result, err := h.opt.SetProperties(c.Request.Context(), catalog, database, table, req)
result, err := h.opt.SetProperties(c.Request.Context(), catalog, database, tableConfigs)
if err != nil {
utils.ErrorResponse(c, upstreamStatus(err), err.Error(), err)
return
}

utils.SuccessResponse(c, result.Message, result)
}

func (h *Handler) requiredCatalogDatabaseTable(c *gin.Context) (string, string, string, bool) {
catalog := c.Param("catalog")
database := c.Param("database")
table := c.Param("table")

if catalog == "" || database == "" || table == "" {
utils.ErrorResponse(c, badRequestStatusCode, "catalog, database, and table parameters are required", nil)
return "", "", "", false
}

return catalog, database, table, true
utils.SuccessResponse(c, "Finished setting properties for selected tables", result)
}

func (h *Handler) requiredCatalogAndDatabase(c *gin.Context) (string, string, bool) {
Expand Down
23 changes: 10 additions & 13 deletions server/internal/models/dto/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,19 @@ type LogInfo struct {
Logs []string `json:"logs"`
}

type SetTablePropertiesRequest struct {
Catalog string `json:"catalog"`
Database string `json:"database"`
Table string `json:"table"`
Properties map[string]string `json:"properties"`
}

// TerminalExecuteRequest represents the request body for terminal SQL execution
type TerminalExecuteRequest struct {
SQL string `json:"sql"`
}

type SQLInput struct {
MinorCron *string `json:"minor_cron"`
MajorCron *string `json:"major_cron"`
FullCron *string `json:"full_cron"`
TargetFileSize *int64 `json:"target_file_size"`
EnabledForOptimization *string `json:"enabled_for_optimization"`
// OptimizationTableConfig configures optimization properties for multiple tables in one terminal session.
type OptimizationTableConfig struct {
Tables []string `json:"tables" binding:"required"`
SQLInput struct {
MinorCron *string `json:"minor_cron"`
MajorCron *string `json:"major_cron"`
FullCron *string `json:"full_cron"`
TargetFileSize *int64 `json:"target_file_size"`
EnabledForOptimization *string `json:"enabled_for_optimization"`
} `json:"sql_input"`
}
1 change: 0 additions & 1 deletion server/internal/models/dto/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ type OptimizationInfo struct {
type TableProperties struct {
SessionID string `json:"sessionId"`
Success bool `json:"success"`
Message string `json:"message"`
Logs []string `json:"logs,omitempty"`
}

Expand Down
98 changes: 42 additions & 56 deletions server/internal/services/optimization/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,103 +13,89 @@ import (
"github.com/datazip-inc/olake-ui/server/internal/utils"
)

func (s *Service) SetProperties(ctx context.Context, catalog, database, table string, config dto.SQLInput) (*dto.TableProperties, error) {
func convertConfigToMap(config dto.OptimizationTableConfig) map[string]string {
properties := make(map[string]string)

if config.MinorCron != nil {
properties[constants.OptMinorCron] = *config.MinorCron
if config.SQLInput.MinorCron != nil {
properties[constants.OptMinorCron] = *config.SQLInput.MinorCron
}
if config.MajorCron != nil {
properties[constants.OptMajorCron] = *config.MajorCron
if config.SQLInput.MajorCron != nil {
properties[constants.OptMajorCron] = *config.SQLInput.MajorCron
}
if config.FullCron != nil {
properties[constants.OptFullCron] = *config.FullCron
if config.SQLInput.FullCron != nil {
properties[constants.OptFullCron] = *config.SQLInput.FullCron
}
if config.EnabledForOptimization != nil {
properties[constants.OptEnableOptimization] = *config.EnabledForOptimization
if config.SQLInput.EnabledForOptimization != nil {
properties[constants.OptEnableOptimization] = *config.SQLInput.EnabledForOptimization
}
if config.TargetFileSize != nil {
properties[constants.OptTargetFileSize] = utils.ConvertMBToBytes(*config.TargetFileSize)
if config.SQLInput.TargetFileSize != nil {
properties[constants.OptTargetFileSize] = utils.ConvertMBToBytes(*config.SQLInput.TargetFileSize)
}
return properties
}

// sql query
sqlResult, err := s.SetTableProperties(ctx, dto.SetTablePropertiesRequest{
Catalog: catalog,
Database: database,
Table: table,
Properties: properties,
})

if err != nil {
return nil, fmt.Errorf("failed to set optimization properties: %w", err)
func createAlterQuery(database, table string, properties map[string]string) string {
props := make([]string, 0, len(properties))
for k, value := range properties {
props = append(props, fmt.Sprintf("'%s'='%s'", k, value))
}
propsJoined := strings.Join(props, ", ")

return sqlResult, nil
return fmt.Sprintf(constants.OptSQLCommand, database, table, propsJoined) + ";"
}

// sets table properties using the SQL query
func (s *Service) SetTableProperties(ctx context.Context, req dto.SetTablePropertiesRequest) (*dto.TableProperties, error) {
var propsSQL []string
for key, value := range req.Properties {
propsSQL = append(propsSQL, fmt.Sprintf("'%s' = '%s'", key, value))
}
// set properties for multiple tables using sql query
func (s *Service) SetProperties(ctx context.Context, catalog, database string, config dto.OptimizationTableConfig) (*dto.TableProperties, error) {
tables := config.Tables
properties := convertConfigToMap(config)

sql := fmt.Sprintf(constants.OptSQLCommand, req.Database, req.Table, strings.Join(propsSQL, ", "))
alterTableQuery := make([]string, 0, len(tables))
for _, tableName := range tables {
alterTableQuery = append(alterTableQuery, createAlterQuery(database, tableName, properties))
}

// execute via Terminal API
path := fmt.Sprintf(constants.OptPathTerminalExecute, req.Catalog)
var sessionResult dto.TerminalSessionResponse
requestBody := dto.TerminalExecuteRequest{
SQL: sql,
SQL: strings.Join(alterTableQuery, "\n"),
}

var sessionResult dto.TerminalSessionResponse
if err := s.DoInto(ctx, http.MethodPost, path, url.Values{}, requestBody, &sessionResult); err != nil {
return nil, fmt.Errorf("failed to execute ALTER TABLE for %s.%s.%s: %w", req.Catalog, req.Database, req.Table, err)
if err := s.DoInto(ctx, http.MethodPost, fmt.Sprintf(constants.OptPathTerminalExecute, catalog), url.Values{}, requestBody, &sessionResult); err != nil {
return nil, fmt.Errorf("failed to execute bulk ALTER TABLE for catalog %s, database %s: %w", catalog, database, err)
}

// Poll for execution completion
logInfo, err := s.pollForCompletion(ctx, req.Catalog, sessionResult.SessionID)
logInfo, err := s.pollForCompletion(ctx, sessionResult.SessionID)

if err != nil {
return nil, fmt.Errorf("failed to poll for completion: %w", err)
}

// TODO: Fusion may return "Finished" even if the query fails (e.g., syntax error).
// Solution: validate execution status by checking logs for "Finished" vs "Failed".
success := logInfo.LogStatus == "Finished"
var message string
if success {
message = fmt.Sprintf("optimization sql command completed successfully with session ID: %s", sessionResult.SessionID)
} else {
message = fmt.Sprintf("optimization sql command failed with session ID: %s", sessionResult.SessionID)
}

// TODO: Fusion may return "Finished" even if the query fails, but query logs will contain error message
return &dto.TableProperties{
SessionID: sessionResult.SessionID,
Success: success,
Message: message,
Success: logInfo.LogStatus == "Finished",
Logs: logInfo.Logs,
}, nil
}

// pollForCompletion polls the terminal API for SQL execution completion
func (s *Service) pollForCompletion(ctx context.Context, _, sessionID string) (*dto.LogInfo, error) {
func (s *Service) pollForCompletion(ctx context.Context, sessionID string) (*dto.LogInfo, error) {
path := fmt.Sprintf(constants.OptPathTerminalLogs, sessionID)
timeoutCtx, cancel := context.WithTimeout(ctx, constants.OptMaxTimeout)
timeoutCtx, cancel := context.WithTimeout(ctx, constants.OptSessionTimeout)
defer cancel()

ticker := time.NewTicker(constants.OptQueryResultPollTime)
defer ticker.Stop()

for {
select {
case <-timeoutCtx.Done():
return nil, fmt.Errorf("timeout waiting for SQL execution to complete")
case <-ticker.C:
case <-time.After(1 * time.Second):
var logInfo dto.LogInfo
if err := s.DoInto(ctx, http.MethodGet, path, url.Values{}, nil, &logInfo); err != nil {
return nil, fmt.Errorf("failed to get logs for session %s: %w", sessionID, err)
}

if logInfo.LogStatus == "Running" {
continue
}

return &logInfo, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/routes/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RegisterRoutes(engine *gin.Engine, h *handlers.Handler) {
opt.DELETE("/catalog/:catalog", optHandler.DeleteCatalog)

// terminal: cron, enable/disable optimization
opt.PUT("/:catalog/:database/:table/config", optHandler.SetProperties)
opt.PUT("/:catalog/:database/tables/config", optHandler.SetProperties)

// tables: view
opt.GET("/:catalog/:database/tables", optHandler.GetTablesWithDetails)
Expand Down
4 changes: 2 additions & 2 deletions ui/src/config/apiConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export const API_CONFIG = {
catalogName
? `/api/opt/v1/catalog/${encodeURIComponent(catalogName)}`
: `/api/opt/v1/catalog`,
TABLE_CONFIG: (catalog: string, database: string, tableName: string) =>
`/api/opt/v1/${encodeURIComponent(catalog)}/${encodeURIComponent(database)}/${encodeURIComponent(tableName)}`,
TABLE_CONFIG: (catalog: string, database: string) =>
`/api/opt/v1/${encodeURIComponent(catalog)}/${encodeURIComponent(database)}/tables/config`,
TABLE: (catalog: string, database: string, tableName: string) =>
`/api/opt/v1/tables/catalogs/${encodeURIComponent(catalog)}/dbs/${encodeURIComponent(database)}/tables/${encodeURIComponent(tableName)}`,
TABLES: (catalog: string, database: string) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
TableMetricsApiResponse,
UpdateTableCronApiRequest,
UpdateTableConfigApiResponse,
UpdateTablesConfigApiRequest,
} from "../types"

export const tableService = {
Expand Down Expand Up @@ -86,9 +87,14 @@ export const tableService = {
payload: UpdateTableCronApiRequest,
): Promise<UpdateTableConfigApiResponse> => {
try {
const request: UpdateTablesConfigApiRequest = {
tables: [tableName],
sql_input: payload,
}

const response = await api.put<UpdateTableConfigApiResponse>(
`${API_CONFIG.ENDPOINTS.OPT.TABLE_CONFIG(catalog, database, tableName)}/config`,
payload,
API_CONFIG.ENDPOINTS.OPT.TABLE_CONFIG(catalog, database),
request,
{ disableErrorNotification: true },
)
return response.data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ export interface UpdateTableCronApiRequest {
target_file_size?: number
}

export interface UpdateTablesConfigApiRequest {
tables: string[]
sql_input: UpdateTableCronApiRequest
}

// Frontend Domain Types
export type FilterKey = "all" | "olake" | "external"

Expand Down Expand Up @@ -191,8 +196,9 @@ export interface CancelRunRequest {
}

export interface UpdateTableConfigApiResponse {
sessionId?: string
success: boolean
message: string
message?: string
logs?: string[]
}

Expand Down
Loading