Skip to content

Commit dbea10f

Browse files
authored
Fix connection pool exhaustion when inserting (#823)
The getHandle method was obtaining a connection from the pool and not freeing it Signed-off-by: henneberger <[email protected]>
1 parent c927c88 commit dbea10f

File tree

5 files changed

+114
-102
lines changed

5 files changed

+114
-102
lines changed

src/main/java/marquez/db/DatasetDao.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,32 @@
3535
public interface DatasetDao extends SqlObject {
3636
@Transaction
3737
default void insert(DatasetRow row) {
38-
getHandle()
39-
.createUpdate(
40-
"INSERT INTO datasets ("
41-
+ "uuid, "
42-
+ "type, "
43-
+ "created_at, "
44-
+ "updated_at, "
45-
+ "namespace_uuid, "
46-
+ "source_uuid, "
47-
+ "name, "
48-
+ "physical_name, "
49-
+ "description"
50-
+ ") VALUES ("
51-
+ ":uuid, "
52-
+ ":type, "
53-
+ ":createdAt, "
54-
+ ":updatedAt, "
55-
+ ":namespaceUuid, "
56-
+ ":sourceUuid, "
57-
+ ":name, "
58-
+ ":physicalName, "
59-
+ ":description)")
60-
.bindBean(row)
61-
.execute();
38+
withHandle(
39+
handle ->
40+
handle
41+
.createUpdate(
42+
"INSERT INTO datasets ("
43+
+ "uuid, "
44+
+ "type, "
45+
+ "created_at, "
46+
+ "updated_at, "
47+
+ "namespace_uuid, "
48+
+ "source_uuid, "
49+
+ "name, "
50+
+ "physical_name, "
51+
+ "description"
52+
+ ") VALUES ("
53+
+ ":uuid, "
54+
+ ":type, "
55+
+ ":createdAt, "
56+
+ ":updatedAt, "
57+
+ ":namespaceUuid, "
58+
+ ":sourceUuid, "
59+
+ ":name, "
60+
+ ":physicalName, "
61+
+ ":description)")
62+
.bindBean(row)
63+
.execute());
6264
// Tags
6365
final Instant taggedAt = row.getCreatedAt();
6466
row.getTagUuids().forEach(tagUuid -> updateTags(row.getUuid(), tagUuid, taggedAt));

src/main/java/marquez/db/DatasetFieldDao.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,28 @@ default void insertAll(List<DatasetFieldRow> rows) {
3838

3939
@Transaction
4040
default void insert(DatasetFieldRow row) {
41-
getHandle()
42-
.createUpdate(
43-
"INSERT INTO dataset_fields ("
44-
+ "uuid, "
45-
+ "type, "
46-
+ "created_at, "
47-
+ "updated_at, "
48-
+ "dataset_uuid, "
49-
+ "name, "
50-
+ "description"
51-
+ ") VALUES ("
52-
+ ":uuid, "
53-
+ ":type, "
54-
+ ":createdAt, "
55-
+ ":updatedAt, "
56-
+ ":datasetUuid, "
57-
+ ":name, "
58-
+ ":description)")
59-
.bindBean(row)
60-
.execute();
41+
withHandle(
42+
handle ->
43+
handle
44+
.createUpdate(
45+
"INSERT INTO dataset_fields ("
46+
+ "uuid, "
47+
+ "type, "
48+
+ "created_at, "
49+
+ "updated_at, "
50+
+ "dataset_uuid, "
51+
+ "name, "
52+
+ "description"
53+
+ ") VALUES ("
54+
+ ":uuid, "
55+
+ ":type, "
56+
+ ":createdAt, "
57+
+ ":updatedAt, "
58+
+ ":datasetUuid, "
59+
+ ":name, "
60+
+ ":description)")
61+
.bindBean(row)
62+
.execute());
6163
// Tags
6264
final Instant taggedAt = row.getCreatedAt();
6365
row.getTagUuids().forEach(tagUuid -> updateTags(row.getUuid(), tagUuid, taggedAt));

src/main/java/marquez/db/JobVersionDao.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,30 @@ enum IoType {
4141

4242
@Transaction
4343
default void insert(@NonNull JobVersionRow row) {
44-
getHandle()
45-
.createUpdate(
46-
"INSERT INTO job_versions ("
47-
+ "uuid, "
48-
+ "created_at, "
49-
+ "updated_at, "
50-
+ "job_uuid, "
51-
+ "version, "
52-
+ "location, "
53-
+ "latest_run_uuid, "
54-
+ "job_context_uuid"
55-
+ ") VALUES ("
56-
+ ":uuid, "
57-
+ ":createdAt, "
58-
+ ":updateAt, "
59-
+ ":jobUuid, "
60-
+ ":version, "
61-
+ ":location, "
62-
+ ":latestRunUuid, "
63-
+ ":jobContextUuid)")
64-
.bindBean(row)
65-
.execute();
44+
withHandle(
45+
handle ->
46+
handle
47+
.createUpdate(
48+
"INSERT INTO job_versions ("
49+
+ "uuid, "
50+
+ "created_at, "
51+
+ "updated_at, "
52+
+ "job_uuid, "
53+
+ "version, "
54+
+ "location, "
55+
+ "latest_run_uuid, "
56+
+ "job_context_uuid"
57+
+ ") VALUES ("
58+
+ ":uuid, "
59+
+ ":createdAt, "
60+
+ ":updateAt, "
61+
+ ":jobUuid, "
62+
+ ":version, "
63+
+ ":location, "
64+
+ ":latestRunUuid, "
65+
+ ":jobContextUuid)")
66+
.bindBean(row)
67+
.execute());
6668
// I/O
6769
row.getInputUuids().forEach(inputUuid -> updateInputs(row.getUuid(), inputUuid));
6870
row.getOutputUuids().forEach(outputUuid -> updateOutputs(row.getUuid(), outputUuid));

src/main/java/marquez/db/RunDao.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,28 @@ default void insertWith(RunRow row, List<UUID> inputVersionUuids) {
4242

4343
@Transaction
4444
default void insert(RunRow row) {
45-
getHandle()
46-
.createUpdate(
47-
"INSERT INTO runs ("
48-
+ "uuid, "
49-
+ "created_at, "
50-
+ "updated_at, "
51-
+ "job_version_uuid, "
52-
+ "run_args_uuid, "
53-
+ "nominal_start_time, "
54-
+ "nominal_end_time"
55-
+ ") VALUES ("
56-
+ ":uuid, "
57-
+ ":createdAt, "
58-
+ ":updatedAt, "
59-
+ ":jobVersionUuid, "
60-
+ ":runArgsUuid, "
61-
+ ":nominalStartTime, "
62-
+ ":nominalEndTime)")
63-
.bindBean(row)
64-
.execute();
45+
withHandle(
46+
handle ->
47+
handle
48+
.createUpdate(
49+
"INSERT INTO runs ("
50+
+ "uuid, "
51+
+ "created_at, "
52+
+ "updated_at, "
53+
+ "job_version_uuid, "
54+
+ "run_args_uuid, "
55+
+ "nominal_start_time, "
56+
+ "nominal_end_time"
57+
+ ") VALUES ("
58+
+ ":uuid, "
59+
+ ":createdAt, "
60+
+ ":updatedAt, "
61+
+ ":jobVersionUuid, "
62+
+ ":runArgsUuid, "
63+
+ ":nominalStartTime, "
64+
+ ":nominalEndTime)")
65+
.bindBean(row)
66+
.execute());
6567
// Input versions
6668
row.getInputVersionUuids()
6769
.forEach(inputVersionUuid -> updateInputVersions(row.getUuid(), inputVersionUuid));
@@ -72,16 +74,18 @@ default void insert(RunRow row) {
7274

7375
@Transaction
7476
default void updateJobVersionUuid(UUID rowUuid, Instant updatedAt, UUID jobVersionUuid) {
75-
getHandle()
76-
.createUpdate(
77-
"UPDATE runs "
78-
+ "SET updated_at = :updatedAt, "
79-
+ " job_version_uuid = :jobVersionUuid "
80-
+ "WHERE uuid = :rowUuid")
81-
.bind("updatedAt", updatedAt)
82-
.bind("jobVersionUuid", jobVersionUuid)
83-
.bind("rowUuid", rowUuid)
84-
.execute();
77+
withHandle(
78+
handle ->
79+
handle
80+
.createUpdate(
81+
"UPDATE runs "
82+
+ "SET updated_at = :updatedAt, "
83+
+ " job_version_uuid = :jobVersionUuid "
84+
+ "WHERE uuid = :rowUuid")
85+
.bind("updatedAt", updatedAt)
86+
.bind("jobVersionUuid", jobVersionUuid)
87+
.bind("rowUuid", rowUuid)
88+
.execute());
8589
createJobVersionDao().updateLatestRun(jobVersionUuid, updatedAt, rowUuid);
8690
}
8791

src/main/java/marquez/db/RunStateDao.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ default void insert(
4141
boolean starting,
4242
boolean done,
4343
boolean complete) {
44-
getHandle()
45-
.createUpdate(
46-
"INSERT INTO run_states (uuid, transitioned_at, run_uuid, state)"
47-
+ "VALUES (:uuid, :transitionedAt, :runUuid, :state)")
48-
.bindBean(row)
49-
.execute();
44+
withHandle(
45+
handle ->
46+
handle
47+
.createUpdate(
48+
"INSERT INTO run_states (uuid, transitioned_at, run_uuid, state)"
49+
+ "VALUES (:uuid, :transitionedAt, :runUuid, :state)")
50+
.bindBean(row)
51+
.execute());
5052
// State transition
5153
final Instant updateAt = row.getTransitionedAt();
5254
createRunDao().updateRunState(row.getRunUuid(), updateAt, row.getState());

0 commit comments

Comments
 (0)