//! Defines the public [`Subscriber`] entity, a handle over either a built-in or a user-defined subscriber implementation.
use crate::{
domain::{
domain_participant::DomainParticipant, domain_participant_factory::THE_PARTICIPANT_FACTORY,
},
implementation::dds_impl::builtin_subscriber::BuiltInSubscriber,
infrastructure::{
condition::StatusCondition,
error::{DdsError, DdsResult},
qos::{DataReaderQos, QosKind, SubscriberQos, TopicQos},
status::{SampleLostStatus, StatusKind},
},
topic_definition::type_support::{DdsDeserialize, DdsType},
};
use crate::{
implementation::{
dds_impl::{
user_defined_data_reader::AnyDataReaderListener,
user_defined_subscriber::UserDefinedSubscriber,
},
utils::shared_object::DdsWeak,
},
infrastructure::instance::InstanceHandle,
};
use crate::topic_definition::topic::Topic;
use super::{
data_reader::DataReader, data_reader_listener::DataReaderListener,
subscriber_listener::SubscriberListener,
};
/// Internal discriminator for the two subscriber implementations a [`Subscriber`] can wrap.
///
/// Both variants hold a [`DdsWeak`] reference to the implementation object instead of owning it.
#[derive(PartialEq, Debug)]
pub(crate) enum SubscriberKind {
/// Weak reference to a [`BuiltInSubscriber`]. Most mutating operations of [`Subscriber`] reject
/// this variant with `DdsError::IllegalOperation`.
BuiltIn(DdsWeak<BuiltInSubscriber>),
/// Weak reference to a [`UserDefinedSubscriber`], i.e. one the full [`Subscriber`] API applies to.
UserDefined(DdsWeak<UserDefinedSubscriber>),
}
/// A [`Subscriber`] is the object responsible for the actual reception of the data resulting from its subscriptions.
///
/// A [`Subscriber`] acts on behalf of one or several [`DataReader`] objects that are related to it. When it receives data (from the
/// other parts of the system), it builds the list of concerned [`DataReader`] objects, and then indicates to the application that data is
/// available, through its listener or by enabling related conditions.
///
/// The handle wraps either a built-in or a user-defined subscriber implementation. Several operations (for example
/// [`Subscriber::create_datareader`] and [`Subscriber::set_qos`]) are only available on user-defined subscribers and
/// return [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError) when invoked on a built-in one.
#[derive(PartialEq, Debug)]
pub struct Subscriber(SubscriberKind);
impl Subscriber {
pub(crate) fn new(subscriber_impl: SubscriberKind) -> Self {
Self(subscriber_impl)
}
}
impl Drop for Subscriber {
    /// Deletes the wrapped user-defined subscriber from its participant when this handle is dropped
    /// and the weak reference reports a `weak_count()` of exactly 1 (presumably meaning this is the
    /// last outstanding handle — confirm against `DdsWeak`).
    ///
    /// Nothing happens for built-in subscribers or when the participant cannot be resolved.
    fn drop(&mut self) {
        match &self.0 {
            SubscriberKind::UserDefined(weak) if weak.weak_count() == 1 => {
                if let Ok(participant) = self.get_participant() {
                    // `drop` has no way to report a failure, so the deletion result is
                    // deliberately discarded (best-effort clean-up).
                    let _ = participant.delete_subscriber(self);
                }
            }
            // Built-in subscribers don't get deleted, and a user-defined subscriber with a
            // different weak count is left alone.
            SubscriberKind::UserDefined(_) | SubscriberKind::BuiltIn(_) => (),
        }
    }
}
impl Subscriber {
/// This operation creates a [`DataReader`]. The returned [`DataReader`] will be attached and belong to the [`Subscriber`].
/// The [`DataReader`] returned by this operation has an associated [`Topic`] and a type `Foo`.
/// The [`Topic`] passed to this operation must have been created from the same [`DomainParticipant`] that was used to create this
/// [`Subscriber`]. If the [`Topic`] was created from a different [`DomainParticipant`], the operation will fail and
/// return a [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError). In case of failure, the operation
/// will return an error and no reader will be created.
///
/// The special value [`QosKind::Default`] can be used to indicate that the [`DataReader`] should be created with the
/// default qos set in the factory. The use of this value is equivalent to the application obtaining the default
/// [`DataReaderQos`] by means of the operation [`Subscriber::get_default_datareader_qos`] and using the resulting qos
/// to create the [`DataReader`]. A common application pattern to construct the [`DataReaderQos`] to ensure consistency with the
/// associated [`TopicQos`] is to:
/// 1. Retrieve the QoS policies on the associated [`Topic`] by means of the [`Topic::get_qos`] operation.
/// 2. Retrieve the default [`DataReaderQos`] qos by means of the [`Subscriber::get_default_datareader_qos`] operation.
/// 3. Combine those two qos policies using the [`Subscriber::copy_from_topic_qos`] and selectively modify policies as desired and
/// use the resulting [`DataReaderQos`] to construct the [`DataReader`].
///
/// # Errors
///
/// Returns [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError) when called on a built-in [`Subscriber`].
/// Errors from upgrading the internal subscriber or topic references, from obtaining this entity's instance handle,
/// and from the underlying reader creation are propagated unchanged.
pub fn create_datareader<Foo>(
    &self,
    a_topic: &Topic<Foo>,
    qos: QosKind<DataReaderQos>,
    a_listener: Option<Box<dyn DataReaderListener<Foo = Foo> + Send + Sync>>,
    mask: &[StatusKind],
) -> DdsResult<DataReader<Foo>>
where
    Foo: DdsType + for<'de> DdsDeserialize<'de> + 'static,
{
    match &self.0 {
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
        SubscriberKind::UserDefined(s) => {
            // Clippy would suggest passing `Box::new` directly instead of the closure below. The
            // closure is kept — presumably so the boxed typed listener can be coerced to the
            // type-erased `AnyDataReaderListener` trait object; confirm before removing the `allow`.
            #[allow(clippy::redundant_closure)]
            s.upgrade()?
                .create_datareader::<Foo>(
                    &a_topic.0.upgrade()?,
                    qos,
                    a_listener.map::<Box<dyn AnyDataReaderListener + Send + Sync>, _>(|x| {
                        Box::new(x)
                    }),
                    mask,
                    // The owning participant is resolved through the factory from this
                    // subscriber's instance handle.
                    &THE_PARTICIPANT_FACTORY
                        .lookup_participant_by_entity_handle(self.get_instance_handle()?),
                )
                // Hand out a weak handle; the implementation object stays owned internally.
                .map(|x| DataReader::new(x.downgrade()))
        }
    }
}
/// Deletes a [`DataReader`] that belongs to this [`Subscriber`].
///
/// The call must be made on the same [`Subscriber`] object that was used to create the [`DataReader`]. When
/// [`Subscriber::delete_datareader`] is invoked on a different [`Subscriber`], the operation has no effect and
/// returns [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError).
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn delete_datareader<Foo>(&self, a_datareader: &DataReader<Foo>) -> DdsResult<()>
where
    Foo: DdsType + for<'de> DdsDeserialize<'de> + 'static,
{
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            // The reader is identified to the implementation by its instance handle.
            let reader_handle = a_datareader.get_instance_handle()?;
            subscriber.delete_datareader(reader_handle)
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// This operation retrieves a previously created [`DataReader`] belonging to the [`Subscriber`] that is attached to a [`Topic`].
/// If multiple [`DataReader`] attached to the [`Subscriber`] satisfy this condition, then the operation will return one of them. It is not
/// specified which one.
/// The use of this operation on the built-in [`Subscriber`] allows access to the built-in [`DataReader`] entities for the built-in topics.
///
/// # Returns
///
/// As implemented here, every successful lookup is wrapped in [`Some`]; this method never produces `Ok(None)` itself.
/// When no matching [`DataReader`] exists, the outcome is therefore whatever error the underlying lookup reports.
///
/// NOTE(review): the DDS specification describes a "not found" lookup as succeeding with an empty result. If that
/// behaviour is wanted, the not-found error of the underlying lookup has to be mapped to `Ok(None)` here — confirm
/// which error variant that is.
///
/// # Errors
///
/// Errors from upgrading the internal subscriber or topic references and from the underlying lookup are propagated.
pub fn lookup_datareader<Foo>(&self, topic: &Topic<Foo>) -> DdsResult<Option<DataReader<Foo>>>
where
    Foo: DdsType + for<'de> DdsDeserialize<'de>,
{
    // Both arms are textually the same but operate on different implementation types, so they
    // cannot be merged into a single pattern.
    match &self.0 {
        SubscriberKind::BuiltIn(s) => s
            .upgrade()?
            .lookup_datareader::<Foo>(&topic.0.upgrade()?)
            .map(|x| Some(DataReader::new(x.downgrade()))),
        SubscriberKind::UserDefined(s) => s
            .upgrade()?
            .lookup_datareader::<Foo>(&topic.0.upgrade()?)
            .map(|x| Some(DataReader::new(x.downgrade()))),
    }
}
/// Invokes [`DataReaderListener::on_data_available`] on the listener objects attached to the contained [`DataReader`]
/// entities whose [`StatusKind::DataAvailable`] status is considered changed.
///
/// The typical call site is [`SubscriberListener::on_data_on_readers`]: this lets a [`SubscriberListener`] delegate
/// the handling of the data to the individual [`DataReaderListener`] objects.
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn notify_datareaders(&self) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.notify_datareaders()
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Returns the [`DomainParticipant`] to which this [`Subscriber`] belongs.
///
/// The participant is resolved by asking the participant factory for the owner of this entity's instance handle.
/// An error obtaining that handle is propagated.
pub fn get_participant(&self) -> DdsResult<DomainParticipant> {
    let owning_participant = THE_PARTICIPANT_FACTORY
        .lookup_participant_by_entity_handle(self.get_instance_handle()?);
    // Only a weak reference is handed to the public wrapper.
    let weak_participant = owning_participant.downgrade();
    Ok(DomainParticipant::new(weak_participant))
}
/// Gives access to the [`SampleLostStatus`] of this [`Subscriber`].
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn get_sample_lost_status(&self) -> DdsResult<SampleLostStatus> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.get_sample_lost_status()
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Deletes every entity that was created through [`Subscriber::create_datareader`], that is, all contained
/// [`DataReader`] objects.
///
/// The operation returns [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError) if any of the
/// contained entities is in a state where it cannot be deleted.
/// Once this operation returns successfully, the application may delete the [`Subscriber`] knowing that it has no
/// contained [`DataReader`] objects.
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn delete_contained_entities(&self) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.delete_contained_entities()
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Sets the default [`DataReaderQos`], i.e. the qos applied to newly created [`DataReader`] entities whenever
/// [`Subscriber::create_datareader`] is called with defaulted qos policies.
///
/// The resulting policies are checked for self consistency; if they are not consistent the operation has no effect
/// and returns [`DdsError::InconsistentPolicy`](crate::infrastructure::error::DdsError).
/// Passing the special value [`QosKind::Default`] resets the default qos back to the initial values the factory would
/// use, that is the default value of [`DataReaderQos`].
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn set_default_datareader_qos(&self, qos: QosKind<DataReaderQos>) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.set_default_datareader_qos(qos)
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// This operation retrieves the default value of the [`DataReaderQos`], that is, the qos policies which will be used for newly
/// created [`DataReader`] entities in the case where the qos policies are defaulted in the [`Subscriber::create_datareader`] operation.
/// The values retrieved by this operation will match the set of values specified on the last successful call to
/// [`Subscriber::set_default_datareader_qos`], or else, if the call was never made, the default values of [`DataReaderQos`].
///
/// # Errors
///
/// Returns [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError) when called on a built-in
/// [`Subscriber`]. An error from upgrading the internal subscriber reference is propagated.
pub fn get_default_datareader_qos(&self) -> DdsResult<DataReaderQos> {
    match &self.0 {
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
        SubscriberKind::UserDefined(s) => s.upgrade()?.get_default_datareader_qos(),
    }
}
/// Copies the policies found in `a_topic_qos` into the corresponding policies of `a_datareader_qos`.
///
/// This is a "convenience" operation, most useful together with [`Subscriber::get_default_datareader_qos`] and
/// [`Topic::get_qos`]: it can merge the [`DataReader`] default qos policies with the corresponding ones on the
/// [`Topic`]. The merged qos can then be used to create a new [`DataReader`], or to set its qos.
///
/// The resulting `a_datareader_qos` is not checked for consistency, because it may not be the final one: the
/// application can still modify some policies before applying them to the [`DataReader`].
pub fn copy_from_topic_qos(
    a_datareader_qos: &mut DataReaderQos,
    a_topic_qos: &TopicQos,
) -> DdsResult<()> {
    // Pure forwarder: the copy itself is implemented on the user-defined subscriber type.
    UserDefinedSubscriber::copy_from_topic_qos(a_datareader_qos, a_topic_qos)
}
}
impl Subscriber {
/// Sets the QoS policies of the Entity, replacing the values of any policies previously set.
///
/// Certain policies are "immutable": they can only be set at Entity creation time, or before the entity is made
/// enabled. If [`Self::set_qos()`] is invoked after the Entity is enabled and it attempts to change the value of an
/// "immutable" policy, the operation fails and returns
/// [`DdsError::ImmutablePolicy`](crate::infrastructure::error::DdsError).
///
/// Certain values of QoS policies can be incompatible with the settings of the other policies. The operation also
/// fails if the given values, once combined with the existing ones, would result in an inconsistent set of policies;
/// the return value is then [`DdsError::InconsistentPolicy`](crate::infrastructure::error::DdsError).
///
/// The existing set of policies is only changed if [`Self::set_qos()`] succeeds, which is indicated by the [`Ok`]
/// return value. In all other cases, none of the policies is modified.
///
/// The parameter `qos` can be set to [`QosKind::Default`] to indicate that the QoS of the Entity should be changed to
/// match the current default QoS set in the Entity's factory. Because [`Self::set_qos()`] cannot modify the immutable
/// QoS, a successful return then indicates that the mutable QoS for the Entity has been modified to match the current
/// default for the Entity's factory.
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn set_qos(&self, qos: QosKind<SubscriberQos>) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.set_qos(qos)
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Gives access to the existing set of [`SubscriberQos`] policies.
///
/// Available on both built-in and user-defined subscribers; an error from upgrading the internal reference is
/// propagated.
pub fn get_qos(&self) -> DdsResult<SubscriberQos> {
    let current_qos = match &self.0 {
        SubscriberKind::BuiltIn(builtin) => builtin.upgrade()?.get_qos(),
        SubscriberKind::UserDefined(user_defined) => user_defined.upgrade()?.get_qos(),
    };
    Ok(current_qos)
}
/// Installs a Listener on the Entity. The listener will only be invoked on the changes of communication status
/// indicated by the specified `mask`.
///
/// It is permitted to use [`None`] as the value of the listener; the [`None`] listener behaves as a Listener whose
/// operations perform no action.
///
/// Only one listener can be attached to each Entity. If a listener was already set, [`Self::set_listener()`] replaces
/// it with the new one. Consequently, passing [`None`] for the listener parameter removes any existing listener.
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn set_listener(
    &self,
    a_listener: Option<Box<dyn SubscriberListener>>,
    mask: &[StatusKind],
) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            subscriber.set_listener(a_listener, mask)
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Gives access to the existing Listener attached to the Entity.
///
/// Available on both built-in and user-defined subscribers.
pub fn get_listener(&self) -> DdsResult<Option<Box<dyn SubscriberListener>>> {
    match &self.0 {
        SubscriberKind::BuiltIn(builtin) => {
            let subscriber = builtin.upgrade()?;
            subscriber.get_listener()
        }
        SubscriberKind::UserDefined(user_defined) => {
            let subscriber = user_defined.upgrade()?;
            subscriber.get_listener()
        }
    }
}
/// Gives access to the [`StatusCondition`] associated with the Entity.
///
/// The returned condition can then be added to a [`WaitSet`](crate::infrastructure::wait_set::WaitSet) so that the
/// application can wait for specific status changes that affect the Entity.
///
/// Available on both built-in and user-defined subscribers.
pub fn get_statuscondition(&self) -> DdsResult<StatusCondition> {
    match &self.0 {
        SubscriberKind::BuiltIn(builtin) => {
            let subscriber = builtin.upgrade()?;
            subscriber.get_statuscondition()
        }
        SubscriberKind::UserDefined(user_defined) => {
            let subscriber = user_defined.upgrade()?;
            subscriber.get_statuscondition()
        }
    }
}
/// Retrieves the list of communication statuses in the Entity that are 'triggered', that is, the list of statuses
/// whose value has changed since the last time the application read the status.
///
/// When the entity is first created, or if the entity is not enabled, all communication statuses are in the
/// "untriggered" state, so the list returned by [`Self::get_status_changes`] will be empty.
///
/// The returned list refers to the statuses that are triggered on the Entity itself and does not include statuses
/// that apply to contained entities.
///
/// Available on both built-in and user-defined subscribers.
pub fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
    match &self.0 {
        SubscriberKind::BuiltIn(builtin) => {
            let subscriber = builtin.upgrade()?;
            subscriber.get_status_changes()
        }
        SubscriberKind::UserDefined(user_defined) => {
            let subscriber = user_defined.upgrade()?;
            subscriber.get_status_changes()
        }
    }
}
/// Enables the Entity.
///
/// Entity objects can be created either enabled or disabled. This is controlled by the value of the
/// [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy) on the corresponding factory
/// for the Entity. The default setting of that policy is such that, by default, it is not necessary to explicitly
/// call enable on newly created entities.
///
/// [`Self::enable()`] is idempotent: calling it on an already enabled Entity returns [`Ok`] and has no effect.
///
/// If an Entity has not yet been enabled, the following kinds of operations may be invoked on it:
/// - Operations to set or get an Entity's QoS policies (including default QoS policies) and listener
/// - [`Self::get_statuscondition()`]
/// - Factory and lookup operations
/// - [`Self::get_status_changes()`] and other get status operations (although the status of a disabled entity never changes)
///
/// Other operations may explicitly state that they may be called on disabled entities; those that do not will return
/// the error NotEnabled.
///
/// It is legal to delete an Entity that has not been enabled by calling the proper operation on its factory.
/// Entities created from a factory that is disabled are created disabled regardless of the setting of the
/// [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy).
/// Calling enable on an Entity whose factory is not enabled will fail and return
/// [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError).
///
/// If the `autoenable_created_entities` field of
/// [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy) is set to [`true`], the
/// [`Self::enable()`] operation on the factory will automatically enable all entities created from the factory.
///
/// The Listeners associated with an entity are not called until the entity is enabled. Conditions associated with an
/// entity that is not enabled are "inactive", that is, the operation [`StatusCondition::get_trigger_value()`] will
/// always return `false`.
///
/// A built-in [`Subscriber`] rejects this operation with
/// [`DdsError::IllegalOperation`](crate::infrastructure::error::DdsError).
pub fn enable(&self) -> DdsResult<()> {
    match &self.0 {
        SubscriberKind::UserDefined(weak_subscriber) => {
            let subscriber = weak_subscriber.upgrade()?;
            // The implementation needs the owning participant, which is resolved through the
            // factory from this subscriber's instance handle.
            let owning_participant = THE_PARTICIPANT_FACTORY
                .lookup_participant_by_entity_handle(self.get_instance_handle()?);
            subscriber.enable(&owning_participant)
        }
        SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
    }
}
/// Returns the [`InstanceHandle`] that represents the Entity.
///
/// Available on both built-in and user-defined subscribers; an error from upgrading the internal reference is
/// propagated.
pub fn get_instance_handle(&self) -> DdsResult<InstanceHandle> {
    let handle = match &self.0 {
        SubscriberKind::BuiltIn(builtin) => builtin.upgrade()?.get_instance_handle(),
        SubscriberKind::UserDefined(user_defined) => user_defined.upgrade()?.get_instance_handle(),
    };
    Ok(handle)
}
}