Skip to content

Conversation

@2chanhaeng
Copy link
Contributor

Summary

Introduce the SqliteMessageQueue class along with a testing framework for
message queues. Refactor existing tests to utilize the new testMessageQueue
utility for improved test structure and readability.

Related issue

Changes

  • Added SqliteMessageQueue class to @fedify/sqlite package implementing
    the MessageQueue interface using SQLite as the backing store
  • Added testMessageQueue() utility function to @fedify/testing package
    for standardized testing of MessageQueue implementations
  • Added waitFor() and getRandomKey() helper functions to @fedify/testing
  • Refactored message queue tests across multiple packages to use
    testMessageQueue:
    • @fedify/amqp
    • @fedify/denokv
    • @fedify/postgres
    • @fedify/redis
  • Added new test file for SqliteMessageQueue
  • Exported new classes and functions from their respective modules

Benefits

  • Provides a SQLite-based message queue implementation
  • Reduces code duplication across message queue test files by providing
    a reusable test harness
  • Ensures consistent test coverage across all MessageQueue implementations
  • Makes it easier to add new MessageQueue implementations in the future
    with standardized testing

Checklist

  • Did you add a changelog entry to the CHANGES.md?
  • Did you write some relevant docs about this change (if it's a new feature)?
  • Did you write a regression test to reproduce the bug (if it's a bug fix)?
  • Did you write some tests for this change (if it's a new feature)?
  • Did you run mise test on your machine?

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @2chanhaeng, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the message queuing capabilities by introducing a new SQLite-based message queue implementation. Concurrently, it establishes a robust and standardized testing framework for message queues, which has been applied to existing implementations. This dual approach not only expands the available message queue options but also improves the consistency, reliability, and maintainability of message queue tests across the project, making future integrations and testing more efficient.

Highlights

  • New SqliteMessageQueue Implementation: Introduced a new SqliteMessageQueue class within the @fedify/sqlite package, providing a SQLite-backed message queuing solution. This implementation supports enqueueing single and multiple messages, including delayed messages, and listening for incoming messages using a polling mechanism.
  • Standardized Message Queue Testing Framework: Developed a new testMessageQueue() utility function in the @fedify/testing package. This utility provides a standardized and reusable test harness for MessageQueue implementations, ensuring consistent test coverage and reducing boilerplate code.
  • Refactoring of Existing Message Queue Tests: Refactored the message queue tests for @fedify/amqp, @fedify/denokv, @fedify/postgres, and @fedify/redis to utilize the newly created testMessageQueue() utility. This streamlines the test files, improves readability, and ensures uniformity across different message queue implementations.
  • New Testing Helper Functions: Added waitFor() and getRandomKey() helper functions to the @fedify/testing package. These functions assist in asynchronous test assertions and generating unique keys for test resources, respectively.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, to unify message queue tests across the repository. The refactoring of existing tests to use this new utility is a great step towards reducing code duplication and ensuring consistent test coverage.

My review focuses on the new implementations. I've found a couple of critical issues in the SqliteMessageQueue related to race conditions and error handling that could lead to message loss and listener crashes. I've also identified areas for improvement in the new testMessageQueue utility to make it more robust and align with the MessageQueue interface. Additionally, I've suggested a minor improvement to the SQLite test to ensure proper cleanup of temporary database files.

@codecov
Copy link

codecov bot commented Jan 16, 2026

Codecov Report

❌ Patch coverage is 88.47584% with 31 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
packages/sqlite/src/mq.ts 85.38% 25 Missing ⚠️
packages/testing/src/mq-tester.ts 93.47% 6 Missing ⚠️
Files with missing lines Coverage Δ
packages/postgres/src/mq.ts 14.28% <ø> (-76.20%) ⬇️
packages/sqlite/src/mod.ts 100.00% <100.00%> (ø)
packages/testing/src/mod.ts 100.00% <100.00%> (ø)
packages/testing/src/mq-tester.ts 93.47% <93.47%> (ø)
packages/sqlite/src/mq.ts 85.38% <85.38%> (ø)

... and 3 files with indirect coverage changes

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@2chanhaeng 2chanhaeng marked this pull request as ready for review January 17, 2026 22:34
}

const dbUrl = process.env.DATABASE_URL;
const dbUrl = process.env.POSTGRES_URL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this renaming is intentional, could you explain why this is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other database URL variable names are matched with their respective database names.(e.g. REDIS_URL for redis) Therefore, to avoid confusion, I've also modified this one to match the naming convention.

@sij411
Copy link
Contributor

sij411 commented Jan 18, 2026

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

// Initialize if needed (e.g., create database tables)
if (options.initialize != null) {
await options.initialize(mq1);
}
Copy link
Contributor

@sij411 sij411 Jan 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some comments of options.initialize():

  • If we have to initialise the message queues, why don't we init both mq1 and mq2?
  • I think message queues such as SqliteMessageQueue and PostgresMessageQueue already initialise internally so I don't think it's necessary.
  • Last, I couldn't find any tests use this (options.initialize()).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to implement initialize but then decided to remove it and forgot to do so. I'll fix it. Thanks!

this.#initialized = options.initialized ?? false;
this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName;
this.#pollIntervalMs = Temporal.Duration.from(
options.pollInterval ?? { milliseconds: 500 },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**

  • The poll interval for the message queue. 5 seconds by default.
    */
    pollInterval?: Temporal.Duration | Temporal.DurationLike;
    }

There is a mismatch between in the document(line 31) and the actual default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot the comment. Thanks!

const now = Temporal.Now.instant().epochMilliseconds;

// Atomically fetch and delete the oldest message that is ready to be
// processed using DELETE ... RETURNING (SQLite >= 3.35.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could put this in the document as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK!

"Failed to process message {id}: {error}",
{ id: result.id, error },
);
throw error;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation Handler Error Result
Redis Not caught Breaks poll(), outer loop retries
Postgres Not caught Breaks poll(), outer loop retries
Sqlite Caught + re-thrown Terminates entire listen()

I found SqliteMessageQueue is inconsistent in error handling. I think it should either:

  1. Not re-throw (like Redis/Postgres) - let outer loop continue
  2. Or document this "fail-fast" behavior explicitly

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's a good point! I'll fix it.

@2chanhaeng
Copy link
Contributor Author

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SqliteMessageQueue for single-node deployments

2 participants