9 changes: 6 additions & 3 deletions services/datalad/datalad_service/common/git.py
@@ -10,6 +10,8 @@
import pygit2
from charset_normalizer import from_bytes

from datalad_service.common.onchange import on_head

tag_ref = re.compile('^refs/tags/')

COMMITTER_NAME = 'Git Worker'
@@ -130,7 +132,7 @@ def git_rename_master_to_main(repo):
repo.references['HEAD'].set_target('refs/heads/main')


def git_commit(
async def git_commit(
repo, file_paths, author=None, message='[OpenNeuro] Recorded changes', parents=None
):
"""Commit array of paths at HEAD."""
@@ -161,10 +163,10 @@ def git_commit(
sentry_sdk.capture_exception(e)
logger.error(f'Failed to read index after git-annex add: {e}')
raise OpenNeuroGitError(f'Failed to read index: {e}') from e
return git_commit_index(repo, author, message, parents)
return await git_commit_index(repo, author, message, parents)


def git_commit_index(
async def git_commit_index(
repo, author=None, message='[OpenNeuro] Recorded changes', parents=None
):
"""Commit any existing index changes."""
@@ -180,4 +182,5 @@ def git_commit_index(
'refs/heads/main', author, committer, message, tree, parent_commits
)
repo.head.set_target(commit)
await on_head(repo.workdir)
return commit
11 changes: 11 additions & 0 deletions services/datalad/datalad_service/common/onchange.py
@@ -0,0 +1,11 @@
from datalad_service.tasks.fsck import git_annex_fsck_local, git_annex_fsck_remote


async def on_head(dataset_path):
"""Called after any change to HEAD."""
await git_annex_fsck_local.kiq(dataset_path)


async def on_tag(dataset_path, tag):
"""Called after any new tag."""
await git_annex_fsck_remote.kiq(dataset_path, tag)
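Note (not part of the diff): assuming the broker behind these tasks is a taskiq-style async broker, `.kiq()` only serializes and enqueues the call, so a worker process runs fsck while the web request continues. A minimal usage sketch with a hypothetical dataset path:

import asyncio

from datalad_service.common.onchange import on_head, on_tag

async def example():
    # '/datasets/ds000001' is a hypothetical path, for illustration only
    await on_head('/datasets/ds000001')          # enqueues git_annex_fsck_local
    await on_tag('/datasets/ds000001', '1.0.0')  # enqueues git_annex_fsck_remote for tag 1.0.0

asyncio.run(example())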
2 changes: 1 addition & 1 deletion services/datalad/datalad_service/handlers/dataset.py
@@ -41,7 +41,7 @@ async def on_post(self, req, resp, dataset):
author = pygit2.Signature(name, email)
else:
author = None
hexsha = create_dataset(self.store, dataset, author)
hexsha = await create_dataset(self.store, dataset, author)
resp.media = {'hexsha': hexsha}
resp.status = falcon.HTTP_OK

4 changes: 2 additions & 2 deletions services/datalad/datalad_service/handlers/draft.py
@@ -53,9 +53,9 @@ async def on_post(self, req, resp, dataset):
# Add all changes to the index
if name and email:
author = pygit2.Signature(name, email)
media_dict['ref'] = str(git_commit(repo, ['.'], author))
media_dict['ref'] = str(await git_commit(repo, ['.'], author))
else:
media_dict['ref'] = str(git_commit(repo, ['.']))
media_dict['ref'] = str(await git_commit(repo, ['.']))
resp.media = media_dict
resp.status = falcon.HTTP_OK
except:
5 changes: 2 additions & 3 deletions services/datalad/datalad_service/handlers/files.py
@@ -6,7 +6,6 @@
import pygit2

from datalad_service.common.git import (
git_show,
git_show_content,
git_tree,
OpenNeuroGitError,
@@ -125,7 +124,7 @@ async def on_delete(self, req, resp, dataset):
media_dict['email'] = email
try:
if len(dirs_to_delete) > 0:
remove_files(
await remove_files(
self.store,
dataset,
dirs_to_delete,
@@ -135,7 +134,7 @@ async def on_delete(self, req, resp, dataset):
)
resp.status = falcon.HTTP_INTERNAL_SERVER_ERROR
if len(files_to_delete) > 0:
remove_files(
await remove_files(
self.store,
dataset,
files_to_delete,
6 changes: 4 additions & 2 deletions services/datalad/datalad_service/handlers/git.py
@@ -6,6 +6,7 @@

from datalad_service.common.events import log_git_event
from datalad_service.common.const import CHUNK_SIZE_BYTES
from datalad_service.common.onchange import on_head


cache_control = ['no-cache', 'max-age=0', 'must-revalidate']
@@ -137,11 +138,12 @@ async def on_post(self, req, resp, worker, dataset):
resp.status = falcon.HTTP_OK

# After this request finishes successfully, log it to the OpenNeuro API
def schedule_git_event():
async def schedule_git_event():
await on_head(dataset_path)
for new_commit, new_ref in refs_updated:
log_git_event(dataset, new_commit, new_ref, req.context['token'])

resp.schedule_sync(schedule_git_event)
resp.schedule(schedule_git_event)
else:
resp.status = falcon.HTTP_UNPROCESSABLE_ENTITY

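Note (not part of the diff): in Falcon's ASGI interface, `Response.schedule()` registers an async callable to run after the response has been sent, while `schedule_sync()` is for plain synchronous callables; since `schedule_git_event` now awaits `on_head`, it has to go through `schedule()`. A minimal sketch of the pattern, using hypothetical names:

import falcon.asgi

class ExampleResource:
    async def on_post(self, req, resp):
        async def after_response():
            # deferred work (e.g. await on_head(dataset_path)) runs once the response is sent
            ...

        resp.schedule(after_response)     # async callables
        # resp.schedule_sync(plain_func)  # synchronous callables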
4 changes: 2 additions & 2 deletions services/datalad/datalad_service/handlers/upload.py
@@ -34,9 +34,9 @@ async def move_files_into_repo(
await move_files(upload_path, dataset_path)
if name and email:
author = pygit2.Signature(name, email)
hexsha = str(git_commit(repo, unlock_files, author))
hexsha = str(await git_commit(repo, unlock_files, author))
else:
hexsha = str(git_commit(repo, unlock_files))
hexsha = str(await git_commit(repo, unlock_files))


class UploadResource:
4 changes: 2 additions & 2 deletions services/datalad/datalad_service/tasks/dataset.py
@@ -42,7 +42,7 @@ def create_datalad_config(dataset_path):
configfile.write(config)


def create_dataset(store, dataset, author=None, initial_head='main'):
async def create_dataset(store, dataset, author=None, initial_head='main'):
"""Create a DataLad git-annex repo for a new dataset.

initial_head is only meant for tests and is overridden by the implementation of git_commit
@@ -61,7 +61,7 @@ def create_dataset(store, dataset, author=None, initial_head='main'):
# Set a datalad UUID
create_datalad_config(dataset_path)
repo.index.add('.datalad/config')
git_commit(
await git_commit(
repo,
['.gitattributes', '.datalad/config'],
author,
2 changes: 1 addition & 1 deletion services/datalad/datalad_service/tasks/description.py
@@ -32,7 +32,7 @@ async def update_description(store, dataset, description_fields, name=None, emai
path, description, json.dumps(updated, indent=4, ensure_ascii=False)
)
# Commit new content, run validator
commit_files(store, dataset, ['dataset_description.json'])
await commit_files(store, dataset, ['dataset_description.json'])
return updated
else:
return description_json
12 changes: 7 additions & 5 deletions services/datalad/datalad_service/tasks/files.py
@@ -21,7 +21,7 @@
from datalad_service.config import AWS_S3_PUBLIC_BUCKET


def commit_files(store, dataset, files, name=None, email=None, cookies=None):
async def commit_files(store, dataset, files, name=None, email=None, cookies=None):
"""
Commit a list of files with the email and name provided.

@@ -35,9 +35,9 @@ def commit_files(store, dataset, files, name=None, email=None, cookies=None):
and pygit2.Signature(name, email)
or pygit2.Signature(COMMITTER_NAME, COMMITTER_EMAIL)
)
ref = git_commit(repo, files, author)
ref = await git_commit(repo, files, author)
# Run the validator but don't block on the request
asyncio.create_task(validate_dataset.kiq(dataset, dataset_path, str(ref), cookies))
await validate_dataset.kiq(dataset, dataset_path, str(ref), cookies)
return ref


@@ -47,7 +47,7 @@ def get_tree(store, dataset, tree):
return get_repo_files(dataset, dataset_path, tree)


def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
async def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
dataset_path = store.get_dataset_path(dataset)
repo = pygit2.Repository(dataset_path)
if name and email:
@@ -57,7 +57,9 @@ def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
repo.index.remove_all(paths)
repo.index.write()
repo.checkout_index()
hexsha = str(git_commit_index(repo, author, message='[OpenNeuro] Files removed'))
hexsha = str(
await git_commit_index(repo, author, message='[OpenNeuro] Files removed')
)


def parse_s3_annex_url(url, bucket_name=AWS_S3_PUBLIC_BUCKET):
117 changes: 117 additions & 0 deletions services/datalad/datalad_service/tasks/fsck.py
@@ -0,0 +1,117 @@
import json
import io
import logging
import subprocess

import pygit2
import requests

from datalad_service.broker import broker
from datalad_service.config import GRAPHQL_ENDPOINT


def get_head_commit_and_references(repo):
"""
Returns the current HEAD commit and any references (tags) pointing to it.
"""
head_commit = repo[repo.head.target]
references = []
for ref in repo.references:
if (
ref.startswith('refs/tags/')
and repo.lookup_reference(ref).target == head_commit.id
):
references.append(ref)
return head_commit, references


@broker.task
def git_annex_fsck_local(dataset_path):
"""Run git-annex fsck for local annexed objects in the draft. Runs on commits, verifies checksums."""
try:
commit, references = get_head_commit_and_references(
pygit2.Repository(dataset_path)
)
except pygit2.GitError:
logging.error(f'Could not open git repository for {dataset_path}')
return
annex_command = (
'git-annex',
'fsck',
'--json',
'--json-error-messages',
'--incremental-schedule',
'7d',
)
annex_process = subprocess.Popen(
annex_command, cwd=dataset_path, stdout=subprocess.PIPE
)
bad_files = []
for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
annexed_file = json.loads(annexed_file_json)
if not annexed_file['success']:
bad_files.append(annexed_file)
if len(bad_files) > 0:
logging.error(f'missing or corrupt annexed objects found in {dataset_path}')
# Send any bad files to updateFileChecks
requests.post(
url=GRAPHQL_ENDPOINT,
json={
'query': 'mutation ($datasetId: ID!, $hexsha: String!, $refs: [String], $annexFsck: [AnnexFsck]) { updateFileChecks(datasetId: $datasetId, hexsha: $hexsha, refs: $refs, annexFsck: $annexFsck) { datasetId, hexsha } }',
'variables': {
'datasetId': dataset_path.split('/')[-1],
'hexsha': str(commit.id),
'refs': references,
'annexFsck': bad_files,
},
},
)


@broker.task
def git_annex_fsck_remote(dataset_path, branch=None, remote='s3-PUBLIC'):
"""Run incremental fsck for one branch (tag) and remote."""
try:
# Basic sanity check opening the repo before running git-annex
repo = pygit2.Repository(dataset_path)
except pygit2.GitError:
logging.error(f'Could not open git repository for {dataset_path}')
return
if not branch:
# Find the newest tag chronologically
all_tags = sorted(
[tag for tag in repo.references if tag.startswith('refs/tags/')],
key=lambda tag: repo.lookup_reference(tag).peel(pygit2.Commit).commit_time,
reverse=True,
)
if all_tags:
branch = all_tags[0]
else:
logging.info(
f'No tags found for dataset: {dataset_path}. Skipping remote fsck.'
)
return

# Run at most once per week per dataset (matches --incremental-schedule=7d)
annex_command = (
'git-annex',
'fsck',
f'--branch={branch}',
f'--from={remote}',
'--fast',
'--json',
'--json-error-messages',
'--incremental-schedule=7d',
)
annex_process = subprocess.Popen(
annex_command, cwd=dataset_path, stdout=subprocess.PIPE
)
bad_files = []
for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
annexed_file = json.loads(annexed_file_json)
if not annexed_file['success']:
bad_files.append(annexed_file)
if len(bad_files) > 0:
logging.error(
f'{dataset_path} remote {remote} has missing or corrupt annexed objects'
)
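Note (not part of the diff): the loops above read one JSON object per line from `git annex fsck --json --json-error-messages`. A hypothetical failing record, to show the fields the code relies on (exact fields vary by git-annex version):

import json

# Hypothetical output line, for illustration only
line = '{"command": "fsck", "file": "sub-01/anat/sub-01_T1w.nii.gz", "key": "MD5E-s1234--abc.nii.gz", "success": false, "error-messages": ["checksum verification failed"]}'
record = json.loads(line)
if not record['success']:
    print(record['file'], record['error-messages'])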
17 changes: 10 additions & 7 deletions services/datalad/datalad_service/tasks/snapshots.py
@@ -9,6 +9,7 @@
from datalad_service.tasks.dataset import create_datalad_config
from datalad_service.tasks.description import update_description
from datalad_service.tasks.files import commit_files
from datalad_service.common.onchange import on_tag


class SnapshotExistsException(Exception):
@@ -119,7 +120,7 @@ async def update_changes(store, dataset, tag, new_changes):
current_date = datetime.today().strftime('%Y-%m-%d')
updated = await write_new_changes(dataset_path, tag, new_changes, current_date)
# Commit new content, run validator
commit_files(store, dataset, ['CHANGES'])
await commit_files(store, dataset, ['CHANGES'])
return updated
else:
return await get_head_changes(dataset_path)
@@ -133,20 +134,22 @@ def validate_snapshot_name(store, dataset, snapshot):
raise SnapshotExistsException(f'Tag "{snapshot}" already exists, name conflict')


def validate_datalad_config(store, dataset):
async def validate_datalad_config(store, dataset):
"""Add a .datalad/config file if one does not exist."""
dataset_path = store.get_dataset_path(dataset)
repo = store.get_dataset_repo(dataset)
try:
git_show(repo, 'HEAD', '.datalad/config')
except KeyError:
create_datalad_config(dataset_path)
commit_files(store, dataset, ['.datalad/config'])
await commit_files(store, dataset, ['.datalad/config'])


def save_snapshot(store, dataset, snapshot):
repo = pygit2.Repository(store.get_dataset_path(dataset))
async def save_snapshot(store, dataset, snapshot):
ds_path = store.get_dataset_path(dataset)
repo = pygit2.Repository(ds_path)
repo.references.create(f'refs/tags/{snapshot}', str(repo.head.target))
await on_tag(ds_path, snapshot)


async def create_snapshot(
@@ -158,8 +161,8 @@ async def create_snapshot(
Raises an exception if the tag already exists.
"""
validate_snapshot_name(store, dataset, snapshot)
validate_datalad_config(store, dataset)
await validate_datalad_config(store, dataset)
await update_description(store, dataset, description_fields)
await update_changes(store, dataset, snapshot, snapshot_changes)
save_snapshot(store, dataset, snapshot)
await save_snapshot(store, dataset, snapshot)
return get_snapshot(store, dataset, snapshot)
4 changes: 2 additions & 2 deletions services/datalad/tests/test_bids.py
@@ -70,10 +70,10 @@ def test_read_dataset_description(new_dataset):
assert description['Name'] == 'Test fixture new dataset'


def test_read_dataset_description_invalid_json(new_dataset):
async def test_read_dataset_description_invalid_json(new_dataset):
repo = Repository(new_dataset.path)
open(os.path.join(new_dataset.path, 'dataset_description.json'), 'w').close()
git_commit(repo, ['dataset_description.json'])
await git_commit(repo, ['dataset_description.json'])
description = read_dataset_description(new_dataset.path, 'HEAD')
assert description is None
