Skip to content

Decoding and Encoding Enhancements & Fixes #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 91 additions & 46 deletions decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/nu7hatch/gouuid"
)

var UnsupportedType = errors.New("KDB-Decoder: type is unsupported")

var typeSize = map[int8]int{
1: 1, 4: 1, 10: 1,
2: 16,
Expand Down Expand Up @@ -137,30 +139,59 @@ func Uncompress(b []byte) (dst []byte) {

// Decodes data from src in q ipc format.
func Decode(src *bufio.Reader) (data *K, msgtype int, e error) {
var header ipcHeader
e = binary.Read(src, binary.LittleEndian, &header)
header, e := DecodeHeader(src)
if e != nil {
return nil, -1, errors.New("Failed to read message header:" + e.Error())
}
// try to buffer entire message in one go
src.Peek(int(header.MsgSize - 8))
return DecodeData(src, header)
}

func DecodeHeader(src *bufio.Reader) (*ipcHeader, error) {
var header ipcHeader
e := binary.Read(src, binary.LittleEndian, &header)
if e != nil {
return nil, e
}
if !header.ok() {
return nil, -1, errors.New("header is invalid")
return nil, errors.New("header is invalid")
}
return &header, nil
}

func DecodeRaw(src *bufio.Reader) ([]byte, *ipcHeader, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it for decoding from disk/files? Do you have example for this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, I used it to bypass all parsing (apart from the header) and have the code pass-through all data while incurring minimal overhead.
This comment was in direct reference to this commit:

I'm not sure how people feel about commit 5975019; it adds in the functionality to encode and decode directly to/from bytes without having a connection open. I found this functionality useful, but it might be too use-case specific.

Additionally, you can turn a byte slice into a bufio.Reader using: bufio.NewReader(bytes.NewReader(byteSlice))

headerRaw := make([]byte, 8)
binary.Read(src, binary.LittleEndian, &headerRaw)
header, e := DecodeHeader(bufio.NewReader(bytes.NewReader(headerRaw)))
if e != nil {
return nil, header, errors.New("Failed to read message header:" + e.Error())
}
// try to buffer entire message in one go
src.Peek(int(header.MsgSize - 8))
dataRaw := make([]byte, header.MsgSize-8)
_, e = io.ReadFull(src, dataRaw)
if e != nil {
return nil, header, errors.New("Decode:read error - " + e.Error())
}
return append(headerRaw, dataRaw...), header, nil
}

func DecodeData(src *bufio.Reader, header *ipcHeader) (data *K, msgtype int, e error) {
var order = header.getByteOrder()

if header.Compressed == 0x01 {
compressed := make([]byte, header.MsgSize-8)
_, e = io.ReadFull(src, compressed)
buf := make([]byte, header.MsgSize-8)
_, e = io.ReadFull(src, buf)
if e != nil {
return nil, int(header.RequestType), errors.New("Decode:readcompressed error - " + e.Error())
return nil, int(header.RequestType), errors.New("Decode:read error - " + e.Error())
}
var uncompressed = Uncompress(compressed)
var buf = bufio.NewReader(bytes.NewReader(uncompressed[8:]))
data, e = readData(buf, order)
return data, int(header.RequestType), e
var uncompressed = Uncompress(buf)
data, e = readData(bufio.NewReader(bytes.NewReader(uncompressed[8:])), order)
} else {
data, e = readData(src, order)
}
data, e = readData(src, order)

return data, int(header.RequestType), e
}

Expand Down Expand Up @@ -190,7 +221,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
binary.Read(r, order, &sh)
return &K{msgtype, NONE, sh}, nil

case -KI, -KD, -KU, -KV:
case -KI, -KU, -KV, -KT:
var i int32
binary.Read(r, order, &i)
return &K{msgtype, NONE, i}, nil
Expand All @@ -202,7 +233,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
var e float32
binary.Read(r, order, &e)
return &K{msgtype, NONE, e}, nil
case -KF, -KZ:
case -KF:
var f float64
binary.Read(r, order, &f)
return &K{msgtype, NONE, f}, nil
Expand All @@ -222,6 +253,15 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
var ts time.Duration
binary.Read(r, order, &ts)
return &K{msgtype, NONE, qEpoch.Add(ts)}, nil
case -KZ:
var ts float64
binary.Read(r, order, &ts)
d := time.Duration(86400000*ts) * time.Millisecond
return &K{msgtype, NONE, qEpoch.Add(d)}, nil
case -KD:
var d int32
binary.Read(r, order, &d)
return &K{msgtype, NONE, qEpoch.Add(time.Duration(d) * 24 * time.Hour)}, nil
case -KM:
var m Month
binary.Read(r, order, &m)
Expand Down Expand Up @@ -262,6 +302,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
if msgtype == KC {
return &K{msgtype, vecattr, string(arr.([]byte))}, nil
}

if msgtype == KP {
arr := arr.([]time.Duration)
var timearr = make([]time.Time, veclen)
Expand All @@ -270,15 +311,6 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KD {
arr := arr.([]int32)
var timearr = make([]time.Time, veclen)
for i := 0; i < int(veclen); i++ {
d := time.Duration(arr[i]) * 24 * time.Hour
timearr[i] = qEpoch.Add(d)
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KZ {
arr := arr.([]float64)
var timearr = make([]time.Time, veclen)
Expand All @@ -288,32 +320,42 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KU {
arr := arr.([]int32)
var timearr = make([]Minute, veclen)
for i := 0; i < int(veclen); i++ {
d := time.Duration(arr[i]) * time.Minute
timearr[i] = Minute(time.Time{}.Add(d))
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KV {
arr := arr.([]int32)
var timearr = make([]Second, veclen)
for i := 0; i < int(veclen); i++ {
d := time.Duration(arr[i]) * time.Second
timearr[i] = Second(time.Time{}.Add(d))
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KT {
if msgtype == KD {
arr := arr.([]int32)
var timearr = make([]Time, veclen)
var timearr = make([]time.Time, veclen)
for i := 0; i < int(veclen); i++ {
timearr[i] = Time(qEpoch.Add(time.Duration(arr[i]) * time.Millisecond))
d := time.Duration(arr[i]) * 24 * time.Hour
timearr[i] = qEpoch.Add(d)
}
return &K{msgtype, vecattr, timearr}, nil
}
} /*
// These were removed due to incorrectly handling negative values
if msgtype == KU {
arr := arr.([]int32)
var timearr = make([]Minute, veclen)
for i := 0; i < int(veclen); i++ {
d := time.Duration(arr[i]) * time.Minute
timearr[i] = Minute(time.Time{}.Add(d))
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KV {
arr := arr.([]int32)
var timearr = make([]Second, veclen)
for i := 0; i < int(veclen); i++ {
d := time.Duration(arr[i]) * time.Second
timearr[i] = Second(time.Time{}.Add(d))
}
return &K{msgtype, vecattr, timearr}, nil
}
if msgtype == KT {
arr := arr.([]int32)
var timearr = make([]Time, veclen)
for i := 0; i < int(veclen); i++ {
timearr[i] = Time(qEpoch.Add(time.Duration(arr[i]) * time.Millisecond))
}
return &K{msgtype, vecattr, timearr}, nil
}*/
return &K{msgtype, vecattr, arr}, nil
case K0:
var vecattr Attr
Expand Down Expand Up @@ -420,11 +462,14 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) {
}
}
return &K{msgtype, NONE, res}, nil
//These were not being decoded correctly
case KEACH, KOVER, KSCAN, KPRIOR, KEACHRIGHT, KEACHLEFT:
return readData(r, order)
_, _ = readData(r, order)
return nil, UnsupportedType
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not really an error and decoder should keep going. it does need wrapping to K struct though
&K{msgtype,NONE,readData(r,order)} (except readData should be handled separately

case KDYNLOAD:
// 112 - dynamic load
return nil, errors.New("type is unsupported")
_, _ = readData(r, order)
return nil, UnsupportedType
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not possible to send 112 over ipc and there is no need to do readData - it is likely mistake somewhere else. Should be more of 'protocol error'

case KERR:
line, err := r.ReadSlice(0)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func TestCharArray(t *testing.T) {
var TimestampAsBytes = []byte{0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0xf4, 0x28, 0xbf, 0xce, 0x27, 0x35, 0xec, 0xe9, 0x07}
var TimestampAsTime = time.Date(2018, 1, 26, 1, 49, 0, 884361000, time.UTC)
var TimestampAsInt64 = int64(570246540884361000)
var DatetimeAsTime = time.Date(2013, 6, 10, 22, 03, 49, 713000000, time.UTC)
var DateAsTime = time.Date(2013, 6, 10, 0, 0, 0, 00000000, time.UTC)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add couple of examples with negative data/datetime/timestamp where it breaks?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with negatives I was referring to was regarding datatypes U, V, and T.
Each one is treated as a duration of time by the q interpreter but the decode function was turning them into a time object.
So, for example, sending -01:00 will result in it being decoded as 23:00


func TestTimestampEpoch(t *testing.T) {
d := TimestampAsTime
Expand Down
70 changes: 52 additions & 18 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,22 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) {
binary.Write(dbuf, order, []byte(tosend[i]))
binary.Write(dbuf, order, byte(0))
}
case -KB:
tosend := data.Data.(bool)
binary.Write(dbuf, order, int8(data.Type))
var val byte
if tosend {
val = 0x01
} else {
val = 0x00
}
binary.Write(dbuf, order, val)
case -KI, -KJ, -KE, -KF, -UU:
case -KB, -KG, -KI, -KJ, -KE, -KF, -KU, -KV, -KT, -KM, -KN, -KH, -UU, -KC,
KFUNCUP, KFUNCBP, KFUNCTR:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Data)
case -KP:
tosend := data.Data.(time.Time)
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, tosend.Sub(qEpoch))
case -KZ:
tosend := data.Data.(time.Time)
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, -1*(float64(qEpoch.Sub(tosend)/time.Millisecond)/86400000))
case -KD:
tosend := data.Data.(time.Time)
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, -1*(int32(qEpoch.Sub(tosend)/time.Hour)/24))
case KP:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Attr) // attributes
Expand All @@ -70,6 +69,22 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) {
for _, ts := range tosend {
binary.Write(dbuf, order, ts.Sub(qEpoch))
}
case KZ:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Attr) // attributes
binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len()))
tosend := data.Data.([]time.Time)
for _, ts := range tosend {
binary.Write(dbuf, order, -1*(float64(qEpoch.Sub(ts)/time.Millisecond)/86400000))
}
case KD:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Attr) // attributes
binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len()))
tosend := data.Data.([]time.Time)
for _, ts := range tosend {
binary.Write(dbuf, order, -1*(int32(qEpoch.Sub(ts)/time.Hour)/24))
}
case KB:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Attr) // attributes
Expand All @@ -79,7 +94,7 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) {
for _, b := range tosend {
binary.Write(dbuf, order, boolmap[b])
}
case KG, KI, KJ, KE, KF, KZ, KT, KD, KV, KU, KM, KN, UU:
case KG, KI, KJ, KE, KF, KU, KV, KT, KM, KN, KH, UU:
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, data.Attr) // attributes
binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len()))
Expand Down Expand Up @@ -108,7 +123,7 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) {
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, []byte(tosend.Error()))
binary.Write(dbuf, order, byte(0))
case KFUNC:
case KFUNC, KOVER, KSCAN, KEACH, KPRIOR, KEACHRIGHT, KEACHLEFT:
tosend := data.Data.(Function)
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, []byte(tosend.Namespace))
Expand All @@ -117,6 +132,16 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) {
if err != nil {
return err
}
case KPROJ, KCOMP:
tosend := data.Data.([]interface{})
binary.Write(dbuf, order, int8(data.Type))
binary.Write(dbuf, order, int32(len(tosend)))
for i := 0; i < len(tosend); i++ {
err = writeData(dbuf, order, &K{tosend[i].(*K).Type, NONE, tosend[i].(*K).Data})
if err != nil {
return err
}
}
default:
return errors.New("unknown type " + strconv.Itoa(int(data.Type)))
}
Expand Down Expand Up @@ -205,19 +230,28 @@ func Compress(b []byte) (dst []byte) {
return dst[:d:d]
}

