Skip to content

feat(go): fury-go implements adaptation and optimization for new xlang #2230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 19, 2025
278 changes: 238 additions & 40 deletions go/fury/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,6 @@ func (b *ByteBuffer) WriteInt32(value int32) {
binary.LittleEndian.PutUint32(b.data[b.writerIndex:], uint32(value))
b.writerIndex += 4
}
func (b *ByteBuffer) WriteVarUint32(value uint32) error {
// Ensure enough capacity (max 5 bytes for varint32)
b.grow(5)

// Varint encoding
for value >= 0x80 {
b.data[b.writerIndex] = byte(value) | 0x80
b.writerIndex++
value >>= 7
}
b.data[b.writerIndex] = byte(value)
b.writerIndex++

return nil
}

func (b *ByteBuffer) WriteLength(value int) {
b.grow(4)
Expand Down Expand Up @@ -336,37 +321,250 @@ func (b *ByteBuffer) ReadVarInt32() int32 {
return result
}

type BufferObject interface {
TotalBytes() int
WriteTo(buf *ByteBuffer)
ToBuffer() *ByteBuffer
}

// WriteVarint64 writes the zig-zag encoded varint
func (b *ByteBuffer) WriteVarint64(value int64) {
u := uint64((value << 1) ^ (value >> 63))
b.WriteVarUint64(u)
}

// WriteVarUint64 writes to unsigned varint (up to 9 bytes)
func (b *ByteBuffer) WriteVarUint64(value uint64) {
b.grow(9)
offset := b.writerIndex
data := b.data[offset : offset+9]

i := 0
for ; i < 8; i++ {
data[i] = byte(value & 0x7F)
value >>= 7
if value == 0 {
i++
break
}
data[i] |= 0x80
}
if i == 8 {
data[8] = byte(value)
i = 9
}
b.writerIndex += i
}

// ReadVarint64 reads the varint encoded with zig-zag
func (b *ByteBuffer) ReadVarint64() int64 {
u := b.ReadVarUint64()
v := int64(u >> 1)
if u&1 != 0 {
v = ^v
}
return v
}

// ReadVarUint64 reads unsigned varint
func (b *ByteBuffer) ReadVarUint64() uint64 {
if b.remaining() >= 9 {
return b.readVarUint64Fast()
}
return b.readVarUint64Slow()
}

// Fast path (when the remaining bytes are sufficient)
func (b *ByteBuffer) readVarUint64Fast() uint64 {
data := b.data[b.readerIndex:]
var result uint64
var readLength int

b0 := data[0]
result = uint64(b0 & 0x7F)
if b0 < 0x80 {
readLength = 1
} else {
b1 := data[1]
result |= uint64(b1&0x7F) << 7
if b1 < 0x80 {
readLength = 2
} else {
b2 := data[2]
result |= uint64(b2&0x7F) << 14
if b2 < 0x80 {
readLength = 3
} else {
b3 := data[3]
result |= uint64(b3&0x7F) << 21
if b3 < 0x80 {
readLength = 4
} else {
b4 := data[4]
result |= uint64(b4&0x7F) << 28
if b4 < 0x80 {
readLength = 5
} else {
b5 := data[5]
result |= uint64(b5&0x7F) << 35
if b5 < 0x80 {
readLength = 6
} else {
b6 := data[6]
result |= uint64(b6&0x7F) << 42
if b6 < 0x80 {
readLength = 7
} else {
b7 := data[7]
result |= uint64(b7&0x7F) << 49
if b7 < 0x80 {
readLength = 8
} else {
b8 := data[8]
result |= uint64(b8) << 56
readLength = 9
}
}
}
}
}
}
}
}
b.readerIndex += readLength
return result
}

// Slow path (read byte by byte)
func (b *ByteBuffer) readVarUint64Slow() uint64 {
var result uint64
var shift uint
for {
byteVal := b.ReadUint8()
result |= (uint64(byteVal) & 0x7F) << shift
if byteVal < 0x80 {
break
}
shift += 7
if shift >= 64 {
panic("varuint64 overflow")
}
}
return result
}

// Auxiliary function
func (b *ByteBuffer) remaining() int {
return len(b.data) - b.readerIndex
}

func (b *ByteBuffer) ReadUint8() uint8 {
if b.readerIndex >= len(b.data) {
panic("buffer underflow")
}
v := b.data[b.readerIndex]
b.readerIndex++
return v
}

func (b *ByteBuffer) WriteVarint32(value int32) {
u := uint32((value << 1) ^ (value >> 31))
b.WriteVarUint32(u)
}

func (b *ByteBuffer) WriteVarUint32(value uint32) {
b.grow(5)
offset := b.writerIndex
data := b.data[offset : offset+5]

i := 0
for ; i < 4; i++ {
data[i] = byte(value & 0x7F)
value >>= 7
if value == 0 {
i++
break
}
data[i] |= 0x80
}
if i == 4 {
data[4] = byte(value)
i = 5
}
b.writerIndex += i
}

func (b *ByteBuffer) ReadVarint32() int32 {
u := b.ReadVarUint32()
v := int32(u >> 1)
if u&1 != 0 {
v = ^v
}
return v
}

func (b *ByteBuffer) ReadVarUint32() uint32 {
readerIndex := b.readerIndex
byte_ := uint32(b.data[readerIndex])
readerIndex++
result := byte_ & 0x7F
if (byte_ & 0x80) != 0 {
byte_ = uint32(b.data[readerIndex])
readerIndex++
result |= (byte_ & 0x7F) << 7
if (byte_ & 0x80) != 0 {
byte_ = uint32(b.data[readerIndex])
readerIndex++
result |= (byte_ & 0x7F) << 14
if (byte_ & 0x80) != 0 {
byte_ = uint32(b.data[readerIndex])
readerIndex++
result |= (byte_ & 0x7F) << 21
if (byte_ & 0x80) != 0 {
byte_ = uint32(b.data[readerIndex])
readerIndex++
result |= (byte_ & 0x7F) << 28
if b.remaining() >= 5 {
return b.readVarUint32Fast()
}
return b.readVarUint32Slow()
}

// Fast path reading (when the remaining bytes are sufficient)
func (b *ByteBuffer) readVarUint32Fast() uint32 {
data := b.data[b.readerIndex:]
var result uint32
var readLength int

b0 := data[0]
result = uint32(b0 & 0x7F)
if b0 < 0x80 {
readLength = 1
} else {
b1 := data[1]
result |= uint32(b1&0x7F) << 7
if b1 < 0x80 {
readLength = 2
} else {
b2 := data[2]
result |= uint32(b2&0x7F) << 14
if b2 < 0x80 {
readLength = 3
} else {
b3 := data[3]
result |= uint32(b3&0x7F) << 21
if b3 < 0x80 {
readLength = 4
} else {
b4 := data[4]
result |= uint32(b4&0x7F) << 28
readLength = 5
}
}
}
}
b.readerIndex = readerIndex
b.readerIndex += readLength
return result
}

type BufferObject interface {
TotalBytes() int
WriteTo(buf *ByteBuffer)
ToBuffer() *ByteBuffer
// Slow path reading (processing byte by byte)
func (b *ByteBuffer) readVarUint32Slow() uint32 {
var result uint32
var shift uint
for {
byteVal := b.ReadUint8()
result |= (uint32(byteVal) & 0x7F) << shift
if byteVal < 0x80 {
break
}
shift += 7
if shift >= 28 { // 32位最多需要5字节(28位)
panic("varuint32 overflow")
}
}
return result
}

func (b *ByteBuffer) PutUint8(writerIndex int, value uint8) {
b.data[writerIndex] = byte(value)
}
5 changes: 0 additions & 5 deletions go/fury/fury.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (f *Fury) Serialize(buf *ByteBuffer, v interface{}, callback BufferCallback
}
if f.language != XLANG {
return fmt.Errorf("%d language is not supported", f.language)
buffer.WriteInt8(int8(GO))
} else {
if err := buffer.WriteByte(GO); err != nil {
return err
Expand All @@ -179,10 +178,6 @@ func (f *Fury) Write(buffer *ByteBuffer, v interface{}) (err error) {
buffer.WriteInt8(NullFlag)
case bool:
f.WriteBool(buffer, v)
case int32:
f.WriteInt32(buffer, v)
case int64:
f.WriteInt64(buffer, v)
case float64:
f.WriteFloat64(buffer, v)
case float32:
Expand Down
9 changes: 6 additions & 3 deletions go/fury/fury_xlang_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

//go:build skiptest
// +build skiptest

package fury_test

import (
Expand Down Expand Up @@ -228,6 +225,8 @@ type ComplexObject2 struct {
}

func TestSerializeSimpleStruct(t *testing.T) {
// Temporarily disabled
t.Skip()
fury_ := fury.NewFury(true)
require.Nil(t, fury_.RegisterTagType("test.ComplexObject2", ComplexObject2{}))
obj2 := &ComplexObject2{}
Expand All @@ -237,6 +236,8 @@ func TestSerializeSimpleStruct(t *testing.T) {
}

func TestSerializeComplexStruct(t *testing.T) {
// Temporarily disabled
t.Skip()
fury_ := fury.NewFury(true)
require.Nil(t, fury_.RegisterTagType("test.ComplexObject1", ComplexObject1{}))
require.Nil(t, fury_.RegisterTagType("test.ComplexObject2", ComplexObject2{}))
Expand Down Expand Up @@ -286,6 +287,8 @@ func structRoundBack(t *testing.T, fury_ *fury.Fury, obj interface{}, testName s
}

func TestOutOfBandBuffer(t *testing.T) {
// Temporarily disabled
t.Skip()
fury_ := fury.NewFury(true)
var data [][]byte
for i := 0; i < 10; i++ {
Expand Down
Loading
Loading