Skip to content

Commit

Permalink
Remove internal circular references
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 4, 2024
1 parent e71c3da commit c3f1be6
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 59 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.9] - 2024-01-04

* Remove internal circular references

## [0.8.8] - 2024-01-03

* Use io tags for logging
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "0.8.8"
version = "0.8.9"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
6 changes: 3 additions & 3 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ntex::util::Ready;
use crate::codec::{AmqpCodec, AmqpFrame};
use crate::control::ControlFrame;
use crate::error::{AmqpDispatcherError, LinkError};
use crate::{dispatcher::Dispatcher, Configuration, Connection, State};
use crate::{dispatcher::Dispatcher, Configuration, Connection, ConnectionRef, State};

/// Mqtt client
pub struct Client<St = ()> {
Expand Down Expand Up @@ -40,8 +40,8 @@ where
{
#[inline]
/// Get client sink
pub fn sink(&self) -> Connection {
self.connection.clone()
pub fn sink(&self) -> ConnectionRef {
self.connection.get_ref()
}

#[inline]
Expand Down
51 changes: 45 additions & 6 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, rc::Rc};
use std::{fmt, future::Future, ops, rc::Rc};

use ntex::channel::{condition::Condition, condition::Waiter, oneshot};
use ntex::io::IoRef;
Expand All @@ -11,9 +11,12 @@ use crate::session::{Session, SessionInner};
use crate::sndlink::{SenderLink, SenderLinkInner};
use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Configuration};

pub struct Connection(ConnectionRef);

#[derive(Clone)]
pub struct Connection(pub(crate) Cell<ConnectionInner>);
pub struct ConnectionRef(pub(crate) Cell<ConnectionInner>);

#[derive(Debug)]
pub(crate) struct ConnectionInner {
io: IoRef,
state: ConnectionState,
Expand All @@ -27,6 +30,7 @@ pub(crate) struct ConnectionInner {
pub(crate) max_frame_size: u32,
}

#[derive(Debug)]
pub(crate) enum SessionState {
Opening(Option<oneshot::Sender<Session>>, Cell<ConnectionInner>),
Established(Cell<SessionInner>),
Expand All @@ -53,7 +57,7 @@ impl Connection {
local_config: &Configuration,
remote_config: &Configuration,
) -> Connection {
Connection(Cell::new(ConnectionInner {
Connection(ConnectionRef(Cell::new(ConnectionInner {
io,
codec: AmqpCodec::new(),
state: ConnectionState::Normal,
Expand All @@ -64,9 +68,37 @@ impl Connection {
on_close: Condition::new(),
channel_max: local_config.channel_max,
max_frame_size: remote_config.max_frame_size,
}))
})))
}

pub fn get_ref(&self) -> ConnectionRef {
self.0.clone()
}
}

impl AsRef<ConnectionRef> for Connection {
#[inline]
fn as_ref(&self) -> &ConnectionRef {
&self.0
}
}

impl ops::Deref for Connection {
type Target = ConnectionRef;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Drop for Connection {
fn drop(&mut self) {
self.0.force_close()
}
}

impl ConnectionRef {
#[inline]
/// Get io tag for current connection
pub fn tag(&self) -> &'static str {
Expand All @@ -79,6 +111,7 @@ impl Connection {
let inner = self.0.get_mut();
inner.state = ConnectionState::Drop;
inner.io.force_close();
inner.set_error(AmqpProtocolError::ConnectionDropped);
}

#[inline]
Expand Down Expand Up @@ -259,7 +292,7 @@ impl ConnectionInner {
let session = Cell::new(SessionInner::new(
local_token,
false,
Connection(cell.clone()),
ConnectionRef(cell.clone()),
remote_channel_id,
begin.next_outgoing_id(),
begin.incoming_window(),
Expand Down Expand Up @@ -309,7 +342,7 @@ impl ConnectionInner {
let session = Cell::new(SessionInner::new(
local_token,
true,
Connection(cell.clone()),
ConnectionRef(cell.clone()),
remote_channel_id,
begin.next_outgoing_id(),
begin.incoming_window(),
Expand Down Expand Up @@ -479,3 +512,9 @@ impl ConnectionInner {
}
}
}

impl fmt::Debug for ConnectionRef {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ConnectionRef").finish()
}
}
2 changes: 1 addition & 1 deletion src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{connection::Connection, types, ControlFrame, ControlFrameKind, Recei

type ControlItem<R, E> = (ControlFrame, BoxFuture<'static, Result<R, E>>);

#[derive(Default)]
#[derive(Default, Debug)]
pub(crate) struct ControlQueue {
pending: cell::RefCell<VecDeque<ControlFrame>>,
waker: LocalWaker,
Expand Down
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum AmqpProtocolError {
UnexpectedOpeningState(protocol::Frame),
#[display(fmt = "Unexpected frame: {:?}", _0)]
Unexpected(protocol::Frame),
ConnectionDropped,
}

impl error::Error for AmqpProtocolError {}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod sndlink;
mod state;
pub mod types;

pub use self::connection::Connection;
pub use self::connection::{Connection, ConnectionRef};
pub use self::control::{ControlFrame, ControlFrameKind};
pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
pub use self::session::Session;
Expand Down
36 changes: 18 additions & 18 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ pub struct ReceiverLink {
pub(crate) inner: Cell<ReceiverLinkInner>,
}

#[derive(Debug)]
pub(crate) struct ReceiverLinkInner {
name: ByteString,
handle: Handle,
remote_handle: Handle,
session: Session,
closed: bool,
reader_task: LocalWaker,
queue: VecDeque<Transfer>,
credit: u32,
delivery_count: u32,
error: Option<Error>,
partial_body: Option<BytesMut>,
partial_body_max: usize,
max_message_size: u64,
pool: PoolRef,
}

impl Eq for ReceiverLink {}

impl PartialEq<ReceiverLink> for ReceiverLink {
Expand Down Expand Up @@ -197,24 +215,6 @@ impl Stream for ReceiverLink {
}
}

#[derive(Debug)]
pub(crate) struct ReceiverLinkInner {
name: ByteString,
handle: Handle,
remote_handle: Handle,
session: Session,
closed: bool,
reader_task: LocalWaker,
queue: VecDeque<Transfer>,
credit: u32,
delivery_count: u32,
error: Option<Error>,
partial_body: Option<BytesMut>,
partial_body_max: usize,
max_message_size: u64,
pool: PoolRef,
}

impl ReceiverLinkInner {
pub(crate) fn new(
session: Cell<SessionInner>,
Expand Down
60 changes: 31 additions & 29 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ use ntex_amqp_codec::protocol::{
};
use ntex_amqp_codec::AmqpFrame;

use crate::connection::Connection;
use crate::error::AmqpProtocolError;
use crate::rcvlink::{ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner};
use crate::sndlink::{DeliveryPromise, SenderLink, SenderLinkBuilder, SenderLinkInner};
use crate::{cell::Cell, types::Action, ControlFrame};
use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame};

const INITIAL_OUTGOING_ID: TransferNumber = 0;

Expand All @@ -24,6 +23,32 @@ pub struct Session {
pub(crate) inner: Cell<SessionInner>,
}

#[derive(Debug)]
pub(crate) struct SessionInner {
id: usize,
sink: ConnectionRef,
next_outgoing_id: TransferNumber,
flags: Flags,

remote_channel_id: u16,
next_incoming_id: TransferNumber,
remote_outgoing_window: u32,
remote_incoming_window: u32,

links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
links_by_name: HashMap<ByteString, usize>,
remote_handles: HashMap<Handle, usize>,
error: Option<AmqpProtocolError>,

pending_transfers: VecDeque<PendingTransfer>,
unsettled_deliveries: HashMap<DeliveryNumber, DeliveryPromise>,
disposition_subscribers: HashMap<DeliveryNumber, pool::Sender<Disposition>>,

pub(crate) pool: pool::Pool<Result<Disposition, AmqpProtocolError>>,
pool_disp: pool::Pool<Disposition>,
closed: condition::Condition,
}

impl fmt::Debug for Session {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Session").finish()
Expand All @@ -42,7 +67,7 @@ impl Session {
}

#[inline]
pub fn connection(&self) -> &Connection {
pub fn connection(&self) -> &ConnectionRef {
&self.inner.get_ref().sink
}

Expand Down Expand Up @@ -242,38 +267,15 @@ impl ReceiverLinkState {
}

bitflags::bitflags! {
#[derive(Copy, Clone, Debug)]
struct Flags: u8 {
const LOCAL = 0b0000_0001;
const ENDED = 0b0000_0010;
const ENDING = 0b0000_0100;
}
}

pub(crate) struct SessionInner {
id: usize,
sink: Connection,
next_outgoing_id: TransferNumber,
flags: Flags,

remote_channel_id: u16,
next_incoming_id: TransferNumber,
remote_outgoing_window: u32,
remote_incoming_window: u32,

links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
links_by_name: HashMap<ByteString, usize>,
remote_handles: HashMap<Handle, usize>,
error: Option<AmqpProtocolError>,

pending_transfers: VecDeque<PendingTransfer>,
unsettled_deliveries: HashMap<DeliveryNumber, DeliveryPromise>,
disposition_subscribers: HashMap<DeliveryNumber, pool::Sender<Disposition>>,

pub(crate) pool: pool::Pool<Result<Disposition, AmqpProtocolError>>,
pool_disp: pool::Pool<Disposition>,
closed: condition::Condition,
}

#[derive(Debug)]
struct PendingTransfer {
link_handle: Handle,
body: Option<TransferBody>,
Expand All @@ -300,7 +302,7 @@ impl SessionInner {
pub(crate) fn new(
id: usize,
local: bool,
sink: Connection,
sink: ConnectionRef,
remote_channel_id: u16,
next_incoming_id: DeliveryNumber,
remote_incoming_window: u32,
Expand Down

0 comments on commit c3f1be6

Please sign in to comment.