Skip to content

Commit

Permalink
Use async fn in trait for Service definition (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jan 7, 2024
1 parent f952443 commit 1ba331a
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 353 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
version:
- 1.66.0 # MSRV
- 1.75.0 # MSRV
- stable
- nightly

Expand All @@ -34,7 +34,7 @@ jobs:
uses: Swatinem/[email protected]

- name: Cache cargo tarpaulin
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: actions/cache@v1
with:
path: ~/.cargo/bin
Expand All @@ -48,19 +48,19 @@ jobs:
args: --all --no-fail-fast --features=ntex/tokio -- --nocapture

- name: Install tarpaulin
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo install cargo-tarpaulin
- name: Generate coverage report
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --features=ntex/tokio
- name: Upload to Codecov
if: matrix.version == '1.65.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
uses: codecov/codecov-action@v1
with:
Expand All @@ -69,7 +69,7 @@ jobs:
- name: Install cargo-cache
continue-on-error: true
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo install cargo-cache --no-default-features --features ci-autoclean
- name: Clear the cargo caches
continue-on-error: true
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.0.0-b.0] - 2024-01-07

* Use "async fn" in trait for Service definition

## [0.8.9] - 2024-01-04

* Remove internal circular references
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.9"
version = "1.0.0-b.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,7 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "0.7.16"
ntex = "1.0.0-b.0"
ntex-amqp-codec = "0.9.1"

bitflags = "2.4"
Expand All @@ -36,7 +36,7 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.10"
ntex = { version = "0.7", features = ["tokio"] }
ntex = { version = "1.0.0-b.0", features = ["tokio"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
4 changes: 2 additions & 2 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
self.remote_config.timeout_remote_secs().into(),
);

IoDispatcher::with_config(
IoDispatcher::new(
self.io,
self.codec,
dispatcher,
Expand All @@ -91,7 +91,7 @@ where
self.remote_config.timeout_remote_secs().into(),
);

IoDispatcher::with_config(
IoDispatcher::new(
self.io,
self.codec,
dispatcher,
Expand Down
31 changes: 16 additions & 15 deletions src/default.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::marker::PhantomData;

use ntex::service::{Service, ServiceCtx, ServiceFactory};
use ntex::util::Ready;

use crate::error::LinkError;
use crate::{types::Link, ControlFrame, State};
Expand All @@ -20,22 +19,23 @@ impl<S, E> ServiceFactory<Link<S>, State<S>> for DefaultPublishService<S, E> {
type Error = E;
type InitError = LinkError;
type Service = DefaultPublishService<S, E>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Err(LinkError::force_detach().description("not configured"))
async fn create(&self, _: State<S>) -> Result<Self::Service, Self::InitError> {
Err(LinkError::force_detach().description("not configured"))
}
}

impl<S, E> Service<Link<S>> for DefaultPublishService<S, E> {
type Response = ();
type Error = E;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn call<'a>(&'a self, _: Link<S>, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
async fn call(
&self,
_: Link<S>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
log::warn!("AMQP Publish service is not configured");
Ready::Ok(())
Ok(())
}
}

Expand All @@ -53,20 +53,21 @@ impl<S, E> ServiceFactory<ControlFrame, State<S>> for DefaultControlService<S, E
type Error = E;
type InitError = E;
type Service = DefaultControlService<S, E>;
type Future<'f> = Ready<Self::Service, Self::InitError> where Self: 'f;

fn create(&self, _: State<S>) -> Self::Future<'_> {
Ready::Ok(DefaultControlService(PhantomData))
async fn create(&self, _: State<S>) -> Result<Self::Service, Self::InitError> {
Ok(DefaultControlService(PhantomData))
}
}

impl<S, E> Service<ControlFrame> for DefaultControlService<S, E> {
type Response = ();
type Error = E;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

#[inline]
fn call<'a>(&'a self, _: ControlFrame, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Ok(())
async fn call(
&self,
_: ControlFrame,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
Ok(())
}
}
50 changes: 22 additions & 28 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::VecDeque;
use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex::service::{Pipeline, Service, ServiceCall, ServiceCtx};
use ntex::service::{Pipeline, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{ready, BoxFuture, Either, Ready};
use ntex::util::{ready, BoxFuture, Either};
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};

use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame};
Expand Down Expand Up @@ -201,10 +201,6 @@ where
{
type Response = Option<AmqpFrame>;
type Error = AmqpDispatcherError;
type Future<'f> = Either<
ServiceResult<'f, ServiceCall<'f, Sr, types::Message>, Sr::Error>,
Ready<Self::Response, Self::Error>,
>;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ctl_queue.waker.register(cx.waker());
Expand Down Expand Up @@ -281,11 +277,11 @@ where
}
}

fn call<'a>(
&'a self,
async fn call(
&self,
request: DispatchItem<AmqpCodec<AmqpFrame>>,
_: ServiceCtx<'a, Self>,
) -> Self::Future<'a> {
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
match request {
DispatchItem::Item(frame) => {
#[cfg(feature = "frame-trace")]
Expand All @@ -297,20 +293,20 @@ where
.map_err(AmqpDispatcherError::Protocol)
{
Ok(a) => a,
Err(e) => return Either::Right(Ready::Err(e)),
Err(e) => return Err(e),
};

match action {
types::Action::Transfer(link) => {
return if self.sink.is_opened() {
Either::Left(ServiceResult {
link: link.clone(),
fut: self.service.call(types::Message::Transfer(link)),
_t: marker::PhantomData,
})
} else {
Either::Right(Ready::Ok(None))
};
if self.sink.is_opened() {
let lnk = link.clone();
if let Err(e) = self.service.call(types::Message::Transfer(link)).await
{
let e = Error::from(e);
log::trace!("Service error {:?}", e);
let _ = lnk.close_with_error(e);
}
}
}
types::Action::Flow(link, frm) => {
// apply flow to specific link
Expand Down Expand Up @@ -375,35 +371,33 @@ where
types::Action::None => (),
};

Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::EncoderError(err) | DispatchItem::DecoderError(err) => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
err.into(),
)));
Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::KeepAliveTimeout => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
AmqpProtocolError::KeepAliveTimeout,
)));
Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::ReadTimeout => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::ProtocolError(
AmqpProtocolError::ReadTimeout,
)));
Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::Disconnect(e) => {
self.call_control_service(ControlFrame::new_kind(ControlFrameKind::Disconnected(
e,
)));
Either::Right(Ready::Ok(None))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
Ok(None)
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => Ok(None),
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{collections::VecDeque, future::Future, hash, pin::Pin, task::Context, task::Poll};
use std::{
collections::VecDeque, future::poll_fn, future::Future, hash, pin::Pin, task::Context,
task::Poll,
};

use ntex::util::{poll_fn, ByteString, BytesMut, PoolRef, Stream};
use ntex::util::{ByteString, BytesMut, PoolRef, Stream};
use ntex::{channel::oneshot, task::LocalWaker};
use ntex_amqp_codec::protocol::{
self as codec, Attach, DeliveryNumber, Disposition, Error, Handle, LinkError,
Expand Down
Loading

0 comments on commit 1ba331a

Please sign in to comment.