feat: parallelization #352
base: develop
@@ -16,6 +16,10 @@ import Concurrency from './utils/concurrency';
 import Preprocessor from './preprocessor';
 import log, { Level } from './utils/logger';
 
+import { WorkerConfig } from './types/worker';
+import { WorkerPool } from './utils/worker-pool';
+import os from 'os';
+
 /**
  * Extending the Window interface for custom scraping functions.
  */
@@ -39,6 +43,7 @@ declare global {
 interface InterpreterOptions {
   maxRepeats: number;
   maxConcurrency: number;
+  maxWorkers: number;
   serializableCallback: (output: any) => (void | Promise<void>);
   binaryCallback: (output: any, mimeType: string) => (void | Promise<void>);
   debug: boolean;
@@ -68,13 +73,16 @@ export default class Interpreter extends EventEmitter {
 
   private cumulativeResults: Record<string, any>[] = [];
 
+  private workerPool: WorkerPool;
+
   constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
     super();
     this.workflow = workflow.workflow;
     this.initializedWorkflow = null;
     this.options = {
       maxRepeats: 5,
       maxConcurrency: 5,
+      maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
       serializableCallback: (data) => {
         log(JSON.stringify(data), Level.WARN);
       },
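A note on the maxWorkers default: the expression leaves one core free for the main thread and caps the pool at four workers. A quick worked example of how it resolves (illustrative core counts, not from the PR):

```ts
// How the default maxWorkers resolves for a few machine sizes:
const defaultWorkers = (cores: number) => Math.max(1, Math.min(cores - 1, 4));
defaultWorkers(1);  // 1  (never below one worker)
defaultWorkers(4);  // 3  (one core left for the main thread)
defaultWorkers(16); // 4  (hard cap, regardless of core count)
```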
@@ -85,6 +93,7 @@ export default class Interpreter extends EventEmitter {
     };
     this.concurrency = new Concurrency(this.options.maxConcurrency);
     this.log = (...args) => log(...args);
+    this.workerPool = new WorkerPool(this.options.maxWorkers);
 
     const error = Preprocessor.validateWorkflow(workflow);
     if (error) {
@@ -451,7 +460,7 @@ export default class Interpreter extends EventEmitter {
         const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config);
         await this.options.serializableCallback(scrapeResults);
       } else {
-        const scrapeResults: Record<string, any>[] = await this.handlePagination(page, config);
+        const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
         await this.options.serializableCallback(scrapeResults);
       }
     },
@@ -540,6 +549,177 @@ export default class Interpreter extends EventEmitter {
     }
   }
 
+  private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
+    if (config.limit > 10000 && config.pagination.type === 'clickNext') {
+      console.time('parallel-scraping');
+
+      const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
+      const batchSize = Math.ceil(config.limit / numWorkers);
+      const workerPool = new WorkerPool(numWorkers);
+      const pageUrls: string[] = [];
+
+      let workers: any = null;
+      let availableSelectors = config.pagination.selector.split(',');
+      let visitedUrls: string[] = [];
+
+      const { itemsPerPage, estimatedPages } = await page.evaluate(
+        ({ listSelector, limit }) => {
+          const items = document.querySelectorAll(listSelector).length;
+          return {
+            itemsPerPage: items,
+            estimatedPages: Math.ceil(limit / items)
+          };
+        },
+        { listSelector: config.listSelector, limit: config.limit }
+      );
+
+      console.log(`Items per page: ${itemsPerPage}`);
+      console.log(`Estimated pages needed: ${estimatedPages}`);
+
+      try {
+        while (true) {
+          pageUrls.push(page.url());
+
+          if (pageUrls.length >= estimatedPages) {
+            console.log('Reached estimated number of pages. Stopping pagination.');
+            break;
+          }
+
+          let checkButton = null;
+          let workingSelector = null;
+
+          for (let i = 0; i < availableSelectors.length; i++) {
+            const selector = availableSelectors[i];
+            try {
+              // Wait until the selector is attached to the DOM
+              checkButton = await page.waitForSelector(selector, { state: 'attached' });
+              if (checkButton) {
+                workingSelector = selector;
+                break;
+              }
+            } catch (error) {
+              console.log(`Selector failed: ${selector}`);
+            }
+          }
+
+          if (!workingSelector) {
+            break;
+          }
+
+          const nextButton = await page.$(workingSelector);
+          if (!nextButton) {
+            break;
+          }
+
+          const selectorIndex = availableSelectors.indexOf(workingSelector!);
+          availableSelectors = availableSelectors.slice(selectorIndex);
+
+          const previousUrl = page.url();
+          visitedUrls.push(previousUrl);
+
+          try {
+            // Race both click methods; whichever triggers navigation first wins
+            await Promise.race([
+              Promise.all([
+                page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+                nextButton.click()
+              ]),
+              Promise.all([
+                page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+                nextButton.dispatchEvent('click')
+              ])
+            ]);
+          } catch (error) {
+            // Verify whether navigation actually succeeded
+            const currentUrl = page.url();
+            if (currentUrl === previousUrl) {
+              console.log("Previous URL same as current URL. Navigation failed.");
+            }
+          }
+
+          const currentUrl = page.url();
+          if (visitedUrls.includes(currentUrl)) {
+            console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);
+
+            // Extract the current page number from the URL
+            const match = currentUrl.match(/\d+/);
+            if (match) {
+              const currentNumber = match[0];
+              // Use visitedUrls.length + 1 as the next page number
+              const nextNumber = visitedUrls.length + 1;
+
+              // Build the next URL by replacing the current number with the next one
+              const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());
+
+              console.log(`Navigating to constructed URL: ${nextUrl}`);
+
+              // Navigate to the next page
+              await Promise.all([
+                page.waitForNavigation({ waitUntil: 'networkidle' }),
+                page.goto(nextUrl)
+              ]);
+            }
+          }
+
+          await page.waitForTimeout(1000);
+        }
Comment on lines +596 to +682

🛠️ Refactor suggestion

Modularize the pagination navigation logic. The navigation logic is complex and would benefit from being split into smaller, focused functions for better maintainability and testing: for instance, extracting the URL collection loop and the next-page navigation into dedicated helpers.
Example refactor for URL collection:

private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();

  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);

    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  return pageUrls;
}
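The navigateToNextPage helper that this suggestion calls is not defined anywhere in the diff. A minimal sketch of what it could look like, written as a free function for brevity and assuming Playwright's Page/ElementHandle API and a single known next-button selector (both assumptions, not part of the PR):

```ts
import { Page } from 'playwright';

// Hypothetical helper assumed by the refactor above; the default selector
// and timeout are illustrative placeholders.
async function navigateToNextPage(page: Page, selector = 'a.next'): Promise<boolean> {
  const nextButton = await page.$(selector);
  if (!nextButton) return false; // no next button: treat as the last page

  const previousUrl = page.url();
  try {
    await Promise.all([
      page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
      nextButton.click()
    ]);
  } catch {
    // Swallow the timeout; the URL check below decides success
  }
  // Report success only if the URL actually changed
  return page.url() !== previousUrl;
}
```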
+      } catch (error) {
+        console.error('Error collecting page URLs:', error);
+      }
+
+      console.log(`Collected ${pageUrls.length} unique page URLs`);
+
+      workerPool.on('progress', (progress) => {
+        console.log(
+          `Worker ${progress.workerId}: ` +
+          `${progress.percentage.toFixed(2)}% complete, ` +
+          `${progress.scrapedItems} items scraped, ` +
+          `ETA: ${Math.round(progress.estimatedTimeRemaining / 1000)}s`
+        );
+      });
+
+      workerPool.on('globalProgress', (metrics) => {
+        // Global progress is automatically logged by the worker pool
+      });
+
+      try {
+        // Distribute pages among workers
+        const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers);
+        const workerConfigs: WorkerConfig[] = Array.from(
+          { length: numWorkers },
+          (_, i) => ({
+            workerIndex: i,
+            startIndex: i * batchSize,
+            endIndex: Math.min((i + 1) * batchSize, config.limit),
+            batchSize,
+            pageUrls: pageUrls.slice(
+              i * pagesPerWorker,
+              Math.min((i + 1) * pagesPerWorker, pageUrls.length)
+            ),
+            listSelector: config.listSelector,
+            fields: config.fields,
+            pagination: config.pagination
+          })
+        );
+
+        const results = await workerPool.runWorkers(workerConfigs);
+
+        // Process and sort results
+        const sortedResults = results.sort((a, b) => {
+          return (a.index || 0) - (b.index || 0);
+        });
+
+        console.timeEnd('parallel-scraping');
+        return sortedResults.slice(0, config.limit);
+      } catch (error) {
+        console.error('Parallel scraping failed!');
+        return this.handlePagination(page, config);
+      }
+    }
+
+    return this.handlePagination(page, config);
+  }
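To make the distribution above concrete, a worked example with illustrative numbers (not taken from the PR): with config.limit = 20000, 4 workers, and 400 collected URLs, batchSize = ceil(20000 / 4) = 5000 and pagesPerWorker = ceil(400 / 4) = 100, so each worker receives a contiguous item range and a contiguous slice of the URL list:

```ts
// Illustrative check of the partitioning math (not PR code).
const limit = 20000, numWorkers = 4, collectedUrls = 400;
const batchSize = Math.ceil(limit / numWorkers);              // 5000 items per worker
const pagesPerWorker = Math.ceil(collectedUrls / numWorkers); // 100 URLs per worker

for (let i = 0; i < numWorkers; i++) {
  console.log(
    `worker ${i}: items [${i * batchSize}, ${Math.min((i + 1) * batchSize, limit)}), ` +
    `urls [${i * pagesPerWorker}, ${Math.min((i + 1) * pagesPerWorker, collectedUrls)})`
  );
}
// worker 0: items [0, 5000), urls [0, 100)
// worker 1: items [5000, 10000), urls [100, 200) ... and so on
```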
 
   private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) {
     let allResults: Record<string, any>[] = [];
     let previousHeight = 0;
@@ -556,6 +736,7 @@ export default class Interpreter extends EventEmitter {
       await page.waitForTimeout(2000);
 
       const currentHeight = await page.evaluate(() => document.body.scrollHeight);
+      console.log(`Current scroll height: ${currentHeight}`);
       if (currentHeight === previousHeight) {
         const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config);
         allResults = allResults.concat(finalResults);
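The hunk above sits inside a scroll-driven pagination loop; only the added log line is new. For reference, the general scroll-until-stable pattern it follows looks roughly like this (a sketch under assumptions — the PR's actual loop also scrapes results at each step and lives outside this hunk):

```ts
import { Page } from 'playwright';

// Generic scroll-until-stable loop: keep scrolling until the document
// height stops growing, i.e. no more lazy-loaded content arrives.
async function scrollToEnd(page: Page, settleMs = 2000): Promise<void> {
  let previousHeight = 0;
  while (true) {
    await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight));
    await page.waitForTimeout(settleMs); // give lazy content time to render

    const currentHeight = await page.evaluate(() => document.body.scrollHeight);
    if (currentHeight === previousHeight) break; // height stable: done
    previousHeight = currentHeight;
  }
}
```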
@@ -0,0 +1,59 @@
+export interface WorkerConfig {
+  workerIndex: number;
+  startIndex: number;
+  endIndex: number;
+  batchSize: number;
+  pageUrls: string[];
+  listSelector: string;
+  fields: any;
+  pagination: {
+    type: string;
+    selector: string;
+  };
+}
+
+export interface SharedState {
+  totalScraped: number;
+  results: any[];
+}
Comment on lines +15 to +18

💡 Codebase verification

Thread safety and type safety concerns are valid issues that need addressing.

🔗 Analysis chain

Consider thread safety and type safety for SharedState. The shared state between workers needs careful consideration: concurrent updates to totalScraped and results from multiple workers are not synchronized, and results: any[] discards type information. Consider using a thread-safe data structure or implementing proper synchronization mechanisms. Also, define a generic type parameter:

-export interface SharedState {
+export interface SharedState<T = unknown> {
   totalScraped: number;
-  results: any[];
+  results: T[];
 }

🏁 Scripts executed

The following scripts were executed for the analysis:

#!/bin/bash
# Find all files importing or using SharedState
rg "SharedState" -l
# Look for actual usage patterns of SharedState
rg "SharedState" -A 5 -B 5
# Check for any thread synchronization patterns
rg -i "sync|mutex|lock|atomic" maxun-core/src/
# Look for type definitions related to worker results
ast-grep --pattern 'type $_ = $_' maxun-core/src/types/

Length of output: 12076
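One common way to address the thread-safety concern in Node is to avoid shared mutation entirely and funnel every update through the main thread. A minimal sketch, assuming workers are worker_threads instances that post { items } messages (an assumption — the PR's WorkerPool message protocol is not shown in this diff):

```ts
import { Worker } from 'worker_threads';

interface SharedState<T = unknown> {
  totalScraped: number;
  results: T[];
}

// Illustrative: all mutations happen on the single main thread, so updates
// from different workers can never interleave mid-write.
function trackWorker<T>(worker: Worker, state: SharedState<T>): void {
  worker.on('message', (msg: { items: T[] }) => {
    state.results.push(...msg.items);
    state.totalScraped += msg.items.length;
  });
}
```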
+
+export interface WorkerProgressData {
+  percentage: number;
+  currentUrl: string;
+  scrapedItems: number;
+  timeElapsed: number;
+  estimatedTimeRemaining: number;
+  failures: number;
+  performance: PerformanceMetrics;
+}
+
+export interface PerformanceMetrics {
+  startTime: number;
+  endTime: number;
+  duration: number;
+  pagesProcessed: number;
+  itemsScraped: number;
+  failedPages: number;
+  averageTimePerPage: number;
+  memoryUsage: {
+    heapUsed: number;
+    heapTotal: number;
+    external: number;
+    rss: number;
+  };
+  cpuUsage: {
+    user: number;
+    system: number;
+  };
+}
+
+export interface GlobalMetrics {
+  totalPagesProcessed: number;
+  totalItemsScraped: number;
+  totalFailures: number;
+  workersActive: number;
+  averageSpeed: number;
+  timeElapsed: number;
+  memoryUsage: NodeJS.MemoryUsage;
+  cpuUsage: NodeJS.CpuUsage;
+}
Add missing property to InterpreterOptions interface

The serializableCallback property is used in the code but not defined in the provided InterpreterOptions interface. Ensure that serializableCallback is correctly defined in the interface to maintain type safety.