Skip to content

Commit 430ecb7

Browse files
authored
Merge pull request #3 from hcp-uw/shardLib
First steps into file sharding
2 parents fcb1100 + 5bf67d1 commit 430ecb7

File tree

11 files changed

+632
-2
lines changed

11 files changed

+632
-2
lines changed

.github/workflows/go-tests.yaml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
name: Go CI
2+
3+
on:
4+
push:
5+
branches: [ main, master ]
6+
pull_request:
7+
branches: [ main, master ]
8+
9+
jobs:
10+
ci:
11+
runs-on: ubuntu-latest
12+
13+
steps:
14+
# --- Checkout code (Pinned to v4) ---
15+
- name: Checkout repository
16+
uses: actions/checkout@v4
17+
18+
# --- Setup Go (Pinned to v6) ---
19+
- name: Set up Go
20+
uses: actions/setup-go@v6
21+
with:
22+
go-version: v1.25.1
23+
24+
# --- Cache Go modules (Pinned to v4) ---
25+
- name: Cache Go modules
26+
uses: actions/cache@v4
27+
with:
28+
path: |
29+
~/go/pkg/mod
30+
~/.cache/go-build
31+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
32+
restore-keys: |
33+
${{ runner.os }}-go-
34+
35+
# --- Download Dependencies (Recommended Practice) ---
36+
- name: Download Go modules
37+
run: go mod download
38+
39+
40+
# --- Run tests and collect coverage ---
41+
- name: Run tests with coverage
42+
run: |
43+
go test -v -coverprofile=coverage.out ./...
44+
go tool cover -func=coverage.out
45+
46+
# --- Generate coverage badge (Pinned to v2) ---
47+
- name: Generate coverage badge
48+
uses: tj-actions/coverage-badge-go@v2
49+
if: github.ref == 'refs/heads/main'
50+
with:
51+
filename: coverage.out
52+
53+
# --- Commit coverage badge (main only) ---
54+
- name: Commit coverage badge
55+
if: github.ref == 'refs/heads/main'
56+
run: |
57+
git config user.name "github-actions[bot]"
58+
git config user.email "github-actions[bot]@users.noreply.github.com"
59+
git add coverage-badge.svg
60+
git commit -m "chore: update coverage badge" || echo "No changes to commit"
61+
git push

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
notes.md
2+
files
3+
output

cmd/mosaic-node/main.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,39 @@
11
package main
22

3-
import "fmt"
3+
import (
4+
"log"
5+
"os"
6+
7+
"github.com/hcp-uw/mosaic/internal/encoding"
8+
)
49

510
func main() {
6-
fmt.Println("welcome to mosaic")
11+
//for testing purposes rn
12+
13+
// fileSize := 1267513984
14+
file, err := os.ReadFile("output_file.jpg")
15+
if err != nil {
16+
log.Fatal(err)
17+
}
18+
19+
fileSize := len(file)
20+
21+
encoder, err := encoding.NewEncoder(8, 4, "./files", "./files/.bin")
22+
23+
if err != nil {
24+
log.Fatal(err)
25+
}
26+
27+
//err = encoder.EncodeFile("/pictures/pic.jpg")
28+
//if err != nil {
29+
// fmt.Println(err)
30+
//}
31+
32+
//fmt.Println(fileSize)
33+
34+
err = encoder.DecodeShards("pictures/pic.jpg", fileSize)
35+
if err != nil {
36+
log.Fatal(err)
37+
}
38+
739
}

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
11
module github.com/hcp-uw/mosaic
22

33
go 1.25.1
4+
5+
require (
6+
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
7+
github.com/klauspost/reedsolomon v1.12.5 // indirect
8+
golang.org/x/sys v0.37.0 // indirect
9+
)

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
2+
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
3+
github.com/klauspost/reedsolomon v1.12.5 h1:4cJuyH926If33BeDgiZpI5OU0pE+wUHZvMSyNGqN73Y=
4+
github.com/klauspost/reedsolomon v1.12.5/go.mod h1:LkXRjLYGM8K/iQfujYnaPeDmhZLqkrGUyG9p7zs5L68=
5+
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
6+
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

