From ecf62a1ad1478e02d8d41bcd39455566b1b95f28 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 16:28:57 -0800 Subject: [PATCH 01/11] added pipeline creation logic for cursors --- google/cloud/firestore_v1/base_query.py | 132 ++++++++++++++++++++---- 1 file changed, 114 insertions(+), 18 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index b1b74fcf1..30e1c8fb7 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1134,12 +1134,8 @@ def _build_pipeline(self, source: "PipelineSource"): """ Convert this query into a Pipeline - Queries containing a `cursor` or `limit_to_last` are not currently supported - Args: source: the PipelineSource to build the pipeline off of - Raises: - - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last` Returns: a Pipeline representing the query """ @@ -1162,9 +1158,10 @@ def _build_pipeline(self, source: "PipelineSource"): # Orders orders = self._normalize_orders() + + exists = [] + orderings = [] if orders: - exists = [] - orderings = [] for order in orders: field = pipeline_expressions.Field.of(order.field.field_path) exists.append(field.exists()) @@ -1178,23 +1175,59 @@ def _build_pipeline(self, source: "PipelineSource"): # Add exists filters to match Query's implicit orderby semantics. if len(exists) == 1: ppl = ppl.where(exists[0]) - else: + elif len(exists) > 1: ppl = ppl.where(pipeline_expressions.And(*exists)) - # Add sort orderings - ppl = ppl.sort(*orderings) + if orderings: + # Normalize cursors to get the raw values corresponding to the orders + start_at_val = None + if self._start_at: + start_at_val = self._normalize_cursor(self._start_at, orders) + + end_at_val = None + if self._end_at: + end_at_val = self._normalize_cursor(self._end_at, orders) + + # If limit_to_last is set, we need to reverse the orderings to find the + # "last" N documents (which effectively become the "first" N in reverse order). + if self._limit_to_last: + actual_orderings = _reverse_orderings(orderings) + ppl = ppl.sort(*actual_orderings) + + # Apply cursor conditions. + # Cursors are translated into filter conditions (e.g., field > value) + # based on the orderings. + if start_at_val: + ppl = ppl.where( + _where_conditions_from_cursor( + start_at_val, orderings, is_start_cursor=True + ) + ) - # Cursors, Limit and Offset - if self._start_at or self._end_at or self._limit_to_last: - raise NotImplementedError( - "Query to Pipeline conversion: cursors and limit_to_last is not supported yet." - ) - else: # Limit & Offset without cursors - if self._offset: - ppl = ppl.offset(self._offset) - if self._limit: + if end_at_val: + ppl = ppl.where( + _where_conditions_from_cursor( + end_at_val, orderings, is_start_cursor=False + ) + ) + + if not self._limit_to_last: + ppl = ppl.sort(*orderings) + + if self._limit is not None: ppl = ppl.limit(self._limit) + # If we reversed the orderings for limit_to_last, we must now re-sort + # using the original orderings to return the results in the user-requested order. + if self._limit_to_last: + ppl = ppl.sort(*orderings) + elif self._limit is not None and not self._limit_to_last: + ppl = ppl.limit(self._limit) + + # Offset + if self._offset: + ppl = ppl.offset(self._offset) + return ppl def _comparator(self, doc1, doc2) -> int: @@ -1366,6 +1399,69 @@ def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]: return None +def _where_conditions_from_cursor( + cursor: Tuple[List, bool], + orderings: List[pipeline_expressions.Ordering], + is_start_cursor: bool, +) -> pipeline_expressions.BooleanExpression: + """ + Converts a cursor into a filter condition for the pipeline. + + Args: + cursor: The cursor values and the 'before' flag. + orderings: The list of ordering expressions used in the query. + is_start_cursor: True if this is a start_at/start_after cursor, False if it is an end_at/end_before cursor. + Returns: + A BooleanExpression representing the cursor condition. + """ + cursor_values, before = cursor + size = len(cursor_values) + + if is_start_cursor: + filter_func = pipeline_expressions.Expression.greater_than + else: + filter_func = pipeline_expressions.Expression.less_than + + field = orderings[size - 1].expr + value = pipeline_expressions.Constant(cursor_values[size - 1]) + + # Add condition for last bound + condition = filter_func(field, value) + + if (is_start_cursor and before) or (not is_start_cursor and not before): + # When the cursor bound is inclusive, then the last bound + # can be equal to the value, otherwise it's not equal + condition = pipeline_expressions.Or(condition, field.equal(value)) + + # Iterate backwards over the remaining bounds, adding a condition for each one + for i in range(size - 2, -1, -1): + field = orderings[i].expr + value = pipeline_expressions.Constant(cursor_values[i]) + + # For each field in the orderings, the condition is either + # a) lessThan|greaterThan the cursor value, + # b) or equal the cursor value and lessThan|greaterThan the cursor values for other fields + condition = pipeline_expressions.Or( + filter_func(field, value), + pipeline_expressions.And(field.equal(value), condition), + ) + + return condition + + +def _reverse_orderings( + orderings: List[pipeline_expressions.Ordering], +) -> List[pipeline_expressions.Ordering]: + reversed_orderings = [] + for o in orderings: + if o.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING: + new_dir = "descending" + else: + new_dir = "ascending" + reversed_orderings.append(pipeline_expressions.Ordering(o.expr, new_dir)) + return reversed_orderings + + def _query_response_to_snapshot( response_pb: RunQueryResponse, collection, expected_prefix: str ) -> Optional[document.DocumentSnapshot]: From c6e08081e398e38ce0a397d7372d8f3745e048ca Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 16:30:10 -0800 Subject: [PATCH 02/11] removed existence check in BooleanExpression conversion --- google/cloud/firestore_v1/pipeline_expressions.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_expressions.py b/google/cloud/firestore_v1/pipeline_expressions.py index c0ff3923a..b01dc340d 100644 --- a/google/cloud/firestore_v1/pipeline_expressions.py +++ b/google/cloud/firestore_v1/pipeline_expressions.py @@ -1833,7 +1833,10 @@ def _from_query_filter_pb(filter_pb, client): elif filter_pb.op == Query_pb.FieldFilter.Operator.EQUAL: return And(field.exists(), field.equal(value)) elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_EQUAL: - return And(field.exists(), field.not_equal(value)) + # In Enterprise DBs NOT_EQUAL will match a field that does not exist, + # therefore we do not want an existence filter for the NOT_EQUAL conversion + # so the Query and Pipeline behavior are consistent in Enterprise. + return field.not_equal(value) if filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS: return And(field.exists(), field.array_contains(value)) elif filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS_ANY: @@ -1841,7 +1844,10 @@ def _from_query_filter_pb(filter_pb, client): elif filter_pb.op == Query_pb.FieldFilter.Operator.IN: return And(field.exists(), field.equal_any(value)) elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_IN: - return And(field.exists(), field.not_equal_any(value)) + # In Enterprise DBs NOT_IN will match a field that does not exist, + # therefore we do not want an existence filter for the NOT_IN conversion + # so the Query and Pipeline behavior are consistent in Enterprise. + return field.not_equal_any(value) else: raise TypeError(f"Unexpected FieldFilter operator type: {filter_pb.op}") elif isinstance(filter_pb, Query_pb.Filter): From 3909f71c339f592eef4c205fb7906cd83f4ad96e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 16:50:36 -0800 Subject: [PATCH 03/11] improved how existance checks are added --- google/cloud/firestore_v1/base_query.py | 32 ++++++++------ tests/unit/v1/test_base_query.py | 57 ++++++++++++++++++++----- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index 30e1c8fb7..cdd81daee 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1157,14 +1157,28 @@ def _build_pipeline(self, source: "PipelineSource"): ppl = ppl.select(*[field.field_path for field in self._projection.fields]) # Orders - orders = self._normalize_orders() + # "explicit_orders" are only those explicitly added by the user via order_by(). + # We only generate existence filters for these fields. + explicit_orders = self._orders exists = [] + for order in explicit_orders: + field = pipeline_expressions.Field.of(order.field.field_path) + exists.append(field.exists()) + + # Add exists filters to match Query's implicit orderby semantics. + if len(exists) == 1: + ppl = ppl.where(exists[0]) + elif len(exists) > 1: + ppl = ppl.where(pipeline_expressions.And(*exists)) + + # "normalized_orders" includes both user-specified orders and implicit orders + # (e.g. __name__ or inequality fields) required by Firestore semantics. + normalized_orders = self._normalize_orders() orderings = [] - if orders: - for order in orders: + if normalized_orders: + for order in normalized_orders: field = pipeline_expressions.Field.of(order.field.field_path) - exists.append(field.exists()) direction = ( "ascending" if order.direction == StructuredQuery.Direction.ASCENDING @@ -1172,21 +1186,15 @@ def _build_pipeline(self, source: "PipelineSource"): ) orderings.append(pipeline_expressions.Ordering(field, direction)) - # Add exists filters to match Query's implicit orderby semantics. - if len(exists) == 1: - ppl = ppl.where(exists[0]) - elif len(exists) > 1: - ppl = ppl.where(pipeline_expressions.And(*exists)) - if orderings: # Normalize cursors to get the raw values corresponding to the orders start_at_val = None if self._start_at: - start_at_val = self._normalize_cursor(self._start_at, orders) + start_at_val = self._normalize_cursor(self._start_at, normalized_orders) end_at_val = None if self._end_at: - end_at_val = self._normalize_cursor(self._end_at, orders) + end_at_val = self._normalize_cursor(self._end_at, normalized_orders) # If limit_to_last is set, we need to reverse the orderings to find the # "last" N documents (which effectively become the "first" N in reverse order). diff --git a/tests/unit/v1/test_base_query.py b/tests/unit/v1/test_base_query.py index 4a4dac727..4015bb69e 100644 --- a/tests/unit/v1/test_base_query.py +++ b/tests/unit/v1/test_base_query.py @@ -2116,19 +2116,56 @@ def test__query_pipeline_order_sorts(): assert sort_stage.orders[1].order_dir == expr.Ordering.Direction.DESCENDING -def test__query_pipeline_unsupported(): +def test__query_pipeline_cursors(): + from google.cloud.firestore_v1 import pipeline_expressions as expr + client = make_client() - query_start = client.collection("my_col").start_at({"field_a": "value"}) - with pytest.raises(NotImplementedError, match="cursors"): - query_start._build_pipeline(client.pipeline()) + query_start = client.collection("my_col").order_by("field_a").start_at({"field_a": 10}) + pipeline = query_start._build_pipeline(client.pipeline()) + + # Stages: + # 0: Collection + # 1: Where (exists field_a) - Generated because field_a is explicitly ordered + # 2: Where (cursor condition) + # 3: Sort (field_a) + assert len(pipeline.stages) == 4 + + where_stage = pipeline.stages[2] + assert isinstance(where_stage, stages.Where) + # Expected: (field_a > 10) OR (field_a == 10) + assert isinstance(where_stage.condition, expr.Or) + params = where_stage.condition.params + assert len(params) == 2 + assert params[0].name == "greater_than" + assert params[1].name == "equal" + - query_end = client.collection("my_col").end_at({"field_a": "value"}) - with pytest.raises(NotImplementedError, match="cursors"): - query_end._build_pipeline(client.pipeline()) +def test__query_pipeline_limit_to_last(): + from google.cloud.firestore_v1 import pipeline_expressions as expr + + client = make_client() + query = client.collection("my_col").order_by("field_a").limit_to_last(5) + pipeline = query._build_pipeline(client.pipeline()) - query_limit_last = client.collection("my_col").limit_to_last(10) - with pytest.raises(NotImplementedError, match="limit_to_last"): - query_limit_last._build_pipeline(client.pipeline()) + # Stages: + # 0: Collection + # 1: Where (exists field_a) + # 2: Sort (field_a DESC) - Reversed + # 3: Limit (5) + # 4: Sort (field_a ASC) - Restored + assert len(pipeline.stages) == 5 + + sort_reversed = pipeline.stages[2] + assert isinstance(sort_reversed, stages.Sort) + assert sort_reversed.orders[0].order_dir == expr.Ordering.Direction.DESCENDING + + limit_stage = pipeline.stages[3] + assert isinstance(limit_stage, stages.Limit) + assert limit_stage.limit == 5 + + sort_restored = pipeline.stages[4] + assert isinstance(sort_restored, stages.Sort) + assert sort_restored.orders[0].order_dir == expr.Ordering.Direction.ASCENDING def test__query_pipeline_limit(): From 66e7bd406cd0ddca4222a1cb39bcec4328a2fe38 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 16:55:11 -0800 Subject: [PATCH 04/11] updated docstrings --- google/cloud/firestore_v1/base_aggregation.py | 4 ---- google/cloud/firestore_v1/base_collection.py | 4 ---- google/cloud/firestore_v1/pipeline_source.py | 4 ---- 3 files changed, 12 deletions(-) diff --git a/google/cloud/firestore_v1/base_aggregation.py b/google/cloud/firestore_v1/base_aggregation.py index 6f392207e..521b4fb41 100644 --- a/google/cloud/firestore_v1/base_aggregation.py +++ b/google/cloud/firestore_v1/base_aggregation.py @@ -361,12 +361,8 @@ def _build_pipeline(self, source: "PipelineSource"): """ Convert this query into a Pipeline - Queries containing a `cursor` or `limit_to_last` are not currently supported - Args: source: the PipelineSource to build the pipeline off of - Raises: - - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last` Returns: a Pipeline representing the query """ diff --git a/google/cloud/firestore_v1/base_collection.py b/google/cloud/firestore_v1/base_collection.py index 070e54cc4..25d07aec9 100644 --- a/google/cloud/firestore_v1/base_collection.py +++ b/google/cloud/firestore_v1/base_collection.py @@ -608,12 +608,8 @@ def _build_pipeline(self, source: "PipelineSource"): """ Convert this query into a Pipeline - Queries containing a `cursor` or `limit_to_last` are not currently supported - Args: source: the PipelineSource to build the pipeline off o - Raises: - - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last` Returns: a Pipeline representing the query """ diff --git a/google/cloud/firestore_v1/pipeline_source.py b/google/cloud/firestore_v1/pipeline_source.py index 8f3c0a626..9aded4f75 100644 --- a/google/cloud/firestore_v1/pipeline_source.py +++ b/google/cloud/firestore_v1/pipeline_source.py @@ -57,12 +57,8 @@ def create_from( """ Create a pipeline from an existing query - Queries containing a `cursor` or `limit_to_last` are not currently supported - Args: query: the query to build the pipeline off of - Raises: - - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last` Returns: a new pipeline instance representing the query """ From 8147f7219d0e50b59ba892c9c346cc323b4377e2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 17:44:17 -0800 Subject: [PATCH 05/11] adjust condition based on sort direction --- google/cloud/firestore_v1/base_query.py | 40 +++++++++++++++++------ tests/unit/v1/test_base_query.py | 42 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index cdd81daee..59e11a644 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1407,6 +1407,28 @@ def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]: return None +def _get_cursor_exclusive_condition( + is_start_cursor: bool, + ordering: pipeline_expressions.Ordering, + value: pipeline_expressions.Constant, +) -> pipeline_expressions.BooleanExpression: + """ + Helper to determine the correct comparison operator (greater_than or less_than) + based on the cursor type (start/end) and the sort direction (ascending/descending). + """ + field = ordering.expr + if ( + is_start_cursor + and ordering.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING + ) or ( + not is_start_cursor + and ordering.order_dir == pipeline_expressions.Ordering.Direction.DESCENDING + ): + return field.greater_than(value) + else: + return field.less_than(value) + + def _where_conditions_from_cursor( cursor: Tuple[List, bool], orderings: List[pipeline_expressions.Ordering], @@ -1425,16 +1447,12 @@ def _where_conditions_from_cursor( cursor_values, before = cursor size = len(cursor_values) - if is_start_cursor: - filter_func = pipeline_expressions.Expression.greater_than - else: - filter_func = pipeline_expressions.Expression.less_than - - field = orderings[size - 1].expr + ordering = orderings[size - 1] + field = ordering.expr value = pipeline_expressions.Constant(cursor_values[size - 1]) # Add condition for last bound - condition = filter_func(field, value) + condition = _get_cursor_exclusive_condition(is_start_cursor, ordering, value) if (is_start_cursor and before) or (not is_start_cursor and not before): # When the cursor bound is inclusive, then the last bound @@ -1443,14 +1461,18 @@ def _where_conditions_from_cursor( # Iterate backwards over the remaining bounds, adding a condition for each one for i in range(size - 2, -1, -1): - field = orderings[i].expr + ordering = orderings[i] + field = ordering.expr value = pipeline_expressions.Constant(cursor_values[i]) # For each field in the orderings, the condition is either # a) lessThan|greaterThan the cursor value, # b) or equal the cursor value and lessThan|greaterThan the cursor values for other fields + exclusive_condition = _get_cursor_exclusive_condition( + is_start_cursor, ordering, value + ) condition = pipeline_expressions.Or( - filter_func(field, value), + exclusive_condition, pipeline_expressions.And(field.equal(value), condition), ) diff --git a/tests/unit/v1/test_base_query.py b/tests/unit/v1/test_base_query.py index 4015bb69e..9a7a5fc23 100644 --- a/tests/unit/v1/test_base_query.py +++ b/tests/unit/v1/test_base_query.py @@ -2335,3 +2335,45 @@ def _make_snapshot(docref, values): from google.cloud.firestore_v1 import document return document.DocumentSnapshot(docref, values, True, None, None, None) + +def test__where_conditions_from_cursor_descending(): + from google.cloud.firestore_v1.base_query import _where_conditions_from_cursor + from google.cloud.firestore_v1 import pipeline_expressions + + # Create ordering: field DESC + field_expr = pipeline_expressions.Field.of("field") + ordering = pipeline_expressions.Ordering(field_expr, "descending") + + # Case 1: StartAt (inclusive) -> <= 10 + cursor = ([10], True) + condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=True) + # Expected: field < 10 OR field == 10 + expected = pipeline_expressions.Or( + field_expr.less_than(10), + field_expr.equal(10) + ) + assert condition == expected + + # Case 2: StartAfter (exclusive) -> < 10 + cursor = ([10], False) + condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=True) + # Expected: field < 10 + expected = field_expr.less_than(10) + assert condition == expected + + # Case 3: EndAt (inclusive) -> >= 10 + cursor = ([10], False) + condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=False) + # Expected: field > 10 OR field == 10 + expected = pipeline_expressions.Or( + field_expr.greater_than(10), + field_expr.equal(10) + ) + assert condition == expected + + # Case 4: EndBefore (exclusive) -> > 10 + cursor = ([10], True) + condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=False) + # Expected: field > 10 + expected = field_expr.greater_than(10) + assert condition == expected From 84e344e50fee0f0eef72a397f820fb6946fe9d65 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 17:56:44 -0800 Subject: [PATCH 06/11] fixed tests --- tests/unit/v1/test_base_query.py | 15 ++++++------ tests/unit/v1/test_pipeline_expressions.py | 27 +++++++++++++++++----- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/tests/unit/v1/test_base_query.py b/tests/unit/v1/test_base_query.py index 9a7a5fc23..691e2f66d 100644 --- a/tests/unit/v1/test_base_query.py +++ b/tests/unit/v1/test_base_query.py @@ -2120,7 +2120,9 @@ def test__query_pipeline_cursors(): from google.cloud.firestore_v1 import pipeline_expressions as expr client = make_client() - query_start = client.collection("my_col").order_by("field_a").start_at({"field_a": 10}) + query_start = ( + client.collection("my_col").order_by("field_a").start_at({"field_a": 10}) + ) pipeline = query_start._build_pipeline(client.pipeline()) # Stages: @@ -2129,7 +2131,7 @@ def test__query_pipeline_cursors(): # 2: Where (cursor condition) # 3: Sort (field_a) assert len(pipeline.stages) == 4 - + where_stage = pipeline.stages[2] assert isinstance(where_stage, stages.Where) # Expected: (field_a > 10) OR (field_a == 10) @@ -2336,6 +2338,7 @@ def _make_snapshot(docref, values): return document.DocumentSnapshot(docref, values, True, None, None, None) + def test__where_conditions_from_cursor_descending(): from google.cloud.firestore_v1.base_query import _where_conditions_from_cursor from google.cloud.firestore_v1 import pipeline_expressions @@ -2348,10 +2351,7 @@ def test__where_conditions_from_cursor_descending(): cursor = ([10], True) condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=True) # Expected: field < 10 OR field == 10 - expected = pipeline_expressions.Or( - field_expr.less_than(10), - field_expr.equal(10) - ) + expected = pipeline_expressions.Or(field_expr.less_than(10), field_expr.equal(10)) assert condition == expected # Case 2: StartAfter (exclusive) -> < 10 @@ -2366,8 +2366,7 @@ def test__where_conditions_from_cursor_descending(): condition = _where_conditions_from_cursor(cursor, [ordering], is_start_cursor=False) # Expected: field > 10 OR field == 10 expected = pipeline_expressions.Or( - field_expr.greater_than(10), - field_expr.equal(10) + field_expr.greater_than(10), field_expr.equal(10) ) assert condition == expected diff --git a/tests/unit/v1/test_pipeline_expressions.py b/tests/unit/v1/test_pipeline_expressions.py index e2c6dcd0f..258f0eedf 100644 --- a/tests/unit/v1/test_pipeline_expressions.py +++ b/tests/unit/v1/test_pipeline_expressions.py @@ -463,58 +463,72 @@ def test__from_query_filter_pb_unary_filter_unknown_op(self, mock_client): BooleanExpression._from_query_filter_pb(wrapped_filter_pb, mock_client) @pytest.mark.parametrize( - "op_enum, value, expected_expr_func", + "op_enum, value, expected_expr_func, expects_existance", [ ( query_pb.StructuredQuery.FieldFilter.Operator.LESS_THAN, 10, Expression.less_than, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.LESS_THAN_OR_EQUAL, 10, Expression.less_than_or_equal, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.GREATER_THAN, 10, Expression.greater_than, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.GREATER_THAN_OR_EQUAL, 10, Expression.greater_than_or_equal, + True, + ), + ( + query_pb.StructuredQuery.FieldFilter.Operator.EQUAL, + 10, + Expression.equal, + True, ), - (query_pb.StructuredQuery.FieldFilter.Operator.EQUAL, 10, Expression.equal), ( query_pb.StructuredQuery.FieldFilter.Operator.NOT_EQUAL, 10, Expression.not_equal, + False, ), ( query_pb.StructuredQuery.FieldFilter.Operator.ARRAY_CONTAINS, 10, Expression.array_contains, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.ARRAY_CONTAINS_ANY, [10, 20], Expression.array_contains_any, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.IN, [10, 20], Expression.equal_any, + True, ), ( query_pb.StructuredQuery.FieldFilter.Operator.NOT_IN, [10, 20], Expression.not_equal_any, + False, ), ], ) def test__from_query_filter_pb_field_filter( - self, mock_client, op_enum, value, expected_expr_func + self, mock_client, op_enum, value, expected_expr_func, expects_existance ): """ test supported field filters @@ -536,10 +550,11 @@ def test__from_query_filter_pb_field_filter( [Constant(e) for e in value] if isinstance(value, list) else Constant(value) ) expected_condition = expected_expr_func(field_expr, value) - # should include existance checks - expected = expr.And(field_expr.exists(), expected_condition) + if expects_existance: + # some expressions include extra existance checks + expected_condition = expr.And(field_expr.exists(), expected_condition) - assert repr(result) == repr(expected) + assert repr(result) == repr(expected_condition) def test__from_query_filter_pb_field_filter_unknown_op(self, mock_client): """ From 538431c819bcb5f94dc8d283ba907cb50b6edd3f Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 14 Jan 2026 21:32:08 -0800 Subject: [PATCH 07/11] added tests --- tests/unit/v1/test_base_query.py | 134 +++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/tests/unit/v1/test_base_query.py b/tests/unit/v1/test_base_query.py index 691e2f66d..a653d07d7 100644 --- a/tests/unit/v1/test_base_query.py +++ b/tests/unit/v1/test_base_query.py @@ -2170,6 +2170,42 @@ def test__query_pipeline_limit_to_last(): assert sort_restored.orders[0].order_dir == expr.Ordering.Direction.ASCENDING +def test__query_pipeline_limit_to_last_descending(): + from google.cloud.firestore_v1 import pipeline_expressions as expr + from google.cloud.firestore_v1.base_query import BaseQuery + + client = make_client() + # User orders by field_a DESCENDING + query = ( + client.collection("my_col") + .order_by("field_a", direction=BaseQuery.DESCENDING) + .limit_to_last(5) + ) + pipeline = query._build_pipeline(client.pipeline()) + + # Stages: + # 0: Collection + # 1: Where (exists field_a) + # 2: Sort (field_a ASCENDING) - Reversed from DESCENDING + # 3: Limit (5) + # 4: Sort (field_a DESCENDING) - Restored to original + assert len(pipeline.stages) == 5 + + sort_reversed = pipeline.stages[2] + assert isinstance(sort_reversed, stages.Sort) + # Should be ASCENDING because original was DESCENDING + assert sort_reversed.orders[0].order_dir == expr.Ordering.Direction.ASCENDING + + limit_stage = pipeline.stages[3] + assert isinstance(limit_stage, stages.Limit) + assert limit_stage.limit == 5 + + sort_restored = pipeline.stages[4] + assert isinstance(sort_restored, stages.Sort) + # Should be DESCENDING (original) + assert sort_restored.orders[0].order_dir == expr.Ordering.Direction.DESCENDING + + def test__query_pipeline_limit(): client = make_client() query = client.collection("my_col").limit(15) @@ -2376,3 +2412,101 @@ def test__where_conditions_from_cursor_descending(): # Expected: field > 10 expected = field_expr.greater_than(10) assert condition == expected + + +def test__query_pipeline_end_at(): + from google.cloud.firestore_v1 import pipeline_expressions as expr + + client = make_client() + query_end = client.collection("my_col").order_by("field_a").end_at({"field_a": 10}) + pipeline = query_end._build_pipeline(client.pipeline()) + + # Stages: + # 0: Collection + # 1: Where (exists field_a) + # 2: Where (cursor condition) + # 3: Sort (field_a) + assert len(pipeline.stages) == 4 + + where_stage = pipeline.stages[2] + assert isinstance(where_stage, stages.Where) + # Expected: (field_a < 10) OR (field_a == 10) + assert isinstance(where_stage.condition, expr.Or) + params = where_stage.condition.params + assert len(params) == 2 + assert params[0].name == "less_than" + assert params[1].name == "equal" + + +def test__where_conditions_from_cursor_multi_field(): + from google.cloud.firestore_v1.base_query import _where_conditions_from_cursor + from google.cloud.firestore_v1 import pipeline_expressions as expr + + # Order by: A ASC, B DESC + field_a = expr.Field.of("A") + field_b = expr.Field.of("B") + ordering_a = expr.Ordering(field_a, "ascending") + ordering_b = expr.Ordering(field_b, "descending") + orderings = [ordering_a, ordering_b] + + # Cursor: A=1, B=2. StartAt (inclusive) + # Logic: (A > 1) OR (A == 1 AND (B < 2 OR B == 2)) + # Note: B is DESC, so start_at means <= 2 + cursor = ([1, 2], True) + + condition = _where_conditions_from_cursor(cursor, orderings, is_start_cursor=True) + + # Verify structure: Or(A > 1, And(A == 1, Or(B < 2, B == 2))) + assert isinstance(condition, expr.Or) + # First term: A > 1 + term1 = condition.params[0] + assert term1.name == "greater_than" + assert term1.params[0] == field_a + assert term1.params[1] == expr.Constant(1) + + # Second term: And(...) + term2 = condition.params[1] + assert isinstance(term2, expr.And) + + # Inside And: A == 1 + sub_term1 = term2.params[0] + assert sub_term1.name == "equal" + assert sub_term1.params[0] == field_a + assert sub_term1.params[1] == expr.Constant(1) + + # Inside And: Or(B < 2, B == 2) <-- DESCENDING logic check + sub_term2 = term2.params[1] + assert isinstance(sub_term2, expr.Or) + + # B < 2 (because DESC start_at) + sub_sub_term1 = sub_term2.params[0] + assert sub_sub_term1.name == "less_than" + assert sub_sub_term1.params[0] == field_b + assert sub_sub_term1.params[1] == expr.Constant(2) + + # B == 2 + sub_sub_term2 = sub_term2.params[1] + assert sub_sub_term2.name == "equal" + assert sub_sub_term2.params[0] == field_b + assert sub_sub_term2.params[1] == expr.Constant(2) + + +def test__reverse_orderings_descending(): + from google.cloud.firestore_v1.base_query import _reverse_orderings + from google.cloud.firestore_v1 import pipeline_expressions as expr + + # Input: A ASC, B DESC + field_a = expr.Field.of("A") + field_b = expr.Field.of("B") + ord_a = expr.Ordering(field_a, "ascending") + ord_b = expr.Ordering(field_b, "descending") + + reversed_ords = _reverse_orderings([ord_a, ord_b]) + + assert len(reversed_ords) == 2 + # Expect: A DESC, B ASC + assert reversed_ords[0].expr == field_a + assert reversed_ords[0].order_dir == expr.Ordering.Direction.DESCENDING + + assert reversed_ords[1].expr == field_b + assert reversed_ords[1].order_dir == expr.Ordering.Direction.ASCENDING From f73378ae70767ceb050cdc6142eb984f8fdc2fa3 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 15 Jan 2026 14:23:14 -0800 Subject: [PATCH 08/11] refactoring --- google/cloud/firestore_v1/base_query.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index 59e11a644..e8558fa8b 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1187,15 +1187,6 @@ def _build_pipeline(self, source: "PipelineSource"): orderings.append(pipeline_expressions.Ordering(field, direction)) if orderings: - # Normalize cursors to get the raw values corresponding to the orders - start_at_val = None - if self._start_at: - start_at_val = self._normalize_cursor(self._start_at, normalized_orders) - - end_at_val = None - if self._end_at: - end_at_val = self._normalize_cursor(self._end_at, normalized_orders) - # If limit_to_last is set, we need to reverse the orderings to find the # "last" N documents (which effectively become the "first" N in reverse order). if self._limit_to_last: @@ -1205,14 +1196,17 @@ def _build_pipeline(self, source: "PipelineSource"): # Apply cursor conditions. # Cursors are translated into filter conditions (e.g., field > value) # based on the orderings. - if start_at_val: + if self._start_at: + # Normalize cursors to get the raw values corresponding to the orders + start_at_val = self._normalize_cursor(self._start_at, normalized_orders) ppl = ppl.where( _where_conditions_from_cursor( start_at_val, orderings, is_start_cursor=True ) ) - if end_at_val: + if self._end_at: + end_at_val = self._normalize_cursor(self._end_at, normalized_orders) ppl = ppl.where( _where_conditions_from_cursor( end_at_val, orderings, is_start_cursor=False From d3fd55f239fcdbdb68b62f21b407f92f70da80a1 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 15 Jan 2026 14:56:15 -0800 Subject: [PATCH 09/11] refactored and cleaned up --- google/cloud/firestore_v1/base_query.py | 87 ++++++++++--------------- 1 file changed, 35 insertions(+), 52 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index e8558fa8b..106a9b790 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1160,71 +1160,54 @@ def _build_pipeline(self, source: "PipelineSource"): # "explicit_orders" are only those explicitly added by the user via order_by(). # We only generate existence filters for these fields. - explicit_orders = self._orders - exists = [] - for order in explicit_orders: - field = pipeline_expressions.Field.of(order.field.field_path) - exists.append(field.exists()) - - # Add exists filters to match Query's implicit orderby semantics. - if len(exists) == 1: - ppl = ppl.where(exists[0]) - elif len(exists) > 1: - ppl = ppl.where(pipeline_expressions.And(*exists)) + if self._orders: + exists = [ + pipeline_expressions.Field.of(o.field.field_path).exists() + for o in self._orders + ] + ppl = ppl.where( + pipeline_expressions.And(*exists) if len(exists) > 1 else exists[0] + ) # "normalized_orders" includes both user-specified orders and implicit orders # (e.g. __name__ or inequality fields) required by Firestore semantics. normalized_orders = self._normalize_orders() - orderings = [] - if normalized_orders: - for order in normalized_orders: - field = pipeline_expressions.Field.of(order.field.field_path) - direction = ( - "ascending" - if order.direction == StructuredQuery.Direction.ASCENDING - else "descending" - ) - orderings.append(pipeline_expressions.Ordering(field, direction)) + orderings = [ + pipeline_expressions.Ordering( + pipeline_expressions.Field.of(o.field.field_path), + "ascending" + if o.direction == StructuredQuery.Direction.ASCENDING + else "descending", + ) + for o in normalized_orders + ] + # Apply cursors as filters. if orderings: - # If limit_to_last is set, we need to reverse the orderings to find the - # "last" N documents (which effectively become the "first" N in reverse order). - if self._limit_to_last: - actual_orderings = _reverse_orderings(orderings) - ppl = ppl.sort(*actual_orderings) - - # Apply cursor conditions. - # Cursors are translated into filter conditions (e.g., field > value) - # based on the orderings. - if self._start_at: - # Normalize cursors to get the raw values corresponding to the orders - start_at_val = self._normalize_cursor(self._start_at, normalized_orders) - ppl = ppl.where( - _where_conditions_from_cursor( - start_at_val, orderings, is_start_cursor=True + for cursor_data, is_start in [(self._start_at, True), (self._end_at, False)]: + if cursor_data: + val = self._normalize_cursor(cursor_data, normalized_orders) + ppl = ppl.where( + _where_conditions_from_cursor(val, orderings, is_start) ) - ) - if self._end_at: - end_at_val = self._normalize_cursor(self._end_at, normalized_orders) - ppl = ppl.where( - _where_conditions_from_cursor( - end_at_val, orderings, is_start_cursor=False - ) - ) + # Handle sort and limit, including limit_to_last semantics. + is_limit_to_last = self._limit_to_last and bool(orderings) - if not self._limit_to_last: - ppl = ppl.sort(*orderings) + if is_limit_to_last: + # If limit_to_last is set, we need to reverse the orderings to find the + # "last" N documents (which effectively become the "first" N in reverse order). + ppl = ppl.sort(*_reverse_orderings(orderings)) + elif orderings: + ppl = ppl.sort(*orderings) - if self._limit is not None: - ppl = ppl.limit(self._limit) + if self._limit is not None and (not self._limit_to_last or orderings): + ppl = ppl.limit(self._limit) + if is_limit_to_last: # If we reversed the orderings for limit_to_last, we must now re-sort # using the original orderings to return the results in the user-requested order. - if self._limit_to_last: - ppl = ppl.sort(*orderings) - elif self._limit is not None and not self._limit_to_last: - ppl = ppl.limit(self._limit) + ppl = ppl.sort(*orderings) # Offset if self._offset: From 404ff9e1a657ba6b4beafd57181dd042a92697f3 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 15 Jan 2026 15:00:08 -0800 Subject: [PATCH 10/11] fixed lint --- google/cloud/firestore_v1/base_query.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index 106a9b790..129f946fa 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1184,9 +1184,9 @@ def _build_pipeline(self, source: "PipelineSource"): # Apply cursors as filters. if orderings: - for cursor_data, is_start in [(self._start_at, True), (self._end_at, False)]: - if cursor_data: - val = self._normalize_cursor(cursor_data, normalized_orders) + for cursor, is_start in [(self._start_at, True), (self._end_at, False)]: + if cursor: + val = self._normalize_cursor(cursor, normalized_orders) ppl = ppl.where( _where_conditions_from_cursor(val, orderings, is_start) ) From 7a02bf6acda0db48c9f127097ac76efaeeb106c8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 15 Jan 2026 15:22:06 -0800 Subject: [PATCH 11/11] fixed mypy --- google/cloud/firestore_v1/base_query.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/firestore_v1/base_query.py b/google/cloud/firestore_v1/base_query.py index 129f946fa..54a1f1618 100644 --- a/google/cloud/firestore_v1/base_query.py +++ b/google/cloud/firestore_v1/base_query.py @@ -1185,10 +1185,10 @@ def _build_pipeline(self, source: "PipelineSource"): # Apply cursors as filters. if orderings: for cursor, is_start in [(self._start_at, True), (self._end_at, False)]: + cursor = self._normalize_cursor(cursor, normalized_orders) if cursor: - val = self._normalize_cursor(cursor, normalized_orders) ppl = ppl.where( - _where_conditions_from_cursor(val, orderings, is_start) + _where_conditions_from_cursor(cursor, orderings, is_start) ) # Handle sort and limit, including limit_to_last semantics.