|
1 | 1 | use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; |
2 | 2 |
|
3 | | -use crate::{Service, ServiceCtx, ctx::WaitersRef}; |
| 3 | +use crate::{IntoService, Service, ServiceCtx, ctx::WaitersRef}; |
4 | 4 |
|
5 | 5 | #[derive(Debug)] |
6 | 6 | /// Container for a service. |
@@ -167,6 +167,79 @@ impl<S: fmt::Debug> fmt::Debug for PipelineState<S> { |
167 | 167 | } |
168 | 168 | } |
169 | 169 |
|
| 170 | +#[derive(Debug)] |
| 171 | +/// Service wrapper for Pipeline |
| 172 | +pub struct PipelineSvc<S> { |
| 173 | + inner: Pipeline<S>, |
| 174 | +} |
| 175 | + |
| 176 | +impl<S> PipelineSvc<S> { |
| 177 | + #[inline] |
| 178 | + /// Construct new PipelineSvc |
| 179 | + pub fn new(inner: Pipeline<S>) -> Self { |
| 180 | + Self { inner } |
| 181 | + } |
| 182 | +} |
| 183 | + |
| 184 | +impl<S, Req> Service<Req> for PipelineSvc<S> |
| 185 | +where |
| 186 | + S: Service<Req>, |
| 187 | +{ |
| 188 | + type Response = S::Response; |
| 189 | + type Error = S::Error; |
| 190 | + |
| 191 | + #[inline] |
| 192 | + async fn call( |
| 193 | + &self, |
| 194 | + req: Req, |
| 195 | + _: ServiceCtx<'_, Self>, |
| 196 | + ) -> Result<Self::Response, Self::Error> { |
| 197 | + self.inner.call(req).await |
| 198 | + } |
| 199 | + |
| 200 | + #[inline] |
| 201 | + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { |
| 202 | + self.inner.ready().await |
| 203 | + } |
| 204 | + |
| 205 | + #[inline] |
| 206 | + async fn shutdown(&self) { |
| 207 | + self.inner.shutdown().await |
| 208 | + } |
| 209 | + |
| 210 | + #[inline] |
| 211 | + fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> { |
| 212 | + self.inner.poll(cx) |
| 213 | + } |
| 214 | +} |
| 215 | + |
| 216 | +impl<S> From<S> for PipelineSvc<S> { |
| 217 | + #[inline] |
| 218 | + fn from(svc: S) -> Self { |
| 219 | + PipelineSvc { |
| 220 | + inner: Pipeline::new(svc), |
| 221 | + } |
| 222 | + } |
| 223 | +} |
| 224 | + |
| 225 | +impl<S> Clone for PipelineSvc<S> { |
| 226 | + fn clone(&self) -> Self { |
| 227 | + PipelineSvc { |
| 228 | + inner: self.inner.clone(), |
| 229 | + } |
| 230 | + } |
| 231 | +} |
| 232 | + |
| 233 | +impl<S, R> IntoService<PipelineSvc<S>, R> for Pipeline<S> |
| 234 | +where |
| 235 | + S: Service<R>, |
| 236 | +{ |
| 237 | + #[inline] |
| 238 | + fn into_service(self) -> PipelineSvc<S> { |
| 239 | + PipelineSvc::new(self) |
| 240 | + } |
| 241 | +} |
| 242 | + |
170 | 243 | /// Bound container for a service. |
171 | 244 | pub struct PipelineBinding<S, R> |
172 | 245 | where |
@@ -421,3 +494,64 @@ where |
421 | 494 | }) |
422 | 495 | } |
423 | 496 | } |
| 497 | + |
| 498 | +#[cfg(test)] |
| 499 | +mod tests { |
| 500 | + use std::{cell::Cell, rc::Rc}; |
| 501 | + |
| 502 | + use super::*; |
| 503 | + |
| 504 | + #[derive(Debug, Default, Clone)] |
| 505 | + struct Srv(Rc<Cell<usize>>); |
| 506 | + |
| 507 | + impl Service<()> for Srv { |
| 508 | + type Response = (); |
| 509 | + type Error = (); |
| 510 | + |
| 511 | + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { |
| 512 | + Ok(()) |
| 513 | + } |
| 514 | + |
| 515 | + async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { |
| 516 | + Ok(()) |
| 517 | + } |
| 518 | + |
| 519 | + async fn shutdown(&self) { |
| 520 | + self.0.set(self.0.get() + 1); |
| 521 | + } |
| 522 | + } |
| 523 | + |
| 524 | + #[ntex::test] |
| 525 | + async fn pipeline_service() { |
| 526 | + let cnt_sht = Rc::new(Cell::new(0)); |
| 527 | + let srv = Pipeline::new( |
| 528 | + Pipeline::new(Srv(cnt_sht.clone()).map(|_| "ok")) |
| 529 | + .into_service() |
| 530 | + .clone(), |
| 531 | + ); |
| 532 | + let res = srv.call(()).await; |
| 533 | + assert!(res.is_ok()); |
| 534 | + assert_eq!(res.unwrap(), "ok"); |
| 535 | + |
| 536 | + let res = srv.ready().await; |
| 537 | + assert_eq!(res, Ok(())); |
| 538 | + |
| 539 | + srv.shutdown().await; |
| 540 | + assert_eq!(cnt_sht.get(), 1); |
| 541 | + let _ = format!("{srv:?}"); |
| 542 | + |
| 543 | + let cnt_sht = Rc::new(Cell::new(0)); |
| 544 | + let svc = Srv(cnt_sht.clone()).map(|_| "ok"); |
| 545 | + let srv = Pipeline::new(PipelineSvc::from(&svc)); |
| 546 | + let res = srv.call(()).await; |
| 547 | + assert!(res.is_ok()); |
| 548 | + assert_eq!(res.unwrap(), "ok"); |
| 549 | + |
| 550 | + let res = srv.ready().await; |
| 551 | + assert_eq!(res, Ok(())); |
| 552 | + |
| 553 | + srv.shutdown().await; |
| 554 | + assert_eq!(cnt_sht.get(), 1); |
| 555 | + let _ = format!("{srv:?}"); |
| 556 | + } |
| 557 | +} |
0 commit comments