Skip to content

Commit 9bd7b53

Browse files
committed
Add bb8-faktory connection pool
1 parent b6977c3 commit 9bd7b53

File tree

9 files changed

+1092
-18
lines changed

9 files changed

+1092
-18
lines changed

Cargo.lock

Lines changed: 748 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"bb8",
4+
"faktory",
45
"postgres",
56
"redis",
67
]

faktory/Cargo.toml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[package]
2+
name = "bb8-faktory"
3+
version = "0.1.0"
4+
edition = "2021"
5+
rust-version = "1.75"
6+
description = "Full-featured async (tokio-based) Faktory work server connection pool (like r2d2)"
7+
license = "MIT"
8+
repository = "https://github.com/djc/bb8"
9+
10+
[features]
11+
default = []
12+
native-tls = ["faktory/native_tls", "dep:native-tls", "dep:tokio-native-tls", "dep:url"]
13+
rustls = ["faktory/rustls", "dep:tokio-rustls", "dep:rustls", "dep:url"]
14+
15+
[dependencies]
16+
bb8 = { version = "0.9", path = "../bb8" }
17+
faktory = "0.13"
18+
tokio = { version = "1.0.0", features = ["io-util"] }
19+
url = { version = "2", optional = true }
20+
21+
# Optional TLS dependencies
22+
native-tls = { version = "0.2", optional = true }
23+
tokio-native-tls = { version = "0.3", optional = true }
24+
tokio-rustls = { version = "0.25", optional = true }
25+
rustls = { version = "0.22", optional = true }
26+
27+
[dev-dependencies]
28+
tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] }
29+
30+
[[example]]
31+
name = "native_tls"
32+
required-features = ["native-tls"]
33+
34+
[[example]]
35+
name = "rustls"
36+
required-features = ["rustls"]

faktory/LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../LICENSE

faktory/examples/enqueue.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use bb8::Pool;
2+
use bb8_faktory::FaktoryConnectionManager;
3+
use faktory::Job;
4+
5+
// Enqueue a job to a Faktory work server
6+
//
7+
// Run with: FAKTORY_URL="tcp://localhost:7419" cargo run --example enqueue
8+
#[tokio::main]
9+
async fn main() {
10+
let manager = FaktoryConnectionManager::from_env();
11+
12+
let pool = match Pool::builder().build(manager).await {
13+
Ok(pool) => pool,
14+
Err(e) => panic!("builder error: {e:?}"),
15+
};
16+
17+
let mut conn = pool.get().await.unwrap();
18+
conn.enqueue(Job::new("email", vec!["user@example.com"]))
19+
.await
20+
.unwrap();
21+
22+
println!("Job enqueued successfully!");
23+
}

faktory/examples/enqueue_many.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use bb8::Pool;
2+
use bb8_faktory::FaktoryConnectionManager;
3+
use faktory::Job;
4+
5+
// Enqueue multiple jobs to a Faktory work server
6+
//
7+
// Run with: FAKTORY_URL="tcp://localhost:7419" cargo run --example enqueue_many
8+
#[tokio::main]
9+
async fn main() {
10+
let manager = FaktoryConnectionManager::from_env();
11+
12+
let pool = Pool::builder().build(manager).await.unwrap();
13+
14+
let mut conn = pool.get().await.unwrap();
15+
16+
let jobs = vec![
17+
Job::new("email", vec!["user1@example.com"]),
18+
Job::new("email", vec!["user2@example.com"]),
19+
Job::new("email", vec!["user3@example.com"]),
20+
];
21+
22+
let (count, errors) = conn.enqueue_many(jobs).await.unwrap();
23+
24+
match errors {
25+
Some(errs) => println!("Enqueued {count} jobs with errors: {errs:?}"),
26+
None => println!("Successfully enqueued {count} jobs"),
27+
}
28+
}

faktory/examples/native_tls.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use bb8::Pool;
2+
use bb8_faktory::{FaktoryConnectionManager, TlsConnector};
3+
use faktory::Job;
4+
5+
// Enqueue a job using native-tls for secure connections//
6+
// Start Faktory with TLS enabled (requires TLS configuration)
7+
#[tokio::main]
8+
async fn main() {
9+
let manager = FaktoryConnectionManager::new("tcp+tls://localhost:7419")
10+
.with_tls(TlsConnector::native_tls_default().unwrap());
11+
12+
// Or with custom connector (e.g., accepting invalid certs for testing):
13+
// let connector = native_tls::TlsConnector::builder()
14+
// .danger_accept_invalid_certs(true)
15+
// .build()
16+
// .unwrap();
17+
// let manager = FaktoryConnectionManager::new("tcp+tls://localhost:7419")
18+
// .with_tls(TlsConnector::NativeTls(connector));
19+
20+
// Or from environment (auto-selects TLS when feature enabled):
21+
// let manager = FaktoryConnectionManager::from_env();
22+
23+
let pool = match Pool::builder().build(manager).await {
24+
Ok(pool) => pool,
25+
Err(e) => panic!("builder error: {e:?}"),
26+
};
27+
28+
let mut conn = pool.get().await.unwrap();
29+
conn.enqueue(Job::new("email", vec!["user@example.com"]))
30+
.await
31+
.unwrap();
32+
33+
println!("Job enqueued successfully over TLS (native-tls)!");
34+
}

