Conversation

@Kaweees (Member) commented Jan 28, 2026

No description provided.

Kaweees and others added 22 commits January 15, 2026 18:06
* raw rospubsub and benchmarks

* typefixes, shm added to the benchmark

* SHM is not so important to tell us every time when it starts

* greptile comments

* Add co-authorship line to commit message filter patterns

* Remove unused contextmanager import

---------

Co-authored-by: Ivan Nikolic <lesh@sysphere.org>
Replace base64 string encoding with native IDL bytearray type to eliminate
buffer overflow issues. The original base64 encoding exceeded CycloneDDS's
default string size limit (~256 bytes) and caused crashes on messages >= 1KB.

Key changes:
- Use make_idl_struct with bytearray field instead of string
- Convert bytes to bytearray when publishing to DDS
- Convert bytearray back to bytes when receiving from DDS
- Add _DDSMessageListener for async message dispatch
- Implement thread-safe DataWriter/DataReader management
- Add pickle support via __getstate__/__setstate__

Result: All 12 DDS benchmark tests pass (64B to 10MB messages).
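The byte-handling and pickle changes above can be sketched in dependency-free Python. This is an illustrative stand-in, not the actual dimos types: `TopicChannel` and its method names are hypothetical, and `object` stands in for real DDS handles.

```python
import pickle
import threading


class TopicChannel:
    """Illustrative channel holding an unpicklable lock, as the PR's DDS classes do."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self._lock = threading.Lock()  # thread-safety primitive; not picklable

    @staticmethod
    def encode(payload: bytes) -> bytearray:
        # Publishing side: native IDL bytearray field instead of a base64 string,
        # so payloads are not subject to the ~256-byte default string limit.
        return bytearray(payload)

    @staticmethod
    def decode(raw: bytearray) -> bytes:
        # Receiving side: convert the bytearray back to immutable bytes.
        return bytes(raw)

    def __getstate__(self) -> dict:
        # Pickle support: drop the lock from the pickled state.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state: dict) -> None:
        # Recreate the lock when the object is unpickled.
        self.__dict__.update(state)
        self._lock = threading.Lock()


# A 1 KB binary payload round-trips losslessly, and the channel survives pickling.
payload = b"\x00\xff" * 512
assert TopicChannel.decode(TopicChannel.encode(payload)) == payload
clone = pickle.loads(pickle.dumps(TopicChannel("cmd_vel")))
assert clone.topic == "cmd_vel"
```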
The double-checked locking pattern avoids lock contention on every
call after initial object creation. Initial benchmarking shows this
pattern performs better than simple locking for repeated accesses
to the same topics.
greptile-apps bot commented Jan 28, 2026

Greptile Overview

Greptile Summary

This PR adds DDS (Data Distribution Service) transport protocol support to dimos, implementing a new communication layer using Cyclone DDS alongside the existing LCM transport.

Key Changes

  • New DDS Transport Implementation: Added DDSTransport class in dimos/core/transport.py following the same pattern as LCMTransport, providing DDS-based pub/sub communication
  • DDS Service Layer: Created DDSService with singleton DomainParticipant management using double-checked locking for thread-safe initialization
  • DDS PubSub Protocol: Implemented DDSPubSubBase and DDS classes with callback-based subscriptions and automatic DataReader/DataWriter management
  • ROS 2 Support: Added comprehensive ROS 2 pub/sub implementation (RawROS, DimosROS) with proper threading, executor management, and QoS configuration
  • Benchmark Suite: Added extensive throughput benchmarking infrastructure to compare DDS, LCM, and other transports across message sizes from 64B to 10MB
  • Dependencies: Added cyclonedds>=0.10.5 to pyproject.toml and configured Nix build environment with Cyclone DDS libraries

Issues Found

  • Race conditions in DDSTransport lazy initialization where multiple threads can call start() simultaneously
  • Race condition in DDSPubSubBase.subscribe() where callbacks are registered after the listener is created, potentially missing early messages
  • Missing or inconsistent type annotations throughout the test files

Confidence Score: 3/5

  • This PR requires fixes for race conditions before merging safely
  • Score reflects critical race conditions in DDSTransport and DDSPubSubBase that need to be addressed. The implementation is well-structured and follows existing patterns, but the threading issues could cause unpredictable behavior in concurrent scenarios. The benchmark suite and ROS support are solid additions.
  • dimos/core/transport.py and dimos/protocol/pubsub/ddspubsub.py need thread-safety fixes for the race conditions identified

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/protocol/service/ddsservice.py | New DDS service with double-checked locking for singleton participant |
| dimos/protocol/pubsub/ddspubsub.py | DDS pubsub implementation with potential race condition in listener initialization |
| dimos/core/transport.py | Added DDSTransport with race condition in start/broadcast sequence |
| dimos/protocol/pubsub/rospubsub.py | New ROS 2 pubsub implementation with proper threading and cleanup |
| dimos/protocol/pubsub/benchmark/test_benchmark.py | Comprehensive benchmarking suite for DDS throughput testing |

Sequence Diagram

```mermaid
sequenceDiagram
    participant App as Application
    participant DDSTransport as DDSTransport
    participant DDS as DDS
    participant DDSService as DDSService
    participant DP as DomainParticipant
    participant Writer as DataWriter
    participant Reader as DataReader
    participant Listener as Listener

    Note over App,Listener: Publishing Flow
    App->>DDSTransport: broadcast(msg)
    DDSTransport->>DDSTransport: check _started
    alt not started
        DDSTransport->>DDSTransport: start()
        DDSTransport->>DDS: start()
    end
    DDSTransport->>DDS: publish(topic, msg)
    DDS->>DDS: _get_writer(topic)
    alt writer not exists
        DDS->>DDSService: get_participant()
        DDSService->>DDSService: double-checked lock
        alt participant not exists
            DDSService->>DP: new(domain_id)
        end
        DDSService-->>DDS: participant
        DDS->>Writer: new(participant, topic)
        DDS->>DDS: cache writer
    end
    DDS->>Writer: write(msg)
    Writer->>DP: publish to network

    Note over App,Listener: Subscription Flow
    App->>DDSTransport: subscribe(callback)
    DDSTransport->>DDSTransport: check _started
    alt not started
        DDSTransport->>DDSTransport: start()
    end
    DDSTransport->>DDS: subscribe(topic, wrapped_cb)
    DDS->>DDS: register callback
    DDS->>DDS: _get_reader(topic)
    alt reader not exists
        DDS->>DDSService: get_participant()
        DDSService-->>DDS: participant
        DDS->>Listener: new(topic, callbacks_dict)
        DDS->>Reader: new(participant, topic, listener)
        DDS->>DDS: cache reader
    end
    DDS-->>DDSTransport: unsubscribe_fn
    DDSTransport-->>App: unsubscribe_fn

    Note over App,Listener: Message Receive Flow
    DP->>Reader: data available
    Reader->>Listener: on_data_available(reader)
    Listener->>Reader: take()
    Reader-->>Listener: samples
    loop for each sample
        loop for each callback
            Listener->>App: callback(sample, topic)
        end
    end
```
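The Message Receive Flow at the end of the diagram can be sketched in dependency-free Python. The classes here are illustrative stand-ins, not the PR's actual `_DDSMessageListener` or a cyclonedds reader: on data available, take the pending samples and fan each one out to every registered callback for the topic.

```python
class MessageListener:
    """Sketch of the receive flow: take() samples, dispatch to callbacks."""

    def __init__(self, topic, callbacks):
        self.topic = topic
        self._callbacks = callbacks  # shared dict: topic -> list of callbacks

    def on_data_available(self, reader):
        for sample in reader.take():
            # Copy the list so a callback that unsubscribes mid-dispatch
            # does not mutate the sequence we are iterating.
            for cb in list(self._callbacks.get(self.topic, [])):
                cb(sample, self.topic)


class FakeReader:
    """Stand-in for a DDS DataReader with a take() that drains pending samples."""

    def __init__(self, samples):
        self._samples = samples

    def take(self):
        out, self._samples = self._samples, []
        return out


received = []
callbacks = {"pose": [lambda msg, topic: received.append((topic, msg))]}
listener = MessageListener("pose", callbacks)
listener.on_data_available(FakeReader([b"a", b"b"]))
assert received == [("pose", b"a"), ("pose", b"b")]
```

Because the listener holds a reference to the shared callbacks dict rather than a snapshot, callbacks registered after the reader is created are still seen on later dispatches, which is exactly the window the review comment below flags.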

@greptile-apps greptile-apps bot left a comment

5 files reviewed, 3 comments


Comment on lines +116 to +122
```python
def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
    """Subscribe to a DDS topic with a callback."""
    with self._callback_lock:
        if topic not in self._callbacks:
            self._callbacks[topic] = []
        self._callbacks[topic].append(callback)
    self._get_reader(topic)
```
Race condition: callbacks added after reader/listener creation

The listener references self._callbacks[topic] at line 59 when created (line 110), but callbacks are added at lines 119-121. If a message arrives between reader creation and callback addition, the listener will dispatch to an empty callback list, potentially missing early messages.

Suggested change:

```python
def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
    """Subscribe to a DDS topic with a callback."""
    with self._callback_lock:
        if topic not in self._callbacks:
            self._callbacks[topic] = []
        self._callbacks[topic].append(callback)
        # Create reader after callback is registered
        if topic not in self._readers:
            with self._reader_lock:
                if topic not in self._readers:
                    dds_topic = DDSTopic(self.get_participant(), topic.name, topic.typename)
                    listener = _DDSMessageListener(topic, self._callbacks)
                    self._readers[topic] = DDSDataReader(
                        self.get_participant(), dds_topic, listener=listener
                    )
    return lambda: self._unsubscribe_callback(topic, callback)
```

Comment on lines +232 to +235
```python
def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
    if not self._started:
        self.start()
    self.dds.publish(self.topic, msg)
```

Race condition: _started check without synchronization

Multiple threads calling broadcast() or subscribe() simultaneously can both see _started=False and call start() multiple times. Need thread-safe initialization.

Suggested change:

```python
def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
    if not self._started:
        with self._start_lock:
            if not self._started:
                self.start()
    self.dds.publish(self.topic, msg)
```

(Also add self._start_lock = threading.Lock() in __init__)

Member Author commented:
@leshy should this also be applied to Transports affected by #1062

Comment on lines +237 to +240
```python
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
    if not self._started:
        self.start()
    return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
```

Race condition: same _started synchronization issue

Suggested change:

```python
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:  # type: ignore[assignment, override]
    if not self._started:
        with self._start_lock:
            if not self._started:
                self.start()
    return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))  # type: ignore[return-value]
```

2 participants