internal/encoding/decode.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package encoding
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"log"
8+
"net/http"
9+
"os"
10+
"path/filepath"
11+
"strings"
12+
"sync"
13+
)
14+
15+
// has to remember file length
16+
// relativePath is the path inside the IN folder where incoming shards are stored
17+
// should be a directory not a file
18+
func (e *Encoder) DecodeShards(relativePath string, fileLength int) error {
19+
totalShards := e.parity + e.shards
20+
fileName := filepath.Base(relativePath)
21+
fileNameNoExt := strings.TrimSuffix(fileName, filepath.Ext(fileName))
22+
23+
type shardResult struct {
24+
index int
25+
result []byte
26+
}
27+
28+
shardFiles := make([]*os.File, totalShards)
29+
for i := 0; i < totalShards; i++ {
30+
shardName := fmt.Sprintf("shard%d_%s.dat", i, fileNameNoExt)
31+
shardPath := filepath.Join(e.dirIn, relativePath, shardName)
32+
33+
file, err := os.Open(shardPath)
34+
if err != nil {
35+
shardFiles[i] = nil
36+
continue
37+
}
38+
shardFiles[i] = file
39+
}
40+
41+
// makes sure outpath exists if not it creates it
42+
outPath := filepath.Dir(filepath.Join(e.dirOut, relativePath))
43+
err := os.MkdirAll(outPath, 0755)
44+
if err != nil {
45+
log.Fatal("here")
46+
return err
47+
}
48+
49+
// on first run through it finds the file extension creates the file then further
50+
// writes to that file
51+
var outFile *os.File
52+
writtenBytes := 0
53+
fileCreated := false
54+
for {
55+
if fileLength <= writtenBytes {
56+
break
57+
}
58+
59+
shardResults := make(chan shardResult, totalShards)
60+
shardArray := make([][]byte, totalShards)
61+
var shardReaders sync.WaitGroup
62+
63+
for index, file := range shardFiles {
64+
if file == nil {
65+
continue
66+
}
67+
68+
shardReaders.Add(1)
69+
go func(index int, file *os.File) {
70+
defer shardReaders.Done()
71+
shard := make([]byte, e.blockSize)
72+
73+
io.ReadFull(file, shard)
74+
// ts silent error is not that tuff
75+
76+
shardResults <- shardResult{index: index, result: shard}
77+
78+
}(index, file)
79+
}
80+
81+
go func() {
82+
shardReaders.Wait()
83+
close(shardResults)
84+
}()
85+
86+
var dataReadCount int
87+
for res := range shardResults {
88+
shardArray[res.index] = res.result
89+
if res.result != nil {
90+
dataReadCount++
91+
}
92+
}
93+
94+
// CRITICAL FIX: Exit condition 2: If zero non-nil blocks were read in this iteration,
95+
// it means all active shards are exhausted. Break the decoding loop to prevent
96+
// continuous reconstruction failure on empty data.
97+
if dataReadCount == 0 {
98+
break
99+
}
100+
101+
err := e.encoder.ReconstructData(shardArray)
102+
if err != nil {
103+
return err
104+
}
105+
106+
var buf bytes.Buffer
107+
108+
// works except for padding
109+
err = e.encoder.Join(&buf, shardArray, e.shards*e.blockSize)
110+
joinedBytes := buf.Bytes()
111+
if !fileCreated {
112+
113+
fileExtension := detectExtension(joinedBytes)
114+
fileOutDir := filepath.Join(e.dirOut, filepath.Dir(relativePath))
115+
fileOutPath := filepath.Join(fileOutDir, fileNameNoExt+fileExtension)
116+
os.MkdirAll(fileOutDir, 0755)
117+
118+
outFile, err = os.Create(fileOutPath)
119+
if err != nil {
120+
return err
121+
}
122+
defer outFile.Close()
123+
124+
fileCreated = true
125+
}
126+
127+
remaining := fileLength - writtenBytes
128+
toWrite := len(joinedBytes)
129+
if remaining < toWrite {
130+
toWrite = remaining
131+
}
132+
133+
totalWritten := 0
134+
for totalWritten < toWrite {
135+
n, err := outFile.Write(joinedBytes[totalWritten:toWrite])
136+
if err != nil {
137+
return err
138+
}
139+
totalWritten += n
140+
writtenBytes += n
141+
}
142+
}
143+
144+
return nil
145+
}
146+
147+
func detectExtension(data []byte) string {
148+
if len(data) > 512 {
149+
data = data[:512] // only need first 512 bytes
150+
}
151+
mimeType := http.DetectContentType(data)
152+
153+
switch mimeType {
154+
case "image/png":
155+
return ".png"
156+
case "image/jpeg":
157+
return ".jpg"
158+
case "application/pdf":
159+
return ".pdf"
160+
case "text/plain; charset=utf-8":
161+
return ".txt"
162+
default:
163+
return ".bin" // fallback
164+
}
165+
}

