Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config] cleanup #1655

Merged
merged 5 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ecal/core/include/ecal/config/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ namespace eCAL
struct Configuration
{
bool enable; //!< enable layer

size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4)
};
}

Expand Down
5 changes: 5 additions & 0 deletions ecal/core/include/ecal/config/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ namespace eCAL
struct Configuration
{
bool enable; //!< enable layer

size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4)

size_t max_reconnections{}; //!< reconnection attemps the session will try to reconnect in (Default: 5)
};
}

Expand Down
20 changes: 1 addition & 19 deletions ecal/core/include/ecal/config/transport_layer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,12 @@ namespace eCAL
{
namespace TransportLayer
{
namespace TCPPubSub
{
struct Configuration
{
size_t num_executor_reader{}; //!< Tcp_pubsub reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< Tcp_pubsub writer amount of threads that shall execute workload (Default: 4)
size_t max_reconnections{}; //!< Tcp_pubsub reconnection attemps the session will try to reconnect in (Default: 5)
};
}

namespace SHM
{
struct Configuration
{
std::string host_group_name{}; /*!< Common host group name that enables interprocess mechanisms across
(virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/
Types::ConstrainedInteger<4096, 4096> memfile_minsize{}; //!< Default memory file size for new publisher (Default: 4096)
Types::ConstrainedInteger<50, 1, 100> memfile_reserve{}; //!< Dynamic file size reserve before recreating memory file if topic size changes in % (Default: 50)
unsigned int memfile_ack_timeout{}; //!< Publisher timeout for ack event from subscriber that memory file content is processed (Default: 0)
Types::ConstrainedInteger<0, 1> memfile_buffer_count{}; //!< Number of parallel used memory file buffers for 1:n publish/subscribe ipc connections (Default = 1)
bool drop_out_of_order_messages{}; //!< (Default: )
bool memfile_zero_copy{}; //!< Allow matching subscriber to access memory file without copying its content in advance (Default: false)
};
(virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/ };
}

namespace UDPMC
Expand Down Expand Up @@ -85,7 +68,6 @@ namespace eCAL
{
bool drop_out_of_order_messages{}; //!< Enable dropping of payload messages that arrive out of order (Default: false)
UDPMC::Configuration mc_options{};
TCPPubSub::Configuration tcp_options{};
SHM::Configuration shm_options{};
};
}
Expand Down
19 changes: 7 additions & 12 deletions ecal/core/include/ecal/ecal_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ namespace eCAL
ECAL_API int GetTcpPubsubWriterThreadpoolSize ();
ECAL_API int GetTcpPubsubMaxReconnectionAttemps ();

ECAL_API int GetTcpPubReaderThreadpoolSize ();
ECAL_API int GetTcpPubWriterThreadpoolSize ();

ECAL_API int GetTcpSubReaderThreadpoolSize ();
ECAL_API int GetTcpSubWriterThreadpoolSize ();
ECAL_API int GetTcpSubMaxReconnectionAttemps ();

ECAL_API std::string GetHostGroupName ();

/////////////////////////////////////
Expand Down Expand Up @@ -104,16 +111,6 @@ namespace eCAL
/////////////////////////////////////
// publisher
/////////////////////////////////////
ECAL_API bool GetPublisherShmMode ();
ECAL_API bool GetPublisherTcpMode ();
ECAL_API bool GetPublisherUdpMulticastMode ();

ECAL_API size_t GetMemfileMinsizeBytes ();
ECAL_API size_t GetMemfileOverprovisioningPercentage ();
ECAL_API int GetMemfileAckTimeoutMs ();
ECAL_API bool IsMemfileZerocopyEnabled ();
ECAL_API size_t GetMemfileBufferCount ();

ECAL_API bool IsTopicTypeSharingEnabled ();
ECAL_API bool IsTopicDescriptionSharingEnabled ();

Expand All @@ -128,8 +125,6 @@ namespace eCAL
/////////////////////////////////////
namespace Experimental
{
ECAL_API bool IsShmMonitoringEnabled ();
ECAL_API bool IsNetworkMonitoringDisabled ();
ECAL_API size_t GetShmMonitoringQueueSize ();
ECAL_API std::string GetShmMonitoringDomain ();
ECAL_API bool GetDropOutOfOrderMessages ();
Expand Down
25 changes: 11 additions & 14 deletions ecal/core/src/config/ecal_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ namespace eCAL

ECAL_API bool IsNpcapEnabled () { return GetConfiguration().transport_layer.mc_options.npcap_enabled; }

ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.num_executor_reader); }
ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.num_executor_writer); }
ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.max_reconnections); }
ECAL_API int GetTcpPubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().publisher.tcp.num_executor_reader); }
ECAL_API int GetTcpPubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().publisher.tcp.num_executor_writer); }

ECAL_API int GetTcpSubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().subscriber.tcp.num_executor_reader); }
ECAL_API int GetTcpSubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().subscriber.tcp.num_executor_writer); }
ECAL_API int GetTcpSubMaxReconnectionAttemps () { return static_cast<int>(GetConfiguration().subscriber.tcp.max_reconnections); }

// Keep this until new logic is implemented
ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return GetTcpSubReaderThreadpoolSize(); };
ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return GetTcpSubWriterThreadpoolSize(); };
ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return GetTcpSubMaxReconnectionAttemps();};

ECAL_API std::string GetHostGroupName () { return GetConfiguration().transport_layer.shm_options.host_group_name; }

Expand Down Expand Up @@ -156,17 +164,6 @@ namespace eCAL
/////////////////////////////////////
// publisher
/////////////////////////////////////

ECAL_API bool GetPublisherUdpMulticastMode () { return GetConfiguration().publisher.udp.enable; }
ECAL_API bool GetPublisherShmMode () { return GetConfiguration().publisher.shm.enable; }
ECAL_API bool GetPublisherTcpMode () { return GetConfiguration().publisher.tcp.enable; }

ECAL_API size_t GetMemfileMinsizeBytes () { return GetConfiguration().transport_layer.shm_options.memfile_minsize; }
ECAL_API size_t GetMemfileOverprovisioningPercentage () { return GetConfiguration().transport_layer.shm_options.memfile_reserve; }
ECAL_API int GetMemfileAckTimeoutMs () { return GetConfiguration().transport_layer.shm_options.memfile_ack_timeout; }
ECAL_API bool IsMemfileZerocopyEnabled () { return GetConfiguration().transport_layer.shm_options.memfile_zero_copy; }
ECAL_API size_t GetMemfileBufferCount () { return GetConfiguration().transport_layer.shm_options.memfile_buffer_count; }

ECAL_API bool IsTopicTypeSharingEnabled () { return GetConfiguration().registration.share_ttype; }
ECAL_API bool IsTopicDescriptionSharingEnabled () { return GetConfiguration().registration.share_tdesc; }

Expand Down
23 changes: 8 additions & 15 deletions ecal/core/src/config/ecal_config_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,8 @@ namespace eCAL
multicastOptions.join_all_interfaces = iniConfig.get(NETWORK, "multicast_join_all_if", NET_UDP_MULTICAST_JOIN_ALL_IF_ENABLED);
multicastOptions.npcap_enabled = iniConfig.get(NETWORK, "npcap_enabled", NET_NPCAP_ENABLED);

auto& tcpPubSubOptions = transportLayerOptions.tcp_options;
tcpPubSubOptions.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
tcpPubSubOptions.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);
tcpPubSubOptions.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS);

auto& shmOptions = transportLayerOptions.shm_options;
shmOptions.host_group_name = iniConfig.get(NETWORK, "host_group_name", NET_HOST_GROUP_NAME);
shmOptions.memfile_minsize = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE);
shmOptions.memfile_reserve = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE);
shmOptions.memfile_ack_timeout = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO);
shmOptions.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT);
shmOptions.drop_out_of_order_messages = iniConfig.get(EXPERIMENTAL, "drop_out_of_order_messages", EXP_DROP_OUT_OF_ORDER_MESSAGES);
shmOptions.memfile_zero_copy = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY);

// registration options
auto registrationTimeout = iniConfig.get(COMMON, "registration_timeout", CMN_REGISTRATION_TO);
Expand Down Expand Up @@ -170,17 +159,19 @@ namespace eCAL
// subscriber options
auto& subscriberOptions = subscriber;
subscriberOptions.shm.enable = iniConfig.get(NETWORK, "shm_rec_enabled", NET_SHM_REC_ENABLED) != 0;
subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0;

subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: implicit conversion bool -> 'int' [readability-implicit-bool-conversion]

Suggested change
subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0;
subscriberOptions.tcp.enable = static_cast<int>(iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED)) != 0;

subscriberOptions.tcp.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS);
subscriberOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
subscriberOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);

subscriberOptions.udp.enable = iniConfig.get(NETWORK, "udp_mc_rec_enabled", NET_UDP_MC_REC_ENABLED) != 0;

// publisher options
auto& publisherOptions = publisher;
publisherOptions.shm.enable = iniConfig.get(PUBLISHER, "use_shm", static_cast<int>(PUB_USE_SHM)) != 0;
publisherOptions.shm.zero_copy_mode = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY);
publisherOptions.shm.acknowledge_timeout_ms = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO);
publisherOptions.shm.memfile_min_size_bytes = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE);
publisherOptions.shm.memfile_reserve_percent = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE);
publisherOptions.shm.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT);

publisherOptions.udp.enable = iniConfig.get(PUBLISHER, "use_udp_mc", static_cast<int>(PUB_USE_UDP_MC)) != 0;
// TODO PG: Add here when its available in config file
Expand All @@ -191,6 +182,8 @@ namespace eCAL
publisherOptions.share_topic_type = iniConfig.get(PUBLISHER, "share_ttype", PUB_SHARE_TTYPE);

publisherOptions.tcp.enable = iniConfig.get(PUBLISHER, "use_tcp", static_cast<int>(PUB_USE_TCP)) != 0;
publisherOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
publisherOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);

// timesync options
auto& timesyncOptions = timesync;
Expand Down
5 changes: 0 additions & 5 deletions ecal/tests/cpp/config_test/src/config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ TEST(ConfigDeathTest, user_config_death_test)
SetValue(custom_config.transport_layer.mc_options.sndbuf, (5242880 + 512)),
std::invalid_argument);

// Value exceeds MAX. Default MAX = 100
ASSERT_THROW(
SetValue(custom_config.transport_layer.shm_options.memfile_reserve, 150),
std::invalid_argument);

// Test the registration option limits
// Refresh timeout > registration timeout
ASSERT_THROW(
Expand Down
Loading