-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransaction_logger.go
100 lines (80 loc) · 2.24 KB
/
transaction_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
	"bufio"
	"fmt"
	"os"
	"strings"
)
// TransactionLogger is the set of operations used to record key/value
// mutations as events and to read recorded events back. In this file it is
// implemented by *FileTransactionLogger.
type TransactionLogger interface {
// WriteDelete records a delete event for key.
WriteDelete(key string)
// WritePut records a put event that associates value with key.
WritePut(key, value string)
// Err returns a receive-only channel on which the logger reports errors.
Err() <-chan error
// ReadEvents returns a channel of previously recorded events and a channel
// of errors encountered while reading them.
ReadEvents() (<-chan Event, <-chan error)
// Run starts the logger so that it can accept events.
Run()
}
// NewFileTransactionLogger opens the transaction log at filename for reading
// and appending, creating the file if it does not exist, and returns a
// logger backed by it.
//
// The returned logger has no event or error channel yet; those are created
// by Run, which must be called before WritePut or WriteDelete.
//
// It returns an error wrapping the underlying os error when the file cannot
// be opened.
func NewFileTransactionLogger(filename string) (TransactionLogger, error) {
	// The log is a plain data file, so it is created read/write for the owner
	// and read-only for everyone else (0644). No execute bits are needed.
	file, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
	if err != nil {
		return nil, fmt.Errorf("cannot open transaction log file: %w", err)
	}

	return &FileTransactionLogger{file: file}, nil
}
// FileTransactionLogger is a TransactionLogger that keeps its events in a
// file, one tab-separated line per event.
type FileTransactionLogger struct {
// events is the send side of the queue drained by the goroutine started in
// Run. It is nil until Run is called.
events chan<- Event
// errors is the receive side of the channel on which the Run goroutine
// reports a failed write. It is nil until Run is called.
errors <-chan error
// lastSequence is the most recent sequence number written by Run or read
// by ReadEvents.
lastSequence uint64
// file is the log file opened by NewFileTransactionLogger.
file *os.File
}
// WritePut queues a put event carrying key and value on the logger's event
// channel. The send blocks for as long as the channel cannot accept the
// event.
func (l *FileTransactionLogger) WritePut(key, value string) {
	put := Event{
		EventType: EventPut,
		Key:       key,
		Value:     value,
	}
	l.events <- put
}
// WriteDelete queues a delete event for key on the logger's event channel.
// The event carries no value. The send blocks for as long as the channel
// cannot accept the event.
func (l *FileTransactionLogger) WriteDelete(key string) {
	del := Event{
		EventType: EventDelete,
		Key:       key,
	}
	l.events <- del
}
// Err returns the logger's error channel. The channel is created by Run, so
// the returned value is nil if Run has not been called yet.
func (l *FileTransactionLogger) Err() <-chan error {
return l.errors
}
// Run creates the logger's event queue (buffered, 16 entries) and error
// channel (buffered, 1 entry), then starts a goroutine that appends every
// queued event to the log file as one tab-separated line:
//
//	sequence, event type, key, value
//
// The sequence number is incremented before each write. On the first failed
// write the goroutine sends the error on the error channel and exits, so
// events queued after that point are not written.
func (l *FileTransactionLogger) Run() {
	eventCh := make(chan Event, 16)
	errCh := make(chan error, 1)
	l.events, l.errors = eventCh, errCh

	go func() {
		for ev := range eventCh {
			l.lastSequence++

			_, writeErr := fmt.Fprintf(
				l.file, "%d\t%d\t%s\t%s\n",
				l.lastSequence, ev.EventType, ev.Key, ev.Value,
			)
			if writeErr == nil {
				continue
			}

			errCh <- writeErr
			return
		}
	}()
}
// ReadEvents replays the log file line by line on a background goroutine.
//
// It returns two channels: parsed events are delivered, in file order, on the
// first (unbuffered); the first failure is delivered on the second (buffered,
// 1 entry). Replay stops at the first failure. Both channels are closed when
// the goroutine finishes, whether it reached the end of the file or stopped
// on an error.
//
// Each line is expected in the layout written by Run: sequence number, event
// type, key and value, separated by tab characters. The line is split on
// those tabs rather than on arbitrary whitespace, so an empty value (as in a
// delete record) and keys or values containing spaces are read back intact.
// A key containing a tab cannot be told apart from a separator and will be
// split incorrectly.
//
// Sequence numbers must be strictly increasing; l.lastSequence is updated as
// events are read.
func (l *FileTransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
	scanner := bufio.NewScanner(l.file)
	outEvents := make(chan Event)
	outErrors := make(chan error, 1)

	go func() {
		defer close(outEvents)
		defer close(outErrors)

		for scanner.Scan() {
			// Start from a zero Event on every line so that no field can leak
			// over from the previously parsed record.
			var e Event

			// At most 4 pieces: everything after the third tab is the value.
			fields := strings.SplitN(scanner.Text(), "\t", 4)
			if len(fields) < 3 {
				outErrors <- fmt.Errorf("cannot parse event: expected at least 3 tab-separated fields, got %d", len(fields))
				return
			}

			if _, err := fmt.Sscanf(fields[0], "%d", &e.Sequence); err != nil {
				outErrors <- fmt.Errorf("cannot parse event: %w", err)
				return
			}
			if _, err := fmt.Sscanf(fields[1], "%d", &e.EventType); err != nil {
				outErrors <- fmt.Errorf("cannot parse event: %w", err)
				return
			}

			e.Key = fields[2]
			if len(fields) == 4 {
				// Absent or empty for events that carry no value.
				e.Value = fields[3]
			}

			// The sequence read from the file must be greater than the last
			// one seen; anything else means the log is out of order.
			if l.lastSequence >= e.Sequence {
				outErrors <- fmt.Errorf("cannot read events: transactions number out of sequence")
				return
			}

			l.lastSequence = e.Sequence
			outEvents <- e
		}

		if err := scanner.Err(); err != nil {
			outErrors <- fmt.Errorf("transaction log read failure: %w", err)
		}
	}()

	return outEvents, outErrors
}