Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5to3 adapter #528

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <aws/crt/http/HttpConnection.h>
#include <aws/crt/mqtt/Mqtt5Types.h>
#include <aws/crt/mqtt/MqttClient.h>

namespace Aws
{
Expand All @@ -27,6 +28,8 @@ namespace Aws
class UnSubAckPacket;
class Mqtt5ClientCore;

class Mqtt5to3AdapterOptions;

struct AWS_CRT_CPP_API ReconnectOptions
{
/**
Expand Down Expand Up @@ -315,6 +318,13 @@ namespace Aws
*/
const Mqtt5ClientOperationStatistics &GetOperationStatistics() noexcept;

/**
* Create a new connection object from the client5.
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection() noexcept;

virtual ~Mqtt5Client();

private:
Expand All @@ -324,6 +334,47 @@ namespace Aws
std::shared_ptr<Mqtt5ClientCore> m_client_core;

Mqtt5ClientOperationStatistics m_operationStatistics;

Mqtt5to3AdapterOptions *m_mqtt5to3AdapterOptions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be clarified as I get further in but just wondering if we'll only ever have one of these adapter options per Mqtt5Client. Thinking of a scenario where we have a number of different adapters from a single Mqtt5 client. e.g. if someone makes a bunch to feed to different service clients instead of using the same one for them all. May be a non-issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only have one adapter options for each Mqtt5Client. When we create MqttConnection from the Mqtt5Client, we will pass the options in the MqttConnection, which will copy the options over.

};

/**
* The extra option required to build MqttConnection
*/
class Mqtt5to3AdapterOptions
{
friend class Mqtt5ClientOptions;
friend class Mqtt5ClientCore;

public:
// Default constructor
Mqtt5to3AdapterOptions();

private:
Crt::String m_hostName;

uint16_t m_port;

bool m_overwriteWebsocket;

Crt::Mqtt::OnWebSocketHandshakeIntercept m_webSocketInterceptor;

/**
* Controls socket properties of the underlying MQTT connections made by the client. Leave undefined to
* use defaults (no TCP keep alive, 10 second socket timeout).
*/
Crt::Io::SocketOptions m_socketOptions;

/**
* TLS context for secure socket connections.
* If undefined, then a plaintext connection will be used.
*/
Crt::Optional<Crt::Io::TlsConnectionOptions> m_tlsConnectionOptions;

/**
* Configures (tunneling) HTTP proxy usage when establishing MQTT connections
*/
Crt::Optional<Crt::Http::HttpClientConnectionProxyOptions> m_proxyOptions;
};

/**
Expand Down Expand Up @@ -564,6 +615,8 @@ namespace Aws
Mqtt5ClientOptions &operator=(Mqtt5ClientOptions &&) = delete;

private:
Mqtt5to3AdapterOptions *NewMqtt5to3AdapterOptions() const noexcept;

/**
* This callback allows a custom transformation of the HTTP request that acts as the websocket
* handshake. Websockets will be used if this is set to a valid transformation callback. To use
Expand Down
12 changes: 12 additions & 0 deletions include/aws/crt/mqtt/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ namespace Aws
private:
Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

/**
* Create a new connection object over plain text from the client5. The client must outlive
* all of its connection instances.The Mqtt5 Options will be overwritten by the options,
* passed in here.
*
* @param options the options from Mqtt5Client used to support the MqttConnection
*
* @return std::shared_ptr<Crt::Mqtt::MqttConnection>
*/
std::shared_ptr<Crt::Mqtt::MqttConnection> NewConnection(
Mqtt5::Mqtt5to3AdapterOptions *options) noexcept;

/* Static Callbacks */
static void s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
Expand Down
31 changes: 31 additions & 0 deletions include/aws/crt/mqtt/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/v5/mqtt5_client.h>

#include <atomic>
#include <functional>
Expand All @@ -30,6 +31,11 @@ namespace Aws
class HttpRequest;
}

namespace Mqtt5
{
class Mqtt5ClientCore;
}

namespace Mqtt
{
class MqttClient;
Expand Down Expand Up @@ -220,6 +226,7 @@ namespace Aws
class AWS_CRT_CPP_API MqttConnection final
{
friend class MqttClient;
friend class Mqtt5::Mqtt5ClientCore;

public:
~MqttConnection();
Expand Down Expand Up @@ -444,6 +451,7 @@ namespace Aws
bool m_useTls;
bool m_useWebsocket;
MqttConnectionOperationStatistics m_operationStatistics;
Allocator *m_allocator;

MqttConnection(
aws_mqtt_client *client,
Expand All @@ -460,6 +468,23 @@ namespace Aws
const Io::SocketOptions &socketOptions,
bool useWebsocket) noexcept;

MqttConnection(
aws_mqtt5_client *client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
const Crt::Io::TlsConnectionOptions &tlsConnectionOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;

MqttConnection(
aws_mqtt5_client *client,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
bool useWebsocket,
aws_allocator *allocaotr) noexcept;

static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
static void s_onConnectionCompleted(
aws_mqtt_client_connection *,
Expand Down Expand Up @@ -527,6 +552,12 @@ namespace Aws
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions);
static void s_connectionInit(
MqttConnection *self,
const char *hostName,
uint16_t port,
const Io::SocketOptions &socketOptions,
aws_mqtt5_client *mqtt5client);
};

/**
Expand Down
44 changes: 44 additions & 0 deletions source/mqtt/Mqtt5Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ namespace Aws
{
namespace Mqtt5
{
Mqtt5to3AdapterOptions::Mqtt5to3AdapterOptions() : m_port(0), m_overwriteWebsocket(false) {}

Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
: m_client_core(nullptr)
{
m_client_core = Mqtt5ClientCore::NewMqtt5ClientCore(options, allocator);
m_mqtt5to3AdapterOptions = options.NewMqtt5to3AdapterOptions();
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5Client::NewConnection() noexcept
{
if (m_client_core == nullptr)
{
AWS_LOGF_DEBUG(AWS_LS_MQTT5_CLIENT, "Failed to create mqtt3 connection: Mqtt5 Client is invalid.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be the ERROR log level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a good idea to set the error level to ERROR for those "invalid mqtt5 client" issue here. As if the issue happened (the client core is nullptr), it usually means the Mqtt5Client is unusable, and we should expose it to user.
Since we've been using DEBUG log for all other operations, I will keep it here for now and create a separate PR to update the log level all together.

return nullptr;
}
return m_client_core->NewConnection(m_mqtt5to3AdapterOptions);
}

Mqtt5Client::~Mqtt5Client()
Expand All @@ -34,6 +47,7 @@ namespace Aws
m_client_core->Close();
m_client_core.reset();
}
delete m_mqtt5to3AdapterOptions;
}

std::shared_ptr<Mqtt5Client> Mqtt5Client::NewMqtt5Client(
Expand Down Expand Up @@ -233,6 +247,36 @@ namespace Aws

Mqtt5ClientOptions::~Mqtt5ClientOptions() {}

Mqtt5to3AdapterOptions *Mqtt5ClientOptions::NewMqtt5to3AdapterOptions() const noexcept
{
Mqtt5to3AdapterOptions *adapterOptions = new Mqtt5to3AdapterOptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debatable: If m_mqtt5to3AdapterOptions is populated directly in this method, then there will be no need in allocating m_mqtt5to3AdapterOptions on the heap or even making it a pointer.
My main concern is a new call in the code: there is no issue with it, but it always enforces you to make sure that corresponding delete will be called. Maybe at least replace it with std::unique_ptr?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the crt we always pair *_destroy() for anything that has a *_new() that takes an allocator to make sure we clean up anything we allocate. Unsure if we have the same for something with New in cpp but I feel like I've seen New used that returns a pointer or a null with no corresponding delete or destroy. Good to stay consistent though so worth investigating.

adapterOptions->m_hostName = m_hostName;
adapterOptions->m_port = m_port;
adapterOptions->m_socketOptions = m_socketOptions;
if (m_proxyOptions.has_value())
adapterOptions->m_proxyOptions = m_proxyOptions.value();
if (m_tlsConnectionOptions.has_value())
{
adapterOptions->m_tlsConnectionOptions = m_tlsConnectionOptions.value();
}
if (websocketHandshakeTransform)
{
adapterOptions->m_overwriteWebsocket = true;

auto signerTransform = [this](
std::shared_ptr<Crt::Http::HttpRequest> req,
const Crt::Mqtt::OnWebSocketHandshakeInterceptComplete &onComplete) {
this->websocketHandshakeTransform(req, onComplete);
};
adapterOptions->m_webSocketInterceptor = std::move(signerTransform);
}
else
{
adapterOptions->m_overwriteWebsocket = false;
}
return adapterOptions;
}

Mqtt5ClientOptions &Mqtt5ClientOptions::WithHostName(Crt::String hostname)
{
m_hostName = std::move(hostname);
Expand Down
52 changes: 52 additions & 0 deletions source/mqtt/Mqtt5ClientCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,58 @@ namespace Aws
}
}

std::shared_ptr<Crt::Mqtt::MqttConnection> Mqtt5ClientCore::NewConnection(
Mqtt5::Mqtt5to3AdapterOptions *options) noexcept
{
// If you're reading this and asking.... why is this so complicated? Why not use make_shared
// or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
// so, we do it manually.
Allocator *allocator = this->m_allocator;
Crt::Mqtt::MqttConnection *toSeat = reinterpret_cast<Crt::Mqtt::MqttConnection *>(
aws_mem_acquire(allocator, sizeof(Crt::Mqtt::MqttConnection)));
if (!toSeat)
{
return nullptr;
}

if (options->m_tlsConnectionOptions.has_value())
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_tlsConnectionOptions.value(),
options->m_overwriteWebsocket,
allocator);
}
else
{
toSeat = new (toSeat) Crt::Mqtt::MqttConnection(
m_client,
options->m_hostName.c_str(),
options->m_port,
options->m_socketOptions,
options->m_overwriteWebsocket,
allocator);
}
if (options->m_proxyOptions.has_value())
{
toSeat->SetHttpProxyOptions(options->m_proxyOptions.value());
}

if (options->m_overwriteWebsocket)
{
toSeat->WebsocketInterceptor = options->m_webSocketInterceptor;
}

return std::shared_ptr<Crt::Mqtt::MqttConnection>(
toSeat, [allocator](Crt::Mqtt::MqttConnection *connection) {
connection->~MqttConnection();
aws_mem_release(allocator, reinterpret_cast<void *>(connection));
});
}

void Mqtt5ClientCore::s_publishCompletionCallback(
enum aws_mqtt5_packet_type packet_type,
const void *publishCompletionPacket,
Expand Down
Loading