Skip to content
This repository was archived by the owner on Dec 5, 2023. It is now read-only.

Commit fd9e6c2

Browse files
author
embs
committed
Enrich Zipkin db tracing
Add info regarding - database type and address - approximation for queries results size
1 parent 9de2c86 commit fd9e6c2

File tree

6 files changed

+168
-28
lines changed

6 files changed

+168
-28
lines changed

cmd/cataloguesvc/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ func main() {
106106
}
107107

108108
// Data domain.
109-
db, err := sqlx.Open("mysql", *dsn)
109+
var db catalogue.Database
110+
sqlxdb, err := sqlx.Open("mysql", *dsn)
111+
db = &catalogue.SqlxDb{Db: sqlxdb}
112+
db = catalogue.DbTracingMiddleware()(db)
110113
if err != nil {
111114
logger.Log("err", err)
112115
os.Exit(1)

db.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package catalogue
2+
3+
import(
4+
"context"
5+
"database/sql"
6+
7+
"github.com/jmoiron/sqlx"
8+
)
9+
10+
type Database interface {
11+
Close() error
12+
Ping() error
13+
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
14+
Prepare(query string) (StmtMiddleware, error)
15+
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
16+
Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
17+
}
18+
19+
// SqlxDb meets the Database interface requirements
20+
type SqlxDb struct {
21+
// db is a reference for the underlying database implementation
22+
Db *sqlx.DB
23+
}
24+
25+
func (sqlxdb *SqlxDb) Close() error {
26+
return sqlxdb.Db.Close()
27+
}
28+
29+
func (sqlxdb *SqlxDb) Ping() error {
30+
return sqlxdb.Db.Ping()
31+
}
32+
33+
func (sqlxdb *SqlxDb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
34+
return sqlxdb.Db.Select(dest, query, args...)
35+
}
36+
37+
func (sqlxdb *SqlxDb) Prepare(query string) (StmtMiddleware, error) {
38+
sel, err := sqlxdb.Db.Prepare(query)
39+
return StmtMiddleware{next: sel}, err
40+
}
41+
42+
func (sqlxdb *SqlxDb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
43+
return sqlxdb.Db.Get(dest, query, args...)
44+
}
45+
46+
func (sqlxdb *SqlxDb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
47+
return sqlxdb.Db.Query(query, args...)
48+
}

db_tracing_middleware.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package catalogue
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"unsafe"
7+
8+
otext "github.com/opentracing/opentracing-go/ext"
9+
stdopentracing "github.com/opentracing/opentracing-go"
10+
)
11+
12+
// Middleware decorates a database.
13+
type DbMiddleware func(Database) Database
14+
15+
// DbTracingMiddleware traces database calls.
16+
func DbTracingMiddleware() DbMiddleware {
17+
return func(next Database) Database {
18+
return dbTracingMiddleware{
19+
next: next,
20+
}
21+
}
22+
}
23+
24+
type dbTracingMiddleware struct {
25+
next Database
26+
}
27+
28+
type StmtMiddleware struct {
29+
next *sql.Stmt
30+
}
31+
32+
func (stmt StmtMiddleware) Close() error {
33+
return stmt.next.Close()
34+
}
35+
36+
func (stmt StmtMiddleware) QueryRow(ctx context.Context, args ...interface{}) *sql.Row {
37+
span := startSpan(ctx, "rows from database")
38+
rows := stmt.next.QueryRow(args...)
39+
finishSpan(span, unsafe.Sizeof(rows))
40+
return rows
41+
}
42+
43+
func (mw dbTracingMiddleware) Close() error {
44+
return mw.next.Close()
45+
}
46+
47+
func (mw dbTracingMiddleware) Ping() error {
48+
return mw.next.Ping()
49+
}
50+
51+
func (mw dbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
52+
span := startSpan(ctx, "socks from database")
53+
err := mw.next.Select(ctx, dest, query, args...)
54+
finishSpan(span, unsafe.Sizeof(dest))
55+
return err
56+
}
57+
58+
func (mw dbTracingMiddleware) Prepare(query string) (StmtMiddleware, error) {
59+
return mw.next.Prepare(query)
60+
}
61+
62+
func (mw dbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
63+
span := startSpan(ctx, "get from database")
64+
err := mw.next.Get(ctx, dest, query, args...)
65+
finishSpan(span, unsafe.Sizeof(dest))
66+
return err
67+
}
68+
69+
func (mw dbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
70+
span := startSpan(ctx, "query from database")
71+
rows, err := mw.next.Query(ctx, query, args...)
72+
finishSpan(span, unsafe.Sizeof(rows))
73+
return rows, err
74+
}
75+
76+
func startSpan(ctx context.Context, n string) stdopentracing.Span {
77+
var span stdopentracing.Span
78+
span, ctx = stdopentracing.StartSpanFromContext(ctx, n)
79+
otext.SpanKindRPCClient.Set(span)
80+
span.SetTag("db.type", "mysql")
81+
span.SetTag("peer.address", "catalogue-db:3306")
82+
return span
83+
}
84+
85+
func finishSpan(span stdopentracing.Span, size uintptr) {
86+
span.SetTag("db.query.result.size", size)
87+
span.Finish()
88+
}

