Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
events_queue = queue.Queue()
storages = {
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorage(),
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
'segments': InMemorySegmentStorage(events_queue),
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer),
}
Expand Down Expand Up @@ -1101,8 +1101,8 @@ def _build_localhost_factory(cfg):
events_queue = queue.Queue()
storages = {
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors.
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
'segments': InMemorySegmentStorage(events_queue), # not used, just to avoid possible future errors.
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
'impressions': LocalhostImpressionsStorage(),
'events': LocalhostEventsStorage(),
}
Expand Down
24 changes: 20 additions & 4 deletions splitio/storage/inmemmory.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter):
class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage):
"""InMemory implementation of a feature flag storage base."""

def __init__(self):
def __init__(self, internal_event_queue):
"""Constructor."""
self._lock = threading.RLock()
self._rule_based_segments = {}
self._change_number = -1
self._internal_event_queue = internal_event_queue

def clear(self):
"""
Expand Down Expand Up @@ -153,6 +154,10 @@ def update(self, to_add, to_delete, new_change_number):
[self._put(add_segment) for add_segment in to_add]
[self._remove(delete_segment) for delete_segment in to_delete]
self._set_change_number(new_change_number)
self._internal_event_queue.put(
SdkInternalEventNotification(
SdkInternalEvent.RB_SEGMENTS_UPDATED,
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))

def _put(self, rule_based_segment):
"""
Expand Down Expand Up @@ -934,11 +939,12 @@ async def is_flag_set_exist(self, flag_set):
class InMemorySegmentStorage(SegmentStorage):
"""In-memory implementation of a segment storage."""

def __init__(self):
def __init__(self, internal_event_queue):
"""Constructor."""
self._segments = {}
self._change_numbers = {}
self._lock = threading.RLock()
self._internal_event_queue = internal_event_queue

def get(self, segment_name):
"""
Expand Down Expand Up @@ -968,9 +974,14 @@ def put(self, segment):
with self._lock:
self._segments[segment.name] = segment

self._internal_event_queue.put(
SdkInternalEventNotification(
SdkInternalEvent.SEGMENTS_UPDATED,
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))

def update(self, segment_name, to_add, to_remove, change_number=None):
"""
Update a feature flag. Create it if it doesn't exist.
Update a segment. Create it if it doesn't exist.

:param segment_name: Name of the segment to update.
:type segment_name: str
Expand All @@ -988,6 +999,11 @@ def update(self, segment_name, to_add, to_remove, change_number=None):
if change_number is not None:
self._segments[segment_name].change_number = change_number

self._internal_event_queue.put(
SdkInternalEventNotification(
SdkInternalEvent.SEGMENTS_UPDATED,
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))

def get_change_number(self, segment_name):
"""
Retrieve latest change number for a segment.
Expand Down Expand Up @@ -1100,7 +1116,7 @@ async def put(self, segment):

async def update(self, segment_name, to_add, to_remove, change_number=None):
"""
Update a feature flag. Create it if it doesn't exist.
Update a segment. Create it if it doesn't exist.

:param segment_name: Name of the segment to update.
:type segment_name: str
Expand Down
62 changes: 31 additions & 31 deletions tests/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def test_get_treatment(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
event_storage = mocker.Mock(spec=EventStorage)
Expand Down Expand Up @@ -117,8 +117,8 @@ def test_get_treatment_with_config(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -195,8 +195,8 @@ def test_get_treatments(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -276,8 +276,8 @@ def test_get_treatments_by_flag_set(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -356,8 +356,8 @@ def test_get_treatments_by_flag_sets(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -436,8 +436,8 @@ def test_get_treatments_with_config(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -520,8 +520,8 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -601,8 +601,8 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
Expand Down Expand Up @@ -682,8 +682,8 @@ def test_impression_toggle_optimized(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
event_storage = mocker.Mock(spec=EventStorage)
Expand Down Expand Up @@ -747,8 +747,8 @@ def test_impression_toggle_debug(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
event_storage = mocker.Mock(spec=EventStorage)
Expand Down Expand Up @@ -812,8 +812,8 @@ def test_impression_toggle_none(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
event_storage = mocker.Mock(spec=EventStorage)
Expand Down Expand Up @@ -953,8 +953,8 @@ def test_evaluations_before_running_post_fork(self, mocker):
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
destroyed_property = mocker.PropertyMock()
destroyed_property.return_value = False
Expand Down Expand Up @@ -1035,8 +1035,8 @@ def test_telemetry_not_ready(self, mocker):
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
recorder = StandardRecorder(impmanager, mocker.Mock(), mocker.Mock(), telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
factory = SplitFactory('localhost',
Expand Down Expand Up @@ -1071,7 +1071,7 @@ def test_telemetry_record_treatment_exception(self, mocker):
split_storage = InMemorySplitStorage(events_queue)
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
segment_storage = mocker.Mock(spec=SegmentStorage)
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
impression_storage = mocker.Mock(spec=ImpressionStorage)
event_storage = mocker.Mock(spec=EventStorage)
destroyed_property = mocker.PropertyMock()
Expand Down Expand Up @@ -1175,8 +1175,8 @@ def test_telemetry_method_latency(self, mocker):
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
destroyed_property = mocker.PropertyMock()
Expand Down Expand Up @@ -1288,8 +1288,8 @@ def test_impressions_properties(self, mocker):
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage()
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
event_storage = mocker.Mock(spec=EventStorage)
Expand Down
20 changes: 10 additions & 10 deletions tests/engine/test_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ def test_evaluate_treatment_with_rbs_in_condition(self):
e = evaluator.Evaluator(splitters.Splitter())
events_queue = queue.Queue()
splits_storage = InMemorySplitStorage(events_queue)
rbs_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage()
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)

rbs_segments = os.path.join(os.path.dirname(__file__), 'files', 'rule_base_segments.json')
Expand All @@ -291,8 +291,8 @@ def test_using_segment_in_excluded(self):
e = evaluator.Evaluator(splitters.Splitter())
events_queue = queue.Queue()
splits_storage = InMemorySplitStorage(events_queue)
rbs_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage()
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)

mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
Expand All @@ -316,8 +316,8 @@ def test_using_rbs_in_excluded(self):
e = evaluator.Evaluator(splitters.Splitter())
events_queue = queue.Queue()
splits_storage = InMemorySplitStorage(events_queue)
rbs_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage()
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)

mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
Expand All @@ -340,8 +340,8 @@ def test_prerequisites(self):
e = evaluator.Evaluator(splitters.Splitter())
events_queue = queue.Queue()
splits_storage = InMemorySplitStorage(events_queue)
rbs_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage()
rbs_storage = InMemoryRuleBasedSegmentStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)

rbs = rule_based_segments.from_raw(data["rbs"]["d"][0])
Expand Down Expand Up @@ -549,8 +549,8 @@ def test_get_context(self):
split2 = Split('split2', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
events_queue = queue.Queue()
flag_storage = InMemorySplitStorage(events_queue, [])
segment_storage = InMemorySegmentStorage()
rbs_segment_storage = InMemoryRuleBasedSegmentStorage()
segment_storage = InMemorySegmentStorage(events_queue)
rbs_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
flag_storage.update([mocked_split, split2], [], -1)
rbs = copy.deepcopy(rbs_raw)
rbs['conditions'].append(
Expand Down
Loading