Skip to content

Commit 7522397

Browse files
authored
Merge pull request #154 from blackbeam/0.28-release
v0.28.0 release
2 parents f406d2d + 3e11f98 commit 7522397

File tree

22 files changed

+957
-287
lines changed

22 files changed

+957
-287
lines changed

Cargo.toml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ license = "MIT/Apache-2.0"
77
name = "mysql_async"
88
readme = "README.md"
99
repository = "https://github.com/blackbeam/mysql_async"
10-
version = "0.27.1"
10+
version = "0.28.0"
1111
exclude = ["test/*"]
1212
edition = "2018"
13+
categories = ["asynchronous", "database"]
1314

1415
[dependencies]
1516
bytes = "1.0"
17+
flate2 = { version = "1.0", default-features = false }
1618
futures-core = "0.3"
1719
futures-util = "0.3"
1820
futures-sink = "0.3"
1921
lazy_static = "1"
2022
lru = "0.6.0"
2123
mio = "0.7.7"
22-
mysql_common = "0.26.0"
24+
mysql_common = { version = "0.27.2", default-features = false }
2325
native-tls = "0.2"
26+
once_cell = "1.7.2"
2427
pem = "0.8.1"
2528
percent-encoding = "2.1.0"
2629
pin-project = "1.0.2"
@@ -36,11 +39,20 @@ uuid = { version = "0.8.1", features = ["v4"] }
3639

3740
[dev-dependencies]
3841
tempfile = "3.1.0"
39-
socket2 = "0.3.17"
42+
socket2 = { version = "0.4.0", features = ["all"] }
4043
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
4144
rand = "0.8.0"
4245

4346
[features]
47+
default = [
48+
"flate2/zlib",
49+
"mysql_common/bigdecimal",
50+
"mysql_common/chrono",
51+
"mysql_common/rust_decimal",
52+
"mysql_common/time",
53+
"mysql_common/uuid",
54+
"mysql_common/frunk",
55+
]
4456
nightly = []
4557

4658
[lib]