faktory/examples/rustls.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use bb8::Pool;
2+
use bb8_faktory::{FaktoryConnectionManager, TlsConnector};
3+
use faktory::Job;
4+
5+
// Enqueue a job using rustls for secure connections
6+
// Start Faktory with TLS enabled (requires TLS configuration)
7+
#[tokio::main]
8+
async fn main() {
9+
let manager = FaktoryConnectionManager::new("tcp+tls://localhost:7419")
10+
.with_tls(TlsConnector::rustls_default());
11+
12+
// Or with custom ClientConfig:
13+
// let config = rustls::ClientConfig::builder()
14+
// .with_root_certificates(your_root_store)
15+
// .with_no_client_auth();
16+
// let manager = FaktoryConnectionManager::new("tcp+tls://localhost:7419")
17+
// .with_tls(TlsConnector::rustls_with_config(config));
18+
19+
// Or from environment (auto-selects TLS when feature enabled):
20+
// let manager = FaktoryConnectionManager::from_env();
21+
22+
let pool = match Pool::builder().build(manager).await {
23+
Ok(pool) => pool,
24+
Err(e) => panic!("builder error: {e:?}"),
25+
};
26+
27+
let mut conn = pool.get().await.unwrap();
28+
conn.enqueue(Job::new("email", vec!["user@example.com"]))
29+
.await
30+
.unwrap();
31+
32+
println!("Job enqueued successfully over TLS (rustls)!");
33+
}

