Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/resources/application-standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ notifier {
notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME}
tables {
key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
}
connectionPool = "HikariCP"
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ notifier {
notification-schema-name = ${NOTIFICATION_POSTGRES_SCHEMA_NAME}
tables {
key-notification-table-name = ${KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
producer-key-notification-table-name = ${PRODUCER_KEY_NOTIFICATION_POSTGRES_TABLE_NAME}
}
connectionPool = "HikariCP"
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
Expand Down
49 changes: 48 additions & 1 deletion src/main/resources/interface-specification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,54 @@ paths:
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
$ref: '#/components/schemas/Problem'
'/events/producerKeys':
parameters:
- $ref: '#/components/parameters/CorrelationIdHeader'
- $ref: '#/components/parameters/IpAddress'
get:
tags:
- events
summary: Get list of the producer keys events
description: Retrieves the list of the keys events
operationId: getProducerKeysEvents
parameters:
- name: lastEventId
in: query
description: returns events starting from this last received id
required: true
schema:
type: integer
format: int64
default: 0
- name: limit
in: query
description: the number of events returned by this response
schema:
type: integer
format: int32
minimum: 1
maximum: 500
default: 100
responses:
'200':
description: Events
content:
application/json:
schema:
$ref: '#/components/schemas/Events'
'400':
description: Bad request
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
'401':
description: Unauthorized
content:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
components:
parameters:
CorrelationIdHeader:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ import it.pagopa.interop.commons.utils.AkkaUtils.getOrganizationIdFutureUUID
import it.pagopa.interop.commons.utils.errors.ServiceCode
import it.pagopa.interop.notifier.api.EventsApiService
import it.pagopa.interop.notifier.api.impl.ResponseHandlers.{
getAllAgreementsEventsFromIdResponse,
getAllEservicesFromIdResponse,
getEventsFromIdResponse,
getKeyEventsResponse,
getAllAgreementsEventsFromIdResponse
getProducerKeyEventsResponse
}
import it.pagopa.interop.notifier.database.{
AuthorizationEventsDao,
KeyEventRecord,
ProducerKeyEventRecord,
ProducerKeyEventsDao
}
import it.pagopa.interop.notifier.database.{AuthorizationEventsDao, KeyEventRecord}
import it.pagopa.interop.notifier.model._
import it.pagopa.interop.notifier.model.Adapters._
import it.pagopa.interop.notifier.service.converters.{allOrganizations, agreementsPartition}
import it.pagopa.interop.notifier.model._
import it.pagopa.interop.notifier.service.converters.{agreementsPartition, allOrganizations}
import it.pagopa.interop.notifier.service.impl.DynamoNotificationService

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -99,18 +105,46 @@ final class EventsServiceApiImpl(dynamoNotificationService: DynamoNotificationSe
toEntityMarshallerEvents: ToEntityMarshaller[Events],
toEntityMarshallerProblem: ToEntityMarshaller[Problem]
): Route = {
val operationLabel = s"Retrieving $limit keys messages from id $lastEventId"
val operationLabel = s"Retrieving $limit producer keys messages from id $lastEventId"
val result: Future[Events] = AuthorizationEventsDao
.select(lastEventId, limit)
.map(convertToEvents)
.map(convertKeyRecordToEvents)

onComplete(result) {
getKeyEventsResponse[Events](operationLabel)(getKeysEvents200)
}

}

private def convertToEvents(records: Seq[KeyEventRecord]): Events = {
private def convertKeyRecordToEvents(records: Seq[KeyEventRecord]): Events = {
val events: Seq[Event] = records.map(record =>
Event(
eventId = record.eventId,
eventType = record.eventType.toString,
objectType = ObjectType.KEY,
objectId = Map("kid" -> record.kid)
)
)

Events(lastEventId = records.lastOption.map(_.eventId), events = events)
}

override def getProducerKeysEvents(lastEventId: Long, limit: Int)(implicit
contexts: Seq[(String, String)],
toEntityMarshallerEvents: ToEntityMarshaller[Events],
toEntityMarshallerProblem: ToEntityMarshaller[Problem]
): Route = {
val operationLabel = s"Retrieving $limit keys messages from id $lastEventId"
val result: Future[Events] = ProducerKeyEventsDao
.select(lastEventId, limit)
.map(convertProducerKeyRecordToEvents)

onComplete(result) {
getProducerKeyEventsResponse[Events](operationLabel)(getProducerKeysEvents200)
}
}

private def convertProducerKeyRecordToEvents(records: Seq[ProducerKeyEventRecord]): Events = {
val events: Seq[Event] = records.map(record =>
Event(
eventId = record.eventId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ object ResponseHandlers extends AkkaResponses {
case Failure(ex) => internalServerError(ex, logMessage)
}

def getProducerKeyEventsResponse[T](logMessage: String)(
success: T => Route
)(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route =
result match {
case Success(s) => success(s)
case Failure(ex) => internalServerError(ex, logMessage)
}
def getAllAgreementsEventsFromIdResponse[T](logMessage: String)(
success: T => Route
)(result: Try[T])(implicit contexts: Seq[(String, String)], logger: LoggerTakingImplicit[ContextFieldsToLog]): Route =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ object ApplicationConfiguration {
s"$schema.$table"
}

val postgresProducerKeysNotificationTable: String = {
val schema = config.getString("notifier.postgres.notification-schema-name")
val table = config.getString("notifier.postgres.tables.producer-key-notification-table-name")
s"$schema.$table"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package it.pagopa.interop.notifier.database

import it.pagopa.interop.notifier.service.converters.EventType
import it.pagopa.interop.notifier.service.converters.EventType.EventType
import slick.jdbc.GetResult

final case class ProducerKeyEventRecord(eventId: Long, kid: String, eventType: EventType)

object ProducerKeyEventRecord {
implicit val eventResult: GetResult[ProducerKeyEventRecord] =
GetResult(r => ProducerKeyEventRecord(r.<<, r.<<, EventType.withName(r.<<)))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package it.pagopa.interop.notifier.database

import com.typesafe.scalalogging.Logger
import it.pagopa.interop.notifier.common.system.ApplicationConfiguration
import it.pagopa.interop.notifier.common.system.ApplicationConfiguration.postgresProducerKeysNotificationTable
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.PostgresProfile.api._
import slick.sql.SqlStreamingAction

import scala.concurrent.Future

object ProducerKeyEventsDao {

private val logger: Logger = Logger(this.getClass)

private final val postgresqlDB: Database =
Database.forConfig(path = "notifier.postgres", config = ApplicationConfiguration.config)

def select(lastEventId: Long, limit: Int): Future[Vector[ProducerKeyEventRecord]] = {
logger.debug(s"Getting keys events from lastEventId ${lastEventId.toString} with limit ${limit.toString}")

val statement: SqlStreamingAction[Vector[ProducerKeyEventRecord], ProducerKeyEventRecord, Effect] =
sql"SELECT event_id, kid, event_type FROM #$postgresProducerKeysNotificationTable WHERE event_id > $lastEventId ORDER BY event_id ASC LIMIT $limit"
.as[ProducerKeyEventRecord]
postgresqlDB.run(statement)
}

}
Loading