Skip to content

Commit ee72213

Browse files
committed
Add embedded driver support for Aeron
1 parent b6eaeb1 commit ee72213

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed

docs/guide/transport-aeron.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
1. 外置 Media Driver(推荐生产)
1717
2. Embedded Media Driver(开发/单机)
1818

19-
当前实现默认使用外置 Media Driver。
19+
当前实现默认使用外置 Media Driver,Java 支持 embedded 模式(仅开发/单机)
2020

2121
## 配置建议
2222
- `channel`: Aeron channel(ipc/udp)
@@ -30,6 +30,8 @@
3030
- `fragmentLimit`: 单次 poll 最大片段数(Java 默认 64)
3131
- `offerMaxAttempts`: offer 重试上限(Java 默认 10)
3232
- `idleStrategy`: 空转策略(Java 默认 BusySpin)
33+
- `embeddedDriver`: 是否启用 embedded driver(Java)
34+
- `dirDeleteOnStart/Shutdown`: embedded 模式是否清理目录(Java)
3335

3436
## 消息帧格式(v1)
3537
固定长度 56 bytes,偏移如下:

java/scripts/fetch-aeron.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ AERON_VERSION="1.49.3"
88
AGRGONA_VERSION="2.3.2"
99

1010
CLIENT_JAR="$CACHE_DIR/aeron-client-${AERON_VERSION}.jar"
11+
DRIVER_JAR="$CACHE_DIR/aeron-driver-${AERON_VERSION}.jar"
1112
AGRGONA_JAR="$CACHE_DIR/agrona-${AGRGONA_VERSION}.jar"
1213

1314
mkdir -p "$CACHE_DIR"
@@ -16,8 +17,12 @@ if [ ! -f "$CLIENT_JAR" ]; then
1617
curl -sSL "https://repo1.maven.org/maven2/io/aeron/aeron-client/${AERON_VERSION}/aeron-client-${AERON_VERSION}.jar" -o "$CLIENT_JAR"
1718
fi
1819

20+
if [ ! -f "$DRIVER_JAR" ]; then
21+
curl -sSL "https://repo1.maven.org/maven2/io/aeron/aeron-driver/${AERON_VERSION}/aeron-driver-${AERON_VERSION}.jar" -o "$DRIVER_JAR"
22+
fi
23+
1924
if [ ! -f "$AGRGONA_JAR" ]; then
2025
curl -sSL "https://repo1.maven.org/maven2/org/agrona/agrona/${AGRGONA_VERSION}/agrona-${AGRGONA_VERSION}.jar" -o "$AGRGONA_JAR"
2126
fi
2227

23-
echo "${CLIENT_JAR}:${AGRGONA_JAR}"
28+
echo "${CLIENT_JAR}:${DRIVER_JAR}:${AGRGONA_JAR}"

java/src/main/java/io/epoch/AeronTransport.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.aeron.Aeron;
44
import io.aeron.Publication;
55
import io.aeron.Subscription;
6+
import io.aeron.driver.MediaDriver;
67
import io.aeron.logbuffer.Header;
78
import java.util.ArrayList;
89
import java.util.List;
@@ -19,7 +20,10 @@ public record AeronConfig(
1920
String aeronDirectory,
2021
int fragmentLimit,
2122
int offerMaxAttempts,
22-
IdleStrategy idleStrategy
23+
IdleStrategy idleStrategy,
24+
boolean embeddedDriver,
25+
boolean dirDeleteOnStart,
26+
boolean dirDeleteOnShutdown
2327
) {
2428
public AeronConfig {
2529
if (fragmentLimit <= 0) {
@@ -34,27 +38,46 @@ public record AeronConfig(
3438
}
3539

3640
public static AeronConfig defaults(String channel, int streamId, String aeronDirectory) {
37-
return new AeronConfig(channel, streamId, aeronDirectory, 64, 10, new BusySpinIdleStrategy());
41+
return new AeronConfig(channel, streamId, aeronDirectory, 64, 10, new BusySpinIdleStrategy(), false, false, false);
42+
}
43+
44+
public static AeronConfig embeddedDefaults(String channel, int streamId, String aeronDirectory) {
45+
return new AeronConfig(channel, streamId, aeronDirectory, 64, 10, new BusySpinIdleStrategy(), true, true, true);
3846
}
3947
}
4048

4149
private final AeronConfig config;
4250
private final Aeron aeron;
4351
private final Publication publication;
4452
private final Subscription subscription;
53+
private final MediaDriver mediaDriver;
4554
private final UnsafeBuffer sendBuffer;
4655
private final IdleStrategy idleStrategy;
4756
private final AeronStats stats = new AeronStats();
4857

4958
public AeronTransport(AeronConfig config) {
5059
this.config = config;
60+
MediaDriver driver = null;
61+
String aeronDir = config.aeronDirectory();
62+
if (config.embeddedDriver()) {
63+
MediaDriver.Context driverContext = new MediaDriver.Context();
64+
if (aeronDir != null && !aeronDir.isBlank()) {
65+
driverContext.aeronDirectoryName(aeronDir);
66+
}
67+
driverContext.dirDeleteOnStart(config.dirDeleteOnStart());
68+
driverContext.dirDeleteOnShutdown(config.dirDeleteOnShutdown());
69+
driver = MediaDriver.launch(driverContext);
70+
aeronDir = driverContext.aeronDirectoryName();
71+
}
72+
5173
Aeron.Context context = new Aeron.Context();
52-
if (config.aeronDirectory() != null && !config.aeronDirectory().isBlank()) {
53-
context.aeronDirectoryName(config.aeronDirectory());
74+
if (aeronDir != null && !aeronDir.isBlank()) {
75+
context.aeronDirectoryName(aeronDir);
5476
}
5577
this.aeron = Aeron.connect(context);
5678
this.publication = aeron.addPublication(config.channel(), config.streamId());
5779
this.subscription = aeron.addSubscription(config.channel(), config.streamId());
80+
this.mediaDriver = driver;
5881
this.sendBuffer = new UnsafeBuffer(new ExpandableArrayBuffer(AeronMessageCodec.FRAME_LENGTH));
5982
this.idleStrategy = config.idleStrategy();
6083
}
@@ -100,6 +123,9 @@ public void close() {
100123
publication.close();
101124
subscription.close();
102125
aeron.close();
126+
if (mediaDriver != null) {
127+
mediaDriver.close();
128+
}
103129
}
104130

105131
public AeronConfig config() {

0 commit comments

Comments
 (0)