Skip to content

Commit 8bc1647

Browse files
authored
Module: Rules engine (#4147)
1 parent 0dc4424 commit 8bc1647

File tree

177 files changed

+13677
-68
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

177 files changed

+13677
-68
lines changed

extra/bundle/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@
7070
<artifactId>live-intent-omni-channel-identity</artifactId>
7171
<version>${project.version}</version>
7272
</dependency>
73+
<dependency>
74+
<groupId>org.prebid.server.hooks.modules</groupId>
75+
<artifactId>pb-rule-engine</artifactId>
76+
<version>${project.version}</version>
77+
</dependency>
7378
</dependencies>
7479

7580
<build>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<groupId>org.prebid.server.hooks.modules</groupId>
7+
<artifactId>all-modules</artifactId>
8+
<version>3.33.0-SNAPSHOT</version>
9+
</parent>
10+
11+
<artifactId>pb-rule-engine</artifactId>
12+
13+
<name>pb-rule-engine</name>
14+
<description>Rule engine module</description>
15+
</project>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lombok.anyConstructor.addConstructorProperties = true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package org.prebid.server.hooks.modules.rule.engine.config;
2+
3+
import com.iab.openrtb.request.BidRequest;
4+
import io.vertx.core.Vertx;
5+
import org.prebid.server.bidder.BidderCatalog;
6+
import org.prebid.server.execution.retry.ExponentialBackoffRetryPolicy;
7+
import org.prebid.server.hooks.execution.model.Stage;
8+
import org.prebid.server.hooks.modules.rule.engine.core.config.AccountConfigParser;
9+
import org.prebid.server.hooks.modules.rule.engine.core.config.RuleParser;
10+
import org.prebid.server.hooks.modules.rule.engine.core.config.StageConfigParser;
11+
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestConditionalRuleFactory;
12+
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestRuleContext;
13+
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestStageSpecification;
14+
import org.prebid.server.hooks.modules.rule.engine.v1.PbRuleEngineModule;
15+
import org.prebid.server.json.ObjectMapperProvider;
16+
import org.springframework.beans.factory.annotation.Value;
17+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18+
import org.springframework.context.annotation.Bean;
19+
import org.springframework.context.annotation.Configuration;
20+
21+
import java.time.Clock;
22+
import java.util.concurrent.ThreadLocalRandom;
23+
import java.util.random.RandomGenerator;
24+
25+
@Configuration
26+
@ConditionalOnProperty(prefix = "hooks." + PbRuleEngineModule.CODE, name = "enabled", havingValue = "true")
27+
public class PbRuleEngineModuleConfiguration {
28+
29+
@Bean
30+
PbRuleEngineModule ruleEngineModule(RuleParser ruleParser,
31+
@Value("${datacenter-region:#{null}}") String datacenter) {
32+
33+
return new PbRuleEngineModule(ruleParser, datacenter);
34+
}
35+
36+
@Bean
37+
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser(
38+
BidderCatalog bidderCatalog) {
39+
40+
final RandomGenerator randomGenerator = () -> ThreadLocalRandom.current().nextLong();
41+
42+
return new StageConfigParser<>(
43+
randomGenerator,
44+
Stage.processed_auction_request,
45+
new RequestStageSpecification(ObjectMapperProvider.mapper(), bidderCatalog, randomGenerator),
46+
new RequestConditionalRuleFactory());
47+
}
48+
49+
@Bean
50+
AccountConfigParser accountConfigParser(
51+
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser) {
52+
53+
return new AccountConfigParser(ObjectMapperProvider.mapper(), processedAuctionRequestStageParser);
54+
}
55+
56+
@Bean
57+
RuleParser ruleParser(
58+
@Value("${hooks.pb-rule-engine.rule-cache.expire-after-minutes}") long cacheExpireAfterMinutes,
59+
@Value("${hooks.pb-rule-engine.rule-cache.max-size}") long cacheMaxSize,
60+
@Value("${hooks.pb-rule-engine.rule-parsing.retry-initial-delay-millis}") long delay,
61+
@Value("${hooks.pb-rule-engine.rule-parsing.retry-max-delay-millis}") long maxDelay,
62+
@Value("${hooks.pb-rule-engine.rule-parsing.retry-exponential-factor}") double factor,
63+
@Value("${hooks.pb-rule-engine.rule-parsing.retry-exponential-jitter}") double jitter,
64+
AccountConfigParser accountConfigParser,
65+
Vertx vertx,
66+
Clock clock) {
67+
68+
return new RuleParser(
69+
cacheExpireAfterMinutes,
70+
cacheMaxSize,
71+
ExponentialBackoffRetryPolicy.of(delay, maxDelay, factor, jitter),
72+
accountConfigParser,
73+
vertx,
74+
clock);
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.prebid.server.hooks.modules.rule.engine.core.config;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.node.ObjectNode;
6+
import com.iab.openrtb.request.BidRequest;
7+
import org.prebid.server.exception.PreBidException;
8+
import org.prebid.server.hooks.modules.rule.engine.core.config.model.AccountConfig;
9+
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestRuleContext;
10+
import org.prebid.server.hooks.modules.rule.engine.core.rules.NoOpRule;
11+
import org.prebid.server.hooks.modules.rule.engine.core.rules.PerStageRule;
12+
13+
import java.util.Objects;
14+
15+
public class AccountConfigParser {
16+
17+
private final ObjectMapper mapper;
18+
private final StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser;
19+
20+
public AccountConfigParser(
21+
ObjectMapper mapper,
22+
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser) {
23+
24+
this.mapper = Objects.requireNonNull(mapper);
25+
this.processedAuctionRequestStageParser = Objects.requireNonNull(processedAuctionRequestStageParser);
26+
}
27+
28+
public PerStageRule parse(ObjectNode accountConfig) {
29+
final AccountConfig parsedConfig;
30+
try {
31+
parsedConfig = mapper.treeToValue(accountConfig, AccountConfig.class);
32+
} catch (JsonProcessingException e) {
33+
throw new PreBidException(e.getMessage());
34+
}
35+
36+
if (!parsedConfig.isEnabled()) {
37+
return PerStageRule.builder()
38+
.timestamp(parsedConfig.getTimestamp())
39+
.processedAuctionRequestRule(NoOpRule.create())
40+
.build();
41+
}
42+
43+
return PerStageRule.builder()
44+
.timestamp(parsedConfig.getTimestamp())
45+
.processedAuctionRequestRule(processedAuctionRequestStageParser.parse(parsedConfig))
46+
.build();
47+
}
48+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package org.prebid.server.hooks.modules.rule.engine.core.config;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.fasterxml.jackson.databind.node.ObjectNode;
5+
import com.github.benmanes.caffeine.cache.Caffeine;
6+
import io.vertx.core.Future;
7+
import io.vertx.core.Vertx;
8+
import org.apache.commons.lang3.ObjectUtils;
9+
import org.prebid.server.execution.retry.RetryPolicy;
10+
import org.prebid.server.execution.retry.Retryable;
11+
import org.prebid.server.hooks.modules.rule.engine.core.rules.PerStageRule;
12+
import org.prebid.server.log.Logger;
13+
import org.prebid.server.log.LoggerFactory;
14+
15+
import java.time.Clock;
16+
import java.time.Duration;
17+
import java.time.Instant;
18+
import java.time.format.DateTimeParseException;
19+
import java.util.Map;
20+
import java.util.Objects;
21+
import java.util.Optional;
22+
import java.util.concurrent.TimeUnit;
23+
24+
public class RuleParser {
25+
26+
private static final Logger logger = LoggerFactory.getLogger(RuleParser.class);
27+
28+
private final AccountConfigParser parser;
29+
private final Vertx vertx;
30+
private final Clock clock;
31+
32+
private final RetryPolicy retryPolicy;
33+
34+
private final Map<String, ParsingAttempt> accountIdToParsingAttempt;
35+
private final Map<String, PerStageRule> accountIdToRules;
36+
37+
public RuleParser(long cacheExpireAfterMinutes,
38+
long cacheMaxSize,
39+
RetryPolicy retryPolicy,
40+
AccountConfigParser parser,
41+
Vertx vertx,
42+
Clock clock) {
43+
44+
this.parser = Objects.requireNonNull(parser);
45+
this.vertx = Objects.requireNonNull(vertx);
46+
this.clock = Objects.requireNonNull(clock);
47+
this.retryPolicy = Objects.requireNonNull(retryPolicy);
48+
49+
this.accountIdToParsingAttempt = Caffeine.newBuilder()
50+
.expireAfterAccess(cacheExpireAfterMinutes, TimeUnit.MINUTES)
51+
.maximumSize(cacheMaxSize)
52+
.<String, ParsingAttempt>build()
53+
.asMap();
54+
55+
this.accountIdToRules = Caffeine.newBuilder()
56+
.expireAfterAccess(cacheExpireAfterMinutes, TimeUnit.MINUTES)
57+
.maximumSize(cacheMaxSize)
58+
.<String, PerStageRule>build()
59+
.asMap();
60+
}
61+
62+
public Future<PerStageRule> parseForAccount(String accountId, ObjectNode config) {
63+
final PerStageRule cachedRule = accountIdToRules.get(accountId);
64+
65+
if (cachedRule != null && cachedRule.timestamp().compareTo(getConfigTimestamp(config)) >= 0) {
66+
return Future.succeededFuture(cachedRule);
67+
}
68+
69+
parseConfig(accountId, config);
70+
return Future.succeededFuture(ObjectUtils.defaultIfNull(cachedRule, PerStageRule.noOp()));
71+
}
72+
73+
private Instant getConfigTimestamp(ObjectNode config) {
74+
try {
75+
return Optional.of(config)
76+
.map(node -> node.get("timestamp"))
77+
.filter(JsonNode::isTextual)
78+
.map(JsonNode::asText)
79+
.map(Instant::parse)
80+
.orElse(Instant.EPOCH);
81+
} catch (DateTimeParseException exception) {
82+
return Instant.EPOCH;
83+
}
84+
}
85+
86+
private void parseConfig(String accountId, ObjectNode config) {
87+
final Instant now = clock.instant();
88+
final ParsingAttempt attempt = accountIdToParsingAttempt.compute(
89+
accountId, (ignored, previousAttempt) -> tryRegisteringNewAttempt(previousAttempt, now));
90+
91+
// reference equality used on purpose - if references are equal - then we should parse
92+
if (attempt.timestamp() == now) {
93+
logger.info("Parsing rule for account {}", accountId);
94+
vertx.executeBlocking(() -> parser.parse(config))
95+
.onSuccess(result -> succeedParsingAttempt(accountId, result))
96+
.onFailure(error -> failParsingAttempt(accountId, attempt, error));
97+
}
98+
}
99+
100+
private ParsingAttempt tryRegisteringNewAttempt(ParsingAttempt previousAttempt, Instant currentAttemptStart) {
101+
if (previousAttempt == null) {
102+
return new ParsingAttempt.InProgress(currentAttemptStart, retryPolicy);
103+
}
104+
105+
if (previousAttempt instanceof ParsingAttempt.InProgress) {
106+
return previousAttempt;
107+
}
108+
109+
if (previousAttempt.retryPolicy() instanceof Retryable previousAttemptRetryPolicy) {
110+
final Instant previouslyDecidedToRetryAfter = previousAttempt.timestamp().plus(
111+
Duration.ofMillis(previousAttemptRetryPolicy.delay()));
112+
113+
return previouslyDecidedToRetryAfter.isBefore(currentAttemptStart)
114+
? new ParsingAttempt.InProgress(currentAttemptStart, previousAttemptRetryPolicy.next())
115+
: previousAttempt;
116+
}
117+
118+
return previousAttempt;
119+
}
120+
121+
private void succeedParsingAttempt(String accountId, PerStageRule result) {
122+
accountIdToRules.put(accountId, result);
123+
accountIdToParsingAttempt.remove(accountId);
124+
125+
logger.debug("Successfully parsed rule-engine config for account {}", accountId);
126+
}
127+
128+
private void failParsingAttempt(String accountId, ParsingAttempt attempt, Throwable cause) {
129+
accountIdToParsingAttempt.put(accountId, ((ParsingAttempt.InProgress) attempt).failed());
130+
131+
logger.error(
132+
"Failed to parse rule-engine config for account %s: %s".formatted(accountId, cause.getMessage()),
133+
cause);
134+
}
135+
136+
private sealed interface ParsingAttempt {
137+
138+
Instant timestamp();
139+
140+
RetryPolicy retryPolicy();
141+
142+
record Failed(Instant timestamp, RetryPolicy retryPolicy) implements ParsingAttempt {
143+
}
144+
145+
record InProgress(Instant timestamp, RetryPolicy retryPolicy) implements ParsingAttempt {
146+
147+
public Failed failed() {
148+
return new Failed(timestamp, retryPolicy);
149+
}
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)