-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnewsFetcher.js
More file actions
153 lines (137 loc) · 4.67 KB
/
newsFetcher.js
File metadata and controls
153 lines (137 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
require('dotenv').config();
const fs = require('fs');
const path = require('path');
const pLimit = require('p-limit').default;
const { Pool } = require('pg');
const redisClient = require('./redisClient');
// ===================
// Postgres Connection
// ===================
// Shared Postgres connection pool. Every setting is read from the environment
// (dotenv is loaded at the top of this file), so nothing is hard-coded here.
const { DB_USER, DB_HOST, DB_NAME, DB_PASSWORD, DB_PORT } = process.env;
const pool = new Pool({
  user: DB_USER,
  host: DB_HOST,
  database: DB_NAME,
  password: DB_PASSWORD,
  port: DB_PORT,
});
// ======
// Config
// ======
// Maximum number of articles bundled into one multi-row INSERT statement.
const BATCH_SIZE = 50; // Insert 50 articles / query
// Cap handed to p-limit; applied separately to the parsers running in parallel
// and to the INSERT chunks in flight for each parser.
const CONCURRENCY_LIMIT = 5; // Max 5 concurrent ops
// Root folder that findParsers() walks recursively looking for *.js parser modules.
const PARSERS_DIR = path.join(__dirname, 'Parsers'); // Parser folders directory
// ======================================================
// Recursively find parser files in the parsers directory
// ======================================================
/**
 * Walk `dir` depth-first and collect the path of every regular file whose
 * name ends in `.js`.
 *
 * Entries are inspected with `fs.statSync`, so symbolic links are followed.
 * Results keep the order in which `fs.readdirSync` reports the entries.
 *
 * @param {string} dir - Directory to scan.
 * @returns {string[]} Paths (each built with `path.join(dir, ...)`) of the `.js` files found.
 * @throws {Error} Whatever `fs.readdirSync` / `fs.statSync` throw (e.g. the directory is missing).
 */
function findParsers(dir) {
  const found = [];
  for (const entry of fs.readdirSync(dir)) {
    const entryPath = path.join(dir, entry);
    const info = fs.statSync(entryPath);
    if (info.isDirectory()) {
      // Descend and splice the nested results in at this position.
      found.push(...findParsers(entryPath));
      continue;
    }
    if (info.isFile() && entry.endsWith('.js')) {
      found.push(entryPath);
    }
  }
  return found;
}
// =============================
// Batch Insert with concurrency
// =============================
async function batchInsertNews(client, articles) {
if (!articles.length) return;
// Split articles into chunks
const chunks = [];
for (let i=0; i<articles.length; i+=BATCH_SIZE) {
chunks.push(articles.slice(i, i+BATCH_SIZE));
}
// Limit concurrent inserts
const limit = pLimit(CONCURRENCY_LIMIT);
await Promise.all(
chunks.map(chunk =>
limit(async () => {
const values = [];
const placeholders = [];
chunk.forEach((a, idx) => {
const start = idx * 9 + 1;
placeholders.push(`($${start}, $${start + 1}, $${start + 2}, $${start + 3}, $${start + 4}, $${start + 5}, $${start + 6}, $${start + 7}, $${start + 8})`);
values.push(a.title, a.crux, a.news_type, a.location, a.source, a.source_country, a.url, a.image_url, a.published_at);
});
const query = `
INSERT INTO news
(title, crux, news_type, location, source, source_country, url, image_url, published_at)
VALUES ${placeholders.join(',')}
ON CONFLICT (url) DO NOTHING
`;
await client.query(query, values);
})
)
);
}
// ===============================================
// Fetch, process, and store news from all parsers
// ===============================================
async function fetchAndStoreNews(client = pool) {
console.log(`[${new Date().toISOString()}] Starting news fetch...`);
const limit = pLimit(CONCURRENCY_LIMIT);
const parsers = findParsers(PARSERS_DIR);
let totalInserted = 0;
await Promise.all(
parsers.map(parserPath =>
limit(async () => {
try {
const parserModule = require(parserPath);
if (typeof parserModule !== 'function') {
console.warn(`Parser at ${parserPath} does not export parse(). Skipping.`);
return;
}
const articles = await parserModule();
if (!Array.isArray(articles) || !articles.length) return;
// Insert articles in scalable batches
await batchInsertNews(client, articles);
totalInserted += articles.length;
console.log(`Inserted ${articles.length} articles from parser ${parserPath}`);
} catch (err) {
console.error(`Error processing parser ${parserPath}:`, err);
}
})
)
);
console.log(`[${new Date().toISOString()}] Fetch + Store complete. Total articles inserted: ${totalInserted}`);
return totalInserted;
}
// ===============
// Delete all news
// ===============
/**
 * Remove every stored article: truncate the `news` table (resetting its
 * identity sequence), then delete all Redis keys matching `news:*`.
 *
 * @param {{query: Function}} [client=pool] - Object exposing `query(text)`.
 * @returns {Promise<void>}
 * @throws Re-throws any Postgres or Redis error after logging it.
 */
async function deleteAllNews(client = pool) {
  try {
    console.log(`[${new Date().toISOString()}] Deleting all news from Postgres...`);
    await client.query("TRUNCATE news RESTART IDENTITY");

    console.log(`[${new Date().toISOString()}] Deleting all news from Redis...`);
    const cachedKeys = await redisClient.keys('news:*');
    // Skip the DEL call entirely when nothing matched.
    if (cachedKeys.length) {
      await redisClient.del(cachedKeys);
    }

    console.log(`[${new Date().toISOString()}] All news deleted.`);
  } catch (error) {
    console.error(error);
    throw error;
  }
}
// ==========
// Manual run
// ==========
// Runs only when this file is executed directly (not when it is require()d):
// performs one fetch and then exits with 0 on success or 1 on failure.
if (require.main === module) {
  (async function manualRun() {
    let exitCode = 0;
    try {
      console.log("Manual fetch initiated...");
      await fetchAndStoreNews();
      console.log("Manual fetch complete.");
    } catch (err) {
      console.error(err);
      exitCode = 1;
    }
    // Exit explicitly rather than waiting for the event loop to drain.
    process.exit(exitCode);
  })();
}
// Public surface of this module: the two news operations plus the shared
// Postgres pool that both of them use as their default client.
module.exports = { fetchAndStoreNews, deleteAllNews, pool };