Skip to content

Commit 4c19509

Browse files
authored
Merge pull request velero-io#9887 from Lyndon-Li/incremental-object-aware-write-at
Incremental aware object writer - writeat
2 parents 901b1a0 + f474e31 commit 4c19509

3 files changed

Lines changed: 863 additions & 5 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add WriteAt implementation for Incremental aware object writer for block data mover

pkg/repository/udmrepo/kopialib/lib_repo.go

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type kopiaObjectWriterEx struct {
9292
description string
9393
compressor compression.Name
9494
splitter string
95+
zeroObject object.ID
9596
writeLock sync.Mutex
9697
asyncWritesSem chan struct{}
9798
asyncWritesGroup sync.WaitGroup
@@ -479,6 +480,7 @@ func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.Obje
479480
description: opt.Description,
480481
compressor: getCompressorForObject(opt),
481482
blockSize: fixedBlockSize,
483+
zeroObject: object.EmptyID,
482484
splitter: fixedSplitter1M,
483485
asyncWritesSem: asyncWritesSem,
484486
asyncBuffer: asyncBuffer,
@@ -985,9 +987,119 @@ func (kow *kopiaObjectWriterEx) writeObjectAsync(objName string, entryID int, p
985987
}
986988
}
987989

988-
// TODO add implementation in following PRs
990+
func (kow *kopiaObjectWriterEx) writeZeroObject(objName string, entryID int) error {
991+
if kow.zeroObject == object.EmptyID {
992+
zeroBuffer := make([]byte, kow.blockSize)
993+
objectID, err := kow.writeObject(objName, zeroBuffer)
994+
if err != nil {
995+
return err
996+
}
997+
998+
kow.zeroObject = objectID
999+
}
1000+
1001+
kow.entryLock.Lock()
1002+
kow.entries[entryID].Object = kow.zeroObject
1003+
kow.entryLock.Unlock()
1004+
1005+
return nil
1006+
}
1007+
9891008
func (kow *kopiaObjectWriterEx) WriteAt(p []byte, offset int64) (int, error) {
990-
return 0, errors.New("not implemented")
1009+
kow.writeLock.Lock()
1010+
defer kow.writeLock.Unlock()
1011+
1012+
if kow.rawRepoWriter == nil {
1013+
return 0, errors.New("object writer is closed or not open")
1014+
}
1015+
1016+
if err := kow.getWriteError(); err != nil {
1017+
return 0, errors.Wrapf(err, "error happened during writing object")
1018+
}
1019+
1020+
if offset%kow.blockSize != 0 {
1021+
return 0, errors.Errorf("invalid offset %v", offset)
1022+
}
1023+
1024+
length := len(p)
1025+
if int64(length)%kow.blockSize != 0 {
1026+
return 0, errors.Errorf("invalid length %v", length)
1027+
}
1028+
1029+
kow.entryLock.Lock()
1030+
curPos := int64(len(kow.entries)) * kow.blockSize
1031+
kow.entryLock.Unlock()
1032+
1033+
if offset < curPos {
1034+
return 0, errors.Errorf("cannot write back, cur pos %v", curPos)
1035+
}
1036+
1037+
if offset > curPos && kow.parentEntries != nil {
1038+
startEntry := int(curPos / kow.blockSize)
1039+
endEntry := int(offset / kow.blockSize)
1040+
if startEntry < len(kow.parentEntries) {
1041+
if len(kow.parentEntries) < endEntry {
1042+
endEntry = len(kow.parentEntries)
1043+
}
1044+
1045+
for i := startEntry; i < endEntry; i++ {
1046+
e := kow.parentEntries[i]
1047+
1048+
if e.Start != int64(i)*kow.blockSize {
1049+
return 0, errors.Errorf("parent entry %v start %v does not match expected start %v", i, e.Start, int64(i)*kow.blockSize)
1050+
}
1051+
1052+
if e.Length != kow.blockSize {
1053+
return 0, errors.Errorf("parent entry %v length %v does not match child block size %v", i, e.Length, kow.blockSize)
1054+
}
1055+
}
1056+
1057+
kow.entryLock.Lock()
1058+
kow.entries = append(kow.entries, kow.parentEntries[startEntry:endEntry]...)
1059+
curPos = int64(len(kow.entries)) * kow.blockSize
1060+
kow.entryLock.Unlock()
1061+
}
1062+
}
1063+
1064+
entryID := 0
1065+
for curPos < offset {
1066+
kow.entryLock.Lock()
1067+
entryID = len(kow.entries)
1068+
kow.entries = append(kow.entries, object.IndirectObjectEntry{
1069+
Start: curPos,
1070+
Length: kow.blockSize,
1071+
})
1072+
kow.entryLock.Unlock()
1073+
1074+
objName := fmt.Sprintf("%s-b%v", kow.description, entryID)
1075+
if err := kow.writeZeroObject(objName, entryID); err != nil {
1076+
return 0, errors.Wrapf(err, "error writing zero object for %s", objName)
1077+
}
1078+
1079+
curPos += kow.blockSize
1080+
}
1081+
1082+
if length == 0 {
1083+
return length, nil
1084+
}
1085+
1086+
for curPos < offset+int64(length) {
1087+
kow.entryLock.Lock()
1088+
entryID = len(kow.entries)
1089+
kow.entries = append(kow.entries, object.IndirectObjectEntry{
1090+
Start: curPos,
1091+
Length: kow.blockSize,
1092+
})
1093+
kow.entryLock.Unlock()
1094+
1095+
buffOffset := curPos - offset
1096+
objName := fmt.Sprintf("%s-b%v", kow.description, entryID)
1097+
kow.writeObjectAsync(objName, entryID, p[buffOffset:buffOffset+kow.blockSize])
1098+
1099+
curPos += kow.blockSize
1100+
}
1101+
1102+
return length, nil
9911103
}
9921104

9931105
func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) {

0 commit comments

Comments
 (0)