Skip to content

Commit 8aed396

Browse files
committed
fix(events): SNS universal target message attributes
1 parent 04fd34f commit 8aed396

2 files changed

Lines changed: 61 additions & 10 deletions

File tree

src/main/java/io/github/hectorvent/floci/services/scheduler/ScheduleInvoker.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.github.hectorvent.floci.services.scheduler.model.Target;
1111
import io.github.hectorvent.floci.services.sns.SnsService;
1212
import io.github.hectorvent.floci.services.sqs.SqsService;
13+
import io.github.hectorvent.floci.services.sqs.model.MessageAttributeValue;
1314
import jakarta.enterprise.context.ApplicationScoped;
1415
import jakarta.inject.Inject;
1516
import org.jboss.logging.Logger;
@@ -114,8 +115,9 @@ private void invokeUniversalTarget(String serviceAction, String input, String re
114115
String subject = text(params, "Subject");
115116
String messageGroupId = text(params, "MessageGroupId");
116117
String messageDeduplicationId = text(params, "MessageDeduplicationId");
118+
Map<String, MessageAttributeValue> messageAttributes = parseMessageAttributes(params);
117119
String snsRegion = extractRegion(topicArn != null ? topicArn : targetArn, region);
118-
snsService.publish(topicArn, targetArn, null, message, subject, null,
120+
snsService.publish(topicArn, targetArn, null, message, subject, messageAttributes,
119121
messageGroupId, messageDeduplicationId, snsRegion);
120122
LOG.debugv("Scheduler delivered to SNS (universal target): {0}", topicArn);
121123
}
@@ -131,6 +133,20 @@ private void invokeUniversalTarget(String serviceAction, String input, String re
131133
}
132134
}
133135

136+
private Map<String, MessageAttributeValue> parseMessageAttributes(JsonNode params) {
137+
JsonNode attrsNode = params.path("MessageAttributes");
138+
if (!attrsNode.isObject()) {
139+
return null;
140+
}
141+
Map<String, MessageAttributeValue> attributes = new HashMap<>();
142+
attrsNode.fields().forEachRemaining(entry -> {
143+
String dataType = entry.getValue().path("DataType").asText("String");
144+
String stringValue = entry.getValue().path("StringValue").asText(null);
145+
attributes.put(entry.getKey(), new MessageAttributeValue(stringValue, dataType));
146+
});
147+
return attributes.isEmpty() ? null : attributes;
148+
}
149+
134150
private static String text(JsonNode node, String field) {
135151
JsonNode value = node.get(field);
136152
return value != null && !value.isNull() ? value.asText() : null;

src/test/java/io/github/hectorvent/floci/services/scheduler/ScheduleInvokerTest.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,14 @@
88
import io.github.hectorvent.floci.services.scheduler.model.Target;
99
import io.github.hectorvent.floci.services.sns.SnsService;
1010
import io.github.hectorvent.floci.services.sqs.SqsService;
11+
import io.github.hectorvent.floci.services.sqs.model.MessageAttributeValue;
1112
import org.junit.jupiter.api.BeforeEach;
1213
import org.junit.jupiter.api.Test;
1314

14-
import static org.mockito.ArgumentMatchers.any;
15-
import static org.mockito.ArgumentMatchers.anyInt;
16-
import static org.mockito.ArgumentMatchers.anyString;
17-
import static org.mockito.ArgumentMatchers.eq;
18-
import static org.mockito.ArgumentMatchers.isNull;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.never;
21-
import static org.mockito.Mockito.verify;
22-
import static org.mockito.Mockito.when;
15+
import java.util.Map;
16+
17+
import static org.mockito.ArgumentMatchers.*;
18+
import static org.mockito.Mockito.*;
2319

2420
class ScheduleInvokerTest {
2521

@@ -43,6 +39,45 @@ void setUp() {
4339
eventBridgeService, new ObjectMapper(), config);
4440
}
4541

42+
@Test
43+
void universalSnsPublishForwardsMessageAttributes() {
44+
Target target = new Target();
45+
target.setArn("arn:aws:scheduler:::aws-sdk:sns:publish");
46+
target.setRoleArn("arn:aws:iam::000000000000:role/x");
47+
target.setInput("{\"TopicArn\":\"" + TOPIC_ARN + "\","
48+
+ "\"Message\":\"{}\","
49+
+ "\"Subject\":\"my-subject\","
50+
+ "\"MessageAttributes\":{"
51+
+ "\"EventName\":{\"DataType\":\"String\",\"StringValue\":\"my-subject\"}"
52+
+ "}}");
53+
54+
invoker.invoke(target, "us-east-1");
55+
56+
verify(snsService).publish(
57+
eq(TOPIC_ARN), isNull(), isNull(),
58+
eq("{}"), eq("my-subject"),
59+
argThat((Map<String, MessageAttributeValue> attrs) ->
60+
attrs != null
61+
&& attrs.containsKey("EventName")
62+
&& "my-subject".equals(attrs.get("EventName").getStringValue())
63+
&& "String".equals(attrs.get("EventName").getDataType())),
64+
isNull(), isNull(), eq("us-east-1"));
65+
}
66+
67+
@Test
68+
void universalSnsPublishWithNoMessageAttributesPassesNull() {
69+
Target target = new Target();
70+
target.setArn("arn:aws:scheduler:::aws-sdk:sns:publish");
71+
target.setRoleArn("arn:aws:iam::000000000000:role/x");
72+
target.setInput("{\"TopicArn\":\"" + TOPIC_ARN + "\",\"Message\":\"hello\"}");
73+
74+
invoker.invoke(target, "us-east-1");
75+
76+
verify(snsService).publish(eq(TOPIC_ARN), isNull(), isNull(),
77+
eq("hello"), isNull(), isNull(),
78+
isNull(), isNull(), eq("us-east-1"));
79+
}
80+
4681
@Test
4782
void universalSnsPublishReadsTopicArnAndMessageFromInput() {
4883
Target target = new Target();

0 commit comments

Comments
 (0)