Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/e2e/.copier-answers.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Changes here will be overwritten by Copier
_commit: 2025.09.22
_commit: 2025.12.17
_src_path: gh:zenml-io/template-e2e-batch
data_quality_checks: true
email: [email protected]
Expand Down
2 changes: 1 addition & 1 deletion examples/e2e/steps/etl/inference_data_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def inference_data_preprocessor(
# artificially adding `target` column to avoid Pipeline issues
dataset_inf[target] = pd.Series([1] * dataset_inf.shape[0])
dataset_inf = preprocess_pipeline.transform(dataset_inf)
dataset_inf.drop(columns=["target"], inplace=True)
dataset_inf.drop(columns=[target], inplace=True)
### YOUR CODE ENDS HERE ###

return dataset_inf
10 changes: 7 additions & 3 deletions examples/e2e/steps/etl/train_data_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ def train_data_preprocessor(
if normalize:
# Normalize the data
preprocess_pipeline.steps.append(("normalize", MinMaxScaler()))
preprocess_pipeline.steps.append(
("cast", DataFrameCaster(dataset_trn.columns))
)
# Ensure the DataFrameCaster uses a column list consistent with any dropping.
# This avoids shape/column mismatches when `drop_columns` is enabled.
caster_columns = list(dataset_trn.columns)
if drop_columns:
drop_set = set(drop_columns)
caster_columns = [c for c in caster_columns if c not in drop_set]
preprocess_pipeline.steps.append(("cast", DataFrameCaster(caster_columns)))
dataset_trn = preprocess_pipeline.fit_transform(dataset_trn)
dataset_tst = preprocess_pipeline.transform(dataset_tst)
### YOUR CODE ENDS HERE ###
Expand Down
29 changes: 18 additions & 11 deletions examples/e2e/utils/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,48 @@
#


from typing import Union
from typing import Sequence, Union

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class NADropper:
class NADropper(TransformerMixin, BaseEstimator):
"""Support class to drop NA values in sklearn Pipeline."""

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
self.is_fitted_ = True
return self

def transform(self, X: Union[pd.DataFrame, pd.Series]):
return X.dropna()


class ColumnsDropper:
class ColumnsDropper(TransformerMixin, BaseEstimator):
"""Support class to drop specific columns in sklearn Pipeline."""

def __init__(self, columns):
self.columns = columns
def __init__(self, columns: Sequence[str]):
self.columns = list(columns)

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
self.is_fitted_ = True
return self

def transform(self, X: Union[pd.DataFrame, pd.Series]):
return X.drop(columns=self.columns)


class DataFrameCaster:
class DataFrameCaster(TransformerMixin, BaseEstimator):
"""Support class to cast type back to pd.DataFrame in sklearn Pipeline."""

def __init__(self, columns):
self.columns = columns
def __init__(self, columns: Sequence[str]):
self.columns = list(columns)

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
# Set fitted attributes so sklearn can recognize this transformer as fitted.
# (newer sklearn calls check_is_fitted on the Pipeline's final step)
self.n_features_in_ = X.shape[1] if hasattr(X, "shape") else None
self.is_fitted_ = True
return self

def transform(self, X):
Expand Down
2 changes: 1 addition & 1 deletion examples/mlops_starter/.copier-answers.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Changes here will be overwritten by Copier
_commit: 2024.11.28
_commit: 2025.12.17
_src_path: gh:zenml-io/template-starter
email: [email protected]
full_name: ZenML GmbH
Expand Down
4 changes: 2 additions & 2 deletions examples/mlops_starter/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1113,8 +1113,8 @@
"This was just the tip of the iceberg of what ZenML can do; check out the [**docs**](https://docs.zenml.io/) to learn more\n",
"about the capabilities of ZenML. For example, you might want to:\n",
"\n",
"- [Deploy ZenML](https://docs.zenml.io/user-guide/production-guide/connect-deployed-zenml) to collaborate with your colleagues.\n",
"- Run the same pipeline on a [cloud MLOps stack in production](https://docs.zenml.io/user-guide/production-guide/cloud-stack).\n",
"- [Deploy ZenML](https://docs.zenml.io/user-guides/production-guide/connect-deployed-zenml) to collaborate with your colleagues.\n",
"- Run the same pipeline on a [cloud MLOps stack in production](https://docs.zenml.io/user-guides/production-guide/cloud-stack).\n",
"- Track your metrics in an experiment tracker like [MLflow](https://docs.zenml.io/stacks-and-components/component-guide/experiment-trackers/mlflow).\n",
"\n",
"## What next?\n",
Expand Down
10 changes: 7 additions & 3 deletions examples/mlops_starter/steps/data_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ def data_preprocessor(
if normalize:
# Normalize the data
preprocess_pipeline.steps.append(("normalize", MinMaxScaler()))
preprocess_pipeline.steps.append(
("cast", DataFrameCaster(dataset_trn.columns))
)
# Ensure the DataFrameCaster uses a column list consistent with any dropping.
# This avoids shape/column mismatches when `drop_columns` is enabled.
caster_columns = list(dataset_trn.columns)
if drop_columns:
drop_set = set(drop_columns)
caster_columns = [c for c in caster_columns if c not in drop_set]
preprocess_pipeline.steps.append(("cast", DataFrameCaster(caster_columns)))
dataset_trn = preprocess_pipeline.fit_transform(dataset_trn)
dataset_tst = preprocess_pipeline.transform(dataset_tst)

Expand Down
29 changes: 18 additions & 11 deletions examples/mlops_starter/utils/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,48 @@
# limitations under the License.
#

from typing import Union
from typing import Sequence, Union

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class NADropper:
class NADropper(TransformerMixin, BaseEstimator):
"""Support class to drop NA values in sklearn Pipeline."""

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
self.is_fitted_ = True
return self

def transform(self, X: Union[pd.DataFrame, pd.Series]):
return X.dropna()


class ColumnsDropper:
class ColumnsDropper(TransformerMixin, BaseEstimator):
"""Support class to drop specific columns in sklearn Pipeline."""

def __init__(self, columns):
self.columns = columns
def __init__(self, columns: Sequence[str]):
self.columns = list(columns)

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
self.is_fitted_ = True
return self

def transform(self, X: Union[pd.DataFrame, pd.Series]):
return X.drop(columns=self.columns)


class DataFrameCaster:
class DataFrameCaster(TransformerMixin, BaseEstimator):
"""Support class to cast type back to pd.DataFrame in sklearn Pipeline."""

def __init__(self, columns):
self.columns = columns
def __init__(self, columns: Sequence[str]):
self.columns = list(columns)

def fit(self, *args, **kwargs):
def fit(self, X, y=None):
# Set fitted attributes so sklearn can recognize this transformer as fitted.
# (newer sklearn calls check_is_fitted on the Pipeline's final step)
self.n_features_in_ = X.shape[1] if hasattr(X, "shape") else None
self.is_fitted_ = True
return self

def transform(self, X):
Expand Down
22 changes: 22 additions & 0 deletions helm/templates/_environment.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ backup_strategy: {{ .ZenML.database.backupStrategy | quote }}
backup_database: {{ .ZenML.database.backupDatabase | quote }}
{{- else if eq .ZenML.database.backupStrategy "dump-file" }}
backup_directory: "/backups"
{{- else if eq .ZenML.database.backupStrategy "mydumper" }}
backup_directory: "/backups"
{{- if .ZenML.database.mydumperThreads }}
mydumper_threads: {{ .ZenML.database.mydumperThreads | quote }}
{{- end }}
{{- if .ZenML.database.mydumperCompress }}
mydumper_compress: {{ .ZenML.database.mydumperCompress | quote }}
{{- end }}
{{- if .ZenML.database.mydumperExtraArgs }}
mydumper_extra_args: {{ .ZenML.database.mydumperExtraArgs | toJson | quote }}
{{- end }}
{{- if .ZenML.database.myloaderThreads }}
myloader_threads: {{ .ZenML.database.myloaderThreads | quote }}
{{- end }}
{{- if .ZenML.database.myloaderExtraArgs }}
myloader_extra_args: {{ .ZenML.database.myloaderExtraArgs | toJson | quote }}
{{- end }}
{{- else if eq .ZenML.database.backupStrategy "custom" }}
custom_backup_engine: {{ .ZenML.database.customBackupEngine | quote }}
{{- if .ZenML.database.customBackupEngineConfig }}
custom_backup_engine_config: {{ .ZenML.database.customBackupEngineConfig | toJson | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- if .ZenML.database.poolSize }}
Expand Down
4 changes: 2 additions & 2 deletions helm/templates/server-db-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ spec:
{{- else }}
emptyDir: {}
{{- end }}
{{- if eq .Values.zenml.database.backupStrategy "dump-file" }}
{{- if or (eq .Values.zenml.database.backupStrategy "dump-file") (eq .Values.zenml.database.backupStrategy "mydumper") }}
# define a volume that will hold a backup of the database
- name: db-backup
# if a storage PVC is configured, then use it
Expand Down Expand Up @@ -136,7 +136,7 @@ spec:
volumeMounts:
- name: zenml-config
mountPath: /zenml/.zenconfig
{{- if eq .Values.zenml.database.backupStrategy "dump-file" }}
{{- if or (eq .Values.zenml.database.backupStrategy "dump-file") (eq .Values.zenml.database.backupStrategy "mydumper") }}
- name: db-backup
mountPath: /backups
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion helm/templates/server-db-pvc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- if and (eq .Values.zenml.database.backupStrategy "dump-file") .Values.zenml.database.backupPVStorageSize }}
{{- if and (or (eq .Values.zenml.database.backupStrategy "dump-file") (eq .Values.zenml.database.backupStrategy "mydumper")) .Values.zenml.database.backupPVStorageSize }}
{{- $pvc_name := printf "%s-db-backup" (include "zenml.fullname" .) -}}
{{- $pvc := (lookup "v1" "PersistentVolumeClaim" .Release.Namespace $pvc_name) }}
{{- if not $pvc }}
Expand Down
27 changes: 25 additions & 2 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,34 @@ zenml:
# database URL must have permissions to manage (create, drop, and
# modify) the backup database in addition to the main
# database.
backupStrategy: in-memory
# mydumper - the database is backed up using mydumper/myloader. This
# requires the mydumper and myloader utilities to be installed
# in the ZenML server container. The `mydumperThreads`,
# `mydumperCompress`, `mydumperExtraArgs`, `myloaderThreads`,
# and `myloaderExtraArgs` options can be used to configure the
# backup and restore processes.
# custom - use a custom backup engine. This requires the `customBackupEngine`
# option to be set to the class path of the custom backup engine.
# The class should extend the `zenml.zen_stores.migrations.backup.base_backup_engine.BaseBackupEngine`
# base class and be importable from the container image that you
# are using for the ZenML server. Arguments for the custom backup engine
# can be passed using the `customBackupEngineConfig` option.
#
# backupStrategy: in-memory
# backupPVStorageClass: standard
# backupPVStorageSize: 1Gi
# backupDatabase: "zenml_backup"

#
# mydumperThreads: 4
# mydumperCompress: true
# mydumperExtraArgs: []
# myloaderThreads: 4
# myloaderExtraArgs: []
#
# customBackupEngine: my.custom.backup.engine.MyCustomBackupEngine
# customBackupEngineConfig:
# arg1: value1
# arg2: value2

# Secrets store settings. This is used to store centralized secrets.
secretsStore:
Expand Down
16 changes: 10 additions & 6 deletions src/zenml/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,14 @@ def backup_database(
store_config, skip_default_registrations=True, skip_migrations=True
)
assert isinstance(store, SqlZenStore)
msg, location = store.backup_database(
backup_engine = store.initialize_database_backup_engine(
strategy=DatabaseBackupStrategy(strategy) if strategy else None,
location=location,
overwrite=overwrite,
)
cli_utils.declare(f"Database was backed up to {msg}.")
backup_engine.backup_database(overwrite=overwrite)
cli_utils.declare(
f"Database was backed up to {backup_engine.backup_location}."
)
else:
cli_utils.warning(
"Cannot backup database while connected to a ZenML server."
Expand Down Expand Up @@ -822,12 +824,14 @@ def restore_database(
store_config, skip_default_registrations=True, skip_migrations=True
)
assert isinstance(store, SqlZenStore)
store.restore_database(
backup_engine = store.initialize_database_backup_engine(
strategy=DatabaseBackupStrategy(strategy) if strategy else None,
location=location,
cleanup=cleanup,
)
cli_utils.declare("Database restore finished.")
backup_engine.restore_database(cleanup=cleanup)
cli_utils.declare(
f"Database was restored from {backup_engine.backup_location}."
)
else:
cli_utils.warning(
"Cannot restore database while connected to a ZenML server."
Expand Down
4 changes: 4 additions & 0 deletions src/zenml/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ class DatabaseBackupStrategy(StrEnum):
DUMP_FILE = "dump-file"
# Create a backup of the database in the remote database service
DATABASE = "database"
# Use mydumper/myloader for parallel backup/restore (MySQL only)
MYDUMPER = "mydumper"
# Use a custom backup engine
CUSTOM = "custom"


class PluginType(StrEnum):
Expand Down
14 changes: 14 additions & 0 deletions src/zenml/zen_stores/migrations/backup/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Database backup and restore utilities."""
Loading