Skip to content

Commit

Permalink
Update ntex-service (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jun 22, 2023
1 parent f0bc879 commit 319207a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.0] - 2023-06-22

* Release v0.8.0

## [0.8.0-beta.3] - 2023-06-19

* Use ServiceCtx instead of Ctx
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.0-beta.3"
version = "0.8.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.0-beta.1"
ntex = "0.7.0"
ntex-amqp-codec = "0.9.0"

bitflags = "1.3"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.10"
ntex = { version = "0.7.0-beta.1", features = ["tokio"] }
ntex = { version = "0.7.0", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
10 changes: 5 additions & 5 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ntex::io::{Dispatcher as IoDispatcher, IoBoxed};
use ntex::service::{fn_service, Container, IntoService, Service};
use ntex::service::{fn_service, IntoService, Pipeline, Service};
use ntex::{time::Seconds, util::Ready};

use crate::codec::{AmqpCodec, AmqpFrame};
Expand Down Expand Up @@ -66,8 +66,8 @@ where
pub async fn start_default(self) -> Result<(), AmqpDispatcherError> {
let dispatcher = Dispatcher::new(
self.connection,
Container::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))),
Container::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))),
Pipeline::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))),
Pipeline::new(fn_service(|_| Ready::<_, LinkError>::Ok(()))),
self.remote_config.timeout_remote_secs().into(),
);