azure-pipelines.yml

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ jobs:
2121
sudo apt-get -y install mysql-server libmysqlclient-dev curl
2222
sudo service mysql start
2323
mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot -proot
24+
mysql -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = WARN;" -uroot -proot
25+
mysql -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;" -uroot -proot
26+
mysql -e "SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;" -uroot -proot
27+
mysql -e "SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;" -uroot -proot
28+
mysql -e "SET @@GLOBAL.GTID_MODE = ON;" -uroot -proot
29+
mysql -e "PURGE BINARY LOGS BEFORE now();" -uroot -proot
2430
displayName: Install MySql
2531
- bash: |
2632
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $(RUST_TOOLCHAIN)
@@ -43,40 +49,6 @@ jobs:
4349
DATABASE_URL: mysql://root:[email protected]:3306/mysql
4450
displayName: Run tests
4551
46-
# - job: "TestBasicMacOs"
47-
# pool:
48-
# vmImage: "macOS-10.15"
49-
# strategy:
50-
# maxParallel: 10
51-
# matrix:
52-
# stable:
53-
# RUST_TOOLCHAIN: stable
54-
# steps:
55-
# - bash: |
56-
# brew update
57-
# brew install mysql
58-
# brew services start mysql
59-
# brew services stop mysql
60-
# sleep 3
61-
# echo 'local_infile=1' >> /usr/local/etc/my.cnf
62-
# echo 'socket=/tmp/mysql.sock' >> /usr/local/etc/my.cnf
63-
# brew services start mysql
64-
# sleep 5
65-
# /usr/local/Cellar/mysql/*/bin/mysql -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot
66-
# displayName: Install MySql
67-
# - bash: |
68-
# curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUST_TOOLCHAIN
69-
# displayName: Install rust (MacOs)
70-
# - bash: |
71-
# SSL=false COMPRESS=false cargo test
72-
# SSL=true COMPRESS=false cargo test
73-
# SSL=false COMPRESS=true cargo test
74-
# SSL=true COMPRESS=true cargo test
75-
# env:
76-
# RUST_BACKTRACE: 1
77-
# DATABASE_URL: mysql://[email protected]/mysql
78-
# displayName: Run tests
79-
8052
- job: "TestBasicWindows"
8153
pool:
8254
vmImage: "vs2017-win2016"
@@ -95,6 +67,11 @@ jobs:
9567
call "C:\Program Files (x86)\MySQL\MySQL Installer for Windows\MySQLInstallerConsole.exe" community install server;8.0.11;x64:*:port=3306;rootpasswd=password;servicename=MySQL -silent
9668
netsh advfirewall firewall add rule name="Allow mysql" dir=in action=allow edge=yes remoteip=any protocol=TCP localport=80,8080,3306
9769
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET GLOBAL max_allowed_packet = 36700160;" -uroot -ppassword
70+
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = WARN;" -uroot -ppassword
71+
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;" -uroot -ppassword
72+
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;" -uroot -ppassword
73+
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;" -uroot -ppassword
74+
"C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql" -e "SET @@GLOBAL.GTID_MODE = ON;" -uroot -ppassword
9875
displayName: Install MySql
9976
- bash: |
10077
rustup install $RUST_TOOLCHAIN
@@ -130,16 +107,20 @@ jobs:
130107
docker --version
131108
displayName: Install docker
132109
- bash: |
133-
docker run --rm --name container -v `pwd`:/root -p 3307:3306 -d -e MYSQL_ROOT_PASSWORD=password mysql:$(DB_VERSION) --max-allowed-packet=36700160 --local-infile
110+
if [[ "5.6" == "$(DB_VERSION)" ]]; then ARG="--secure-auth=OFF"; fi
111+
docker run -d --name container -v `pwd`:/root -p 3307:3306 -e MYSQL_ROOT_PASSWORD=password mysql:$(DB_VERSION) --max-allowed-packet=36700160 --local-infile --log-bin=mysql-bin --log-slave-updates --gtid_mode=ON --enforce_gtid_consistency=ON --server-id=1 $ARG
134112
while ! nc -W 1 localhost 3307 | grep -q -P '.+'; do sleep 1; done
135113
displayName: Run MySql in Docker
114+
- bash: |
115+
docker exec container bash -l -c "mysql -uroot -ppassword -e \"SET old_passwords = 1; GRANT ALL PRIVILEGES ON *.* TO 'root2'@'%' IDENTIFIED WITH mysql_old_password AS 'password'; SET PASSWORD FOR 'root2'@'%' = OLD_PASSWORD('password')\"";
116+
condition: eq(variables['DB_VERSION'], '5.6')
136117
- bash: |
137118
docker exec container bash -l -c "apt-get update"
138119
docker exec container bash -l -c "apt-get install -y curl clang libssl-dev pkg-config"
139120
docker exec container bash -l -c "curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable"
140121
displayName: Install Rust in docker
141122
- bash: |
142-
if [[ "5.6" != "$(DB_VERSION)" ]]; then SSL=true; fi
123+
if [[ "5.6" != "$(DB_VERSION)" ]]; then SSL=true; else DATABASE_URL="mysql://root2:[email protected]/mysql?secure_auth=false"; fi
143124
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL cargo test"
144125
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL COMPRESS=true cargo test"
145126
docker exec container bash -l -c "cd \$HOME && DATABASE_URL=$DATABASE_URL SSL=$SSL cargo test"
@@ -186,10 +167,11 @@ jobs:
186167
--max-allowed-packet=36700160 \
187168
--local-infile \
188169
--performance-schema=on \
170+
--log-bin=mysql-bin --gtid-domain-id=1 --server-id=1 \
189171
--ssl \
190172
--ssl-ca=/root/rust-mysql-simple/tests/ca-cert.pem \
191173
--ssl-cert=/root/rust-mysql-simple/tests/server-cert.pem \
192-
--ssl-key=/root/rust-mysql-simple/tests/server-key.pem
174+
--ssl-key=/root/rust-mysql-simple/tests/server-key.pem &
193175
while ! nc -W 1 localhost 3307 | grep -q -P '.+'; do sleep 1; done
194176
displayName: Run MariaDb in Docker
195177
- bash: |

