Skip to content

Commit 7fba5d5

Browse files
committed
feat(maintenance-service): Add a job to deduplicate packages
Due to an issue fixed in fb57726, duplicates of packages were stored in the database. The new mechanism to prevent duplicates is very slow if the database contains a lot of packages (>1,000,000), so add a maintenance job to remove the duplicates. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent 3f32a25 commit 7fba5d5

File tree

2 files changed

+795
-0
lines changed

2 files changed

+795
-0
lines changed
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance.jobs
21+
22+
import java.sql.Connection
23+
24+
import kotlinx.serialization.Serializable
25+
import kotlinx.serialization.SerializationException
26+
import kotlinx.serialization.json.Json
27+
import kotlinx.serialization.json.JsonObject
28+
import kotlinx.serialization.json.decodeFromJsonElement
29+
import kotlinx.serialization.json.encodeToJsonElement
30+
import kotlinx.serialization.json.jsonObject
31+
32+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
33+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.MappedDeclaredLicensesTable
34+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAnalyzerRunsTable
35+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAuthorsTable
36+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesDeclaredLicensesTable
37+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesTable
38+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesMappedDeclaredLicensesTable
39+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesTable
40+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable
41+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.UnmappedDeclaredLicensesTable
42+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
43+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
44+
import org.eclipse.apoapsis.ortserver.model.runs.ProcessedDeclaredLicense
45+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceJob
46+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceService
47+
48+
import org.jetbrains.exposed.sql.Database
49+
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
50+
import org.jetbrains.exposed.sql.and
51+
import org.jetbrains.exposed.sql.deleteWhere
52+
import org.jetbrains.exposed.sql.min
53+
import org.jetbrains.exposed.sql.selectAll
54+
import org.jetbrains.exposed.sql.update
55+
56+
import org.slf4j.LoggerFactory
57+
58+
private val json = Json.Default
59+
private val logger = LoggerFactory.getLogger(DeduplicatePackagesJob::class.java)
60+
61+
/**
62+
* Progress data for the [DeduplicatePackagesJob].
63+
*/
64+
@Serializable
65+
private data class DeduplicatePackagesJobData(
66+
/** The last processed package ID. */
67+
val lastPackageId: Long,
68+
69+
/** The number of unique deduplicated packages. */
70+
val deduplicatedPackages: Long,
71+
72+
/** The number of removed duplicate packages. */
73+
val removedDuplicates: Long
74+
)
75+
76+
/**
77+
* A maintenance job to deduplicate packages. The algorithm works as follows:
78+
*
79+
* 1. Find all packages that are equal to the package with the provided ID.
80+
* 2. Update all references to the duplicate packages to point to the original package.
81+
* 3. Delete the duplicate packages.
82+
* 4. Repeat until all packages are processed.
83+
*/
84+
class DeduplicatePackagesJob(private val db: Database) : MaintenanceJob() {
85+
override val name = "DeduplicatePackages"
86+
87+
private var curIndex: Long = -1L
88+
private lateinit var curJobData: DeduplicatePackagesJobData
89+
90+
override suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData) {
91+
curJobData = jobData.data?.let { deserializeJobData(it) } ?: DeduplicatePackagesJobData(-1L, 0, 0)
92+
curIndex = curJobData.lastPackageId
93+
94+
while (nextPackageId() != null) {
95+
deduplicatePackage(service, jobData.id)
96+
}
97+
}
98+
99+
private fun nextPackageId() = db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
100+
val minId = PackagesTable.id.min()
101+
102+
PackagesTable.select(minId)
103+
.where { PackagesTable.id greater curIndex }
104+
.singleOrNull()
105+
?.get(minId)?.let {
106+
curIndex = it.value
107+
curIndex
108+
}
109+
}
110+
111+
/**
112+
* Find all duplicates of the package with the ID [curIndex], update references to point to this package instead,
113+
* and delete the duplicates.
114+
*/
115+
private fun deduplicatePackage(service: MaintenanceService, jobId: Long) {
116+
db.blockingQuery {
117+
logger.info("Deduplicating package with ID $curIndex.")
118+
119+
val equalPackages = findEqualPackages(curIndex)
120+
121+
logger.info(
122+
"Found ${equalPackages.count()} equal packages for package with ID $curIndex: $equalPackages"
123+
)
124+
125+
equalPackages.forEach {
126+
updateReferences(curIndex, it)
127+
deletePackage(it)
128+
}
129+
130+
curJobData = DeduplicatePackagesJobData(
131+
lastPackageId = curIndex,
132+
deduplicatedPackages = curJobData.deduplicatedPackages + 1,
133+
removedDuplicates = curJobData.removedDuplicates + equalPackages.count()
134+
)
135+
136+
logger.info(
137+
"Finished deduplicating package with ID $curIndex. Deduplicated ${curJobData.deduplicatedPackages} " +
138+
"unique packages and removed ${curJobData.removedDuplicates} duplicates so far."
139+
)
140+
}
141+
142+
val remainingPackages = countRemainingPackages()
143+
144+
if (remainingPackages > 0L) {
145+
service.updateJob(jobId, serializeJobData(curJobData))
146+
logger.info("Remaining packages: $remainingPackages")
147+
} else {
148+
service.updateJob(jobId, serializeJobData(curJobData), MaintenanceJobStatus.FINISHED)
149+
logger.info(
150+
"Package deduplication finished. Deduplicated ${curJobData.deduplicatedPackages} unique packages and " +
151+
"removed ${curJobData.removedDuplicates} duplicates."
152+
)
153+
}
154+
}
155+
156+
/**
157+
* Find all packages that are equal to the package with the provided [pkgId]. Equal means that not only the values
158+
* of the columns are equal, but also the references to other tables.
159+
*/
160+
private fun findEqualPackages(pkgId: Long): List<Long> {
161+
val pkg = PackagesTable.selectAll().where { PackagesTable.id eq pkgId }.single()
162+
163+
val authors = PackagesAuthorsTable.getForPackage(pkgId)
164+
val declaredLicenses = PackagesDeclaredLicensesTable.getForPackage(pkgId)
165+
val processedDeclaredLicenses = ProcessedDeclaredLicensesTable.getForPackage(pkgId)
166+
167+
return PackagesTable.selectAll().where {
168+
PackagesTable.id neq pkgId and
169+
(PackagesTable.identifierId eq pkg[PackagesTable.identifierId]) and
170+
(PackagesTable.vcsId eq pkg[PackagesTable.vcsId]) and
171+
(PackagesTable.vcsProcessedId eq pkg[PackagesTable.vcsProcessedId]) and
172+
(PackagesTable.binaryArtifactId eq pkg[PackagesTable.binaryArtifactId]) and
173+
(PackagesTable.sourceArtifactId eq pkg[PackagesTable.sourceArtifactId]) and
174+
(PackagesTable.purl eq pkg[PackagesTable.purl]) and
175+
(PackagesTable.cpe eq pkg[PackagesTable.cpe]) and
176+
(PackagesTable.description eq pkg[PackagesTable.description]) and
177+
(PackagesTable.homepageUrl eq pkg[PackagesTable.homepageUrl]) and
178+
(PackagesTable.isMetadataOnly eq pkg[PackagesTable.isMetadataOnly]) and
179+
(PackagesTable.isModified eq pkg[PackagesTable.isModified])
180+
}.map { it[PackagesTable.id].value }
181+
.filter { PackagesAuthorsTable.getForPackage(it) == authors }
182+
.filter { PackagesDeclaredLicensesTable.getForPackage(it) == declaredLicenses }
183+
.filter { ProcessedDeclaredLicensesTable.getForPackage(it) == processedDeclaredLicenses }
184+
}
185+
186+
/**
187+
* Update all references to the [duplicatePkgId] to point to the [pkgId] instead.
188+
*/
189+
private fun updateReferences(pkgId: Long, duplicatePkgId: Long) {
190+
PackagesAnalyzerRunsTable.update({ PackagesAnalyzerRunsTable.packageId eq duplicatePkgId }) {
191+
it[packageId] = pkgId
192+
}
193+
}
194+
195+
/**
196+
* Delete the package with the provided [pkgId].
197+
*/
198+
private fun deletePackage(pkgId: Long) {
199+
logger.info("Deleting entries from packages_authors for package with ID $pkgId.")
200+
PackagesAuthorsTable.deleteWhere { packageId eq pkgId }
201+
logger.info("Deleting entries from packages_declared_licenses for package with ID $pkgId.")
202+
PackagesDeclaredLicensesTable.deleteWhere { packageId eq pkgId }
203+
204+
logger.info("Deleting processed declared licenses for package with ID $pkgId.")
205+
ProcessedDeclaredLicensesTable.select(ProcessedDeclaredLicensesTable.id)
206+
.where { ProcessedDeclaredLicensesTable.packageId eq pkgId }
207+
.forEach { processedDeclaredLicense ->
208+
val id = processedDeclaredLicense[ProcessedDeclaredLicensesTable.id].value
209+
210+
ProcessedDeclaredLicensesMappedDeclaredLicensesTable.deleteWhere {
211+
processedDeclaredLicenseId eq id
212+
}
213+
214+
ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.deleteWhere {
215+
processedDeclaredLicenseId eq id
216+
}
217+
}
218+
219+
logger.info("Deleting declared licenses for package with ID $pkgId.")
220+
ProcessedDeclaredLicensesTable.deleteWhere { packageId eq pkgId }
221+
222+
logger.info("Deleting package with ID $pkgId.")
223+
PackagesTable.deleteWhere { id eq pkgId }
224+
}
225+
226+
/**
227+
* Count the number of remaining packages to process.
228+
*/
229+
private fun countRemainingPackages() =
230+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
231+
PackagesTable.selectAll().where { PackagesTable.id greater curIndex }.count()
232+
}
233+
}
234+
235+
private fun deserializeJobData(data: JsonObject) =
236+
try {
237+
json.decodeFromJsonElement<DeduplicatePackagesJobData>(data)
238+
} catch (e: SerializationException) {
239+
logger.error("Could not deserialize job data, starting from the beginning.", e)
240+
null
241+
}
242+
243+
private fun serializeJobData(data: DeduplicatePackagesJobData) = json.encodeToJsonElement(data).jsonObject
244+
245+
private fun PackagesAuthorsTable.getForPackage(pkgId: Long): Set<Long> =
246+
select(authorId)
247+
.where { packageId eq pkgId }
248+
.orderBy(packageId)
249+
.mapTo(mutableSetOf()) { it[authorId].value }
250+
251+
private fun PackagesDeclaredLicensesTable.getForPackage(pkgId: Long): Set<Long> =
252+
select(declaredLicenseId)
253+
.where { packageId eq pkgId }
254+
.orderBy(packageId)
255+
.mapTo(mutableSetOf()) { it[declaredLicenseId].value }
256+
257+
private fun ProcessedDeclaredLicensesTable.getForPackage(pkgId: Long): Set<ProcessedDeclaredLicense> =
258+
selectAll()
259+
.where { packageId eq pkgId }
260+
.orderBy(packageId)
261+
.mapTo(mutableSetOf()) { processedDeclaredLicense ->
262+
val id = processedDeclaredLicense[id].value
263+
val spdxExpression = processedDeclaredLicense[spdxExpression]
264+
265+
val mappedLicenses =
266+
ProcessedDeclaredLicensesMappedDeclaredLicensesTable.getForProcessedDeclaredLicense(id)
267+
268+
val unmappedLicenses =
269+
ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.getForProcessedDeclaredLicense(id)
270+
271+
ProcessedDeclaredLicense(spdxExpression, mappedLicenses, unmappedLicenses)
272+
}
273+
274+
private fun ProcessedDeclaredLicensesMappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
275+
processedDeclaredLicense: Long
276+
) = (this innerJoin MappedDeclaredLicensesTable)
277+
.select(MappedDeclaredLicensesTable.declaredLicense, MappedDeclaredLicensesTable.mappedLicense)
278+
.where { processedDeclaredLicenseId eq processedDeclaredLicense }
279+
.associate {
280+
it[MappedDeclaredLicensesTable.declaredLicense] to it[MappedDeclaredLicensesTable.mappedLicense]
281+
}
282+
283+
private fun ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
284+
processedDeclaredLicense: Long
285+
) = (this innerJoin UnmappedDeclaredLicensesTable)
286+
.select(UnmappedDeclaredLicensesTable.unmappedLicense)
287+
.where { processedDeclaredLicenseId eq processedDeclaredLicense }
288+
.mapTo(mutableSetOf()) { it[UnmappedDeclaredLicensesTable.unmappedLicense] }

0 commit comments

Comments
 (0)