Expand All @@ -92,8 +92,8 @@ where
{
let dispatcher = Dispatcher::new(
self.connection,
Container::new(fn_service(|_| Ready::<_, S::Error>::Ok(()))),
Container::new(service.into_service()),
Pipeline::new(fn_service(|_| Ready::<_, S::Error>::Ok(()))),
Pipeline::new(service.into_service()),
self.remote_config.timeout_remote_secs().into(),
);

Expand Down
10 changes: 5 additions & 5 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{future::Future, marker::PhantomData};

use ntex::connect::{self, Address, Connect};
use ntex::io::IoBoxed;
use ntex::service::{Container, Service};
use ntex::service::{Pipeline, Service};
use ntex::time::{timeout_checked, Seconds};
use ntex::util::{ByteString, PoolId, PoolRef};

Expand All @@ -14,7 +14,7 @@ use super::{connection::Client, error::ConnectError, SaslAuth};

/// Amqp client connector
pub struct Connector<A, T = ()> {
connector: Container<T>,
connector: Pipeline<T>,
config: Configuration,
handshake_timeout: Seconds,
disconnect_timeout: Seconds,
Expand All @@ -27,7 +27,7 @@ impl<A> Connector<A> {
/// Create new amqp connector
pub fn new() -> Connector<A, connect::Connector<A>> {
Connector {
connector: Container::new(connect::Connector::default()),
connector: Pipeline::new(connect::Connector::default()),
handshake_timeout: Seconds::ZERO,
disconnect_timeout: Seconds(3),
config: Configuration::default(),
Expand Down Expand Up @@ -112,7 +112,7 @@ where
}

/// Use custom connector
pub fn connector<U>(self, connector: Container<U>) -> Connector<A, U>
pub fn connector<U>(self, connector: Pipeline<U>) -> Connector<A, U>
where
U: Service<Connect<A>, Error = connect::ConnectError>,
IoBoxed: From<U::Response>,
Expand All @@ -130,7 +130,7 @@ where
#[doc(hidden)]
#[deprecated]
/// Use custom connector
pub fn boxed_connector<U>(self, connector: Container<U>) -> Connector<A, U>
pub fn boxed_connector<U>(self, connector: Pipeline<U>) -> Connector<A, U>
where
U: Service<Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
{
Expand Down
12 changes: 6 additions & 6 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::VecDeque;
use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex::service::{Container, Service, ServiceCall, ServiceCtx};
use ntex::service::{Pipeline, Service, ServiceCall, ServiceCtx};
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{ready, BoxFuture, Either, Ready};
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};
Expand All @@ -28,8 +28,8 @@ impl ControlQueue {
/// Amqp server dispatcher service.
pub(crate) struct Dispatcher<Sr, Ctl: Service<ControlFrame>> {
sink: Connection,
service: Container<Sr>,
ctl_service: Container<Ctl>,
service: Pipeline<Sr>,
ctl_service: Pipeline<Ctl>,
ctl_fut: cell::RefCell<Vec<ControlItem<Ctl::Response, Ctl::Error>>>,
ctl_queue: Rc<ControlQueue>,
shutdown: cell::RefCell<Option<BoxFuture<'static, ()>>>,
Expand All @@ -45,8 +45,8 @@ where
{
pub(crate) fn new(
sink: Connection,
service: Container<Sr>,
ctl_service: Container<Ctl>,
service: Pipeline<Sr>,
ctl_service: Pipeline<Ctl>,
idle_timeout: Millis,
) -> Self {
let ctl_queue = sink.get_control_queue().clone();
Expand Down Expand Up @@ -298,7 +298,7 @@ where
types::Action::Transfer(link) => {
return Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.call(types::Message::Transfer(link)),
fut: self.service.service_call(types::Message::Transfer(link)),
_t: marker::PhantomData,
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{convert::TryFrom, future::Future, marker, pin::Pin, rc::Rc, task::Cont

use ntex::router::{IntoPattern, Router as PatternRouter};
use ntex::service::{
boxed, fn_factory_with_config, Container, IntoServiceFactory, Service, ServiceCall, ServiceCtx,
boxed, fn_factory_with_config, IntoServiceFactory, Pipeline, Service, ServiceCall, ServiceCtx,
ServiceFactory,
};
use ntex::util::{join_all, BoxFuture, Either, HashMap, Ready};
Expand Down Expand Up @@ -76,7 +76,7 @@ struct RouterService<S>(Cell<RouterServiceInner<S>>);
struct RouterServiceInner<S> {
state: State<S>,
router: Rc<PatternRouter<Handle<S>>>,
handlers: HashMap<ReceiverLink, Option<Container<HandleService>>>,
handlers: HashMap<ReceiverLink, Option<Pipeline<HandleService>>>,
}

impl<S: 'static> Service<Message> for RouterService<S> {
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<'f, S> Future for RouterServiceResponse<'f, S> {
};

this.state =
RouterServiceResponseState::Transfer(srv.call(tr), delivery_id);
RouterServiceResponseState::Transfer(srv.service_call(tr), delivery_id);
} else {
return Poll::Ready(Ok(()));
}
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<'f, S> Future for RouterServiceResponse<'f, S> {
this.inner
.get_mut()
.handlers
.insert(this.link.clone(), Some(Container::new(srv)));
.insert(this.link.clone(), Some(Pipeline::new(srv)));
if let Some(tr) = this.link.get_transfer() {
this.state = RouterServiceResponseState::Service(Some(tr));
} else {
Expand Down
16 changes: 8 additions & 8 deletions src/server/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt, future::Future, marker, pin::Pin, rc::Rc};

use ntex::io::{Dispatcher as FramedDispatcher, Filter, Io, IoBoxed};
use ntex::service::{Container, IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use ntex::service::{IntoServiceFactory, Pipeline, Service, ServiceCtx, ServiceFactory};
use ntex::time::{timeout_checked, Millis, Seconds};
use ntex::util::BoxFuture;

Expand Down Expand Up @@ -169,7 +169,7 @@ where
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
self.handshake
.container(())
.pipeline(())
.await
.map(move |handshake| ServerHandler {
handshake,
Expand Down Expand Up @@ -198,7 +198,7 @@ where
fn create(&self, _: ()) -> Self::Future<'_> {
Box::pin(async move {
self.handshake
.container(())
.pipeline(())
.await
.map(move |handshake| ServerHandler {
handshake,
Expand All @@ -210,7 +210,7 @@ where

/// Amqp connections handler
pub struct ServerHandler<St, H, Ctl, Pb> {
handshake: Container<H>,
handshake: Pipeline<H>,
inner: Rc<ServerInner<St, Ctl, Pb>>,
}

Expand Down Expand Up @@ -247,13 +247,13 @@ where
.map_err(|_| HandshakeError::Timeout)??;

// create publish service
let pb_srv = inner.publish.container(st.clone()).await.map_err(|e| {
let pb_srv = inner.publish.pipeline(st.clone()).await.map_err(|e| {
error!("Publish service init error: {:?}", e);
ServerError::PublishServiceError
})?;

// create control service
let ctl_srv = inner.control.container(st.clone()).await.map_err(|e| {
let ctl_srv = inner.control.pipeline(st.clone()).await.map_err(|e| {
error!("Control service init error: {:?}", e);
ServerError::ControlServiceError
})?;
Expand Down Expand Up @@ -319,7 +319,7 @@ where
async fn handshake<St, H, Ctl, Pb>(
state: IoBoxed,
max_size: usize,
handshake: Container<H>,
handshake: Pipeline<H>,
inner: Rc<ServerInner<St, Ctl, Pb>>,
) -> Result<(IoBoxed, AmqpCodec<AmqpFrame>, Connection, State<St>, Millis), ServerError<H::Error>>
where
Expand Down Expand Up @@ -348,7 +348,7 @@ where

// handshake protocol
let ack = handshake
.call(if protocol == ProtocolId::Amqp {
.service_call(if protocol == ProtocolId::Amqp {
Handshake::new_plain(state, inner.config.clone())
} else {
Handshake::new_sasl(state, inner.config.clone())
Expand Down

0 comments on commit 319207a

Please sign in to comment.