endpoints.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func MakeEndpoints(s Service, tracer stdopentracing.Tracer) Endpoints {
3636
func MakeListEndpoint(s Service) endpoint.Endpoint {
3737
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
3838
req := request.(listRequest)
39-
socks, err := s.List(req.Tags, req.Order, req.PageNum, req.PageSize)
39+
socks, err := s.List(ctx, req.Tags, req.Order, req.PageNum, req.PageSize)
4040
return listResponse{Socks: socks, Err: err}, err
4141
}
4242
}
@@ -45,7 +45,7 @@ func MakeListEndpoint(s Service) endpoint.Endpoint {
4545
func MakeCountEndpoint(s Service) endpoint.Endpoint {
4646
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
4747
req := request.(countRequest)
48-
n, err := s.Count(req.Tags)
48+
n, err := s.Count(ctx, req.Tags)
4949
return countResponse{N: n, Err: err}, err
5050
}
5151
}
@@ -54,15 +54,15 @@ func MakeCountEndpoint(s Service) endpoint.Endpoint {
5454
func MakeGetEndpoint(s Service) endpoint.Endpoint {
5555
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
5656
req := request.(getRequest)
57-
sock, err := s.Get(req.ID)
57+
sock, err := s.Get(ctx, req.ID)
5858
return getResponse{Sock: sock, Err: err}, err
5959
}
6060
}
6161

6262
// MakeTagsEndpoint returns an endpoint via the given service.
6363
func MakeTagsEndpoint(s Service) endpoint.Endpoint {
6464
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
65-
tags, err := s.Tags()
65+
tags, err := s.Tags(ctx)
6666
return tagsResponse{Tags: tags, Err: err}, err
6767
}
6868
}

logging.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package catalogue
22

33
import (
4+
"context"
45
"strings"
56
"time"
67

@@ -22,7 +23,7 @@ type loggingMiddleware struct {
2223
logger log.Logger
2324
}
2425

25-
func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) {
26+
func (mw loggingMiddleware) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) {
2627
defer func(begin time.Time) {
2728
mw.logger.Log(
2829
"method", "List",
@@ -35,10 +36,10 @@ func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize
3536
"took", time.Since(begin),
3637
)
3738
}(time.Now())
38-
return mw.next.List(tags, order, pageNum, pageSize)
39+
return mw.next.List(ctx, tags, order, pageNum, pageSize)
3940
}
4041

