Commit b4495ec

job type selection and web scraper job logging (#54)

* web scraper logging
* var assignment fix

1 parent f2ce562 commit b4495ec

File tree

2 files changed: +91 -16 lines changed


internal/jobs/webscraper.go

+54 -6
@@ -41,28 +41,46 @@ func NewWebScraper(jc types.JobConfiguration, statsCollector *stats.StatsCollect
 }
 
 func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) {
+    logrus.Info("Starting ExecuteJob for web scraper")
+
+    // Step 1: Unmarshal arguments
     args := &WebScraperArgs{}
-    j.Arguments.Unmarshal(args)
+    logrus.Info("Unmarshaling job arguments")
+    if err := j.Arguments.Unmarshal(args); err != nil {
+        logrus.Errorf("Failed to unmarshal job arguments: %v", err)
+        return types.JobResult{Error: fmt.Sprintf("Invalid arguments: %v", err)}, err
+    }
+    logrus.Infof("Job arguments unmarshaled successfully: %+v", args)
 
+    // Step 2: Validate URL against blacklist
+    logrus.Info("Validating URL against blacklist")
     for _, u := range ws.configuration.Blacklist {
+        logrus.Debugf("Checking if URL contains blacklisted term: %s", u)
         if strings.Contains(args.URL, u) {
+            logrus.Warnf("URL %s is blacklisted due to term: %s", args.URL, u)
             ws.stats.Add(stats.WebInvalid, 1)
             logrus.Errorf("Blacklisted URL: %s", args.URL)
             return types.JobResult{
                 Error: fmt.Sprintf("URL blacklisted: %s", args.URL),
             }, nil
         }
     }
+    logrus.Infof("URL %s passed blacklist validation", args.URL)
 
+    // Step 3: Perform web scraping
+    logrus.Infof("Initiating web scraping for URL: %s with depth: %d", args.URL, args.Depth)
     result, err := scrapeWeb([]string{args.URL}, args.Depth)
     if err != nil {
+        logrus.Errorf("Web scraping failed for URL %s: %v", args.URL, err)
         ws.stats.Add(stats.WebErrors, 1)
         return types.JobResult{Error: err.Error()}, err
     }
+    logrus.Infof("Web scraping succeeded for URL %s: %v", args.URL, result)
 
-    // Do the web scraping here
-    // For now, just return the URL
+    // Step 4: Process result and return
+    logrus.Info("Updating statistics for successful web scraping")
     ws.stats.Add(stats.WebSuccess, 1)
+    logrus.Infof("Returning web scraping result for URL %s", args.URL)
     return types.JobResult{
         Data: result,
     }, nil
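
For context, here is a minimal sketch of the job-arguments payload that the unmarshal step above expects. The URL and Depth field names are inferred from the args.URL / args.Depth usage in ExecuteJob; the JSON tags are illustrative assumptions, not taken from the actual WebScraperArgs definition.

// Hypothetical web-scraper job payload (field names inferred from the
// diff above; JSON tags are guesses for illustration only).
package main

import (
    "encoding/json"
    "fmt"
)

type WebScraperArgs struct {
    URL   string `json:"url"`
    Depth int    `json:"depth"`
}

func main() {
    payload, _ := json.Marshal(WebScraperArgs{URL: "https://example.com", Depth: 2})
    fmt.Println(string(payload)) // {"url":"https://example.com","depth":2}
}
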
@@ -107,64 +125,82 @@ type CollectedData struct
 //     logrus.WithField("result", string(res)).Info("Scraping completed")
 // }()
 func scrapeWeb(uri []string, depth int) ([]byte, error) {
+    logrus.Infof("Starting scrapeWeb with parameters: URIs=%v, Depth=%d", uri, depth)
     // Set default depth to 1 if 0 is provided
     if depth <= 0 {
+        logrus.Infof("Invalid depth (%d) provided, setting default depth to 1", depth)
         depth = 1
     }
 
+    logrus.Info("Initializing CollectedData struct")
     var collectedData CollectedData
 
+    logrus.Info("Creating new Colly collector")
     c := colly.NewCollector(
         colly.Async(true), // Enable asynchronous requests
         colly.AllowURLRevisit(),
         colly.IgnoreRobotsTxt(),
         colly.MaxDepth(depth),
     )
+    logrus.Info("Colly collector created successfully")
 
     // Adjust the parallelism and delay based on your needs and server capacity
+    logrus.Info("Setting scraping limits with parallelism and delay")
     limitRule := colly.LimitRule{
         DomainGlob:  "*",
         Parallelism: 4,                      // Increased parallelism
         Delay:       500 * time.Millisecond, // Reduced delay
     }
+    logrus.Info("Applying scraping limits to the collector")
     if err := c.Limit(&limitRule); err != nil {
         logrus.Errorf("[-] Unable to set scraper limit. Using default. Error: %v", err)
     }
 
     // Increase the timeout slightly if necessary
-    c.SetRequestTimeout(240 * time.Second) // Increased to 4 minutes
+    logrus.Info("Setting request timeout to 240 seconds")
+    c.SetRequestTimeout(240 * time.Second)
 
     // Initialize a backoff strategy
+    logrus.Info("Initializing exponential backoff strategy")
     backoffStrategy := backoff.NewExponentialBackOff()
 
+    logrus.Info("Registering OnError callback to handle request errors")
     c.OnError(func(r *colly.Response, err error) {
+        logrus.Errorf("Error occurred during request to URL: %s. StatusCode: %d, Error: %v", r.Request.URL, r.StatusCode, err)
         if r.StatusCode == http.StatusTooManyRequests {
             // Parse the Retry-After header (in seconds)
             retryAfter, convErr := strconv.Atoi(r.Headers.Get("Retry-After"))
             if convErr != nil {
                 // If not in seconds, it might be a date. Handle accordingly.
-                logrus.Debugf("[-] Retry-After: %s", r.Headers.Get("Retry-After"))
+                logrus.Warnf("Retry-After header is present but unrecognized format: %s", r.Headers.Get("Retry-After"))
             }
             // Calculate the next delay
             nextDelay := backoffStrategy.NextBackOff()
             if retryAfter > 0 {
                 nextDelay = time.Duration(retryAfter) * time.Second
             }
-            logrus.Warnf("[-] Rate limited. Retrying after %v", nextDelay)
+            logrus.Warnf("Rate limited for URL: %s. Retrying after %v", r.Request.URL, nextDelay)
             time.Sleep(nextDelay)
             // Retry the request
+            logrus.Info("Retrying the request")
             _ = r.Request.Retry()
+
         } else {
+            logrus.Errorf("Request failed for URL: %s with error: %v", r.Request.URL, err)
             logrus.Errorf("[-] Request URL: %s failed with error: %v", r.Request.URL, err)
         }
     })
 
+    logrus.Info("Registering OnHTML callback for h1, h2 elements (titles)")
     c.OnHTML("h1, h2", func(e *colly.HTMLElement) {
+        logrus.Infof("Title (h1/h2) found: %s", e.Text)
         // Directly append a new Section to collectedData.Sections
         collectedData.Sections = append(collectedData.Sections, Section{Title: e.Text})
     })
 
+    logrus.Info("Registering OnHTML callback for paragraph elements")
     c.OnHTML("p", func(e *colly.HTMLElement) {
+        logrus.Infof("Paragraph detected: %s", e.Text)
         // Check if there are any sections to append paragraphs to
         if len(collectedData.Sections) > 0 {
             // Get a reference to the last section
@@ -185,15 +221,19 @@ func scrapeWeb(uri []string, depth int) ([]byte, error) {
         }
     })
 
+    logrus.Info("Registering OnHTML callback for image elements")
     c.OnHTML("img", func(e *colly.HTMLElement) {
+        logrus.Infof("Image detected with source URL: %s", e.Attr("src"))
         imageURL := e.Request.AbsoluteURL(e.Attr("src"))
         if len(collectedData.Sections) > 0 {
             lastSection := &collectedData.Sections[len(collectedData.Sections)-1]
             lastSection.Images = append(lastSection.Images, imageURL)
         }
     })
 
+    logrus.Info("Registering OnHTML callback for anchor elements")
     c.OnHTML("a", func(e *colly.HTMLElement) {
+        logrus.Infof("Link detected: %s", e.Attr("href"))
         pageURL := e.Request.AbsoluteURL(e.Attr("href"))
         // Check if the URL protocol is supported (http or https)
         if strings.HasPrefix(pageURL, "http://") || strings.HasPrefix(pageURL, "https://") {
@@ -202,16 +242,23 @@ func scrapeWeb(uri []string, depth int) ([]byte, error) {
         }
     })
 
+    logrus.Infof("Starting to visit URLs: %v", uri)
     for _, u := range uri {
+        logrus.Infof("Visiting URL: %s", u)
         err := c.Visit(u)
         if err != nil {
+            logrus.Errorf("Failed to visit URL: %s. Error: %v", u, err)
             return nil, err
         }
     }
 
     // Wait for all requests to finish
+    logrus.Info("Waiting for all requests to complete")
     c.Wait()
 
+    logrus.Info("Scraping completed, marshaling collected data into JSON format")
     j, _ := json.Marshal(collectedData)
+
+    logrus.Infof("Scraping successful. Returning data for URIs: %v", uri)
     return j, nil
 }
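
The OnError handler above folds header parsing and backoff into one callback. Below is a standalone sketch of the same retry-delay rule: a parseable numeric Retry-After value wins, otherwise the next exponential-backoff interval is used. The github.com/cenkalti/backoff/v4 import is an assumption, inferred from the backoff.NewExponentialBackOff() call in the diff; the repository may pin a different version.

// Sketch of the retry-delay rule from the OnError handler, in isolation.
// Assumes github.com/cenkalti/backoff/v4 (see note above).
package main

import (
    "fmt"
    "strconv"
    "time"

    "github.com/cenkalti/backoff/v4"
)

// nextRetryDelay prefers a numeric Retry-After header (in seconds) and
// falls back to the strategy's next exponential-backoff interval.
func nextRetryDelay(retryAfterHeader string, strategy *backoff.ExponentialBackOff) time.Duration {
    delay := strategy.NextBackOff()
    if secs, err := strconv.Atoi(retryAfterHeader); err == nil && secs > 0 {
        delay = time.Duration(secs) * time.Second
    }
    return delay
}

func main() {
    strategy := backoff.NewExponentialBackOff()
    fmt.Println(nextRetryDelay("30", strategy)) // header present: 30s
    fmt.Println(nextRetryDelay("", strategy))   // header absent: backoff interval
}
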

internal/jobserver/jobserver.go

+37 -10
@@ -2,6 +2,7 @@ package jobserver
 
 import (
     "context"
+    "github.com/sirupsen/logrus"
     "sync"
 
     "github.com/google/uuid"
@@ -28,35 +29,61 @@ type jobWorkerEntry struct {
 }
 
 func NewJobServer(workers int, jc types.JobConfiguration) *JobServer {
+    logrus.Info("Initializing JobServer...")
+
+    // Validate and set worker count
     if workers <= 0 {
+        logrus.Infof("Invalid worker count (%d), defaulting to 1 worker.", workers)
         workers = 1
+    } else {
+        logrus.Infof("Setting worker count to %d.", workers)
     }
 
+    // Retrieve and set buffer size for stats collector
     bufSize, ok := jc["stats_buf_size"].(uint)
     if !ok {
+        logrus.Info("stats_buf_size not provided or invalid in JobConfiguration. Defaulting to 128.")
         bufSize = 128
+    } else {
+        logrus.Infof("Using stats_buf_size: %d.", bufSize)
     }
 
     // Start stats collector
+    logrus.Info("Starting stats collector...")
     s := stats.StartCollector(bufSize, jc)
+    logrus.Info("Stats collector started successfully.")
 
     // Set worker ID in stats collector if available
     if workerID, ok := jc["worker_id"].(string); ok && workerID != "" {
+        logrus.Infof("Setting worker ID to '%s' in stats collector.", workerID)
         s.SetWorkerID(workerID)
+    } else {
+        logrus.Info("No worker ID found in JobConfiguration.")
+    }
+
+    // Initialize job workers
+    logrus.Info("Setting up job workers...")
+    jobworkers := make(map[string]*jobWorkerEntry)
+
+    jobworkers[jobs.WebScraperType] = &jobWorkerEntry{
+        w: jobs.NewWebScraper(jc, s),
     }
+    logrus.Infof("Initialized job worker for: %s", jobs.WebScraperType)
 
-    jobworkers := map[string]*jobWorkerEntry{
-        jobs.WebScraperType: {
-            w: jobs.NewWebScraper(jc, s),
-        },
-        jobs.TwitterScraperType: {
-            w: jobs.NewTwitterScraper(jc, s),
-        },
-        jobs.TelemetryJobType: {
-            w: jobs.NewTelemetryJob(jc, s),
-        },
+    jobworkers[jobs.TwitterScraperType] = &jobWorkerEntry{
+        w: jobs.NewTwitterScraper(jc, s),
     }
+    logrus.Infof("Initialized job worker for: %s", jobs.TwitterScraperType)
+
+    jobworkers[jobs.TelemetryJobType] = &jobWorkerEntry{
+        w: jobs.NewTelemetryJob(jc, s),
+    }
+    logrus.Infof("Initialized job worker for: %s", jobs.TelemetryJobType)
+
+    logrus.Info("Job workers setup completed.")
 
+    // Return the JobServer instance
+    logrus.Info("JobServer initialization complete.")
     return &JobServer{
         jobChan: make(chan types.Job),
         results: make(map[string]types.JobResult),
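
One detail worth noting in the configuration handling: jc["stats_buf_size"].(uint) is an exact type assertion, so the value must be stored as a uint. The self-contained sketch below demonstrates that behavior; the plain map type stands in for types.JobConfiguration, which the indexing above shows to be map-like.

// Illustrates the .(uint) assertion used by NewJobServer: a plain int
// fails the assertion, so the buffer size silently falls back to 128.
package main

import "fmt"

func bufSizeFrom(jc map[string]interface{}) uint {
    bufSize, ok := jc["stats_buf_size"].(uint)
    if !ok {
        bufSize = 128 // default, mirroring NewJobServer
    }
    return bufSize
}

func main() {
    fmt.Println(bufSizeFrom(map[string]interface{}{"stats_buf_size": 256}))       // 128: int fails the uint assertion
    fmt.Println(bufSizeFrom(map[string]interface{}{"stats_buf_size": uint(256)})) // 256: uint succeeds
}

Callers therefore need to store the value as uint(...) explicitly; an untyped integer literal placed in the configuration map defaults to int and will be ignored.
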
