-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathnetconn.go
153 lines (128 loc) · 3.26 KB
/
netconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright (c) 2022-present, DiceDB contributors
// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information.
package ironhawk
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"time"
"github.com/dicedb/dice/config"
"github.com/dicedb/dicedb-go/wire"
"google.golang.org/protobuf/proto"
)
// Sentinel errors exported by this package for connection handling.
var (
// ErrRequestTooLarge carries the message "request too large".
ErrRequestTooLarge = errors.New("request too large")
// ErrIdleTimeout carries the message "connection idle timeout".
ErrIdleTimeout = errors.New("connection idle timeout")
// ErrorClosed carries the message "connection closed".
// NOTE(review): the name deviates from the ErrXxx convention used by the
// other two sentinels; renaming would break callers, so it is left as is.
ErrorClosed = errors.New("connection closed")
)
// IOHandler handles I/O operations for a network connection.
//
// A handler created by NewIOHandler populates all three fields; one created
// by NewIOHandlerWithConn only sets conn, leaving fd at zero and file nil.
type IOHandler struct {
// fd is the client file descriptor passed to NewIOHandler.
fd int
// file is the *os.File wrapping fd; Close closes it when it is non-nil.
file *os.File
// conn is the connection that ReadSync reads from and WriteSync writes to.
conn net.Conn
}
// NewIOHandler creates a new IOHandler from the file descriptor of an
// already-open client connection.
//
// The descriptor is wrapped in an *os.File and converted to a net.Conn with
// net.FileConn. When the resulting connection is a *net.TCPConn, TCP_NODELAY
// and keepalive are enabled, with the keepalive period set to
// config.KeepAlive seconds.
//
// On success the returned handler holds both the file and the connection;
// IOHandler.Close releases them. On any failure everything created so far
// (the file and, if it already exists, the connection) is closed before the
// error is returned, so no descriptor is leaked.
func NewIOHandler(clientFD int) (_ *IOHandler, err error) {
	file := os.NewFile(uintptr(clientFD), "client-connection")
	if file == nil {
		return nil, fmt.Errorf("failed to create file from file descriptor")
	}

	var conn net.Conn
	defer func() {
		if err == nil {
			return
		}
		// net.FileConn hands back an independent copy of the socket, so a
		// failure after it succeeded must close the connection as well as the
		// file. Close errors are ignored here: the original failure is the
		// one worth reporting.
		if conn != nil {
			_ = conn.Close()
		}
		_ = file.Close()
	}()

	conn, err = net.FileConn(file)
	if err != nil {
		return nil, fmt.Errorf("failed to create net.Conn from file descriptor: %w", err)
	}

	if tcpConn, ok := conn.(*net.TCPConn); ok {
		if err := tcpConn.SetNoDelay(true); err != nil {
			return nil, fmt.Errorf("failed to set TCP_NODELAY: %w", err)
		}
		if err := tcpConn.SetKeepAlive(true); err != nil {
			return nil, fmt.Errorf("failed to set keepalive: %w", err)
		}
		if err := tcpConn.SetKeepAlivePeriod(time.Duration(config.KeepAlive) * time.Second); err != nil {
			return nil, fmt.Errorf("failed to set keepalive period: %w", err)
		}
	}

	return &IOHandler{
		fd:   clientFD,
		file: file,
		conn: conn,
	}, nil
}
// NewIOHandlerWithConn wraps an existing net.Conn in an IOHandler.
//
// Only the connection is stored: the handler has no associated *os.File and
// its fd field keeps the zero value.
func NewIOHandlerWithConn(conn net.Conn) *IOHandler {
	handler := new(IOHandler)
	handler.conn = conn
	return handler
}
// Read is a placeholder: it ignores ctx, performs no I/O, and always returns
// a nil slice together with a nil error. Use ReadSync to actually read a
// command from the connection.
func (h *IOHandler) Read(ctx context.Context) ([]byte, error) {
	return nil, nil
}
// ReadSync reads one request from the connection and decodes it into a
// wire.Command.
//
// Data is read in chunks of config.IoBufferSize bytes and accumulated until
// either EOF is reached or a read returns fewer bytes than the chunk size,
// which is taken to mean that the request is complete.
//
// Errors:
//   - ErrRequestTooLarge when the accumulated bytes would exceed
//     config.MaxRequestSize;
//   - io.EOF when no bytes at all were read;
//   - the underlying read error for any non-EOF read failure;
//   - a wrapped protobuf error when the bytes cannot be unmarshalled.
func (h *IOHandler) ReadSync() (*wire.Command, error) {
	reader := bufio.NewReaderSize(h.conn, config.IoBufferSize)
	chunk := make([]byte, config.IoBufferSize)

	var payload []byte
	for {
		n, err := reader.Read(chunk)
		// Consume the bytes before looking at err: a read may return data
		// together with an error.
		if n > 0 {
			if len(payload)+n > config.MaxRequestSize {
				// Return the package sentinel so callers can match it with
				// errors.Is; the message text is unchanged.
				return nil, ErrRequestTooLarge
			}
			payload = append(payload, chunk[:n]...)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, err
		}
		// A short read is treated as the end of the request.
		if n < len(chunk) {
			break
		}
	}

	if len(payload) == 0 {
		return nil, io.EOF
	}

	cmd := &wire.Command{}
	if err := proto.Unmarshal(payload, cmd); err != nil {
		return nil, fmt.Errorf("failed to unmarshal command: %w", err)
	}
	return cmd, nil
}
// Write is a placeholder: it ignores both ctx and r, writes nothing to the
// connection, and always returns nil. Use WriteSync to actually send a
// result.
func (h *IOHandler) Write(ctx context.Context, r interface{}) error {
	return nil
}
// WriteSync serialises r with protobuf and writes the resulting bytes to the
// connection in a single Write call.
//
// The marshal error or the write error is returned unchanged. ctx is accepted
// but not consulted.
func (h *IOHandler) WriteSync(ctx context.Context, r *wire.Result) error {
	payload, err := proto.Marshal(r)
	if err != nil {
		return err
	}

	_, err = h.conn.Write(payload)
	return err
}
// Close releases the resources held by the handler: the network connection
// first, then the backing file. Either may be absent, in which case it is
// skipped. Errors from the individual Close calls are combined with
// errors.Join; the result is nil when every call succeeds.
func (h *IOHandler) Close() error {
	// Check each field for nil before storing it as an io.Closer so a nil
	// *os.File never ends up as a non-nil interface value.
	var closers []io.Closer
	if h.conn != nil {
		closers = append(closers, h.conn)
	}
	if h.file != nil {
		closers = append(closers, h.file)
	}

	var combined error
	for _, c := range closers {
		combined = errors.Join(combined, c.Close())
	}
	return combined
}