Skip to content

Commit 1d4891f

Browse files
author
Frank
committed
Cloudflare Astro
1 parent 5fd1a1a commit 1d4891f

File tree

9 files changed

+898
-275
lines changed

9 files changed

+898
-275
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
package resource
2+
3+
import (
4+
"bytes"
5+
"encoding/base64"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"mime/multipart"
10+
"net/http"
11+
"os"
12+
"path/filepath"
13+
"sync"
14+
"time"
15+
)
16+
17+
type WorkerAssets struct {
18+
*CloudflareResource
19+
}
20+
21+
type WorkerAssetsInputs struct {
22+
Manifest AssetManifest `json:"manifest"`
23+
Directory string `json:"directory"`
24+
AccountId string `json:"accountId"`
25+
ApiToken string `json:"apiToken"`
26+
ScriptName string `json:"scriptName"`
27+
}
28+
29+
type WorkerAssetsOutputs struct {
30+
Manifest AssetManifest `json:"manifest"`
31+
Directory string `json:"directory"`
32+
AccountId string `json:"accountId"`
33+
ScriptName string `json:"scriptName"`
34+
Jwt string `json:"jwt"`
35+
}
36+
37+
type AssetManifest map[string]AssetEntry
38+
39+
type AssetEntry struct {
40+
Hash string `json:"hash"`
41+
Size int64 `json:"size"`
42+
ContentType string `json:"contentType"`
43+
}
44+
45+
type InitializeAssetsResponse struct {
46+
Buckets [][]string `json:"buckets"`
47+
Jwt string `json:"jwt"`
48+
}
49+
50+
type UploadResponse struct {
51+
Jwt string `json:"jwt"`
52+
}
53+
54+
55+
func (r *WorkerAssets) Create(input *WorkerAssetsInputs, output *CreateResult[WorkerAssetsOutputs]) error {
56+
jwt, err := r.handleUpload(input.Manifest, input.Directory, input.AccountId, input.ScriptName, input.ApiToken)
57+
if err != nil {
58+
return err
59+
}
60+
61+
*output = CreateResult[WorkerAssetsOutputs]{
62+
ID: "assets",
63+
Outs: WorkerAssetsOutputs{
64+
Manifest: input.Manifest,
65+
Directory: input.Directory,
66+
AccountId: input.AccountId,
67+
ScriptName: input.ScriptName,
68+
Jwt: jwt,
69+
},
70+
}
71+
return nil
72+
}
73+
74+
func (r *WorkerAssets) Update(input *UpdateInput[WorkerAssetsInputs, WorkerAssetsOutputs], output *UpdateResult[WorkerAssetsOutputs]) error {
75+
jwt, err := r.handleUpload(input.News.Manifest, input.News.Directory, input.News.AccountId, input.News.ScriptName, input.News.ApiToken)
76+
if err != nil {
77+
return err
78+
}
79+
80+
*output = UpdateResult[WorkerAssetsOutputs]{
81+
Outs: WorkerAssetsOutputs{
82+
Manifest: input.News.Manifest,
83+
Directory: input.News.Directory,
84+
AccountId: input.News.AccountId,
85+
ScriptName: input.News.ScriptName,
86+
Jwt: jwt,
87+
},
88+
}
89+
return nil
90+
}
91+
92+
func (r *WorkerAssets) handleUpload(manifest AssetManifest, directory, accountId, scriptName, apiToken string) (string, error) {
93+
94+
// Initialize assets upload session
95+
initResponse, err := r.uploadAssetManifest(accountId, scriptName, apiToken, manifest)
96+
if err != nil {
97+
return "", err
98+
}
99+
100+
// No files to upload
101+
totalFilesToUpload := 0
102+
for _, bucket := range initResponse.Buckets {
103+
totalFilesToUpload += len(bucket)
104+
}
105+
if totalFilesToUpload == 0 {
106+
return initResponse.Jwt, nil
107+
}
108+
109+
// Create channels for work distribution and error collection
110+
bucketsChan := make(chan []string)
111+
errChan := make(chan error, len(initResponse.Buckets))
112+
jwtChan := make(chan string, len(initResponse.Buckets))
113+
var wg sync.WaitGroup
114+
115+
// Start worker pool (3 workers)
116+
numWorkers := 3
117+
for i := 0; i < numWorkers; i++ {
118+
wg.Add(1)
119+
go func() {
120+
defer wg.Done()
121+
// Each worker processes buckets from the channel
122+
for hashes := range bucketsChan {
123+
jwt, err := r.uploadAssets(manifest, directory, accountId, apiToken, hashes, initResponse.Jwt)
124+
if err != nil {
125+
errChan <- fmt.Errorf("bucket %v upload failed: %w", hashes, err)
126+
return
127+
}
128+
129+
if jwt != "" {
130+
jwtChan <- jwt
131+
}
132+
}
133+
}()
134+
}
135+
136+
// Send buckets to the channel
137+
go func() {
138+
for _, bucketHashes := range initResponse.Buckets {
139+
bucketsChan <- bucketHashes
140+
}
141+
close(bucketsChan)
142+
}()
143+
144+
// Wait for all workers to finish
145+
wg.Wait()
146+
close(errChan)
147+
close(jwtChan)
148+
149+
// Check for any errors
150+
for err := range errChan {
151+
if err != nil {
152+
return "", err
153+
}
154+
}
155+
156+
// Get completion JWT (from the last bucket response)
157+
var completionJwt string
158+
for jwt := range jwtChan {
159+
completionJwt = jwt
160+
}
161+
162+
if completionJwt == "" {
163+
return "", fmt.Errorf("failed to complete asset upload - no completion JWT received")
164+
}
165+
166+
return completionJwt, nil
167+
}
168+
169+
func (r *WorkerAssets) uploadAssetManifest(accountId, scriptName, apiToken string, manifest AssetManifest) (*InitializeAssetsResponse, error) {
170+
url := fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/workers/scripts/%s/assets-upload-session", accountId, scriptName)
171+
172+
jsonBody, err := json.Marshal(map[string]interface{}{
173+
"manifest": manifest,
174+
})
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))
180+
if err != nil {
181+
return nil, err
182+
}
183+
req.Header.Set("Content-Type", "application/json")
184+
req.Header.Set("Authorization", "Bearer "+apiToken)
185+
186+
client := &http.Client{Timeout: 30 * time.Second}
187+
resp, err := client.Do(req)
188+
if err != nil {
189+
return nil, err
190+
}
191+
defer resp.Body.Close()
192+
193+
// print out response body as a string
194+
if resp.StatusCode != http.StatusOK {
195+
responseBody, _ := io.ReadAll(resp.Body)
196+
return nil, fmt.Errorf("failed to initialize assets upload: HTTP %d %s", resp.StatusCode, string(responseBody))
197+
}
198+
199+
var result struct {
200+
Result InitializeAssetsResponse `json:"result"`
201+
}
202+
203+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
204+
return nil, err
205+
}
206+
207+
if result.Result.Jwt == "" {
208+
return nil, fmt.Errorf("failed to initialize assets upload: no JWT received")
209+
}
210+
211+
return &result.Result, nil
212+
}
213+
214+
func (r *WorkerAssets) uploadAssets(manifest AssetManifest, directory, accountId, apiToken string, hashes []string, jwt string) (string, error) {
215+
var body bytes.Buffer
216+
writer := multipart.NewWriter(&body)
217+
218+
for _, hash := range hashes {
219+
// Find the file path for this hash in the manifest
220+
var fileKey string
221+
var contentType string
222+
for path, entry := range manifest {
223+
if entry.Hash == hash {
224+
fileKey = path
225+
contentType = entry.ContentType
226+
break
227+
}
228+
}
229+
if fileKey == "" {
230+
return "", fmt.Errorf("hash %s not found in manifest", hash)
231+
}
232+
233+
// Read file content
234+
absFilePath := filepath.Join(directory, fileKey)
235+
fileContent, err := os.ReadFile(absFilePath)
236+
if err != nil {
237+
return "", fmt.Errorf("failed to read file %s: %w", absFilePath, err)
238+
}
239+
240+
// Base64 encode the file content
241+
encodedContent := base64.StdEncoding.EncodeToString(fileContent)
242+
243+
// Create form field with content type header
244+
part, err := writer.CreatePart(map[string][]string{
245+
"Content-Disposition": []string{fmt.Sprintf(`form-data; name="%s"; filename="%s"`, hash, hash)},
246+
"Content-Type": []string{contentType},
247+
})
248+
if err != nil {
249+
return "", fmt.Errorf("failed to create form part %s: %w", hash, err)
250+
}
251+
252+
_, err = part.Write([]byte(encodedContent))
253+
if err != nil {
254+
return "", fmt.Errorf("failed to write encoded content for %s: %w", hash, err)
255+
}
256+
}
257+
err := writer.Close()
258+
if err != nil {
259+
return "", fmt.Errorf("failed to close writer: %w", err)
260+
}
261+
262+
url := fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/workers/assets/upload?base64=true", accountId)
263+
264+
req, err := http.NewRequest("POST", url, &body)
265+
if err != nil {
266+
return "", err
267+
}
268+
req.Header.Set("Content-Type", "multipart/form-data; boundary="+writer.Boundary())
269+
req.Header.Set("Authorization", "Bearer "+jwt)
270+
271+
client := &http.Client{Timeout: 30 * time.Second}
272+
resp, err := client.Do(req)
273+
if err != nil {
274+
return "", err
275+
}
276+
defer resp.Body.Close()
277+
278+
// API returns:
279+
// - 202 Accepted if there are more buckets to upload
280+
// - 201 Created if all buckets have been uploaded
281+
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted {
282+
responseBody, _ := io.ReadAll(resp.Body)
283+
return "", fmt.Errorf("failed to upload assets: HTTP %d %s", resp.StatusCode, string(responseBody))
284+
}
285+
286+
// Decode response
287+
var result struct {
288+
Result UploadResponse `json:"result"`
289+
}
290+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
291+
return "", err
292+
}
293+
294+
return result.Result.Jwt, nil
295+
}

pkg/server/resource/resource.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ func Register(ctx context.Context, p *project.Project, r *rpc.Server) error {
8585

8686
// Cloudflare Resources
8787
r.RegisterName("Resource.Cloudflare.DnsRecord", &CloudflareDnsRecord{cloudflareResource})
88+
r.RegisterName("Resource.Cloudflare.WorkerAssets", &WorkerAssets{cloudflareResource})
8889

8990
// Vercel Resources
9091
r.RegisterName("Resource.Vercel.DnsRecord", &VercelDnsRecord{vercelResource})

0 commit comments

Comments
 (0)