41-
func (mw loggingMiddleware) Count(tags []string) (n int, err error) {
42+
func (mw loggingMiddleware) Count(ctx context.Context, tags []string) (n int, err error) {
4243
defer func(begin time.Time) {
4344
mw.logger.Log(
4445
"method", "Count",
@@ -48,10 +49,10 @@ func (mw loggingMiddleware) Count(tags []string) (n int, err error) {
4849
"took", time.Since(begin),
4950
)
5051
}(time.Now())
51-
return mw.next.Count(tags)
52+
return mw.next.Count(ctx, tags)
5253
}
5354

54-
func (mw loggingMiddleware) Get(id string) (s Sock, err error) {
55+
func (mw loggingMiddleware) Get(ctx context.Context, id string) (s Sock, err error) {
5556
defer func(begin time.Time) {
5657
mw.logger.Log(
5758
"method", "Get",
@@ -61,10 +62,10 @@ func (mw loggingMiddleware) Get(id string) (s Sock, err error) {
6162
"took", time.Since(begin),
6263
)
6364
}(time.Now())
64-
return mw.next.Get(id)
65+
return mw.next.Get(ctx, id)
6566
}
6667

67-
func (mw loggingMiddleware) Tags() (tags []string, err error) {
68+
func (mw loggingMiddleware) Tags(ctx context.Context) (tags []string, err error) {
6869
defer func(begin time.Time) {
6970
mw.logger.Log(
7071
"method", "Tags",
@@ -73,7 +74,7 @@ func (mw loggingMiddleware) Tags() (tags []string, err error) {
7374
"took", time.Since(begin),
7475
)
7576
}(time.Now())
76-
return mw.next.Tags()
77+
return mw.next.Tags(ctx)
7778
}
7879

7980
func (mw loggingMiddleware) Health() (health []Health) {

service.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,21 @@ package catalogue
44
// catalogue service. Everything here is agnostic to the transport (HTTP).
55

66
import (
7+
"context"
78
"errors"
89
"strings"
910
"time"
1011

1112
"github.com/go-kit/kit/log"
12-
"github.com/jmoiron/sqlx"
1313
)
1414

1515
// Service is the catalogue service, providing read operations on a saleable
1616
// catalogue of sock products.
1717
type Service interface {
18-
List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue
19-
Count(tags []string) (int, error) // GET /catalogue/size
20-
Get(id string) (Sock, error) // GET /catalogue/{id}
21-
Tags() ([]string, error) // GET /tags
18+
List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue
19+
Count(ctx context.Context, tags []string) (int, error) // GET /catalogue/size
20+
Get(ctx context.Context, id string) (Sock, error) // GET /catalogue/{id}
21+
Tags(ctx context.Context) ([]string, error) // GET /tags
2222
Health() []Health // GET /health
2323
}
2424

@@ -56,19 +56,19 @@ var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.pr
5656

5757
// NewCatalogueService returns an implementation of the Service interface,
5858
// with connection to an SQL database.
59-
func NewCatalogueService(db *sqlx.DB, logger log.Logger) Service {
59+
func NewCatalogueService(db Database, logger log.Logger) Service {
6060
return &catalogueService{
6161
db: db,
6262
logger: logger,
6363
}
6464
}
6565

6666
type catalogueService struct {
67-
db *sqlx.DB
67+
db Database
6868
logger log.Logger
6969
}
7070

71-
func (s *catalogueService) List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) {
71+
func (s *catalogueService) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) {
7272
var socks []Sock
7373
query := baseQuery
7474

@@ -93,7 +93,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i
9393

9494
query += ";"
9595

96-
err := s.db.Select(&socks, query, args...)
96+
err := s.db.Select(ctx, &socks, query, args...)
9797
if err != nil {
9898
s.logger.Log("database error", err)
9999
return []Sock{}, ErrDBConnection
@@ -111,7 +111,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i
111111
return socks, nil
112112
}
113113

114-
func (s *catalogueService) Count(tags []string) (int, error) {
114+
func (s *catalogueService) Count(ctx context.Context, tags []string) (int, error) {
115115
query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id"
116116

117117
var args []interface{}
@@ -137,7 +137,7 @@ func (s *catalogueService) Count(tags []string) (int, error) {
137137
defer sel.Close()
138138

139139
var count int
140-
err = sel.QueryRow(args...).Scan(&count)
140+
err = sel.QueryRow(ctx, args...).Scan(&count)
141141

142142
if err != nil {
143143
s.logger.Log("database error", err)
@@ -147,11 +147,11 @@ func (s *catalogueService) Count(tags []string) (int, error) {
147147
return count, nil
148148
}
149149

150-
func (s *catalogueService) Get(id string) (Sock, error) {
150+
func (s *catalogueService) Get(ctx context.Context, id string) (Sock, error) {
151151
query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;"
152152

153153
var sock Sock
154-
err := s.db.Get(&sock, query, id)
154+
err := s.db.Get(ctx, &sock, query, id)
155155
if err != nil {
156156
s.logger.Log("database error", err)
157157
return Sock{}, ErrNotFound
@@ -181,10 +181,10 @@ func (s *catalogueService) Health() []Health {
181181
return health
182182
}
183183

184-
func (s *catalogueService) Tags() ([]string, error) {
184+
func (s *catalogueService) Tags(ctx context.Context) ([]string, error) {
185185
var tags []string
186186
query := "SELECT name FROM tag;"
187-
rows, err := s.db.Query(query)
187+
rows, err := s.db.Query(ctx, query)
188188
if err != nil {
189189
s.logger.Log("database error", err)
190190
return []string{}, ErrDBConnection

0 commit comments

Comments
 (0)