From 319207a1ad88ced5482549919491c8f469d3bad4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 22 Jun 2023 19:20:20 +0600 Subject: [PATCH] Update ntex-service (#31) --- CHANGES.md | 4 ++++ Cargo.toml | 6 +++--- src/client/connection.rs | 10 +++++----- src/client/connector.rs | 10 +++++----- src/dispatcher.rs | 12 ++++++------ src/router.rs | 8 ++++---- src/server/service.rs | 16 ++++++++-------- 7 files changed, 35 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6ead6c7..24006dd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.8.0] - 2023-06-22 + +* Release v0.8.0 + ## [0.8.0-beta.3] - 2023-06-19 * Use ServiceCtx instead of Ctx diff --git a/Cargo.toml b/Cargo.toml index da3deb5..d70f203 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "0.8.0-beta.3" +version = "0.8.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -24,7 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "0.7.0-beta.1" +ntex = "0.7.0" ntex-amqp-codec = "0.9.0" bitflags = "1.3" @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] env_logger = "0.10" -ntex = { version = "0.7.0-beta.1", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } [patch.crates-io] ntex-amqp = { path = "." } diff --git a/src/client/connection.rs b/src/client/connection.rs index 45c939a..742da72 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,5 +1,5 @@ use ntex::io::{Dispatcher as IoDispatcher, IoBoxed}; -use ntex::service::{fn_service, Container, IntoService, Service}; +use ntex::service::{fn_service, IntoService, Pipeline, Service}; use ntex::{time::Seconds, util::Ready}; use crate::codec::{AmqpCodec, AmqpFrame}; @@ -66,8 +66,8 @@ where pub async fn start_default(self) -> Result<(), AmqpDispatcherError> { let dispatcher = Dispatcher::new( self.connection, - Container::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))), - Container::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))), + Pipeline::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))), + Pipeline::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))), self.remote_config.timeout_remote_secs().into(), ); @@ -92,8 +92,8 @@ where { let dispatcher = Dispatcher::new( self.connection, - Container::new(fn_service(|_| Ready::<_, S::Error>::Ok(()))), - Container::new(service.into_service()), + Pipeline::new(fn_service(|_| Ready::<_, S::Error>::Ok(()))), + Pipeline::new(service.into_service()), self.remote_config.timeout_remote_secs().into(), ); diff --git a/src/client/connector.rs b/src/client/connector.rs index 2f1906f..78676b8 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -2,7 +2,7 @@ use std::{future::Future, marker::PhantomData}; use ntex::connect::{self, Address, Connect}; use ntex::io::IoBoxed; -use ntex::service::{Container, Service}; +use ntex::service::{Pipeline, Service}; use ntex::time::{timeout_checked, Seconds}; use ntex::util::{ByteString, PoolId, PoolRef}; @@ -14,7 +14,7 @@ use super::{connection::Client, error::ConnectError, SaslAuth}; /// Amqp client connector pub struct Connector { - connector: Container, + connector: Pipeline, config: Configuration, handshake_timeout: Seconds, disconnect_timeout: Seconds, @@ -27,7 +27,7 @@ impl Connector { /// Create new amqp connector pub fn new() -> Connector> { Connector { - connector: Container::new(connect::Connector::default()), + connector: Pipeline::new(connect::Connector::default()), handshake_timeout: Seconds::ZERO, disconnect_timeout: Seconds(3), config: Configuration::default(), @@ -112,7 +112,7 @@ where } /// Use custom connector - pub fn connector(self, connector: Container) -> Connector + pub fn connector(self, connector: Pipeline) -> Connector where U: Service, Error = connect::ConnectError>, IoBoxed: From, @@ -130,7 +130,7 @@ where #[doc(hidden)] #[deprecated] /// Use custom connector - pub fn boxed_connector(self, connector: Container) -> Connector + pub fn boxed_connector(self, connector: Pipeline) -> Connector where U: Service, Response = IoBoxed, Error = connect::ConnectError>, { diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 0e1626a..b181a1c 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; -use ntex::service::{Container, Service, ServiceCall, ServiceCtx}; +use ntex::service::{Pipeline, Service, ServiceCall, ServiceCtx}; use ntex::time::{sleep, Millis, Sleep}; use ntex::util::{ready, BoxFuture, Either, Ready}; use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker}; @@ -28,8 +28,8 @@ impl ControlQueue { /// Amqp server dispatcher service. pub(crate) struct Dispatcher> { sink: Connection, - service: Container, - ctl_service: Container, + service: Pipeline, + ctl_service: Pipeline, ctl_fut: cell::RefCell>>, ctl_queue: Rc, shutdown: cell::RefCell>>, @@ -45,8 +45,8 @@ where { pub(crate) fn new( sink: Connection, - service: Container, - ctl_service: Container, + service: Pipeline, + ctl_service: Pipeline, idle_timeout: Millis, ) -> Self { let ctl_queue = sink.get_control_queue().clone(); @@ -298,7 +298,7 @@ where types::Action::Transfer(link) => { return Either::Left(ServiceResult { link: link.clone(), - fut: self.service.call(types::Message::Transfer(link)), + fut: self.service.service_call(types::Message::Transfer(link)), _t: marker::PhantomData, }); } diff --git a/src/router.rs b/src/router.rs index 9b81a46..f88c425 100644 --- a/src/router.rs +++ b/src/router.rs @@ -2,7 +2,7 @@ use std::{convert::TryFrom, future::Future, marker, pin::Pin, rc::Rc, task::Cont use ntex::router::{IntoPattern, Router as PatternRouter}; use ntex::service::{ - boxed, fn_factory_with_config, Container, IntoServiceFactory, Service, ServiceCall, ServiceCtx, + boxed, fn_factory_with_config, IntoServiceFactory, Pipeline, Service, ServiceCall, ServiceCtx, ServiceFactory, }; use ntex::util::{join_all, BoxFuture, Either, HashMap, Ready}; @@ -76,7 +76,7 @@ struct RouterService(Cell>); struct RouterServiceInner { state: State, router: Rc>>, - handlers: HashMap>>, + handlers: HashMap>>, } impl Service for RouterService { @@ -247,7 +247,7 @@ impl<'f, S> Future for RouterServiceResponse<'f, S> { }; this.state = - RouterServiceResponseState::Transfer(srv.call(tr), delivery_id); + RouterServiceResponseState::Transfer(srv.service_call(tr), delivery_id); } else { return Poll::Ready(Ok(())); } @@ -281,7 +281,7 @@ impl<'f, S> Future for RouterServiceResponse<'f, S> { this.inner .get_mut() .handlers - .insert(this.link.clone(), Some(Container::new(srv))); + .insert(this.link.clone(), Some(Pipeline::new(srv))); if let Some(tr) = this.link.get_transfer() { this.state = RouterServiceResponseState::Service(Some(tr)); } else { diff --git a/src/server/service.rs b/src/server/service.rs index c88c3eb..1703b25 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -1,7 +1,7 @@ use std::{fmt, future::Future, marker, pin::Pin, rc::Rc}; use ntex::io::{Dispatcher as FramedDispatcher, Filter, Io, IoBoxed}; -use ntex::service::{Container, IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; +use ntex::service::{IntoServiceFactory, Pipeline, Service, ServiceCtx, ServiceFactory}; use ntex::time::{timeout_checked, Millis, Seconds}; use ntex::util::BoxFuture; @@ -169,7 +169,7 @@ where fn create(&self, _: ()) -> Self::Future<'_> { Box::pin(async move { self.handshake - .container(()) + .pipeline(()) .await .map(move |handshake| ServerHandler { handshake, @@ -198,7 +198,7 @@ where fn create(&self, _: ()) -> Self::Future<'_> { Box::pin(async move { self.handshake - .container(()) + .pipeline(()) .await .map(move |handshake| ServerHandler { handshake, @@ -210,7 +210,7 @@ where /// Amqp connections handler pub struct ServerHandler { - handshake: Container, + handshake: Pipeline, inner: Rc>, } @@ -247,13 +247,13 @@ where .map_err(|_| HandshakeError::Timeout)??; // create publish service - let pb_srv = inner.publish.container(st.clone()).await.map_err(|e| { + let pb_srv = inner.publish.pipeline(st.clone()).await.map_err(|e| { error!("Publish service init error: {:?}", e); ServerError::PublishServiceError })?; // create control service - let ctl_srv = inner.control.container(st.clone()).await.map_err(|e| { + let ctl_srv = inner.control.pipeline(st.clone()).await.map_err(|e| { error!("Control service init error: {:?}", e); ServerError::ControlServiceError })?; @@ -319,7 +319,7 @@ where async fn handshake( state: IoBoxed, max_size: usize, - handshake: Container, + handshake: Pipeline, inner: Rc>, ) -> Result<(IoBoxed, AmqpCodec, Connection, State, Millis), ServerError> where @@ -348,7 +348,7 @@ where // handshake protocol let ack = handshake - .call(if protocol == ProtocolId::Amqp { + .service_call(if protocol == ProtocolId::Amqp { Handshake::new_plain(state, inner.config.clone()) } else { Handshake::new_sasl(state, inner.config.clone())