From 56c8aa5061e0472cb365d2ffa2fecea9d2bcda6b Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 29 Jan 2026 14:15:16 -0400 Subject: [PATCH] Include original error in ConnectionShutdown messages When a connection is closed or becomes defunct, include the last_error in the ConnectionShutdown exception message. This helps investigate issues where the original cause (e.g., "Bad file descriptor") was previously lost when subsequent operations tried to use the connection. Refs #614 --- cassandra/connection.py | 15 +++++-- cassandra/io/asyncioreactor.py | 6 ++- cassandra/io/asyncorereactor.py | 8 ++-- cassandra/io/eventletreactor.py | 6 ++- cassandra/io/geventreactor.py | 6 ++- cassandra/io/libevreactor.py | 6 ++- cassandra/io/twistedreactor.py | 6 ++- tests/unit/test_connection.py | 70 ++++++++++++++++++++++++++++++++- 8 files changed, 106 insertions(+), 17 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index 9ac02c9776..87f860f32b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -1087,9 +1087,15 @@ def handle_pushed(self, response): def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=None): if self.is_defunct: - raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint) + msg = "Connection to %s is defunct" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + raise ConnectionShutdown(msg) elif self.is_closed: - raise ConnectionShutdown("Connection to %s is closed" % self.endpoint) + msg = "Connection to %s is closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + raise ConnectionShutdown(msg) elif not self._socket_writable: raise ConnectionBusy("Connection %s is overloaded" % self.endpoint) @@ -1120,7 +1126,10 @@ def wait_for_responses(self, *msgs, **kwargs): failed, the corresponding Exception will be raised. """ if self.is_closed or self.is_defunct: - raise ConnectionShutdown("Connection %s is already closed" % (self, )) + msg = "Connection %s is already closed" % (self,) + if self.last_error: + msg += ": %s" % (self.last_error,) + raise ConnectionShutdown(msg) timeout = kwargs.get('timeout') fail_on_error = kwargs.get('fail_on_error', True) waiter = ResponseWaiter(self, len(msgs), fail_on_error) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 41b744602d..66e1d7295c 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -160,8 +160,10 @@ async def _close(self): log.debug("Closed socket to %s" % (self.endpoint,)) if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) # don't leave in-progress operations hanging self.connected_event.set() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 2c75e7139d..02466ad0d2 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -385,12 +385,14 @@ def close(self): log.debug("Closed socket to %s", self.endpoint) if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) #This happens when the connection is shutdown while waiting for the ReadyMessage if not self.connected_event.is_set(): - self.last_error = ConnectionShutdown("Connection to %s was closed" % self.endpoint) + self.last_error = ConnectionShutdown(msg) # don't leave in-progress operations hanging self.connected_event.set() diff --git a/cassandra/io/eventletreactor.py b/cassandra/io/eventletreactor.py index 42874036d5..234a4a574c 100644 --- a/cassandra/io/eventletreactor.py +++ b/cassandra/io/eventletreactor.py @@ -145,8 +145,10 @@ def close(self): log.debug("Closed socket to %s" % (self.endpoint,)) if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) # don't leave in-progress operations hanging self.connected_event.set() diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index 4f1f158aa7..7516fdd6df 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -95,8 +95,10 @@ def close(self): log.debug("Closed socket to %s" % (self.endpoint,)) if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) # don't leave in-progress operations hanging self.connected_event.set() diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 58c876fdcc..d7b365e451 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -297,8 +297,10 @@ def close(self): # don't leave in-progress operations hanging if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) self.connected_event.set() def handle_write(self, watcher, revents, errno=None): diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py index e4605a7446..446200bf63 100644 --- a/cassandra/io/twistedreactor.py +++ b/cassandra/io/twistedreactor.py @@ -283,8 +283,10 @@ def close(self): log.debug("Closed socket to %s", self.endpoint) if not self.is_defunct: - self.error_all_requests( - ConnectionShutdown("Connection to %s was closed" % self.endpoint)) + msg = "Connection to %s was closed" % self.endpoint + if self.last_error: + msg += ": %s" % (self.last_error,) + self.error_all_requests(ConnectionShutdown(msg)) # don't leave in-progress operations hanging self.connected_event.set() diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 97dbe39957..6ac63ff761 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -22,7 +22,7 @@ from cassandra.cluster import Cluster from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError, locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager, - ConnectionException, DefaultEndPoint, ShardAwarePortGenerator) + ConnectionException, ConnectionShutdown, DefaultEndPoint, ShardAwarePortGenerator) from cassandra.marshal import uint8_pack, uint32_pack, int32_pack from cassandra.protocol import (write_stringmultimap, write_int, write_string, SupportedMessage, ProtocolHandler) @@ -260,6 +260,74 @@ def test_set_connection_class(self): cluster = Cluster(connection_class='test') assert 'test' == cluster.connection_class + def test_connection_shutdown_includes_last_error(self): + """ + Test that ConnectionShutdown exceptions include the last_error when available. + This helps debug issues like "Bad file descriptor" by showing the original cause. + See https://github.com/scylladb/python-driver/issues/614 + """ + c = self.make_connection() + c.lock = Lock() + c._requests = {} + + # Simulate the connection becoming defunct with a specific error + original_error = OSError(9, "Bad file descriptor") + c.is_defunct = True + c.last_error = original_error + + # send_msg should raise ConnectionShutdown that includes the last_error + with pytest.raises(ConnectionShutdown) as exc_info: + c.send_msg(Mock(), 1, Mock()) + + # Verify the error message includes the original error + error_message = str(exc_info.value) + assert "is defunct" in error_message + assert "Bad file descriptor" in error_message + + def test_connection_shutdown_closed_includes_last_error(self): + """ + Test that ConnectionShutdown exceptions for closed connections include last_error. + """ + c = self.make_connection() + c.lock = Lock() + c._requests = {} + + # Simulate the connection being closed with a specific error + original_error = OSError(9, "Bad file descriptor") + c.is_closed = True + c.last_error = original_error + + # send_msg should raise ConnectionShutdown that includes the last_error + with pytest.raises(ConnectionShutdown) as exc_info: + c.send_msg(Mock(), 1, Mock()) + + # Verify the error message includes the original error + error_message = str(exc_info.value) + assert "is closed" in error_message + assert "Bad file descriptor" in error_message + + def test_wait_for_responses_shutdown_includes_last_error(self): + """ + Test that wait_for_responses raises ConnectionShutdown with last_error. + """ + c = self.make_connection() + c.lock = Lock() + c._requests = {} + + # Simulate the connection being defunct with a specific error + original_error = OSError(9, "Bad file descriptor") + c.is_defunct = True + c.last_error = original_error + + # wait_for_responses should raise ConnectionShutdown that includes the last_error + with pytest.raises(ConnectionShutdown) as exc_info: + c.wait_for_responses(Mock()) + + # Verify the error message includes the original error + error_message = str(exc_info.value) + assert "already closed" in error_message + assert "Bad file descriptor" in error_message + @patch('cassandra.connection.ConnectionHeartbeat._raise_if_stopped') class ConnectionHeartbeatTest(unittest.TestCase):