Skip to content

Commit fb88ee6

Browse files
committed
feat: parquet export - limit cpu usage
1 parent e2ba5f0 commit fb88ee6

File tree

4 files changed

+44
-2
lines changed

4 files changed

+44
-2
lines changed

arc-batch/src/main/java/fr/insee/arc/batch/BatchARC.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ private void initializeBatchLoop() {
737737
* start parallel thread
738738
*
739739
* @throws ArcException
740+
* @throws InterruptedException
740741
*/
741742
private void executeLoopOverPhases() throws ArcException {
742743

@@ -762,6 +763,13 @@ private void executeLoopOverPhases() throws ArcException {
762763
waitAndClear();
763764

764765
} while (!exit);
766+
767+
// wait for maintenance thread to finish
768+
try {
769+
maintenance.join();
770+
} catch (InterruptedException e) {
771+
message("Maintenance thread had been interrupted");
772+
}
765773

766774
message("Fin de la boucle d'itération");
767775

arc-core/src/main/java/fr/insee/arc/core/service/p6export/parquet/ParquetDao.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010

11+
import fr.insee.arc.core.service.p1reception.provider.DirectoriesReception;
12+
import fr.insee.arc.core.service.p2chargement.bo.NormeRules;
13+
import fr.insee.arc.core.util.BDParameters;
1114
import fr.insee.arc.utils.consumer.ThrowingConsumer;
1215
import fr.insee.arc.utils.dao.DuckdbDao;
1316
import fr.insee.arc.utils.dao.GenericPreparedStatementBuilder;
@@ -23,6 +26,10 @@ public class ParquetDao {
2326

2427
private static final Logger LOGGER = LogManager.getLogger(ParquetDao.class);
2528

29+
// Ratio applied to the MAX_PARALLEL_WORKERS parameter
30+
// used so as not to over-throttle the input database or S3
31+
private static final float RATIO_OF_CPU_USED_TO_EXPORT_PARQUET = 0.5F;
32+
2633
// parquet file format as "file.parquet"
2734
private static final String PARQUET_FILE_EXTENSION = ".parquet";
2835

@@ -69,6 +76,8 @@ public void exportToParquet(List<TableToRetrieve> tables, String outputDirectory
6976
{
7077
addParquetEncryptionKeyInDuckDb(connection);
7178

79+
defineNumberOfThreadThread(connection);
80+
7281
// create output directory
7382
FileUtilsArc.createDirIfNotexist(outputDirectory);
7483

@@ -79,10 +88,26 @@ public void exportToParquet(List<TableToRetrieve> tables, String outputDirectory
7988
}
8089
};
8190

91+
8292
duckdbDao.executeOnDuckdb(exportToParquetOperation);
8393

8494
}
8595

96+
private void defineNumberOfThreadThread(Connection connection) throws ArcException {
97+
98+
BDParameters bdParameters = new BDParameters(ArcDatabase.COORDINATOR);
99+
100+
int maxParallelWorkers = (int) (bdParameters.getInt(null,"ApiService.MAX_PARALLEL_WORKERS", 4) * RATIO_OF_CPU_USED_TO_EXPORT_PARQUET) ;
101+
maxParallelWorkers = (maxParallelWorkers < 1)? 1 : maxParallelWorkers;
102+
103+
GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder();
104+
query.append("SET threads=").append(query.quoteInt(maxParallelWorkers)).append(";");
105+
106+
duckdbDao.executeQuery(connection, query);
107+
108+
}
109+
110+
86111
private void addParquetEncryptionKeyInDuckDb(Connection connection) throws ArcException {
87112

88113
if (encryptionKey == null) {

arc-core/src/test/java/fr/insee/arc/core/service/p6export/parquet/ParquetDaoTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ private void createTestTable(Connection connection, String testTableName) throws
9999

100100
// test query on
101101
GenericPreparedStatementBuilder query = new GenericPreparedStatementBuilder();
102+
103+
query.build(SQL.CREATE, SQL.SCHEMA, SQL.IF_NOT_EXISTS, "arc", SQL.END_QUERY);
104+
105+
query.build(SQL.CREATE, SQL.TABLE, "arc.parameter", SQL.AS, SQL.SELECT);
106+
query.append("'ApiService.MAX_PARALLEL_WORKERS'::text as key");
107+
query.append(",'12'::text as val");
108+
query.append(SQL.END_QUERY);
109+
110+
102111
query.build(SQL.CREATE, SQL.TABLE, testTableName, SQL.AS, SQL.SELECT);
103112
query.append("'string'::text as column_string");
104113
query.append(",12::int as column_int4");
@@ -110,7 +119,8 @@ private void createTestTable(Connection connection, String testTableName) throws
110119
query.append(",array[8, 9, 10]::int[] as column_array_int8");
111120
query.append(",array[current_timestamp, current_timestamp] as column_array_timestamp");
112121
query.append(",array[current_date, current_date] as column_array_date");
113-
122+
query.append(SQL.END_QUERY);
123+
114124
UtilitaireDao.get(0).executeRequest(connection, query);
115125
}
116126

arc-ws/src/main/java/fr/insee/arc/ws/services/importServlet/dao/ClientDao.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import fr.insee.arc.core.dataobjects.ColumnEnum;
1515
import fr.insee.arc.core.dataobjects.ViewEnum;
1616
import fr.insee.arc.core.model.ExportOption;
17-
import fr.insee.arc.core.model.TraitementEtape;
1817
import fr.insee.arc.core.model.TraitementEtat;
1918
import fr.insee.arc.core.model.TraitementPhase;
2019
import fr.insee.arc.core.service.global.dao.TableNaming;

0 commit comments

Comments
 (0)