Skip to content

Commit 2d58776

Browse files
authored
Merge pull request #23 from FireTail-io/dev
[MAIN] Fix blocking issue
2 parents 7e4c7ac + d30bbf1 commit 2d58776

File tree

6 files changed

+90
-20
lines changed

6 files changed

+90
-20
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
| ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
1515
| `FIRETAIL_API_TOKEN` || `PS-02-XXXXXXXX` | The API token the sensor will use to report logs to FireTail |
1616
| `BPF_EXPRESSION` || `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
17-
| `MAX_CONTENT_LENGTH` || `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. |
17+
| `MAX_CONTENT_LENGTH` || `1048576` | The sensor will only read requests or responses if their length is less than `MAX_CONTENT_LENGTH` bytes. |
1818
| `ENABLE_ONLY_LOG_JSON` || `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
1919
| `DISABLE_SERVICE_IP_FILTERING` || `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
2020
| `FIRETAIL_API_URL` || `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |

src/bidirectional_stream.go

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ package main
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"fmt"
78
"io"
89
"log/slog"
910
"net/http"
1011
"sync"
12+
"time"
1113

1214
"github.com/google/gopacket"
1315
"github.com/google/gopacket/tcpassembly"
1416
"github.com/google/gopacket/tcpassembly/tcpreader"
17+
"golang.org/x/sync/semaphore"
1518
)
1619

1720
type bidirectionalStreamFactory struct {
@@ -25,8 +28,22 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse
2528

2629
// The second time we see the same connection, it will be from the server to the client
2730
if conn, ok := f.conns.LoadAndDelete(fmt.Sprint(key)); ok {
31+
slog.Debug(
32+
"Found existing connection, assuming this is a server to client connection",
33+
"Src", netFlow.Src().String(),
34+
"Dst", netFlow.Dst().String(),
35+
"SrcPort", tcpFlow.Src().String(),
36+
"DstPort", tcpFlow.Dst().String(),
37+
)
2838
return &conn.(*bidirectionalStream).serverToClient
2939
}
40+
slog.Debug(
41+
"Found new connection, assuming this is a client to server connection",
42+
"Src", netFlow.Src().String(),
43+
"Dst", netFlow.Dst().String(),
44+
"SrcPort", tcpFlow.Src().String(),
45+
"DstPort", tcpFlow.Dst().String(),
46+
)
3047

3148
s := &bidirectionalStream{
3249
net: netFlow,
@@ -57,17 +74,22 @@ type bidirectionalStream struct {
5774

5875
func (s *bidirectionalStream) run() {
5976
defer s.closeCallback()
77+
defer s.clientToServer.Close()
78+
defer s.serverToClient.Close()
6079

61-
wg := sync.WaitGroup{}
62-
wg.Add(2)
80+
sem := semaphore.NewWeighted(2)
6381

6482
requestChannel := make(chan *http.Request, 1)
6583
responseChannel := make(chan *http.Response, 1)
66-
defer close(requestChannel)
67-
defer close(responseChannel)
6884

85+
err := sem.Acquire(context.Background(), 1)
86+
if err != nil {
87+
slog.Error("Failed to acquire semaphore for clientToServer reader:", "Err", err.Error())
88+
return
89+
}
6990
go func() {
70-
defer wg.Done()
91+
defer sem.Release(1)
92+
defer close(requestChannel)
7193
defer func() {
7294
if r := recover(); r != nil {
7395
slog.Error("Recovered from panic in clientToServer reader:", "Err", r)
@@ -79,7 +101,7 @@ func (s *bidirectionalStream) run() {
79101
slog.Debug("Failed to read request bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
80102
return
81103
}
82-
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes)))
104+
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes[:bytesRead])))
83105
if err != nil {
84106
slog.Debug("Failed to read request bytes:", "Err", err.Error())
85107
return
@@ -89,8 +111,14 @@ func (s *bidirectionalStream) run() {
89111
requestChannel <- request
90112
}()
91113

114+
err = sem.Acquire(context.Background(), 1)
115+
if err != nil {
116+
slog.Error("Failed to acquire semaphore for serverToClient reader:", "Err", err.Error())
117+
return
118+
}
92119
go func() {
93-
defer wg.Done()
120+
defer sem.Release(1)
121+
defer close(responseChannel)
94122
defer func() {
95123
if r := recover(); r != nil {
96124
slog.Error("Recovered from panic in serverToClient reader:", "Err", r)
@@ -102,15 +130,23 @@ func (s *bidirectionalStream) run() {
102130
slog.Debug("Failed to read response bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
103131
return
104132
}
105-
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes)), nil)
133+
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes[:bytesRead])), nil)
106134
if err != nil {
107135
slog.Debug("Failed to read response bytes:", "Err", err.Error())
108136
return
109137
}
110138
responseChannel <- response
111139
}()
112140

113-
wg.Wait()
141+
// Wait for both goroutines to finish with timeout of 2 minutes
142+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
143+
defer cancel()
144+
if err := sem.Acquire(ctx, 2); err != nil {
145+
if err != context.DeadlineExceeded {
146+
slog.Error("Failed to acquire semaphore for both readers:", "Err", err.Error())
147+
}
148+
return
149+
}
114150

115151
var capturedRequest *http.Request
116152
var capturedResponse *http.Response

src/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
github.com/x448/float16 v0.8.4 // indirect
3838
golang.org/x/net v0.39.0 // indirect
3939
golang.org/x/oauth2 v0.27.0 // indirect
40+
golang.org/x/sync v0.14.0
4041
golang.org/x/sys v0.32.0 // indirect
4142
golang.org/x/term v0.31.0 // indirect
4243
golang.org/x/text v0.24.0 // indirect

src/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT
117117
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
118118
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
119119
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
120+
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
121+
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
120122
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
121123
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
122124
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

src/main.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func main() {
128128
"Content-Length", strconv.Itoa(int(requestAndResponse.request.ContentLength)),
129129
)
130130
requestAndResponse.request.Header.Set("Host", requestAndResponse.request.Host)
131+
responseRecorder := httptest.NewRecorder()
131132
firetailMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
132133
w.WriteHeader(requestAndResponse.response.StatusCode)
133134
for key, values := range requestAndResponse.response.Header {
@@ -145,9 +146,17 @@ func main() {
145146
}
146147
w.Write(capturedResponseBody)
147148
})).ServeHTTP(
148-
httptest.NewRecorder(),
149+
responseRecorder,
149150
requestAndResponse.request,
150151
)
152+
if responseRecorder != nil {
153+
slog.Debug(
154+
"Response from Firetail middleware:",
155+
"StatusCode", responseRecorder.Code,
156+
"Header", responseRecorder.Header(),
157+
"Body", responseRecorder.Body.String(),
158+
)
159+
}
151160
default:
152161
}
153162
}

src/request_and_response.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,20 @@ type httpRequestAndResponseStreamer struct {
2929
maxBodySize int64
3030
}
3131

32-
func (s *httpRequestAndResponseStreamer) start() {
32+
func (s *httpRequestAndResponseStreamer) getHandleAndPacketsChannel() (*pcap.Handle, <-chan gopacket.Packet) {
3333
handle, err := pcap.OpenLive("any", 1600, true, pcap.BlockForever)
3434
if err != nil {
3535
log.Fatal(err)
3636
}
37-
defer handle.Close()
38-
3937
err = handle.SetBPFFilter(s.bpfExpression)
4038
if err != nil {
4139
log.Fatal(err)
4240
}
41+
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
42+
return handle, packetsChannel
43+
}
4344

45+
func (s *httpRequestAndResponseStreamer) start() {
4446
assembler := tcpassembly.NewAssembler(
4547
tcpassembly.NewStreamPool(
4648
&bidirectionalStreamFactory{
@@ -50,11 +52,27 @@ func (s *httpRequestAndResponseStreamer) start() {
5052
},
5153
),
5254
)
53-
ticker := time.Tick(time.Minute)
54-
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
55+
56+
go func() {
57+
ticker := time.Tick(time.Minute)
58+
for {
59+
select {
60+
case <-ticker:
61+
slog.Debug("Flushing old conns...")
62+
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
63+
}
64+
}
65+
}()
66+
67+
handler, packetsChannel := s.getHandleAndPacketsChannel()
5568
for {
5669
select {
57-
case packet := <-packetsChannel:
70+
case packet, ok := <-packetsChannel:
71+
if !ok {
72+
slog.Warn("Packet channel closed. Reinitializing...")
73+
handler.Close()
74+
handler, packetsChannel = s.getHandleAndPacketsChannel()
75+
}
5876
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
5977
continue
6078
}
@@ -78,10 +96,14 @@ func (s *httpRequestAndResponseStreamer) start() {
7896
)
7997
continue
8098
}
99+
slog.Debug(
100+
"Captured packet:",
101+
"Src", src,
102+
"Dst", dst,
103+
"SrcPort", tcp.SrcPort.String(),
104+
"DstPort", tcp.DstPort.String(),
105+
)
81106
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
82-
case <-ticker:
83-
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
84-
default:
85107
}
86108
}
87109
}

0 commit comments

Comments
 (0)