Skip to content

Commit e5cb428

Browse files
committed
make queuefs support handlefs
1 parent 32cf775 commit e5cb428

3 files changed

Lines changed: 282 additions & 63 deletions

File tree

agfs-fuse/pkg/fusefs/handles.go

Lines changed: 69 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ type handleInfo struct {
2424
path string
2525
flags agfs.OpenFlag
2626
mode uint32
27-
// Write buffer for local handles - accumulates writes until Close
28-
writeBuffer []byte
29-
dirty bool // true if writeBuffer has data to be flushed
27+
// Read buffer for local handles - caches first read to avoid multiple server requests
28+
readBuffer []byte
3029
}
3130

3231
// HandleManager manages the mapping between FUSE handles and AGFS handles
@@ -111,28 +110,21 @@ func (hm *HandleManager) Close(fuseHandle uint64) error {
111110
return nil
112111
}
113112

114-
// Local handles: flush write buffer if dirty
115-
if info.dirty && len(info.writeBuffer) > 0 {
116-
_, err := hm.client.Write(info.path, info.writeBuffer)
117-
if err != nil {
118-
return fmt.Errorf("failed to flush write buffer: %w", err)
119-
}
120-
}
121-
113+
// Local handles: nothing to do on close since writes are sent immediately
122114
return nil
123115
}
124116

