Skip to content

Commit 9fc6208

Browse files
committed
structure and shutdown
1 parent 4160b59 commit 9fc6208

3 files changed

Lines changed: 96 additions & 50 deletions

File tree

data/data.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package data
2+
3+
import (
4+
"database/sql"
5+
"log"
6+
7+
_ "modernc.org/sqlite"
8+
)
9+
10+
func Init() *sql.DB {
11+
db, err := sql.Open("sqlite", "posts.db")
12+
if err != nil {
13+
log.Fatal(err)
14+
}
15+
16+
_, err = db.Exec(`
17+
CREATE TABLE IF NOT EXISTS posts (
18+
id INTEGER PRIMARY KEY AUTOINCREMENT,
19+
time_us INTEGER,
20+
text TEXT
21+
) STRICT
22+
`)
23+
if err != nil {
24+
log.Fatal(err)
25+
}
26+
27+
return db
28+
}
29+
30+
func PrepareInsert(dbConn *sql.DB) *sql.Stmt {
31+
stmt, err := dbConn.Prepare("INSERT INTO posts(time_us, text) VALUES (?, ?)")
32+
if err != nil {
33+
log.Fatal(err)
34+
}
35+
return stmt
36+
}

main.go

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,22 @@
11
package main
22

33
import (
4-
"database/sql"
4+
"context"
55
"encoding/json"
6+
"fmt"
67
"log"
78
"net/url"
9+
"os"
10+
"os/signal"
11+
"syscall"
12+
"time"
813

9-
_ "modernc.org/sqlite"
14+
"cave/data"
1015
"github.com/gorilla/websocket"
16+
_ "modernc.org/sqlite"
1117
)
1218

13-
type MinimalEvent struct {
14-
TimeUS int64 `json:"time_us"`
15-
Commit struct {
16-
Record struct {
17-
Text string `json:"text"`
18-
} `json:"record"`
19-
} `json:"commit"`
20-
}
21-
22-
func main() {
23-
24-
db, err := sql.Open("sqlite", "posts.db")
25-
if err != nil {
26-
log.Fatal(err)
27-
}
28-
defer db.Close()
29-
30-
_, err = db.Exec(`
31-
CREATE TABLE IF NOT EXISTS posts (
32-
id INTEGER PRIMARY KEY AUTOINCREMENT,
33-
time_us INTEGER,
34-
text TEXT
35-
) STRICT
36-
`)
37-
if err != nil {
38-
log.Fatal(err)
39-
}
40-
41-
stmt, err := db.Prepare("INSERT INTO posts(time_us, text) VALUES (?, ?)")
42-
if err != nil {
43-
log.Fatal(err)
44-
}
45-
defer stmt.Close()
46-
19+
func websocket_dail() *websocket.Conn {
4720
u := url.URL{
4821
Scheme: "wss",
4922
Host: "jetstream1.us-east.bsky.network",
@@ -55,26 +28,53 @@ func main() {
5528
if err != nil {
5629
log.Fatal("Dial error:", err)
5730
}
31+
return conn
32+
}
33+
34+
func main() {
35+
db := data.Init()
36+
defer db.Close()
37+
38+
stmt := data.PrepareInsert(db)
39+
defer stmt.Close()
40+
41+
conn := websocket_dail()
5842
defer conn.Close()
5943

44+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
45+
defer stop()
46+
47+
log.Println("gathering data from the firehose...")
6048
for {
61-
_, message, err := conn.ReadMessage()
62-
if err != nil {
63-
log.Fatal("Read error:", err)
64-
}
49+
select {
50+
case <-ctx.Done():
51+
fmt.Print("\nwaiting for a proper shutdown")
52+
for _ = range 3 {
53+
time.Sleep(time.Second / 3)
54+
fmt.Print(".")
55+
}
56+
fmt.Println("\nShutting down!")
57+
return
58+
default:
59+
_, message, err := conn.ReadMessage()
60+
if err != nil {
61+
log.Fatal("Read error:", err)
62+
}
6563

66-
var e MinimalEvent
67-
if err := json.Unmarshal(message, &e); err != nil {
68-
continue
69-
}
64+
var e MinimalEvent
65+
if err := json.Unmarshal(message, &e); err != nil {
66+
continue
67+
}
7068

71-
if e.Commit.Record.Text == "" {
72-
continue
73-
}
69+
if e.Commit.Record.Text == "" {
70+
continue
71+
}
7472

75-
_, err = stmt.Exec(e.TimeUS, e.Commit.Record.Text)
76-
if err != nil {
77-
log.Println("Insert error:", err)
73+
_, err = stmt.Exec(e.TimeUS, e.Commit.Record.Text)
74+
if err != nil {
75+
log.Println("Insert error:", err)
76+
}
7877
}
78+
7979
}
8080
}

models.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package main
2+
3+
type MinimalEvent struct {
4+
TimeUS int64 `json:"time_us"`
5+
Commit struct {
6+
Record struct {
7+
Text string `json:"text"`
8+
} `json:"record"`
9+
} `json:"commit"`
10+
}

0 commit comments

Comments
 (0)