From 2baf5bc0f79587f787193614935de2e00dab7227 Mon Sep 17 00:00:00 2001
From: Zach Price
Date: Tue, 29 Apr 2025 09:58:16 -0400
Subject: [PATCH 1/3] Initial CI/CD Setup

---
 .github/workflows/k8s.yml               | 97 +++++++++++++++++++++++++
 Dockerfile                              | 24 ++++++
 helm/.helmignore                        | 23 ++++++
 helm/Chart.yaml                         | 24 ++++++
 helm/ci/sync_dry_run.yaml               | 13 ++++
 helm/templates/_helpers.tpl             | 62 ++++++++++++++++
 helm/templates/serviceaccount.yaml      | 13 ++++
 helm/templates/statefulsets.yaml        | 84 +++++++++++++++++++++
 helm/values.yaml                        | 77 ++++++++++++++++++++
 pyproject.toml                          |  6 +-
 src/metadata_migrate_sync/__init__.py   |  0
 src/metadata_migrate_sync/app.py        | 75 ++++++-----------
 src/metadata_migrate_sync/database.py   |  6 +-
 src/metadata_migrate_sync/globus.py     | 87 +++++----------------
 src/metadata_migrate_sync/ingest.py     | 21 +++---
 src/metadata_migrate_sync/migrate.py    | 13 +---
 src/metadata_migrate_sync/provenance.py | 48 +----------
 src/metadata_migrate_sync/query.py      | 23 +++---
 src/metadata_migrate_sync/sync.py       | 39 +++++-----
 src/metadata_migrate_sync/util.py       |  9 +--
 20 files changed, 524 insertions(+), 220 deletions(-)
 create mode 100644 .github/workflows/k8s.yml
 create mode 100644 Dockerfile
 create mode 100644 helm/.helmignore
 create mode 100644 helm/Chart.yaml
 create mode 100644 helm/ci/sync_dry_run.yaml
 create mode 100644 helm/templates/_helpers.tpl
 create mode 100644 helm/templates/serviceaccount.yaml
 create mode 100644 helm/templates/statefulsets.yaml
 create mode 100644 helm/values.yaml
 create mode 100644 src/metadata_migrate_sync/__init__.py

