Skip to content

Commit

Permalink
fix review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
dgarus committed Aug 1, 2023
1 parent 4db318f commit 1b39aaf
Showing 1 changed file with 33 additions and 49 deletions.
82 changes: 33 additions & 49 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,7 @@ pub struct Handler {
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,

stop_requested_streams: VecDeque<StopCommand>,
outbound_stop_futs: FuturesUnordered<
BoxFuture<'static, Result<(Event, Option<oneshot::Receiver<()>>), FatalUpgradeError>>,
>,
outbound_stop_futs: FuturesUnordered<BoxFuture<'static, Result<Event, FatalUpgradeError>>>,
}

impl Handler {
Expand All @@ -416,7 +414,7 @@ impl Handler {
&self,
io: Stream,
stop_command: StopCommand,
) -> BoxFuture<'static, Result<(Event, Option<oneshot::Receiver<()>>), FatalUpgradeError>> {
) -> BoxFuture<'static, Result<Event, FatalUpgradeError>> {
let msg = proto::StopMessage {
type_pb: proto::StopMessageType::CONNECT,
peer: Some(proto::Peer {
Expand All @@ -436,6 +434,9 @@ impl Handler {
status: None,
};

let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);

let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE));
async move {
substream.send(msg).await?;
Expand All @@ -461,34 +462,26 @@ impl Handler {
Some(proto_status) => match proto_status {
Status::OK => {}
Status::RESOURCE_LIMIT_EXCEEDED => {
return Ok((
Event::OutboundConnectNegotiationFailed {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
status: proto_status,
error: StreamUpgradeError::Apply(
CircuitFailedReason::ResourceLimitExceeded,
),
},
None,
))
return Ok(Event::OutboundConnectNegotiationFailed {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
status: proto_status,
error: StreamUpgradeError::Apply(
CircuitFailedReason::ResourceLimitExceeded,
),
})
}
Status::PERMISSION_DENIED => {
return Ok((
Event::OutboundConnectNegotiationFailed {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
status: proto_status,
error: StreamUpgradeError::Apply(
CircuitFailedReason::PermissionDenied,
),
},
None,
))
return Ok(Event::OutboundConnectNegotiationFailed {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
status: proto_status,
error: StreamUpgradeError::Apply(CircuitFailedReason::PermissionDenied),
})
}
s => return Err(FatalUpgradeError::UnexpectedStatus(s)),
},
Expand All @@ -508,19 +501,15 @@ impl Handler {
"Expect a flushed Framed to have an empty write buffer."
);

let (tx, rx) = oneshot::channel();
Ok((
Event::OutboundConnectNegotiated {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
dst_handler_notifier: tx,
dst_stream: io,
dst_pending_data: read_buffer.freeze(),
},
Some(rx),
))
Ok(Event::OutboundConnectNegotiated {
circuit_id: stop_command.circuit_id,
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
dst_handler_notifier: tx,
dst_stream: io,
dst_pending_data: read_buffer.freeze(),
})
}
.boxed()
}
Expand Down Expand Up @@ -744,12 +733,7 @@ impl ConnectionHandler for Handler {
// Send stop commands
if let Poll::Ready(Some(result)) = self.outbound_stop_futs.poll_next_unpin(cx) {
return match result {
Ok((event, receiver)) => {
if let Some(rx) = receiver {
self.alive_lend_out_substreams.push(rx);
}
Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
}
Ok(event) => Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)),
Err(e) => Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply(
Either::Right(e),
))),
Expand Down

0 comments on commit 1b39aaf

Please sign in to comment.