internal/encoding/decode_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package encoding
2+
3+
import (
4+
"bytes"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
"time"
9+
)
10+
11+
// TestDecodeShards_Performance runs full encode→decode and prints throughput (MB/s)
12+
func TestDecodeShards_Performance(t *testing.T) {
13+
tmpIn := t.TempDir()
14+
tmpOut := t.TempDir()
15+
16+
// prepare a moderately large file (e.g., 50 MB)
17+
fileSize := 50 * 1024 * 1024
18+
fileName := "bigfile.txt"
19+
inFilePath := filepath.Join(tmpOut, fileName)
20+
data := make([]byte, fileSize)
21+
for i := range data {
22+
data[i] = byte(i % 256)
23+
}
24+
25+
if err := os.WriteFile(inFilePath, data, 0644); err != nil {
26+
t.Fatalf("failed to write input file: %v", err)
27+
}
28+
29+
// create encoder
30+
enc, err := NewEncoder(4, 2, tmpOut, tmpIn)
31+
if err != nil {
32+
t.Fatalf("failed to create encoder: %v", err)
33+
}
34+
35+
// make .bin dir for encoded output
36+
if err := os.MkdirAll(filepath.Join(tmpOut, ".bin"), 0755); err != nil {
37+
t.Fatalf("failed to make .bin dir: %v", err)
38+
}
39+
40+
// encode
41+
startEncode := time.Now()
42+
if err := enc.EncodeFile(fileName); err != nil {
43+
t.Fatalf("EncodeFile failed: %v", err)
44+
}
45+
encodeElapsed := time.Since(startEncode).Seconds()
46+
47+
// move shards to decoder input (.bin/<fileName>)
48+
shardDir := filepath.Join(tmpOut, ".bin", fileName)
49+
shards, err := os.ReadDir(shardDir)
50+
if err != nil {
51+
t.Fatalf("failed to read shard dir: %v", err)
52+
}
53+
targetShardDir := filepath.Join(tmpIn, fileName)
54+
if err := os.MkdirAll(targetShardDir, 0755); err != nil {
55+
t.Fatalf("failed to create shard dir: %v", err)
56+
}
57+
for _, shard := range shards {
58+
src := filepath.Join(shardDir, shard.Name())
59+
dst := filepath.Join(targetShardDir, shard.Name())
60+
b, err := os.ReadFile(src)
61+
if err != nil {
62+
t.Fatalf("failed to read shard: %v", err)
63+
}
64+
if err := os.WriteFile(dst, b, 0644); err != nil {
65+
t.Fatalf("failed to write shard: %v", err)
66+
}
67+
}
68+
69+
// decode
70+
startDecode := time.Now()
71+
if err := enc.DecodeShards(fileName, len(data)); err != nil {
72+
t.Fatalf("DecodeShards failed: %v", err)
73+
}
74+
decodeElapsed := time.Since(startDecode).Seconds()
75+
76+
// verify output
77+
decodedPath := filepath.Join(tmpOut, fileName)
78+
decodedData, err := os.ReadFile(decodedPath)
79+
if err != nil {
80+
t.Fatalf("failed to read decoded file: %v", err)
81+
}
82+
83+
if !bytes.Equal(decodedData, data) {
84+
t.Fatalf("decoded file does not match original data")
85+
}
86+
87+
// calculate and log throughput
88+
mb := float64(fileSize) / (1024 * 1024)
89+
encodeThroughput := mb / encodeElapsed
90+
decodeThroughput := mb / decodeElapsed
91+
92+
t.Logf("Encode: %.2f MB in %.2f s (%.2f MB/s)", mb, encodeElapsed, encodeThroughput)
93+
t.Logf("Decode: %.2f MB in %.2f s (%.2f MB/s)", mb, decodeElapsed, decodeThroughput)
94+
}
95+

0 commit comments

Comments
 (0)