Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 22 additions & 62 deletions internal/pkg/archiver/warc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,62 +58,31 @@ func startWARCWriter() error {
IPv6AnyIP: config.Get().IPv6AnyIP,
ConnReadDeadline: config.Get().ConnReadDeadline,
DigestAlgorithm: warc.GetDigestFromPrefix(config.Get().WARCDigestAlgorithm),
Proxy: config.Get().Proxy,
}

// Instantiate WARC client
var err error
if config.Get().Proxy != "" {
proxiedWARCSettings := WARCSettings
proxiedWARCSettings.Proxy = config.Get().Proxy
globalArchiver.ClientWithProxy, err = warc.NewWARCWritingHTTPClient(proxiedWARCSettings)
if err != nil {
logger.Error("unable to init proxied WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter")
return err
}

go func() {
for err := range globalArchiver.ClientWithProxy.ErrChan {
logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func)
}
}()
// Instantiate WARC client
globalArchiver.Client, err = warc.NewWARCWritingHTTPClient(WARCSettings)
if err != nil {
logger.Error("unable to init WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter")
return err
}

// Even if a proxied client has been set, we want to create an non-proxied one
if config.Get().Proxy == "" {
globalArchiver.Client, err = warc.NewWARCWritingHTTPClient(WARCSettings)
if err != nil {
logger.Error("unable to init WARC HTTP client", "err", err.Error(), "func", "archiver.startWARCWriter")
return err
go func() {
for err := range globalArchiver.Client.ErrChan {
logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func)
}

go func() {
for err := range globalArchiver.Client.ErrChan {
logger.Error("WARC writer error", "err", err.Err.Error(), "func", err.Func)
}
}()
}
}()

// Set the timeouts
if config.Get().HTTPTimeout > 0 {
if globalArchiver.Client != nil {
globalArchiver.Client.Timeout = config.Get().HTTPTimeout
}

if globalArchiver.ClientWithProxy != nil {
globalArchiver.ClientWithProxy.Timeout = config.Get().HTTPTimeout
}
}
return nil
}

func GetClients() (clients []*warc.CustomHTTPClient) {
for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} {
if c != nil {
clients = append(clients, c)
}
globalArchiver.Client.Timeout = config.Get().HTTPTimeout
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because it used nowhere, and with one .Client we won't need any GetClients() shorthand soon


return clients
return nil
}

type WARCStats struct {
Expand All @@ -130,28 +99,19 @@ type WARCStats struct {
func GetStats() WARCStats {
var stats WARCStats

for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} {
if c != nil {
stats.WARCWritingQueueSize += int64(c.WaitGroup.Size())
stats.WARCTotalBytesArchived += c.DataTotal.Load()
stats.CDXDedupeTotalBytes += c.CDXDedupeTotalBytes.Load()
stats.DoppelgangerDedupeTotalBytes += c.DoppelgangerDedupeTotalBytes.Load()
stats.LocalDedupeTotalBytes += c.LocalDedupeTotalBytes.Load()
stats.CDXDedupeTotal += c.CDXDedupeTotal.Load()
stats.DoppelgangerDedupeTotal += c.DoppelgangerDedupeTotal.Load()
stats.LocalDedupeTotal += c.LocalDedupeTotal.Load()
}
}
c := globalArchiver.Client
stats.WARCWritingQueueSize = int64(c.WaitGroup.Size())
stats.WARCTotalBytesArchived = c.DataTotal.Load()
stats.CDXDedupeTotalBytes = c.CDXDedupeTotalBytes.Load()
stats.DoppelgangerDedupeTotalBytes = c.DoppelgangerDedupeTotalBytes.Load()
stats.LocalDedupeTotalBytes = c.LocalDedupeTotalBytes.Load()
stats.CDXDedupeTotal = c.CDXDedupeTotal.Load()
stats.DoppelgangerDedupeTotal = c.DoppelgangerDedupeTotal.Load()
stats.LocalDedupeTotal = c.LocalDedupeTotal.Load()
return stats
}

// This function is used in multiple places so it can't be replaced by GetStats()
func GetWARCWritingQueueSize() (total int) {
for _, c := range []*warc.CustomHTTPClient{globalArchiver.Client, globalArchiver.ClientWithProxy} {
if c != nil {
total += c.WaitGroup.Size()
}
}

return total
return globalArchiver.Client.WaitGroup.Size()
}
21 changes: 5 additions & 16 deletions internal/pkg/archiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type archiver struct {
inputCh chan *models.Item
outputCh chan *models.Item

Client *warc.CustomHTTPClient
ClientWithProxy *warc.CustomHTTPClient
Client *warc.CustomHTTPClient
}

var (
Expand Down Expand Up @@ -122,14 +121,11 @@ func Stop() {
}
}
}()

globalArchiver.Client.WaitGroup.Wait()
stopLocalWatcher <- struct{}{}
logger.Debug("WARC writing finished")
stopLocalWatcher <- struct{}{}
globalArchiver.Client.Close()
if globalArchiver.ClientWithProxy != nil {
globalArchiver.ClientWithProxy.WaitGroup.Wait()
globalArchiver.ClientWithProxy.Close()
}

logger.Info("stopped")
}
Expand Down Expand Up @@ -218,13 +214,6 @@ func archive(workerID string, seed *models.Item) {
panic(err)
}

var client *warc.CustomHTTPClient
if config.Get().Proxy != "" {
client = globalArchiver.ClientWithProxy
} else {
client = globalArchiver.Client
}

for i := range items {
if items[i].GetStatus() != models.ItemPreProcessed {
logger.Debug("skipping item", "item_id", items[i].GetShortID(), "status", items[i].GetStatus())
Expand All @@ -236,9 +225,9 @@ func archive(workerID string, seed *models.Item) {
wg.Add(1)

if config.Get().Headless {
go headless.ArchiveItem(items[i], &wg, guard, globalBucketManager, client)
go headless.ArchiveItem(items[i], &wg, guard, globalBucketManager, globalArchiver.Client)
} else {
go general.ArchiveItem(items[i], &wg, guard, globalBucketManager, client)
go general.ArchiveItem(items[i], &wg, guard, globalBucketManager, globalArchiver.Client)
}

}
Expand Down
Loading