diff --git a/.github/workflows/k8s.yml b/.github/workflows/k8s.yml
new file mode 100644
index 0000000..7958650
--- /dev/null
+++ b/.github/workflows/k8s.yml
@@ -0,0 +1,97 @@
+name: K8S Build & Test
+
+on:
+  pull_request:
+  workflow_dispatch:
+  push:
+    branches:
+      - "main"
+    tags:
+      - "v*"
+
+permissions: {}
+
+jobs:
+  container:
+    name: Build & publish container
+    runs-on: ubuntu-latest
+    permissions:
+      packages: write
+    steps:
+      - uses: actions/checkout@v4
+      - uses: docker/setup-buildx-action@v3
+      - uses: docker/login-action@v3
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - uses: docker/metadata-action@v5
+        id: metadata
+        with:
+          images: ghcr.io/esgf2-us/metadata-migrate-sync
+          tags: |
+            type=ref,event=pr
+            type=ref,event=tag
+      - uses: docker/build-push-action@v4
+        with:
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
+          file: Dockerfile
+          push: true
+          tags: ${{ steps.metadata.outputs.tags }}
+          labels: ${{ steps.metadata.outputs.labels }}
+
+  helm:
+    name: Publish Helm chart
+    runs-on: ubuntu-latest
+    permissions:
+      packages: write
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
+      - name: Install jq
+        run: |
+          sudo apt-get install --yes jq
+      - name: Install yq
+        run: |
+          pip install yq
+      - name: Generate SemVer
+        id: semantic-version
+        run: |
+          CHART_VERSION=$(yq -r '.version' helm/Chart.yaml)
+          LOCAL_SEGMENT=+pr-${{ github.event.pull_request.number }}
+          GENERATED_VERSION=${CHART_VERSION}${LOCAL_SEGMENT}
+          yq -Y -i ".version = \"$GENERATED_VERSION\"" helm/Chart.yaml
+          echo "generated-semver=$GENERATED_VERSION" >> $GITHUB_OUTPUT
+      - name: Chart | Push
+        uses: appany/helm-oci-chart-releaser@v0.4.2
+        with:
+          name: esgf-sync
+          repository: esgf2-us
+          tag: ${{ steps.semantic-version.outputs.generated-semver }}
+          path: helm
+          registry: ghcr.io
+          registry_username: ${{ github.actor }}
+          registry_password: ${{ secrets.GITHUB_TOKEN }}
+          update_dependencies: "true"
+
+  test:
+    name: Test deployment
+    runs-on: ubuntu-latest
+    needs: [container, helm]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Start minikube
+        uses: medyagh/setup-minikube@latest
+      - name: Set up Helm
+        uses: azure/setup-helm@v4.2.0
+      - name: Install Helm Chart
+        run: |
+          helm install test oci://ghcr.io/esgf2-us/esgf-sync \
+            --version=${{ needs.helm.outputs.generated-semver }} \
+            --set image.tag=pr-${{ github.event.pull_request.number }} \
+            -f helm/ci/sync_dry_run.yaml \
+            --debug \
+            --wait
+
+          kubectl get pods
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..d99d257
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,24 @@
+FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim
+
+# Install the project into `/app`
+WORKDIR /app
+
+# Enable bytecode compilation
+ENV UV_COMPILE_BYTECODE=1
+
+# No need for a venv in a container
+ENV UV_PROJECT_ENVIRONMENT=/usr/local
+
+# Install the dependencies
+COPY pyproject.toml uv.lock /app/
+RUN uv sync --frozen --no-dev --no-cache --no-editable --no-install-project
+
+# Install the app separately to avoid rebuilding the image when the app changes
+COPY src/ README.md /app/
+RUN uv sync --frozen --no-dev --no-cache --no-editable
+
+# Reset the entrypoint, don't invoke `uv`
+ENTRYPOINT ["esgf15mms"]
+
+# Run the application
+# CMD ["esgf15mms"]
\ No newline at end of file
diff --git a/helm/.helmignore b/helm/.helmignore
new file mode 100644
index 0000000..0e8a0eb
--- /dev/null
+++ b/helm/.helmignore
@@ -0,0 +1,23 @@
+# Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*.orig
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/
diff --git a/helm/Chart.yaml b/helm/Chart.yaml
new file mode 100644
index 0000000..da3e1bc
--- /dev/null
+++ b/helm/Chart.yaml
@@ -0,0 +1,24 @@
+apiVersion: v2
+name: esgf-sync
+description: A Helm chart for Kubernetes
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.0
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+# It is recommended to use it with quotes.
+appVersion: "1.16.0"
diff --git a/helm/ci/sync_dry_run.yaml b/helm/ci/sync_dry_run.yaml
new file mode 100644
index 0000000..3638ade
--- /dev/null
+++ b/helm/ci/sync_dry_run.yaml
@@ -0,0 +1,13 @@
+instances:
+  cmip6plus:
+    command: |-
+      while true; do
+        esgf15mms sync stage backup CMIP6Plus --prod --dry-run --start-time 2025-04-30 --work-dir /data
+        sleep 5m
+      done
+  drcdp:
+    command: |-
+      while true; do
+        esgf15mms sync stage backup DRCDP --prod --dry-run --start-time 2025-04-30 --work-dir /data
+        sleep 5m
+      done
\ No newline at end of file
diff --git a/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl
new file mode 100644
index 0000000..66ee10a
--- /dev/null
+++ b/helm/templates/_helpers.tpl
@@ -0,0 +1,62 @@
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "esgf-sync.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If release name contains chart name it will be used as a full name.
+*/}}
+{{- define "esgf-sync.fullname" -}}
+{{- if .Values.fullnameOverride }}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- $name := default .Chart.Name .Values.nameOverride }}
+{{- if contains $name .Release.Name }}
+{{- .Release.Name | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
+{{- end }}
+{{- end }}
+{{- end }}
+
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "esgf-sync.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Common labels
+*/}}
+{{- define "esgf-sync.labels" -}}
+helm.sh/chart: {{ include "esgf-sync.chart" . }}
+{{ include "esgf-sync.selectorLabels" . }}
+{{- if .Chart.AppVersion }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+{{- end }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end }}
+
+{{/*
+Selector labels
+*/}}
+{{- define "esgf-sync.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "esgf-sync.name" . }}
+app.kubernetes.io/instance: {{ .Release.Name }}
+{{- end }}
+
+{{/*
+Create the name of the service account to use
+*/}}
+{{- define "esgf-sync.serviceAccountName" -}}
+{{- if .Values.serviceAccount.create }}
+{{- default (include "esgf-sync.fullname" .) .Values.serviceAccount.name }}
+{{- else }}
+{{- default "default" .Values.serviceAccount.name }}
+{{- end }}
+{{- end }}
diff --git a/helm/templates/serviceaccount.yaml b/helm/templates/serviceaccount.yaml
new file mode 100644
index 0000000..4536a26
--- /dev/null
+++ b/helm/templates/serviceaccount.yaml
@@ -0,0 +1,13 @@
+{{- if .Values.serviceAccount.create -}}
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ include "esgf-sync.serviceAccountName" . }}
+  labels:
+    {{- include "esgf-sync.labels" . | nindent 4 }}
+  {{- with .Values.serviceAccount.annotations }}
+  annotations:
+    {{- toYaml .
| nindent 4 }} + {{- end }} +automountServiceAccountToken: {{ .Values.serviceAccount.automount }} +{{- end }} diff --git a/helm/templates/statefulsets.yaml b/helm/templates/statefulsets.yaml new file mode 100644 index 0000000..4a9aae4 --- /dev/null +++ b/helm/templates/statefulsets.yaml @@ -0,0 +1,84 @@ +{{- range $name, $spec := .Values.instances }} +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ include "esgf-sync.fullname" $ }}-{{ $name }} + labels: + {{- include "esgf-sync.labels" $ | nindent 4 }} + app.kubernetes.io/component: {{ $name }} +spec: + replicas: 1 + selector: + matchLabels: + {{- include "esgf-sync.selectorLabels" $ | nindent 6 }} + app.kubernetes.io/component: {{ $name }} + template: + metadata: + {{- with $.Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "esgf-sync.labels" $ | nindent 8 }} + app.kubernetes.io/component: {{ $name }} + {{- with $.Values.podLabels }} + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + {{- with $.Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "esgf-sync.serviceAccountName" $ }} + {{- with $.Values.podSecurityContext }} + securityContext: + {{- toYaml . | nindent 8 }} + {{- end }} + containers: + - name: {{ $name }} + env: + - name: GLOBUS_CLIENT_ID + value: {{ $.Values.GLOBUS_CLIENT_ID | quote }} + - name: GLOBUS_CLIENT_SECRET + value: {{ $.Values.GLOBUS_CLIENT_SECRET | quote }} + command: ["/bin/sh", "-c"] + args: + - {{ $spec.command | quote }} + {{- with $.Values.securityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag | default $.Chart.AppVersion }}" + imagePullPolicy: {{ $.Values.image.pullPolicy }} + {{- with $.Values.resources }} + resources: + {{- toYaml . | nindent 12 }} + {{- end }} + volumeMounts: + - name: {{ include "esgf-sync.fullname" $ }}-{{ $name }} + mountPath: /data + {{- with $.Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with $.Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with $.Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + volumeClaimTemplates: + - metadata: + name: {{ include "esgf-sync.fullname" $ }}-{{ $name }} + labels: + {{- include "esgf-sync.labels" $ | nindent 8 }} + app.kubernetes.io/component: {{ $name }} + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5G +--- +{{- end }} \ No newline at end of file diff --git a/helm/values.yaml b/helm/values.yaml new file mode 100644 index 0000000..1d044dd --- /dev/null +++ b/helm/values.yaml @@ -0,0 +1,77 @@ +# Default values for esgf-sync. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +GLOBUS_CLIENT_ID: +GLOBUS_CLIENT_SECRET: + +instances: + # 1 StatefulSet will be created for each key in this dict, using the key as part of the name of the StatefulSet. + # test: + # command: |- + # while true; do + # esgf15mms check-index public + # sleep 5m + # done + +image: + repository: ghcr.io/esgf2-us/metadata-migrate-sync + # This sets the pull policy for images. + pullPolicy: Always + # Overrides the image tag whose default is the chart appVersion. 
+ tag: "" + +# This is for the secrets for pulling an image from a private repository more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ +imagePullSecrets: [] +# This is to override the chart name. +nameOverride: "" +fullnameOverride: "" + +# This section builds out the service account more information can be found here: https://kubernetes.io/docs/concepts/security/service-accounts/ +serviceAccount: + # Specifies whether a service account should be created + create: true + # Automatically mount a ServiceAccount's API credentials? + automount: false + # Annotations to add to the service account + annotations: {} + # The name of the service account to use. + # If not set and create is true, a name is generated using the fullname template + name: "" + +# This is for setting Kubernetes Annotations to a Pod. +# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/ +podAnnotations: {} +# This is for setting Kubernetes Labels to a Pod. +# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ +podLabels: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +nodeSelector: {} + +tolerations: [] + +affinity: {} diff --git a/pyproject.toml b/pyproject.toml index 56b81d5..0f5dc13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,14 +115,14 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["src/metadata_migrate_sync"] +packages = ["metadata_migrate_sync"] [tool.pytest.ini_options] -pythonpath = "." 
+pythonpath = "src" addopts = [ "--import-mode=importlib", ] [project.scripts] -esgf15mms= "metadata_migrate_sync.app:app" +esgf15mms = "metadata_migrate_sync.app:app" diff --git a/src/metadata_migrate_sync/__init__.py b/src/metadata_migrate_sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/metadata_migrate_sync/app.py b/src/metadata_migrate_sync/app.py index 6e646f3..3d02170 100755 --- a/src/metadata_migrate_sync/app.py +++ b/src/metadata_migrate_sync/app.py @@ -8,13 +8,12 @@ """ - - import datetime import json import pathlib import sys from enum import Enum +from typing import Literal import typer from rich import print @@ -45,26 +44,25 @@ def _combine_enums(*enums: Enum, name:str="CombinedEnum") -> Enum: app = typer.Typer() -def _validate_meta(meta: str) -> str: +def _validate_meta(meta: str) -> Literal["files", "datasets"]: if meta not in ["files", "datasets"]: raise typer.BadParameter("meta must be 'files' or 'datasets'") return meta -def _validate_src_ep(ep: str) -> str: - +def _validate_src_ep(ep: str) -> Literal["ornl", "anl", "llnl", "stage", "test_1", "test"]: if ep not in ["ornl", "anl", "llnl", "stage", "test_1", "test"]: raise typer.BadParameter(f"{ep} is not a supported ep") return ep -def _validate_tgt_ep(ep: str) -> str: +def _validate_tgt_ep(ep: str) -> Literal["test", "test_1", "public", "stage", "backup"]: if ep not in ["test", "test_1", "public", "stage", "backup"]: raise typer.BadParameter(f"{ep} is not a supported ep ") return ep -def _validate_project(project: str) -> str: +def _validate_project(project: str) -> ProjectReadOnly | ProjectReadWrite: if project is not None: for p in ProjectReadOnly: if p.value == project: @@ -78,14 +76,10 @@ def _validate_project(project: str) -> str: @app.command() def migrate( - source_ep: str = typer.Argument( - help="source end point name", callback=_validate_src_ep - ), - target_ep: str = typer.Argument( - help="target end point name", callback=_validate_tgt_ep - ), - project: str = typer.Argument(help="project name", callback=_validate_project), - meta: str = typer.Option(help="metadata type", callback=_validate_meta), + source_ep: str = typer.Argument(help="source end point name"), + target_ep: str = typer.Argument(help="target end point name"), + project: str = typer.Argument(help="project name"), + meta: str = typer.Option(help="metadata type"), prod: bool = typer.Option(help="production run", default=False), ) -> None: """Migrate documents in solr index to the globus index. 
@@ -93,22 +87,21 @@ def migrate( Following the ESGF-1.5 migration plan and desingation """ metadata_migrate( - source_epname=source_ep, - target_epname=target_ep, - metatype=meta, - project=project, + source_epname=_validate_src_ep(source_ep), + target_epname=_validate_tgt_ep(target_ep), + metatype=_validate_meta(meta), + project=_validate_project(project), production=prod, ) -def _validate_tgt_ep_all(ep: str) -> str: +def _validate_tgt_ep_all(ep: str) -> Literal["test", "test_1", "public", "stage", "all-prod", "backup"]: if ep not in ["test", "test_1", "public", "stage", "all-prod", "backup"]: raise typer.BadParameter(f"{ep} is not a supported ep ") return ep @app.command() def check_index( - globus_ep: str = typer.Argument( - help="globus end point name", callback=_validate_tgt_ep_all), + globus_ep: str = typer.Argument(help="globus end point name", callback=_validate_tgt_ep_all), project: str = typer.Option(None, help="project name", callback=_validate_project), save: bool = typer.Option(False, help="save to index.json"), ) -> None: @@ -145,35 +138,29 @@ def check_index( @app.command() def sync( - source_ep: str = typer.Argument( - help="source end point name", callback=_validate_src_ep - ), - target_ep: str = typer.Argument( - help="target end point name", callback=_validate_tgt_ep - ), - project: str = typer.Argument(help="project name", callback=_validate_project), + source_ep: str = typer.Argument(help="source end point name"), + target_ep: str = typer.Argument(help="target end point name"), + project: ProjectReadWrite = typer.Argument(help="project name"), prod: bool = typer.Option(help="production run", default=False), start_time: datetime.datetime = typer.Option(help="start time", default=None), + work_dir: pathlib.Path = typer.Option(help="writable directory to store database and outputs", default=pathlib.Path(".")), + dry_run: bool = typer.Option(help="do everything the same except don't write to the target index", default=False), ) -> None: """Sync the ESGF-1.5 staged indexes to the public index. 
Details can be seen in the design.md """ - lock_file_path = f"/tmp/metadata_migrate_sync_{project.value}.lock" # noqa S108 - - try: - lock_fd = create_lock(lock_file_path) - - metadata_sync( - source_epname=source_ep, - target_epname=target_ep, - project=project, - production=prod, - sync_freq=5, - start_time=start_time, - ) - finally: - release_lock(lock_fd, lock_file_path) + + metadata_sync( + source_epname=_validate_src_ep(source_ep), + target_epname=_validate_tgt_ep(target_ep), + project=_validate_project(project), + production=prod, + sync_freq=5, + start_time=start_time, + work_dir=work_dir, + dry_run=dry_run, + ) @app.command() diff --git a/src/metadata_migrate_sync/database.py b/src/metadata_migrate_sync/database.py index 0e7d7ed..0eb5ab3 100644 --- a/src/metadata_migrate_sync/database.py +++ b/src/metadata_migrate_sync/database.py @@ -1,5 +1,6 @@ """Sqlite database for index migrationa and sync.""" +import logging import pathlib from datetime import datetime from typing import Any, ClassVar, Optional @@ -24,6 +25,8 @@ from metadata_migrate_sync.provenance import provenance from metadata_migrate_sync.solr import SolrIndexes +logger = logging.getLogger(__name__) + # Create a base class for models class Base(DeclarativeBase): @@ -152,9 +155,6 @@ def __init__(self, db_filename: str | pathlib.Path, insert_index: bool): self._engine = create_engine(self._DATABASE_URL, echo=False) Base.metadata.create_all(self._engine) - logger = provenance.get_logger(__name__) - - logger.info("this is the only initalization in database") if insert_index: diff --git a/src/metadata_migrate_sync/globus.py b/src/metadata_migrate_sync/globus.py index 42e16eb..f088853 100644 --- a/src/metadata_migrate_sync/globus.py +++ b/src/metadata_migrate_sync/globus.py @@ -1,21 +1,23 @@ """Globus index handlers and CVs and methods.""" -import pathlib +import logging +import os from enum import Enum from typing import Any, Literal from uuid import UUID from globus_sdk import ( - NativeAppAuthClient, - RefreshTokenAuthorizer, + AccessTokenAuthorizer, + ConfidentialAppAuthClient, SearchClient, SearchQuery, ) -from globus_sdk.tokenstorage import SimpleJSONFileAdapter from pydantic import BaseModel, ConfigDict, field_validator from metadata_migrate_sync.esgf_index_schema.schema_solr import DatasetDocs, FileDocs from metadata_migrate_sync.project import ProjectReadWrite -from metadata_migrate_sync.provenance import provenance + + +logger = logging.getLogger(__name__) class GlobusCV(str, Enum): @@ -51,65 +53,29 @@ class GlobusIngestModel(BaseModel): @classmethod def check_gmeta(cls, data: dict[Any, Any]) -> dict[Any, Any]: """Check if the dictionary is GlobusMeta.""" - logger = provenance._instance.get_logger(__name__) if len(data.keys()) != 1 or "gmeta" not in data: logger.error("no gmeta in the dict") raise ValueError("no gmeta in the dict") return data +def get_authorized_search_client() -> SearchClient: + """Return a SearchClient authorized to search indicies.""" -# from Lucasz and Nate code with some minor changes -def get_authorized_search_client( - app_client_id: UUID | str, token_name: str = "token.json" # noqa S107 -) -> SearchClient: - """Return a transfer client authorized to make transfers.""" - config_path = pathlib.Path.home() / ".ssh" - config_path.mkdir(parents=True, exist_ok=True) - token_adapter = SimpleJSONFileAdapter(config_path / token_name) - app_client = NativeAppAuthClient(app_client_id) - - if token_adapter.file_exists(): - tokens = token_adapter.get_token_data("search.api.globus.org") - else: - 
app_client.oauth2_start_flow( - requested_scopes=["urn:globus:auth:scope:search.api.globus.org:all"], - refresh_tokens=True, - ) - authorize_url = app_client.oauth2_get_authorize_url() - print( - f""" -All interactions with Globus must be authorized. To ensure that we have permission to faciliate your transfer, -please open the following link in your browser. - -{authorize_url} - -You will have to login (or be logged in) to your Globus account. -Globus will also request that you give a label for this authorization. -You may pick anything of your choosing. After following the instructions in your browser, -Globus will generate a code which you must copy and paste here and then hit .\n""" - ) - auth_code = input("> ").strip() - token_response = app_client.oauth2_exchange_code_for_tokens(auth_code) - token_adapter.store(token_response) - tokens = token_response.by_resource_server["search.api.globus.org"] - - authorizer = RefreshTokenAuthorizer( - tokens["refresh_token"], - app_client, - access_token=tokens["access_token"], - expires_at=tokens["expires_at_seconds"], - on_refresh=token_adapter.on_refresh, + globus_auth_client = ConfidentialAppAuthClient( + os.environ['GLOBUS_CLIENT_ID'], + os.environ['GLOBUS_CLIENT_SECRET'], ) - search_client = SearchClient(authorizer=authorizer) - return search_client + token_response = globus_auth_client.oauth2_client_credentials_tokens("urn:globus:auth:scope:search.api.globus.org:all") + access_token = token_response.by_resource_server["search.api.globus.org"][ + "access_token" + ] + return SearchClient(authorizer=AccessTokenAuthorizer(access_token)) class ClientModel(BaseModel): """A client model includes many aspects and indexes.""" model_config = ConfigDict(arbitrary_types_allowed=True) - app_client_id: UUID - token_name: str search_client: SearchClient | None search_query: SearchQuery indexes: dict[str, UUID] @@ -134,8 +100,6 @@ class GlobusClient: globus_clients: dict[str, ClientModel] = {} _client_test = { - "app_client_id": "fe862e63-f3bb-457a-9662-995832bb692f", - "token_name": "test_index.json", "search_client": None, "search_query": SearchQuery(""), "indexes": { @@ -145,8 +109,6 @@ class GlobusClient: } _client_prod_migration = { - "app_client_id": "bb163ebc-7feb-490e-8296-e572a0622e3c", - "token_name": "metadata_migration_sync_tokens.json", "search_client": None, "search_query": SearchQuery(""), "indexes": { @@ -156,8 +118,6 @@ class GlobusClient: } _client_prod_sync = { - "app_client_id": "bb163ebc-7feb-490e-8296-e572a0622e3c", - "token_name": "metadata_migration_sync_tokens.json", "search_client": None, "search_query": SearchQuery(""), "indexes": { @@ -225,8 +185,6 @@ def get_client_index_names( def get_client(cls, name: str = "test") -> ClientModel: """Get the search client and index list.""" - logger = provenance.get_logger(__name__) - client_name, _ = cls.get_client_index_names(name) if client_name == "prod-all": @@ -239,11 +197,7 @@ def get_client(cls, name: str = "test") -> ClientModel: if client_prod_all["search_client"] is None: # migration app client - client_prod_all["search_client"] = get_authorized_search_client( - client_prod_all["app_client_id"], - client_prod_all["token_name"], - ) - + client_prod_all["search_client"] = get_authorized_search_client() return ClientModel(**client_prod_all) @@ -252,10 +206,7 @@ def get_client(cls, name: str = "test") -> ClientModel: if cls.globus_clients[client_name].search_client is None: logger.info(f"no search client and request for the client {client_name}") - 
cls.globus_clients[client_name].search_client = get_authorized_search_client( - cls.globus_clients[client_name].app_client_id, - cls.globus_clients[client_name].token_name, - ) + cls.globus_clients[client_name].search_client = get_authorized_search_client() logger.info(f"return the search client with the name {name}.") diff --git a/src/metadata_migrate_sync/ingest.py b/src/metadata_migrate_sync/ingest.py index 5fde8c6..4b93d8d 100644 --- a/src/metadata_migrate_sync/ingest.py +++ b/src/metadata_migrate_sync/ingest.py @@ -1,20 +1,24 @@ """Ingest module.""" import json from datetime import datetime +import logging from typing import Any, Literal from uuid import UUID -from globus_sdk import SearchClient +from globus_sdk import GlobusHTTPResponse, SearchClient from pydantic import ( BaseModel, validate_call, ) +from requests import Response from sqlalchemy.orm import object_session from metadata_migrate_sync.database import Datasets, Files, Ingest, MigrationDB, Query from metadata_migrate_sync.globus import GlobusClient, GlobusIngestModel from metadata_migrate_sync.project import ProjectReadOnly, ProjectReadWrite -from metadata_migrate_sync.provenance import provenance + + +logger = logging.getLogger(__name__) class BaseIngest(BaseModel): @@ -33,9 +37,8 @@ class GlobusIngest(BaseIngest): _response_data: dict[Any, Any] = {} # from globus2solr - def ingest(self, gingest: dict[str, Any]) -> None: + def ingest(self, gingest: dict[str, Any], dry_run: bool = False) -> None: """Ingest documents to a globus index using globus search client.""" - logger = provenance._instance.get_logger(__name__) current_timestr = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info("start the inject now at " + current_timestr) @@ -54,11 +57,12 @@ def ingest(self, gingest: dict[str, Any]) -> None: logger.error("end_point is not consistent with ep_name") raise ValueError("end_point is not consistent with ep_name") - if isinstance(sc, SearchClient): - response = sc.ingest(_globus_index_id, gingest) + if dry_run: + r = Response() + r._content = b"{'acknowledged': True, 'success': True, 'task_id': '1234567890'}" + response = GlobusHTTPResponse(r) else: - logger.error("not a search client") - raise ValueError("not a search client") + response = sc.ingest(_globus_index_id, gingest) self._response_data = response.data @@ -79,7 +83,6 @@ def prov_collect( batch_num: int = -1, ) -> None: """Provenance collection and database updation.""" - logger = provenance._instance.get_logger(__name__) if not self._submitted: diff --git a/src/metadata_migrate_sync/migrate.py b/src/metadata_migrate_sync/migrate.py index 2682f6b..0492982 100644 --- a/src/metadata_migrate_sync/migrate.py +++ b/src/metadata_migrate_sync/migrate.py @@ -18,6 +18,8 @@ from metadata_migrate_sync.query import SolrQuery, params_search from metadata_migrate_sync.solr import SolrIndexes +logger = logging.getLogger(__name__) + @validate_call def metadata_migrate( @@ -43,7 +45,6 @@ def metadata_migrate( ingest_index_type="globus", ingest_index_name=target_epname, ingest_index_schema="ESGF1.5", - log_file=f"migration_{source_epname}_{target_epname}_{project.value}_{metatype}.log", prov_file=f"migration_{source_epname}_{target_epname}_{project.value}_{metatype}.json", db_file=f"migration_{source_epname}_{target_epname}_{project.value}_{metatype}.sqlite", type_query=metatype.capitalize(), @@ -52,16 +53,10 @@ def metadata_migrate( pathlib.Path(prov.prov_file).write_text(prov.model_dump_json(indent=2)) - logger = ( - provenance._instance.get_logger(__name__) - if 
provenance._instance is not None else logging.getLogger(__name__) - ) - logger.info(f"set up the provenance and save it to {prov.prov_file}") - logger.info(f"log file is at {prov.log_file}") # database - _ = MigrationDB(prov.db_file, True) + MigrationDB(prov.db_file, True) logger.info(f"initialed the sqllite database at {prov.db_file}") # query generator @@ -201,4 +196,4 @@ def metadata_migrate( # clean up logging.shutdown() prov.successful = True - pathlib.Path(prov.prov_file).write_text(prov.model_dump_json(indent=2)) + prov.prov_file.write_text(prov.model_dump_json(indent=2)) diff --git a/src/metadata_migrate_sync/provenance.py b/src/metadata_migrate_sync/provenance.py index 7925e37..593e0e9 100644 --- a/src/metadata_migrate_sync/provenance.py +++ b/src/metadata_migrate_sync/provenance.py @@ -1,11 +1,9 @@ """Provenance module.""" -import logging -import logging.config import os -import pathlib import platform import sys from importlib.metadata import distributions +from pathlib import Path from typing import Any, Literal from uuid import UUID @@ -42,9 +40,8 @@ class provenance(BaseModel, metaclass=SingletonMeta): ingest_index_schema: str = "ESGF1.5" cmd_line: str - log_file: str | pathlib.Path = "test.log" - prov_file: str | pathlib.Path = "test.json" - db_file: str | pathlib.Path = "test.db" + prov_file: Path = Path("test.json") + db_file: Path = Path("test.db") successful: bool = False @@ -60,42 +57,3 @@ class provenance(BaseModel, metaclass=SingletonMeta): python_modules: dict[str, str] | None = { p.metadata["Name"]: p.version for p in distributions() } - - @classmethod - def get_logger(cls, name:str) -> logging.Logger: - """Get a logger handler.""" - log_filename = "test.log" if cls._instance is None else cls._instance.log_file - - logging_config = { - "version": 1, - "formatters": { - "standard": { - "format": "%(asctime)s - %(funcName)s - %(levelname)s - %(message)s", - }, - }, - "handlers": { - "console": { - "level": "INFO", - "class": "logging.StreamHandler", - "formatter": "standard", - }, - "file": { - "level": "DEBUG", - "class": "logging.FileHandler", - "filename": log_filename, - "formatter": "standard", - }, - }, - "loggers": { - "": { - "handlers": ["file"], - "level": "DEBUG", - "propagate": True, - }, - }, - } - - logging.config.dictConfig(logging_config) - logger = logging.getLogger() - - return logger diff --git a/src/metadata_migrate_sync/query.py b/src/metadata_migrate_sync/query.py index 0f104a0..10158e0 100644 --- a/src/metadata_migrate_sync/query.py +++ b/src/metadata_migrate_sync/query.py @@ -18,7 +18,10 @@ from metadata_migrate_sync.database import Files, Index, Ingest, MigrationDB, Query from metadata_migrate_sync.globus import GlobusClient from metadata_migrate_sync.project import ProjectReadOnly, ProjectReadWrite -from metadata_migrate_sync.provenance import provenance + + +logger = logging.getLogger(__name__) + params_search = { "sort": "id asc", @@ -50,7 +53,6 @@ class SolrQuery(BaseQuery): def get_cursormark(self, review: bool = False) -> None: """Get the cursormark from the database file.""" - logger = provenance._instance.get_logger(__name__) if provenance is not None else logging.getLogger() if review: # get all the failed cases in the database, re-query and re-ingest @@ -122,8 +124,8 @@ def get_cursormark(self, review: bool = False) -> None: @staticmethod def _make_request( - url: str, - params: dict[str, Any], + url: str, + params: dict[str, Any], is_test: bool = False ) -> tuple[dict[str, Any], float, str] | None | int: """Make an HTTP GET request 
with retry logic. @@ -139,7 +141,6 @@ def _make_request( None if request failed. """ - logger = provenance.get_logger(__name__) retry_strategy = Retry( total=3, @@ -180,7 +181,6 @@ def run(self) -> Generator[Any, None, None]: Generator[Any, None, None]: The docs from each page. """ - logger = provenance.get_logger(__name__) while True: result = self._make_request(self.end_point, self.query) @@ -279,12 +279,15 @@ def prov_collect( self._current_query = curpage +Paginator = Literal["post", "scroll"] + + class GlobusQuery(BaseQuery): """query globus index.""" query: dict[Any, Any] generator: bool = False - paginator: Literal["post", "scroll"] + paginator: Paginator skip_prov: bool = False _current_query: Any | None = None @@ -296,7 +299,6 @@ class GlobusQuery(BaseQuery): def get_offset_marker(self, review:bool = False) -> None: """Find the offset or marker of previous synchronization.""" - logger = provenance._instance.get_logger(__name__) if review: pass @@ -423,10 +425,6 @@ def get_offset_marker(self, review:bool = False) -> None: def run(self) -> Generator[Any, None, None]: """Query the globus index in a pagination way.""" - logger = ( - provenance._instance.get_logger(__name__) - if provenance._instance is not None else logging.getLogger(__name__) - ) client_name, index_name = GlobusClient.get_client_index_names(self.ep_name, self.project.value) @@ -521,7 +519,6 @@ def prov_collect( sq: SearchQuery ) -> None: """Collect provenance and update database from a globus query.""" - logger = provenance._instance.get_logger(__name__) self._numFound = entries.get("total") diff --git a/src/metadata_migrate_sync/sync.py b/src/metadata_migrate_sync/sync.py index 9a20004..1dd3203 100644 --- a/src/metadata_migrate_sync/sync.py +++ b/src/metadata_migrate_sync/sync.py @@ -19,6 +19,9 @@ from metadata_migrate_sync.util import get_last_value, get_utc_time_from_server +logger = logging.getLogger(__name__) + + class SyncConfig: """config class for sync.""" @@ -69,8 +72,7 @@ def _setup_time_range_filter( production: bool, sync_freq: int | None, start_time: datetime | None, - logger: logging.Logger, - data_dir: str = "./", + data_dir: pathlib.Path = pathlib.Path("./"), ) -> dict[str, dict[str, Any] | None]: """Set up the time range filter for the query.""" if not production or sync_freq is None: @@ -88,12 +90,11 @@ def _setup_time_range_filter( prod_start = None for day in [0, 1, 2]: time_str = (datetime.now() - timedelta(days=day)).strftime("%Y-%m-%d") - path_db = f"{path_db_base}_{time_str}.sqlite" - prev_db = pathlib.Path(data_dir) / path_db + prev_db = data_dir / f"{path_db_base}_{time_str}.sqlite" logger.info(f"Looking for previous database file {prev_db}") - if pathlib.Path(prev_db).is_file(): + if prev_db.is_file(): query_str = get_last_value('query_str', "query", db_path=prev_db) cursorMark_next = get_last_value('cursorMark_next', "query", db_path=prev_db) @@ -139,6 +140,8 @@ def metadata_sync( production: bool, sync_freq: int | None = None, start_time: datetime | None = None, + work_dir: pathlib.Path, + dry_run: bool = False, ) -> None: """Sync the metadata between two Globus Indexes.""" target_client, target_index = GlobusClient.get_client_index_names(target_epname, target_epname) @@ -159,26 +162,19 @@ def metadata_sync( ingest_index_type="globus", ingest_index_name=target_epname, ingest_index_schema="ESGF1.5", - log_file=f"{file_base}.log", - prov_file=f"{file_base}.json", - db_file=f"{file_base}.sqlite", + prov_file=work_dir / f"{file_base}.json", + db_file=work_dir / f"{file_base}.sqlite", 
type_query="mixed (datasets and files)", cmd_line=" ".join(sys.argv), ) - pathlib.Path(prov.prov_file).write_text(prov.model_dump_json(indent=2)) - - logger = ( - provenance._instance.get_logger(__name__) - if provenance._instance is not None else logging.getLogger() - ) + prov.prov_file.write_text(prov.model_dump_json(indent=2)) logger.info(f"set up the provenance and save it to {prov.prov_file}") - logger.info(f"log file is at {prov.log_file}") # database - _ = MigrationDB(prov.db_file, True) + MigrationDB(prov.db_file, True) logger.info(f"initialized the sqlite database at {prov.db_file}") # query generator @@ -197,13 +193,13 @@ def metadata_sync( } - path_db_base = f"synchronization_{source_epname}_{target_epname}_{project.value}" #_{time_str}.sqlite" + path_db_base: str = f"synchronization_{source_epname}_{target_epname}_{project.value}" #_{time_str}.sqlite" time_range_filter = _setup_time_range_filter( path_db_base, production, sync_freq, start_time, - logger, + data_dir=work_dir, ) @@ -215,7 +211,6 @@ def metadata_sync( maxpage = 2 - gq = GlobusQuery( end_point=prov.source_index_id, ep_type="globus", @@ -315,7 +310,8 @@ def metadata_sync( GlobusCV.INGEST_DATA.value: { GlobusCV.GMETA.value: batch, } - } + }, + dry_run=dry_run, ) ig.prov_collect( @@ -358,7 +354,7 @@ def metadata_sync( # clean up logging.shutdown() prov.successful = True - pathlib.Path(prov.prov_file).write_text(prov.model_dump_json(indent=2)) + prov.prov_file.write_text(prov.model_dump_json(indent=2)) if __name__ == "__main__": @@ -368,5 +364,6 @@ def metadata_sync( target_epname="test", project=ProjectReadWrite.INPUT4MIPS, production=False, + work_dir=pathlib.Path("./"), ) diff --git a/src/metadata_migrate_sync/util.py b/src/metadata_migrate_sync/util.py index ccd0e30..f4cf209 100644 --- a/src/metadata_migrate_sync/util.py +++ b/src/metadata_migrate_sync/util.py @@ -11,9 +11,8 @@ import requests -def create_lock(lockfile_path: str) -> int: +def create_lock(lockfile_path: Path) -> int: """Create a lock file to prevent multiple instances.""" - lock_file = Path(lockfile_path) try: # Create or open the lock file @@ -27,10 +26,10 @@ def create_lock(lockfile_path: str) -> int: return fd except (OSError, BlockingIOError): - print(f"Another instance is already running (PID: {lock_file.read_text().strip()})") + print(f"Another instance is already running (PID: {lockfile_path.read_text().strip()})") sys.exit(1) -def release_lock(fd: int, lockfile_path: str) -> None: +def release_lock(fd: int, lockfile_path: Path) -> None: """Release the lock file.""" try: os.unlink(lockfile_path) @@ -83,7 +82,7 @@ def get_utc_time_from_server(ahead_minutes: int = 3) -> str: -def get_last_value(column_name:str, table_name:str, db_path:str='database.db') -> str | None: +def get_last_value(column_name: str, table_name: str, db_path: Path = Path('database.db')) -> str | None: """Get a column value in the last row of a table.""" with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn: cursor = conn.cursor() From b76aa22e2d9d2287bc0b084670451479bb0bba04 Mon Sep 17 00:00:00 2001 From: Zach Price Date: Tue, 6 May 2025 18:47:50 -0400 Subject: [PATCH 2/3] Make package importable again --- pyproject.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0f5dc13..e53f3e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "metadata_migrate_sync" +name = "metadata-migrate-sync" version = "0.0.1" description = "metadata migrate and sync" readme = "README.md" 
@@ -114,9 +114,6 @@ follow_imports = "normal" requires = ["hatchling"] build-backend = "hatchling.build" -[tool.hatch.build.targets.wheel] -packages = ["metadata_migrate_sync"] - [tool.pytest.ini_options] pythonpath = "src" From 08722d6dad56158c55f294f30f580f3972b0106e Mon Sep 17 00:00:00 2001 From: Zach Price Date: Wed, 7 May 2025 14:09:13 -0400 Subject: [PATCH 3/3] Reimplement file based locking to handle OLCF filesystem --- src/metadata_migrate_sync/app.py | 134 ++++++++++++++++-------------- src/metadata_migrate_sync/util.py | 113 ++++++++++++++----------- 2 files changed, 139 insertions(+), 108 deletions(-) diff --git a/src/metadata_migrate_sync/app.py b/src/metadata_migrate_sync/app.py index 3d02170..bd3836e 100755 --- a/src/metadata_migrate_sync/app.py +++ b/src/metadata_migrate_sync/app.py @@ -10,9 +10,9 @@ import datetime import json -import pathlib import sys from enum import Enum +from pathlib import Path from typing import Literal import typer @@ -24,11 +24,12 @@ from metadata_migrate_sync.project import ProjectReadOnly, ProjectReadWrite from metadata_migrate_sync.query import GlobusQuery from metadata_migrate_sync.sync import metadata_sync -from metadata_migrate_sync.util import create_lock, release_lock +from metadata_migrate_sync.util import file_lock sys.setrecursionlimit(10000) -def _combine_enums(*enums: Enum, name:str="CombinedEnum") -> Enum: + +def _combine_enums(*enums: Enum, name: str = "CombinedEnum") -> Enum: members = {} for enum in enums: for member in enum: @@ -94,11 +95,13 @@ def migrate( production=prod, ) + def _validate_tgt_ep_all(ep: str) -> Literal["test", "test_1", "public", "stage", "all-prod", "backup"]: if ep not in ["test", "test_1", "public", "stage", "all-prod", "backup"]: raise typer.BadParameter(f"{ep} is not a supported ep ") return ep + @app.command() def check_index( globus_ep: str = typer.Argument(help="globus end point name", callback=_validate_tgt_ep_all), @@ -107,17 +110,16 @@ def check_index( ) -> None: """Check the globus index status.""" gc = GlobusClient() - cm = gc.get_client(name = globus_ep) + cm = gc.get_client(name=globus_ep) sc = cm.search_client if project is None: - tab_index = [] for index_name in cm.indexes: index_id = cm.indexes.get(index_name) r = sc.get_index(index_id) - print (r.data) + print(r.data) tab_index.append(r.data) else: @@ -128,13 +130,14 @@ def check_index( index_id = cm.indexes.get(project.value) if index_id: - print (sc.get_index(index_id).data) + print(sc.get_index(index_id).data) tab_index = sc.get_index(index_id).data else: - print (f"Cannot find index for {project} in the {globus_ep} group, find it in public group") + print(f"Cannot find index for {project} in the {globus_ep} group, find it in public group") if save: - pathlib.Path("index.json").write_text(json.dumps(tab_index)) + Path("index.json").write_text(json.dumps(tab_index)) + @app.command() def sync( @@ -143,42 +146,45 @@ def sync( project: ProjectReadWrite = typer.Argument(help="project name"), prod: bool = typer.Option(help="production run", default=False), start_time: datetime.datetime = typer.Option(help="start time", default=None), - work_dir: pathlib.Path = typer.Option(help="writable directory to store database and outputs", default=pathlib.Path(".")), - dry_run: bool = typer.Option(help="do everything the same except don't write to the target index", default=False), + work_dir: Path = typer.Option(help="writable directory to store outputs", default=Path(".")), + dry_run: bool = typer.Option( + help="do everything the same except don't 
write to the target index", default=False + ), ) -> None: """Sync the ESGF-1.5 staged indexes to the public index. Details can be seen in the design.md """ - metadata_sync( - source_epname=_validate_src_ep(source_ep), - target_epname=_validate_tgt_ep(target_ep), - project=_validate_project(project), - production=prod, - sync_freq=5, - start_time=start_time, - work_dir=work_dir, - dry_run=dry_run, - ) + with file_lock(): + metadata_sync( + source_epname=_validate_src_ep(source_ep), + target_epname=_validate_tgt_ep(target_ep), + project=_validate_project(project), + production=prod, + sync_freq=5, + start_time=start_time, + work_dir=work_dir, + dry_run=dry_run, + ) @app.command() def create_index() -> None: """Create index for the test app.""" gc = GlobusClient() - cm = gc.get_client(name = "test") + cm = gc.get_client(name="test") sc = cm.search_client r = sc.create_index("minxu test index 2", "for testing purpose") - print (r) + print(r) + @app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) def query_globus( ctx: typer.Context, - globus_ep: str = typer.Argument( - help="globus end point name", callback=_validate_tgt_ep), + globus_ep: str = typer.Argument(help="globus end point name", callback=_validate_tgt_ep), project: str = typer.Argument(help="project name", callback=_validate_project), order_by: str = typer.Option(help="sort the result by field_name.asc or field_name.desc"), limit: int = typer.Option(10, help="the limit of a page"), @@ -192,41 +198,43 @@ def query_globus( ) -> None: """Search globus index with normal and scroll paginations.""" if "." not in order_by: - print ("please provide the correct order-by") + print("please provide the correct order-by") raise typer.Abort() - order_field = order_by.split('.')[0] - order = order_by.split('.')[1] - query = {"filters":[], "sort_field": order_field, "sort": order} + order_field = order_by.split(".")[0] + order = order_by.split(".")[1] + query = {"filters": [], "sort_field": order_field, "sort": order} query["limit"] = limit query["offset"] = offset - if 'TO' not in time_range: - print ("please provide a validate time range datetime-datetime") + if "TO" not in time_range: + print("please provide a validate time range datetime-datetime") raise typer.Abort() - start_time = time_range.split('TO')[0] - if start_time == '': + start_time = time_range.split("TO")[0] + if start_time == "": start_iso = "*" else: t_start = datetime.datetime.fromisoformat(start_time) start_iso = t_start.isoformat() + "Z" # "2023-01-01T00:00:00Z" - end_time = time_range.split('TO')[1] - if end_time == '': + end_time = time_range.split("TO")[1] + if end_time == "": end_iso = "*" else: t_end = datetime.datetime.fromisoformat(end_time) - end_iso = t_end.isoformat() + "Z" # "2023-12-31T00:00:00Z" + end_iso = t_end.isoformat() + "Z" # "2023-12-31T00:00:00Z" time_cond = { "type": "range", "field_name": "_timestamp", - "values": [{ - "from": start_iso, # Greater than or equal to start_date - "to": end_iso # Less than or equal to end_date - }] + "values": [ + { + "from": start_iso, # Greater than or equal to start_date + "to": end_iso, # Less than or equal to end_date + } + ], } query["filters"].append(time_cond) @@ -243,12 +251,11 @@ def query_globus( else: typer.echo(f"Ignoring invalid argument: {arg}") - # Handle kwargs if kwargs: for key, value in kwargs: if key[2:] == "project": - query["filters"].remove(proj_cond) + query["filters"].remove(proj_cond) if "::" in value: value_1 = value.split("::")[0] value_2 = 
value.split("::")[1] @@ -259,7 +266,7 @@ def query_globus( case "not": filter_cond = { "type": value_2, - "filter":{ + "filter": { "type": "match_all", "field_name": key[2:], "values": [value_1], @@ -300,27 +307,30 @@ def query_globus( if printvar is not None: for k, g in enumerate(page.get("gmeta")): - print_dict = { "total": page.get("total"), "subject": g["subject"], } - for var in printvar.split(','): + for var in printvar.split(","): if var in g["entries"][0]["content"]: - print_dict.update({ - var: g["entries"][0]["content"][var], - }) + print_dict.update( + { + var: g["entries"][0]["content"][var], + } + ) elif var in page and var != "gmeta": - print_dict.update({ - var: page[var], - }) - + print_dict.update( + { + var: page[var], + } + ) - print (json.dumps(print_dict)) + print(json.dumps(print_dict)) if k >= 10: - break + break + @app.command() def check_task( @@ -330,19 +340,21 @@ def check_task( ) -> None: """Check the globus task ids.""" check_ingest_tasks( - task_id = task_id, - db_file = db_file, - update = update, + task_id=task_id, + db_file=db_file, + update=update, ) @app.callback() def main(ctx: typer.Context) -> None: """Add the tip for more filter functions.""" - if ctx.invoked_subcommand == "query-globus" and ( - "--help" in sys.argv or "-h" in sys.argv): - print ("\n[bold red]Attention:[/bold red] more globus filters can " + - "be applied by [green]--keyword=value::filter_option[/green]") + if ctx.invoked_subcommand == "query-globus" and ("--help" in sys.argv or "-h" in sys.argv): + print( + "\n[bold red]Attention:[/bold red] more globus filters can " + + "be applied by [green]--keyword=value::filter_option[/green]" + ) + if __name__ == "__main__": app() diff --git a/src/metadata_migrate_sync/util.py b/src/metadata_migrate_sync/util.py index f4cf209..5a2a42e 100644 --- a/src/metadata_migrate_sync/util.py +++ b/src/metadata_migrate_sync/util.py @@ -1,42 +1,63 @@ """Utility tools.""" + +import contextlib import datetime import fcntl +import logging import os import sqlite3 import sys +from collections.abc import Generator +from io import TextIOWrapper from pathlib import Path import ntplib -from ntplib import NTPException import requests +from ntplib import NTPException +logger = logging.getLogger(__name__) -def create_lock(lockfile_path: Path) -> int: - """Create a lock file to prevent multiple instances.""" - - try: - # Create or open the lock file - fd = os.open(lockfile_path, os.O_WRONLY | os.O_CREAT) - - # Try to acquire an exclusive lock (non-blocking) - fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - - # Write our PID to the file - os.write(fd, str(os.getpid()).encode()) - - return fd - except (OSError, BlockingIOError): - print(f"Another instance is already running (PID: {lockfile_path.read_text().strip()})") - sys.exit(1) -def release_lock(fd: int, lockfile_path: Path) -> None: - """Release the lock file.""" +@contextlib.contextmanager +def file_lock(lock_file_path: Path = Path("/var/run")) -> Generator[TextIOWrapper, None, None]: + """Context manager for file based locking.""" + lock_file = lock_file_path / "esgf15mms.pid" + acquired = False try: - os.unlink(lockfile_path) - fcntl.flock(fd, fcntl.LOCK_UN) - os.close(fd) - except OSError: - pass + # Open the lock file. Use 'a+' mode to create the file if it doesn't exist. + with open(lock_file, "a+") as file_handle: + # Acquire an exclusive lock (LOCK_EX). + # LOCK_NB makes the lock non-blocking. If the lock cannot be acquired + # immediately, an OSError is raised. Remove LOCK_NB for a blocking lock. 
+ fcntl.flock(file_handle, fcntl.LOCK_EX | fcntl.LOCK_NB) + acquired = True + + # Write the current process ID to the lock file. + file_handle.write(str(os.getpid())) + logger.info(f"Lock acquired on {lock_file}") + + # Yield control to the 'with' block. + yield file_handle + + # Release the lock. + fcntl.flock(file_handle, fcntl.LOCK_UN) + logger.info(f"Lock released on {lock_file}") + + except OSError as e: + # If the lock is already held (due to LOCK_NB), an OSError is raised. + # errno.EWOULDBLOCK is the specific error code for this. + import errno + + if e.errno == errno.EWOULDBLOCK: + logger.error("Could not acquire lock on {lock_file}", exc_info=True) + sys.exit(1) + else: + # Re-raise the exception if it's not a lock error + raise + + finally: + if acquired: + lock_file.unlink() def get_utc_time_from_server(ahead_minutes: int = 3) -> str: @@ -44,30 +65,30 @@ def get_utc_time_from_server(ahead_minutes: int = 3) -> str: apis = [ "http://worldtimeapi.org/api/timezone/Etc/UTC", # HTTP (no SSL) "https://timeapi.io/api/Time/current/zone?timeZone=UTC", - "http://worldclockapi.com/api/json/utc/now" # HTTP + "http://worldclockapi.com/api/json/utc/now", # HTTP ] try: client = ntplib.NTPClient() response = client.request("pool.ntp.org") - cur_time = datetime.datetime.fromtimestamp(response.tx_time, datetime.timezone.utc) + cur_time = datetime.datetime.fromtimestamp(response.tx_time, datetime.timezone.utc) except NTPException or requests.RequestException: - cur_time = datetime.datetime.now(datetime.timezone.utc) # Local fallback + cur_time = datetime.datetime.now(datetime.timezone.utc) # Local fallback for api in apis: try: response = requests.get(api, timeout=5) data = response.json() - print (data) - if 'datetime' in data: - utc_time = data["datetime"] # 2025-04-09T19:42:34.490293+00:00 - if 'dateTime' in data: - utc_time = data["dateTime"] # 2025-04-09T19:44:44.024434 + print(data) + if "datetime" in data: + utc_time = data["datetime"] # 2025-04-09T19:42:34.490293+00:00 + if "dateTime" in data: + utc_time = data["dateTime"] # 2025-04-09T19:44:44.024434 - if 'Z' in utc_time: - cur_time = datetime.datetime.fromisoformat(utc_time.replace('Z', '+00:00')) - elif '+00:00' not in utc_time: - cur_time = datetime.datetime.fromisoformat(utc_time[:-1] + '+00:00') + if "Z" in utc_time: + cur_time = datetime.datetime.fromisoformat(utc_time.replace("Z", "+00:00")) + elif "+00:00" not in utc_time: + cur_time = datetime.datetime.fromisoformat(utc_time[:-1] + "+00:00") else: cur_time = datetime.datetime.fromisoformat(utc_time) @@ -76,22 +97,20 @@ def get_utc_time_from_server(ahead_minutes: int = 3) -> str: print(f"Error fetching UTC time: {e}") continue - cur_time_minus3 = (cur_time - - datetime.timedelta(minutes=ahead_minutes)).replace(second=0, microsecond=0) - return cur_time_minus3.isoformat(timespec='milliseconds').replace('+00:00', 'Z') + cur_time_minus3 = (cur_time - datetime.timedelta(minutes=ahead_minutes)).replace(second=0, microsecond=0) + return cur_time_minus3.isoformat(timespec="milliseconds").replace("+00:00", "Z") - -def get_last_value(column_name: str, table_name: str, db_path: Path = Path('database.db')) -> str | None: +def get_last_value(column_name: str, table_name: str, db_path: Path = Path("database.db")) -> str | None: """Get a column value in the last row of a table.""" with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn: cursor = conn.cursor() - - if not (table_name.replace('_', '').isalnum() and - column_name.replace('_', '').isalnum()): - raise ValueError("Invalid 
table or column name - \ - only alphanumeric and underscore characters allowed") + if not (table_name.replace("_", "").isalnum() and column_name.replace("_", "").isalnum()): + raise ValueError( + "Invalid table or column name - \ + only alphanumeric and underscore characters allowed" + ) query = """ SELECT ?