generated from cpp-best-practices/gui_starter_template
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Topic.hpp
188 lines (159 loc) · 7.01 KB
/
Topic.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#ifndef OPENCMW_CORE_TOPIC_H
#define OPENCMW_CORE_TOPIC_H
#include <TimingCtx.hpp> // hash_combine
#include <URI.hpp>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <fmt/format.h>
namespace opencmw::mdp {
constexpr bool isValidServiceName(std::string_view str) {
auto is_allowed = [](char ch) {
// std::isalnum is not constexpr
return ch == '/' || ch == '.' || ch == '_' || (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z');
};
return str.starts_with("/") && str.size() > 1 && !str.ends_with("/") && std::ranges::all_of(str, is_allowed);
}
/**
* A topic is the combination of a service name (mandatory) and query parameters (optional).
*
* Examples:
* - /DeviceName/Acquisition - service name only, no params
* - /DeviceName/Acquisition?signal=test - service name (/DeviceName/Acquisition) and query
* - /dashboards - service name (/dashboards)
* - /dashboards/dashboard1?what=header - service name (/dashboards/dashboard1) and query
*
* When encoded as mdp/mds or http/https URLs, the service name is the URL's path (with leading slash), with the
* topic's query parameters being the URI's query parameters (minus e.g. REST-specific parameters like "LongPollingIdx"
* that are not forwarded).
*
* Examples:
* - http://localhost:8080/DeviceName/Acquisition => service: /DeviceName/Acquisition
* - http://localhost:8080/DeviceName/Acquisition?LongPollingIdx=Next&signal=test => service: /DeviceName/Acquisition, query: signal=test
* - mds://localhost:12345/DeviceName/Acquisition?signal=test => service: /DeviceName/Acquisition, query: signal=test
* - mdp://localhost:12345/dashboards/dashboard1?what=header => serviceName: /dashboards/dashboard1, query: what=header
*
* Note that the whole path is considered the service name, and that there's no additional path component denoting
* different entities, objects etc. neither for subscriptions nor GET/SET requests. Requesting specific objects like
* in the "/dashboards/dashboard1" example are handled via the service-matching for requests in the broker, where a
* service name "/dashboard/dashboard1" would match a worker "/dashboard", which then can extract the "dashboard1" component
* from the topic frame (see below).
*
* For subscriptions, only the worker's service name is to be used, any filtering for specific messages is
* done via the query parameters.
*
* On the protocol level, the topic is used in two contexts:
*
* - for ZMQ PUB/SUB subscriptions: ZMQ uses a string-based subscription mechanism, where topics are simple strings
* (allowing trailing wildcards) that must match exactly (or via prefix, if using wildcards). For that purpose
* Topic::toZmqTopic() ensures that the params in e.g. /service?b&a are always ordered alphabetically by key
* (/service?a&b), as /service?a&b and service?b&a are supposed to be equivalent, but wouldn't match with the ZMQ
* mechanism.
* - The OpenCMW MDP topic frame (frame 5), used by the commands GET, SET, SUBSCRIBE, UNSUBSCRIBE, FINAL, PARTIAL and
* NOTIFY: Here the frame contains a URI (with unspecified ordering of query parameters).
**/
struct Topic {
using Params = std::unordered_map<std::string, std::optional<std::string>>;
private:
std::string _service;
Params _params;
public:
Topic(const Topic &other) = default;
Topic &operator=(const Topic &) = default;
Topic(Topic &&) noexcept = default;
Topic &operator=(Topic &&) noexcept = default;
bool operator==(const Topic &) const = default;
/**
* Parses subscription from a "service" or "service?param" string
*
* @param str A string where the first path segment is the service name, e.g. "/service/" or "/service?param"
* @param params Optional query parameters, if non-empty, @p str must not contain query parameters
*/
static Topic fromString(std::string_view str, Params params = {}) {
return Topic(str, std::move(params));
}
template<uri_check CHECK>
static Topic fromMdpTopic(const URI<CHECK> &topic) {
return Topic(topic.path().value_or("/"), topic.queryParamMap());
}
static Topic fromZmqTopic(std::string_view topic) {
return fromString(topic, {});
}
opencmw::URI<STRICT> toMdpTopic() const {
return opencmw::URI<STRICT>::factory().path(_service).setQuery(_params).build();
}
std::string toZmqTopic() const {
using namespace std::string_literals;
std::string zmqTopic = _service;
if (_params.empty()) {
return zmqTopic;
}
zmqTopic += "?"s;
bool isFirst = true;
// sort params
for (const auto &[key, value] : std::map{ _params.begin(), _params.end() }) {
if (!isFirst) {
zmqTopic += "&"s;
}
zmqTopic += key;
if (value) {
zmqTopic += "="s + opencmw::URI<>::encode(*value);
}
isFirst = false;
}
return zmqTopic;
}
[[nodiscard]] std::size_t hash() const noexcept {
std::size_t seed = 0;
opencmw::detail::hash_combine(seed, _service);
for (const auto &[key, value] : _params) {
opencmw::detail::hash_combine(seed, key);
opencmw::detail::hash_combine(seed, value);
}
return seed;
}
std::string_view service() const { return _service; }
const auto ¶ms() const { return _params; }
void addParam(std::string_view key, std::string_view value) {
_params[std::string(key)] = std::string(value);
}
private:
static std::string parseService(std::string_view str) {
if (const auto queryPos = str.find_first_of("?"); queryPos != std::string::npos) {
str = str.substr(0, queryPos);
}
while (str.ends_with("/")) {
str.remove_suffix(1);
}
auto r = std::string(str);
if (!r.starts_with("/")) {
return "/" + r;
}
return r;
}
Topic(std::string_view serviceOrServiceAndQuery, Params params)
: _service(parseService(serviceOrServiceAndQuery))
, _params(std::move(params)) {
if (serviceOrServiceAndQuery.find("?") != std::string::npos) {
if (!_params.empty()) {
throw std::invalid_argument(fmt::format("Parameters are not empty ({}), and there are more in the service string ({})\n", _params, serviceOrServiceAndQuery));
}
const auto parsed = opencmw::URI<RELAXED>(std::string(serviceOrServiceAndQuery));
_params = parsed.queryParamMap();
}
if (!isValidServiceName(_service)) {
throw std::invalid_argument(fmt::format("Invalid service name '{}'\n", _service));
}
}
};
} // namespace opencmw::mdp
namespace std {
template<>
struct hash<opencmw::mdp::Topic> {
std::size_t operator()(const opencmw::mdp::Topic &k) const {
return k.hash();
}
};
} // namespace std
#endif