Skip to content

Commit 7dba62a

Browse files
committed
some prettyfications
1 parent ab314c0 commit 7dba62a

File tree

3 files changed

+82
-74
lines changed

3 files changed

+82
-74
lines changed

conn.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func (s *startupCoordinator) setupConn(ctx context.Context) error {
394394
return errors.New("gocql: no response to connection startup within timeout")
395395
}
396396

397-
// connection is set up and ready to use native proto v5 if it is set
397+
// connection is set up and ready to use native proto v5
398398
s.conn.connReady = true
399399
return nil
400400
}
@@ -584,12 +584,15 @@ func (c *Conn) serve(ctx context.Context) {
584584
c.closeWithError(err)
585585
}
586586

587-
// readUncompressedFrame
588587
func (c *Conn) recvV5Frame(ctx context.Context) error {
589-
var payload []byte
590-
var isSelfContained bool
591-
var err error
588+
const frameHeaderLength = 9
589+
var (
590+
payload []byte
591+
isSelfContained bool
592+
err error
593+
)
592594

595+
// Read frame based on compression
593596
if c.compressor != nil {
594597
payload, isSelfContained, err = readCompressedFrame(c.r, c.compressor)
595598
} else {
@@ -600,7 +603,6 @@ func (c *Conn) recvV5Frame(ctx context.Context) error {
600603
}
601604

602605
if isSelfContained {
603-
// TODO handle case when there are more than 1 envelop inside the frame
604606
return c.processFrame(ctx, bytes.NewBuffer(payload))
605607
}
606608

@@ -609,12 +611,12 @@ func (c *Conn) recvV5Frame(ctx context.Context) error {
609611
return err
610612
}
611613

612-
bytesToRead := head.length - len(payload) + 9
614+
// Calculate bytes to read
615+
bytesToRead := head.length - len(payload) + frameHeaderLength
613616

614617
buf := bytes.NewBuffer(payload)
615618

616-
err = c.recvMultiFrame(ctx, buf, bytesToRead)
617-
if err != nil {
619+
if err = c.recvMultiFrame(buf, bytesToRead); err != nil {
618620
return err
619621
}
620622

@@ -1813,11 +1815,16 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
18131815
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
18141816
}
18151817

1816-
func (c *Conn) recvMultiFrame(ctx context.Context, src io.Writer, bytesToRead int) error {
1817-
var read int
1818-
var segment []byte
1819-
var err error
1818+
func (c *Conn) recvMultiFrame(src io.Writer, bytesToRead int) error {
1819+
var (
1820+
read int
1821+
segment []byte
1822+
err error
1823+
)
1824+
1825+
// Loop until the total bytes read equals the expected bytes to read
18201826
for read != bytesToRead {
1827+
// Read frame based on compression
18211828
if c.compressor != nil {
18221829
segment, _, err = readCompressedFrame(c.r, c.compressor)
18231830
} else {
@@ -1827,6 +1834,7 @@ func (c *Conn) recvMultiFrame(ctx context.Context, src io.Writer, bytesToRead in
18271834
return fmt.Errorf("failed to read multi-frame frame: %w", err)
18281835
}
18291836

1837+
// Write the segment to the destination writer
18301838
n, _ := src.Write(segment)
18311839
read += n
18321840
}

crc.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,34 @@ package gocql
33
import "hash/crc32"
44

55
var (
6-
// (byte) 0xFA, (byte) 0x2D, (byte) 0x55, (byte) 0xCA
6+
// Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
77
initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
88
)
99

10+
// Checksum calculates the CRC32 checksum of the given byte slice.
1011
func Checksum(b []byte) uint32 {
1112
crc := crc32.NewIEEE()
1213
crc.Reset()
13-
crc.Write(initialCRC32Bytes)
14+
crc.Write(initialCRC32Bytes) // Include initial CRC32 bytes
1415
crc.Write(b)
1516
return crc.Sum32()
1617
}
1718

1819
const (
19-
crc24Init = 0x875060
20-
crc24Poly = 0x1974F0B
20+
crc24Init = 0x875060 // Initial value for CRC24 calculation
21+
crc24Poly = 0x1974F0B // Polynomial for CRC24 calculation
2122
)
2223

24+
// KoopmanChecksum calculates the CRC24 checksum using the Koopman polynomial.
2325
func KoopmanChecksum(buf []byte) uint32 {
24-
crc := crc24Init
26+
crc := crc24Init // Initialize CRC with crc24Init value
2527
for _, b := range buf {
26-
crc ^= int(b) << 16
28+
crc ^= int(b) << 16 // XOR the byte shifted left by 16 bits with the current CRC value
2729

28-
for i := 0; i < 8; i++ {
29-
crc <<= 1
30-
if crc&0x1000000 != 0 {
31-
crc ^= crc24Poly
30+
for i := 0; i < 8; i++ { // Process each bit in the byte
31+
crc <<= 1 // Shift CRC left by 1 bit
32+
if crc&0x1000000 != 0 { // If the highest bit (24th bit) is set
33+
crc ^= crc24Poly // XOR the CRC value with the crc24Poly
3234
}
3335
}
3436
}

frame.go

Lines changed: 49 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2069,16 +2069,20 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
20692069
}
20702070

20712071
func (f *framer) prepareModernLayout() error {
2072+
// Ensure protocol version is V5 or higher
20722073
if f.proto < protoVersion5 {
20732074
panic("Modern layout is not supported with version V4 or less")
20742075
}
20752076

20762077
selfContained := true
20772078

2078-
var adjustedBuf []byte
2079-
var tempBuf []byte
2080-
var err error
2079+
var (
2080+
adjustedBuf []byte
2081+
tempBuf []byte
2082+
err error
2083+
)
20812084

2085+
// Process the buffer in chunks if it exceeds the max payload size
20822086
for len(f.buf) > maxPayloadSize {
20832087
if f.compres != nil {
20842088
tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
@@ -2094,6 +2098,7 @@ func (f *framer) prepareModernLayout() error {
20942098
selfContained = false
20952099
}
20962100

2101+
// Process the remaining buffer
20972102
if f.compres != nil {
20982103
tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres)
20992104
} else {
@@ -2113,11 +2118,12 @@ func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
21132118
const headerSize = 6
21142119
header := [headerSize + 1]byte{}
21152120

2116-
_, err := io.ReadFull(r, header[:headerSize])
2117-
if err != nil {
2121+
// Read the frame header
2122+
if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
21182123
return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err)
21192124
}
21202125

2126+
// Compute and verify the header CRC24
21212127
computedHeaderCRC24 := KoopmanChecksum(header[:3])
21222128
readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
21232129
if computedHeaderCRC24 != readHeaderCRC24 {
@@ -2129,15 +2135,14 @@ func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
21292135
payloadLen := int(headerInt & 0x1FFFF)
21302136
isSelfContained := (headerInt & (1 << 17)) != 0
21312137

2138+
// Read the payload
21322139
payload := make([]byte, payloadLen)
2133-
_, err = io.ReadFull(r, payload)
2134-
if err != nil {
2140+
if _, err := io.ReadFull(r, payload); err != nil {
21352141
return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err)
21362142
}
21372143

2138-
// reading payload crc32
2139-
_, err = io.ReadFull(r, header[:4])
2140-
if err != nil {
2144+
// Read and verify the payload CRC32
2145+
if _, err := io.ReadFull(r, header[:4]); err != nil {
21412146
return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err)
21422147
}
21432148

@@ -2160,10 +2165,9 @@ func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error)
21602165

21612166
payloadLen := len(payload)
21622167
if payloadLen > maxPayloadSize {
2163-
return nil, fmt.Errorf("payload length (%d) excides maximum size of 128 KiB", payloadLen)
2168+
return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen)
21642169
}
21652170

2166-
// Create the header
21672171
header := make([]byte, headerSize)
21682172

21692173
// First 3 bytes: payload length and self-contained flag
@@ -2188,8 +2192,8 @@ func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error)
21882192
// Create the frame
21892193
frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
21902194
frame := make([]byte, frameSize)
2191-
copy(frame, header)
2192-
copy(frame[headerSize:], payload)
2195+
copy(frame, header) // Copy the header to the frame
2196+
copy(frame[headerSize:], payload) // Copy the payload to the frame
21932197

21942198
// Calculate CRC32 for the payload
21952199
payloadCRC32 := Checksum(payload)
@@ -2203,106 +2207,100 @@ func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, compre
22032207
if err != nil {
22042208
return nil, err
22052209
}
2206-
// skipping first 4 bytes because size of uncompressed payload now is written in frame header,
2207-
// not in the body of compressed envelope
2210+
2211+
// Skip the first 4 bytes because the size of the uncompressed payload is written in the frame header, not in the
2212+
// body of the compressed envelope
22082213
compressedPayload = compressedPayload[4:]
22092214

22102215
compressedLen := len(compressedPayload)
22112216
uncompressedLen := len(uncompressedPayload)
22122217

22132218
if compressedLen > maxPayloadSize {
2214-
return nil, fmt.Errorf("compressed payload length exceedes max size of frame payload %d/%d", compressedLen, maxPayloadSize)
2219+
return nil, fmt.Errorf("compressed payload length exceeds max size of frame payload %d/%d", compressedLen, maxPayloadSize)
22152220
}
22162221

22172222
if uncompressedLen > maxPayloadSize {
2218-
return nil, fmt.Errorf("uncompressed compressed payload length exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
2223+
return nil, fmt.Errorf("uncompressed payload length exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
22192224
}
22202225

2226+
// Combine compressed and uncompressed lengths and set the self-contained flag if needed
22212227
combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
22222228
if isSelfContained {
22232229
combined |= 1 << 34
22242230
}
22252231

22262232
var headerBuf [8]byte
22272233

2234+
// Write the combined value into the header buffer
22282235
binary.LittleEndian.PutUint64(headerBuf[:], combined)
22292236

2230-
// 8 - size of header, 4 - size of crc32 for payload
2237+
// Create a buffer with enough capacity to hold the header, compressed payload, and checksums
22312238
buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
22322239

2233-
// writing compressed and uncompressed sizes
2240+
// Write the first 5 bytes of the header (compressed and uncompressed sizes)
22342241
buf.Write(headerBuf[:5])
22352242

2236-
// writing crc24 of first 5 bytes
2243+
// Compute and write the CRC24 checksum of the first 5 bytes
22372244
headerChecksum := KoopmanChecksum(headerBuf[:5])
22382245
binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
22392246
buf.Write(headerBuf[:3])
2240-
2241-
// writing compressed payload
22422247
buf.Write(compressedPayload)
22432248

2244-
// writing checksum of payload
2249+
// Compute and write the CRC32 checksum of the payload
22452250
payloadChecksum := Checksum(compressedPayload)
22462251
binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
22472252
buf.Write(headerBuf[:4])
22482253

2249-
return buf.Bytes(), err
2254+
return buf.Bytes(), nil
22502255
}
22512256

22522257
func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, error) {
2253-
var headerBuf [8]byte
2254-
_, err := io.ReadFull(r, headerBuf[:])
2255-
if err != nil {
2258+
var (
2259+
headerBuf [8]byte
2260+
err error
2261+
)
2262+
2263+
if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
22562264
return nil, false, err
22572265
}
22582266

2259-
// reading checksum from frame header
2267+
// Reading checksum from frame header
22602268
readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 | uint32(headerBuf[7])<<16
2261-
computedHeaderChecksum := KoopmanChecksum(headerBuf[:5])
2262-
if computedHeaderChecksum != readHeaderChecksum {
2269+
if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]); computedHeaderChecksum != readHeaderChecksum {
22632270
return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame header, read: %d, computed: %d", readHeaderChecksum, computedHeaderChecksum)
22642271
}
22652272

2266-
// first 17 bits - payload size after compression
2267-
compressedLen := uint32(headerBuf[0]) |
2268-
uint32(headerBuf[1])<<8 |
2269-
uint32(headerBuf[2]&0x1)<<16
2273+
// First 17 bits - payload size after compression
2274+
compressedLen := uint32(headerBuf[0]) | uint32(headerBuf[1])<<8 | uint32(headerBuf[2]&0x1)<<16
22702275

2271-
// the next 17 bits - payload size before compression
2272-
uncompressedLen := (uint32(headerBuf[2]) >> 1) |
2273-
uint32(headerBuf[3])<<7 |
2274-
uint32(headerBuf[4]&0b11)<<15
2276+
// The next 17 bits - payload size before compression
2277+
uncompressedLen := (uint32(headerBuf[2]) >> 1) | uint32(headerBuf[3])<<7 | uint32(headerBuf[4]&0b11)<<15
22752278

2276-
// self-contained flag
2279+
// Self-contained flag
22772280
selfContained := (headerBuf[4] & 0b100) != 0
22782281

22792282
compressedPayload := make([]byte, compressedLen)
2280-
_, err = io.ReadFull(r, compressedPayload)
2281-
if err != nil {
2283+
if _, err = io.ReadFull(r, compressedPayload); err != nil {
22822284
return nil, false, err
22832285
}
22842286

2285-
_, err = io.ReadFull(r, headerBuf[:4])
2286-
if err != nil {
2287+
if _, err = io.ReadFull(r, headerBuf[:4]); err != nil {
22872288
return nil, false, err
22882289
}
22892290

2290-
// ensuring if payload checksum matches
2291+
// Ensuring if payload checksum matches
22912292
readPayloadChecksum := binary.LittleEndian.Uint32(headerBuf[:4])
2292-
computedPayloadChecksum := Checksum(compressedPayload)
2293-
if readPayloadChecksum != computedPayloadChecksum {
2293+
if computedPayloadChecksum := Checksum(compressedPayload); readPayloadChecksum != computedPayloadChecksum {
22942294
return nil, false, fmt.Errorf("gocql: crc32 mismatch in payload, read: %d, computed: %d", readPayloadChecksum, computedPayloadChecksum)
22952295
}
22962296

22972297
var uncompressedPayload []byte
22982298
if uncompressedLen > 0 {
2299-
uncompressedPayload, err = compressor.DecodeSized(compressedPayload, uncompressedLen)
2300-
if err != nil {
2299+
if uncompressedPayload, err = compressor.DecodeSized(compressedPayload, uncompressedLen); err != nil {
23012300
return nil, false, err
23022301
}
2303-
23042302
if uint32(len(uncompressedPayload)) != uncompressedLen {
2305-
return nil, false, fmt.Errorf("gocql: length mismatch after payload decompression, got %d, read %d", len(uncompressedPayload), uncompressedLen)
2303+
return nil, false, fmt.Errorf("gocql: length mismatch after payload decompression, got %d, expected %d", len(uncompressedPayload), uncompressedLen)
23062304
}
23072305
} else {
23082306
uncompressedPayload = compressedPayload

0 commit comments

Comments
 (0)