Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions cli/src/worker/annex.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import type { GitWorkerContext } from "./types/git-context.ts"
import type { Logger } from "@std/log"
import { basename, dirname, join, relative } from "@std/path"
import { default as git } from "isomorphic-git"
import { default as git, TREE } from "isomorphic-git"

/**
* Why are we using hash wasm over web crypto?
* Web crypto cannot do streaming hashes of the common git-annex functions yet.
*/
import { createMD5, createSHA256 } from "hash-wasm"

/**
 * Mapping from annex key to relative paths
 *
 * Keys are git-annex keys; values are the path of the file that references
 * the key, as reported by the git tree walk (relative to the repository).
 */
export type AnnexKeyPaths = Record<string, string>

// Single shared utf-8 decoder, reused when decoding git blob contents so a
// new TextDecoder is not constructed on every call
const textDecoder = new TextDecoder("utf-8")

/**
* Reusable hash factories
*/
Expand Down Expand Up @@ -65,7 +74,6 @@ export function annexRelativePath(path: string) {
* @param context GitWorkerContext objects
*/
export async function annexAdd(
annexKeys: Record<string, string>,
hash: string,
path: string,
relativePath: string,
Expand Down Expand Up @@ -119,8 +127,6 @@ export async function annexAdd(

// Key has changed if the existing link points to another object
if (forceAdd || link !== symlinkTarget) {
// Upload this key after the git commit
annexKeys[annexKey] = path
// This object has a new annex hash, update the symlink and add it
const symlinkTarget = join(
annexRelativePath(relativePath),
Expand Down Expand Up @@ -157,5 +163,60 @@ export async function readAnnexPath(
oid: annexBranchOid,
filepath: logPath,
})
return new TextDecoder().decode(blob)
return textDecoder.decode(blob)
}

/**
 * Walk the git tree belonging to `ref` and return a mapping from annex keys to relative path
 *
 * Only symlink entries (blobs with mode 0o120000) are inspected. The basename
 * of each symlink target is treated as a candidate annex key; targets that do
 * not look like a git-annex key are skipped with a warning and are not
 * included in the result or in the summary of annexed objects.
 *
 * If several paths reference the same key, the last one visited wins.
 *
 * @param ref Git reference to scan for annex objects
 * @param logger Logger to use, reports what keys are found at INFO level
 * @param context GitWorkerContext configured for a repo
 * @returns Mapping from each annex key found in `ref` to the path of the symlink referencing it
 */
export async function getAnnexKeys(
  ref: string,
  logger: Logger,
  context: GitWorkerContext,
): Promise<AnnexKeyPaths> {
  // Built once per call rather than once per tree entry
  const annexKeyPattern = /^[A-Z0-9]+-s\d+--[0-9a-fA-F]+(\.[a-zA-Z0-9.]*)?$/
  const annexKeys: AnnexKeyPaths = {}
  const annexedGitObjects: { path: string; oid: string }[] = []
  // Walk the tree at `ref` and find all annex symlinks
  await git.walk({
    ...context.config(),
    trees: [TREE({ ref })],
    map: async function (filepath, [entry]) {
      // Always return undefined (never null) so the walk keeps descending
      if (!entry) return
      const isSymlink = await entry.type() === "blob" &&
        await entry.mode() === 0o120000
      if (!isSymlink) return
      const content = await entry.content()
      if (!content) return
      const symlinkTarget = textDecoder.decode(content)
      const annexKey = basename(symlinkTarget)
      // Check that annexKey conforms to the git-annex key format
      // Other symlinks are allowed but may be rejected on push if they point outside of the repo
      if (!annexKeyPattern.test(annexKey)) {
        logger.warn(
          `Skipping invalid annex key format: "${annexKey}" for file "${filepath}"`,
        )
        return
      }
      logger.info(`Found key "${annexKey}" in ${ref}.`)
      annexKeys[annexKey] = filepath
      // Only symlinks that carry a valid key are reported as annexed objects
      annexedGitObjects.push({ path: filepath, oid: await entry.oid() })
    },
  })

  if (annexedGitObjects.length > 0) {
    logger.info(`Annexed objects in ${ref}:`)
    for (const obj of annexedGitObjects) {
      logger.info(`- ${obj.path} (OID: ${obj.oid})`)
    }
  } else {
    logger.info(`No annexed objects found in ${ref}.`)
  }
  return annexKeys
}
29 changes: 12 additions & 17 deletions cli/src/worker/git.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,15 @@ import { logger, setupLogging } from "../logger.ts"
import { PromiseQueue } from "./queue.ts"
import { checkKey, storeKey } from "./transferKey.ts"
import ProgressBar from "@deno-library/progress"
import { annexAdd, hashDirLower, readAnnexPath } from "./annex.ts"
import type { AnnexKeyPaths } from "./annex.ts"
import { annexAdd, getAnnexKeys, hashDirLower, readAnnexPath } from "./annex.ts"
import { GitWorkerContext } from "./types/git-context.ts"
import { resetWorktree } from "./resetWorktree.ts"
import { getDefault } from "./getDefault.ts"

let context: GitWorkerContext
let attributesCache: GitAnnexAttributes

/**
* Paths to upload to the remote annex
*
* Keys are the annex key
* Values are repo relative path
*/
const annexKeys: Record<string, string> = {}

async function done() {
logger.info("Git worker shutdown.")
// @ts-ignore
Expand Down Expand Up @@ -163,7 +156,6 @@ async function add(event: GitWorkerEventAdd) {
} else {
if (
await annexAdd(
annexKeys,
annexed,
event.data.path,
event.data.relativePath,
Expand Down Expand Up @@ -214,14 +206,13 @@ async function createAnnexBranch() {
* Generate a commit for remote.log updates if needed
*/
async function remoteSetup() {
// Delegates to commitAnnexBranch to commit any pending git-annex branch changes.
// NOTE(review): this span is a rendered diff hunk, so both the older call
// (passing an empty key map) and the argument-less call appear below —
// confirm which one is current before changing this function.
const noAnnexKeys: Record<string, string> = {}
await commitAnnexBranch(noAnnexKeys)
await commitAnnexBranch()
}

/**
* Generate one commit for all pending git-annex branch changes
*/
async function commitAnnexBranch(annexKeys: Record<string, string>) {
async function commitAnnexBranch() {
// Find the UUID of this repository if it exists already
const expectedRemote = "OpenNeuro" // TODO - This could be more flexible?
let uuid
Expand Down Expand Up @@ -280,6 +271,7 @@ async function commitAnnexBranch(annexKeys: Record<string, string>) {
}
}
// Add logs for each annexed file
const annexKeys = await getAnnexKeys("HEAD", logger, context)
for (const [key, _path] of Object.entries(annexKeys)) {
const hashDir = join(...await hashDirLower(key))
const annexBranchPath = join(hashDir, `${key}.log`)
Expand Down Expand Up @@ -379,7 +371,7 @@ async function commit() {
author: context.author,
message: "[OpenNeuro] Added local files",
})
await commitAnnexBranch(annexKeys)
await commitAnnexBranch()
logger.info(`Committed as "${commitHash}"`)
}
}
Expand All @@ -389,6 +381,7 @@ async function commit() {
*/
async function push() {
let completed = 0
const annexKeys: AnnexKeyPaths = await getAnnexKeys("HEAD", logger, context)
const annexedObjects = Object.keys(annexKeys).length
const progress = new ProgressBar({
title: `Transferring annexed files`,
Expand All @@ -398,13 +391,15 @@ async function push() {
await progress.render(completed)
}
// Git-annex copy --to=openneuro
for (const [key, path] of Object.entries(annexKeys)) {
for (const [key, relativePath] of Object.entries(annexKeys)) {
const checkKeyResult = await checkKey({
url: context.repoEndpoint,
token: context.authorization,
}, key)
if (checkKeyResult) {
logger.info(`Skipping key "${key}" present on remote`)
completed += 1
await progress.render(completed)
} else {
let storeKeyResult = -1
let retries = 3
Expand All @@ -416,7 +411,7 @@ async function push() {
token: context.authorization,
},
key,
path,
join(context.sourcePath, relativePath),
)
if (storeKeyResult === -1 && retries > 0) {
logger.warn(`Failed to transfer annex object "${key}" - retrying`)
Expand All @@ -430,7 +425,7 @@ async function push() {
completed += 1
await progress.render(completed)
logger.info(
`Stored ${storeKeyResult} bytes for key "${key}" from path "${path}"`,
`Stored ${storeKeyResult} bytes for key "${key}" from path "${relativePath}"`,
)
}
}
Expand Down