125117
// Read reads data from a handle
126118
func (hm *HandleManager) Read(fuseHandle uint64, offset int64, size int) ([]byte, error) {
127-
hm.mu.RLock()
119+
hm.mu.Lock()
128120
info, ok := hm.handles[fuseHandle]
129-
hm.mu.RUnlock()
130-
131121
if !ok {
122+
hm.mu.Unlock()
132123
return nil, fmt.Errorf("handle %d not found", fuseHandle)
133124
}
134125

135126
if info.htype == handleTypeRemote {
127+
hm.mu.Unlock()
136128
// Use server-side handle
137129
data, err := hm.client.ReadHandle(info.agfsHandle, offset, size)
138130
if err != nil {
@@ -141,12 +133,58 @@ func (hm *HandleManager) Read(fuseHandle uint64, offset int64, size int) ([]byte
141133
return data, nil
142134
}
143135

144-
// Local handle: use direct Read API from SDK
145-
data, err := hm.client.Read(info.path, offset, int64(size))
146-
if err != nil {
147-
return nil, fmt.Errorf("failed to read file: %w", err)
136+
// Local handle: cache the first read and return from cache for subsequent reads
137+
// This is critical for special filesystems like queuefs where each read
138+
// should be an independent atomic operation (e.g., each read from dequeue
139+
// should consume only one message, not multiple)
140+
if info.readBuffer == nil {
141+
// First read: fetch ALL data from server and cache (use size=-1 to read all)
142+
path := info.path
143+
hm.mu.Unlock()
144+
145+
data, err := hm.client.Read(path, 0, -1) // Read all data
146+
if err != nil {
147+
return nil, fmt.Errorf("failed to read file: %w", err)
148+
}
149+
150+
// Cache the data
151+
hm.mu.Lock()
152+
// Re-check if handle still exists
153+
info, ok = hm.handles[fuseHandle]
154+
if ok {
155+
info.readBuffer = data
156+
}
157+
hm.mu.Unlock()
158+
159+
// Return requested portion
160+
if offset >= int64(len(data)) {
161+
return []byte{}, nil
162+
}
163+
end := offset + int64(size)
164+
if end > int64(len(data)) {
165+
end = int64(len(data))
166+
}
167+
return data[offset:end], nil
148168
}
149-
return data, nil
169+
170+
// Return from cache or empty for subsequent reads
171+
if info.readBuffer != nil {
172+
if offset >= int64(len(info.readBuffer)) {
173+
hm.mu.Unlock()
174+
return []byte{}, nil // EOF
175+
}
176+
end := offset + int64(size)
177+
if end > int64(len(info.readBuffer)) {
178+
end = int64(len(info.readBuffer))
179+
}
180+
result := info.readBuffer[offset:end]
181+
hm.mu.Unlock()
182+
return result, nil
183+
}
184+
185+
// No cached data and offset > 0, return empty
186+
hm.mu.Unlock()
187+
return []byte{}, nil
150188
}
151189

152190
// Write writes data to a handle
@@ -168,36 +206,19 @@ func (hm *HandleManager) Write(fuseHandle uint64, data []byte, offset int64) (in
168206
return written, nil
169207
}
170208

171-
// Local handle: buffer the write data
172-
// This ensures all writes are accumulated and sent as one request on Close
173-
// which is critical for queuefs enqueue to work correctly with large messages
174-
if offset == 0 && len(info.writeBuffer) == 0 {
175-
// First write at offset 0: initialize buffer
176-
info.writeBuffer = make([]byte, len(data))
177-
copy(info.writeBuffer, data)
178-
} else if offset == int64(len(info.writeBuffer)) {
179-
// Sequential append
180-
info.writeBuffer = append(info.writeBuffer, data...)
181-
} else if offset < int64(len(info.writeBuffer)) {
182-
// Overwrite within existing buffer
183-
endOffset := offset + int64(len(data))
184-
if endOffset > int64(len(info.writeBuffer)) {
185-
// Extend buffer if needed
186-
newBuf := make([]byte, endOffset)
187-
copy(newBuf, info.writeBuffer)
188-
info.writeBuffer = newBuf
189-
}
190-
copy(info.writeBuffer[offset:], data)
191-
} else {
192-
// Gap in write - extend with zeros
193-
newBuf := make([]byte, offset+int64(len(data)))
194-
copy(newBuf, info.writeBuffer)
195-
copy(newBuf[offset:], data)
196-
info.writeBuffer = newBuf
197-
}
198-
info.dirty = true
209+
// Local handle: send data directly to server for each write
210+
// This is critical for special filesystems like queuefs where each write
211+
// should be an independent atomic operation (e.g., each write to enqueue
212+
// should create a separate queue message)
213+
path := info.path
199214
hm.mu.Unlock()
200215

216+
// Send directly to server
217+
_, err := hm.client.Write(path, data)
218+
if err != nil {
219+
return 0, fmt.Errorf("failed to write to server: %w", err)
220+
}
221+
201222
return len(data), nil
202223
}
203224

@@ -219,20 +240,7 @@ func (hm *HandleManager) Sync(fuseHandle uint64) error {
219240
return nil
220241
}
221242

222-
// Local handles: flush write buffer if dirty
223-
if info.dirty && len(info.writeBuffer) > 0 {
224-
data := info.writeBuffer
225-
info.writeBuffer = nil
226-
info.dirty = false
227-
hm.mu.Unlock()
228-
229-
_, err := hm.client.Write(info.path, data)
230-
if err != nil {
231-
return fmt.Errorf("failed to sync write buffer: %w", err)
232-
}
233-
return nil
234-
}
235-
243+
// Local handles: nothing to sync since writes are sent immediately
236244
hm.mu.Unlock()
237245
return nil
238246
}

agfs-server/pkg/handlers/handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,9 @@ func (h *Handler) Stat(w http.ResponseWriter, r *http.Request) {
338338
status := mapErrorToStatus(err)
339339
// "Not found" is expected during cp/mv operations, use debug level
340340
if status == http.StatusNotFound {
341-
log.Debugf("Stat: path not found: %s", path)
341+
log.Debugf("Stat: path not found: %s (from %s)", path, r.RemoteAddr)
342342
} else {
343-
log.Errorf("Stat error for path %s: %v", path, err)
343+
log.Errorf("Stat error for path %s: %v (from %s)", path, err, r.RemoteAddr)
344344
}
345345
writeError(w, status, err.Error())
346346
return

0 commit comments

Comments
 (0)