Skip to content

Commit 57190b6

Browse files
committed
Update mysql_common to 0.21.0
1 parent 2e19b2f commit 57190b6

File tree

5 files changed

+100
-27
lines changed

5 files changed

+100
-27
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ futures-util = "0.3"
1919
futures-sink = "0.3"
2020
lazy_static = "1"
2121
mio-named-pipes = "0.1.6"
22-
mysql_common = "0.20.0"
22+
mysql_common = "0.21.0"
2323
native-tls = "0.2"
2424
percent-encoding = "2.1.0"
2525
pin-project = "0.4.6"

src/conn/mod.rs

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ fn disconnect(mut conn: Conn) {
6868
}
6969
}
7070

71+
#[derive(Debug)]
72+
pub enum PendingResult {
73+
Text(Arc<Vec<Column>>),
74+
Binary(Arc<Vec<Column>>, StmtCacheResult),
75+
Empty,
76+
}
77+
7178
/// Mysql connection
7279
struct ConnInner {
7380
stream: Option<Stream>,
@@ -79,7 +86,7 @@ struct ConnInner {
7986
status: consts::StatusFlags,
8087
last_ok_packet: Option<OkPacket<'static>>,
8188
pool: Option<Pool>,
82-
has_result: Option<(Arc<Vec<Column>>, Option<StmtCacheResult>)>,
89+
has_result: Option<PendingResult>,
8390
in_transaction: bool,
8491
opts: Opts,
8592
last_io: Instant,
@@ -521,13 +528,18 @@ impl Conn {
521528

522529
async fn drop_result(mut self) -> Result<Conn> {
523530
match self.inner.has_result.take() {
524-
Some((columns, None)) => {
531+
Some(PendingResult::Text(columns)) => {
525532
query_result::assemble::<_, TextProtocol>(self, Some(columns), None)
526533
.drop_result()
527534
.await
528535
}
529-
Some((columns, cached)) => {
530-
query_result::assemble::<_, BinaryProtocol>(self, Some(columns), cached)
536+
Some(PendingResult::Binary(columns, cached)) => {
537+
query_result::assemble::<_, BinaryProtocol>(self, Some(columns), Some(cached))
538+
.drop_result()
539+
.await
540+
}
541+
Some(PendingResult::Empty) => {
542+
query_result::assemble::<_, TextProtocol>(self, None, None)
531543
.drop_result()
532544
.await
533545
}
@@ -619,7 +631,7 @@ impl ConnectionLike for Conn {
619631
&self.inner.opts
620632
}
621633

622-
fn get_pending_result(&self) -> Option<&(Arc<Vec<Column>>, Option<StmtCacheResult>)> {
634+
fn get_pending_result(&self) -> Option<&PendingResult> {
623635
self.inner.has_result.as_ref()
624636
}
625637

@@ -639,7 +651,7 @@ impl ConnectionLike for Conn {
639651
self.inner.in_transaction = in_transaction;
640652
}
641653

642-
fn set_pending_result(&mut self, meta: Option<(Arc<Vec<Column>>, Option<StmtCacheResult>)>) {
654+
fn set_pending_result(&mut self, meta: Option<PendingResult>) {
643655
self.inner.has_result = meta;
644656
}
645657

@@ -1194,6 +1206,52 @@ mod test {
11941206
Ok(())
11951207
}
11961208

1209+
#[tokio::test]
1210+
async fn issue_107() -> super::Result<()> {
1211+
let conn = Conn::new(get_opts()).await?;
1212+
let conn = conn
1213+
.drop_query(
1214+
r"CREATE TEMPORARY TABLE mysql.issue (
1215+
a BIGINT(20) UNSIGNED,
1216+
b VARBINARY(16),
1217+
c BINARY(32),
1218+
d BIGINT(20) UNSIGNED,
1219+
e BINARY(32)
1220+
)",
1221+
)
1222+
.await?;
1223+
let conn = conn
1224+
.drop_query(
1225+
r"INSERT INTO mysql.issue VALUES (
1226+
0,
1227+
0xC066F966B0860000,
1228+
0x7939DA98E524C5F969FC2DE8D905FD9501EBC6F20001B0A9C941E0BE6D50CF44,
1229+
0,
1230+
''
1231+
), (
1232+
1,
1233+
'',
1234+
0x076311DF4D407B0854371BA13A5F3FB1A4555AC22B361375FD47B263F31822F2,
1235+
0,
1236+
''
1237+
)",
1238+
)
1239+
.await?;
1240+
1241+
let q = "SELECT b, c, d, e FROM mysql.issue";
1242+
let result = conn.query(q).await?;
1243+
1244+
let (conn, loaded_structs) = result
1245+
.map_and_drop(|row| crate::from_row::<(Vec<u8>, Vec<u8>, u64, Vec<u8>)>(dbg!(row)))
1246+
.await?;
1247+
1248+
conn.disconnect().await?;
1249+
1250+
assert_eq!(loaded_structs.len(), 2);
1251+
1252+
Ok(())
1253+
}
1254+
11971255
#[tokio::test]
11981256
async fn should_run_transactions() -> super::Result<()> {
11991257
let conn = Conn::new(get_opts()).await?;

src/connection_like/mod.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9+
use crate::conn::PendingResult;
910
use futures_util::future::ok;
1011
use mysql_common::{
1112
io::ReadMysqlExt,
@@ -51,7 +52,7 @@ pub mod streamless {
5152
}
5253
pub mod write_packet;
5354

54-
#[derive(Debug)]
55+
#[derive(Debug, Clone, Copy)]
5556
pub enum StmtCacheResult {
5657
Cached,
5758
NotCached(u32),
@@ -130,7 +131,7 @@ where
130131
self.conn_like_ref().get_opts()
131132
}
132133

133-
fn get_pending_result(&self) -> Option<&(Arc<Vec<Column>>, Option<StmtCacheResult>)> {
134+
fn get_pending_result(&self) -> Option<&PendingResult> {
134135
self.conn_like_ref().get_pending_result()
135136
}
136137

@@ -150,7 +151,7 @@ where
150151
self.conn_like_mut().set_in_transaction(in_transaction);
151152
}
152153

153-
fn set_pending_result(&mut self, meta: Option<(Arc<Vec<Column>>, Option<StmtCacheResult>)>) {
154+
fn set_pending_result(&mut self, meta: Option<PendingResult>) {
154155
self.conn_like_mut().set_pending_result(meta);
155156
}
156157

@@ -191,12 +192,12 @@ pub trait ConnectionLike: Send {
191192
fn get_local_infile_handler(&self) -> Option<Arc<dyn LocalInfileHandler>>;
192193
fn get_max_allowed_packet(&self) -> usize;
193194
fn get_opts(&self) -> &Opts;
194-
fn get_pending_result(&self) -> Option<&(Arc<Vec<Column>>, Option<StmtCacheResult>)>;
195+
fn get_pending_result(&self) -> Option<&PendingResult>;
195196
fn get_server_version(&self) -> (u16, u16, u16);
196197
fn get_status(&self) -> StatusFlags;
197198
fn set_last_ok_packet(&mut self, ok_packet: Option<OkPacket<'static>>);
198199
fn set_in_transaction(&mut self, in_transaction: bool);
199-
fn set_pending_result(&mut self, meta: Option<(Arc<Vec<Column>>, Option<StmtCacheResult>)>);
200+
fn set_pending_result(&mut self, meta: Option<PendingResult>);
200201
fn set_status(&mut self, status: StatusFlags);
201202
fn reset_seq_id(&mut self);
202203
fn sync_seq_id(&mut self);
@@ -435,7 +436,17 @@ where
435436
this = this.read_packet().await?.0;
436437
}
437438

438-
let columns = Arc::new(columns);
439-
this.set_pending_result(Some((Clone::clone(&columns), None)));
440-
Ok(query_result::new(this, Some(columns), cached))
439+
if column_count > 0 {
440+
let columns = Arc::new(columns);
441+
match cached {
442+
Some(cached) => {
443+
this.set_pending_result(Some(PendingResult::Binary(columns.clone(), cached)))
444+
}
445+
None => this.set_pending_result(Some(PendingResult::Text(columns.clone()))),
446+
}
447+
Ok(query_result::new(this, Some(columns), cached))
448+
} else {
449+
this.set_pending_result(Some(PendingResult::Empty));
450+
Ok(query_result::new(this, None, cached))
451+
}
441452
}

src/connection_like/read_packet.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
use futures_core::ready;
1010
use futures_util::stream::{StreamExt, StreamFuture};
11-
use mysql_common::packets::{parse_err_packet, parse_ok_packet};
11+
use mysql_common::packets::{parse_err_packet, parse_ok_packet, OkPacketKind};
1212
use pin_project::pin_project;
1313
use std::{
1414
future::Future,
@@ -50,7 +50,13 @@ impl<T: ConnectionLike> Future for ReadPacket<T> {
5050
let mut conn_like = this.conn_like.take().unwrap().return_stream(stream);
5151
match packet_opt {
5252
Some(packet) => {
53-
if let Ok(ok_packet) = parse_ok_packet(&*packet, conn_like.get_capabilities()) {
53+
let kind = if conn_like.get_pending_result().is_some() {
54+
OkPacketKind::ResultSetTerminator
55+
} else {
56+
OkPacketKind::Other
57+
};
58+
if let Ok(ok_packet) = parse_ok_packet(&*packet, conn_like.get_capabilities(), kind)
59+
{
5460
conn_like.set_status(ok_packet.status_flags());
5561
conn_like.set_last_ok_packet(Some(ok_packet.into_owned()));
5662
} else if let Ok(err_packet) =

src/queryable/mod.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
// modified, or distributed except according to those terms.
88

99
use mysql_common::{
10-
packets::parse_ok_packet,
10+
packets::{parse_ok_packet, OkPacketKind},
1111
row::new_row,
1212
value::{read_bin_values, read_text_values},
1313
};
@@ -35,7 +35,12 @@ pub trait Protocol: Send + 'static {
3535
where
3636
T: ConnectionLike,
3737
{
38-
parse_ok_packet(packet, conn_like.get_capabilities()).is_ok()
38+
parse_ok_packet(
39+
packet,
40+
conn_like.get_capabilities(),
41+
OkPacketKind::ResultSetTerminator,
42+
)
43+
.is_ok()
3944
}
4045
}
4146

@@ -52,20 +57,13 @@ impl Protocol for TextProtocol {
5257
.map_err(Into::into)
5358
}
5459
}
60+
5561
impl Protocol for BinaryProtocol {
5662
fn read_result_set_row(packet: &[u8], columns: Arc<Vec<Column>>) -> Result<Row> {
5763
read_bin_values::<ServerSide>(packet, &*columns)
5864
.map(|values| new_row(values, columns))
5965
.map_err(Into::into)
6066
}
61-
62-
fn is_last_result_set_packet<T>(conn_like: &T, packet: &[u8]) -> bool
63-
where
64-
T: ConnectionLike,
65-
{
66-
parse_ok_packet(packet, conn_like.get_capabilities()).is_ok()
67-
&& packet.get(0).cloned() == Some(0xFE)
68-
}
6967
}
7068

7169
/// Represents something queryable like connection or transaction.

0 commit comments

Comments
 (0)