Create DDS Transport Protocol #1144
base: dev
* raw rospubsub and benchmarks
* typefixes, shm added to the benchmark
* SHM is not so important to tell us every time when it starts
* greptile comments
* Add co-authorship line to commit message filter patterns
* Remove unused contextmanager import

---------

Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate buffer overflow issues. The original base64 encoding exceeded CycloneDDS's default string size limit (~256 bytes) and caused crashes on messages >= 1KB.

Key changes:
- Use make_idl_struct with bytearray field instead of string
- Convert bytes to bytearray when publishing to DDS
- Convert bytearray back to bytes when receiving from DDS
- Add _DDSMessageListener for async message dispatch
- Implement thread-safe DataWriter/DataReader management
- Add pickle support via __getstate__/__setstate__

Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
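To make the size argument concrete, here is a minimal standard-library sketch of the two payload encodings this commit message contrasts. It does not touch CycloneDDS at all; the variable names are illustrative, and the 1 KiB payload is chosen only because that is the size at which the crashes were reported.

```python
import base64

# A 1 KiB binary payload (illustrative test data, not from the PR).
payload = bytes(range(256)) * 4

# Old approach: base64 text. Every 3 input bytes become 4 ASCII characters,
# so the string that must fit in the IDL string field is a third larger.
encoded = base64.b64encode(payload).decode("ascii")

# New approach: pass the raw bytes as a mutable bytearray field when
# publishing, then convert back to immutable bytes on the receiving side.
wire_field = bytearray(payload)
received = bytes(wire_field)

print(len(payload), len(encoded), received == payload)
# prints: 1024 1368 True
```

The round trip through `bytearray` is lossless and adds no size overhead, whereas the base64 string grows by roughly 33% before any string-length limit is applied.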
…or Message payload.
The double-checked locking pattern avoids lock contention on every call after initial object creation. Initial benchmarking shows this pattern performs better than simple locking for repeated accesses to the same topics.
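As a rough illustration of the pattern described here, the following sketch caches one object per key and only takes the lock when the object is missing. `LazyCache` and `make_writer` are hypothetical names, not code from this PR, and the lock-free fast path assumes CPython, where a single `dict.get` is atomic.

```python
import threading
from typing import Callable, Dict, Generic, Hashable, List, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class LazyCache(Generic[K, V]):
    """Hypothetical per-key cache that creates each value at most once."""

    def __init__(self, factory: Callable[[K], V]) -> None:
        self._factory = factory
        self._items: Dict[K, V] = {}
        self._lock = threading.Lock()

    def get(self, key: K) -> V:
        # First check, without the lock: the common case after creation.
        item = self._items.get(key)
        if item is not None:
            return item
        with self._lock:
            # Second check, under the lock: another thread may have
            # created the value while this one was waiting.
            item = self._items.get(key)
            if item is None:
                item = self._factory(key)
                self._items[key] = item
            return item


created: List[str] = []


def make_writer(topic: str) -> object:
    """Stand-in for an expensive DataWriter constructor."""
    created.append(topic)
    return object()


cache = LazyCache(make_writer)
threads = [threading.Thread(target=cache.get, args=("/odom",)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(created)  # prints: ['/odom']
```

Eight threads request the same topic, but the factory runs once; every later `get` returns from the unlocked fast path. Note that the sketch treats `None` as "missing", so the factory must never return `None`.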
Greptile Overview
Greptile Summary
This PR adds DDS (Data Distribution Service) transport protocol support to dimos, implementing a new communication layer using Cyclone DDS alongside the existing LCM transport.
Key Changes
Issues Found
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant App as Application
participant DDSTransport as DDSTransport
participant DDS as DDS
participant DDSService as DDSService
participant DP as DomainParticipant
participant Writer as DataWriter
participant Reader as DataReader
participant Listener as Listener
Note over App,Listener: Publishing Flow
App->>DDSTransport: broadcast(msg)
DDSTransport->>DDSTransport: check _started
alt not started
DDSTransport->>DDSTransport: start()
DDSTransport->>DDS: start()
end
DDSTransport->>DDS: publish(topic, msg)
DDS->>DDS: _get_writer(topic)
alt writer not exists
DDS->>DDSService: get_participant()
DDSService->>DDSService: double-checked lock
alt participant not exists
DDSService->>DP: new(domain_id)
end
DDSService-->>DDS: participant
DDS->>Writer: new(participant, topic)
DDS->>DDS: cache writer
end
DDS->>Writer: write(msg)
Writer->>DP: publish to network
Note over App,Listener: Subscription Flow
App->>DDSTransport: subscribe(callback)
DDSTransport->>DDSTransport: check _started
alt not started
DDSTransport->>DDSTransport: start()
end
DDSTransport->>DDS: subscribe(topic, wrapped_cb)
DDS->>DDS: register callback
DDS->>DDS: _get_reader(topic)
alt reader not exists
DDS->>DDSService: get_participant()
DDSService-->>DDS: participant
DDS->>Listener: new(topic, callbacks_dict)
DDS->>Reader: new(participant, topic, listener)
DDS->>DDS: cache reader
end
DDS-->>DDSTransport: unsubscribe_fn
DDSTransport-->>App: unsubscribe_fn
Note over App,Listener: Message Receive Flow
DP->>Reader: data available
Reader->>Listener: on_data_available(reader)
Listener->>Reader: take()
Reader-->>Listener: samples
loop for each sample
loop for each callback
Listener->>App: callback(sample, topic)
end
end
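The message receive flow at the end of the diagram can be sketched without a DDS installation. In this minimal mock, `FakeReader` and `Dispatcher` are hypothetical stand-ins for the DataReader and the listener; only the `on_data_available(reader)` and `take()` names come from the diagram. Snapshotting the callback list under the lock is a design choice of this sketch, not something the PR is confirmed to do.

```python
import threading
from typing import Any, Callable, Dict, List, Tuple

Callback = Callable[[Any, str], None]


class FakeReader:
    """Stand-in for a DDS DataReader: take() drains the queued samples."""

    def __init__(self, samples: List[Any]) -> None:
        self._samples = list(samples)

    def take(self) -> List[Any]:
        samples, self._samples = self._samples, []
        return samples


class Dispatcher:
    """Hypothetical listener that fans each sample out to a topic's callbacks."""

    def __init__(
        self, topic: str, callbacks: Dict[str, List[Callback]], lock: threading.Lock
    ) -> None:
        self._topic = topic
        self._callbacks = callbacks  # shared dict, mutated by subscribe()
        self._lock = lock

    def on_data_available(self, reader: FakeReader) -> None:
        samples = reader.take()
        with self._lock:
            # Copy the list so user callbacks run without holding the lock.
            targets = list(self._callbacks.get(self._topic, []))
        for sample in samples:
            for callback in targets:
                callback(sample, self._topic)


received: List[Tuple[str, Any, str]] = []
lock = threading.Lock()
callbacks: Dict[str, List[Callback]] = {
    "/odom": [
        lambda msg, topic: received.append(("a", msg, topic)),
        lambda msg, topic: received.append(("b", msg, topic)),
    ]
}

listener = Dispatcher("/odom", callbacks, lock)
listener.on_data_available(FakeReader([b"one", b"two"]))
print(received)
```

Each sample reaches every registered callback in registration order, matching the nested loops in the diagram. A second `on_data_available` call on the same reader would dispatch nothing, because `take()` removes the samples it returns.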
5 files reviewed, 3 comments
    def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
        """Subscribe to a DDS topic with a callback."""
        with self._callback_lock:
            if topic not in self._callbacks:
                self._callbacks[topic] = []
            self._callbacks[topic].append(callback)
        self._get_reader(topic)
Race condition: callbacks added after reader/listener creation
The listener references self._callbacks[topic] at line 59 when created (line 110), but callbacks are added at lines 119-121. If a message arrives between reader creation and callback addition, the listener will dispatch to an empty callback list, potentially missing early messages.
Suggested change
-    def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
-        """Subscribe to a DDS topic with a callback."""
-        with self._callback_lock:
-            if topic not in self._callbacks:
-                self._callbacks[topic] = []
-            self._callbacks[topic].append(callback)
-        self._get_reader(topic)
+    def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
+        """Subscribe to a DDS topic with a callback."""
+        with self._callback_lock:
+            if topic not in self._callbacks:
+                self._callbacks[topic] = []
+            self._callbacks[topic].append(callback)
+        # Create reader after callback is registered
+        if topic not in self._readers:
+            with self._reader_lock:
+                if topic not in self._readers:
+                    dds_topic = DDSTopic(self.get_participant(), topic.name, topic.typename)
+                    listener = _DDSMessageListener(topic, self._callbacks)
+                    self._readers[topic] = DDSDataReader(
+                        self.get_participant(), dds_topic, listener=listener
+                    )
+        return lambda: self._unsubscribe_callback(topic, callback)
    def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
        if not self._started:
            self.start()
        self.dds.publish(self.topic, msg)
Race condition: _started check without synchronization
Multiple threads calling broadcast() or subscribe() simultaneously can both see _started=False and call start() multiple times. Need thread-safe initialization.
Suggested change
-    def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
-        if not self._started:
-            self.start()
-        self.dds.publish(self.topic, msg)
+    def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
+        if not self._started:
+            with self._start_lock:
+                if not self._started:
+                    self.start()
+        self.dds.publish(self.topic, msg)
(Also add self._start_lock = threading.Lock() in __init__)
    def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
        if not self._started:
            self.start()
        return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
Race condition: same _started synchronization issue
Suggested change
-    def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
-        if not self._started:
-            self.start()
-        return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
+    def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
+        if not self._started:
+            with self._start_lock:
+                if not self._started:
+                    self.start()
+        return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
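Since broadcast() and subscribe() need the same guard, one option is to move it into a single helper so the check cannot drift between the two methods. The sketch below is a self-contained mock of that idea: `LazyStartTransport`, `_ensure_started`, and `start_calls` are hypothetical names, and the real transport's start() and DDS calls are replaced by trivial stand-ins.

```python
import threading
from typing import Any, Callable


class LazyStartTransport:
    """Hypothetical transport that starts itself on first use, exactly once."""

    def __init__(self) -> None:
        self._started = False
        self._start_lock = threading.Lock()
        self.start_calls = 0  # instrumentation for this sketch only

    def start(self) -> None:
        self.start_calls += 1
        self._started = True

    def _ensure_started(self) -> None:
        if self._started:  # cheap check once startup has happened
            return
        with self._start_lock:
            if not self._started:  # re-check while holding the lock
                self.start()

    def broadcast(self, msg: Any) -> None:
        self._ensure_started()
        # A real transport would publish msg here.

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self._ensure_started()
        # A real transport would register the callback here.
        return lambda: None


transport = LazyStartTransport()
barrier = threading.Barrier(8)


def worker(i: int) -> None:
    barrier.wait()  # release all threads together to provoke the race
    if i % 2:
        transport.broadcast(f"msg-{i}")
    else:
        transport.subscribe(lambda msg: None)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(transport.start_calls)  # prints: 1
```

With the guard in one place, any new entry point only has to call `_ensure_started()` to get the same thread-safe initialization.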
No description provided.