// Encode data to ipc format as msgtype(sync/async/response) to specified writer
func Encode(w io.Writer, msgtype int, data *K) (err error) {
func EncodeRaw(msgtype int, data *K) ([]byte, error) {
var order = binary.LittleEndian
dbuf := new(bytes.Buffer)
err = writeData(dbuf, order, data)
err := writeData(dbuf, order, data)
if err != nil {
return err
return nil, err
}
msglen := uint32(8 + dbuf.Len())
var header = ipcHeader{1, byte(msgtype), 0, 0, msglen}
buf := new(bytes.Buffer)
err = binary.Write(buf, order, header)
err = binary.Write(buf, order, dbuf.Bytes())
_, err = w.Write(Compress(buf.Bytes()))
return buf.Bytes(), nil
}

// Encode data to ipc format as msgtype(sync/async/response) to specified writer
func Encode(w io.Writer, msgtype int, data *K) (err error) {
buf, err := EncodeRaw(msgtype, data)
if err != nil {
return nil
}
_, err = w.Write(Compress(buf))
//_, err = w.Write(buf.Bytes())
return err
}
4 changes: 2 additions & 2 deletions encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var encodingTests = []struct {
{"`a`b!enlist each 2 3", NewDict(SymbolV([]string{"a", "b"}),
&K{K0, NONE, []*K{{KI, NONE, []int32{2}}, {KI, NONE, []int32{3}}}}),
DictWithVectorsBytes},
{"1#2013.06.10T22:03:49.713", &K{KZ, NONE, []float64{4909.9193253819449}}, DateTimeVecBytes},
{"1#2013.06.10", &K{KD, NONE, []int32{4909}}, DateVecBytes},
{"1#2013.06.10T22:03:49.713", &K{KZ, NONE, []time.Time{DatetimeAsTime}}, DateTimeVecBytes},
{"1#2013.06.10", &K{KD, NONE, []time.Time{DateAsTime}}, DateVecBytes},
{"1#21:53:37.963", &K{KT, NONE, []int32{78817963}}, TimeVecBytes},
{"21:22:01 + 1 2", &K{KV, NONE, []int32{76922, 76923}}, SecondVecBytes},
{"21:22*til 2", &K{KU, NONE, []int32{0, 1282}}, MinuteVecBytes},
Expand Down
11 changes: 7 additions & 4 deletions kdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"time"
)
Expand Down Expand Up @@ -37,8 +36,12 @@ func (c *KDBConn) ok() bool {
return c.con != nil
}

// process clients requests
func HandleClientConnection(conn net.Conn) {
func (c *KDBConn) Conn() net.Conn {
return c.con
}

// Previous listen handler, kept for posterity
/*func HandleClientConnection(conn net.Conn) {
c := conn.(*net.TCPConn)
c.SetKeepAlive(true)
c.SetNoDelay(true)
Expand All @@ -64,7 +67,7 @@ func HandleClientConnection(conn net.Conn) {
// don't respond
i++
}
}
}*/

// Make synchronous call to kdb+ similar to h(func;arg1;arg2;...)
func (c *KDBConn) Call(cmd string, args ...*K) (data *K, err error) {
Expand Down
Loading