diff --git a/cmd/jocko/main.go b/cmd/jocko/main.go index 4eb3694d..333ff8cf 100644 --- a/cmd/jocko/main.go +++ b/cmd/jocko/main.go @@ -74,7 +74,7 @@ func run(cmd *cobra.Command, args []string) { Param: 1, }, Reporter: &jaegercfg.ReporterConfig{ - LogSpans: true, + //LogSpans: true, }, } diff --git a/commitlog/reader.go b/commitlog/reader.go index a884773e..678c6511 100644 --- a/commitlog/reader.go +++ b/commitlog/reader.go @@ -2,8 +2,11 @@ package commitlog import ( "io" + "os" "sync" + "github.com/travisjeffery/jocko/log" + "github.com/pkg/errors" ) @@ -14,6 +17,17 @@ type Reader struct { pos int64 } +func (r *Reader) FileAndOffset() (*os.File, int64, int) { + r.mu.Lock() + defer r.mu.Unlock() + + segments := r.cl.Segments() + segment := segments[r.idx] + file := segment.File() + //todo size + size := 0 + return file, r.pos, size +} func (r *Reader) Read(p []byte) (n int, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -56,13 +70,55 @@ func (l *CommitLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) { if s == nil { return nil, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset) } + //TODO: offset relative? + offset = offset - s.BaseOffset e, err := s.findEntry(offset) if err != nil { return nil, err } + { + log.Info.Printf("entry: %+v err: %v", e, err) + idx := s.Index + log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position]) + } return &Reader{ cl: l, idx: idx, pos: e.Position, }, nil } + +func (l *CommitLog) SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error) { + var s *Segment + var idx int + if offset == 0 { + // TODO: seems hackish, should at least check if segments are set. + s, idx = l.Segments()[0], 0 + } else { + s, idx = findSegment(l.Segments(), offset) + } + if s == nil { + return nil, 0, 0, errors.Wrapf(ErrSegmentNotFound, "segments: %d, offset: %d", len(l.Segments()), offset) + } + //TODO: offset relative? + offset = offset - s.BaseOffset + e, err := s.findEntry(offset) + if err != nil { + return nil, 0, 0, err + } + { + log.Info.Printf("entry: %+v err: %v", e, err) + idx := s.Index + log.Info.Printf("idx: %p idx.position %d mmap: %v \n", idx, idx.position, idx.mmap[0:idx.position]) + } + file := s.File() + _ = idx + //todo : calculate fileOffset and sendSize + fileOffset := int64(0) + sendSize := s.Position + // log.Info.Println("logfile:", file.Name(), + // "fileOffset", fileOffset, + // "sendSize", sendSize) + + return file, fileOffset, int(sendSize), nil +} diff --git a/commitlog/segment.go b/commitlog/segment.go index a9ac360d..4599ee65 100644 --- a/commitlog/segment.go +++ b/commitlog/segment.go @@ -143,6 +143,11 @@ func (s *Segment) IsFull() bool { defer s.Unlock() return s.Position >= s.maxBytes } +func (s *Segment) File() *os.File { + s.Lock() + defer s.Unlock() + return s.log +} // Write writes a byte slice to the log at the current position. // It increments the offset as well as sets the position to the new tail. 
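Note on the `//todo : calculate fileOffset and sendSize` left in SendfileParams above: a minimal sketch, assuming Entry.Position is the record's byte offset within the segment's log file and Segment.Position is the segment's current tail; the helper name and the maxBytes clamp are assumptions, not part of this change.

// sendfileSpan is a hypothetical helper for the TODO in SendfileParams: given the
// entry returned by findEntry and the segment it lives in, compute the byte range
// that Sendfile should transmit. It assumes e.Position is the record's offset
// inside the segment file and s.Position is the segment's current write position.
func sendfileSpan(e *Entry, s *Segment, maxBytes int32) (fileOffset, sendSize int64) {
	fileOffset = e.Position
	sendSize = s.Position - e.Position
	// Respect the fetch request's size cap, mirroring how maxBytes is used elsewhere.
	if maxBytes > 0 && sendSize > int64(maxBytes) {
		sendSize = int64(maxBytes)
	}
	return fileOffset, sendSize
}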
@@ -214,10 +219,10 @@ func (s *Segment) findEntry(offset int64) (e *Entry, err error) { s.Lock() defer s.Unlock() e = &Entry{} - n := int(s.Index.bytes / entryWidth) + n := int(s.Index.position) / entryWidth idx := sort.Search(n, func(i int) bool { _ = s.Index.ReadEntryAtFileOffset(e, int64(i*entryWidth)) - return e.Offset >= offset || e.Offset == 0 + return e.Offset >= offset }) if idx == n { return nil, errors.New("entry not found") diff --git a/commitlog/sendfile.go b/commitlog/sendfile.go new file mode 100644 index 00000000..3ace0807 --- /dev/null +++ b/commitlog/sendfile.go @@ -0,0 +1,65 @@ +package commitlog + +import ( + "log" + "net" + "os" + "syscall" +) + +const maxSendfileSize int = 4 << 20 + +func Sendfile(conn *net.TCPConn, file *os.File, offsetInt int64, size int, chunkSize int) (int, error) { + offset := &offsetInt + written := 0 + var remain int = size + n := chunkSize + if chunkSize > maxSendfileSize { + chunkSize = maxSendfileSize + } + src := int(file.Fd()) + rawConn, err := conn.SyscallConn() + rawConn.Write(func(dst uintptr) bool { + defer func() { log.Println("returned") }() + for remain > 0 { + if n > remain { + n = remain + } + var err1 error + log.Println("params:", n, "offset:", *offset) + //todo: for bsd and darwin, pass different offset + n, err1 = syscall.Sendfile(int(dst), src, offset, n) + log.Println("after:", n, "offset:", *offset) + if err1 != nil { + log.Println("sent error:", err1.Error()) + } + if n > 0 { + written += n + remain -= n + } else if n == 0 && err1 == nil { + break + } + if err1 == syscall.EAGAIN || err1 == syscall.EWOULDBLOCK { + + if n == -1 { + n = chunkSize + } + log.Println("got eagain") + return false + // waitpread, gopark + } + if err1 != nil { + // This includes syscall.ENOSYS (no kernel + // support) and syscall.EINVAL (fd types which + // don't implement sendfile) + err = err1 + break + } + } + return true + }) + log.Println("written", written) + + return written, err + +} diff --git a/go.mod b/go.mod index 23c6b504..03e8a2d2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/travisjeffery/jocko go 1.12 require ( - github.com/Shopify/sarama v1.13.0 + github.com/Shopify/sarama v1.27.0 github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 @@ -13,12 +13,12 @@ require ( github.com/cespare/xxhash v1.0.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd github.com/davecgh/go-spew v1.1.1 - github.com/eapache/go-resiliency v1.0.0 - github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934 - github.com/eapache/queue v1.0.2 + github.com/eapache/go-resiliency v1.2.0 + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 + github.com/eapache/queue v1.1.0 github.com/go-kit/kit v0.6.0 github.com/golang/protobuf v1.2.0 - github.com/golang/snappy v0.0.0-20170215233205-553a64147049 + github.com/golang/snappy v0.0.1 github.com/hashicorp/consul v1.0.3 github.com/hashicorp/errwrap v1.0.0 github.com/hashicorp/go-immutable-radix v1.0.0 @@ -36,7 +36,7 @@ require ( github.com/miekg/dns v1.0.14 github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 github.com/opentracing/opentracing-go v1.0.2 - github.com/pierrec/lz4 v1.0.1 + github.com/pierrec/lz4 v2.5.2+incompatible github.com/pierrec/xxHash v0.1.1 github.com/pkg/errors v0.8.1 github.com/pmezard/go-difflib v1.0.0 @@ -44,19 +44,19 @@ require ( github.com/prometheus/client_model 
v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a - github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5 + github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 github.com/satori/go.uuid v1.2.0 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 github.com/spf13/cobra v0.0.1 github.com/spf13/pflag v1.0.0 - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.6.0 github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b github.com/travisjeffery/go-dynaport v0.0.0-20171203090423-24009f4f2f49 github.com/tysontate/gommap v0.0.0-20131202084435-e87a6e482c2c github.com/uber/jaeger-client-go v2.11.2+incompatible github.com/uber/jaeger-lib v1.3.1 github.com/ugorji/go v0.0.0-20180112141927-9831f2c3ac10 - golang.org/x/net v0.0.0-20181201002055-351d144fa1fc - golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 + golang.org/x/net v0.0.0-20200528225125-3c3fba18258b + golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd upspin.io v0.0.0-20180517055408-63f1073c7a3a ) diff --git a/go.sum b/go.sum index 6758dd73..d1971fac 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,9 @@ github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Shopify/sarama v1.13.0 h1:R+4WFsmMzUxN2uiGzWXoY9apBAQnARC+B+wYvy/kC3k= github.com/Shopify/sarama v1.13.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/sarama v1.27.0 h1:tqo2zmyzPf1+gwTTwhI6W+EXDw4PVSczynpHKFtVAmo= +github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs54taSUo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -27,17 +30,27 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-resiliency v1.0.0 h1:XPZo5qMI0LGzIqT9wRq6dPv2vEuo9MWCar1wHY8Kuf4= github.com/eapache/go-resiliency v1.0.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934 h1:oGLoaVIefp3tiOgi7+KInR/nNPvEpPM6GFo+El7fd14= github.com/eapache/go-xerial-snappy 
v0.0.0-20160609142408-bb955e01b934/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.0.2 h1:jRJXCx6uciOfN69MfZCC9EZlGRqqHhwlyb6GBeNow+c= github.com/eapache/queue v1.0.2/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/go-kit/kit v0.6.0 h1:wTifptAGIyIuir4bRyN4h7+kAa2a4eepLYVmRe5qqQ8= github.com/go-kit/kit v0.6.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/golang/protobuf v0.0.0-20170920220647-130e6b02ab05 h1:Kesru7U6Mhpf/x7rthxAKnr586VFmoE2NdEvkOKvfjg= @@ -46,8 +59,12 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/hashicorp/consul v1.0.3 h1:gjO3raNGyG5vTv8OqyXVmT2ELBHJCgh2CBfoR+oV1/Q= github.com/hashicorp/consul v1.0.3/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w+PflHoszQNLTUh4kaByUcEWM/9uin4= @@ -80,6 +97,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.0.0-20160813221303-0a025b7e63ad h1:eMxs9EL0PvIGS9TTtxg4R+JxuPGav82J8rA+GFnY7po= github.com/hashicorp/golang-lru v0.0.0-20160813221303-0a025b7e63ad/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -106,6 +125,15 @@ github.com/hashicorp/serf v0.0.0-20171222050535-911b3cc29e11/go.mod h1:h/Ru6tmZa 
github.com/hashicorp/serf v0.8.5 h1:ZynDUIQiA8usmRgPdGPHFdPnb1wgGI9tK3mO9hcAJjc= github.com/hashicorp/serf v0.8.5/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I= +github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= @@ -120,12 +148,16 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pierrec/lz4 v1.0.1 h1:w6GMGWSsCI04fTM8wQRdnW74MuJISakuUU0onU0TYB4= github.com/pierrec/lz4 v1.0.1/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= +github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/xxHash v0.1.1 h1:KP4NrV9023xp3M4FkTYfcXqWigsOCImL1ANJ7sh5vg4= github.com/pierrec/xxHash v0.1.1/go.mod h1:w2waW5Zoa/Wc4Yqe0wgrIYAGKqRMf7czn2HNKXmuL+I= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= @@ -153,6 +185,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nL github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5 h1:gwcdIpH6NU2iF8CmcqD+CP6+1CkRBOhHaPR+iu6raBY= github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 
h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -168,6 +202,8 @@ github.com/stretchr/testify v1.1.4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= +github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b h1:EjOGYW5FYZMUVtXjWl01EY359uYLfzTcOkxuNTcKZYc= github.com/tj/go-gracefully v0.0.0-20141227061038-005c1d102f1b/go.mod h1:uqlTeGUUfRdQvlQGkv+DYe3lLST3DionEwMA9YAYibY= github.com/travisjeffery/go-dynaport v0.0.0-20171203090423-24009f4f2f49 h1:K+L347hjHiye7Xijn7oLHC+nIdXd0Z5cYrp2zt+ZrFk= @@ -181,24 +217,55 @@ github.com/uber/jaeger-lib v1.3.1 h1:QaTh7g9oG56uB4I2MiwJbh/svRjHhZogAiQozBzxL3g github.com/uber/jaeger-lib v1.3.1/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v0.0.0-20180112141927-9831f2c3ac10 h1:4zp+5ElNBLy5qmaDFrbVDolQSOtPmquw+W6EMNEpi+k= github.com/ugorji/go v0.0.0-20180112141927-9831f2c3ac10/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 h1:cg5LA/zNPRzIXIWSCxQW10Rvpy94aQh3LT/ShoCpkHw= +golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01 h1:po1f06KS05FvIQQA2pMuOWZAUXiy1KYdIf0ElUU2Hhc= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200528225125-3c3fba18258b h1:IYiJPiJfzktmDAO1HQiwjMjwjlYKHAL7KzeD544RJPs= +golang.org/x/net v0.0.0-20200528225125-3c3fba18258b/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20171006175012-ebfc5b463182/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg= +gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2 h1:VEmvx0P+GVTgkNu2EdTN988YCZPcD3lo9AoczZpucwc= +gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= upspin.io v0.0.0-20180517055408-63f1073c7a3a h1:SmRV4ptPhupMzU2o5cWgX9FmOwGLP135Z7jihhmvgtk= upspin.io v0.0.0-20180517055408-63f1073c7a3a/go.mod h1:4hdXTXkMPXxzbiw/sultoifpccn98hChAFvrU19V2ug= diff --git a/jocko/broker.go b/jocko/broker.go index c89c6a4c..08304361 100644 --- a/jocko/broker.go +++ 
b/jocko/broker.go @@ -87,6 +87,8 @@ type Broker struct { shutdownCh chan struct{} shutdown bool shutdownLock sync.Mutex + + groupMetadataCache GroupMetadataCache } // New is used to instantiate a new broker. @@ -99,7 +101,9 @@ func NewBroker(config *config.Config, tracer opentracing.Tracer) (*Broker, error replicaLookup: NewReplicaLookup(), reconcileCh: make(chan serf.Member, 32), tracer: tracer, - logStateInterval: time.Millisecond * 250, + logStateInterval: time.Second * 800, + + groupMetadataCache: NewGroupMetadataCache(), } if err := b.setupRaft(); err != nil { @@ -125,14 +129,13 @@ func NewBroker(config *config.Config, tracer opentracing.Tracer) (*Broker, error // Broker API. // Run starts a loop to handle requests send back responses. -func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context) { - for { - select { - case reqCtx := <-requests: +func (b *Broker) Run(ctx context.Context, reqCtx *Context) *Context { + { + { log.Debug.Printf("broker/%d: request: %v", b.config.ID, reqCtx) if reqCtx == nil { - goto DONE + return nil } queueSpan, ok := reqCtx.Value(requestQueueSpanKey).(opentracing.Span) @@ -146,6 +149,10 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch case *protocol.ProduceRequest: res = b.handleProduce(reqCtx, req) case *protocol.FetchRequest: + if b.config.UseSendfile { + b.handleFetchSendFile(reqCtx, req) + return nil + } res = b.handleFetch(reqCtx, req) case *protocol.OffsetsRequest: res = b.handleOffsets(reqCtx, req) @@ -191,22 +198,19 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch queueSpan = b.tracer.StartSpan("broker: queue response", opentracing.ChildOf(parentSpan.Context())) responseCtx := context.WithValue(reqCtx, responseQueueSpanKey, queueSpan) - responses <- &Context{ + return &Context{ parent: responseCtx, - conn: reqCtx.conn, + Conn: reqCtx.Conn, header: reqCtx.header, res: &protocol.Response{ CorrelationID: reqCtx.header.CorrelationID, Body: res, }, } - case <-ctx.Done(): - goto DONE + } } -DONE: - log.Debug.Printf("broker/%d: run done", b.config.ID) - return + } // Join is used to have the broker join the gossip ring. @@ -264,9 +268,8 @@ func (b *Broker) handleCreateTopic(ctx *Context, reqs *protocol.CreateTopicReque } continue } - err := b.withTimeout(reqs.Timeout, func() protocol.Error { - return b.createTopic(ctx, req) - }) + err := b.createTopic(ctx, req) + res.TopicErrorCodes[i] = &protocol.TopicErrorCode{ Topic: req.Topic, ErrorCode: err.Code(), @@ -402,6 +405,7 @@ func (b *Broker) handleOffsets(ctx *Context, req *protocol.OffsetsRequest) *prot res.Responses[i].PartitionResponses = append(res.Responses[i].PartitionResponses, pres) } } + return res } @@ -558,6 +562,7 @@ func (b *Broker) handleMetadata(ctx *Context, req *protocol.MetadataRequest) *pr return res } +//todo: when to create OffsetsTopicName? 
func (b *Broker) handleFindCoordinator(ctx *Context, req *protocol.FindCoordinatorRequest) *protocol.FindCoordinatorResponse { sp := span(ctx, b.tracer, "find coordinator") defer sp.Finish() @@ -601,64 +606,224 @@ ERROR: return res } +func MatchingMember( + group *structs.Group, + memberId string, + protocols []*protocol.GroupProtocol) bool { + member, ok := group.Members[memberId] + if !ok { + return false + } + if len(member.SupportedProtocols) != len(protocols) { + log.Debug.Printf("metadatacheck: %s len differ", memberId) + return false + } + for _, protocol := range protocols { + meta, ok := member.SupportedProtocols[protocol.ProtocolName] + if !ok { + log.Debug.Printf("metadatacheck: %s not found", memberId) + return false + } + if bytes.Compare(meta, protocol.ProtocolMetadata) != 0 { + log.Debug.Printf("metadatacheck: %s changed", memberId) + return false + } + } + log.Debug.Printf("metadatacheck: %s same", memberId) + return true +} +func HasCommonProtocolWithEachMember( + reqProtos []*protocol.GroupProtocol, + group *structs.Group) bool { + for _, p := range reqProtos { + name := p.ProtocolName + supported := true + for _, m := range group.Members { + if _, ok := m.SupportedProtocols[name]; !ok { + supported = false + break + } + } + if supported { + return true + } + } + return false -func (b *Broker) handleJoinGroup(ctx *Context, r *protocol.JoinGroupRequest) *protocol.JoinGroupResponse { - sp := span(ctx, b.tracer, "join group") - defer sp.Finish() - - res := &protocol.JoinGroupResponse{} - res.APIVersion = r.Version() - - // // TODO: distribute this. - state := b.fsm.State() - - _, group, err := state.GetGroup(r.GroupID) - if err != nil { - log.Error.Printf("broker/%d: get group error: %s", b.config.ID, err) - res.ErrorCode = protocol.ErrUnknown.Code() - return res +} +func findCommonClientProtocol(group *structs.Group) string { + allProtos := make([]string, 0) + for _, m := range group.Members { + for name, _ := range m.SupportedProtocols { + allProtos = append(allProtos, name) + } } - // TODO: only try to create the group if the group is not unknown AND - // the member id is UNKNOWN, if member is specified but group does not - // exist we should reject the request - if group == nil { - // group doesn't exist so let's create it - group = &structs.Group{ - Group: r.GroupID, - Coordinator: b.config.ID, - Members: make(map[string]structs.Member), + for _, proto := range allProtos { + supportedByAll := true + for _, m := range group.Members { + _, ok := m.SupportedProtocols[proto] + if !ok { + supportedByAll = false + break + } + } + if supportedByAll { + return proto } } + return "" +} +func addMember(group *structs.Group, MemberID string, protos []*protocol.GroupProtocol, SessionTimeout, RebalanceTimeout int32) *structs.Member { + member := &structs.Member{ + ID: MemberID, + SupportedProtocols: make(map[string][]byte), + JoinChan: make(chan protocol.JoinGroupResponse, 1), + } + log.Debug.Printf("sessiontimeout: %d RebalanceTimeout %d", SessionTimeout, RebalanceTimeout) + member.SessionTimeout = SessionTimeout + member.RebalanceTimeout = RebalanceTimeout + for _, gp := range protos { + member.SupportedProtocols[gp.ProtocolName] = gp.ProtocolMetadata + } + group.Members[MemberID] = member + return member +} + +//var memberid int64 = 233 + +func (b *Broker) handleJoinGroup(ctx *Context, r *protocol.JoinGroupRequest) *protocol.JoinGroupResponse { + sp := span(ctx, b.tracer, "join group") + defer sp.Finish() + newMember := false if r.MemberID == "" { + newMember = true // for group 
member IDs -- can replace with something else r.MemberID = ctx.Header().ClientID + "-" + uuid.NewV1().String() - group.Members[r.MemberID] = structs.Member{ID: r.MemberID} - } - if group.LeaderID == "" { - group.LeaderID = r.MemberID - } - _, err = b.raftApply(structs.RegisterGroupRequestType, structs.RegisterGroupRequest{ - Group: *group, - }) - if err != nil { - log.Error.Printf("broker/%d: register group error: %s", b.config.ID, err) - res.ErrorCode = protocol.ErrUnknown.Code() - return res + //r.MemberID = strconv.FormatInt(atomic.LoadInt64(&memberid), 10) + //atomic.AddInt64(&memberid, 1) } + log.Info.Println("joingroup r.memberid: ", r.MemberID, newMember) - res.GenerationID = 0 - res.LeaderID = group.LeaderID - res.MemberID = r.MemberID + // TODO: only try to create the group if the group is not unknown AND + // the member id is UNKNOWN, if member is specified but group does not + // exist we should reject the request + var waitingChan chan protocol.JoinGroupResponse + var immediateResponse *protocol.JoinGroupResponse + group := b.groupMetadataCache.GetOrCreateGroup(r.GroupID, b.config.ID) + group.InLock(func() { + if len(group.Members) != 0 && + !HasCommonProtocolWithEachMember(r.GroupProtocols, group) { + immediateResponse = &protocol.JoinGroupResponse{ + ErrorCode: protocol.ErrInconsistentGroupProtocol.Code(), + } + return + } + switch group.State { + case structs.GroupStateDead: + immediateResponse = &protocol.JoinGroupResponse{ + ErrorCode: protocol.ErrUnknownMemberId.Code(), + } + return + case structs.GroupStateCompletingRebalance: + if MatchingMember(group, r.MemberID, r.GroupProtocols) { + log.Debug.Println("matching member, continue to get group id") + return + } + //join group from new member or existing member with updated metadata => PreparingRebalance + member := addMember(group, r.MemberID, r.GroupProtocols, r.SessionTimeout, r.RebalanceTimeout) + waitingChan = member.JoinChan + group.State = structs.GroupStatePreparingRebalance + //todo: repeat groupstatepreparingbalance logic + log.Debug.Printf("CompletingRebalance transition to preparing rebalance,"+ + "members len:%d memberid %s leaderid %s, generationid %d", + len(group.Members), r.MemberID, group.LeaderID, group.GenerationID) + case structs.GroupStateStable: + //respond to join group from followers with matching metadata with current group metadata + if MatchingMember(group, r.MemberID, r.GroupProtocols) && group.LeaderID != r.MemberID { + return + } + //leader join-group received => PreparingRebalance + member := addMember(group, r.MemberID, r.GroupProtocols, r.SessionTimeout, r.RebalanceTimeout) + waitingChan = member.JoinChan + group.State = structs.GroupStatePreparingRebalance + group.LeaderID = "" + log.Debug.Printf("stable transition to preparing rebalance,"+ + "members len:%d memberid %s leaderid %s, generationid %d", + len(group.Members), r.MemberID, group.LeaderID, group.GenerationID) + case structs.GroupStatePreparingRebalance: + if existing, ok := group.Members[r.MemberID]; ok { + log.Info.Println("existing member join preparingbalance", r.MemberID, " ", existing.JoinChan == nil) + //recreate joinchan + existing.JoinChan = make(chan protocol.JoinGroupResponse, 1) + waitingChan = existing.JoinChan + } else { + member := addMember(group, r.MemberID, r.GroupProtocols, r.SessionTimeout, r.RebalanceTimeout) + waitingChan = member.JoinChan + log.Debug.Printf("stay preparing rebalance,"+ + "members len:%d memberid %s leaderid %s, generationid %d", + len(group.Members), r.MemberID, group.LeaderID, 
group.GenerationID) + } + if group.LeaderID == "" { + group.LeaderID = r.MemberID + } + if group.AllKnownMemberJoined() { + var commonProtocol string = findCommonClientProtocol(group) + if len(commonProtocol) == 0 { + panic("no common protocol,shouldnt be here") + } + group.State = structs.GroupStateCompletingRebalance + group.GenerationID++ + log.Debug.Printf("transition to completing rebalance,"+ + "members len:%d memberid %s leaderid %s, common protocol protocol %s, generation %d", + len(group.Members), r.MemberID, group.LeaderID, commonProtocol, group.GenerationID) + for memberId, member := range group.Members { + res := protocol.JoinGroupResponse{ + APIVersion: r.Version(), + GroupProtocol: commonProtocol, + } + if group.LeaderID == memberId { + for _, m := range group.Members { + metadata := m.SupportedProtocols[commonProtocol] + res.Members = append(res.Members, protocol.Member{ + MemberID: m.ID, + MemberMetadata: metadata, + }) + } + log.Debug.Printf("leader res members len: %d", len(res.Members)) + for i := range res.Members { + log.Debug.Printf("leader res meta %s: %d", + res.Members[i].MemberID, + len(res.Members[i].MemberMetadata)) + } - if res.LeaderID == res.MemberID { - // fill in members on response, we only do this for the leader to reduce overhead - for _, m := range group.Members { - res.Members = append(res.Members, protocol.Member{MemberID: m.ID, MemberMetadata: m.Metadata}) + } + res.GenerationID = group.GenerationID + res.LeaderID = group.LeaderID + res.MemberID = memberId + ch := member.JoinChan + member.JoinChan = nil + ch <- res + } + } else { + log.Info.Println("continue waiting ", r.MemberID) + } + return } + }) + if immediateResponse != nil { + immediateResponse.APIVersion = r.Version() + return immediateResponse } + timer := time.NewTimer(time.Duration(r.RebalanceTimeout) * 1e6) + defer timer.Stop() - return res + select { + case joinRes := <-waitingChan: + return &joinRes + case <-timer.C: + return &protocol.JoinGroupResponse{ErrorCode: protocol.ErrRequestTimedOut.Code()} + } } func (b *Broker) handleLeaveGroup(ctx *Context, r *protocol.LeaveGroupRequest) *protocol.LeaveGroupResponse { @@ -694,6 +859,14 @@ func (b *Broker) handleLeaveGroup(ctx *Context, r *protocol.LeaveGroupRequest) * res.ErrorCode = protocol.ErrUnknown.Code() return res } + group = b.groupMetadataCache.GetGroup(r.GroupID) + if group != nil { + group.InLock(func() { + group.Members = make(map[string]*structs.Member) + group.LeaderID = "" + group.State = structs.GroupStatePreparingRebalance + }) + } return res } @@ -701,86 +874,149 @@ func (b *Broker) handleLeaveGroup(ctx *Context, r *protocol.LeaveGroupRequest) * func (b *Broker) handleSyncGroup(ctx *Context, r *protocol.SyncGroupRequest) *protocol.SyncGroupResponse { sp := span(ctx, b.tracer, "sync group") defer sp.Finish() - - state := b.fsm.State() + log.Error.Println("syncgroup r.memberid: ", r.MemberID) res := &protocol.SyncGroupResponse{} res.APIVersion = r.Version() - _, group, err := state.GetGroup(r.GroupID) - if err != nil { - res.ErrorCode = protocol.ErrUnknown.Code() - return res - } + group := b.groupMetadataCache.GetOrCreateGroup(r.GroupID, b.config.ID) if group == nil { + log.Error.Println("ErrInvalidGroupId", r.MemberID) res.ErrorCode = protocol.ErrInvalidGroupId.Code() return res } - if _, ok := group.Members[r.MemberID]; !ok { - res.ErrorCode = protocol.ErrUnknownMemberId.Code() - return res - } - if r.GenerationID != group.GenerationID { - res.ErrorCode = protocol.ErrIllegalGeneration.Code() - return res - } - switch 
group.State { - case structs.GroupStateEmpty, structs.GroupStateDead: - res.ErrorCode = protocol.ErrUnknownMemberId.Code() - return res - case structs.GroupStatePreparingRebalance: - res.ErrorCode = protocol.ErrRebalanceInProgress.Code() - return res - case structs.GroupStateCompletingRebalance: - // TODO: wait to get member in group + var syncChan chan bool + var rebalanceTimeout int32 +RETRY_GET_ASSIGNMENT: + + group.InLock(func() { - if group.LeaderID == r.MemberID { + if _, ok := group.Members[r.MemberID]; !ok { + log.Error.Println("ErrUnknownMemberId", r.MemberID) + res.ErrorCode = protocol.ErrUnknownMemberId.Code() + return + } + if r.GenerationID != group.GenerationID { + log.Error.Println("ErrIllegalGeneration", r.GenerationID, group.GenerationID, r.MemberID) + res.ErrorCode = protocol.ErrIllegalGeneration.Code() + return + } + switch group.State { + case structs.GroupStateEmpty, structs.GroupStateDead: + res.ErrorCode = protocol.ErrUnknownMemberId.Code() + return + case structs.GroupStatePreparingRebalance: + log.Error.Println("ErrRebalanceInProgress", r.MemberID) + res.ErrorCode = protocol.ErrRebalanceInProgress.Code() + return + case structs.GroupStateCompletingRebalance: + // TODO: wait to get member in group // if is leader, attempt to persist state and transition to stable - var assignment []protocol.GroupAssignment - for _, ga := range r.GroupAssignments { - if _, ok := group.Members[ga.MemberID]; !ok { - // if member isn't set fill in with empty assignment - assignment = append(assignment, protocol.GroupAssignment{ - MemberID: ga.MemberID, - MemberAssignment: nil, - }) - } else { - assignment = append(assignment, ga) + if group.LeaderID == r.MemberID { + log.Error.Println("syncgroup leader r.memberid: ", r.MemberID) + // take the assignments from the leader and save them + for _, ga := range r.GroupAssignments { + //log.Debug.Printf("group assignment %s len%d", ga.MemberID, len(ga.MemberAssignment)) + if m, ok := group.Members[ga.MemberID]; ok { + m.Assignment = ga.MemberAssignment + group.Members[ga.MemberID] = m + if m.SyncChan != nil { + m.SyncChan <- true + } + if r.MemberID == ga.MemberID { + // return leader's own assignment + res.MemberAssignment = m.Assignment + } + } else { + panic("sync group: unknown member") + } } - + log.Debug.Printf( + "leader done assignment %s GenerationID %d lengroups %d", + r.MemberID, + group.GenerationID, + len(r.GroupAssignments)) + //todo:cg should stable // save group + group.State = structs.GroupStateStable + _, err := b.raftApply(structs.RegisterGroupRequestType, structs.RegisterGroupRequest{ + Group: *group, + }) + if err != nil { + res.ErrorCode = protocol.ErrUnknown.Code() + return + } + group.Members[r.MemberID].LastHeartbeat = time.Now().UnixNano() + sessionTimeout := group.Members[r.MemberID].SessionTimeout + go checkMemberHeartbeat(group, r.MemberID, sessionTimeout) + } else { + log.Debug.Printf("syncgroup park follower %s %d\n", r.MemberID, len(group.Members)) + if m, ok := group.Members[r.MemberID]; ok { + m.SyncChan = make(chan bool, 1) + syncChan = m.SyncChan + rebalanceTimeout = m.RebalanceTimeout + } else { + panic(fmt.Errorf("sync group: unknown member: %s", r.MemberID)) + } + } + case structs.GroupStateStable: + log.Debug.Printf("stable, follower %s %d\n", r.MemberID, len(group.Members)) + if m, ok := group.Members[r.MemberID]; ok { + m.LastHeartbeat = time.Now().UnixNano() + log.Debug.Printf("follower got assignment %d", len(m.Assignment)) + res.MemberAssignment = m.Assignment + syncChan = nil + m.SyncChan = nil + go 
checkMemberHeartbeat(group, r.MemberID, m.SessionTimeout) + } else { + panic(fmt.Errorf("sync group: unknown member: %s", r.MemberID)) } } - case structs.GroupStateStable: - // in stable, return current assignment + }) + if syncChan != nil { + ch := syncChan + syncChan = nil + select { + case <-time.After(time.Duration(int64(rebalanceTimeout) * 1e6)): + return &protocol.SyncGroupResponse{ + ErrorCode: protocol.ErrRequestTimedOut.Code()} + case <-ch: + goto RETRY_GET_ASSIGNMENT + } } - - if group.LeaderID == r.MemberID { - // take the assignments from the leader and save them - for _, ga := range r.GroupAssignments { - if m, ok := group.Members[ga.MemberID]; ok { - m.Assignment = ga.MemberAssignment - } else { - panic("sync group: unknown member") + log.Debug.Printf("SyncGroupResponse %s", r.MemberID) + return res +} +func checkMemberHeartbeat(group *structs.Group, memberId string, + sessionTimeout int32) { + for { + shouldQuit := true + time.Sleep(time.Duration(sessionTimeout) * 1e6) + group.InLock(func() { + if group.State != structs.GroupStateStable { + return } - } - _, err = b.raftApply(structs.RegisterGroupRequestType, structs.RegisterGroupRequest{ - Group: *group, + member, ok := group.Members[memberId] + if !ok { + return + } + log.Debug.Printf("checking member: %s %d %d", memberId, member.LastHeartbeat, sessionTimeout) + if member.LastHeartbeat != 0 && + (time.Now().UnixNano()-member.LastHeartbeat)/1e6 > + int64(sessionTimeout) { + log.Info.Printf("transition to preparingbalance, delete member %s", memberId) + delete(group.Members, memberId) + group.State = structs.GroupStatePreparingRebalance + group.LeaderID = "" + return + } + shouldQuit = false }) - if err != nil { - res.ErrorCode = protocol.ErrUnknown.Code() - return res - } - } else { - // TODO: need to wait until leader sets assignments - if m, ok := group.Members[r.MemberID]; ok { - res.MemberAssignment = m.Assignment - } else { - panic(fmt.Errorf("sync group: unknown member: %s", r.MemberID)) + log.Debug.Printf("member check heartbeat %s %v", memberId, shouldQuit) + if shouldQuit { + return } } - - return res } func (b *Broker) handleHeartbeat(ctx *Context, r *protocol.HeartbeatRequest) *protocol.HeartbeatResponse { @@ -789,20 +1025,38 @@ func (b *Broker) handleHeartbeat(ctx *Context, r *protocol.HeartbeatRequest) *pr res := &protocol.HeartbeatResponse{} res.APIVersion = r.Version() - - state := b.fsm.State() - _, group, err := state.GetGroup(r.GroupID) - if err != nil { - res.ErrorCode = protocol.ErrUnknown.Code() - return res - } + group := b.groupMetadataCache.GetGroup(r.GroupID) if group == nil { res.ErrorCode = protocol.ErrInvalidGroupId.Code() return res } - // TODO: need to handle case when rebalance is in process - - res.ErrorCode = protocol.ErrNone.Code() + log.Debug.Printf("heartbeat %s, generation %d", r.MemberID, r.GroupGenerationID) + group.InLock(func() { + if group.GenerationID != r.GroupGenerationID { + res.ErrorCode = protocol.ErrIllegalGeneration.Code() + log.Debug.Printf("heartbeat %s illegal generation", r.MemberID) + return + } + switch group.State { + case structs.GroupStateDead, + structs.GroupStateEmpty: + res.ErrorCode = protocol.ErrUnknownMemberId.Code() + return + case structs.GroupStatePreparingRebalance, + structs.GroupStateCompletingRebalance: + member, ok := group.Members[r.MemberID] + if ok { + member.LastHeartbeat = time.Now().UnixNano() + } + log.Debug.Printf("heartbeat %s return RebalanceInProgress", r.MemberID) + res.ErrorCode = protocol.ErrRebalanceInProgress.Code() + return + case 
structs.GroupStateStable: + member := group.Members[r.MemberID] + member.LastHeartbeat = time.Now().UnixNano() + return + } + }) return res } @@ -825,6 +1079,7 @@ func (b *Broker) handleFetch(ctx *Context, r *protocol.FetchRequest) *protocol.F err := b.withTimeout(r.MaxWaitTime, func() protocol.Error { replica, err := b.replicaLookup.Replica(topic.Topic, p.Partition) if err != nil { + log.Error.Printf("err:%s", err.Error()) return protocol.ErrReplicaNotAvailable } if replica.Partition.Leader != b.config.ID { @@ -834,13 +1089,18 @@ func (b *Broker) handleFetch(ctx *Context, r *protocol.FetchRequest) *protocol.F return protocol.ErrReplicaNotAvailable } rdr, rdrErr := replica.Log.NewReader(p.FetchOffset, p.MaxBytes) + hasData := true if rdrErr != nil { - log.Error.Printf("broker/%d: replica log read error: %s", b.config.ID, rdrErr) - return protocol.ErrUnknown.WithErr(rdrErr) + if strings.Contains(rdrErr.Error(), commitlog.ErrSegmentNotFound.Error()) { + hasData = false + } else { + log.Error.Printf("broker/%d: replica log read error: %s", b.config.ID, rdrErr) + return protocol.ErrUnknown.WithErr(rdrErr) + } } buf := new(bytes.Buffer) var n int32 - for n < r.MinBytes { + for n < r.MinBytes && hasData { // TODO: copy these bytes to outer bytes nn, err := io.Copy(buf, rdr) if err != nil && err != io.EOF { @@ -962,7 +1222,6 @@ func (b *Broker) handleOffsetFetch(ctx *Context, req *protocol.OffsetFetchReques res := new(protocol.OffsetFetchResponse) res.APIVersion = req.Version() - res.Responses = make([]protocol.OffsetFetchTopicResponse, len(req.Topics)) // state := b.fsm.State() @@ -973,7 +1232,26 @@ func (b *Broker) handleOffsetFetch(ctx *Context, req *protocol.OffsetFetchReques // // TODO: handle err // panic(err) // } + state := b.fsm.State() + for _, topic := range req.Topics { + offsetFetchTopicResponse := protocol.OffsetFetchTopicResponse{ + Topic: topic.Topic, + } + var offsetPartitions []protocol.OffsetFetchPartition + _, tt, err := state.GetTopic(topic.Topic) + if err != nil { + return res + } + for partitionNum, _ := range tt.Partitions { + offsetPartitions = append(offsetPartitions, + protocol.OffsetFetchPartition{ + Partition: partitionNum, + }) + } + offsetFetchTopicResponse.Partitions = offsetPartitions + res.Responses = append(res.Responses, offsetFetchTopicResponse) + } return res } @@ -1012,7 +1290,7 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error { if replica.Log == nil { log, err := commitlog.New(commitlog.Options{ - Path: filepath.Join(b.config.DataDir, "data", fmt.Sprintf("%d", replica.Partition.ID)), + Path: filepath.Join(b.config.DataDir, "data", topic.Topic, fmt.Sprintf("%d", replica.Partition.ID)), MaxSegmentBytes: 1024, MaxLogBytes: -1, CleanupPolicy: commitlog.CleanupPolicy(topic.Config.GetValue("cleanup.policy").(string)), @@ -1029,6 +1307,7 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error { // createTopic is used to create the topic across the cluster. 
func (b *Broker) createTopic(ctx *Context, topic *protocol.CreateTopicRequest) protocol.Error { + log.Debug.Printf("creating topic %+v", topic) state := b.fsm.State() _, t, _ := state.GetTopic(topic.Topic) if t != nil { @@ -1039,9 +1318,11 @@ func (b *Broker) createTopic(ctx *Context, topic *protocol.CreateTopicRequest) p return err } tt := structs.Topic{ + ID: topic.Topic, Topic: topic.Topic, Partitions: make(map[int32][]int32), } + for _, partition := range ps { tt.Partitions[partition.ID] = partition.AR } @@ -1098,7 +1379,10 @@ func (b *Broker) buildPartitions(topic string, partitionsCount int32, replicatio count := len(brokers) if int(replicationFactor) > count { - return nil, protocol.ErrInvalidReplicationFactor + //replicationFactor = int16(count) + //TODO tolerate this, + //or change defaultconfig in main.go when started? + //return nil, protocol.ErrInvalidReplicationFactor } // container/ring is dope af diff --git a/jocko/broker_test.go b/jocko/broker_test.go index aba19ea9..f620ebfb 100644 --- a/jocko/broker_test.go +++ b/jocko/broker_test.go @@ -301,6 +301,129 @@ func TestBroker_Run(t *testing.T) { } }, }, + { + name: "create two topics then produce one msg for each then fetch from one", + args: args{ + requestCh: make(chan *Context, 2), + responseCh: make(chan *Context, 2), + requests: []*Context{ + { + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{ + Timeout: 100 * time.Millisecond, + Requests: []*protocol.CreateTopicRequest{{ + Topic: "test-topic1", + NumPartitions: 1, + ReplicationFactor: 1, + }}}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.CreateTopicRequests{ + Timeout: 100 * time.Millisecond, + Requests: []*protocol.CreateTopicRequest{{ + Topic: "test-topic2", + NumPartitions: 1, + ReplicationFactor: 1, + }}}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 3}, + req: &protocol.ProduceRequest{ + Timeout: 100 * time.Millisecond, + TopicData: []*protocol.TopicData{{ + Topic: "test-topic1", + Data: []*protocol.Data{{ + RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message1.")}}})}}}, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 4}, + req: &protocol.ProduceRequest{ + Timeout: 100 * time.Millisecond, + TopicData: []*protocol.TopicData{{ + Topic: "test-topic2", + Data: []*protocol.Data{{ + RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message2.")}}})}}}, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 5}, + req: &protocol.FetchRequest{ + MaxWaitTime: 100 * time.Millisecond, + ReplicaID: 1, + MinBytes: 5, + Topics: []*protocol.FetchTopic{ + { + Topic: "test-topic1", + Partitions: []*protocol.FetchPartition{{Partition: 0, + FetchOffset: 0, + MaxBytes: 100, + }}, + }, + }}, + }, + }, + responses: []*Context{ + { + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "test-topic1", ErrorCode: protocol.ErrNone.Code()}}, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.CreateTopicsResponse{ + TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "test-topic2", ErrorCode: protocol.ErrNone.Code()}}, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 3}, + res: &protocol.Response{CorrelationID: 3, Body: &protocol.ProduceResponse{ + 
Responses: []*protocol.ProduceTopicResponse{ + { + Topic: "test-topic1", + PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}}, + }, + }, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 4}, + res: &protocol.Response{CorrelationID: 4, Body: &protocol.ProduceResponse{ + Responses: []*protocol.ProduceTopicResponse{ + { + Topic: "test-topic2", + PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}}, + }, + }, + }}, + }, + { + header: &protocol.RequestHeader{CorrelationID: 5}, + res: &protocol.Response{CorrelationID: 5, Body: &protocol.FetchResponse{ + Responses: protocol.FetchTopicResponses{{ + Topic: "test-topic1", + PartitionResponses: []*protocol.FetchPartitionResponse{{ + Partition: 0, + ErrorCode: protocol.ErrNone.Code(), + HighWatermark: 0, + RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message1.")}}}), + }}, + }}}, + }, + }, + }, + }, + handle: func(t *testing.T, _ *Broker, ctx *Context) { + switch res := ctx.res.(*protocol.Response).Body.(type) { + // handle timestamp explicitly since we don't know what + // it'll be set to + case *protocol.ProduceResponse: + handleProduceResponse(t, res) + } + }, + }, { name: "metadata", args: args{ @@ -489,17 +612,12 @@ func TestBroker_Run(t *testing.T) { } } - go b.Run(ctx, tt.args.requestCh, tt.args.responseCh) - for i := 0; i < len(tt.args.requests); i++ { request := tt.args.requests[i] reqSpan := b.tracer.StartSpan("request", opentracing.ChildOf(span.Context())) ctx := &Context{header: request.header, req: request.req, parent: opentracing.ContextWithSpan(runCtx, reqSpan)} - - tt.args.requestCh <- ctx - - ctx = <-tt.args.responseCh + ctx = b.Run(ctx, ctx) if tt.handle != nil { tt.handle(t, b, ctx) @@ -545,11 +663,8 @@ func setupTest(t *testing.T) ( r.Fatal("server not added") } }) - - reqCh = make(chan *Context, 2) - resCh = make(chan *Context, 2) - - go b.Run(ctx, reqCh, resCh) + //todo + //go b.Run(ctx, reqCh, resCh) teardown = func() { close(reqCh) diff --git a/jocko/commitlog.go b/jocko/commitlog.go index 2e01ec86..da1ba34f 100644 --- a/jocko/commitlog.go +++ b/jocko/commitlog.go @@ -1,6 +1,9 @@ package jocko -import "io" +import ( + "io" + "os" +) type CommitLog interface { Delete() error @@ -9,4 +12,5 @@ type CommitLog interface { NewestOffset() int64 OldestOffset() int64 Append([]byte) (int64, error) + SendfileParams(offset int64, maxBytes int32) (*os.File, int64, int, error) } diff --git a/jocko/config/config.go b/jocko/config/config.go index c4e5cebe..aa420190 100644 --- a/jocko/config/config.go +++ b/jocko/config/config.go @@ -31,6 +31,7 @@ type Config struct { LeaveDrainTime time.Duration ReconcileInterval time.Duration OffsetsTopicReplicationFactor int16 + UseSendfile bool } // DefaultConfig creates/returns a default configuration. 
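handleFetchSendFile is called from broker.go above but its body is not included in this diff; as a rough sketch of how the pieces introduced here (Config.UseSendfile, CommitLog.SendfileParams, commitlog.Sendfile) are meant to fit together, assuming the replica's log and the request's *net.TCPConn are already in hand — the helper name, the omission of Kafka response framing, and the error handling are all assumptions:

// fetchPartitionViaSendfile is a hypothetical outline: ask the partition's commit
// log for the file region backing the requested offset, then stream it straight to
// the client socket with sendfile, skipping the user-space copy that handleFetch's
// io.Copy path performs. Kafka fetch-response framing is deliberately omitted.
func fetchPartitionViaSendfile(conn *net.TCPConn, log CommitLog, fetchOffset int64, maxBytes int32) (int, error) {
	file, fileOffset, size, err := log.SendfileParams(fetchOffset, maxBytes)
	if err != nil {
		return 0, err
	}
	// size is passed as both the total and the chunk size; Sendfile caps chunks
	// internally at maxSendfileSize.
	return commitlog.Sendfile(conn, file, fileOffset, size, size)
}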
@@ -48,6 +49,7 @@ func DefaultConfig() *Config { LeaveDrainTime: 5 * time.Second, ReconcileInterval: 60 * time.Second, OffsetsTopicReplicationFactor: 3, + UseSendfile: true, } conf.SerfLANConfig.ReconnectTimeout = 3 * 24 * time.Hour diff --git a/jocko/consumer_group_cache.go b/jocko/consumer_group_cache.go new file mode 100644 index 00000000..4e22f407 --- /dev/null +++ b/jocko/consumer_group_cache.go @@ -0,0 +1,41 @@ +package jocko + +import ( + "sync" + + "github.com/travisjeffery/jocko/jocko/structs" +) + +type GroupMetadataCache interface { + GetOrCreateGroup(string, int32) *structs.Group + GetGroup(string) *structs.Group +} +type groupMetadataCache struct { + sync.Mutex + cache map[string]*structs.Group +} + +func NewGroupMetadataCache() GroupMetadataCache { + return &groupMetadataCache{ + cache: make(map[string]*structs.Group), + } +} +func (gmc *groupMetadataCache) GetOrCreateGroup(groupId string, coordinatorId int32) *structs.Group { + gmc.Lock() + defer gmc.Unlock() + if res, ok := gmc.cache[groupId]; ok { + return res + } + res := structs.NewGroup(groupId, coordinatorId) + gmc.cache[groupId] = res + return res +} + +func (gmc *groupMetadataCache) GetGroup(groupId string) *structs.Group { + gmc.Lock() + defer gmc.Unlock() + if res, ok := gmc.cache[groupId]; ok { + return res + } + return nil +} diff --git a/jocko/consumer_group_test.go b/jocko/consumer_group_test.go new file mode 100644 index 00000000..41aedf9e --- /dev/null +++ b/jocko/consumer_group_test.go @@ -0,0 +1,309 @@ +package jocko_test + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/require" + "github.com/travisjeffery/jocko/jocko" + "github.com/travisjeffery/jocko/jocko/config" +) + +//modified from https://godoc.org/github.com/Shopify/sarama#example-ConsumerGroup +func beginConsume(consumer *Consumer, brokers string, group string, topics string) { + config := sarama.NewConfig() + config.Version = sarama.V0_10_2_0 + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + //config.Consumer.Group.Heartbeat.Interval = 300 * time.Millisecond + //config.Consumer.Group.Session.Timeout = 1000 * time.Millisecond + + config.Consumer.Offsets.Initial = sarama.OffsetNewest + + /** + * Setup a new Sarama consumer group + */ + + ctx, cancel := context.WithCancel(context.Background()) + client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) + if err != nil { + log.Panicf("Error creating consumer group client: %v", err) + } + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := client.Consume(ctx, strings.Split(topics, ","), consumer); err != nil { + log.Panicf("Error from consumer: %v", err) + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + consumer.ready = make(chan bool) + } + }() + + go func() { + <-consumer.ready // Await till the consumer has been set up + log.Println("Sarama consumer up and running!...") + var shouldClose bool //if shouldclose, actively leave group + select { + case <-ctx.Done(): + log.Println("terminating: context cancelled") + case shouldClose = <-consumer.QuitChan: + log.Println("quit chan") + } + cancel() + wg.Wait() + if shouldClose { + if err = client.Close(); err != nil { 
+ log.Panicf("Error closing client: %v", err) + } + } + log.Println("closed consumer group", consumer.ID) + }() +} + +// Consumer represents a Sarama consumer group consumer +type Consumer struct { + ID int + ready chan bool + ClaimChan chan idAndClaim + currentSession sarama.ConsumerGroupSession + mu *sync.Mutex + MsgChan chan *sarama.ConsumerMessage + QuitChan chan bool +} +type idAndClaim struct { + ID int + claim sarama.ConsumerGroupClaim + + generation int32 +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + close(consumer.ready) + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + log.Printf("claim memberId %s id %d generation %d", session.MemberID(), consumer.ID, session.GenerationID()) + consumer.ClaimChan <- idAndClaim{claim: claim, ID: consumer.ID, generation: session.GenerationID()} + // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 + for message := range claim.Messages() { + log.Printf("consumer id %d, Message claimed: value = %s, timestamp = %v, topic = %s partition: %d, claim partition %d", consumer.ID, string(message.Value), message.Timestamp, message.Topic, message.Partition, claim.Partition()) + if claim.Partition() != message.Partition { + panic("claim partition does not match message partition " + strconv.Itoa(int(claim.Partition())) + "," + strconv.Itoa(int(message.Partition))) + } + consumer.MsgChan <- message + } + + return nil +} +func setupJocko(t *testing.T) (string, func([]string)) { + s1, dir1 := jocko.NewTestServer(t, func(cfg *config.Config) { + cfg.Bootstrap = true + }, nil) + ctx1, cancel1 := context.WithCancel((context.Background())) + err := s1.Start(ctx1) + require.NoError(t, err) + //jocko.WaitForLeader(t, s1) + return s1.Addr().String(), func([]string) { + cancel1() + os.RemoveAll(dir1) + s1.Shutdown() + } +} +func setupKafka(kafkaAddr string) (string, func([]string)) { + return kafkaAddr, func(topics []string) { + b := sarama.NewBroker(kafkaAddr) + config := sarama.NewConfig() + config.Version = sarama.V0_10_2_0 + err := b.Open(config) + if err != nil { + panic(err) + } + _, err = b.DeleteTopics(&sarama.DeleteTopicsRequest{ + Topics: topics, + }) + } +} +func createTopicSarama(t *testing.T, addr string, topicName string, partitionNum int32) { + b := sarama.NewBroker(addr) + config := sarama.NewConfig() + config.Version = sarama.V0_10_2_0 + err := b.Open(config) + if err != nil { + t.Fatal(err) + } + _, err = b.CreateTopics(&sarama.CreateTopicsRequest{ + TopicDetails: map[string]*sarama.TopicDetail{ + topicName: { + NumPartitions: partitionNum, + + ReplicationFactor: 1, + }, + }, + }) + if err != nil { + panic(err) + } +} +func TestConsumerGroupSarama(t *testing.T) { + JoinAndLeave(t, 10, 3) + JoinAndLeave(t, 10, 4) + JoinAndLeave(t, 10, 5) +} + +func JoinAndLeave(t *testing.T, partitionNum, consumerNum int32) { + var brokers string + var brokerCleanup func([]string) + if kafkaAddr := os.Getenv("KAFKA_ADDR"); len(kafkaAddr) > 0 { + 
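// A hedged sketch, separate from the patch: a hypothetical helper for this test
// file that publishes records with a sarama SyncProducer so the consumer group
// above has messages to claim. produceTestMessages and the message contents are
// assumptions; brokers and topic are the same values the test already passes around.
func produceTestMessages(t *testing.T, brokers, topic string, n int) {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	for i := 0; i < n; i++ {
		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(fmt.Sprintf("msg-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
		log.Printf("produced msg-%d to partition %d at offset %d", i, partition, offset)
	}
}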
log.Println("KAFKA_ADDR:", kafkaAddr) + brokers, brokerCleanup = setupKafka(kafkaAddr) + } else { + brokers, brokerCleanup = setupJocko(t) + } + time.Sleep(3 * time.Second) + consumers := make([]*Consumer, consumerNum) + + topics := "test-parti" + group := "testgroup" + defer brokerCleanup(strings.Split(topics, ",")) + + createTopicSarama(t, brokers, topics, partitionNum) + + for i := 0; i < len(consumers); i++ { + consumers[i] = &Consumer{ + ID: i, + ready: make(chan bool), + mu: &sync.Mutex{}, + ClaimChan: make(chan idAndClaim, partitionNum), + MsgChan: make(chan *sarama.ConsumerMessage, partitionNum), + QuitChan: make(chan bool), + } + } + claimAggregate := make(chan idAndClaim, 2*partitionNum) + for _, consumer := range consumers { + go func(id int, c chan idAndClaim) { + for claim := range c { + claimAggregate <- claim + } + }(consumer.ID, consumer.ClaimChan) + } + for i := 0; i < len(consumers); i++ { + go func(j int) { + beginConsume(consumers[j], brokers, group, topics) + }(i) + } + time.Sleep(20 * time.Second) + checkRangeAssignmentClaims(t, claimAggregate, partitionNum, consumerNum) + consumers[0].QuitChan <- true //actively send leave group request + time.Sleep(20 * time.Second) + checkRangeAssignmentClaims(t, claimAggregate, partitionNum, consumerNum-1) + consumers[1].QuitChan <- false //without leave group, coordinator will find this consumer heartbeat timed out and rebalance + time.Sleep(30 * time.Second) + checkRangeAssignmentClaims(t, claimAggregate, partitionNum, consumerNum-2) +} +func checkRangeAssignmentClaims(t *testing.T, claimAggregate chan idAndClaim, partitionNum int32, consumerNum int32) { + lastGenerationClaims, lastGeneration := getLastGeneration(claimAggregate) + partitionIDtoConsumerID := make(map[int32]int) + for _, claim := range lastGenerationClaims { + partitionIDtoConsumerID[claim.claim.Partition()] = claim.ID + } + if int(partitionNum) != len(partitionIDtoConsumerID) { + panic(fmt.Sprintf( + "claims does not contains all partitions "+ + "partitionNum %d consumerNum %d map: %v", + partitionNum, consumerNum, partitionIDtoConsumerID)) + } + + consumerIDtoPartionID := make(map[int]map[int32]bool) + for _, claim := range lastGenerationClaims { + var m map[int32]bool + var ok bool + if m, ok = consumerIDtoPartionID[claim.ID]; !ok { + m = make(map[int32]bool) + consumerIDtoPartionID[claim.ID] = m + } + m[claim.claim.Partition()] = true + } + q := int(partitionNum / consumerNum) + r := int(partitionNum % consumerNum) + // under range assignment + // r consumers each got q+1 partitions + // consumerNum-r consumers each got q partition + numQ := 0 + numQplus1 := 0 + for _, m := range consumerIDtoPartionID { + switch len(m) { + case q: + numQ++ + case q + 1: + numQplus1++ + } + } + isEven := r == numQplus1 && int(consumerNum)-r == numQ + log.Printf("assignments %v partitionNum:%d consumerNum: %d q:%d r:%d gen: %d %v", + consumerIDtoPartionID, partitionNum, consumerNum, q, r, lastGeneration, isEven) + if !isEven { + panic("partition not even") + } +} +func getLastGeneration(claimAggregate chan idAndClaim) ([]idAndClaim, int32) { + var claims []idAndClaim + //read until blocked + for { + blocked := false + select { + case msg := <-claimAggregate: + claims = append(claims, msg) + default: + blocked = true + break + } + if blocked { + break + } + } + log.Printf("%v len total claims %d", time.Now(), len(claims)) + //there may be a few rounds of rebalancing, so we only checks the last generation + lastGeneration := int32(0) + for _, claim := range claims { + if 
claim.generation > lastGeneration { + lastGeneration = claim.generation + } + } + var lastGenerationClaims []idAndClaim + for _, claim := range claims { + if claim.generation == lastGeneration { + lastGenerationClaims = append(lastGenerationClaims, claim) + } + } + return lastGenerationClaims, lastGeneration +} diff --git a/jocko/context.go b/jocko/context.go index e42c39ca..decd7705 100644 --- a/jocko/context.go +++ b/jocko/context.go @@ -3,7 +3,7 @@ package jocko import ( "context" "fmt" - "io" + "net" "sync" "time" @@ -12,7 +12,7 @@ import ( type Context struct { mu sync.Mutex - conn io.ReadWriter + Conn *net.TCPConn err error header *protocol.RequestHeader parent context.Context diff --git a/jocko/fetch_sendfile.go b/jocko/fetch_sendfile.go new file mode 100644 index 00000000..567ce499 --- /dev/null +++ b/jocko/fetch_sendfile.go @@ -0,0 +1,127 @@ +package jocko + +import ( + "net" + "time" + + "github.com/travisjeffery/jocko/commitlog" + "github.com/travisjeffery/jocko/protocol" +) + +func (b *Broker) handleFetchSendFile(ctx *Context, r *protocol.FetchRequest) { + sp := span(ctx, b.tracer, "fetch") + defer sp.Finish() + resp := protocol.Response{ + CorrelationID: ctx.header.CorrelationID, + } + conn := ctx.Conn + fres := &protocol.FetchResponse{ + Responses: make(protocol.FetchTopicResponses, len(r.Topics)), + } + msgSetLen := 0 + fres.APIVersion = r.APIVersion + maxBufSize := 0 + // calculate total length of message set + //TODO calc max buf length + maxBufSize = 1024 + for i, topic := range r.Topics { + fr := &protocol.FetchTopicResponse{ + Topic: topic.Topic, + PartitionResponses: make([]*protocol.FetchPartitionResponse, len(topic.Partitions)), + } + for j, p := range topic.Partitions { + fpres := &protocol.FetchPartitionResponse{} + fpres.Partition = p.Partition + replica, err := b.replicaLookup.Replica(topic.Topic, p.Partition) + if err != nil { + panic(err) + } + var rdrErr error + fpres.FileHandle, fpres.SendOffset, fpres.SendSize, rdrErr = replica.Log.SendfileParams(p.FetchOffset, p.MaxBytes) + if rdrErr != nil { + panic(rdrErr) + } + msgSetLen += fpres.SendSize + //get length of record + // + fr.PartitionResponses[j] = fpres + } + fres.Responses[i] = fr + } + lenEnc := new(protocol.LenEncoder) + err := fres.Encode(lenEnc) + if err != nil { + panic(err) + } + // set length field + resp.Size = int32(lenEnc.Length + msgSetLen) + err = sendRes(&resp, maxBufSize, fres, conn) + if err != nil { + panic(err) + } + return +} +func sendRes(resp *protocol.Response, + maxSize int, + r *protocol.FetchResponse, + conn *net.TCPConn) error { + b := make([]byte, maxSize) + e := protocol.NewByteEncoder(b) + // outer response + const correlationIDSize = 4 + e.PutInt32(resp.Size + correlationIDSize) + e.PutInt32(resp.CorrelationID) + //fetch response + var err error + if r.APIVersion >= 1 { + e.PutInt32(int32(r.ThrottleTime / time.Millisecond)) + } + + if err = e.PutArrayLength(len(r.Responses)); err != nil { + return err + } + for _, response := range r.Responses { + if err = e.PutString(response.Topic); err != nil { + return err + } + if err = e.PutArrayLength(len(response.PartitionResponses)); err != nil { + return err + } + for _, p := range response.PartitionResponses { + if err = sendResOfPartition(conn, p, e, r.APIVersion); err != nil { + return err + } + } + } + return nil +} +func sendResOfPartition( + conn *net.TCPConn, + r *protocol.FetchPartitionResponse, + e *protocol.ByteEncoder, + version int16) error { + e.PutInt32(r.Partition) + e.PutInt16(r.ErrorCode) + 
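// A hedged sketch, separate from the patch: the size accounting handleFetchSendFile
// performs above. On the sendfile path RecordSet stays nil, so the LenEncoder
// measures only the response framing, and the raw record bytes that will later be
// streamed from the segment files are added on top. sketchFetchResponseSize is a
// hypothetical helper; LenEncoder, Length and FetchResponse.Encode come from the patch.
func sketchFetchResponseSize(fres *protocol.FetchResponse, msgSetLen int) (int32, error) {
	lenEnc := new(protocol.LenEncoder)
	if err := fres.Encode(lenEnc); err != nil {
		return 0, err
	}
	// lenEnc.Length covers the framing; msgSetLen is the sum of every partition's
	// SendSize, i.e. the bytes written by commitlog.Sendfile instead of the encoder.
	return int32(lenEnc.Length + msgSetLen), nil
}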
e.PutInt64(r.HighWatermark) + var err error + if version >= 4 { + e.PutInt64(r.LastStableOffset) + + if err = e.PutArrayLength(len(r.AbortedTransactions)); err != nil { + return err + } + for _, t := range r.AbortedTransactions { + t.Encode(e) + } + } + e.PutInt32(int32(r.SendSize)) + //log.Info.Println("encoder offset", e.GetOffset()) + conn.Write(e.Bytes()[:e.GetOffset()]) + e.SetOffset(0) + chunkSize := 4096 + if _, err = commitlog.Sendfile(conn, r.FileHandle, r.SendOffset, r.SendSize, chunkSize); err != nil { + return err + } + + return nil +} diff --git a/jocko/fsm/commands_test.go b/jocko/fsm/commands_test.go index 1ba136d1..bf445728 100644 --- a/jocko/fsm/commands_test.go +++ b/jocko/fsm/commands_test.go @@ -107,7 +107,7 @@ func TestRegisterGroup(t *testing.T) { } req := structs.RegisterGroupRequest{ - Group: structs.Group{Group: "group-id", Members: map[string]structs.Member{}}, + Group: structs.Group{Group: "group-id", Members: map[string]*structs.Member{}}, } buf, err := structs.Encode(structs.RegisterGroupRequestType, req) if err != nil { diff --git a/jocko/fsm/fsm_test.go b/jocko/fsm/fsm_test.go index c7bf450e..8bbea43c 100644 --- a/jocko/fsm/fsm_test.go +++ b/jocko/fsm/fsm_test.go @@ -215,7 +215,7 @@ func TestStore_RegisterGroup(t *testing.T) { t.Fatalf("err: %s, group: %v", err, p) } - if err := s.EnsureGroup(1, &structs.Group{Group: "test-group", Coordinator: coordinator, Members: map[string]structs.Member{"member": structs.Member{ID: "member"}}}); err != nil { + if err := s.EnsureGroup(1, &structs.Group{Group: "test-group", Coordinator: coordinator, Members: map[string]*structs.Member{"member": &structs.Member{ID: "member"}}}); err != nil { t.Fatalf("err: %s", err) } diff --git a/jocko/replicator.go b/jocko/replicator.go index 3bda3c98..345c1213 100644 --- a/jocko/replicator.go +++ b/jocko/replicator.go @@ -93,6 +93,9 @@ func (r *Replicator) fetchMessages() { if p.RecordSet == nil { goto BACKOFF } + if len(p.RecordSet) == 0 { + goto BACKOFF + } offset := int64(protocol.Encoding.Uint64(p.RecordSet[:8])) if offset > r.offset { r.msgs <- p.RecordSet diff --git a/jocko/server.go b/jocko/server.go index 67a806c9..8d9aafcf 100644 --- a/jocko/server.go +++ b/jocko/server.go @@ -34,7 +34,7 @@ func init() { // Broker is the interface that wraps the Broker's methods. 
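// A hedged sketch, separate from the patch: calling commitlog.Sendfile directly,
// the way sendResOfPartition does above, to stream a byte range of a segment file
// over a TCP connection without copying it through user space. The file path,
// address and function name are illustrative only.
func sketchSendfile(path, addr string, offset int64, size int) (int, error) {
	file, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	c, err := net.Dial("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer c.Close()

	// Sendfile needs a *net.TCPConn so it can reach the underlying socket fd.
	tcpConn := c.(*net.TCPConn)
	const chunkSize = 4096 // the same chunk size sendResOfPartition passes
	return commitlog.Sendfile(tcpConn, file, offset, size, chunkSize)
}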
type Handler interface { - Run(context.Context, <-chan *Context, chan<- *Context) + Run(context.Context, *Context) *Context Leave() error Shutdown() error } @@ -49,8 +49,6 @@ type Server struct { shutdownCh chan struct{} shutdownLock sync.Mutex metrics *Metrics - requestCh chan *Context - responseCh chan *Context tracer opentracing.Tracer close func() error } @@ -61,8 +59,6 @@ func NewServer(config *config.Config, handler Handler, metrics *Metrics, tracer handler: handler, metrics: metrics, shutdownCh: make(chan struct{}), - requestCh: make(chan *Context, 1024), - responseCh: make(chan *Context, 1024), tracer: tracer, close: close, } @@ -97,28 +93,6 @@ func (s *Server) Start(ctx context.Context) error { } } }() - - go func() { - for { - select { - case <-ctx.Done(): - break - case <-s.shutdownCh: - break - case respCtx := <-s.responseCh: - if queueSpan, ok := respCtx.Value(responseQueueSpanKey).(opentracing.Span); ok { - queueSpan.Finish() - } - if err := s.handleResponse(respCtx); err != nil { - log.Error.Printf("server/%d: handle response error: %s", s.config.ID, err) - } - } - } - }() - - log.Debug.Printf("server/%d: run handler", s.config.ID) - go s.handler.Run(ctx, s.requestCh, s.responseCh) - return nil } @@ -257,17 +231,25 @@ func (s *Server) handleRequest(conn net.Conn) { ctx := opentracing.ContextWithSpan(context.Background(), span) queueSpan := s.tracer.StartSpan("server: queue request", opentracing.ChildOf(span.Context())) ctx = context.WithValue(ctx, requestQueueSpanKey, queueSpan) - reqCtx := &Context{ parent: ctx, header: header, req: req, - conn: conn, + Conn: conn.(*net.TCPConn), } - log.Debug.Printf("server/%d: handle request: %s", s.config.ID, reqCtx) + log.Info.Printf("server/%d: handle request: %s", s.config.ID, reqCtx) - s.requestCh <- reqCtx + respCtx := s.handler.Run(ctx, reqCtx) + if header.APIKey == protocol.FetchKey && s.config.UseSendfile { + continue + } + if queueSpan, ok := respCtx.Value(responseQueueSpanKey).(opentracing.Span); ok { + queueSpan.Finish() + } + if err := s.handleResponse(respCtx); err != nil { + log.Error.Printf("server/%d: handle response error: %s", s.config.ID, err) + } } } @@ -275,7 +257,7 @@ func (s *Server) handleResponse(respCtx *Context) error { psp := opentracing.SpanFromContext(respCtx) sp := s.tracer.StartSpan("server: handle response", opentracing.ChildOf(psp.Context())) - log.Debug.Printf("server/%d: handle response: %s", s.config.ID, respCtx) + log.Info.Printf("server/%d: handle response: %s", s.config.ID, respCtx) defer psp.Finish() defer sp.Finish() @@ -284,7 +266,7 @@ func (s *Server) handleResponse(respCtx *Context) error { if err != nil { return err } - _, err = respCtx.conn.Write(b) + _, err = respCtx.Conn.Write(b) return err } diff --git a/jocko/structs/structs.go b/jocko/structs/structs.go index c5ea2268..192c2f8c 100644 --- a/jocko/structs/structs.go +++ b/jocko/structs/structs.go @@ -2,7 +2,9 @@ package structs import ( "bytes" + "sync" + "github.com/travisjeffery/jocko/protocol" "github.com/ugorji/go/codec" ) @@ -155,9 +157,15 @@ type Partition struct { // Member type Member struct { - ID string - Metadata []byte - Assignment []byte + ID string + Metadata []byte + Assignment []byte + SupportedProtocols map[string][]byte //converted from JoinGroupRequest's GroupProtocols + JoinChan chan protocol.JoinGroupResponse + SyncChan chan bool + LastHeartbeat int64 + SessionTimeout int32 + RebalanceTimeout int32 } type GroupState int32 @@ -176,9 +184,33 @@ type Group struct { Group string Coordinator int32 LeaderID string - 
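// A hedged sketch, separate from the patch: one way the new Member bookkeeping
// fields above could be used by the group coordinator. It assumes LastHeartbeat
// holds a millisecond timestamp, which the patch does not pin down; memberExpired
// is a hypothetical helper, only the field names come from the patch.
func memberExpired(m *structs.Member, now time.Time) bool {
	if m.LastHeartbeat == 0 {
		return false // no heartbeat recorded yet (e.g. the member is still joining)
	}
	elapsedMs := now.UnixNano()/int64(time.Millisecond) - m.LastHeartbeat
	return elapsedMs > int64(m.SessionTimeout)
}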
Members map[string]Member + Members map[string]*Member State GroupState GenerationID int32 RaftIndex + sync.Mutex +} + +func NewGroup(groupId string, coordinatorId int32) *Group { + return &Group{ + ID: groupId, + Group: groupId, + Coordinator: coordinatorId, + Members: make(map[string]*Member), + } +} +func (g *Group) AllKnownMemberJoined() bool { + for _, member := range g.Members { + if member.JoinChan == nil { + return false + } + } + return true +} + +func (g *Group) InLock(f func()) { + g.Lock() + defer g.Unlock() + f() } diff --git a/jocko/structs/structs_test.go b/jocko/structs/structs_test.go index 8233a7ca..6b1ca55c 100644 --- a/jocko/structs/structs_test.go +++ b/jocko/structs/structs_test.go @@ -55,7 +55,7 @@ func testGroup(t *testing.T) { in := RegisterGroupRequest{ Group: Group{ Group: "group-id", - Members: map[string]Member{}, + Members: map[string]*Member{}, }, } b, err := Encode(RegisterGroupRequestType, &in) diff --git a/protocol/api_versions.go b/protocol/api_versions.go index 2b0dd903..65dce127 100644 --- a/protocol/api_versions.go +++ b/protocol/api_versions.go @@ -9,12 +9,13 @@ var APIVersions = []APIVersion{ {APIKey: StopReplicaKey, MinVersion: 0, MaxVersion: 0}, {APIKey: FindCoordinatorKey, MinVersion: 0, MaxVersion: 1}, {APIKey: JoinGroupKey, MinVersion: 0, MaxVersion: 1}, - {APIKey: HeartbeatKey, MinVersion: 0, MaxVersion: 1}, + {APIKey: HeartbeatKey, MinVersion: 0, MaxVersion: 0}, {APIKey: LeaveGroupKey, MinVersion: 0, MaxVersion: 1}, - {APIKey: SyncGroupKey, MinVersion: 0, MaxVersion: 1}, + {APIKey: SyncGroupKey, MinVersion: 0, MaxVersion: 0}, {APIKey: DescribeGroupsKey, MinVersion: 0, MaxVersion: 1}, {APIKey: ListGroupsKey, MinVersion: 0, MaxVersion: 1}, {APIKey: APIVersionsKey, MinVersion: 0, MaxVersion: 1}, {APIKey: CreateTopicsKey, MinVersion: 0, MaxVersion: 1}, {APIKey: DeleteTopicsKey, MinVersion: 0, MaxVersion: 1}, + {APIKey: OffsetFetchKey, MinVersion: 0, MaxVersion: 0}, } diff --git a/protocol/create_topic_requests.go b/protocol/create_topic_requests.go index abb65e20..a406f7a9 100644 --- a/protocol/create_topic_requests.go +++ b/protocol/create_topic_requests.go @@ -57,6 +57,7 @@ func (r *CreateTopicRequests) Encode(e PacketEncoder) (err error) { } func (r *CreateTopicRequests) Decode(d PacketDecoder, version int16) error { + r.APIVersion = version var err error requestCount, err := d.ArrayLength() if err != nil { diff --git a/protocol/decoder.go b/protocol/decoder.go index 2583b19d..d07f5ab2 100644 --- a/protocol/decoder.go +++ b/protocol/decoder.go @@ -263,10 +263,10 @@ func (d *ByteDecoder) StringArray() ([]string, error) { d.off = len(d.b) return nil, ErrInsufficientData } - n := int(Encoding.Uint32(d.b[d.off:])) + n := int32(Encoding.Uint32(d.b[d.off:])) d.off += 4 - if n == 0 { + if n == 0 || n == -1 { return nil, nil } diff --git a/protocol/eager_encoder_test.go b/protocol/eager_encoder_test.go new file mode 100644 index 00000000..88ff4a1d --- /dev/null +++ b/protocol/eager_encoder_test.go @@ -0,0 +1,42 @@ +package protocol + +import ( + "log" + "testing" + + "github.com/stretchr/testify/require" +) + +type Fake struct { + b bool + arr []int32 +} + +func (e *Fake) Encode(pe PacketEncoder) error { + pe.PutBool(e.b) + return pe.PutInt32Array(e.arr) +} +func TestEagerEncode(t *testing.T) { + e := NewEagerEncoder() + e.PutBool(true) + require.Equal(t, e.Bytes(), []byte{1}) + arr := []int32{1, 2, 3} + e.PutInt32Array(arr) + expected := []byte{1} + arrLenAndData := make([]byte, 4*(1+len(arr))) + Encoding.PutUint32(arrLenAndData, uint32(len(arr))) + 
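// A hedged sketch, separate from the patch: how the Group helpers above are meant
// to be combined. Mutations of Members go through InLock so concurrent join and
// heartbeat handlers do not race on the map, and AllKnownMemberJoined is read under
// the same lock. addMember and readyToRebalance are hypothetical helpers.
func addMember(g *structs.Group, memberID string, sessionTimeout, rebalanceTimeout int32) {
	g.InLock(func() {
		if _, ok := g.Members[memberID]; !ok {
			g.Members[memberID] = &structs.Member{
				ID:               memberID,
				SessionTimeout:   sessionTimeout,
				RebalanceTimeout: rebalanceTimeout,
			}
		}
	})
}

func readyToRebalance(g *structs.Group) bool {
	joined := false
	g.InLock(func() { joined = g.AllKnownMemberJoined() })
	return joined
}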
Encoding.PutUint32(arrLenAndData[4:], uint32(arr[0])) + Encoding.PutUint32(arrLenAndData[8:], uint32(arr[1])) + Encoding.PutUint32(arrLenAndData[12:], uint32(arr[2])) + expected = append(expected, arrLenAndData...) + require.Equal(t, e.Bytes(), expected) + ebytes := e.Bytes() + + b2, _ := Encode(&Fake{b: true, arr: []int32{1, 2, 3}}) + require.Equal(t, e.Bytes(), b2) + e2 := NewEagerEncoder() + (&Fake{b: true, arr: []int32{1, 2, 3}}).Encode(e2) + require.Equal(t, e2.Bytes(), b2) + ebytes = e2.Bytes() + log.Printf("bytes::::::::: %p %p", &ebytes[0], &expected[0]) +} diff --git a/protocol/encoder.go b/protocol/encoder.go index f9699464..36a2ff81 100644 --- a/protocol/encoder.go +++ b/protocol/encoder.go @@ -13,6 +13,7 @@ type PacketEncoder interface { PutArrayLength(in int) error PutRawBytes(in []byte) error PutBytes(in []byte) error + PutNotNullBytes(in []byte) error PutString(in string) error PutNullableString(in *string) error PutStringArray(in []string) error @@ -91,6 +92,10 @@ func (e *LenEncoder) PutBytes(in []byte) error { } return e.PutRawBytes(in) } +func (e *LenEncoder) PutNotNullBytes(in []byte) error { + //TODO is this correct? + return e.PutBytes(in) +} func (e *LenEncoder) PutRawBytes(in []byte) error { if len(in) > math.MaxInt32 { @@ -216,6 +221,14 @@ func (e *ByteEncoder) PutBytes(in []byte) error { e.PutInt32(int32(len(in))) return e.PutRawBytes(in) } +func (e *ByteEncoder) PutNotNullBytes(in []byte) error { + if in == nil { + e.PutInt32(0) + return nil + } + e.PutInt32(int32(len(in))) + return e.PutRawBytes(in) +} func (e *ByteEncoder) PutString(in string) error { e.PutInt16(int16(len(in))) @@ -281,3 +294,9 @@ func (e *ByteEncoder) Pop() { e.stack = e.stack[:len(e.stack)-1] pe.Fill(e.off, e.b) } +func (e *ByteEncoder) SetOffset(offset int) { + e.off = offset +} +func (e *ByteEncoder) GetOffset() int { + return e.off +} diff --git a/protocol/fetch_response.go b/protocol/fetch_response.go index 52e6d351..dfc3dc01 100644 --- a/protocol/fetch_response.go +++ b/protocol/fetch_response.go @@ -1,6 +1,9 @@ package protocol -import "time" +import ( + "os" + "time" +) type AbortedTransaction struct { ProducerID int64 @@ -30,6 +33,9 @@ type FetchPartitionResponse struct { LastStableOffset int64 AbortedTransactions []*AbortedTransaction RecordSet []byte + FileHandle *os.File + SendOffset int64 + SendSize int } func (r *FetchPartitionResponse) Decode(d PacketDecoder, version int16) (err error) { @@ -86,7 +92,7 @@ func (r *FetchPartitionResponse) Encode(e PacketEncoder, version int16) (err err } } - if err = e.PutBytes(r.RecordSet); err != nil { + if err = e.PutNotNullBytes(r.RecordSet); err != nil { return err } diff --git a/protocol/join_group_response.go b/protocol/join_group_response.go index 81451c37..8dc0543d 100644 --- a/protocol/join_group_response.go +++ b/protocol/join_group_response.go @@ -20,7 +20,7 @@ type JoinGroupResponse struct { } func (r *JoinGroupResponse) Encode(e PacketEncoder) (err error) { - if r.APIVersion >= 1 { + if r.APIVersion >= 2 { e.PutInt32(int32(r.ThrottleTime / time.Millisecond)) } e.PutInt16(r.ErrorCode) @@ -41,7 +41,7 @@ func (r *JoinGroupResponse) Encode(e PacketEncoder) (err error) { if err = e.PutString(member.MemberID); err != nil { return err } - if err = e.PutBytes(member.MemberMetadata); err != nil { + if err = e.PutNotNullBytes(member.MemberMetadata); err != nil { return err } } diff --git a/protocol/metadata_response.go b/protocol/metadata_response.go index ccf5eb47..e7649f7f 100644 --- a/protocol/metadata_response.go +++ 
b/protocol/metadata_response.go @@ -39,6 +39,9 @@ func (r *MetadataResponse) Encode(e PacketEncoder) (err error) { return err } e.PutInt32(b.Port) + if r.APIVersion >= 1 { + e.PutString("") + } } if r.APIVersion >= 1 { e.PutInt32(r.ControllerID) @@ -51,6 +54,9 @@ func (r *MetadataResponse) Encode(e PacketEncoder) (err error) { if err = e.PutString(t.Topic); err != nil { return err } + if r.APIVersion >= 1 { + e.PutInt8(0) + } if err = e.PutArrayLength(len(t.PartitionMetadata)); err != nil { return err } diff --git a/protocol/offset_fetch_response.go b/protocol/offset_fetch_response.go index 6ed92a6d..4a3d43c0 100644 --- a/protocol/offset_fetch_response.go +++ b/protocol/offset_fetch_response.go @@ -7,7 +7,7 @@ type OffsetFetchTopicResponse struct { type OffsetFetchPartition struct { Partition int32 - Offset int16 + Offset int64 Metadata *string ErrorCode int16 } @@ -31,7 +31,7 @@ func (r *OffsetFetchResponse) Encode(e PacketEncoder) (err error) { } for _, p := range resp.Partitions { e.PutInt32(p.Partition) - e.PutInt16(p.Offset) + e.PutInt64(p.Offset) if err := e.PutNullableString(p.Metadata); err != nil { return err } @@ -62,7 +62,7 @@ func (r *OffsetFetchResponse) Decode(d PacketDecoder, version int16) (err error) if p.Partition, err = d.Int32(); err != nil { return err } - if p.Offset, err = d.Int16(); err != nil { + if p.Offset, err = d.Int64(); err != nil { return err } if p.Metadata, err = d.NullableString(); err != nil {
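// A hedged sketch, separate from the patch: what the encoder and offset-fetch
// changes above buy. PutNotNullBytes encodes a nil slice with a zero length prefix
// rather than a null marker, which matters on the sendfile fetch path where
// RecordSet is intentionally left nil; and OffsetFetchPartition.Offset must be
// 64-bit because log offsets overflow int16 almost immediately. Test names are
// hypothetical and assume the protocol package's existing test imports plus math.
func TestPutNotNullBytesSketch(t *testing.T) {
	buf := make([]byte, 4)
	e := NewByteEncoder(buf)
	if err := e.PutNotNullBytes(nil); err != nil {
		t.Fatal(err)
	}
	// a zero length prefix, not a null marker
	require.Equal(t, []byte{0, 0, 0, 0}, buf)
}

func TestOffsetFetchOffsetWidthSketch(t *testing.T) {
	offset := int64(math.MaxInt16) + 1 // 32768 already did not fit the old int16 field
	b := make([]byte, 8)
	Encoding.PutUint64(b, uint64(offset))
	require.Equal(t, offset, int64(Encoding.Uint64(b)))
	require.NotEqual(t, offset, int64(int16(offset))) // the old width truncates it
}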