Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5 GA API Review #575

Merged
merged 18 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions bin/mqtt5_canary/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,14 @@ static int s_AwsMqtt5CanaryOperationSubscribe(struct AwsMqtt5CanaryTestClient *t
.WithNoLocal(false)
.WithQOS(Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE)
.WithRetainHandlingType(Mqtt5::RetainHandlingType::AWS_MQTT5_RHT_SEND_ON_SUBSCRIBE)
.WithRetain(false);
.WithRetainAsPublished(false);

Mqtt5::Subscription subscription2;
subscription2.WithTopicFilter(testClient->sharedTopic)
.WithNoLocal(false)
.WithQOS(Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE)
.WithRetainHandlingType(Mqtt5::RetainHandlingType::AWS_MQTT5_RHT_SEND_ON_SUBSCRIBE)
.WithRetain(false);
.WithRetainAsPublished(false);

std::shared_ptr<Mqtt5::SubscribePacket> packet = std::make_shared<Mqtt5::SubscribePacket>(allocator);
packet->WithSubscription(std::move(subscription1));
Expand Down Expand Up @@ -450,7 +450,7 @@ static int s_AwsMqtt5CanaryOperationUnsubscribeBad(struct AwsMqtt5CanaryTestClie
unsubscription, [testClient](int, std::shared_ptr<Mqtt5::UnSubAckPacket> packet) {
if (packet == nullptr)
return;
if (packet->getReasonCodes()[0] == AWS_MQTT5_UARC_SUCCESS)
if (packet->getReasonCodes()[0] == UnSubAckReasonCode::AWS_MQTT5_UARC_SUCCESS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't what I had in mind for the enums. Did the original proposal not work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I'm creating the enum using the same value name in aws-c-mqtt (UnSubAckReasonCode:: AWS_MQTT5_UARC_SUCCESS v.s. aws_mqtt5_unsuback_reason_code::AWS_MQTT5_UARC_SUCCESS), the compiler would report an ambiguity error. I had to add the namespace to avoid the error.

One way to fix it is using a different enum value name in CPP. For example: UnSubAckReasonCode ::AWS_MQTT5_UARC_SUCCESS -> UnSubAckReasonCode::SUCCESS. In this case, we could still directly pass AWS_MQTT5_UARC_SUCCESS as the enum would be converted implicitly. However, it would break the user who was using UnSubAckReasonCode::AWS_MQTT5_UARC_SUCCESS.

I'm not sure if there is a better way to fix it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was what I intended:

enum class UnsubackReasonCode {
Success = AWS_MQTT5_UARC_SUCCESS,
... etc...
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It seems that enum class would not be able to directly convert between int and enum value. Using enum class would definitely breaks the customer.

  2. If we removed AWS_MQTT5_UARC_SUCCESS and rename it to Success, then it would break the customer if they already use the enum with the namespace like following pattern:

UnSubAckReasonCode reasonCode = UnSubAckReasonCode::AWS_MQTT5_UARC_SUCCESS

Copy link
Contributor

@bretambrose bretambrose Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could double the entries with the palatable names, but I'm not sure the extra effort is worth it.

Forcing people to use the name mangling prefix is crummy though.

{
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CANARY,
Expand Down Expand Up @@ -558,7 +558,7 @@ static int s_AwsMqtt5CanaryOperationPublishQos0(struct AwsMqtt5CanaryTestClient

Aws::Crt::String topic = "topic1";
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Publish qos0", testClient->clientId.c_str());
return s_AwsMqtt5CanaryOperationPublish(testClient, topic, AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
return s_AwsMqtt5CanaryOperationPublish(testClient, topic, QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
}

static int s_AwsMqtt5CanaryOperationPublishQos1(struct AwsMqtt5CanaryTestClient *testClient, Allocator *allocator)
Expand All @@ -569,7 +569,7 @@ static int s_AwsMqtt5CanaryOperationPublishQos1(struct AwsMqtt5CanaryTestClient
}
Aws::Crt::String topic = "topic1";
AWS_LOGF_INFO(AWS_LS_MQTT5_CANARY, "ID:%s Publish qos1", testClient->clientId.c_str());
return s_AwsMqtt5CanaryOperationPublish(testClient, topic, AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
return s_AwsMqtt5CanaryOperationPublish(testClient, topic, QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
}

static int s_AwsMqtt5CanaryOperationPublishToSubscribedTopicQos0(
Expand All @@ -591,7 +591,7 @@ static int s_AwsMqtt5CanaryOperationPublishToSubscribedTopicQos0(

AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Publish qos 0 to subscribed topic: %s", testClient->clientId.c_str(), topicArray);
return s_AwsMqtt5CanaryOperationPublish(testClient, topicArray, AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
return s_AwsMqtt5CanaryOperationPublish(testClient, topicArray, QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
}

static int s_AwsMqtt5CanaryOperationPublishToSubscribedTopicQos1(
Expand All @@ -614,7 +614,7 @@ static int s_AwsMqtt5CanaryOperationPublishToSubscribedTopicQos1(

AWS_LOGF_INFO(
AWS_LS_MQTT5_CANARY, "ID:%s Publish qos 1 to subscribed topic: %s", testClient->clientId.c_str(), topicArray);
return s_AwsMqtt5CanaryOperationPublish(testClient, topicArray, AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
return s_AwsMqtt5CanaryOperationPublish(testClient, topicArray, QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
}

static int s_AwsMqtt5CanaryOperationPublishToSharedTopicQos0(
Expand All @@ -630,7 +630,7 @@ static int s_AwsMqtt5CanaryOperationPublishToSharedTopicQos0(
"ID:%s Publish qos 0 to shared topic: %s",
testClient->clientId.c_str(),
testClient->sharedTopic.c_str());
return s_AwsMqtt5CanaryOperationPublish(testClient, testClient->sharedTopic, AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
return s_AwsMqtt5CanaryOperationPublish(testClient, testClient->sharedTopic, QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
}

static int s_AwsMqtt5CanaryOperationPublishToSharedTopicQos1(
Expand All @@ -647,7 +647,7 @@ static int s_AwsMqtt5CanaryOperationPublishToSharedTopicQos1(
testClient->clientId.c_str(),
testClient->sharedTopic.c_str());
return s_AwsMqtt5CanaryOperationPublish(
testClient, testClient->sharedTopic, AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
testClient, testClient->sharedTopic, QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
}

static struct AwsMqtt5CanaryOperationsFunctionTable s_AwsMqtt5CanaryOperationTable = {{
Expand Down Expand Up @@ -828,7 +828,8 @@ int main(int argc, char **argv)
.WithSocketOptions(socketOptions)
.WithBootstrap(&clientBootstrap)
.WithPingTimeoutMs(10000)
.WithReconnectOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000});
.WithReconnectOptions(
{ExponentialBackoffJitterMode::AWS_EXPONENTIAL_BACKOFF_JITTER_NONE, 1000, 120000, 3000});

if (appCtx.use_tls)
{
Expand Down
38 changes: 25 additions & 13 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ namespace Aws
* Controls how the reconnect delay is modified in order to smooth out the distribution of reconnection
* attempt timepoints for a large set of reconnecting clients.
*/
JitterMode m_reconnectMode;
ExponentialBackoffJitterMode m_reconnectMode;

/**
* Minimum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed
Expand Down Expand Up @@ -383,47 +383,47 @@ namespace Aws
* Notifies the MQTT5 client that you want it to transition to the stopped state, disconnecting any
* existing connection and stopping subsequent reconnect attempts.
*
* @param disconnectOptions (optional) properties of a DISCONNECT packet to send as part of the shutdown
* @param disconnectPacket (optional) properties of a DISCONNECT packet to send as part of the shutdown
* process
*
* @return bool: true if operation succeed, otherwise false
*/
bool Stop(std::shared_ptr<DisconnectPacket> disconnectOptions) noexcept;
bool Stop(std::shared_ptr<DisconnectPacket> disconnectPacket) noexcept;

/**
* Tells the client to attempt to send a PUBLISH packet
*
* @param publishOptions: packet PUBLISH to send to the server
* @param publishPacket: packet PUBLISH to send to the server
* @param onPublishCompletionCallback: callback on publish complete, default to NULL
*
* @return true if the publish operation succeed otherwise false
*/
bool Publish(
std::shared_ptr<PublishPacket> publishOptions,
std::shared_ptr<PublishPacket> publishPacket,
OnPublishCompletionHandler onPublishCompletionCallback = NULL) noexcept;

/**
* Tells the client to attempt to subscribe to one or more topic filters.
*
* @param subscribeOptions: SUBSCRIBE packet to send to the server
* @param subscribePacket: SUBSCRIBE packet to send to the server
* @param onSubscribeCompletionCallback: callback on subscribe complete, default to NULL
*
* @return true if the subscription operation succeed otherwise false
*/
bool Subscribe(
std::shared_ptr<SubscribePacket> subscribeOptions,
std::shared_ptr<SubscribePacket> subscribePacket,
OnSubscribeCompletionHandler onSubscribeCompletionCallback = NULL) noexcept;

/**
* Tells the client to attempt to unsubscribe to one or more topic filters.
*
* @param unsubscribeOptions: UNSUBSCRIBE packet to send to the server
* @param unsubscribePacket: UNSUBSCRIBE packet to send to the server
* @param onUnsubscribeCompletionCallback: callback on unsubscribe complete, default to NULL
*
* @return true if the unsubscription operation succeed otherwise false
*/
bool Unsubscribe(
std::shared_ptr<UnsubscribePacket> unsubscribeOptions,
std::shared_ptr<UnsubscribePacket> unsubscribePacket,
OnUnsubscribeCompletionHandler onUnsubscribeCompletionCallback = NULL) noexcept;

/**
Expand Down Expand Up @@ -517,11 +517,11 @@ namespace Aws
/**
* Sets mqtt5 connection options
*
* @param packetConnect package connection options
* @param connectPacket package connection options
*
* @return this option object
*/
Mqtt5ClientOptions &WithConnectOptions(std::shared_ptr<ConnectPacket> packetConnect) noexcept;
Mqtt5ClientOptions &WithConnectOptions(std::shared_ptr<ConnectPacket> connectPacket) noexcept;

/**
* Sets session behavior. Overrides how the MQTT5 client should behave with respect to MQTT sessions.
Expand Down Expand Up @@ -595,15 +595,27 @@ namespace Aws
*/
Mqtt5ClientOptions &WithConnackTimeoutMs(uint32_t connackTimeoutMs) noexcept;

/**
* @deprecated The function is deprecated, please use `Mqtt5ClientOptions::WithAckTimeoutSec(uint32_t)`
*
* Sets Operation Timeout(Seconds). Time interval to wait for an ack after sending a QoS 1+ PUBLISH,
* SUBSCRIBE, or UNSUBSCRIBE before failing the operation.
*
* @param ackTimeoutSec
*
* @return this option object
*/
Mqtt5ClientOptions &WithAckTimeoutSeconds(uint32_t ackTimeoutSec) noexcept;

/**
* Sets Operation Timeout(Seconds). Time interval to wait for an ack after sending a QoS 1+ PUBLISH,
* SUBSCRIBE, or UNSUBSCRIBE before failing the operation.
*
* @param ackTimeoutSeconds
* @param ackTimeoutSec
*
* @return this option object
*/
Mqtt5ClientOptions &WithAckTimeoutSeconds(uint32_t ackTimeoutSeconds) noexcept;
Mqtt5ClientOptions &WithAckTimeoutSec(uint32_t ackTimeoutSec) noexcept;

/**
* Sets callback for transform HTTP request.
Expand Down
78 changes: 75 additions & 3 deletions include/aws/crt/mqtt/Mqtt5Packets.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,18 @@ namespace Aws
uint16_t getReceiveMaximumFromServer() const noexcept;

/**
* @deprecated the function is deprecated, please use
* `NegotiatedSettings::getMaximumPacketSizeToServer()`
*
* @return The maximum packet size the server is willing to accept.
*/
uint32_t getMaximumPacketSizeBytes() const noexcept;

/**
* @return The maximum packet size the server is willing to accept.
*/
uint32_t getMaximumPacketSizeToServer() const noexcept;

/**
* @return returns the maximum allowed topic alias value on publishes sent from client to server
*/
Expand All @@ -521,6 +529,17 @@ namespace Aws
*
* @return The maximum amount of time in seconds between client packets.
*/
uint16_t getServerKeepAliveSec() const noexcept;

/**
* @deprecated The function is deprecated, please use `NegotiatedSettings::getServerKeepAliveSec()`
*
* The maximum amount of time in seconds between client packets. The client should use PINGREQs to
* ensure this limit is not breached. The server will disconnect the client for inactivity if no MQTT
* packet is received in a time interval equal to 1.5 x this value.
*
* @return The maximum amount of time in seconds between client packets.
*/
uint16_t getServerKeepAlive() const noexcept;

/**
Expand Down Expand Up @@ -939,6 +958,8 @@ namespace Aws
const Crt::Optional<uint16_t> &getReceiveMaximum() const noexcept;

/**
* @deprecated The function is deprecated, please use `ConnectPacket::getMaximumPacketSizeToServer()`
*
* Notifies the server of the maximum packet size the client is willing to handle. If
* omitted or null, then no limit beyond the natural limits of MQTT packet size is requested.
*
Expand All @@ -949,6 +970,17 @@ namespace Aws
*/
const Crt::Optional<uint32_t> &getMaximumPacketSizeBytes() const noexcept;

/**
* Notifies the server of the maximum packet size the client is willing to handle. If
* omitted or null, then no limit beyond the natural limits of MQTT packet size is requested.
*
* See [MQTT5 Maximum Packet
* Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901050)
*
* @return The maximum packet size the client is willing to handle
*/
const Crt::Optional<uint32_t> &getMaximumPacketSizeToServer() const noexcept;

/**
* A time interval, in seconds, that the server should wait (for a session reconnection) before sending
* the will message associated with the connection's session. If omitted or null, the server will send
Expand Down Expand Up @@ -1175,6 +1207,20 @@ namespace Aws
* @return A time interval, in seconds, that the server will persist this connection's MQTT session
* state for.
*/
const Crt::Optional<uint32_t> &getSessionExpiryIntervalSec() const noexcept;

/**
* @deprecated The function is deprecated, please use `ConnAckPacket::getSessionExpiryIntervalSec()`.
*
* A time interval, in seconds, that the server will persist this connection's MQTT session state
* for. If present, this value overrides any session expiry specified in the preceding CONNECT packet.
*
* See [MQTT5 Session Expiry
* Interval](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082)
*
* @return A time interval, in seconds, that the server will persist this connection's MQTT session
* state for.
*/
const Crt::Optional<uint32_t> &getSessionExpiryInterval() const noexcept;

/**
Expand Down Expand Up @@ -1304,6 +1350,18 @@ namespace Aws
*
* @return Server-requested override of the keep alive interval, in seconds
*/
const Crt::Optional<uint16_t> &getServerKeepAliveSec() const noexcept;

/**
* @deprecated The function is deprecated, please use `ConnAckPacket::getServerKeepAliveSec()`.
* Server-requested override of the keep alive interval, in seconds. If null, the keep alive value sent
* by the client should be used.
*
* See [MQTT5 Server Keep
* Alive](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901094)
*
* @return Server-requested override of the keep alive interval, in seconds
*/
const Crt::Optional<uint16_t> &getServerKeepAlive() const noexcept;

/**
Expand Down Expand Up @@ -1360,7 +1418,7 @@ namespace Aws
* See [MQTT5 Session Expiry
* Interval](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082)
*/
Crt::Optional<uint32_t> m_sessionExpiryInterval;
Crt::Optional<uint32_t> m_sessionExpiryIntervalSec;

/**
* The maximum amount of in-flight QoS 1 or 2 messages that the server is willing to handle at once. If
Expand Down Expand Up @@ -1455,7 +1513,7 @@ namespace Aws
* See [MQTT5 Server Keep
* Alive](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901094)
*/
Crt::Optional<uint16_t> m_serverKeepAlive;
Crt::Optional<uint16_t> m_serverKeepAliveSec;

/**
* A value that can be used in the creation of a response topic associated with this connection.
Expand Down Expand Up @@ -1880,6 +1938,20 @@ namespace Aws
* @param retain bool
* @return The Subscription Object after setting the reason string.
*/
Subscription &WithRetainAsPublished(bool retain) noexcept;

/**
* @deprecated The function is deprecated, please use `Subscription::WithRetainAsPublished(bool)`.
*
* Sets should the server not send publishes to a client when that client was the one who sent the
* publish? The value will be default to false.
*
* See [MQTT5 Subscription
* Options](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901169)
*
* @param retain bool
* @return The Subscription Object after setting the reason string.
*/
Subscription &WithRetain(bool retain) noexcept;

/**
Expand Down Expand Up @@ -1937,7 +2009,7 @@ namespace Aws
* See [MQTT5 Subscription
* Options](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901169)
*/
bool m_retain;
bool m_retainAsPublished;

/**
* Should retained messages on matching topics be sent in reaction to this subscription? If undefined,
Expand Down
Loading