Skip to content

Commit 2284f83

Browse files
committed
refactor: migrate mysql service from adapter::kv to impl Access directly
1 parent 42545a5 commit 2284f83

File tree

7 files changed

+284
-83
lines changed

7 files changed

+284
-83
lines changed

core/src/services/mysql/backend.rs

Lines changed: 78 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717

1818
use std::fmt::Debug;
1919
use std::str::FromStr;
20+
use std::sync::Arc;
2021

21-
use sqlx::MySqlPool;
2222
use sqlx::mysql::MySqlConnectOptions;
2323
use tokio::sync::OnceCell;
2424

25-
use crate::raw::adapters::kv;
25+
use super::config::MysqlConfig;
26+
use super::core::*;
27+
use super::deleter::MysqlDeleter;
28+
use super::writer::MysqlWriter;
29+
use crate::raw::oio;
2630
use crate::raw::*;
27-
use crate::services::MysqlConfig;
2831
use crate::*;
2932

3033
#[doc = include_str!("docs.md")]
@@ -142,7 +145,7 @@ impl Builder for MysqlBuilder {
142145

143146
let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
144147

145-
Ok(MySqlBackend::new(Adapter {
148+
Ok(MysqlBackend::new(MysqlCore {
146149
pool: OnceCell::new(),
147150
config,
148151
table,
@@ -154,96 +157,92 @@ impl Builder for MysqlBuilder {
154157
}
155158

156159
/// Backend for mysql service
157-
pub type MySqlBackend = kv::Backend<Adapter>;
158-
159-
#[derive(Debug, Clone)]
160-
pub struct Adapter {
161-
pool: OnceCell<MySqlPool>,
162-
config: MySqlConnectOptions,
163-
164-
table: String,
165-
key_field: String,
166-
value_field: String,
160+
#[derive(Clone, Debug)]
161+
pub struct MysqlBackend {
162+
core: Arc<MysqlCore>,
163+
root: String,
164+
info: Arc<AccessorInfo>,
167165
}
168166

169-
impl Adapter {
170-
async fn get_client(&self) -> Result<&MySqlPool> {
171-
self.pool
172-
.get_or_try_init(|| async {
173-
let pool = MySqlPool::connect_with(self.config.clone())
174-
.await
175-
.map_err(parse_mysql_error)?;
176-
Ok(pool)
177-
})
178-
.await
167+
impl MysqlBackend {
168+
pub fn new(core: MysqlCore) -> Self {
169+
let info = AccessorInfo::default();
170+
info.set_scheme(Scheme::Mysql.into_static());
171+
info.set_name(&core.table);
172+
info.set_root("/");
173+
info.set_native_capability(Capability {
174+
read: true,
175+
stat: true,
176+
write: true,
177+
write_can_empty: true,
178+
delete: true,
179+
shared: true,
180+
..Default::default()
181+
});
182+
183+
Self {
184+
core: Arc::new(core),
185+
root: "/".to_string(),
186+
info: Arc::new(info),
187+
}
179188
}
180-
}
181189

182-
impl kv::Adapter for Adapter {
183-
type Scanner = ();
184-
185-
fn info(&self) -> kv::Info {
186-
kv::Info::new(
187-
Scheme::Mysql,
188-
&self.table,
189-
Capability {
190-
read: true,
191-
write: true,
192-
delete: true,
193-
shared: true,
194-
..Default::default()
195-
},
196-
)
190+
fn with_normalized_root(mut self, root: String) -> Self {
191+
self.info.set_root(&root);
192+
self.root = root;
193+
self
197194
}
195+
}
198196

199-
async fn get(&self, path: &str) -> Result<Option<Buffer>> {
200-
let pool = self.get_client().await?;
201-
202-
let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
203-
"SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
204-
self.value_field, self.table, self.key_field
205-
))
206-
.bind(path)
207-
.fetch_optional(pool)
208-
.await
209-
.map_err(parse_mysql_error)?;
197+
impl Access for MysqlBackend {
198+
type Reader = Buffer;
199+
type Writer = MysqlWriter;
200+
type Lister = ();
201+
type Deleter = oio::OneShotDeleter<MysqlDeleter>;
210202

211-
Ok(value.map(Buffer::from))
203+
fn info(&self) -> Arc<AccessorInfo> {
204+
self.info.clone()
212205
}
213206

214-
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
215-
let pool = self.get_client().await?;
207+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
208+
let p = build_abs_path(&self.root, path);
216209

217-
sqlx::query(&format!(
218-
r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
219-
ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
220-
self.table, self.key_field, self.value_field, self.value_field, self.value_field
221-
))
222-
.bind(path)
223-
.bind(value.to_vec())
224-
.execute(pool)
225-
.await
226-
.map_err(parse_mysql_error)?;
210+
if p == build_abs_path(&self.root, "") {
211+
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
212+
} else {
213+
let bs = self.core.get(&p).await?;
214+
match bs {
215+
Some(bs) => Ok(RpStat::new(
216+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
217+
)),
218+
None => Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
219+
}
220+
}
221+
}
227222

228-
Ok(())
223+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
224+
let p = build_abs_path(&self.root, path);
225+
let bs = match self.core.get(&p).await? {
226+
Some(bs) => bs,
227+
None => return Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
228+
};
229+
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
229230
}
230231

231-
async fn delete(&self, path: &str) -> Result<()> {
232-
let pool = self.get_client().await?;
232+
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
233+
let p = build_abs_path(&self.root, path);
234+
Ok((RpWrite::new(), MysqlWriter::new(self.core.clone(), p)))
235+
}
233236

234-
sqlx::query(&format!(
235-
"DELETE FROM `{}` WHERE `{}` = ?",
236-
self.table, self.key_field
237+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
238+
Ok((
239+
RpDelete::default(),
240+
oio::OneShotDeleter::new(MysqlDeleter::new(self.core.clone(), self.root.clone())),
237241
))
238-
.bind(path)
239-
.execute(pool)
240-
.await
241-
.map_err(parse_mysql_error)?;
242-
243-
Ok(())
244242
}
245-
}
246243

247-
fn parse_mysql_error(err: sqlx::Error) -> Error {
248-
Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
244+
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
245+
let _ = build_abs_path(&self.root, path);
246+
Ok((RpList::default(), ()))
247+
}
249248
}

core/src/services/mysql/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
use std::fmt::Debug;
1919
use std::fmt::Formatter;
2020

21-
use super::backend::MysqlBuilder;
2221
use serde::Deserialize;
2322
use serde::Serialize;
2423

24+
use super::backend::MysqlBuilder;
25+
2526
/// Config for Mysql services support.
2627
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2728
#[serde(default)]
@@ -68,6 +69,7 @@ impl Debug for MysqlConfig {
6869

6970
impl crate::Configurator for MysqlConfig {
7071
type Builder = MysqlBuilder;
72+
7173
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
7274
let mut map = uri.options().clone();
7375

core/src/services/mysql/core.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use sqlx::MySqlPool;
19+
use sqlx::mysql::MySqlConnectOptions;
20+
use tokio::sync::OnceCell;
21+
22+
use crate::*;
23+
24+
#[derive(Clone, Debug)]
25+
pub struct MysqlCore {
26+
pub pool: OnceCell<MySqlPool>,
27+
pub config: MySqlConnectOptions,
28+
29+
pub table: String,
30+
pub key_field: String,
31+
pub value_field: String,
32+
}
33+
34+
impl MysqlCore {
35+
async fn get_client(&self) -> Result<&MySqlPool> {
36+
self.pool
37+
.get_or_try_init(|| async {
38+
let pool = MySqlPool::connect_with(self.config.clone())
39+
.await
40+
.map_err(parse_mysql_error)?;
41+
Ok(pool)
42+
})
43+
.await
44+
}
45+
46+
pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
47+
let pool = self.get_client().await?;
48+
49+
let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
50+
"SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
51+
self.value_field, self.table, self.key_field
52+
))
53+
.bind(path)
54+
.fetch_optional(pool)
55+
.await
56+
.map_err(parse_mysql_error)?;
57+
58+
Ok(value.map(Buffer::from))
59+
}
60+
61+
pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
62+
let pool = self.get_client().await?;
63+
64+
sqlx::query(&format!(
65+
r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
66+
ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
67+
self.table, self.key_field, self.value_field, self.value_field, self.value_field
68+
))
69+
.bind(path)
70+
.bind(value.to_vec())
71+
.execute(pool)
72+
.await
73+
.map_err(parse_mysql_error)?;
74+
75+
Ok(())
76+
}
77+
78+
pub async fn delete(&self, path: &str) -> Result<()> {
79+
let pool = self.get_client().await?;
80+
81+
sqlx::query(&format!(
82+
"DELETE FROM `{}` WHERE `{}` = ?",
83+
self.table, self.key_field
84+
))
85+
.bind(path)
86+
.execute(pool)
87+
.await
88+
.map_err(parse_mysql_error)?;
89+
90+
Ok(())
91+
}
92+
}
93+
94+
fn parse_mysql_error(err: sqlx::Error) -> Error {
95+
Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
96+
}

core/src/services/mysql/deleter.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use super::core::*;
21+
use crate::raw::oio;
22+
use crate::raw::*;
23+
use crate::*;
24+
25+
pub struct MysqlDeleter {
26+
core: Arc<MysqlCore>,
27+
root: String,
28+
}
29+
30+
impl MysqlDeleter {
31+
pub fn new(core: Arc<MysqlCore>, root: String) -> Self {
32+
Self { core, root }
33+
}
34+
}
35+
36+
impl oio::OneShotDelete for MysqlDeleter {
37+
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
38+
let p = build_abs_path(&self.root, &path);
39+
self.core.delete(&p).await?;
40+
Ok(())
41+
}
42+
}

core/src/services/mysql/docs.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@
22

33
This service can be used to:
44

5+
- [ ] create_dir
56
- [x] stat
67
- [x] read
78
- [x] write
8-
- [x] create_dir
99
- [x] delete
1010
- [ ] copy
1111
- [ ] rename
12-
- [ ] ~~list~~
12+
- [ ] list
1313
- [ ] ~~presign~~
14-
- [ ] blocking
1514

1615
## Configuration
1716

core/src/services/mysql/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
// under the License.
1717

1818
mod backend;
19+
mod core;
20+
mod deleter;
21+
mod writer;
22+
1923
pub use backend::MysqlBuilder as Mysql;
2024

2125
mod config;

0 commit comments

Comments
 (0)