dds-fmu 0.5.1
DDS-FMU communication integration
DynamicPubSub.hpp
1#pragma once
2
3/*
4 Copyright 2023, SINTEF Ocean
5 This Source Code Form is subject to the terms of the Mozilla Public
6 License, v. 2.0. If a copy of the MPL was not distributed with this
7 file, You can obtain one at http://mozilla.org/MPL/2.0/.
8*/
9
10#include <filesystem>
11#include <functional>
12#include <map>
13
14#include <fastdds/dds/domain/DomainParticipant.hpp>
15#include <fastdds/dds/domain/DomainParticipantListener.hpp>
16#include <fastdds/dds/publisher/DataWriter.hpp>
17#include <fastdds/dds/publisher/Publisher.hpp>
18#include <fastdds/dds/subscriber/DataReader.hpp>
19#include <fastdds/dds/subscriber/Subscriber.hpp>
20#include <fastrtps/types/DynamicPubSubType.h>
21
22#include "CustomKeyFilterFactory.hpp"
23#include "DataMapper.hpp"
24
25namespace cppfmu {
26class Logger;
27}
28
29namespace ddsfmu {
30
31class DomainListener : public eprosima::fastdds::dds::DomainParticipantListener {
32 static const std::map<uint32_t /*eprosima::fastdds::dds::QosPolicyId_t*/, std::string>
33 QosPolicyString;
34
35public:
36 DomainListener() = default;
37 ~DomainListener() override = default;
38
40 eprosima::fastdds::dds::DataReader* reader,
41 const eprosima::fastdds::dds::RequestedIncompatibleQosStatus& status) override {
42 std::ostringstream oss;
43 oss << "The data reader " << reader->guid() << " with topic name '"
44 << reader->get_topicdescription()->get_name() << "' of type '"
45 << reader->get_topicdescription()->get_type_name()
46 << "' has requested incompatible QoS with the one offered by the writer:";
47 for (const auto& policy : status.policies) {
48 if (policy.count > 0) {
49 try {
50 oss << " " << QosPolicyString.at(policy.policy_id) << ", ";
51 } catch (const std::out_of_range& e) { oss << "[Unknown policy!]" << std::endl; }
52 }
53 }
54 oss << std::endl;
55 //std::cout << oss.str();
56 }
57
59 eprosima::fastdds::dds::DataWriter* writer,
60 const eprosima::fastdds::dds::OfferedIncompatibleQosStatus& status) override {
61 std::ostringstream oss;
62 oss << "The data writer " << writer->guid() << " with topic name '"
63 << writer->get_topic()->get_name() << "' of type '" << writer->get_topic()->get_type_name()
64 << "' has offered incompatible QoS with the one requested by the reader:";
65 for (const auto& policy : status.policies) {
66 if (policy.count > 0) {
67 try {
68 oss << " " << QosPolicyString.at(policy.policy_id) << ", ";
69 } catch (const std::out_of_range& e) { oss << "[Unknown policy!]" << std::endl; }
70 }
71 }
72 oss << std::endl;
73 //std::cout << oss.str();
74 }
75
77 eprosima::fastdds::dds::Topic* topic,
78 eprosima::fastdds::dds::InconsistentTopicStatus status) override {
79 // TODO: Not yet implemented by fast-dds and will never trigger (fast-dds v2.11.2)
80 std::ostringstream oss;
81 oss << "There already exist another topic with inconsistent characteristics for topic name '"
82 << topic->get_name() << "' of type '" << topic->get_type_name() << "'" << std::endl;
83 //std::cout << oss.str();
84 }
85};
86
99public:
100 DynamicPubSub();
102 DynamicPubSub(const DynamicPubSub&) = delete;
104
116 void reset(
117 const std::filesystem::path& fmu_resources, DataMapper* const mapper,
118 const std::string& name = "dds-fmu", cppfmu::Logger* const logger = nullptr);
119
125 void write();
126
132 void take();
133
140 void init_key_filters();
141
142private:
143 typedef std::pair<eprosima::xtypes::DynamicData&, eprosima::fastrtps::types::DynamicData_ptr>
144 DynamicDataConnection;
145 enum class PubOrSub {
146 PUBLISH,
147 SUBSCRIBE
148 };
149 DataMapper* m_data_mapper;
150 inline DataMapper& mapper() { return *m_data_mapper; }
151 void clear();
152 bool m_xml_loaded;
153 eprosima::fastdds::dds::DomainParticipant* m_participant;
154 eprosima::fastdds::dds::Publisher* m_publisher;
155 eprosima::fastdds::dds::Subscriber* m_subscriber;
156 DomainListener m_listener;
157 std::map<std::string, std::string> m_topic_to_type;
158 std::map<std::string, eprosima::fastrtps::types::DynamicPubSubType> m_types;
159 std::map<std::string, eprosima::fastdds::dds::Topic*> m_topic_name_ptr;
160 std::map<eprosima::fastdds::dds::DataReader*, eprosima::fastdds::dds::ContentFilteredTopic*>
161 m_reader_topic_filter;
162 std::map<eprosima::fastdds::dds::ContentFilteredTopic*, eprosima::xtypes::DynamicData&>
163 m_filter_data;
164 std::map<eprosima::fastdds::dds::DataWriter*, DynamicDataConnection> m_write_data;
165 std::map<eprosima::fastdds::dds::DataReader*, DynamicDataConnection> m_read_data;
167};
168
169}
Manages mapping between FMU signals and xtypes data storage.
Definition: DataMapper.hpp:36
Definition: DynamicPubSub.hpp:31
void on_offered_incompatible_qos(eprosima::fastdds::dds::DataWriter *writer, const eprosima::fastdds::dds::OfferedIncompatibleQosStatus &status) override
Definition: DynamicPubSub.hpp:58
void on_inconsistent_topic(eprosima::fastdds::dds::Topic *topic, eprosima::fastdds::dds::InconsistentTopicStatus status) override
Definition: DynamicPubSub.hpp:76
~DomainListener() override=default
void on_requested_incompatible_qos(eprosima::fastdds::dds::DataReader *reader, const eprosima::fastdds::dds::RequestedIncompatibleQosStatus &status) override
Definition: DynamicPubSub.hpp:39
Dynamic Publisher and Subscriber.
Definition: DynamicPubSub.hpp:98
DynamicPubSub()
Default constructor sets pointers to nullptr and m_xml_loaded to false.
Definition: DynamicPubSub.cpp:76
void take()
Takes DDS data into data in DataMapper.
Definition: DynamicPubSub.cpp:173
DynamicPubSub(const DynamicPubSub &)=delete
Copy constructor.
void init_key_filters()
Initialize content filters for keyed topics.
Definition: DynamicPubSub.cpp:92
~DynamicPubSub()
Destructor calls clear()
Definition: DynamicPubSub.cpp:90
void write()
Writes DDS data by using data from DataMapper.
Definition: DynamicPubSub.cpp:83
DynamicPubSub & operator=(const DynamicPubSub &)=delete
Copy assignment.
void reset(const std::filesystem::path &fmu_resources, DataMapper *const mapper, const std::string &name="dds-fmu", cppfmu::Logger *const logger=nullptr)
Resets all members of DynamicPubSub.
Definition: DynamicPubSub.cpp:240
A custom content filter factory for dynamic types.
Definition: CustomKeyFilterFactory.hpp:31
Definition: DynamicPubSub.hpp:25
Definition: auxiliaries.cpp:26