Skip to content

Commit

Permalink
Adapter API Update (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera authored Oct 3, 2023
1 parent 6c8f1bb commit 421c832
Show file tree
Hide file tree
Showing 8 changed files with 707 additions and 226 deletions.
70 changes: 3 additions & 67 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ namespace Aws
*/
class AWS_CRT_CPP_API Mqtt5Client final : public std::enable_shared_from_this<Mqtt5Client>
{
friend class Mqtt::MqttConnection;

public:
/**
* Factory function for mqtt5 client
Expand Down Expand Up @@ -318,13 +320,6 @@ namespace Aws
*/
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;

/**
* Create a new MqttConnection object from the Mqtt5Client.
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection() noexcept;

virtual ~Mqtt5Client();

private:
Expand All @@ -334,66 +329,15 @@ namespace Aws
std::shared_ptr<Mqtt5ClientCore> m_client_core;

Mqtt5ClientOperationStatistics m_operationStatistics;

ScopedResource<Mqtt5to3AdapterOptions> m_mqtt5to3AdapterOptions;
};

/**
* The extra options required to build MqttConnection from Mqtt5Client
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
/* Default constructor */
Mqtt5to3AdapterOptions();

private:
/* Host name of the MQTT server to connect to. */
Crt::String m_hostName;

/* Port to connect to */
uint16_t m_port;

/*
* If the MqttConnection should overwrite the websocket config. If set to true, m_webSocketInterceptor
* must be set.
*/
bool m_overwriteWebsocket;

/*
* The transform function invoked during websocket handshake.
*/
Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
* Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to
* use defaults (no TCP keep alive, 10 second socket timeout).
*/
Crt::Io::SocketOptions m_socketOptions;

/**
* TLS context for secure socket connections.
* If undefined, a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

/**
* Configuration interface for mqtt5 clients
*/
class AWS_CRT_CPP_API Mqtt5ClientOptions final
{

friend class Mqtt5Client;
friend class Mqtt5ClientCore;
friend class Mqtt5to3AdapterOptions;

public:
/**
Expand Down Expand Up @@ -624,14 +568,6 @@ namespace Aws
Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete;

private:
/*
* Allocate and create a new Mqtt5to3AdapterOptions. This function is internally used by Mqtt5Client to
* support the Mqtt5to3Adapter.
*
* @return Mqtt5to3AdapterOptions
*/
ScopedResource<Mqtt5to3AdapterOptions> NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
* handshake. Websockets will be used if this is set to a valid transformation callback. To use
Expand Down
12 changes: 11 additions & 1 deletion include/aws/crt/mqtt/MqttConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ namespace Aws

namespace Mqtt5
{
class Mqtt5Client;
class Mqtt5ClientCore;
}
} // namespace Mqtt5

namespace Mqtt
{
Expand Down Expand Up @@ -165,6 +166,15 @@ namespace Aws
MqttConnection &operator=(const MqttConnection &) = delete;
MqttConnection &operator=(MqttConnection &&) = delete;

/**
* Create a new MqttConnection object from the Mqtt5Client.
* @param mqtt5client The shared ptr of Mqtt5Client
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
static std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnectionFromMqtt5Client(
std::shared_ptr<Mqtt5::Mqtt5Client> mqtt5client) noexcept;

/**
* @return true if the instance is in a valid state, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace Aws
*/
class AWS_CRT_CPP_API Mqtt5ClientCore final : public std::enable_shared_from_this<Mqtt5ClientCore>
{
friend Mqtt5Client;
friend class Mqtt5Client;
friend class Mqtt::MqttConnection;

public:
/**
Expand Down Expand Up @@ -185,6 +186,11 @@ namespace Aws
*/
std::shared_ptr<Mqtt5ClientCore> m_selfReference;

/*
* The Mqtt5to3 Adapter Options. Used to create a mqtt311 connection from mqtt5 client
*/
ScopedResource<Mqtt5to3AdapterOptions> m_mqtt5to3AdapterOptions;

/*
* The callback flag used to indicate if it is safe to invoke the callbacks
*/
Expand All @@ -203,6 +209,47 @@ namespace Aws
Allocator *m_allocator;
};

/**
* The extra options required to build MqttConnection from Mqtt5Client
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;
friend class Mqtt::MqttConnection;

public:
/* Default constructor */
Mqtt5to3AdapterOptions();
/*
* Allocate and create a new Mqtt5to3AdapterOptions. This function is internally used by Mqtt5Client to
* support the Mqtt5to3Adapter.
*
* @return Mqtt5to3AdapterOptions
*/
static ScopedResource<Mqtt5to3AdapterOptions> NewMqtt5to3AdapterOptions(
const Mqtt5ClientOptions &options) noexcept;

private:
Mqtt::MqttConnectionOptions m_mqtt3Options;

/* Reserve to store memory for m_mqtt3options.hostname */
String m_hostname;

/*
* The transform function invoked during websocket handshake.
*/
Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/* Store the user intercept handshake function */
OnWebSocketHandshakeIntercept m_websocketHandshakeTransform;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

} // namespace Mqtt5
} // namespace Crt
} // namespace Aws
49 changes: 1 addition & 48 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/crt/mqtt/Mqtt5Client.h>
#include <aws/crt/mqtt/Mqtt5ClientCore.h>
#include <aws/crt/mqtt/Mqtt5Packets.h>
#include <aws/crt/mqtt/private/Mqtt5ClientCore.h>

