Skip to content

Commit be14357

Browse files
authored
Start modelling firehose (#6)
1 parent 26ebeaf commit be14357

File tree

3 files changed

+177
-4
lines changed

3 files changed

+177
-4
lines changed

jetstream/build.gradle.kts

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ dependencies {
1818
implementation(platform("org.slf4j:slf4j-bom:2.0.16"))
1919
implementation("org.slf4j:slf4j-api")
2020
implementation("com.github.luben:zstd-jni:1.5.6-7")
21+
implementation(platform("com.fasterxml.jackson:jackson-bom:2.18.1"))
22+
implementation("com.fasterxml.jackson.core:jackson-core")
23+
implementation("com.fasterxml.jackson.module:jackson-module-blackbird")
2124
}
2225

2326
java {
@@ -82,7 +85,7 @@ publishing {
8285

8386
pom {
8487
name = "bluesky-jvm-jetstream"
85-
description = "Tools to work with Bluesky's Jetstream system"
88+
description = "Tools to help you build Bluesky integrations in Java, Kotlin, and JVM systems"
8689
url = "https://github.com/lopcode/bluesky-jvm"
8790
licenses {
8891
license {

jetstream/src/main/java/com/lopcode/bluesky/jetstream/Jetstream.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package com.lopcode.bluesky.jetstream;
22

3-
import com.github.luben.zstd.*;
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.DeserializationFeature;
5+
import com.fasterxml.jackson.databind.json.JsonMapper;
6+
import com.fasterxml.jackson.module.blackbird.BlackbirdModule;
7+
import com.github.luben.zstd.ZstdDecompressCtx;
8+
import com.github.luben.zstd.ZstdDictDecompress;
9+
import com.github.luben.zstd.ZstdException;
10+
import com.lopcode.bluesky.jetstream.model.JetstreamEvent;
411
import org.slf4j.Logger;
512
import org.slf4j.LoggerFactory;
613

7-
import java.io.*;
14+
import java.io.IOException;
815
import java.net.URI;
916
import java.net.http.HttpClient;
1017
import java.net.http.WebSocket;
@@ -24,6 +31,11 @@ public class Jetstream {
2431
private final int maxWebsocketFrameBufferSizeBytes = 16 * 1024 * 1000;
2532
private final ByteBuffer frameBuffer = ByteBuffer.allocateDirect(maxWebsocketFrameBufferSizeBytes);
2633
private final ByteBuffer decompressBuffer = ByteBuffer.allocateDirect(maxWebsocketFrameBufferSizeBytes);
34+
private final JsonMapper jsonMapper = JsonMapper.builder()
35+
.addModule(new BlackbirdModule())
36+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
37+
.build();
38+
2739
private Instant connectedAt;
2840

2941
public void start() throws IOException {
@@ -143,6 +155,12 @@ void logDecompressedMessage(
143155
} finally {
144156
decompressBuffer.clear();
145157
}
146-
logger.info("{}: {}", messageId, text);
158+
JetstreamEvent event;
159+
try {
160+
event = jsonMapper.readValue(text, JetstreamEvent.class);
161+
} catch (JsonProcessingException e) {
162+
throw new RuntimeException(e);
163+
}
164+
logger.info("{}: {}", messageId, event);
147165
}
148166
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package com.lopcode.bluesky.jetstream.model;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.annotation.JsonSubTypes;
5+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
6+
import com.fasterxml.jackson.databind.JsonNode;
7+
8+
@JsonTypeInfo(
9+
use = JsonTypeInfo.Id.NAME,
10+
include = JsonTypeInfo.As.EXISTING_PROPERTY,
11+
defaultImpl = JetstreamEvent.Unknown.class,
12+
property = "kind",
13+
visible = true
14+
)
15+
@JsonSubTypes({
16+
@JsonSubTypes.Type(value = JetstreamEvent.Commit.class, name = "commit"),
17+
@JsonSubTypes.Type(value = JetstreamEvent.Identity.class, name = "identity"),
18+
@JsonSubTypes.Type(value = JetstreamEvent.Account.class, name = "account")
19+
})
20+
public class JetstreamEvent {
21+
@JsonProperty("did")
22+
String did;
23+
@JsonProperty("kind")
24+
String kind;
25+
@JsonProperty("time_us")
26+
long unixTimeMicroseconds;
27+
28+
public static class Commit extends JetstreamEvent {
29+
@JsonProperty("commit")
30+
CommitInfo commit;
31+
32+
@Override
33+
public String toString() {
34+
return "Commit{" +
35+
"commit=" + commit +
36+
", did='" + did + '\'' +
37+
", kind='" + kind + '\'' +
38+
", unixTimeMicroseconds=" + unixTimeMicroseconds +
39+
'}';
40+
}
41+
}
42+
43+
public static class CommitInfo {
44+
@JsonProperty("rev")
45+
String rev;
46+
@JsonProperty("rkey")
47+
String rkey;
48+
@JsonProperty("operation")
49+
String operation;
50+
@JsonProperty("collection")
51+
String collection;
52+
@JsonProperty("cid")
53+
String cid;
54+
@JsonProperty("record")
55+
JsonNode record; // todo: model atproto records
56+
57+
@Override
58+
public String toString() {
59+
return "CommitInfo{" +
60+
"rev='" + rev + '\'' +
61+
", rkey='" + rkey + '\'' +
62+
", operation='" + operation + '\'' +
63+
", collection='" + collection + '\'' +
64+
", cid='" + cid + '\'' +
65+
", record=" + record +
66+
'}';
67+
}
68+
}
69+
70+
public static class Identity extends JetstreamEvent {
71+
@JsonProperty("identity")
72+
IdentityInfo identity;
73+
74+
@Override
75+
public String toString() {
76+
return "Identity{" +
77+
"identity=" + identity +
78+
", did='" + did + '\'' +
79+
", kind='" + kind + '\'' +
80+
", unixTimeMicroseconds=" + unixTimeMicroseconds +
81+
'}';
82+
}
83+
}
84+
85+
public static class IdentityInfo {
86+
@JsonProperty("did")
87+
String did;
88+
@JsonProperty("handle")
89+
String handle;
90+
@JsonProperty("seq")
91+
Long seq;
92+
@JsonProperty("time")
93+
String time;
94+
95+
@Override
96+
public String toString() {
97+
return "IdentityInfo{" +
98+
"did='" + did + '\'' +
99+
", handle='" + handle + '\'' +
100+
", seq=" + seq +
101+
", time='" + time + '\'' +
102+
'}';
103+
}
104+
}
105+
106+
public static class Account extends JetstreamEvent {
107+
@JsonProperty("account")
108+
AccountInfo account;
109+
110+
@Override
111+
public String toString() {
112+
return "Account{" +
113+
"account=" + account +
114+
'}';
115+
}
116+
}
117+
118+
public static class AccountInfo {
119+
@JsonProperty("active")
120+
boolean active;
121+
@JsonProperty("did")
122+
String did;
123+
@JsonProperty("seq")
124+
Long seq;
125+
@JsonProperty("time")
126+
String time;
127+
@JsonProperty("status")
128+
String status;
129+
130+
@Override
131+
public String toString() {
132+
return "AccountInfo{" +
133+
"active=" + active +
134+
", did='" + did + '\'' +
135+
", seq=" + seq +
136+
", time='" + time + '\'' +
137+
", status='" + status + '\'' +
138+
'}';
139+
}
140+
}
141+
142+
public static class Unknown extends JetstreamEvent {
143+
@Override
144+
public String toString() {
145+
return "Unknown{" +
146+
"did='" + did + '\'' +
147+
", kind='" + kind + '\'' +
148+
", unixTimeMicroseconds=" + unixTimeMicroseconds +
149+
'}';
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)