Skip to content

Commit 2ac0796

Browse files
committed
nsqd: read/write concurrently in MPUB benchmarks
1 parent fc261f9 commit 2ac0796

File tree

1 file changed

+54
-34
lines changed

1 file changed

+54
-34
lines changed

nsqd/protocol_v2_test.go

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,25 +1547,35 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) {
15471547
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
15481548

15491549
num := b.N / numTopics / batchSize
1550-
for i := 0; i < num; i++ {
1551-
cmd, _ := nsq.MultiPublish(topicName, batch)
1552-
_, err := cmd.WriteTo(rw)
1553-
if err != nil {
1554-
panic(err.Error())
1555-
}
1556-
err = rw.Flush()
1557-
if err != nil {
1558-
panic(err.Error())
1559-
}
1560-
resp, err := nsq.ReadResponse(rw)
1561-
if err != nil {
1562-
panic(err.Error())
1550+
wg.Add(1)
1551+
go func() {
1552+
for i := 0; i < num; i++ {
1553+
cmd, _ := nsq.MultiPublish(topicName, batch)
1554+
_, err := cmd.WriteTo(rw)
1555+
if err != nil {
1556+
panic(err.Error())
1557+
}
1558+
err = rw.Flush()
1559+
if err != nil {
1560+
panic(err.Error())
1561+
}
15631562
}
1564-
_, data, _ := nsq.UnpackResponse(resp)
1565-
if !bytes.Equal(data, []byte("OK")) {
1566-
panic("invalid response")
1563+
wg.Done()
1564+
}()
1565+
wg.Add(1)
1566+
go func() {
1567+
for i := 0; i < num; i++ {
1568+
resp, err := nsq.ReadResponse(rw)
1569+
if err != nil {
1570+
panic(err.Error())
1571+
}
1572+
_, data, _ := nsq.UnpackResponse(resp)
1573+
if !bytes.Equal(data, []byte("OK")) {
1574+
panic("invalid response")
1575+
}
15671576
}
1568-
}
1577+
wg.Done()
1578+
}()
15691579
wg.Done()
15701580
}()
15711581
}
@@ -1611,25 +1621,35 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) {
16111621
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
16121622

16131623
num := b.N / runtime.GOMAXPROCS(0) / batchSize
1614-
for i := 0; i < num; i++ {
1615-
cmd, _ := nsq.MultiPublish(topicName, batch)
1616-
_, err := cmd.WriteTo(rw)
1617-
if err != nil {
1618-
panic(err.Error())
1619-
}
1620-
err = rw.Flush()
1621-
if err != nil {
1622-
panic(err.Error())
1623-
}
1624-
resp, err := nsq.ReadResponse(rw)
1625-
if err != nil {
1626-
panic(err.Error())
1624+
wg.Add(1)
1625+
go func() {
1626+
for i := 0; i < num; i++ {
1627+
cmd, _ := nsq.MultiPublish(topicName, batch)
1628+
_, err := cmd.WriteTo(rw)
1629+
if err != nil {
1630+
panic(err.Error())
1631+
}
1632+
err = rw.Flush()
1633+
if err != nil {
1634+
panic(err.Error())
1635+
}
16271636
}
1628-
_, data, _ := nsq.UnpackResponse(resp)
1629-
if !bytes.Equal(data, []byte("OK")) {
1630-
panic("invalid response")
1637+
wg.Done()
1638+
}()
1639+
wg.Add(1)
1640+
go func() {
1641+
for i := 0; i < num; i++ {
1642+
resp, err := nsq.ReadResponse(rw)
1643+
if err != nil {
1644+
panic(err.Error())
1645+
}
1646+
_, data, _ := nsq.UnpackResponse(resp)
1647+
if !bytes.Equal(data, []byte("OK")) {
1648+
panic("invalid response")
1649+
}
16311650
}
1632-
}
1651+
wg.Done()
1652+
}()
16331653
wg.Done()
16341654
}()
16351655
}

0 commit comments

Comments
 (0)