-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbigquery.js
More file actions
97 lines (86 loc) · 2.58 KB
/
bigquery.js
File metadata and controls
97 lines (86 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
const { pipeline } = require('stream');
const { BigQuery } = require('@google-cloud/bigquery');
const csv = require('csv-parser');
const through2 = require('through2');
const iconv = require('iconv-lite');
const { bigqueryConfig, erb } = require('./config/bigquery.js');
// Fail fast when any BigQuery connection setting is undefined.
// Only the NAMES of the missing settings are printed: the config object holds
// the service-account private key and e-mail, so it must never be logged whole.
const missingSettings = Object.entries(bigqueryConfig)
  .filter(([, value]) => value === undefined)
  .map(([key]) => key);
if (missingSettings.length > 0) {
  console.error('missing settings:', missingSettings.join(', '));
  console.error('no connection settings in env');
  process.exit(100);
}
// Shared BigQuery client for this module, authenticated with the
// service-account e-mail/private-key pair read from ./config/bigquery.js.
const db = new BigQuery({
  projectId: bigqueryConfig.projectID,
  clientOptions: { clientId: bigqueryConfig.clientId },
  credentials: {
    client_email: bigqueryConfig.client_email,
    private_key: bigqueryConfig.private_key,
  },
});
// Create table `tableConfig.tableID` in the configured dataset, using
// `tableConfig.settlementsSchema` as the table schema.
// Returns whatever Dataset#createTable returns.
const createTable = (tableConfig) => {
  const { tableID, settlementsSchema } = tableConfig;
  console.log('Creating table ', tableID);
  const dataset = db.dataset(bigqueryConfig.datasetID);
  return dataset.createTable(tableID, { schema: settlementsSchema });
};
// Recreate `tableConfig.tableID` from scratch: if the table already exists it
// is deleted first, then it is created again via createTable. Any existing
// data in the table is lost. Resolves with createTable's result.
module.exports.getTable = async (tableConfig) => {
  const table = db.dataset(bigqueryConfig.datasetID).table(tableConfig.tableID);
  const [alreadyExists] = await table.exists();
  if (alreadyExists) {
    console.log('Drop table');
    await table.delete();
  }
  return createTable(tableConfig);
};
// Serialize one parsed CSV row as a newline-terminated JSON record.
// Cells are matched to `columnNames` by position (the CSV's own header names
// are ignored); a column with no corresponding cell becomes ''.
const rowToNdjsonLine = (row, columnNames) => {
  const cells = Object.values(row);
  const record = Object.fromEntries(
    columnNames.map((name, index) => [name, cells[index] ?? '']),
  );
  return `${JSON.stringify(record)}\n`;
};

/**
 * Stream a windows-1251 encoded CSV file into the BigQuery table named by
 * `tableConfig.tableID`, uploaded as newline-delimited JSON.
 *
 * This is a terminal step. The process exits with code 0 once the BigQuery
 * write stream emits 'complete'. It exits with code 1 on an error from any
 * stage: file read, decoding, CSV parsing, transform or upload.
 *
 * @param {import('stream').Readable} fileStream - raw bytes of the CSV file.
 * @param {object} tableConfig - must provide `tableID`. Column names come from
 *   `tableConfig.settlementsSchema` (the schema createTable uses), falling back
 *   to `erb.settlementsSchema` when it is absent.
 */
module.exports.insertData = (fileStream, tableConfig) => {
  // Use the same schema the table was created with, so columns line up.
  const schema = tableConfig.settlementsSchema ?? erb.settlementsSchema;
  const columnNames = schema.map(({ name }) => name);
  let rowCount = 0;

  const fail = (err) => {
    console.error(err);
    process.exit(1);
  };

  const bqStream = db
    .dataset(bigqueryConfig.datasetID)
    .table(tableConfig.tableID)
    .createWriteStream({
      sourceFormat: 'NEWLINE_DELIMITED_JSON',
    });
  bqStream.on('error', fail);
  bqStream.once('complete', () => {
    console.log('bq end');
    process.exit(0);
  });

  const toNdjson = through2.obj(function (row, encoding, callback) {
    rowCount += 1;
    // Progress indicator for large files.
    if (rowCount % 10000 === 0) {
      console.log(rowCount);
    }
    callback(null, rowToNdjsonLine(row, columnNames));
  });

  // Unlike chained .pipe(), pipeline forwards an error from any stage and
  // destroys every stream. A bad file or parse error therefore cannot crash
  // with an unhandled 'error' event or leave the source file open.
  pipeline(
    fileStream,
    iconv.decodeStream('win1251'),
    csv(),
    toNdjson,
    bqStream,
    (err) => {
      if (err) {
        fail(err);
      }
    },
  );
};