faktory/src/lib.rs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
//! Faktory support for the `bb8` connection pool.
2+
#![deny(missing_docs, missing_debug_implementations)]
3+
4+
pub use bb8;
5+
pub use faktory;
6+
7+
use faktory::{Client, Error};
8+
9+
#[cfg(any(feature = "native-tls", feature = "rustls"))]
10+
use url::Url;
11+
12+
#[cfg(feature = "rustls")]
13+
use std::sync::Arc;
14+
15+
/// TLS connector configuration.
16+
#[derive(Clone, Default)]
17+
pub enum TlsConnector {
18+
/// Plain TCP connection.
19+
#[default]
20+
NoTls,
21+
/// TLS using native-tls.
22+
#[cfg(feature = "native-tls")]
23+
NativeTls(native_tls::TlsConnector),
24+
/// TLS using rustls.
25+
#[cfg(feature = "rustls")]
26+
Rustls(tokio_rustls::TlsConnector),
27+
}
28+
29+
impl std::fmt::Debug for TlsConnector {
30+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31+
match self {
32+
Self::NoTls => f.debug_tuple("NoTls").finish(),
33+
#[cfg(feature = "native-tls")]
34+
Self::NativeTls(connector) => f.debug_tuple("NativeTls").field(connector).finish(),
35+
#[cfg(feature = "rustls")]
36+
Self::Rustls(_) => f.debug_tuple("Rustls").field(&"..").finish(),
37+
}
38+
}
39+
}
40+
41+
impl TlsConnector {
42+
/// Create a native-tls connector with default settings.
43+
#[cfg(feature = "native-tls")]
44+
pub fn native_tls_default() -> Result<Self, native_tls::Error> {
45+
let connector = native_tls::TlsConnector::builder().build()?;
46+
Ok(Self::NativeTls(connector))
47+
}
48+
49+
/// Create a rustls connector with default settings (empty root store).
50+
///
51+
/// Note: This uses an empty certificate store. For production use,
52+
/// consider using `rustls_default_with_native_certs()` or providing
53+
/// your own `ClientConfig`.
54+
#[cfg(feature = "rustls")]
55+
pub fn rustls_default() -> Self {
56+
let config = rustls::ClientConfig::builder()
57+
.with_root_certificates(rustls::RootCertStore::empty())
58+
.with_no_client_auth();
59+
Self::Rustls(tokio_rustls::TlsConnector::from(Arc::new(config)))
60+
}
61+
62+
/// Create a rustls connector from a custom ClientConfig.
63+
#[cfg(feature = "rustls")]
64+
pub fn rustls_with_config(config: rustls::ClientConfig) -> Self {
65+
Self::Rustls(tokio_rustls::TlsConnector::from(Arc::new(config)))
66+
}
67+
}
68+
69+
/// A `bb8::ManageConnection` for `faktory::Client`.
70+
#[derive(Debug, Clone)]
71+
pub struct FaktoryConnectionManager {
72+
url: String,
73+
tls: TlsConnector,
74+
}
75+
76+
impl FaktoryConnectionManager {
77+
/// Create a new `FaktoryConnectionManager` with the specified URL.
78+
///
79+
/// The URL should be in the format: `protocol://[:password@]hostname[:port]`
80+
///
81+
/// # Examples
82+
///
83+
/// ```
84+
/// use bb8_faktory::FaktoryConnectionManager;
85+
///
86+
/// let manager = FaktoryConnectionManager::new("tcp://localhost:7419");
87+
/// ```
88+
pub fn new(url: impl Into<String>) -> Self {
89+
Self {
90+
url: url.into(),
91+
tls: TlsConnector::default(),
92+
}
93+
}
94+
95+
/// Create a new `FaktoryConnectionManager` using environment variables.
96+
///
97+
/// This reads `FAKTORY_PROVIDER` to get the env var name (defaults to `FAKTORY_URL`),
98+
/// then reads that env var for the URL (defaults to `tcp://localhost:7419`).
99+
///
100+
/// If a TLS feature is enabled, it will automatically use that TLS implementation
101+
/// with default settings.
102+
///
103+
/// # Example
104+
///
105+
/// ```
106+
/// use bb8_faktory::FaktoryConnectionManager;
107+
///
108+
/// let manager = FaktoryConnectionManager::from_env();
109+
/// ```
110+
pub fn from_env() -> Self {
111+
let url = std::env::var("FAKTORY_PROVIDER")
112+
.ok()
113+
.and_then(|provider| std::env::var(provider).ok())
114+
.or_else(|| std::env::var("FAKTORY_URL").ok())
115+
.unwrap_or_else(|| "tcp://localhost:7419".to_string());
116+
117+
#[cfg(feature = "rustls")]
118+
let tls = TlsConnector::rustls_default();
119+
120+
#[cfg(all(feature = "native-tls", not(feature = "rustls")))]
121+
let tls = TlsConnector::native_tls_default().expect("Failed to build native-tls connector");
122+
123+
#[cfg(not(any(feature = "native-tls", feature = "rustls")))]
124+
let tls = TlsConnector::NoTls;
125+
126+
Self { url, tls }
127+
}
128+
129+
/// Set the TLS connector for the connection.
130+
///
131+
/// # Example
132+
///
133+
/// ```ignore
134+
/// use bb8_faktory::{FaktoryConnectionManager, TlsConnector};
135+
///
136+
/// let manager = FaktoryConnectionManager::new("tcp+tls://localhost:7419")
137+
/// .with_tls(TlsConnector::native_tls_default().unwrap());
138+
/// ```
139+
pub fn with_tls(mut self, tls: TlsConnector) -> Self {
140+
self.tls = tls;
141+
self
142+
}
143+
}
144+
145+
impl bb8::ManageConnection for FaktoryConnectionManager {
146+
type Connection = Client;
147+
type Error = Error;
148+
149+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
150+
match &self.tls {
151+
TlsConnector::NoTls => Client::connect_to(&self.url).await,
152+
#[cfg(feature = "native-tls")]
153+
TlsConnector::NativeTls(connector) => {
154+
let password = Url::parse(&self.url)
155+
.ok()
156+
.and_then(|url| url.password().map(|p| p.to_string()));
157+
let stream = faktory::native_tls::TlsStream::with_connector(
158+
connector.clone(),
159+
Some(self.url.as_str()),
160+
)
161+
.await?;
162+
let buffered = tokio::io::BufStream::new(stream);
163+
Client::connect_with(buffered, password).await
164+
}
165+
#[cfg(feature = "rustls")]
166+
TlsConnector::Rustls(connector) => {
167+
let password = Url::parse(&self.url)
168+
.ok()
169+
.and_then(|url| url.password().map(|p| p.to_string()));
170+
let stream = faktory::rustls::TlsStream::with_connector(
171+
connector.clone(),
172+
Some(self.url.as_str()),
173+
)
174+
.await?;
175+
let buffered = tokio::io::BufStream::new(stream);
176+
Client::connect_with(buffered, password).await
177+
}
178+
}
179+
}
180+
181+
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
182+
conn.current_info().await.map(|_| ())
183+
}
184+
185+
fn has_broken(&self, _: &mut Self::Connection) -> bool {
186+
false
187+
}
188+
}

0 commit comments

Comments
 (0)