Skip to content

Commit 0ba9e46

Browse files
committed
suggestions to improve new SQS handler
1 parent 8df642b commit 0ba9e46

File tree

4 files changed

+111
-149
lines changed

4 files changed

+111
-149
lines changed

handlers/zuora-callout-apis/src/main/scala/com/gu/autoCancel/AutoCancelHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ object AutoCancelHandler extends App with Logging {
4444
AutoCancelSteps(
4545
AutoCancel.apply(zuoraRequest),
4646
cancelRequestsProducer,
47-
ZuoraEmailSteps.sendEmailRegardingAccount(
47+
new ZuoraEmailSteps(
4848
EmailSendSteps(awsSQSSend(EmailQueueName)),
4949
ZuoraGetInvoiceTransactions(ZuoraRestRequestMaker(response, zuoraRestConfig)),
5050
),
Lines changed: 91 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,35 @@
11
package com.gu.autoCancel
22

3-
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
43
import com.amazonaws.services.lambda.runtime.events.SQSEvent
5-
import com.gu.effects.sqs.AwsSQSSend.{EmailQueueName, Payload, QueueName}
4+
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
5+
import com.gu.autoCancel.GetPaymentData.PaymentFailureInformation
6+
import com.gu.effects.sqs.AwsSQSSend.EmailQueueName
67
import com.gu.effects.sqs.SqsSync
78
import com.gu.effects.{GetFromS3, RawEffects}
89
import com.gu.util.Logging
9-
import com.gu.util.apigateway.{ApiGatewayResponse, Auth}
10+
import com.gu.util.apigateway.ApiGatewayResponse.unauthorized
11+
import com.gu.util.apigateway.Auth
1012
import com.gu.util.apigateway.Auth.TrustedApiConfig
11-
import com.gu.util.config.LoadConfigModule
12-
import com.gu.util.config.LoadConfigModule.StringFromS3
13-
import com.gu.util.config.Stage
14-
import com.gu.util.email.EmailSendSteps
13+
import com.gu.util.config.{ConfigReads, LoadConfigModule}
14+
import com.gu.util.email.{EmailId, EmailSendSteps}
1515
import com.gu.util.reader.Types._
16+
import com.gu.util.resthttp.RestRequestMaker
1617
import com.gu.util.zuora._
17-
import okhttp3.{Request, Response}
1818
import play.api.libs.json.{Json, Reads}
1919

2020
import java.time.LocalDateTime
2121
import scala.jdk.CollectionConverters._
22-
import scala.util.Try
22+
import scala.util.{Failure, Try}
23+
24+
/** Represents the message format from ApiGatewayToSqs */
25+
case class ApiGatewayToSqsMessage(
26+
queryStringParameters: Map[String, String],
27+
body: String,
28+
)
29+
30+
object ApiGatewayToSqsMessage {
31+
implicit val reads: Reads[ApiGatewayToSqsMessage] = Json.reads[ApiGatewayToSqsMessage]
32+
}
2333

2434
/** Processes auto-cancel requests from SQS. Triggered by an EventSourceMapping with maxConcurrency: 5 to avoid Zuora
2535
* rate limits.
@@ -29,23 +39,23 @@ import scala.util.Try
2939
*/
3040
class AutoCancelSqsHandler extends RequestHandler[SQSEvent, Unit] with Logging {
3141

32-
/** Represents the message format from ApiGatewayToSqs */
33-
case class ApiGatewayToSqsMessage(
34-
queryStringParameters: Map[String, String],
35-
body: String,
36-
)
37-
38-
object ApiGatewayToSqsMessage {
39-
implicit val reads: Reads[ApiGatewayToSqsMessage] = Json.reads[ApiGatewayToSqsMessage]
40-
}
41-
42+
// main entry point from AWS lambda
4243
override def handleRequest(event: SQSEvent, context: Context): Unit = {
4344
val records = event.getRecords.asScala.toList
4445
logger.info(s"Processing ${records.size} SQS message(s)")
4546

46-
val results = records.map(processRecord)
47+
val results = for {
48+
record <- records
49+
} yield {
50+
for {
51+
parsedCallout <- parseRecord(record)
52+
(zuoraCalloutRecord, apiToken) = parsedCallout
53+
processor <- ProcessCalloutSteps.build().toTry
54+
_ <- processor.execute(zuoraCalloutRecord, apiToken).toTry(())
55+
} yield ()
56+
}
4757

48-
val failures = results.collect { case Left(error) => error }
58+
val failures = results.collect { case Failure(error) => error }
4959
if (failures.nonEmpty) {
5060
logger.error(s"${failures.size} message(s) failed to process")
5161
// Throw exception to trigger retry/DLQ
@@ -55,84 +65,69 @@ class AutoCancelSqsHandler extends RequestHandler[SQSEvent, Unit] with Logging {
5565
logger.info(s"Successfully processed ${records.size} message(s)")
5666
}
5767

58-
private def processRecord(record: SQSEvent.SQSMessage): Either[String, Unit] = {
68+
private def parseRecord(record: SQSEvent.SQSMessage): Try[(AutoCancelCallout, String)] = {
5969
val messageId = record.getMessageId
60-
logger.info(s"Processing SQS message: $messageId")
61-
62-
Try {
63-
val rawBody = record.getBody
64-
logger.info(s"Message body: $rawBody")
65-
66-
// Parse the ApiGatewayToSqs envelope
67-
val envelope = Json.parse(rawBody).as[ApiGatewayToSqsMessage]
68-
val maybeApiToken = envelope.queryStringParameters.get("apiToken")
69-
val calloutBody = envelope.body
70-
71-
// Parse the actual callout
72-
val calloutResult = Json.parse(calloutBody).validate[AutoCancelCallout]
73-
74-
calloutResult.fold(
75-
errors => {
76-
val errorMsg = s"Failed to parse callout from message $messageId: $errors"
77-
logger.error(errorMsg)
78-
Left(errorMsg)
79-
},
80-
callout => {
81-
logger.info(s"Processing auto-cancel for account: ${callout.accountId}, invoice: ${callout.invoiceId}")
82-
processCallout(callout, maybeApiToken) match {
83-
case Right(_) =>
84-
logger.info(s"Successfully processed message $messageId")
85-
Right(())
86-
case Left(error) =>
87-
logger.error(s"Failed to process message $messageId: $error")
88-
Left(error)
89-
}
90-
},
70+
val rawBody = record.getBody
71+
logger.info(s"$messageId: Message body: $rawBody")
72+
73+
for {
74+
originalApiGatewayEvent <- Try(Json.parse(rawBody).as[ApiGatewayToSqsMessage])
75+
apiToken <- originalApiGatewayEvent.queryStringParameters
76+
.get("apiToken")
77+
.toRight(new RuntimeException("no apiToken header, untrusted content"))
78+
.toTry
79+
httpRequestBody = originalApiGatewayEvent.body
80+
zuoraCalloutRecord <- Try(Json.parse(httpRequestBody).as[AutoCancelCallout])
81+
_ = logger.info(
82+
s"Processing auto-cancel for account: ${zuoraCalloutRecord.accountId}, invoice: ${zuoraCalloutRecord.invoiceId}",
9183
)
92-
}.toEither.left.map { e =>
93-
val errorMsg = s"Exception processing message $messageId: ${e.getMessage}"
94-
logger.error(errorMsg, e)
95-
errorMsg
96-
}.flatten
84+
} yield (zuoraCalloutRecord, apiToken)
85+
9786
}
9887

99-
private def processCallout(
100-
callout: AutoCancelCallout,
101-
maybeApiToken: Option[String],
102-
): Either[String, Unit] = {
88+
}
89+
90+
object ProcessCalloutSteps extends Logging {
91+
92+
def build(): Either[ConfigReads.ConfigFailure, ProcessCalloutSteps] = {
10393
val stage = RawEffects.stage
10494
val fetchString = GetFromS3.fetchString _
10595
val response = RawEffects.response
10696
val now = RawEffects.now
10797
val sqsSend = SqsSync.send(SqsSync.buildClient) _
108-
109-
processCalloutWithEffects(stage, fetchString, response, now, sqsSend)(callout, maybeApiToken) match {
110-
case ApiGatewayOp.ContinueProcessing(_) => Right(())
111-
case ApiGatewayOp.ReturnWithResponse(resp) =>
112-
if (resp.statusCode.startsWith("2")) Right(())
113-
else Left(s"Processing returned non-success response: ${resp.statusCode} - ${resp.body}")
114-
}
115-
}
116-
117-
private def processCalloutWithEffects(
118-
stage: Stage,
119-
fetchString: StringFromS3,
120-
response: Request => Response,
121-
now: () => LocalDateTime,
122-
awsSQSSend: QueueName => Payload => Try[Unit],
123-
)(callout: AutoCancelCallout, maybeApiToken: Option[String]): ApiGatewayOp[Unit] = {
12498
val loadConfigModule = LoadConfigModule(stage, fetchString)
12599

126100
for {
127-
// Load and validate authentication
128-
trustedApiConfig <- loadConfigModule.load[TrustedApiConfig].toApiGatewayOp("load trusted Api config")
129-
_ <- validateAuth(trustedApiConfig, maybeApiToken)
130-
131-
zuoraRestConfig <- loadConfigModule.load[ZuoraRestConfig].toApiGatewayOp("load zuora config")
101+
zuoraRestConfig <- loadConfigModule.load[ZuoraRestConfig]
132102
_ = logger.info(s"Loaded Zuora config for stage: $stage")
133103

134104
zuoraRequest = ZuoraRestRequestMaker(response, zuoraRestConfig)
135105

106+
zuoraEmailSteps = new ZuoraEmailSteps(
107+
EmailSendSteps(sqsSend(EmailQueueName)),
108+
ZuoraGetInvoiceTransactions(ZuoraRestRequestMaker(response, zuoraRestConfig)),
109+
)
110+
trustedApiConfig <- loadConfigModule.load[TrustedApiConfig]
111+
112+
} yield new ProcessCalloutSteps(zuoraRequest, now, trustedApiConfig, zuoraEmailSteps)
113+
}
114+
115+
}
116+
117+
class ProcessCalloutSteps(
118+
zuoraRequest: RestRequestMaker.Requests,
119+
now: () => LocalDateTime,
120+
trustedApiConfig: TrustedApiConfig,
121+
zuoraEmailSteps: ZuoraEmailSteps,
122+
) {
123+
def execute(autoCancelCallout: AutoCancelCallout, apiToken: String): ApiGatewayOp[Unit] = {
124+
125+
for {
126+
_ <- Auth
127+
.credentialsAreValid(trustedApiConfig, Auth.RequestAuth(Some(apiToken)))
128+
.toApiGatewayContinueProcessing(unauthorized)
129+
.withLogging("authentication")
130+
136131
cancelRequestsProducer = AutoCancelDataCollectionFilter(
137132
now().toLocalDate,
138133
ZuoraGetAccountSummary(zuoraRequest),
@@ -141,64 +136,26 @@ class AutoCancelSqsHandler extends RequestHandler[SQSEvent, Unit] with Logging {
141136
) _
142137

143138
// Check if we should process this callout
144-
_ <- AutoCancelInputFilter(callout, onlyCancelDirectDebit = false)
139+
_ <- AutoCancelInputFilter(autoCancelCallout, onlyCancelDirectDebit = false)
145140

146141
// Get the auto-cancel requests
147-
autoCancelRequests <- cancelRequestsProducer(callout).withLogging(
148-
s"auto-cancellation requests for ${callout.accountId}",
142+
autoCancelRequests <- cancelRequestsProducer(autoCancelCallout).withLogging(
143+
s"auto-cancellation requests for ${autoCancelCallout.accountId}",
149144
)
150145

151146
// Execute the cancellation
152-
_ <- AutoCancel
153-
.apply(zuoraRequest)(
154-
autoCancelRequests,
155-
AutoCancelSteps.AutoCancelUrlParams(onlyCancelDirectDebit = false, dryRun = false),
156-
)
157-
.withLogging(s"auto-cancellation for ${callout.accountId}")
147+
_ <- AutoCancel(zuoraRequest)(
148+
autoCancelRequests,
149+
AutoCancelSteps.AutoCancelUrlParams(onlyCancelDirectDebit = false, dryRun = false),
150+
)
151+
.withLogging(s"auto-cancellation for ${autoCancelCallout.accountId}")
158152

159-
// Send email notification
160-
_ = sendEmailNotification(callout, response, zuoraRestConfig, awsSQSSend)
153+
request = ToMessage(autoCancelCallout, _: PaymentFailureInformation, EmailId.cancelledId)
154+
_ <- zuoraEmailSteps
155+
.sendEmailRegardingAccount(autoCancelCallout.accountId, request)
156+
.toDisjunction
157+
.toApiGatewayOp("send email")
161158

162159
} yield ()
163160
}
164-
165-
private def validateAuth(trustedApiConfig: TrustedApiConfig, maybeApiToken: Option[String]): ApiGatewayOp[Unit] = {
166-
val requestAuth = Auth.RequestAuth(maybeApiToken)
167-
if (Auth.credentialsAreValid(trustedApiConfig, requestAuth)) {
168-
logger.info("Authentication successful")
169-
ApiGatewayOp.ContinueProcessing(())
170-
} else {
171-
logger.warn("Authentication failed: invalid or missing apiToken")
172-
ApiGatewayOp.ReturnWithResponse(ApiGatewayResponse.unauthorized)
173-
}
174-
}
175-
176-
private def sendEmailNotification(
177-
callout: AutoCancelCallout,
178-
response: Request => Response,
179-
zuoraRestConfig: ZuoraRestConfig,
180-
awsSQSSend: QueueName => Payload => Try[Unit],
181-
): Unit = {
182-
try {
183-
val toMessageFn: GetPaymentData.PaymentFailureInformation => Either[String, com.gu.util.email.EmailMessage] =
184-
paymentInfo => ToMessage(callout, paymentInfo, com.gu.util.email.EmailId.cancelledId)
185-
186-
val sendEmailResult = ZuoraEmailSteps.sendEmailRegardingAccount(
187-
EmailSendSteps(awsSQSSend(EmailQueueName)),
188-
ZuoraGetInvoiceTransactions(ZuoraRestRequestMaker(response, zuoraRestConfig)),
189-
)(
190-
callout.accountId,
191-
toMessageFn,
192-
)
193-
sendEmailResult match {
194-
case com.gu.util.resthttp.Types.ClientSuccess(_) =>
195-
logger.info(s"Successfully sent cancellation email for account ${callout.accountId}")
196-
case failure: com.gu.util.resthttp.Types.ClientFailure =>
197-
logger.warn(s"Failed to send cancellation email for account ${callout.accountId}: ${failure.message}")
198-
}
199-
} catch {
200-
case e: Exception =>
201-
logger.warn(s"Exception sending cancellation email for account ${callout.accountId}: ${e.getMessage}", e)
202-
}
203-
}
204161
}

handlers/zuora-callout-apis/src/main/scala/com/gu/autoCancel/AutoCancelSteps.scala

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@ object AutoCancelSteps extends Logging {
3838
def apply(
3939
callZuoraAutoCancel: (List[AutoCancelRequest], AutoCancelUrlParams) => ApiGatewayOp[Unit],
4040
autoCancelReqProducer: AutoCancelCallout => ApiGatewayOp[List[AutoCancelRequest]],
41-
sendEmailRegardingAccount: (
42-
String,
43-
PaymentFailureInformation => Either[String, EmailMessage],
44-
) => ClientFailableOp[Unit],
41+
zuoraEmailSteps: ZuoraEmailSteps,
4542
): Operation = Operation.noHealthcheck({ apiGatewayRequest: ApiGatewayRequest =>
4643
(for {
4744
autoCancelCallout <- apiGatewayRequest.bodyAsCaseClass[AutoCancelCallout]()
@@ -54,7 +51,7 @@ object AutoCancelSteps extends Logging {
5451
s"auto-cancellation for ${autoCancelCallout.accountId}",
5552
)
5653
request = makeRequest(autoCancelCallout) _
57-
_ <- handleSendPaymentFailureEmail(autoCancelCallout.accountId, request, sendEmailRegardingAccount, urlParams)
54+
_ <- handleSendPaymentFailureEmail(autoCancelCallout.accountId, request, zuoraEmailSteps, urlParams)
5855
} yield ApiGatewayResponse.successfulExecution).apiResponse
5956
})
6057

@@ -66,17 +63,14 @@ object AutoCancelSteps extends Logging {
6663
private def handleSendPaymentFailureEmail(
6764
accountId: String,
6865
request: PaymentFailureInformation => Either[String, EmailMessage],
69-
sendEmailRegardingAccount: (
70-
String,
71-
PaymentFailureInformation => Either[String, EmailMessage],
72-
) => ClientFailableOp[Unit],
66+
zuoraEmailSteps: ZuoraEmailSteps,
7367
urlParams: AutoCancelUrlParams,
7468
) = {
7569
if (urlParams.dryRun) {
7670
val msg = "DryRun of SendPaymentFailureEmail"
7771
logger.info(msg)
7872
ContinueProcessing(())
79-
} else logErrorsAndContinueProcessing(sendEmailRegardingAccount(accountId, request))
73+
} else logErrorsAndContinueProcessing(zuoraEmailSteps.sendEmailRegardingAccount(accountId, request))
8074
}
8175

8276
private def logErrorsAndContinueProcessing(clientFailableOp: ClientFailableOp[Unit]): ContinueProcessing[Unit] =
@@ -88,12 +82,15 @@ object AutoCancelSteps extends Logging {
8882
}
8983
}
9084

91-
object ZuoraEmailSteps {
85+
class ZuoraEmailSteps(
86+
sendEmail: EmailMessage => ClientFailableOp[Unit],
87+
getInvoiceTransactions: String => ClientFailableOp[InvoiceTransactionSummary],
88+
) {
9289

9390
def sendEmailRegardingAccount(
94-
sendEmail: EmailMessage => ClientFailableOp[Unit],
95-
getInvoiceTransactions: String => ClientFailableOp[InvoiceTransactionSummary],
96-
)(accountId: String, toMessage: PaymentFailureInformation => Either[String, EmailMessage]): ClientFailableOp[Unit] = {
91+
accountId: String,
92+
toMessage: PaymentFailureInformation => Either[String, EmailMessage],
93+
): ClientFailableOp[Unit] = {
9794
for {
9895
invoiceTransactionSummary <- getInvoiceTransactions(accountId)
9996
paymentInformation <- GetPaymentData(accountId)(invoiceTransactionSummary).left

lib/handler/src/main/scala/com/gu/util/reader/Types.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,18 @@ object Types extends Logging {
1515
case class ContinueProcessing[A](a: A) extends ApiGatewayOp[A] {
1616
override def toDisjunction: Either[ApiResponse, A] = Right(a)
1717

18+
override def toTry[B](a: B): Try[B] = Success(a)
19+
1820
override def isComplete: Boolean = false
1921
}
2022
case class ReturnWithResponse(resp: ApiResponse) extends ApiGatewayOp[Nothing] {
2123
override def toDisjunction: Either[ApiResponse, Nothing] = Left(resp)
2224

25+
override def toTry[B](a: B): Try[B] =
26+
if (resp.statusCode.startsWith("2")) Success(a)
27+
else
28+
Failure(new RuntimeException(s"Processing returned non-success response: ${resp.statusCode} - ${resp.body}"))
29+
2330
override def isComplete: Boolean = true
2431
}
2532

@@ -28,6 +35,7 @@ object Types extends Logging {
2835
def isComplete: Boolean
2936

3037
def toDisjunction: Either[ApiResponse, A]
38+
def toTry[B](a: B): Try[B]
3139

3240
def flatMap[B](f: A => ApiGatewayOp[B]): ApiGatewayOp[B] =
3341
toDisjunction.flatMap(f.andThen(_.toDisjunction)).toApiGatewayOp

0 commit comments

Comments
 (0)