Skip to content

Commit aa64ab2

Browse files
committed
Fix bug replication and now don't need to connect manual
1 parent dcbca92 commit aa64ab2

File tree

8 files changed

+40
-20
lines changed

8 files changed

+40
-20
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
- Force int `pool_recycle`.
88
- Fix `echo` option.
9+
- Fix bug replication and now don't need to connect manual.
910

1011
### 0.1.8
1112

README.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ async def run():
105105
resume_stream=True,
106106
blocking=True,
107107
)
108-
await stream.connect()
109108
async for event in stream:
110109
print(event)
111110

@@ -116,7 +115,7 @@ if __name__ == '__main__':
116115

117116
## ThanksTo
118117

119-
> asyncmy is build on top of these nice projects.
118+
> asyncmy is build on top of these awesome projects.
120119
121120
- [pymysql](https://github/pymysql/PyMySQL), a pure python MySQL client.
122121
- [aiomysql](https://github.com/aio-libs/aiomysql), a library for accessing a MySQL database from the asyncio.

asyncmy/replication/binlogstream.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,7 @@ def _allowed_event_list(
179179
pass
180180
return frozenset(events)
181181

182-
async def connect(self):
183-
if self._connected:
184-
return
182+
async def _connect(self):
185183
await self._connection.connect()
186184
self._use_checksum = await self._checksum_enable()
187185
async with self._connection.cursor() as cursor:
@@ -272,7 +270,7 @@ async def close(self):
272270

273271
async def _read(self):
274272
if not self._connected:
275-
await self.connect()
273+
await self._connect()
276274

277275
pkt = await self._connection.read_packet()
278276
if pkt.is_eof_packet():
@@ -316,7 +314,7 @@ async def _read(self):
316314
async def _checksum_enable(self):
317315
async with self._connection.cursor() as cursor:
318316
await cursor.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'")
319-
result = cursor.fetchone()
317+
result = await cursor.fetchone()
320318
if result is None:
321319
return False
322320
var, value = result[:2]
@@ -354,6 +352,8 @@ def __aiter__(self):
354352
return self
355353

356354
async def __anext__(self):
355+
if not self._connected:
356+
await self._connect()
357357
try:
358358
ret = await self._read()
359359
while ret is None:

asyncmy/replication/events.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import binascii
22
import struct
33

4+
from asyncmy.replication.utils import byte2int, int2byte
5+
46

57
class BinLogEvent:
68
def __init__(
@@ -35,7 +37,7 @@ def processed(self):
3537
def _read_table_id(self):
3638
# Table ID is 6 byte
3739
# pad little-endian number
38-
table_id = self.packet.read(6) + bytes(0) + bytes(0)
40+
table_id = self.packet.read(6) + int2byte(0) + int2byte(0)
3941
return struct.unpack("<Q", table_id)[0]
4042

4143
async def init(self):
@@ -127,7 +129,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
127129
# Post-header
128130
self.slave_proxy_id = self.packet.read_uint32()
129131
self.execution_time = self.packet.read_uint32()
130-
self.schema_length = self.packet.read(1)
132+
self.schema_length = byte2int(self.packet.read(1))
131133
self.error_code = self.packet.read_uint16()
132134
self.status_vars_length = self.packet.read_uint16()
133135

asyncmy/replication/packets.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
WRITE_ROWS_EVENT_V2,
4949
XID_EVENT,
5050
)
51+
from asyncmy.replication.utils import byte2int
5152

5253

5354
class BinLogPacket:
@@ -99,7 +100,7 @@ def __init__(
99100
# server_id
100101
# log_pos
101102
# flags
102-
unpack = struct.unpack("<cIcIIIH", self._packet.read(20))
103+
unpack = struct.unpack("<cIBIIIH", self._packet.read(20))
103104

104105
# Header
105106
self.timestamp = unpack[1]
@@ -166,7 +167,7 @@ def advance(self, size):
166167
self._packet.advance(size)
167168

168169
def read_length_coded_binary(self):
169-
c = self.read(1)
170+
c = byte2int(self.read(1))
170171
if c == NULL_COLUMN:
171172
return None
172173
if c < UNSIGNED_CHAR_COLUMN:

asyncmy/replication/row_events.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from .errors import TableMetadataUnavailableError
4141
from .events import BinLogEvent
4242
from .table import Table
43+
from .utils import byte2int
4344

4445

4546
class RowsEvent(BinLogEvent):
@@ -539,10 +540,10 @@ def __init__(self, from_packet, event_size, table_map, connection, **kwargs):
539540
self.flags = struct.unpack("<H", self.packet.read(2))[0]
540541

541542
# Payload
542-
self.schema_length = self.packet.read(1)
543+
self.schema_length = byte2int(self.packet.read(1))
543544
self.schema = self.packet.read(self.schema_length).decode()
544545
self.packet.advance(1)
545-
self.table_length = self.packet.read(1)
546+
self.table_length = byte2int(self.packet.read(1))
546547
self.table_name = self.packet.read(self.table_length).decode()
547548
schema_table = f"{self.schema}.{self.table_name}"
548549
if self._only_tables is not None and schema_table not in self._only_tables:

asyncmy/replication/utils.py

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import struct
2+
3+
4+
def byte2int(b):
5+
if isinstance(b, int):
6+
return b
7+
else:
8+
return struct.unpack("!B", b)[0]
9+
10+
11+
def int2byte(i: int):
12+
return struct.pack("!B", i)
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import pytest
24

35
from asyncmy import connect
@@ -6,21 +8,23 @@
68
from conftest import connection_kwargs
79

810

9-
@pytest.mark.skip(reason="need test in local")
10-
@pytest.mark.asyncio
11-
async def test_binlogstream(connection):
11+
async def main():
1212
conn = await connect(**connection_kwargs)
13+
ctl_conn = await connect(**connection_kwargs)
1314

1415
stream = BinLogStream(
15-
connection,
1616
conn,
17+
ctl_conn,
1718
1,
18-
master_log_file="binlog.000172",
19+
master_log_file="binlog.000019",
20+
master_log_position=155276,
1921
resume_stream=True,
2022
blocking=True,
21-
master_log_position=2235312,
2223
)
23-
await stream.connect()
2424
async for event in stream:
2525
if isinstance(event, WriteRowsEvent):
2626
print(event.schema, event.table, event.rows)
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())

0 commit comments

Comments
 (0)