@leshy
Contributor

@leshy leshy commented Jan 26, 2026

Idk

judge me, not 100% sure

Intro

The point of this is to construct a generic bridging language: the ability to define mappers from one protocol's topic/msg to another's.

This is easy if you have a way to subscribe to all messages, which is what this PR lays the groundwork for.

We need bridges mostly for spying/visualization - a rerun bridge atm

We want the standard (and usually heaviest) OUT visualization-related computation to be done outside of the modules themselves. Module OUTs can be tapped, and modules can then rr.log individually only for specific fancy vis (projected pointcloud, planner algo rendering).

Rerun could be implemented as a funny (pub only) pubsub that can receive msgs that are automatically .to_rerun()-ed
.. or something like that
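
A minimal sketch of the "publish-only pubsub" idea, assuming every message exposes a .to_rerun() method as described above. The class name RerunPubSub, the SupportsToRerun protocol, and the injected log callable are hypothetical; log stands in for a call such as rr.log so the sketch runs without rerun installed.

```python
from typing import Any, Callable, List, Protocol, Tuple


class SupportsToRerun(Protocol):
    """Hypothetical message interface: anything exposing .to_rerun()."""

    def to_rerun(self) -> Any: ...


class RerunPubSub:
    """Publish-only sink: converts each message and hands it to a logger."""

    def __init__(self, log: Callable[[str, Any], None]) -> None:
        # `log` is an assumption; in real use it might be rerun's log call.
        self._log = log

    def publish(self, topic: str, msg: SupportsToRerun) -> None:
        # The conversion happens here, outside the module that produced msg.
        self._log(topic, msg.to_rerun())

    def subscribe(self, topic: str, callback: Callable[..., Any]) -> None:
        raise NotImplementedError("RerunPubSub is publish-only")


class FakeImage:
    """Stand-in message type for the demo."""

    def to_rerun(self) -> str:
        return "rerun-archetype"


logged: List[Tuple[str, Any]] = []
sink = RerunPubSub(log=lambda path, data: logged.append((path, data)))
sink.publish("/color_image", FakeImage())
```

With a sink like this, a bridge only needs to forward tapped OUT messages to publish(); the modules themselves never touch the visualization code.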

reorg

  • Move implementations to impl/ subdirectory (lcmpubsub, memory, shmpubsub, etc.)
  • Extract encoder mixins to encoders.py (PubSubEncoderMixin, PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin) fix typing
  • Add grid tests for pattern matching subs test_pattern_sub.py
  • Add docs on how to grid test in general (useful for agents) docs/development/grid_testing.md

AllPubSub and DiscoveryPubSub

There are two ways in which a protocol can facilitate topic discovery, and we implement base classes for both:

AllPubSub supports subscribing to all topics (Redis, LCM, MQTT)
DiscoveryPubSub supports discovering new topics (ROS)

These capabilities are orthogonal but they can implement one another and normalize the API

Both implement the subscribe_all function we care about.
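
One direction of "they can implement one another" could look roughly like this: a pubsub that natively supports subscribe_all gets topic discovery for free by remembering which topics it has already seen. This is a sketch on a toy in-memory bus, not the PR's code; the method name subscribe_new_topics and the class MemoryAllPubSub are assumptions, and the (msg, topic) callback order follows the callbacks described in this PR.

```python
import threading
from typing import Any, Callable, List, Set

Callback = Callable[[Any, str], Any]  # (msg, topic)


class MemoryAllPubSub:
    """Toy in-memory pubsub that natively supports subscribe_all."""

    def __init__(self) -> None:
        self._all_subs: List[Callback] = []

    def publish(self, topic: str, msg: Any) -> None:
        for cb in list(self._all_subs):
            cb(msg, topic)

    def subscribe_all(self, callback: Callback) -> Callable[[], None]:
        self._all_subs.append(callback)
        return lambda: self._all_subs.remove(callback)

    def subscribe_new_topics(
        self, on_topic: Callable[[str], Any]
    ) -> Callable[[], None]:
        """Default discovery built on subscribe_all: report each topic once."""
        seen: Set[str] = set()
        lock = threading.Lock()  # callbacks may arrive from several threads

        def on_msg(_msg: Any, topic: str) -> None:
            with lock:
                if topic in seen:
                    return
                seen.add(topic)
            on_topic(topic)  # called outside the lock

        return self.subscribe_all(on_msg)


bus = MemoryAllPubSub()
discovered: List[str] = []
stop = bus.subscribe_new_topics(discovered.append)
bus.publish("/odom", 1)
bus.publish("/odom", 2)
bus.publish("/lidar", 3)
stop()
bus.publish("/late", 4)  # ignored: discovery was stopped
```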

Extending LCM

Extended the Topic class to support regex or glob patterns (as per the underlying protocol)

TODO

other protocols need similar extensions (I will only do SHM),

we just want to get to something like subscribe_all (in potentially different ways) and once there we can bridge

Once subscribe_all is available, glob/regex subscriptions can also be implemented implicitly by filtering after reception (potentially expensive).
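
A sketch of the post-reception approach, assuming only that some subscribe_all(callback) function exists and returns an unsubscribe function. The helpers glob_to_regex and subscribe_pattern are hypothetical names, and the glob semantics used here (** crosses "/", while * and ? stay within one path segment) are an assumption rather than the PR's definition of Glob.

```python
import re
from typing import Any, Callable, List

Callback = Callable[[Any, str], Any]  # (msg, topic)


def glob_to_regex(glob: str) -> "re.Pattern":
    """Translate a glob into a regex: ** crosses '/', * and ? do not."""
    out: List[str] = []
    i = 0
    while i < len(glob):
        if glob.startswith("**", i):
            out.append(".*")
            i += 2
        elif glob[i] == "*":
            out.append("[^/]*")
            i += 1
        elif glob[i] == "?":
            out.append("[^/]")
            i += 1
        else:
            out.append(re.escape(glob[i]))
            i += 1
    return re.compile("".join(out))


def subscribe_pattern(
    subscribe_all: Callable[[Callback], Callable[[], None]],
    pattern: "re.Pattern",
    callback: Callback,
) -> Callable[[], None]:
    """Receive everything, then drop whatever does not match the pattern."""

    def on_msg(msg: Any, topic: str) -> None:
        if pattern.fullmatch(topic):
            callback(msg, topic)

    return subscribe_all(on_msg)


# Tiny stand-in bus for the demo.
_subscribers: List[Callback] = []


def subscribe_all(cb: Callback) -> Callable[[], None]:
    _subscribers.append(cb)
    return lambda: _subscribers.remove(cb)


def publish(topic: str, msg: Any) -> None:
    for cb in list(_subscribers):
        cb(msg, topic)


hits: List[str] = []
unsub = subscribe_pattern(
    subscribe_all, glob_to_regex("/sensor/*"), lambda m, t: hits.append(t)
)
publish("/sensor/imu", 1)
publish("/sensor/cam/left", 2)  # not matched: * does not cross "/"
publish("/odom", 3)
```

The cost is that every message is delivered and decoded before filtering, which is why native pattern support in the underlying protocol is preferable where it exists.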

Bridge

Added a preliminary bridge, mostly as a type spec to demonstrate the idea. It shows how we could bridge different protocols once subscribe_all or glob/regex topics are implemented.
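
To make the idea concrete, here is a rough sketch of a bridge built on subscribe_all. It is not the contents of bridge.py: the Translator dataclass below only mirrors the two operations this PR describes (translate the topic, translate the message), and the convention that a None topic drops the message is an assumption added for the example.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple

Callback = Callable[[Any, str], Any]  # (msg, topic)


@dataclass
class Translator:
    """Maps a source topic/msg to a destination topic/msg."""

    topic: Callable[[str], Optional[str]]  # None means "do not forward"
    msg: Callable[[Any], Any]


def bridge(
    subscribe_all: Callable[[Callback], Callable[[], None]],
    publish: Callable[[str, Any], None],
    translator: Translator,
) -> Callable[[], None]:
    """Forward every source message to the destination; returns unsubscribe."""

    def pass_msg(msg: Any, topic: str) -> None:
        topic_to = translator.topic(topic)
        if topic_to is None:
            return
        publish(topic_to, translator.msg(msg))

    return subscribe_all(pass_msg)


# Demo: a list-backed source and destination.
source_subs: List[Callback] = []
dest_log: List[Tuple[str, Any]] = []


def source_subscribe_all(cb: Callback) -> Callable[[], None]:
    source_subs.append(cb)
    return lambda: source_subs.remove(cb)


def source_publish(topic: str, msg: Any) -> None:
    for cb in list(source_subs):
        cb(msg, topic)


to_upper = Translator(
    topic=lambda t: None if t.startswith("/private") else "/bridged" + t,
    msg=lambda m: str(m).upper(),
)
stop = bridge(
    source_subscribe_all, lambda t, m: dest_log.append((t, m)), to_upper
)
source_publish("/odom", "pose")
source_publish("/private/key", "secret")
stop()
source_publish("/odom", "ignored")
```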

Tests issue

Some tests are failing because LCM msgs from some other test are flying around while these tests are running. I'll investigate this; I didn't want to sweep it under the rug. This is likely causing flakiness in other LCM tests as well.

- Move implementations to impl/ subdirectory (lcmpubsub, memory, shmpubsub, etc.)
- Extract encoder mixins to encoders.py (PubSubEncoderMixin, PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin)
- Add AllPubSub and DiscoveryPubSub mixins with complementary default implementations
- Add GlobPubSub and RegexPubSub marker classes with docstring examples
- Update imports across codebase
- Add CLAUDE.md to .gitignore
@greptile-apps
greptile-apps bot commented Jan 26, 2026

Greptile Overview

Greptile Summary

This PR implements pattern-based subscriptions and bridging infrastructure for pubsub systems. The changes enable subscribing to topics using glob/regex patterns and facilitate cross-protocol message translation.

Major Changes:

  • Reorganized pubsub implementations into impl/ subdirectory
  • Extracted encoder mixins (PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin) to encoders.py
  • Added AllPubSub and DiscoveryPubSub base classes to support subscribe_all() functionality
  • Implemented Glob pattern matching and regex support for LCM topics
  • Added Topic.from_channel_str() for parsing channel strings with type information
  • Created bridge.py module with Translator protocol and Bridge service for cross-protocol bridging
  • Added comprehensive grid tests in test_pattern_sub.py
  • Extracted resolve_msg_type() helper to dimos.msgs.helpers

Issues Found:

  • Type annotation bug in bridge() function where callback parameter types are swapped
  • Bridge.start() doesn't use the subscribe_topic config parameter

Confidence Score: 4/5

  • This PR is mostly safe to merge with 2 bugs that need fixing in the bridge module
  • The core refactoring and pattern subscription implementation is solid with comprehensive tests. However, there are 2 logic bugs in the new bridge.py module: incorrect type annotations in the callback and unused config parameter. These are straightforward fixes but could cause runtime issues if the bridge functionality is used before being fixed.
  • Pay close attention to dimos/protocol/pubsub/bridge.py - contains type annotation bug and unused config parameter

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/protocol/pubsub/bridge.py | New bridge module with type annotation issue in bridge() function - callback receives arguments in wrong order |
| dimos/protocol/pubsub/spec.py | Refactored to extract base mixin and add AllPubSub/DiscoveryPubSub classes for subscribe-all pattern support |
| dimos/protocol/pubsub/encoders.py | Extracted encoder mixins to separate module with clean separation of concerns |
| dimos/protocol/pubsub/impl/lcmpubsub.py | New LCM implementation with Glob/regex pattern support and subscribe_all functionality |
| dimos/protocol/pubsub/test_pattern_sub.py | Comprehensive grid tests for pattern-based subscriptions including glob and regex |
| dimos/msgs/helpers.py | New helper to resolve message type names to classes, with caching for performance |

Sequence Diagram

sequenceDiagram
    participant Source as Source PubSub<br/>(AllPubSub)
    participant Bridge as Bridge Service
    participant Translator as Translator
    participant Dest as Destination PubSub

    Note over Bridge: start()
    Bridge->>Source: subscribe_all(callback) or<br/>subscribe(topic_from, callback)
    Source-->>Bridge: unsubscribe function

    Note over Source: Message published
    Source->>Bridge: callback(msg_from, topic_from)
    Bridge->>Translator: topic(topic_from)
    Translator-->>Bridge: topic_to
    Bridge->>Translator: msg(msg_from)
    Translator-->>Bridge: msg_to
    Bridge->>Dest: publish(topic_to, msg_to)

    Note over Bridge: stop()
    Bridge->>Source: unsubscribe()

@greptile-apps greptile-apps bot left a comment

4 files reviewed, 4 comments

leshy added 9 commits January 26, 2026 15:05
- Add Glob class for glob-style pattern matching (*, **, ?)
- Topic now accepts str, re.Pattern, or Glob for flexible subscription patterns
- Pattern subscriptions return the actual matched channel in callback
- Remove duplicate LCMMsg and Topic from lcmservice.py (use DimosMsg)
- Add PubSubProtocol for structural typing
- Add tests for regex and glob pattern subscriptions
…subscriptions

- Add subscribe_all() method to LCMPubSubBase for subscribing to all topics
- Add Topic.from_channel_str() factory method to parse channel strings with embedded type info
- Channel format: /topic#module.ClassName enables automatic type extraction
- Add test for subscribe_all with typed message decoding
Move message type resolution logic to a dedicated helper module with
lru_cache for performance. Supports fallback to dimos_lcm module path.
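
The channel format and cached type resolution described in the commits above could be sketched as follows. The function names split_channel and resolve_type are hypothetical (the PR's own entry points are Topic.from_channel_str() and resolve_msg_type()), and the dimos_lcm fallback is deliberately left out because its details are not shown here.

```python
import importlib
from functools import lru_cache
from typing import Optional, Tuple


def split_channel(channel: str) -> Tuple[str, Optional[str]]:
    """Split '/topic#module.ClassName' into (topic, type name or None)."""
    topic, sep, type_name = channel.partition("#")
    return topic, (type_name if sep else None)


@lru_cache(maxsize=None)
def resolve_type(type_name: str) -> type:
    """Import 'module.ClassName' once and cache the resulting class."""
    module_name, _, class_name = type_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Demo with a stdlib class standing in for a message type.
topic, type_name = split_channel("/odom#collections.OrderedDict")
msg_type = resolve_type(type_name)
untyped = split_channel("/raw")
```

Because of lru_cache, repeated messages on the same channel pay the import and attribute lookup only once.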
@leshy leshy force-pushed the feature/pubsub-pattern-subscriptions branch from af9a79b to d077c67 Compare January 26, 2026 10:37
@leshy leshy changed the title WIP: Pubsub pattern subs Pubsub pattern subs Jan 26, 2026
@leshy leshy force-pushed the feature/pubsub-pattern-subscriptions branch from b86a7d0 to 3a0c089 Compare January 26, 2026 11:00
@leshy
Contributor Author

leshy commented Jan 26, 2026

@greptileai can you review?

@dimensionalOS dimensionalOS deleted a comment from greptile-apps bot Jan 26, 2026

@greptile-apps greptile-apps bot left a comment

2 files reviewed, 2 comments

leshy added 4 commits January 26, 2026 20:05
- Fix AllPubSub type parameter order (TopicFrom, MsgFrom not MsgFrom, TopicFrom)
- Fix pass_msg callback signature to match spec (MsgFrom, TopicFrom)
- Pass subscribe_topic config to bridge() function
- helpers.py: Add type: ignore for getattr Any return
- lcmpubsub.py: Add type: ignore for callback type variance and mixin incompatibility
- shmpubsub.py: Add type: ignore for mixin incompatibility
- transport.py: Add arg-type to existing type: ignore
@paul-nechifor
Contributor

So this works, but I think there's a certain bit of complexity with mixins and generics which isn't needed.

Just some brainstorming ideas

The way I view it there are 4 things:

  1. topics
  2. brokers
  3. publishers
  4. subscribers

Topics should contain all the information about what is subscribed/published to and be just static data (do nothing).

One of the things I don't like about the current system is how I'm doing this currently for setting transports:

            ("detections", Detection3DModule): LCMTransport(
                "/detector3d/detections", Detection2DArray
            ),

LCMTransport contains LCM() in its __init__. So it starts an LCMService just by specifying it. This means unitree_go2_blueprints.py starts a lot of LCM servers unnecessarily.

I think we should have something like:

PubSubProtocol = Literal["LCM", "SHM", ...]

@dataclass
class Topic:
    protocol: PubSubProtocol
    topic: str # maybe better name lol
    type: RootMsgClass

We could have a singleton broker gateway which passes data to all real brokers. So you can have:

topic = Topic("LCM", '/color_image', Image)
Gateway.publish(topic, Image(...))

It checks if LcmBroker is started, if it's not, it starts it.

(For cleanup, we can have Gateway.clean_all() which stops everything, but simply calling Gateway.subscribe/publish cleanly restarts the relevant broker.)

This way, modules don't have to store their own LCMServers, or any server. They just have to know what topics they publish and what messages they publish.

Instead of the encoder being a mixin, it could just be a param. So we can have

Gateway.register_broker("LCM", LcmBroker()) # uses default encoder
Gateway.register_broker("pLCM", LcmBroker(encoder=PickleEncoder()))
Gateway.register_broker("JpegLCM", LcmBroker(encoder=JpegEncoder()))

It would be nice if Gateway.subscribe(topic, callback) returned a Disposable.
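
A compact sketch of how these pieces might fit together: a static Topic, brokers that take their encoder as a constructor argument, a Gateway that starts a broker lazily on first use, and a Disposable returned from subscribe. Everything here is illustrative; MemoryBroker stands in for LcmBroker, the encoder is modelled as a pair of encode/decode callables, and none of these classes exist in the codebase as written.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Topic:
    protocol: str
    name: str
    type: type


class Disposable:
    """Handle whose dispose() undoes a subscription exactly once."""

    def __init__(self, action: Callable[[], None]) -> None:
        self._action = action
        self._disposed = False

    def dispose(self) -> None:
        if not self._disposed:
            self._disposed = True
            self._action()


class MemoryBroker:
    """Stand-in for LcmBroker; the encoder is a parameter, not a mixin."""

    def __init__(self, encode=lambda m: m, decode=lambda b: b) -> None:
        self.started = False
        self._encode, self._decode = encode, decode
        self._subs: Dict[str, List[Callable[[Any], None]]] = {}

    def start(self) -> None:
        self.started = True

    def stop(self) -> None:
        self.started = False

    def publish(self, name: str, msg: Any) -> None:
        wire = self._encode(msg)
        for cb in list(self._subs.get(name, [])):
            cb(self._decode(wire))

    def subscribe(self, name: str, cb: Callable[[Any], None]) -> Callable[[], None]:
        self._subs.setdefault(name, []).append(cb)
        return lambda: self._subs[name].remove(cb)


class Gateway:
    _brokers: Dict[str, MemoryBroker] = {}

    @classmethod
    def register_broker(cls, protocol: str, broker: MemoryBroker) -> None:
        cls._brokers[protocol] = broker  # registering does not start it

    @classmethod
    def _broker(cls, protocol: str) -> MemoryBroker:
        broker = cls._brokers[protocol]
        if not broker.started:  # lazy start on first publish/subscribe
            broker.start()
        return broker

    @classmethod
    def publish(cls, topic: Topic, msg: Any) -> None:
        cls._broker(topic.protocol).publish(topic.name, msg)

    @classmethod
    def subscribe(cls, topic: Topic, callback: Callable[[Any], None]) -> Disposable:
        return Disposable(cls._broker(topic.protocol).subscribe(topic.name, callback))

    @classmethod
    def clean_all(cls) -> None:
        for broker in cls._brokers.values():
            broker.stop()


Gateway.register_broker("LCM", MemoryBroker())
Gateway.register_broker("pLCM", MemoryBroker(encode=pickle.dumps, decode=pickle.loads))
received: List[Any] = []
topic = Topic("pLCM", "/color_image", dict)
sub = Gateway.subscribe(topic, received.append)
Gateway.publish(topic, {"w": 640})
sub.dispose()
Gateway.publish(topic, {"w": 1})  # no longer delivered
```

In this sketch the "LCM" broker is registered but never used, so it is never started, which is the behaviour the blueprint complaint above is asking for.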


In/Out could have subscribe/publish which just uses the gateway


class In:
    ...
    def publish(self, msg):
        Gateway.publish(self.topic, msg)

    @property
    def topic(self):
        # the topic str and protocol get set by the blueprint
        ...

# so that we could have:

class MyModule(Module):
    color_image = In(Image) # Instantiation, not type

    def func(self):
        self.color_image.publish(Image(...))


But then we can implement bridges at the Gateway level.

Gateway.register_bridge(LcmToRosBridge())
Gateway.register_bridge(RerunListener())

Subscribe all could be just:

Gateway.subscribe(Topic("LCM", None, None))

def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    super().__init__(*args, **kwargs)
    self._encode_callback_map: dict = {}  # type: ignore[type-arg]
def on_msg(msg: MsgT, topic: TopicT) -> None:
Contributor

I think we need locks on modifying seen.

import traceback
def subscribe_all(self, callback: Callable[[MsgT, TopicT], Any]) -> Callable[[], None]:
    """Subscribe to all topics by subscribing to each discovered topic."""
    subscriptions: list[Callable[[], None]] = []
Contributor

Need a lock
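
One way the locking could look for a discovery-based subscribe_all, sketched on a toy in-memory pubsub. The class MemoryDiscoveryPubSub and the method subscribe_new_topics are assumptions, not the PR's implementation; the point is only that the subscriptions list is touched under a lock, and that a topic discovered while unsubscribing is undone rather than leaked.

```python
import threading
from typing import Any, Callable, Dict, List, Set, Tuple

Callback = Callable[[Any, str], Any]  # (msg, topic)


class MemoryDiscoveryPubSub:
    """Toy pubsub with per-topic subscribe plus new-topic discovery."""

    def __init__(self) -> None:
        self._subs: Dict[str, List[Callback]] = {}
        self._watchers: List[Callable[[str], Any]] = []
        self._known: Set[str] = set()

    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]:
        self._subs.setdefault(topic, []).append(callback)
        return lambda: self._subs[topic].remove(callback)

    def subscribe_new_topics(self, on_topic: Callable[[str], Any]) -> Callable[[], None]:
        self._watchers.append(on_topic)
        return lambda: self._watchers.remove(on_topic)

    def publish(self, topic: str, msg: Any) -> None:
        if topic not in self._known:
            self._known.add(topic)
            for watcher in list(self._watchers):
                watcher(topic)
        for cb in list(self._subs.get(topic, [])):
            cb(msg, topic)

    def subscribe_all(self, callback: Callback) -> Callable[[], None]:
        """Subscribe to all topics by subscribing to each discovered topic."""
        subscriptions: List[Callable[[], None]] = []
        lock = threading.Lock()
        closed = False

        def on_topic(topic: str) -> None:
            unsub = self.subscribe(topic, callback)
            with lock:
                if not closed:
                    subscriptions.append(unsub)
                    return
            unsub()  # discovery raced with unsubscribe_all: undo it

        stop_discovery = self.subscribe_new_topics(on_topic)

        def unsubscribe_all() -> None:
            nonlocal closed
            stop_discovery()
            with lock:
                closed = True
                pending = list(subscriptions)
                subscriptions.clear()
            for unsub in pending:  # run callbacks outside the lock
                unsub()

        return unsubscribe_all


bus = MemoryDiscoveryPubSub()
got: List[Tuple[str, Any]] = []
stop = bus.subscribe_all(lambda m, t: got.append((t, m)))
bus.publish("/a", 1)
bus.publish("/b", 2)
bus.publish("/a", 3)
stop()
bus.publish("/a", 4)
bus.publish("/c", 5)
```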

@leshy
Contributor Author

leshy commented Jan 27, 2026

One of the things I don't like about the current system is how I'm doing this currently for setting transports:

            ("detections", Detection3DModule): LCMTransport(
                "/detector3d/detections", Detection2DArray
            ),

LCMTransport contains LCM() in its init. So it starts an LCMService just by specifying it. This means unitree_go2_blueprints.py starts a lot of LCM servers unnecessarily.

Yeah, agreed. The reason I auto-started LCM was that it was early in dev and very cheap. Generally, transports should be a Resource: they do have start() and stop() methods that are supposed to be called, and until then they should be dead data. The rest of the system (blueprints?) was built with the assumption that a Transport auto-starts. We should just turn off start-on-init for LCM; I think that may be easy for you?

I think we should have something like:

PubSubProtocol = Literal["LCM", "SHM", ...]

@dataclass
class Topic:
    protocol: PubSubProtocol
    topic: str # maybe better name lol
    type: RootMsgClass

So my stuff is more abstract than this because I didn't want to make assumptions about pubsub being used for transport, or about what is required to configure a topic for a specific pubsub protocol.

I imagined

# off site detections, local 3d projections
Detection3dModule.2d_detections.transport = DimosServerTransport("dimos.com")
# video specific transport
VideoParser.color_image.transport = CameraStream("rtsp://....")
# sending odom to remote server
robot.odom.transport = HTTPPost("http://bla.com/receiver.php")

Topic is just an init arg, and the concept of a Topic is not tied to Transport. Topics, if used in pubsub, are not generalized since they are not 1-to-1 across protocols: for example, a ROS Topic includes ROS-specific QoS and buffer settings, a zenoh topic might contain a broker IP address or node discovery mechanism, etc. I can imagine a websocket connection to a dimos server being a Transport in the future, and this requires a web server URL.

Right now the transport spec should change to have clearer method names

class Transport:
    def send(self, msg: MsgT) -> None:
        ...

    def subscribe(self, callback):
        ...

and that's all, so people don't feel tied to Pubsub as base for this
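
A hedged sketch of what such a minimal spec could look like with typing.Protocol, combined with the start()/stop() lifecycle mentioned earlier in this comment. LoopbackTransport is a hypothetical in-process implementation used only to exercise the interface; it shows that constructing a transport does nothing until start() is called.

```python
from typing import Any, Callable, Generic, List, Protocol, TypeVar

MsgT = TypeVar("MsgT")


class Transport(Protocol[MsgT]):
    """Structural spec: anything with these methods is a Transport."""

    def start(self) -> None: ...

    def stop(self) -> None: ...

    def send(self, msg: MsgT) -> None: ...

    def subscribe(self, callback: Callable[[MsgT], Any]) -> Callable[[], None]: ...


class LoopbackTransport(Generic[MsgT]):
    """In-process transport: plain data until start() is called."""

    def __init__(self) -> None:
        self.started = False
        self._callbacks: List[Callable[[MsgT], Any]] = []

    def start(self) -> None:
        self.started = True

    def stop(self) -> None:
        self.started = False

    def send(self, msg: MsgT) -> None:
        if not self.started:
            raise RuntimeError("transport not started")
        for cb in list(self._callbacks):
            cb(msg)

    def subscribe(self, callback: Callable[[MsgT], Any]) -> Callable[[], None]:
        self._callbacks.append(callback)
        return lambda: self._callbacks.remove(callback)


def wire_up(transport: Transport[str], sink: List[str]) -> None:
    """Accepts any object that structurally satisfies Transport."""
    transport.subscribe(sink.append)
    transport.start()


odom_log: List[str] = []
odom_transport: LoopbackTransport[str] = LoopbackTransport()
wire_up(odom_transport, odom_log)
odom_transport.send("odom")
```

Nothing in the spec mentions topics or pubsub, so an HTTP poster or a camera stream could satisfy it equally well.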

and the whole Stream class can be thrown away / rewritten, it's very hacky and complex

marriage

That being said, I think we can marry the two ideas; nothing stops you from having

VideoParser.color_image.transport = mybroker.topic("/something")

OR (what I did before)

# mytransport in the background uses a process-central broker
VideoParser.color_image.transport = mytransport(topic("/something"))
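
As an illustration of the first variant, a broker could hand out small transport objects bound to one topic. This is only a sketch of the shape; Broker and TopicTransport are hypothetical names, and the in-memory delivery stands in for a real protocol.

```python
from typing import Any, Callable, Dict, List


class Broker:
    """Toy process-central broker."""

    def __init__(self) -> None:
        self._subs: Dict[str, List[Callable[[Any], Any]]] = {}

    def publish(self, name: str, msg: Any) -> None:
        for cb in list(self._subs.get(name, [])):
            cb(msg)

    def subscribe(self, name: str, cb: Callable[[Any], Any]) -> Callable[[], None]:
        self._subs.setdefault(name, []).append(cb)
        return lambda: self._subs[name].remove(cb)

    def topic(self, name: str) -> "TopicTransport":
        # The returned object is the thing assigned to `.transport`.
        return TopicTransport(self, name)


class TopicTransport:
    """Transport view of a single broker topic: just send and subscribe."""

    def __init__(self, broker: Broker, name: str) -> None:
        self._broker = broker
        self._name = name

    def send(self, msg: Any) -> None:
        self._broker.publish(self._name, msg)

    def subscribe(self, callback: Callable[[Any], Any]) -> Callable[[], None]:
        return self._broker.subscribe(self._name, callback)


mybroker = Broker()
transport = mybroker.topic("/something")
frames: List[Any] = []
transport.subscribe(frames.append)
transport.send("frame-0")
mybroker.topic("/other").send("unrelated")  # different topic, not received
```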

brief thoughts, what do you think?

@paul-nechifor
Contributor

TL;DR: I think we should be able to specify how something is meant to be transported without reifying the transport.


I view a topic as the equivalent of a connection string for a database. (Maybe I should use a different name, not topic.)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def make_session(conn_url: str):
    engine = create_engine(
        conn_url,
        pool_size=10,
        max_overflow=20,
    )
    Session = sessionmaker(bind=engine)
    return Session()

You can call make_session("postgresql+psycopg2://user:pass@host/db") or make_session("mysql+pymysql://user:pass@host/db").

make_session doesn't care if it's connecting to a Postgres DB or a MySQL DB. Everything is encoded in the connection string.

So it's kinda like saying:

Detection3dModule.2d_detections.transport = 'lcm://Detection2D@/2d_detections'

But instead of using a string I break up the URI into its constituent parts.

Detection3dModule.2d_detections.transport = Topic("LCM", "/2d_detections", Detection2D)

Topic is just an init arg, and the concept of a Topic is not tied to Transport. Topics, if used in pubsub, are not generalized since they are not 1-to-1 across protocols: for example, a ROS Topic includes ROS-specific QoS and buffer settings, a zenoh topic might contain a broker IP address or node discovery mechanism, etc.

The same is true for connection strings. A MySQL connection string can include charset, which makes no sense in Postgres:

make_session('mysql+pymysql://appuser:secret@db.example.com:3306/appdb?charset=utf8mb4')

In Postgres you'd use client_encoding=UTF8 for UTF8.

For us, we could have:

video_parser.color_image.transport = Topic("JpegSHM", '/color_image', Image, options={"quality": 75})
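
The equivalence between the URI form and the broken-up form can be sketched with the standard library's URL parser. The Topic fields and the helper topic_from_uri are assumptions for illustration; note that urlsplit lowercases the scheme, and that option values come back as strings (a real implementation would need per-protocol parsing of options).

```python
from dataclasses import dataclass, field
from typing import Dict
from urllib.parse import parse_qsl, urlsplit


@dataclass
class Topic:
    protocol: str
    name: str
    type_name: str
    options: Dict[str, str] = field(default_factory=dict)


def topic_from_uri(uri: str) -> Topic:
    """Parse '<protocol>://<Type>@<topic path>?<options>' into a Topic."""
    parts = urlsplit(uri)
    return Topic(
        protocol=parts.scheme,            # e.g. "lcm"
        name=parts.path,                  # e.g. "/2d_detections"
        type_name=parts.username or "",   # the part before "@"
        options=dict(parse_qsl(parts.query)),
    )


detections = topic_from_uri("lcm://Detection2D@/2d_detections")
color = topic_from_uri("jpegshm://Image@/color_image?quality=75")
```

As with database connection strings, protocol-specific settings live in the options and are simply ignored by protocols that do not understand them.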