#include <aws/crt/Api.h>
#include <aws/crt/StlAllocator.h>
Expand All @@ -21,23 +21,10 @@ namespace Aws
{
namespace Mqtt5
{
Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {}

Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
: m_client_core(nullptr)
{
m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator);
m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions();
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5Client::NewConnection() noexcept
{
if (m_client_core == nullptr)
{
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions.get());
}

Mqtt5Client::~Mqtt5Client()
Expand Down Expand Up @@ -246,40 +233,6 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

ScopedResource<Mqtt5to3AdapterOptions> Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Allocator *allocator = m_allocator;
ScopedResource<Mqtt5to3AdapterOptions> adapterOptions = ScopedResource<Mqtt5to3AdapterOptions>(
Crt::New<Mqtt5to3AdapterOptions>(allocator),
[allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); });

adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
if (m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = m_proxyOptions.value();
if (m_tlsConnectionOptions.has_value())
{
adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value();
}
if (websocketHandshakeTransform)
{
adapterOptions->m_overwriteWebsocket = true;

auto signerTransform = [this](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
this->websocketHandshakeTransform(std::move(req), onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_overwriteWebsocket = false;
}
return adapterOptions;
}

Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname)
{
m_hostName = std::move(hostname);
Expand Down
78 changes: 42 additions & 36 deletions source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/crt/mqtt/Mqtt5Client.h>
#include <aws/crt/mqtt/Mqtt5ClientCore.h>
#include <aws/crt/mqtt/Mqtt5Packets.h>
#include <aws/crt/mqtt/private/Mqtt5ClientCore.h>

#include <aws/crt/Api.h>
#include <aws/crt/StlAllocator.h>
Expand Down Expand Up @@ -195,41 +195,6 @@ namespace Aws
}
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
const Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
Mqtt::MqttConnectionOptions connectionOptions;
connectionOptions.hostName = options->m_hostName.c_str();
connectionOptions.port = options->m_port;
connectionOptions.socketOptions = options->m_socketOptions;
connectionOptions.useWebsocket = options->m_overwriteWebsocket;
connectionOptions.allocator = m_allocator;

if (options->m_tlsConnectionOptions.has_value())
{
connectionOptions.tlsConnectionOptions = options->m_tlsConnectionOptions.value();
connectionOptions.useTls = true;
}

auto connection = Mqtt::MqttConnection::s_CreateMqttConnection(m_client, std::move(connectionOptions));
if (!connection)
{
return {};
}

if (options->m_proxyOptions.has_value())
{
connection->SetHttpProxyOptions(options->m_proxyOptions.value());
}

if (options->m_overwriteWebsocket)
{
connection->WebsocketInterceptor = options->m_webSocketInterceptor;
}

return connection;
}

void Mqtt5ClientCore::s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
const void *publishCompletionPacket,
Expand Down Expand Up @@ -496,6 +461,8 @@ namespace Aws
clientOptions.client_termination_handler_user_data = this;

m_client = aws_mqtt5_client_new(allocator, &clientOptions);

m_mqtt5to3AdapterOptions = Mqtt5to3AdapterOptions::NewMqtt5to3AdapterOptions(options);
}

Mqtt5ClientCore::~Mqtt5ClientCore() {}
Expand Down Expand Up @@ -642,6 +609,45 @@ namespace Aws
}
}

Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() {}

ScopedResource<Mqtt5to3AdapterOptions> Mqtt5to3AdapterOptions::NewMqtt5to3AdapterOptions(
const Mqtt5ClientOptions &options) noexcept
{
Allocator *allocator = options.m_allocator;
ScopedResource<Mqtt5to3AdapterOptions> adapterOptions = ScopedResource<Mqtt5to3AdapterOptions>(
Crt::New<Mqtt5to3AdapterOptions>(allocator),
[allocator](Mqtt5to3AdapterOptions *options) { Crt::Delete(options, allocator); });
adapterOptions->m_mqtt3Options.allocator = options.m_allocator;
adapterOptions->m_hostname = options.m_hostName;
adapterOptions->m_mqtt3Options.hostName = adapterOptions->m_hostname.c_str();
adapterOptions->m_mqtt3Options.port = options.m_port;
adapterOptions->m_mqtt3Options.socketOptions = options.m_socketOptions;
if (options.m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = options.m_proxyOptions.value();
if (options.m_tlsConnectionOptions.has_value())
{
adapterOptions->m_mqtt3Options.tlsConnectionOptions = options.m_tlsConnectionOptions.value();
adapterOptions->m_mqtt3Options.useTls = true;
}
if (options.websocketHandshakeTransform)
{
adapterOptions->m_mqtt3Options.useWebsocket = true;
adapterOptions->m_websocketHandshakeTransform = options.websocketHandshakeTransform;

auto signerTransform = [&adapterOptions](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
adapterOptions->m_websocketHandshakeTransform(std::move(req), onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_mqtt3Options.useWebsocket = false;
}
return adapterOptions;
}
} // namespace Mqtt5
} // namespace Crt
} // namespace Aws
Loading

0 comments on commit 421c832

Please sign in to comment.