Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions eval_protocol/dataset_logger/sqlite_evaluation_row_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from eval_protocol.event_bus.sqlite_event_bus_database import (
SQLITE_HARDENED_PRAGMAS,
check_and_repair_database,
connect_with_retry,
execute_with_sqlite_retry,
)
from eval_protocol.models import EvaluationRow
Expand Down Expand Up @@ -42,9 +43,10 @@ class EvaluationRow(BaseModel): # type: ignore

self._EvaluationRow = EvaluationRow

self._db.connect()
# Connect with retry logic that properly handles pragma execution failures
connect_with_retry(self._db)
# Use safe=True to avoid errors when tables/indexes already exist
self._db.create_tables([EvaluationRow], safe=True)
execute_with_sqlite_retry(lambda: self._db.create_tables([EvaluationRow], safe=True))

@property
def db_path(self) -> str:
Expand Down
52 changes: 48 additions & 4 deletions eval_protocol/event_bus/sqlite_event_bus_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@


# Retry configuration for database operations
# These bound the backoff retry loop in connect_with_retry (passed as
# max_tries / max_time to backoff.on_exception).
# NOTE(review): each constant is assigned twice below; the later assignment
# rebinds the name, so the effective values are 10 tries and 60 seconds.
# The first pair looks like the pre-change side of a diff — confirm and
# drop the superseded assignments.
SQLITE_RETRY_MAX_TRIES = 5
SQLITE_RETRY_MAX_TIME = 30 # seconds
SQLITE_RETRY_MAX_TRIES = 10
SQLITE_RETRY_MAX_TIME = 60 # seconds


def _is_database_locked_error(e: Exception) -> bool:
Expand Down Expand Up @@ -55,6 +55,49 @@ def _execute() -> T:
return _execute()


def connect_with_retry(db: SqliteDatabase) -> None:
    """
    Open ``db``, retrying on "database is locked" until pragmas are applied.

    Peewee's connect() records the connection as open *before* it runs the
    configured pragmas (in _initialize_connection). When a pragma fails with
    "database is locked", the database object is left marked as open even
    though the pragmas never ran, and a later connect(reuse_if_open=True)
    would treat it as connected and never execute them.

    To avoid that half-initialized state, every attempt starts from a closed
    connection, and a failed attempt closes whatever it left open before the
    error propagates to the retry wrapper. Attempts are spaced with
    exponential backoff and full jitter, bounded by SQLITE_RETRY_MAX_TRIES
    and SQLITE_RETRY_MAX_TIME. Any OperationalError that is not a lock error
    is re-raised immediately without retrying.

    Args:
        db: The SqliteDatabase instance to connect
    """

    def _discard_open_connection() -> None:
        # Drop a (possibly partially initialized) connection, if there is one.
        if not db.is_closed():
            db.close()

    def _not_a_lock_error(exc: Exception) -> bool:
        # Only lock contention is worth retrying; give up on anything else.
        return not _is_database_locked_error(exc)

    def _attempt() -> None:
        try:
            # Start from a closed state so connect() opens a fresh connection
            # and therefore runs the pragmas again.
            _discard_open_connection()
            db.connect()
        except OperationalError:
            # connect() may have failed after marking the connection open
            # (e.g. during pragma execution); close it so the next retry
            # starts fresh, then let the retry wrapper see the error.
            _discard_open_connection()
            raise

    retrying = backoff.on_exception(
        backoff.expo,
        OperationalError,
        max_tries=SQLITE_RETRY_MAX_TRIES,
        max_time=SQLITE_RETRY_MAX_TIME,
        giveup=_not_a_lock_error,
        jitter=backoff.full_jitter,
    )
    retrying(_attempt)()


# SQLite pragmas for hardened concurrency safety
SQLITE_HARDENED_PRAGMAS = {
"journal_mode": "wal", # Write-Ahead Logging for concurrent reads/writes
Expand Down Expand Up @@ -181,9 +224,10 @@ class Event(BaseModel): # type: ignore
processed = BooleanField(default=False) # Track if event has been processed

self._Event = Event
self._db.connect()
# Connect with retry logic that properly handles pragma execution failures
connect_with_retry(self._db)
# Use safe=True to avoid errors when tables already exist
self._db.create_tables([Event], safe=True)
execute_with_sqlite_retry(lambda: self._db.create_tables([Event], safe=True))

def publish_event(self, event_type: str, data: Any, process_id: str) -> None:
"""Publish an event to the database."""
Expand Down
Loading