From 08fcd720000aa741791b787f89f3e047d9ba5246 Mon Sep 17 00:00:00 2001 From: Philipp Wissmann Date: Thu, 5 Sep 2024 13:19:32 +0200 Subject: [PATCH] Add ingestion flow Adds ingestion flow with S3 transfer. Relies on scicat-cli (branch cli-next). A stub for the Globus transfer is implemented; it takes its configuration from the global application config. --- cmd/openem-ingestor-app/app.go | 2 +- cmd/openem-ingestor-app/frontend/.gitignore | 2 + .../frontend/dist/.gitignore | 4 - configs/openem-ingestor-config.yaml | 14 +- go.mod | 31 ++- go.sum | 59 ++++- internal/config.go | 10 +- internal/ingestdataset.go | 248 ++++++++++++++++++ internal/s3upload.go | 112 ++++++++ internal/taskqueue.go | 7 +- 10 files changed, 459 insertions(+), 30 deletions(-) delete mode 100644 cmd/openem-ingestor-app/frontend/dist/.gitignore create mode 100644 internal/ingestdataset.go create mode 100644 internal/s3upload.go diff --git a/cmd/openem-ingestor-app/app.go b/cmd/openem-ingestor-app/app.go index 3a50cca..92c133e 100644 --- a/cmd/openem-ingestor-app/app.go +++ b/cmd/openem-ingestor-app/app.go @@ -23,7 +23,7 @@ func (w *WailsNotifier) OnTaskRemoved(id uuid.UUID) { runtime.EventsEmit(w.AppContext, "folder-removed", id) } func (w *WailsNotifier) OnTaskFailed(id uuid.UUID, err error) { - runtime.EventsEmit(w.AppContext, "upload-failed", id, err) + runtime.EventsEmit(w.AppContext, "upload-failed", id, err.Error()) } func (w *WailsNotifier) OnTaskCompleted(id uuid.UUID, seconds_elapsed int) { runtime.EventsEmit(w.AppContext, "upload-completed", id, seconds_elapsed) diff --git a/cmd/openem-ingestor-app/frontend/.gitignore b/cmd/openem-ingestor-app/frontend/.gitignore index a860310..6755e95 100644 --- a/cmd/openem-ingestor-app/frontend/.gitignore +++ b/cmd/openem-ingestor-app/frontend/.gitignore @@ -1 +1,3 @@ node_modules/** +dist/** +!dist \ No newline at end of file diff --git a/cmd/openem-ingestor-app/frontend/dist/.gitignore b/cmd/openem-ingestor-app/frontend/dist/.gitignore deleted file mode 100644 index
5c3ae63..0000000 --- a/cmd/openem-ingestor-app/frontend/dist/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# This folder needs to exist in order for the golangci-lint to allow an embed command -.* -**/* -!.gitignore \ No newline at end of file diff --git a/configs/openem-ingestor-config.yaml b/configs/openem-ingestor-config.yaml index 943fa2d..a1f2c2b 100644 --- a/configs/openem-ingestor-config.yaml +++ b/configs/openem-ingestor-config.yaml @@ -2,12 +2,22 @@ Scicat: Host: http://scicat:8080/api/v3 AccessToken: "token" Transfer: - Method: S3 + Method: Globus S3: Endpoint: s3:9000 Bucket: landingzone Checksum: true + Location: "eu-west-1" + User: "minio_user" + Password: "minio_pass" Globus: - Endpoint: globus.psi.ch + ClientID: "" + ClientSecret: "" + RedirectURL: "" + Scopes: [] + SourceCollection: "" + SourcePrefixPath: "" + DestinationCollection: "" + DestinationPrefixPath: "" Misc: ConcurrencyLimit: 2 \ No newline at end of file diff --git a/go.mod b/go.mod index 3ffd437..6042299 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,18 @@ module github.com/SwissOpenEM/Ingestor -go 1.21 +go 1.22.2 -toolchain go1.22.1 +toolchain go1.22.6 require ( - github.com/google/uuid v1.4.0 + github.com/SwissOpenEM/globus v0.0.0-20240822132653-119ec5e19eab + github.com/fatih/color v1.14.1 + github.com/google/uuid v1.6.0 + github.com/minio/minio-go/v7 v7.0.76 + github.com/paulscherrerinstitute/scicat-cli v0.1.6-0.20240905105622-f6eaf0dd713f github.com/spf13/viper v1.19.0 github.com/wailsapp/wails/v2 v2.9.1 + golang.org/x/oauth2 v0.19.0 ) require ( @@ -30,10 +35,18 @@ require ( ) require ( + github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 // indirect github.com/bep/debounce v1.2.1 // indirect + github.com/creack/pty v1.1.17 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/go-ini/ini v1.67.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/goccy/go-json v0.10.3 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect 
github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/labstack/echo/v4 v4.10.2 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/leaanthony/go-ansi-parser v1.6.0 // indirect @@ -42,18 +55,22 @@ require ( github.com/leaanthony/u v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2 // indirect + github.com/minio/md5-simd v1.1.2 // indirect github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect + github.com/rs/xid v1.6.0 // indirect github.com/samber/lo v1.38.1 // indirect github.com/tkrajina/go-reflector v0.5.6 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/wailsapp/go-webview2 v1.0.10 // indirect github.com/wailsapp/mimetype v1.4.1 // indirect - golang.org/x/crypto v0.23.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index b80708f..8522e9b 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,46 @@ +github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s= +github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= +github.com/SwissOpenEM/globus 
v0.0.0-20240822132653-119ec5e19eab h1:Kn57UraLrMUdm/fXZM6qD60xh3vd3B9k1I42JIGvcqo= +github.com/SwissOpenEM/globus v0.0.0-20240822132653-119ec5e19eab/go.mod h1:HiMwPdtUdztPpnA0TamNWBBRPGYjEJWXSRUIV5vjqXc= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= +github.com/creack/pty v1.1.17 h1:QeVUsEDNrLBW4tMgZHvxy18sKtr6VI492kBhUfhDJNI= +github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= +github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= 
+github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e h1:Q3+PugElBCf4PFpxhErSzU3/PY5sFL5Z6rfv4AbGAck= github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e/go.mod h1:alcuEEnZsY1WQsagKhZDsoPCRoOijYqhZvPwLG0kzVs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -50,8 +71,16 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2 h1:YocNLcTBdEdvY3iDK6jfWXvEaM5OCKkjxPKoJRdB3Gg= +github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.76 h1:9nxHH2XDai61cT/EFhyIw/wW4vJfpPNvl7lSFpRt+Ng= +github.com/minio/minio-go/v7 v7.0.76/go.mod h1:AVM3IUN6WwKzmwBxVdjzhH8xq+f57JSbbvzqvUzR6eg= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/paulscherrerinstitute/scicat-cli v0.1.6-0.20240905105622-f6eaf0dd713f h1:Q/oN9ZtduvFZrSbLfa6iW9iwiNZcuUgr4vaMIOldnAM= +github.com/paulscherrerinstitute/scicat-cli v0.1.6-0.20240905105622-f6eaf0dd713f/go.mod h1:6V+MY3ONh5zkDppy+iWVCyob7JjjgDZqhoA5NgxMT8E= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= @@ -66,6 +95,8 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= 
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= @@ -87,6 +118,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -112,13 +144,15 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= 
+golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/oauth2 v0.19.0 h1:9+E/EZBCbTLNrbN35fHv/a/d/mOBatymz1zbtQrXpIg= +golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc8= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200810151505-1b9f1253b3ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -128,19 +162,24 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 
h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/config.go b/internal/config.go index dc76d9f..1df2a14 100644 --- a/internal/config.go +++ b/internal/config.go @@ -5,6 +5,7 @@ import ( "os" "path" + 
"github.com/paulscherrerinstitute/scicat-cli/cmd/cliutils" "github.com/spf13/viper" ) @@ -16,6 +17,9 @@ type Scicat struct { type S3 struct { Endpoint string `string:"Endpoint"` Bucket string `string:"Bucket"` + Location string `string:"Location"` + User string `string:"User"` + Password string `string:"Password"` Checksum bool `bool:"Checksum"` } @@ -24,9 +28,9 @@ type Globus struct { } type Transfer struct { - Method string `string:"Method"` - S3 S3 `mapstructure:"S3"` - Globus Globus `mapstructure:"Globus"` + Method string `string:"Method"` + S3 S3 `mapstructure:"S3"` + Globus cliutils.GlobusConfig `mapstructure:"cliutils.GlobusConfig"` } type Misc struct { diff --git a/internal/ingestdataset.go b/internal/ingestdataset.go new file mode 100644 index 0000000..7cbc747 --- /dev/null +++ b/internal/ingestdataset.go @@ -0,0 +1,248 @@ +package core + +import ( + "bufio" + "context" + "crypto/tls" + "errors" + "fmt" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/SwissOpenEM/globus" + "github.com/fatih/color" + "github.com/paulscherrerinstitute/scicat-cli/cmd/cliutils" + "github.com/paulscherrerinstitute/scicat-cli/datasetIngestor" + "golang.org/x/oauth2" +) + +func createLocalSymlinkCallbackForFileLister(skipSymlinks *string, skippedLinks *uint) func(symlinkPath string, sourceFolder string) (bool, error) { + scanner := bufio.NewScanner(os.Stdin) + return func(symlinkPath string, sourceFolder string) (bool, error) { + keep := true + pointee, _ := os.Readlink(symlinkPath) // just pass the file name + if !filepath.IsAbs(pointee) { + symlinkAbs, err := filepath.Abs(filepath.Dir(symlinkPath)) + if err != nil { + return false, err + } + // log.Printf(" CWD path pointee :%v %v %v", dir, filepath.Dir(path), pointee) + pointeeAbs := filepath.Join(symlinkAbs, pointee) + pointee, err = filepath.EvalSymlinks(pointeeAbs) + if err != nil { + log.Printf("Could not follow symlink for file:%v %v", pointeeAbs, err) + keep = false + log.Printf("keep 
variable set to %v", keep) + } + } + //fmt.Printf("Skip variable:%v\n", *skip) + if *skipSymlinks == "ka" || *skipSymlinks == "kA" { + keep = true + } else if *skipSymlinks == "sa" || *skipSymlinks == "sA" { + keep = false + } else if *skipSymlinks == "da" || *skipSymlinks == "dA" { + keep = strings.HasPrefix(pointee, sourceFolder) + } else { + color.Set(color.FgYellow) + log.Printf("Warning: the file %s is a link pointing to %v.", symlinkPath, pointee) + color.Unset() + log.Printf(` + Please test if this link is meaningful and not pointing + outside the sourceFolder %s. The default behaviour is to + keep only internal links within a source folder. + You can also specify that you want to apply the same answer to ALL + subsequent links within the current dataset, by appending an a (dA,ka,sa). + If you want to give the same answer even to all subsequent datasets + in this command then specify a capital 'A', e.g. (dA,kA,sA) + Do you want to keep the link in dataset or skip it (D(efault)/k(eep)/s(kip) ?`, sourceFolder) + scanner.Scan() + *skipSymlinks = scanner.Text() + if *skipSymlinks == "" { + *skipSymlinks = "d" + } + if *skipSymlinks == "d" || *skipSymlinks == "dA" { + keep = strings.HasPrefix(pointee, sourceFolder) + } else { + keep = (*skipSymlinks != "s" && *skipSymlinks != "sa" && *skipSymlinks != "sA") + } + } + if keep { + color.Set(color.FgGreen) + log.Printf("You chose to keep the link %v -> %v.\n\n", symlinkPath, pointee) + } else { + color.Set(color.FgRed) + *skippedLinks++ + log.Printf("You chose to remove the link %v -> %v.\n\n", symlinkPath, pointee) + } + color.Unset() + return keep, nil + } +} + +func createLocalFilenameFilterCallback(illegalFileNamesCounter *uint) func(filepath string) bool { + return func(filepath string) (keep bool) { + keep = true + // make sure that filenames do not contain characters like "\" or "*" + if strings.ContainsAny(filepath, "*\\") { + color.Set(color.FgRed) + log.Printf("Warning: the file %s contains illegal 
characters like *,\\ and will not be archived.", filepath) + color.Unset() + if illegalFileNamesCounter != nil { + *illegalFileNamesCounter++ + } + keep = false + } + // and check for triple blanks, they are used to separate columns in messages + if keep && strings.Contains(filepath, " ") { + color.Set(color.FgRed) + log.Printf("Warning: the file %s contains 3 consecutive blanks which is not allowed. The file not be archived.", filepath) + color.Unset() + if illegalFileNamesCounter != nil { + *illegalFileNamesCounter++ + } + keep = false + } + return keep + } +} + +func GlobusLogin(gConfig cliutils.GlobusConfig) (gClient globus.GlobusClient, err error) { + + // config setup + ctx := context.Background() + clientConfig := globus.AuthGenerateOauthClientConfig(ctx, gConfig.ClientID, gConfig.ClientSecret, gConfig.RedirectURL, gConfig.Scopes) + verifier := oauth2.GenerateVerifier() + clientConfig.AuthCodeURL("state", oauth2.AccessTypeOffline, oauth2.S256ChallengeOption(verifier)) + + // redirect user to consent page to ask for permission and obtain the code + url := clientConfig.AuthCodeURL("state", oauth2.AccessTypeOffline, oauth2.S256ChallengeOption(verifier)) + fmt.Printf("Visit the URL for the auth dialog: %v\n\nEnter the received code here: ", url) + + // negotiate token and create client + var code string + if _, err := fmt.Scan(&code); err != nil { + return globus.GlobusClient{}, err + } + tok, err := clientConfig.Exchange(ctx, code, oauth2.VerifierOption(verifier)) + if err != nil { + return globus.GlobusClient{}, fmt.Errorf("oauth2 exchange failed: %v", err) + } + + // return globus client + return globus.HttpClientToGlobusClient(clientConfig.Client(ctx, tok)), nil +} + +func IngestDataset( + task_context context.Context, + task IngestionTask, + config Config, + notifier ProgressNotifier, +) (string, error) { + + var http_client = &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + Timeout: 120 * time.Second} + 
+ SCIAT_API_URL := config.Scicat.Host + + const TAPECOPIES = 2 // dummy value, unused + const DATASETFILELISTTXT = "" + + var skipSymlinks string = "dA" // skip all simlinks + + user := map[string]string{ + "accessToken": config.Scicat.AccessToken, + } + // check if dataset already exists (identified by source folder) + metadatafile := filepath.Join(task.DatasetFolder.FolderPath, "metadata.json") + if _, err := os.Stat(metadatafile); errors.Is(err, os.ErrNotExist) { + return "", err + } + + accessGroups := []string{} + + newMetaDataMap, metadataSourceFolder, _, err := datasetIngestor.ReadAndCheckMetadata(http_client, SCIAT_API_URL, metadatafile, user, accessGroups) + + _ = metadataSourceFolder + if err != nil { + return "", err + } + var skippedLinks uint = 0 + var illegalFileNames uint = 0 + localSymlinkCallback := createLocalSymlinkCallbackForFileLister(&skipSymlinks, &skippedLinks) + localFilepathFilterCallback := createLocalFilenameFilterCallback(&illegalFileNames) + + // collect (local) files + fullFileArray, startTime, endTime, owner, numFiles, totalSize, err := datasetIngestor.GetLocalFileList(task.DatasetFolder.FolderPath, DATASETFILELISTTXT, localSymlinkCallback, localFilepathFilterCallback) + _ = numFiles + _ = totalSize + _ = startTime + _ = endTime + _ = owner + if err != nil { + log.Printf("") + return "", err + } + originalMetaDataMap := map[string]string{} + datasetIngestor.UpdateMetaData(http_client, SCIAT_API_URL, user, originalMetaDataMap, newMetaDataMap, startTime, endTime, owner, TAPECOPIES) + + newMetaDataMap["datasetlifecycle"] = map[string]interface{}{} + newMetaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = false + newMetaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "filesNotYetAvailable" + newMetaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = false + + datasetId, err := datasetIngestor.IngestDataset(http_client, SCIAT_API_URL, newMetaDataMap, fullFileArray, 
user) + if err != nil { + return "", err + } + + transfer_params := cliutils.TransferParams{ + DatasetId: datasetId, + DatasetSourceFolder: task.DatasetFolder.FolderPath, + } + + switch task.TransferMethod { + case TransferS3: + _, err = UploadS3(task_context, datasetId, task.DatasetFolder.FolderPath, task.DatasetFolder.Id, config.Transfer.S3, notifier) + case TransferGlobus: + // var gConfig cliutils.GlobusConfig + gConfig := config.Transfer.Globus + // var globusConfigPath string + // if cmd.Flags().Lookup("globus-cfg").Changed { + // globusConfigPath = globusCfgFlag + // } else { + // execPath, err := os.Executable() + // if err != nil { + // log.Fatalln("can't find executable path:", err) + // } + // globusConfigPath = filepath.Join(filepath.Dir(execPath), "globus.yaml") + // } + globusClient, _ := GlobusLogin(gConfig) + + globus_params := cliutils.GlobusParams{ + GlobusClient: globusClient, + SrcCollection: gConfig.SourceCollection, + SrcPrefixPath: gConfig.SourcePrefixPath, + DestCollection: gConfig.DestinationCollection, + DestPrefixPath: gConfig.DestinationPrefixPath, + // Filelist: [string("")], //filePathList, + // IsSymlinkList: [false], //isSymlinkList, + } + transfer_params.GlobusParams = globus_params + cliutils.GlobusTransfer(transfer_params) + _: + } + + if err != nil { + return datasetId, err + } + + // mark dataset archivable + err = datasetIngestor.MarkFilesReady(http_client, SCIAT_API_URL, datasetId, user) + return datasetId, err + +} diff --git a/internal/s3upload.go b/internal/s3upload.go new file mode 100644 index 0000000..f6febce --- /dev/null +++ b/internal/s3upload.go @@ -0,0 +1,112 @@ +package core + +import ( + "context" + "log" + "os" + "path" + "time" + + "github.com/google/uuid" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +// Progress notifier object for Minio upload +type MinioProgressNotifier struct { + total_file_size int64 + current_size int64 + files_count int + current_file int + 
previous_percentage float64 + start_time time.Time + id uuid.UUID + notifier ProgressNotifier +} + +// Callback that gets called by fputobject. +// Note: does not work for multipart uploads +func (pn *MinioProgressNotifier) Read(p []byte) (n int, err error) { + n = len(p) + pn.current_size += int64(n) + + pn.notifier.OnTaskProgress(pn.id, pn.current_file, pn.files_count, int(time.Since(pn.start_time).Seconds())) + return +} + +// Upload all files in a folder to a minio bucket +func UploadS3(task_ctx context.Context, dataset_pid string, datasetSourceFolder string, uploadId uuid.UUID, options S3, notifier ProgressNotifier) (string, error) { + accessKeyID := options.User + secretAccessKey := options.Password + creds := credentials.NewStaticV4(accessKeyID, secretAccessKey, "") + useSSL := false + + log.Printf("Using endpoint %s\n", options.Endpoint) + + // Initialize minio client object. + minioClient, err := minio.New(options.Endpoint, &minio.Options{ + Creds: creds, + Secure: useSSL, + }) + + if err != nil { + log.Fatalln(err) + } + + // Make a new bucket called testbucket. 
+ bucketName := options.Bucket + + err = minioClient.MakeBucket(task_ctx, bucketName, minio.MakeBucketOptions{Region: options.Location}) + if err != nil { + // Check to see if we already own this bucket (which happens if you run this twice) + exists, errBucketExists := minioClient.BucketExists(task_ctx, bucketName) + if errBucketExists == nil && exists { + log.Printf("We already own %s\n", bucketName) + } else { + log.Fatalln(err) + } + } else { + log.Printf("Successfully created %s\n", bucketName) + } + + contentType := "application/octet-stream" + + entries, err := os.ReadDir(datasetSourceFolder) + if err != nil { + return "", err + } + + pn := MinioProgressNotifier{files_count: len(entries), previous_percentage: 0.0, start_time: time.Now(), id: uploadId, notifier: notifier} + + for idx, f := range entries { + select { + case <-task_ctx.Done(): + pn.notifier.OnTaskCanceled(uploadId) + return "Upload canceled", nil + + default: + filePath := path.Join(datasetSourceFolder, f.Name()) + objectName := "openem-network/datasets/" + dataset_pid + "/raw_files/" + f.Name() + + pn.current_file = idx + 1 + fileinfo, _ := os.Stat(filePath) + pn.total_file_size = fileinfo.Size() + + notifier.OnTaskProgress(uploadId, pn.current_file, pn.files_count, 0) + + _, err := minioClient.FPutObject(task_ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ + ContentType: contentType, + Progress: &pn, + SendContentMd5: true, + NumThreads: 4, + DisableMultipart: false, + ConcurrentStreamParts: true, + }) + if err != nil { + return dataset_pid, err + } + } + } + + return dataset_pid, nil +} diff --git a/internal/taskqueue.go b/internal/taskqueue.go index 7eb0cd6..893120b 100644 --- a/internal/taskqueue.go +++ b/internal/taskqueue.go @@ -150,7 +150,7 @@ func (w *TaskQueue) ScheduleTask(id uuid.UUID) { } -func TestIngestionFunction(task_context context.Context, task IngestionTask, notifier ProgressNotifier) (string, error) { +func TestIngestionFunction(task_context context.Context, 
task IngestionTask, config Config, notifier ProgressNotifier) (string, error) { start := time.Now() @@ -166,8 +166,9 @@ func TestIngestionFunction(task_context context.Context, task IngestionTask, not func (w *TaskQueue) IngestDataset(task_context context.Context, task IngestionTask) TaskResult { start := time.Now() - datasetPID, _ := TestIngestionFunction(task_context, task, w.Notifier) + // datasetPID, _ := TestIngestionFunction(task_context, task, w.Config, w.Notifier) + datasetPID, err := IngestDataset(task_context, task, w.Config, w.Notifier) end := time.Now() elapsed := end.Sub(start) - return TaskResult{Dataset_PID: datasetPID, Elapsed_seconds: int(elapsed.Seconds()), Error: nil} + return TaskResult{Dataset_PID: datasetPID, Elapsed_seconds: int(elapsed.Seconds()), Error: err} }