Skip to content

Commit dbbf132

Browse files
committed
support multipart upload
1 parent d356210 commit dbbf132

6 files changed

Lines changed: 199 additions & 18 deletions

File tree

internal/cmd/db_create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func CreateDatabase(name string) error {
8686
}
8787

8888
isAWS := strings.HasPrefix(group.Primary, "aws-")
89-
seed, err := parseDBSeedFlags(client, isAWS)
89+
seed, err := parseDBSeedFlags(client, isAWS, multipartFlag)
9090
if err != nil {
9191
return err
9292
}

internal/cmd/db_import.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"github.com/spf13/cobra"
99
)
1010

11+
var multipartFlag bool
12+
1113
func init() {
1214
dbCmd.AddCommand(importCmd)
1315
addGroupFlag(importCmd)
16+
importCmd.Flags().BoolVar(&multipartFlag, "multipart", false, "force multipart upload")
1417
}
1518

1619
var importCmd = &cobra.Command{

internal/cmd/group_flag.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func parseTimestampFlag() (*time.Time, error) {
6464
return &timestamp, nil
6565
}
6666

67-
func parseDBSeedFlags(client *turso.Client, isAWS bool) (*turso.DBSeed, error) {
67+
func parseDBSeedFlags(client *turso.Client, isAWS bool, multipart bool) (*turso.DBSeed, error) {
6868
if countFlags(fromDBFlag, fromDumpFlag, fromFileFlag, fromDumpURLFlag, fromCSVFlag) > 1 {
6969
return nil, errors.New("only one of --from prefixed flags can be used at a time")
7070
}
@@ -85,7 +85,7 @@ func parseDBSeedFlags(client *turso.Client, isAWS bool) (*turso.DBSeed, error) {
8585
}
8686

8787
if fromFileFlag != "" {
88-
return handleDBFile(client, fromFileFlag, isAWS)
88+
return handleDBFile(client, fromFileFlag, isAWS, multipart)
8989
}
9090

9191
if fromDumpFlag != "" {
@@ -271,20 +271,21 @@ func sqliteFileIntegrityChecks(file string) error {
271271
return nil
272272
}
273273

274-
func handleDBFileAWS(file string) (*turso.DBSeed, error) {
274+
func handleDBFileAWS(file string, multipart bool) (*turso.DBSeed, error) {
275275
if err := sqliteFileIntegrityChecks(file); err != nil {
276276
return nil, err
277277
}
278278

279279
seed := &turso.DBSeed{
280-
Type: "database_upload",
281-
Filepath: file,
280+
Type: "database_upload",
281+
Filepath: file,
282+
Multipart: multipart,
282283
}
283284

284285
return seed, nil
285286
}
286287

287-
func handleDBFile(client *turso.Client, file string, isAWS bool) (*turso.DBSeed, error) {
288+
func handleDBFile(client *turso.Client, file string, isAWS bool, multipart bool) (*turso.DBSeed, error) {
288289
if err := checkFileExists(file); err != nil {
289290
return nil, err
290291
}
@@ -293,7 +294,7 @@ func handleDBFile(client *turso.Client, file string, isAWS bool) (*turso.DBSeed,
293294
}
294295

295296
if isAWS {
296-
return handleDBFileAWS(file)
297+
return handleDBFileAWS(file, multipart)
297298
}
298299

299300
if err := checkSQLiteFile(file); err != nil {
@@ -393,7 +394,7 @@ func handleCSVFile(client *turso.Client, file, csvTableName string, separator ru
393394
return nil, err
394395
}
395396

396-
seed, err := handleDBFile(client, tempDB.Name(), false)
397+
seed, err := handleDBFile(client, tempDB.Name(), false, false)
397398
if err != nil {
398399
return nil, err
399400
}

internal/turso/databases.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/tursodatabase/turso-cli/internal/prompt"
1616
)
1717

18+
const multipartUploadThresholdBytes = 100 * 1024 * 1024 // 100MB
19+
1820
type Database struct {
1921
ID string `json:"dbId" mapstructure:"dbId"`
2022
Name string
@@ -131,7 +133,8 @@ type DBSeed struct {
131133
Timestamp *time.Time `json:"timestamp,omitempty"`
132134
// This is only used locally when uploading a database file and
133135
// never passed to the control plane as JSON.
134-
Filepath string `json:"-"`
136+
Filepath string `json:"-"`
137+
Multipart bool `json:"-"`
135138
}
136139

137140
type RemoteEncryption struct {
@@ -155,14 +158,17 @@ type CreateDatabaseBody struct {
155158
func (d *DatabasesClient) Create(name, location, image, extensions, group string, schema string, isSchema bool, seed *DBSeed, sizeLimit, remoteEncryptionCipher, remoteEncryptionKey string, spinner *prompt.SpinnerT) (*CreateDatabaseResponse, error) {
156159
isTursoServerUpload := seed != nil && seed.Type == "database_upload" && seed.Filepath != ""
157160
var uploadFilepath string
161+
var useMultipart bool
158162
var params CreateDatabaseBody
159163
if isTursoServerUpload {
160164
uploadFilepath = seed.Filepath
165+
useMultipart = seed.Multipart
161166
// Clear the unused seed parameters, only Type=database_upload is used.
162167
seed.Filepath = ""
163168
seed.Name = ""
164169
seed.URL = ""
165170
seed.Timestamp = nil
171+
seed.Multipart = false
166172
params = CreateDatabaseBody{
167173
Name: name,
168174
Location: location,
@@ -210,7 +216,7 @@ func (d *DatabasesClient) Create(name, location, image, extensions, group string
210216
}
211217

212218
if isTursoServerUpload {
213-
if _, err = d.UploadDatabaseAWS(data, group, uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey, spinner); err != nil {
219+
if _, err = d.UploadDatabaseAWS(data, group, uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey, useMultipart, spinner); err != nil {
214220
// Clean up the database if the upload fails
215221
if deleteErr := d.Delete(data.Database.Name); deleteErr != nil {
216222
fmt.Printf("%v", deleteErr)
@@ -231,7 +237,7 @@ func (d *DatabasesClient) Create(name, location, image, extensions, group string
231237
// This call happens in DatabasesClient.Create() above, after which it calls this function.
232238
// 2. This function creates a DB token for the newly-created DB, and then calls turso-server to upload the database file.
233239
// turso-server will perform validations on the file and 'activate' the db if everything is ok.
234-
func (d *DatabasesClient) UploadDatabaseAWS(resp *CreateDatabaseResponse, group, uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey string, spinner *prompt.SpinnerT) (*CreateDatabaseResponse, error) {
240+
func (d *DatabasesClient) UploadDatabaseAWS(resp *CreateDatabaseResponse, group, uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey string, useMultipart bool, spinner *prompt.SpinnerT) (*CreateDatabaseResponse, error) {
235241
// Create a short-lived DB token for the newly created database to facilitate the upload
236242
token, err := d.Token(resp.Database.Name, "1h", false, nil, nil)
237243
if err != nil {
@@ -249,7 +255,18 @@ func (d *DatabasesClient) UploadDatabaseAWS(resp *CreateDatabaseResponse, group,
249255

250256
// Upload the database file
251257
spinner.Text(fmt.Sprintf("Uploading database %s in group %s, this may take a while...", internal.Emph(resp.Database.Name), internal.Emph(group)))
252-
err = tursoServerClient.UploadFile(uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey, func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool) {
258+
259+
stat, err := os.Stat(uploadFilepath)
260+
if err != nil {
261+
return nil, fmt.Errorf("failed to stat file %s: %w", uploadFilepath, err)
262+
}
263+
264+
uploadFunc := tursoServerClient.UploadFileSinglePart
265+
if useMultipart || stat.Size() > multipartUploadThresholdBytes {
266+
uploadFunc = tursoServerClient.UploadFileMultipart
267+
}
268+
269+
err = uploadFunc(uploadFilepath, remoteEncryptionCipher, remoteEncryptionKey, func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool) {
253270
totalSeconds := int(elapsedTime.Seconds())
254271
minutes := totalSeconds / 60
255272
seconds := totalSeconds % 60

internal/turso/turso.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/url"
1111
"os"
1212
"runtime"
13+
"strconv"
1314

1415
"github.com/tursodatabase/turso-cli/internal/flags"
1516
)
@@ -95,13 +96,21 @@ func (t *Client) newRequest(method, urlPath string, body io.Reader, extraHeaders
9596

9697
func (t *Client) do(method, path string, body io.Reader, extraHeaders map[string]string) (*http.Response, error) {
9798
req, err := t.newRequest(method, path, body, extraHeaders)
99+
if err != nil {
100+
return nil, err
101+
}
102+
if contentLength, ok := extraHeaders["Content-Length"]; ok {
103+
length, err := strconv.Atoi(contentLength)
104+
if err != nil {
105+
return nil, err
106+
}
107+
req.ContentLength = int64(length)
108+
req.TransferEncoding = nil
109+
}
98110
var reqDump string
99111
if flags.Debug() {
100112
reqDump = dumpRequest(req)
101113
}
102-
if err != nil {
103-
return nil, err
104-
}
105114
resp, err := http.DefaultClient.Do(req)
106115
if err != nil {
107116
return nil, err
@@ -167,6 +176,14 @@ func (t *Client) PostBinary(path string, body io.Reader, headers map[string]stri
167176
return t.do("POST", path, body, headers)
168177
}
169178

179+
func (t *Client) PutBinary(path string, body io.Reader, headers map[string]string) (*http.Response, error) {
180+
if headers == nil {
181+
headers = make(map[string]string)
182+
}
183+
headers["Content-Type"] = "application/octet-stream"
184+
return t.do("PUT", path, body, headers)
185+
}
186+
170187
func (t *Client) Patch(path string, body io.Reader) (*http.Response, error) {
171188
return t.do("PATCH", path, body, Header("Content-Type", "application/json"))
172189
}

internal/turso/tursoServer.go

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"net/http"
1212
"net/url"
1313
"os"
14+
"strconv"
15+
"syscall"
1416
"time"
1517
)
1618

@@ -55,10 +57,10 @@ func NewTursoServerClient(baseURL *url.URL, token string, cliVersion string, org
5557
}, nil
5658
}
5759

58-
// UploadFile uploads a database file to the Turso server.
60+
// UploadFileSinglePart uploads a database file to the Turso server using a single request.
5961
// it assumes a SQLite file exists at 'filepath'.
6062
// it streams the file to the server, and calls the onProgress callback with the progress of the upload.
61-
func (i *TursoServerClient) UploadFile(filepath, remoteEncryptionCipher, remoteEncryptionKey string, onUploadProgress func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool)) error {
63+
func (i *TursoServerClient) UploadFileSinglePart(filepath, remoteEncryptionCipher, remoteEncryptionKey string, onUploadProgress func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool)) error {
6264
file, err := os.Open(filepath)
6365
if err != nil {
6466
return fmt.Errorf("failed to open file %s: %w", filepath, err)
@@ -105,6 +107,147 @@ func (i *TursoServerClient) UploadFile(filepath, remoteEncryptionCipher, remoteE
105107
return nil
106108
}
107109

110+
// UploadFileMultipart uploads a database file using the multipart upload flow.
111+
func (i *TursoServerClient) UploadFileMultipart(filepath string, remoteEncryptionCipher, remoteEncryptionKey string, onUploadProgress func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool)) error {
112+
file, err := os.Open(filepath)
113+
if err != nil {
114+
return fmt.Errorf("failed to open file %s: %w", filepath, err)
115+
}
116+
defer file.Close()
117+
118+
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err == nil {
119+
// locking is on a best effort basis
120+
defer syscall.Flock(int(file.Fd()), syscall.LOCK_UN)
121+
}
122+
123+
stat, err := file.Stat()
124+
if err != nil {
125+
return fmt.Errorf("failed to get file stats for %s: %w", filepath, err)
126+
}
127+
128+
totalSize := stat.Size()
129+
startTime := time.Now()
130+
131+
chunkSize, err := i.startMultipartUpload(totalSize)
132+
if err != nil {
133+
return err
134+
}
135+
136+
uploadedBytes, err := i.uploadChunks(chunkSize, file, totalSize, startTime, remoteEncryptionCipher, remoteEncryptionKey, onUploadProgress)
137+
if err != nil {
138+
return err
139+
}
140+
141+
if err = i.finalizeUpload(); err != nil {
142+
return err
143+
}
144+
145+
elapsedTime := time.Since(startTime)
146+
onUploadProgress(100, uploadedBytes, totalSize, elapsedTime, true)
147+
148+
return nil
149+
}
150+
151+
func (i *TursoServerClient) startMultipartUpload(dbSize int64) (int64, error) {
152+
requestBody := map[string]int64{
153+
"db_size_bytes": dbSize,
154+
}
155+
156+
body, err := marshal(requestBody)
157+
if err != nil {
158+
return 0, fmt.Errorf("failed to marshal multipart upload request: %w", err)
159+
}
160+
161+
r, err := i.client.Put("/v2/upload/start", body)
162+
if err != nil {
163+
return 0, fmt.Errorf("failed to initiate multipart upload: %w", err)
164+
}
165+
defer r.Body.Close()
166+
167+
if r.StatusCode != http.StatusOK {
168+
body, err := io.ReadAll(r.Body)
169+
if err != nil {
170+
return 0, fmt.Errorf("initiate multipart upload failed with status code %d and error reading response: %v", r.StatusCode, err)
171+
}
172+
return 0, fmt.Errorf("initiate multipart upload failed with status code %d: %s", r.StatusCode, string(body))
173+
}
174+
175+
type multipartUploadResponse struct {
176+
ChunkSize int64 `json:"chunk_size"`
177+
}
178+
var uploadResp multipartUploadResponse
179+
if err := json.NewDecoder(r.Body).Decode(&uploadResp); err != nil {
180+
return 0, fmt.Errorf("failed to decode multipart upload response: %w", err)
181+
}
182+
183+
return uploadResp.ChunkSize, nil
184+
}
185+
186+
func (i *TursoServerClient) uploadChunks(chunkSize int64, file io.Reader, totalSize int64, startTime time.Time, remoteEncryptionCipher, remoteEncryptionKey string, onUploadProgress func(progressPct int, uploadedBytes int64, totalBytes int64, elapsedTime time.Duration, done bool)) (int64, error) {
187+
var uploadedBytes int64 = 0
188+
chunkID := 0
189+
190+
for uploadedBytes < totalSize {
191+
remaining := totalSize - uploadedBytes
192+
currentChunkSize := chunkSize
193+
if remaining < chunkSize {
194+
currentChunkSize = remaining
195+
}
196+
197+
chunkReader := io.LimitReader(file, currentChunkSize)
198+
chunkPath := fmt.Sprintf("/v2/upload/chunk/%d", chunkID)
199+
200+
var headers = map[string]string{}
201+
if remoteEncryptionCipher != "" && remoteEncryptionKey != "" {
202+
headers[EncryptionCipherHeader] = remoteEncryptionCipher
203+
headers[EncryptionKeyHeader] = remoteEncryptionKey
204+
}
205+
headers["Content-Length"] = strconv.FormatInt(currentChunkSize, 10)
206+
207+
r, err := i.client.PutBinary(chunkPath, chunkReader, headers)
208+
if err != nil {
209+
return 0, fmt.Errorf("failed to upload chunk %d: %w", chunkID, err)
210+
}
211+
212+
if r.StatusCode != http.StatusOK && r.StatusCode != http.StatusCreated {
213+
if body, err := io.ReadAll(r.Body); err != nil {
214+
_ = r.Body.Close()
215+
return 0, fmt.Errorf("upload chunk %d failed with status code %d and error reading response: %v", chunkID, r.StatusCode, err)
216+
} else {
217+
_ = r.Body.Close()
218+
return 0, fmt.Errorf("upload chunk %d failed with status code %d: %s", chunkID, r.StatusCode, string(body))
219+
}
220+
} else {
221+
_ = r.Body.Close()
222+
}
223+
224+
uploadedBytes += currentChunkSize
225+
progressPct := int(float64(uploadedBytes) / float64(totalSize) * 100)
226+
elapsedTime := time.Since(startTime)
227+
onUploadProgress(progressPct, uploadedBytes, totalSize, elapsedTime, false)
228+
229+
chunkID++
230+
}
231+
return uploadedBytes, nil
232+
}
233+
234+
func (i *TursoServerClient) finalizeUpload() error {
235+
r, err := i.client.Put("/v2/upload/finalize", nil)
236+
if err != nil {
237+
return fmt.Errorf("failed to finalize multipart upload: %w", err)
238+
}
239+
defer r.Body.Close()
240+
241+
if r.StatusCode != http.StatusOK {
242+
body, err := io.ReadAll(r.Body)
243+
if err != nil {
244+
return fmt.Errorf("finalize multipart upload failed with status code %d and error reading response: %v", r.StatusCode, err)
245+
}
246+
return fmt.Errorf("finalize multipart upload failed with status code %d: %s", r.StatusCode, string(body))
247+
}
248+
return nil
249+
}
250+
108251
type ExportInfo struct {
109252
CurrentGeneration int `json:"current_generation"`
110253
}

0 commit comments

Comments
 (0)