Skip to content

Commit e9d5bde

Browse files
authored
Add cache to dedupe table (which adds limit) (#177)
feat: add otter.Cache to dedupeHashTable to reduce unbounded growth. This should prioritize entries that have been retreived.
1 parent 9054016 commit e9d5bde

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

client.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
"sync/atomic"
88
"time"
9+
10+
"github.com/maypok86/otter"
911
)
1012

1113
type Error struct {
@@ -44,7 +46,7 @@ type HTTPClientSettings struct {
4446
type CustomHTTPClient struct {
4547
interfacesWatcherStop chan bool
4648
WaitGroup *WaitGroupWithCount
47-
dedupeHashTable *sync.Map
49+
dedupeHashTable *otter.Cache[string, revisitRecord]
4850
ErrChan chan *Error
4951
WARCWriter chan *RecordBatch
5052
interfacesWatcherStarted chan bool
@@ -60,6 +62,7 @@ type CustomHTTPClient struct {
6062
FullOnDisk bool
6163
DigestAlgorithm DigestAlgorithm
6264
closeDNSCache func()
65+
closeDedupeCache func()
6366
// MaxRAMUsageFraction is the fraction of system RAM above which we'll force spooling to disk. For example, 0.5 = 50%.
6467
// If set to <= 0, the default value is DefaultMaxRAMUsageFraction.
6568
MaxRAMUsageFraction float64
@@ -99,6 +102,7 @@ func (c *CustomHTTPClient) Close() error {
99102
}
100103

101104
c.closeDNSCache()
105+
c.closeDedupeCache()
102106

103107
return nil
104108
}
@@ -132,7 +136,23 @@ func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient
132136

133137
// Toggle deduplication options and create map for deduplication records.
134138
httpClient.dedupeOptions = HTTPClientSettings.DedupeOptions
135-
httpClient.dedupeHashTable = new(sync.Map)
139+
140+
// Set default dedupe cache size to 1M entries if not specified
141+
dedupeCacheSize := HTTPClientSettings.DedupeOptions.DedupeCacheSize
142+
if dedupeCacheSize == 0 {
143+
dedupeCacheSize = 1_000_000
144+
}
145+
146+
dedupeCache, err := otter.MustBuilder[string, revisitRecord](dedupeCacheSize).Build()
147+
if err != nil {
148+
return nil, err
149+
}
150+
httpClient.dedupeHashTable = &dedupeCache
151+
152+
httpClient.closeDedupeCache = func() {
153+
httpClient.dedupeHashTable.Close()
154+
time.Sleep(1 * time.Second)
155+
}
136156

137157
// Set default deduplication threshold to 2048 bytes
138158
if httpClient.dedupeOptions.SizeThreshold == 0 {

dedupe.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type DedupeOptions struct {
2727
DoppelgangerHost string
2828
CDXCookie string
2929
SizeThreshold int
30+
DedupeCacheSize int
3031
LocalDedupe bool
3132
CDXDedupe bool
3233
DoppelgangerDedupe bool
@@ -40,9 +41,9 @@ type revisitRecord struct {
4041
}
4142

4243
func (d *customDialer) checkLocalRevisit(digest string) revisitRecord {
43-
revisit, exists := d.client.dedupeHashTable.Load(digest)
44+
revisit, exists := d.client.dedupeHashTable.Get(digest)
4445
if exists {
45-
return revisit.(revisitRecord)
46+
return revisit
4647
}
4748

4849
return revisitRecord{}
@@ -51,7 +52,7 @@ func (d *customDialer) checkLocalRevisit(digest string) revisitRecord {
5152
func checkCDXRevisit(CDXURL string, digest string, targetURI string, cookie string) (revisitRecord, error) {
5253
// CDX expects no hash header. For now we need to strip it.
5354
digest = strings.SplitN(digest, ":", 2)[1]
54-
55+
5556
req, err := http.NewRequest("GET", CDXURL+"/web/timemap/cdx?url="+url.QueryEscape(targetURI)+"&limit=-1", nil)
5657
if err != nil {
5758
return revisitRecord{}, err
@@ -95,7 +96,7 @@ func checkCDXRevisit(CDXURL string, digest string, targetURI string, cookie stri
9596
func checkDoppelgangerRevisit(DoppelgangerHost string, digest string, targetURI string) (revisitRecord, error) {
9697
// Doppelganger is not expecting a hash header either but this will all be rewritten ... shortly...
9798
digest = strings.SplitN(digest, ":", 2)[1]
98-
99+
99100
req, err := http.NewRequest("GET", DoppelgangerHost+"/api/records/"+digest+"?uri="+targetURI, nil)
100101
if err != nil {
101102
return revisitRecord{}, err

dialer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ func (d *customDialer) writeWARCFromConnection(ctx context.Context, reqPipe, res
611611
}
612612
return
613613
}
614-
d.client.dedupeHashTable.Store(r.Header.Get("WARC-Payload-Digest"), revisitRecord{
614+
d.client.dedupeHashTable.Set(r.Header.Get("WARC-Payload-Digest"), revisitRecord{
615615
responseUUID: recordIDs[i],
616616
size: getContentLength(r.Content),
617617
targetURI: warcTargetURI,

0 commit comments

Comments
 (0)