src/buffer_pool.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (c) 2021 Anatoly Ikorsky
2+
//
3+
// Licensed under the Apache License, Version 2.0
4+
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5+
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6+
// option. All files in the project carrying such notice may not be copied,
7+
// modified, or distributed except according to those terms.
8+
9+
use std::{
10+
mem::replace,
11+
ops::Deref,
12+
sync::{Arc, Mutex},
13+
};
14+
15+
#[derive(Debug)]
16+
pub struct BufferPool {
17+
pool_cap: usize,
18+
buffer_cap: usize,
19+
pool: Mutex<Vec<Vec<u8>>>,
20+
}
21+
22+
impl BufferPool {
23+
pub fn new() -> Self {
24+
let pool_cap = std::env::var("MYSQL_ASYNC_BUFFER_POOL_CAP")
25+
.ok()
26+
.and_then(|x| x.parse().ok())
27+
.unwrap_or(128_usize);
28+
29+
let buffer_cap = std::env::var("MYSQL_ASYNC_BUFFER_SIZE_CAP")
30+
.ok()
31+
.and_then(|x| x.parse().ok())
32+
.unwrap_or(4 * 1024 * 1024);
33+
34+
Self {
35+
pool: Default::default(),
36+
pool_cap,
37+
buffer_cap,
38+
}
39+
}
40+
41+
pub fn get(self: &Arc<Self>) -> PooledBuf {
42+
let mut buf = self.pool.lock().unwrap().pop().unwrap_or_default();
43+
44+
// SAFETY:
45+
// 1. OK – 0 is always within capacity
46+
// 2. OK - nothing to initialize
47+
unsafe { buf.set_len(0) }
48+
49+
PooledBuf(buf, self.clone())
50+
}
51+
52+
pub fn get_with<T: AsRef<[u8]>>(self: &Arc<Self>, content: T) -> PooledBuf {
53+
let mut buf = self.get();
54+
buf.as_mut().extend_from_slice(content.as_ref());
55+
buf
56+
}
57+
58+
fn put(self: &Arc<Self>, mut buf: Vec<u8>) {
59+
if buf.len() > self.buffer_cap {
60+
// TODO: until `Vec::shrink_to` stabilization
61+
62+
// SAFETY:
63+
// 1. OK – new_len <= capacity
64+
// 2. OK - 0..new_len is initialized
65+
unsafe { buf.set_len(self.buffer_cap) }
66+
buf.shrink_to_fit();
67+
}
68+
69+
let mut pool = self.pool.lock().unwrap();
70+
if pool.len() < self.pool_cap {
71+
pool.push(buf);
72+
}
73+
}
74+
}
75+
76+
impl Default for BufferPool {
77+
fn default() -> Self {
78+
Self::new()
79+
}
80+
}
81+
82+
#[derive(Debug)]
83+
pub struct PooledBuf(Vec<u8>, Arc<BufferPool>);
84+
85+
impl AsMut<Vec<u8>> for PooledBuf {
86+
fn as_mut(&mut self) -> &mut Vec<u8> {
87+
&mut self.0
88+
}
89+
}
90+
91+
impl Deref for PooledBuf {
92+
type Target = [u8];
93+
94+
fn deref(&self) -> &Self::Target {
95+
self.0.deref()
96+
}
97+
}
98+
99+
impl Drop for PooledBuf {
100+
fn drop(&mut self) {
101+
self.1.put(replace(&mut self.0, vec![]))
102+
}
103+
}

src/conn/binlog_stream.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) 2020 Anatoly Ikorsky
2+
//
3+
// Licensed under the Apache License, Version 2.0
4+
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5+
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6+
// option. All files in the project carrying such notice may not be copied,
7+
// modified, or distributed except according to those terms.
8+
9+
use futures_core::ready;
10+
use mysql_common::{
11+
binlog::{
12+
consts::BinlogVersion::Version4,
13+
events::{Event, TableMapEvent},
14+
EventStreamReader,
15+
},
16+
io::ParseBuf,
17+
packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18+
};
19+
20+
use std::{
21+
future::Future,
22+
pin::Pin,
23+
task::{Context, Poll},
24+
};
25+
26+
use crate::{error::DriverError, io::ReadPacket, Conn, Result};
27+
28+
/// Binlog event stream.
29+
///
30+
/// Stream initialization is lazy, i.e. binlog won't be requested until this stream is polled.
31+
pub struct BinlogStream {
32+
read_packet: ReadPacket<'static, 'static>,
33+
esr: EventStreamReader,
34+
}
35+
36+
impl BinlogStream {
37+
/// `conn` is a `Conn` with `request_binlog` executed on it.
38+
pub(super) fn new(conn: Conn) -> Self {
39+
BinlogStream {
40+
read_packet: ReadPacket::new(conn),
41+
esr: EventStreamReader::new(Version4),
42+
}
43+
}
44+
45+
/// Returns a table map event for the given table id.
46+
pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
47+
self.esr.get_tme(table_id)
48+
}
49+
}
50+
51+
impl futures_core::stream::Stream for BinlogStream {
52+
type Item = Result<Event>;
53+
54+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55+
let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
56+
Ok(packet) => packet,
57+
Err(err) => return Poll::Ready(Some(Err(err.into()))),
58+
};
59+
60+
let first_byte = packet.get(0).copied();
61+
62+
if first_byte == Some(255) {
63+
if let Ok(ErrPacket::Error(err)) =
64+
ParseBuf(&*packet).parse(self.read_packet.conn_ref().capabilities())
65+
{
66+
return Poll::Ready(Some(Err(From::from(err))));
67+
}
68+
}
69+
70+
if first_byte == Some(254) && packet.len() < 8 {
71+
if ParseBuf(&*packet)
72+
.parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
73+
self.read_packet.conn_ref().capabilities(),
74+
)
75+
.is_ok()
76+
{
77+
return Poll::Ready(None);
78+
}
79+
}
80+
81+
if first_byte == Some(0) {
82+
let event_data = &packet[1..];
83+
match self.esr.read(event_data) {
84+
Ok(event) => {
85+
return Poll::Ready(Some(Ok(event)));
86+
}
87+
Err(err) => return Poll::Ready(Some(Err(err.into()))),
88+
}
89+
} else {
90+
return Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
91+
payload: packet.to_vec(),
92+
}
93+
.into())));
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)