diff --git a/server/internal/constants/constants.go b/server/internal/constants/constants.go index eaa96389..fd7265f6 100644 --- a/server/internal/constants/constants.go +++ b/server/internal/constants/constants.go @@ -101,14 +101,14 @@ var ( OptPathTerminalExecute = "/api/ams/v1/terminal/catalogs/%s/execute" OptPathTerminalLogs = "/api/ams/v1/terminal/%s/logs" // others - OptMaxTimeout = 30 * time.Second - OptQueryResultPollTime = 1500 * time.Millisecond - OptMinorCron = "self-optimizing.minor.trigger.cron" - OptMajorCron = "self-optimizing.major.trigger.cron" - OptFullCron = "self-optimizing.full.trigger.cron" - OptTargetFileSize = "self-optimizing.target-size" - OptEnableOptimization = "self-optimizing.enabled" - OptSQLCommand = "ALTER TABLE %s.%s SET TBLPROPERTIES (%s)" + OptMaxTimeout = 30 * time.Second + OptSessionTimeout = 5 * time.Minute // used for fusion poll (terminal query execution) + OptMinorCron = "self-optimizing.minor.trigger.cron" + OptMajorCron = "self-optimizing.major.trigger.cron" + OptFullCron = "self-optimizing.full.trigger.cron" + OptTargetFileSize = "self-optimizing.target-size" + OptEnableOptimization = "self-optimizing.enabled" + OptSQLCommand = "ALTER TABLE %s.%s SET TBLPROPERTIES (%s)" // properties OptCreatedAt = "created-at" OptCacheEnabled = "cache-enabled" diff --git a/server/internal/handlers/optimization/table.go b/server/internal/handlers/optimization/table.go index 8aeb035f..16714757 100644 --- a/server/internal/handlers/optimization/table.go +++ b/server/internal/handlers/optimization/table.go @@ -22,39 +22,26 @@ func (h *Handler) GetTablesWithDetails(c *gin.Context) { utils.SuccessResponse(c, "Successfully fetched tables with details", tables) } -// SetProperties configures table level properties for optimization +// SetProperties configures the same optimization properties on multiple tables (one terminal batch). func (h *Handler) SetProperties(c *gin.Context) { - catalog, database, table, ok := h.requiredCatalogDatabaseTable(c) + catalog, database, ok := h.requiredCatalogAndDatabase(c) if !ok { return } - var req dto.SQLInput - if err := utils.BindAndValidate(c, &req); err != nil { - utils.ErrorResponse(c, utils.StatusFromBindError(err), "invalid request body for setting config in table properties", err) + var tableConfigs dto.OptimizationTableConfig + if err := utils.BindAndValidate(c, &tableConfigs); err != nil { + utils.ErrorResponse(c, utils.StatusFromBindError(err), "invalid request body for optimization table config", err) return } - result, err := h.opt.SetProperties(c.Request.Context(), catalog, database, table, req) + result, err := h.opt.SetProperties(c.Request.Context(), catalog, database, tableConfigs) if err != nil { utils.ErrorResponse(c, upstreamStatus(err), err.Error(), err) return } - utils.SuccessResponse(c, result.Message, result) -} - -func (h *Handler) requiredCatalogDatabaseTable(c *gin.Context) (string, string, string, bool) { - catalog := c.Param("catalog") - database := c.Param("database") - table := c.Param("table") - - if catalog == "" || database == "" || table == "" { - utils.ErrorResponse(c, badRequestStatusCode, "catalog, database, and table parameters are required", nil) - return "", "", "", false - } - - return catalog, database, table, true + utils.SuccessResponse(c, "Finished setting properties for selected tables", result) } func (h *Handler) requiredCatalogAndDatabase(c *gin.Context) (string, string, bool) { diff --git a/server/internal/models/dto/requests.go b/server/internal/models/dto/requests.go index c52e4342..ecb249d3 100644 --- a/server/internal/models/dto/requests.go +++ b/server/internal/models/dto/requests.go @@ -162,22 +162,19 @@ type LogInfo struct { Logs []string `json:"logs"` } -type SetTablePropertiesRequest struct { - Catalog string `json:"catalog"` - Database string `json:"database"` - Table string `json:"table"` - Properties map[string]string `json:"properties"` -} - // TerminalExecuteRequest represents the request body for terminal SQL execution type TerminalExecuteRequest struct { SQL string `json:"sql"` } -type SQLInput struct { - MinorCron *string `json:"minor_cron"` - MajorCron *string `json:"major_cron"` - FullCron *string `json:"full_cron"` - TargetFileSize *int64 `json:"target_file_size"` - EnabledForOptimization *string `json:"enabled_for_optimization"` +// OptimizationTableConfig configures optimization properties for multiple tables in one terminal session. +type OptimizationTableConfig struct { + Tables []string `json:"tables" binding:"required"` + SQLInput struct { + MinorCron *string `json:"minor_cron"` + MajorCron *string `json:"major_cron"` + FullCron *string `json:"full_cron"` + TargetFileSize *int64 `json:"target_file_size"` + EnabledForOptimization *string `json:"enabled_for_optimization"` + } `json:"sql_input"` } diff --git a/server/internal/models/dto/response.go b/server/internal/models/dto/response.go index b20c204c..496f2802 100644 --- a/server/internal/models/dto/response.go +++ b/server/internal/models/dto/response.go @@ -248,7 +248,6 @@ type OptimizationInfo struct { type TableProperties struct { SessionID string `json:"sessionId"` Success bool `json:"success"` - Message string `json:"message"` Logs []string `json:"logs,omitempty"` } diff --git a/server/internal/services/optimization/terminal.go b/server/internal/services/optimization/terminal.go index 731713d3..02935753 100644 --- a/server/internal/services/optimization/terminal.go +++ b/server/internal/services/optimization/terminal.go @@ -13,103 +13,89 @@ import ( "github.com/datazip-inc/olake-ui/server/internal/utils" ) -func (s *Service) SetProperties(ctx context.Context, catalog, database, table string, config dto.SQLInput) (*dto.TableProperties, error) { +func convertConfigToMap(config dto.OptimizationTableConfig) map[string]string { properties := make(map[string]string) - - if config.MinorCron != nil { - properties[constants.OptMinorCron] = *config.MinorCron + if config.SQLInput.MinorCron != nil { + properties[constants.OptMinorCron] = *config.SQLInput.MinorCron } - if config.MajorCron != nil { - properties[constants.OptMajorCron] = *config.MajorCron + if config.SQLInput.MajorCron != nil { + properties[constants.OptMajorCron] = *config.SQLInput.MajorCron } - if config.FullCron != nil { - properties[constants.OptFullCron] = *config.FullCron + if config.SQLInput.FullCron != nil { + properties[constants.OptFullCron] = *config.SQLInput.FullCron } - if config.EnabledForOptimization != nil { - properties[constants.OptEnableOptimization] = *config.EnabledForOptimization + if config.SQLInput.EnabledForOptimization != nil { + properties[constants.OptEnableOptimization] = *config.SQLInput.EnabledForOptimization } - if config.TargetFileSize != nil { - properties[constants.OptTargetFileSize] = utils.ConvertMBToBytes(*config.TargetFileSize) + if config.SQLInput.TargetFileSize != nil { + properties[constants.OptTargetFileSize] = utils.ConvertMBToBytes(*config.SQLInput.TargetFileSize) } + return properties +} - // sql query - sqlResult, err := s.SetTableProperties(ctx, dto.SetTablePropertiesRequest{ - Catalog: catalog, - Database: database, - Table: table, - Properties: properties, - }) - - if err != nil { - return nil, fmt.Errorf("failed to set optimization properties: %w", err) +func createAlterQuery(database, table string, properties map[string]string) string { + props := make([]string, 0, len(properties)) + for k, value := range properties { + props = append(props, fmt.Sprintf("'%s'='%s'", k, value)) } + propsJoined := strings.Join(props, ", ") - return sqlResult, nil + return fmt.Sprintf(constants.OptSQLCommand, database, table, propsJoined) + ";" } -// sets table properties using the SQL query -func (s *Service) SetTableProperties(ctx context.Context, req dto.SetTablePropertiesRequest) (*dto.TableProperties, error) { - var propsSQL []string - for key, value := range req.Properties { - propsSQL = append(propsSQL, fmt.Sprintf("'%s' = '%s'", key, value)) - } +// set properties for multiple tables using sql query +func (s *Service) SetProperties(ctx context.Context, catalog, database string, config dto.OptimizationTableConfig) (*dto.TableProperties, error) { + tables := config.Tables + properties := convertConfigToMap(config) - sql := fmt.Sprintf(constants.OptSQLCommand, req.Database, req.Table, strings.Join(propsSQL, ", ")) + alterTableQuery := make([]string, 0, len(tables)) + for _, tableName := range tables { + alterTableQuery = append(alterTableQuery, createAlterQuery(database, tableName, properties)) + } - // execute via Terminal API - path := fmt.Sprintf(constants.OptPathTerminalExecute, req.Catalog) + var sessionResult dto.TerminalSessionResponse requestBody := dto.TerminalExecuteRequest{ - SQL: sql, + SQL: strings.Join(alterTableQuery, "\n"), } - var sessionResult dto.TerminalSessionResponse - if err := s.DoInto(ctx, http.MethodPost, path, url.Values{}, requestBody, &sessionResult); err != nil { - return nil, fmt.Errorf("failed to execute ALTER TABLE for %s.%s.%s: %w", req.Catalog, req.Database, req.Table, err) + if err := s.DoInto(ctx, http.MethodPost, fmt.Sprintf(constants.OptPathTerminalExecute, catalog), url.Values{}, requestBody, &sessionResult); err != nil { + return nil, fmt.Errorf("failed to execute bulk ALTER TABLE for catalog %s, database %s: %w", catalog, database, err) } - // Poll for execution completion - logInfo, err := s.pollForCompletion(ctx, req.Catalog, sessionResult.SessionID) + logInfo, err := s.pollForCompletion(ctx, sessionResult.SessionID) + if err != nil { return nil, fmt.Errorf("failed to poll for completion: %w", err) } - // TODO: Fusion may return "Finished" even if the query fails (e.g., syntax error). - // Solution: validate execution status by checking logs for "Finished" vs "Failed". - success := logInfo.LogStatus == "Finished" - var message string - if success { - message = fmt.Sprintf("optimization sql command completed successfully with session ID: %s", sessionResult.SessionID) - } else { - message = fmt.Sprintf("optimization sql command failed with session ID: %s", sessionResult.SessionID) - } - + // TODO: Fusion may return "Finished" even if the query fails, but query logs will contain error message return &dto.TableProperties{ SessionID: sessionResult.SessionID, - Success: success, - Message: message, + Success: logInfo.LogStatus == "Finished", Logs: logInfo.Logs, }, nil } // pollForCompletion polls the terminal API for SQL execution completion -func (s *Service) pollForCompletion(ctx context.Context, _, sessionID string) (*dto.LogInfo, error) { +func (s *Service) pollForCompletion(ctx context.Context, sessionID string) (*dto.LogInfo, error) { path := fmt.Sprintf(constants.OptPathTerminalLogs, sessionID) - timeoutCtx, cancel := context.WithTimeout(ctx, constants.OptMaxTimeout) + timeoutCtx, cancel := context.WithTimeout(ctx, constants.OptSessionTimeout) defer cancel() - ticker := time.NewTicker(constants.OptQueryResultPollTime) - defer ticker.Stop() - for { select { case <-timeoutCtx.Done(): return nil, fmt.Errorf("timeout waiting for SQL execution to complete") - case <-ticker.C: + case <-time.After(1 * time.Second): var logInfo dto.LogInfo if err := s.DoInto(ctx, http.MethodGet, path, url.Values{}, nil, &logInfo); err != nil { return nil, fmt.Errorf("failed to get logs for session %s: %w", sessionID, err) } + if logInfo.LogStatus == "Running" { + continue + } + return &logInfo, nil } } diff --git a/server/routes/router.go b/server/routes/router.go index 8c845d58..df4a4e27 100644 --- a/server/routes/router.go +++ b/server/routes/router.go @@ -92,7 +92,7 @@ func RegisterRoutes(engine *gin.Engine, h *handlers.Handler) { opt.DELETE("/catalog/:catalog", optHandler.DeleteCatalog) // terminal: cron, enable/disable optimization - opt.PUT("/:catalog/:database/:table/config", optHandler.SetProperties) + opt.PUT("/:catalog/:database/tables/config", optHandler.SetProperties) // tables: view opt.GET("/:catalog/:database/tables", optHandler.GetTablesWithDetails) diff --git a/ui/package.json b/ui/package.json index aab12a13..12171ee2 100644 --- a/ui/package.json +++ b/ui/package.json @@ -4,13 +4,14 @@ "version": "0.0.0", "type": "module", "resolutions": { - "form-data": "4.0.4", + "form-data": "4.0.6", "lodash": ">=4.18.1", "lodash-es": ">=4.18.1" }, "pnpm": { "overrides": { - "fast-uri": "3.1.2" + "fast-uri": "3.1.2", + "form-data": "4.0.6" } }, "scripts": { diff --git a/ui/pnpm-lock.yaml b/ui/pnpm-lock.yaml index c1f0737b..814f606e 100644 --- a/ui/pnpm-lock.yaml +++ b/ui/pnpm-lock.yaml @@ -5,7 +5,7 @@ settings: excludeLinksFromLockfile: false overrides: - form-data: 4.0.4 + form-data: 4.0.6 lodash: '>=4.18.1' lodash-es: '>=4.18.1' fast-uri: 3.1.2 @@ -678,56 +678,67 @@ packages: resolution: {integrity: sha512-9VlPY/BN3AgbukfVHAB8zNFWB/lKEuvzRo1NKev0Po8sYFKx0i+AQlCYftgEjcL43F2h9Ui1ZSdVBc4En/sP2w==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-musleabihf@4.50.2': resolution: {integrity: sha512-+GdKWOvsifaYNlIVf07QYan1J5F141+vGm5/Y8b9uCZnG/nxoGqgCmR24mv0koIWWuqvFYnbURRqw1lv7IBINw==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-gnu@4.50.2': resolution: {integrity: sha512-df0Eou14ojtUdLQdPFnymEQteENwSJAdLf5KCDrmZNsy1c3YaCNaJvYsEUHnrg+/DLBH612/R0xd3dD03uz2dg==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-musl@4.50.2': resolution: {integrity: sha512-iPeouV0UIDtz8j1YFR4OJ/zf7evjauqv7jQ/EFs0ClIyL+by++hiaDAfFipjOgyz6y6xbDvJuiU4HwpVMpRFDQ==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loong64-gnu@4.50.2': resolution: {integrity: sha512-OL6KaNvBopLlj5fTa5D5bau4W82f+1TyTZRr2BdnfsrnQnmdxh4okMxR2DcDkJuh4KeoQZVuvHvzuD/lyLn2Kw==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.50.2': resolution: {integrity: sha512-I21VJl1w6z/K5OTRl6aS9DDsqezEZ/yKpbqlvfHbW0CEF5IL8ATBMuUx6/mp683rKTK8thjs/0BaNrZLXetLag==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.50.2': resolution: {integrity: sha512-Hq6aQJT/qFFHrYMjS20nV+9SKrXL2lvFBENZoKfoTH2kKDOJqff5OSJr4x72ZaG/uUn+XmBnGhfr4lwMRrmqCQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-musl@4.50.2': resolution: {integrity: sha512-82rBSEXRv5qtKyr0xZ/YMF531oj2AIpLZkeNYxmKNN6I2sVE9PGegN99tYDLK2fYHJITL1P2Lgb4ZXnv0PjQvw==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-s390x-gnu@4.50.2': resolution: {integrity: sha512-4Q3S3Hy7pC6uaRo9gtXUTJ+EKo9AKs3BXKc2jYypEcMQ49gDPFU2P1ariX9SEtBzE5egIX6fSUmbmGazwBVF9w==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.50.2': resolution: {integrity: sha512-9Jie/At6qk70dNIcopcL4p+1UirusEtznpNtcq/u/C5cC4HBX7qSGsYIcG6bdxj15EYWhHiu02YvmdPzylIZlA==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-musl@4.50.2': resolution: {integrity: sha512-HPNJwxPL3EmhzeAnsWQCM3DcoqOz3/IC6de9rWfGR8ZCuEHETi9km66bH/wG3YH0V3nyzyFEGUZeL5PKyy4xvw==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-openharmony-arm64@4.50.2': resolution: {integrity: sha512-nMKvq6FRHSzYfKLHZ+cChowlEkR2lj/V0jYj9JnGUVPL2/mIeFGmVM2mLaFeNa5Jev7W7TovXqXIG2d39y1KYA==} @@ -1466,8 +1477,8 @@ packages: resolution: {integrity: sha512-gIXjKqtFuWEgzFRJA9WCQeSJLZDjgJUOMCMzxtvFq/37KojM1BFGufqsCy0r4qSQmYLsZYMeyRqzIWOMup03sw==} engines: {node: '>=14'} - form-data@4.0.4: - resolution: {integrity: sha512-KrGhL9Q4zjj0kiUt5OO4Mr/A/jlI2jDYs5eHBpYHPcBEVSiipAvn2Ko2HnPe20rmcuuvMHNdZFp+4IlGTMF0Ow==} + form-data@4.0.6: + resolution: {integrity: sha512-vKatAh4SlVfgbv+YtmhiRjhEMJsYpsG1Y2rMQtR+SVSbytsSD1YGzDIcrAJmdFec88u/+VoGmxnl+80gL1tRCQ==} engines: {node: '>= 6'} fraction.js@4.3.7: @@ -1567,6 +1578,10 @@ packages: resolution: {integrity: sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==} engines: {node: '>= 0.4'} + hasown@2.0.4: + resolution: {integrity: sha512-T2UbfbBEF32wiepXIsMlTW9+dDYC6wMh/t/vYA4tuOMKqWz/n3vr1NFSxQiyP+zk2mXsoMA/i/7qV6LKut1t1A==} + engines: {node: '>= 0.4'} + hast-util-to-jsx-runtime@2.3.6: resolution: {integrity: sha512-zl6s8LwNyo1P9uw+XJGvZtdFF1GdAkOg8ujOw+4Pyb76874fLps4ueHXDhXWdk6YHQ6OgUtinliG7RsYvCbbBg==} @@ -3899,7 +3914,7 @@ snapshots: axios@1.16.0: dependencies: follow-redirects: 1.16.0 - form-data: 4.0.4 + form-data: 4.0.6 proxy-from-env: 2.1.0 transitivePeerDependencies: - debug @@ -4209,7 +4224,7 @@ snapshots: es-errors: 1.3.0 get-intrinsic: 1.3.0 has-tostringtag: 1.0.2 - hasown: 2.0.2 + hasown: 2.0.4 es-shim-unscopables@1.1.0: dependencies: @@ -4474,12 +4489,12 @@ snapshots: cross-spawn: 7.0.6 signal-exit: 4.1.0 - form-data@4.0.4: + form-data@4.0.6: dependencies: asynckit: 0.4.0 combined-stream: 1.0.8 es-set-tostringtag: 2.1.0 - hasown: 2.0.2 + hasown: 2.0.4 mime-types: 2.1.35 fraction.js@4.3.7: {} @@ -4581,6 +4596,10 @@ snapshots: dependencies: function-bind: 1.1.2 + hasown@2.0.4: + dependencies: + function-bind: 1.1.2 + hast-util-to-jsx-runtime@2.3.6: dependencies: '@types/estree': 1.0.8 diff --git a/ui/src/config/apiConfig.ts b/ui/src/config/apiConfig.ts index 8096142c..beee952d 100644 --- a/ui/src/config/apiConfig.ts +++ b/ui/src/config/apiConfig.ts @@ -26,8 +26,8 @@ export const API_CONFIG = { ? `/api/opt/v1/catalog/${encodeURIComponent(catalogName)}` : `/api/opt/v1/catalog`, CATALOG_SPEC: "/api/opt/v1/catalog/resources/spec", - TABLE_CONFIG: (catalog: string, database: string, tableName: string) => - `/api/opt/v1/${encodeURIComponent(catalog)}/${encodeURIComponent(database)}/${encodeURIComponent(tableName)}`, + TABLE_CONFIG: (catalog: string, database: string) => + `/api/opt/v1/${encodeURIComponent(catalog)}/${encodeURIComponent(database)}/tables/config`, TABLE: (catalog: string, database: string, tableName: string) => `/api/opt/v1/tables/catalogs/${encodeURIComponent(catalog)}/dbs/${encodeURIComponent(database)}/tables/${encodeURIComponent(tableName)}`, TABLES: (catalog: string, database: string) => diff --git a/ui/src/modules/maintenance/features/tables/services/tableService.ts b/ui/src/modules/maintenance/features/tables/services/tableService.ts index da54b24c..c76dc353 100644 --- a/ui/src/modules/maintenance/features/tables/services/tableService.ts +++ b/ui/src/modules/maintenance/features/tables/services/tableService.ts @@ -11,6 +11,7 @@ import type { TableMetricsApiResponse, UpdateTableCronApiRequest, UpdateTableConfigApiResponse, + UpdateTablesConfigApiRequest, } from "../types" export const tableService = { @@ -86,9 +87,14 @@ export const tableService = { payload: UpdateTableCronApiRequest, ): Promise => { try { + const request: UpdateTablesConfigApiRequest = { + tables: [tableName], + sql_input: payload, + } + const response = await api.put( - `${API_CONFIG.ENDPOINTS.OPT.TABLE_CONFIG(catalog, database, tableName)}/config`, - payload, + API_CONFIG.ENDPOINTS.OPT.TABLE_CONFIG(catalog, database), + request, { disableErrorNotification: true }, ) return response.data diff --git a/ui/src/modules/maintenance/features/tables/types/tableTypes.ts b/ui/src/modules/maintenance/features/tables/types/tableTypes.ts index a5554f12..a7974b05 100644 --- a/ui/src/modules/maintenance/features/tables/types/tableTypes.ts +++ b/ui/src/modules/maintenance/features/tables/types/tableTypes.ts @@ -99,6 +99,11 @@ export interface UpdateTableCronApiRequest { target_file_size?: number } +export interface UpdateTablesConfigApiRequest { + tables: string[] + sql_input: UpdateTableCronApiRequest +} + // Frontend Domain Types export type FilterKey = "all" | "olake" | "external" @@ -191,8 +196,9 @@ export interface CancelRunRequest { } export interface UpdateTableConfigApiResponse { + sessionId?: string success: boolean - message: string + message?: string logs?: string[] }