From 4610e6f0e53bd8c2d75f603c0ac2fac74b5bc29f Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 15 Jan 2026 16:29:17 -0500 Subject: [PATCH 1/2] chore: add DataSourceUpdatesSinkV2 support to DataSourceUpdatesImpl --- .../sdk/server/DataModelDependencies.java | 19 + .../sdk/server/DataSourceUpdatesImpl.java | 179 +++++- .../com/launchdarkly/sdk/server/Version.java | 2 + .../subsystems/DataSourceUpdateSinkV2.java | 55 ++ .../TransactionalDataSourceUpdateSink.java | 32 + .../sdk/server/DataModelDependenciesTest.java | 225 +++++++ .../sdk/server/DataSourceUpdatesImplTest.java | 598 +++++++++++++++++- .../sdk/server/TestComponents.java | 12 +- .../sdk/server/integrations/TestDataTest.java | 11 +- 9 files changed, 1129 insertions(+), 4 deletions(-) create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceUpdateSinkV2.java create mode 100644 lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/TransactionalDataSourceUpdateSink.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java index 46cb04e..10453d7 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java @@ -6,6 +6,8 @@ import com.google.common.collect.Iterables; import com.launchdarkly.sdk.LDValue; import com.launchdarkly.sdk.server.DataModel.Operator; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; @@ -132,6 +134,23 @@ public static FullDataSet sortAllCollections(FullDataSet(builder.build().entrySet()); } + /** + * Sort the data in the changeset in dependency order. If there are any duplicates, then the highest version + * of the duplicate item will be retained. 
+ * + * @param inSet the changeset to sort + * @return a sorted copy of the changeset + */ + public static ChangeSet sortChangeset(ChangeSet inSet) { + ImmutableSortedMap.Builder> builder = + ImmutableSortedMap.orderedBy(dataKindPriorityOrder); + for (Map.Entry> entry: inSet.getData()) { + DataKind kind = entry.getKey(); + builder.put(kind, sortCollection(kind, entry.getValue())); + } + return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId()); + } + private static KeyedItems sortCollection(DataKind kind, KeyedItems input) { if (!isDependencyOrdered(kind) || isEmpty(input.getItems())) { return input; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java index 76aff81..1b87534 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java @@ -12,11 +12,14 @@ import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.StatusListener; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; import com.launchdarkly.sdk.server.subsystems.DataStore; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; +import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore; import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent; import com.launchdarkly.sdk.server.interfaces.FlagChangeListener; @@ -48,7 +51,7 @@ * * @since 4.11.0 */ -final class DataSourceUpdatesImpl implements DataSourceUpdateSink { +final class DataSourceUpdatesImpl implements DataSourceUpdateSink, DataSourceUpdateSinkV2 { private final DataStore store; private final EventBroadcasterImpl flagChangeEventNotifier; private final EventBroadcasterImpl dataSourceStatusNotifier; @@ -365,4 +368,178 @@ private void onTimeout() { private static String describeErrorCount(Map.Entry entry) { return entry.getKey() + " (" + entry.getValue() + (entry.getValue() == 1 ? " time" : " times") + ")"; } + + // ===== ITransactionalDataSourceUpdates methods ===== + + @Override + public boolean apply(ChangeSet changeSet) { + if (store instanceof TransactionalDataStore) { + return applyToTransactionalStore((TransactionalDataStore) store, changeSet); + } + + // Legacy update path for non-transactional stores + return applyToLegacyStore(changeSet); + } + + private boolean applyToTransactionalStore(TransactionalDataStore transactionalDataStore, + ChangeSet changeSet) { + Map> oldData; + // Getting the old values requires accessing the store, which can fail. + // If there is a failure to read the store, then we stop treating it as a failure. 
+ try { + oldData = getOldDataIfFlagChangeListeners(); + } catch (RuntimeException e) { + reportStoreFailure(e); + return false; + } + + ChangeSet sortedChangeSet = DataModelDependencies.sortChangeset(changeSet); + + try { + transactionalDataStore.apply(sortedChangeSet); + lastStoreUpdateFailed = false; + } catch (RuntimeException e) { + reportStoreFailure(e); + return false; + } + + // Calling Apply implies that the data source is now in a valid state. + updateStatus(State.VALID, null); + + Set changes = updateDependencyTrackerForChangesetAndDetermineChanges(oldData, sortedChangeSet); + + // Now, if we previously queried the old data because someone is listening for flag change events, compare + // the versions of all items and generate events for those (and any other items that depend on them) + if (changes != null) { + sendChangeEvents(changes); + } + + return true; + } + + private boolean applyToLegacyStore(ChangeSet sortedChangeSet) { + switch (sortedChangeSet.getType()) { + case Full: + return applyFullChangeSetToLegacyStore(sortedChangeSet); + case Partial: + return applyPartialChangeSetToLegacyStore(sortedChangeSet); + case None: + default: + return true; + } + } + + private boolean applyFullChangeSetToLegacyStore(ChangeSet unsortedChangeset) { + // Convert ChangeSet to FullDataSet for legacy init path + return init(new FullDataSet<>(unsortedChangeset.getData())); + } + + private boolean applyPartialChangeSetToLegacyStore(ChangeSet changeSet) { + // Sorting isn't strictly required here, as upsert behavior didn't traditionally have it, + // but it also doesn't hurt, and there could be cases where it results in slightly + // greater store consistency for persistent stores. + ChangeSet sortedChangeset = DataModelDependencies.sortChangeset(changeSet); + + for (Map.Entry> kindItemsPair: sortedChangeset.getData()) { + for (Map.Entry item: kindItemsPair.getValue().getItems()) { + boolean applySuccess = upsert(kindItemsPair.getKey(), item.getKey(), item.getValue()); + if (!applySuccess) { + return false; + } + } + } + // The upsert will update the store status in the case of a store failure. + // The application of the upserts does not set the store initialized. + + // Considering the store will be the same for the duration of the application + // lifecycle we will not be applying a partial update to a store that didn't + // already get a full update. The non-transactional store will also not support a selector. 
+ + return true; + } + + private Map> getOldDataIfFlagChangeListeners() { + if (hasFlagChangeEventListeners()) { + // Query the existing data if any, so that after the update we can send events for + // whatever was changed + Map> oldData = new HashMap<>(); + for (DataKind kind: ALL_DATA_KINDS) { + KeyedItems items = store.getAll(kind); + oldData.put(kind, ImmutableMap.copyOf(items.getItems())); + } + return oldData; + } else { + return null; + } + } + + private Map> changeSetToMap(ChangeSet changeSet) { + Map> ret = new HashMap<>(); + for (Map.Entry> e: changeSet.getData()) { + ret.put(e.getKey(), ImmutableMap.copyOf(e.getValue().getItems())); + } + return ret; + } + + private Set updateDependencyTrackerForChangesetAndDetermineChanges( + Map> oldDataMap, + ChangeSet changeSet) { + switch (changeSet.getType()) { + case Full: + return handleFullChangeset(oldDataMap, changeSet); + case Partial: + return handlePartialChangeset(oldDataMap, changeSet); + case None: + return null; + default: + return null; + } + } + + private Set handleFullChangeset( + Map> oldDataMap, + ChangeSet changeSet) { + dependencyTracker.reset(); + for (Map.Entry> kindEntry: changeSet.getData()) { + DataKind kind = kindEntry.getKey(); + for (Map.Entry itemEntry: kindEntry.getValue().getItems()) { + String key = itemEntry.getKey(); + dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue()); + } + } + + if (oldDataMap == null) { + return null; + } + + Map> newDataMap = changeSetToMap(changeSet); + return computeChangedItemsForFullDataSet(oldDataMap, newDataMap); + } + + private Set handlePartialChangeset( + Map> oldDataMap, + ChangeSet changeSet) { + if (oldDataMap == null) { + // Update dependencies but don't track changes when no listeners + for (Map.Entry> kindEntry: changeSet.getData()) { + DataKind kind = kindEntry.getKey(); + for (Map.Entry itemEntry: kindEntry.getValue().getItems()) { + dependencyTracker.updateDependenciesFrom(kind, itemEntry.getKey(), itemEntry.getValue()); + } + } + return null; + } + + Set affectedItems = new HashSet<>(); + for (Map.Entry> kindEntry: changeSet.getData()) { + DataKind kind = kindEntry.getKey(); + for (Map.Entry itemEntry: kindEntry.getValue().getItems()) { + String key = itemEntry.getKey(); + dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue()); + dependencyTracker.addAffectedItems(affectedItems, new KindAndKey(kind, key)); + } + } + + return affectedItems; + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java index c92affa..85a5238 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java @@ -4,5 +4,7 @@ abstract class Version { private Version() {} // This constant is updated automatically by our Gradle script during a release, if the project version has changed + // x-release-please-start-version static final String SDK_VERSION = "7.10.2"; + // x-release-please-end } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceUpdateSinkV2.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceUpdateSinkV2.java new file mode 100644 index 0000000..f225631 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceUpdateSinkV2.java @@ -0,0 +1,55 @@ +package com.launchdarkly.sdk.server.subsystems; + +import 
com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
+import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
+import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
+import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
+
+/**
+ * Interfaces required by data source updates implementations in FDv2.
+ * <p>
+ * This interface extends {@link TransactionalDataSourceUpdateSink} to add status tracking
+ * and status update capabilities required for FDv2 data sources.
+ * <p>
+ * This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
+ * It is in early access. If you want access to this feature, please join the EAP: https://launchdarkly.com/docs/sdk/features/data-saving-mode
+ *
+ * @since 5.0.0
+ * @see TransactionalDataSourceUpdateSink
+ * @see DataSource
+ */
+public interface DataSourceUpdateSinkV2 extends TransactionalDataSourceUpdateSink {
+ /**
+ * An object that provides status tracking for the data store, if applicable.
+ * <p>
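+ * For example, a data source can register a listener and react when the store recovers from an
+ * outage (a sketch only; {@code sink} is this {@code DataSourceUpdateSinkV2}, and the listener and
+ * {@code Status} accessors shown are assumed to be those of the existing {@link DataStoreStatusProvider} API):
+ * <pre>{@code
+ *   sink.getDataStoreStatusProvider().addStatusListener(status -> {
+ *     if (status.isAvailable() && status.isRefreshNeeded()) {
+ *       // the store came back after an outage that may have lost data;
+ *       // re-request a full data set from LaunchDarkly
+ *     }
+ *   });
+ * }</pre>
+ * <p>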
+ * This may be useful if the data source needs to be aware of storage problems that might require it
+ * to take some special action: for instance, if a database outage may have caused some data to be
+ * lost and therefore the data should be re-requested from LaunchDarkly.
+ *
+ * @return a {@link DataStoreStatusProvider}
+ */
+ DataStoreStatusProvider getDataStoreStatusProvider();
+
+ /**
+ * Informs the SDK of a change in the data source's status.
+ * <p>
+ * Data source implementations should use this method if they have any concept of being in a valid
+ * state, a temporarily disconnected state, or a permanently stopped state.
+ * <p>
+ * If {@code newState} is different from the previous state, and/or {@code newError} is non-null, the
+ * SDK will start returning the new status (adding a timestamp for the change) from
+ * {@link DataSourceStatusProvider#getStatus()}, and will trigger status change events to any
+ * registered listeners.
+ * <p>
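+ * A minimal sketch of typical use from a data source ({@code sink} is this object as seen by the
+ * data source; the {@code ErrorInfo.fromHttpError} factory is assumed to be the existing one on {@link ErrorInfo}):
+ * <pre>{@code
+ *   // connection established and the first payload applied successfully
+ *   sink.updateStatus(State.VALID, null);
+ *   // later, a transient HTTP failure; the SDK keeps serving the last known data
+ *   sink.updateStatus(State.INTERRUPTED, ErrorInfo.fromHttpError(503));
+ * }</pre>
+ * <p>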
+ * A special case is that if {@code newState} is {@link State#INTERRUPTED},
+ * but the previous state was {@link State#INITIALIZING}, the state will
+ * remain at {@link State#INITIALIZING} because {@link State#INTERRUPTED}
+ * is only meaningful after a successful startup.
+ *
+ * @param newState the data source state
+ * @param newError information about a new error, if any
+ * @see DataSourceStatusProvider
+ */
+ void updateStatus(State newState, ErrorInfo newError);
+}
+
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/TransactionalDataSourceUpdateSink.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/TransactionalDataSourceUpdateSink.java
new file mode 100644
index 0000000..a4e9b8d
--- /dev/null
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/TransactionalDataSourceUpdateSink.java
@@ -0,0 +1,32 @@
+package com.launchdarkly.sdk.server.subsystems;
+
+import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
+import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
+
+/**
+ * Interface that an implementation of {@link DataSource} will use to push data into the SDK transactionally.
+ * <p>
+ * The data source interacts with this object, rather than manipulating the data store directly, so
+ * that the SDK can perform any other necessary operations that must happen when data is updated. This
+ * object also provides a mechanism to report status changes.
+ * <p>
+ * Component factories for {@link DataSource} implementations will receive an implementation of this
+ * interface in the {@link ClientContext#getDataSourceUpdateSink()} property of {@link ClientContext}.
+ * <p>
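+ * A minimal sketch of pushing a full data set through this sink (illustrative only; the generic
+ * parameters, the {@code ChangeSet} constructor, and {@code Selector.make} appear here as they are
+ * used in this SDK's own tests, and {@code sink} stands for the instance obtained from {@link ClientContext}):
+ * <pre>{@code
+ *   Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> data = ...; // entries assembled by the data source
+ *   ChangeSet<ItemDescriptor> changeSet = new ChangeSet<>(
+ *       ChangeSetType.Full,          // replace all existing data
+ *       Selector.make(1, "state"),   // version/state from the FDv2 protocol
+ *       data,
+ *       null);                       // environment ID, if known
+ *   boolean applied = sink.apply(changeSet);
+ * }</pre>
+ * <p>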
+ * This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + * It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode + * + * @since 5.0.0 + * @see DataSource + * @see ClientContext + */ +public interface TransactionalDataSourceUpdateSink { + /** + * Apply the given change set to the store. This should be done atomically if possible. + * + * @param changeSet the changeset to apply + * @return true if the update succeeded, false if it failed + */ + boolean apply(ChangeSet changeSet); +} + diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataModelDependenciesTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataModelDependenciesTest.java index e92603b..4875e53 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataModelDependenciesTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataModelDependenciesTest.java @@ -8,16 +8,21 @@ import com.launchdarkly.sdk.server.DataModel.FeatureFlag; import com.launchdarkly.sdk.server.DataModel.Operator; import com.launchdarkly.sdk.server.DataModel.Segment; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.server.DataModelDependencies.DependencyTracker; import com.launchdarkly.sdk.server.DataModelDependencies.KindAndKey; import com.launchdarkly.sdk.server.DataStoreTestTypes.DataBuilder; import com.launchdarkly.sdk.server.DataStoreTestTypes.TestItem; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; import org.junit.Test; +import java.util.AbstractMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -42,6 +47,8 @@ import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @SuppressWarnings("javadoc") @@ -378,4 +385,222 @@ public void kindAndKeyEquality() { .addAny(SEGMENTS, segmentBuilder("o").build()) .build(); + + // ===== SortChangeset tests ===== + + @Test + public void sortChangesetPreservesChangeSetMetadata() { + Selector selector = Selector.make(42, "test-state"); + String environmentId = "test-env"; + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + selector, + ImmutableList.of(), + environmentId + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + assertEquals(ChangeSetType.Partial, result.getType()); + assertEquals(selector.getVersion(), result.getSelector().getVersion()); + assertEquals(selector.getState(), result.getSelector().getState()); + assertEquals(environmentId, result.getEnvironmentId()); + } + + @Test + public void sortChangesetSortsPrerequisiteFlagsFirst() { + FeatureFlag flagC = flagBuilder("c").build(); + FeatureFlag flagE = flagBuilder("e").build(); + FeatureFlag flagB = flagBuilder("b") + .prerequisites( + prerequisite("c", 0), + prerequisite("e", 0) + ) + .build(); + FeatureFlag flagA = flagBuilder("a") + 
.prerequisites( + prerequisite("b", 0), + prerequisite("c", 0) + ) + .build(); + + ImmutableList>> changeSetData = + ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("a", new ItemDescriptor(1, flagA)), + new AbstractMap.SimpleEntry<>("b", new ItemDescriptor(1, flagB)), + new AbstractMap.SimpleEntry<>("c", new ItemDescriptor(1, flagC)), + new AbstractMap.SimpleEntry<>("e", new ItemDescriptor(1, flagE)) + )) + ) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(1, "state1"), + changeSetData, + null + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + Map.Entry> flagsData = null; + for (Map.Entry> entry: result.getData()) { + if (entry.getKey() == FEATURES) { + flagsData = entry; + break; + } + } + assertTrue("Should have found FEATURES data", flagsData != null); + + List resultKeys = ImmutableList.copyOf(transform(flagsData.getValue().getItems(), Map.Entry::getKey)); + + // Verify ordering constraints + assertTrue("c should come before b", resultKeys.indexOf("c") < resultKeys.indexOf("b")); + assertTrue("e should come before b", resultKeys.indexOf("e") < resultKeys.indexOf("b")); + assertTrue("b should come before a", resultKeys.indexOf("b") < resultKeys.indexOf("a")); + assertTrue("c should come before a", resultKeys.indexOf("c") < resultKeys.indexOf("a")); + } + + @Test + public void sortChangesetSortsSegmentsBeforeFlags() { + FeatureFlag flag1 = flagBuilder("flag1").build(); + Segment segment1 = segmentBuilder("seg1").build(); + + ImmutableList>> changeSetData = + ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("flag1", new ItemDescriptor(1, flag1)) + )) + ), + new AbstractMap.SimpleEntry<>( + SEGMENTS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("seg1", new ItemDescriptor(1, segment1)) + )) + ) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + changeSetData, + null + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + List kinds = ImmutableList.copyOf(transform(result.getData(), Map.Entry::getKey)); + assertEquals(2, kinds.size()); + assertEquals(SEGMENTS, kinds.get(0)); + assertEquals(FEATURES, kinds.get(1)); + } + + @Test + public void sortChangesetHandlesEmptyChangeset() { + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + ImmutableList.of(), + null + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + assertTrue("Result data should be empty", Iterables.isEmpty(result.getData())); + assertEquals(ChangeSetType.Full, result.getType()); + } + + @Test + public void sortChangesetHandlesMultipleDataKinds() { + FeatureFlag flag1 = flagBuilder("flag1").build(); + Segment segment1 = segmentBuilder("seg1").build(); + Segment segment2 = segmentBuilder("seg2").version(2).build(); + + ImmutableList>> changeSetData = + ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("flag1", new ItemDescriptor(1, flag1)) + )) + ), + new AbstractMap.SimpleEntry<>( + SEGMENTS, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("seg1", new ItemDescriptor(1, segment1)), + new AbstractMap.SimpleEntry<>("seg2", new ItemDescriptor(2, segment2)) + )) + ) + ); + + ChangeSet changeSet = new ChangeSet<>( + 
ChangeSetType.Partial, + Selector.make(1, "state1"), + changeSetData, + null + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + assertEquals(2, Iterables.size(result.getData())); + + Map.Entry> segments = null; + Map.Entry> flags = null; + for (Map.Entry> entry: result.getData()) { + if (entry.getKey() == SEGMENTS) { + segments = entry; + } else if (entry.getKey() == FEATURES) { + flags = entry; + } + } + + assertTrue("Should have found SEGMENTS", segments != null); + assertEquals(2, Iterables.size(segments.getValue().getItems())); + + assertTrue("Should have found FEATURES", flags != null); + assertEquals(1, Iterables.size(flags.getValue().getItems())); + } + + @Test + public void sortChangesetPreservesDeletedItems() { + ImmutableList>> changeSetData = + ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.of( + new AbstractMap.SimpleEntry<>("flag1", ItemDescriptor.deletedItem(5)) + )) + ) + ); + + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(1, "state1"), + changeSetData, + null + ); + + ChangeSet result = DataModelDependencies.sortChangeset(changeSet); + + Map.Entry> flagsData = null; + for (Map.Entry> entry: result.getData()) { + if (entry.getKey() == FEATURES) { + flagsData = entry; + break; + } + } + assertTrue("Should have found FEATURES data", flagsData != null); + + Map.Entry item = Iterables.getFirst(flagsData.getValue().getItems(), null); + assertTrue("Should have found item", item != null); + + assertEquals("flag1", item.getKey()); + assertNull(item.getValue().getItem()); + assertEquals(5, item.getValue().getVersion()); + } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java index a4fd47c..d1aa416 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java @@ -1,14 +1,24 @@ package com.launchdarkly.sdk.server; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.DataModel.FeatureFlag; +import com.launchdarkly.sdk.server.DataModel.Segment; import com.launchdarkly.sdk.server.DataStoreTestTypes.DataBuilder; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorKind; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; import com.launchdarkly.sdk.server.subsystems.DataStore; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent; import 
com.launchdarkly.sdk.server.interfaces.FlagChangeListener; @@ -20,6 +30,11 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -30,6 +45,7 @@ import static com.launchdarkly.sdk.server.ModelBuilders.prerequisite; import static com.launchdarkly.sdk.server.ModelBuilders.ruleBuilder; import static com.launchdarkly.sdk.server.ModelBuilders.segmentBuilder; +import static com.launchdarkly.sdk.server.ModelBuilders.segmentRuleBuilder; import static com.launchdarkly.sdk.server.TestComponents.inMemoryDataStore; import static com.launchdarkly.sdk.server.TestComponents.nullLogger; import static com.launchdarkly.sdk.server.TestComponents.sharedExecutor; @@ -52,7 +68,7 @@ public class DataSourceUpdatesImplTest { private final EasyMockSupport mocks = new EasyMockSupport(); private DataSourceUpdatesImpl makeInstance(DataStore store) { - return makeInstance(store, null); + return makeInstance(store, EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger)); } private DataSourceUpdatesImpl makeInstance( @@ -62,6 +78,89 @@ private DataSourceUpdatesImpl makeInstance( return new DataSourceUpdatesImpl(store, null, flagChangeBroadcaster, statusBroadcaster, sharedExecutor, null, nullLogger); } + // ===== Private helper functions ===== + + private static FeatureFlag flagWithPrerequisiteReference(FeatureFlag fromFlag, FeatureFlag toFlag) { + List prereqs = new ArrayList<>(fromFlag.getPrerequisites()); + prereqs.add(prerequisite(toFlag.getKey(), 0)); + return flagBuilder(fromFlag.getKey()) + .version(fromFlag.getVersion()) + .prerequisites(prereqs.toArray(new DataModel.Prerequisite[0])) + .build(); + } + + private static FeatureFlag flagWithSegmentReference(FeatureFlag flag, Segment... segments) { + String[] segmentKeys = new String[segments.length]; + for (int i = 0; i < segments.length; i++) { + segmentKeys[i] = segments[i].getKey(); + } + return flagBuilder(flag.getKey()) + .version(flag.getVersion()) + .rules(ruleBuilder().clauses(ModelBuilders.clauseMatchingSegment(segmentKeys)).build()) + .build(); + } + + private static Segment segmentWithSegmentReference(Segment segment, Segment... segments) { + String[] segmentKeys = new String[segments.length]; + for (int i = 0; i < segments.length; i++) { + segmentKeys[i] = segments[i].getKey(); + } + return segmentBuilder(segment.getKey()) + .version(segment.getVersion()) + .rules(segmentRuleBuilder().clauses(ModelBuilders.clauseMatchingSegment(segmentKeys)).build()) + .build(); + } + + private static FeatureFlag nextVersion(FeatureFlag flag) { + return flagBuilder(flag.getKey()).version(flag.getVersion() + 1).build(); + } + + private static Segment nextVersion(Segment segment) { + return segmentBuilder(segment.getKey()).version(segment.getVersion() + 1).build(); + } + + private static ChangeSet makeFullChangeSet(FeatureFlag... 
flags) { + List>> data = new ArrayList<>(); + if (flags.length > 0) { + Map flagItems = new HashMap<>(); + for (FeatureFlag flag : flags) { + flagItems.put(flag.getKey(), new ItemDescriptor(flag.getVersion(), flag)); + } + data.add(new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.copyOf(flagItems.entrySet())) + )); + } + return new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + data, + null + ); + } + + private static ChangeSet makePartialChangeSet(FeatureFlag... flags) { + List>> data = new ArrayList<>(); + if (flags.length > 0) { + Map flagItems = new HashMap<>(); + for (FeatureFlag flag : flags) { + flagItems.put(flag.getKey(), new ItemDescriptor(flag.getVersion(), flag)); + } + data.add(new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.copyOf(flagItems.entrySet())) + )); + } + return new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(1, "state1"), + data, + null + ); + } + + // ===== Tests ===== + @Test public void sendsEventsOnInitForNewlyAddedFlags() throws Exception { DataStore store = inMemoryDataStore(); @@ -441,4 +540,501 @@ public void outageTimeoutLogging() throws Exception { assertThat(errorsDesc, containsString("ERROR_RESPONSE(501) (2 times)")); assertThat(errorsDesc, containsString("ERROR_RESPONSE(502) (1 time)")); } + + @Test + public void applyFullChangeSetSendsEventsForNewlyAddedFlags() throws Exception { + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flagBuilder("flag1").version(1).build())); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + updates.apply(makeFullChangeSet( + flagBuilder("flag1").version(1).build(), + flagBuilder("flag2").version(1).build() + )); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyPartialChangeSetSendsEventForNewlyAddedFlag() throws Exception { + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flagBuilder("flag1").version(1).build())); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + updates.apply(makePartialChangeSet(flagBuilder("flag2").version(1).build())); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyFullChangeSetSendsEventsForUpdatedFlag() throws Exception { + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flag1, flag2)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + updates.apply(makeFullChangeSet(flag1, nextVersion(flag2))); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyPartialChangeSetSendsEventForUpdatedFlag() throws Exception { + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flag1, flag2)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + updates.apply(makePartialChangeSet(nextVersion(flag2))); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyFullChangeSetSendsEventsForDeletedFlags() throws Exception { + FeatureFlag flag1 = 
flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flag1, flag2)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + updates.apply(makeFullChangeSet(flag1)); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyPartialChangeSetSendsEventForDeletedFlag() throws Exception { + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + updates.apply(makeFullChangeSet(flag1, flag2)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + ItemDescriptor deletedFlag = ItemDescriptor.deletedItem(flag2.getVersion() + 1); + Map flagItems = ImmutableMap.of(flag2.getKey(), deletedFlag); + List>> data = ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.copyOf(flagItems.entrySet())) + ) + ); + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(1, "state1"), + data, + null + ); + updates.apply(changeSet); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyFullChangeSetSendsEventsForFlagsWhosePrerequisitesChanged() throws Exception { + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + FeatureFlag flag3 = flagBuilder("flag3").version(1).build(); + FeatureFlag flag4 = flagBuilder("flag4").version(1).build(); + FeatureFlag flag5 = flagBuilder("flag5").version(1).build(); + FeatureFlag flag6 = flagBuilder("flag6").version(1).build(); + + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + + FeatureFlag[] initialFlags = { + flag1, + flagWithPrerequisiteReference(flag2, flag1), + flag3, + flagWithPrerequisiteReference(flag4, flag1), + flagWithPrerequisiteReference(flag5, flag4), + flag6 + }; + updates.apply(makeFullChangeSet(initialFlags)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + FeatureFlag[] updatedFlags = { + nextVersion(flag1), + flagWithPrerequisiteReference(flag2, flag1), + flag3, + flagWithPrerequisiteReference(flag4, flag1), + flagWithPrerequisiteReference(flag5, flag4), + flag6 + }; + updates.apply(makeFullChangeSet(updatedFlags)); + + expectEvents(eventSink, "flag1", "flag2", "flag4", "flag5"); + } + + @Test + public void applyPartialChangeSetSendsEventsForFlagsWhosePrerequisitesChanged() throws Exception { + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + FeatureFlag flag3 = flagBuilder("flag3").version(1).build(); + FeatureFlag flag4 = flagBuilder("flag4").version(1).build(); + FeatureFlag flag5 = flagBuilder("flag5").version(1).build(); + FeatureFlag flag6 = flagBuilder("flag6").version(1).build(); + + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + + FeatureFlag[] initialFlags = { + flag1, + flagWithPrerequisiteReference(flag2, flag1), + flag3, + flagWithPrerequisiteReference(flag4, flag1), + flagWithPrerequisiteReference(flag5, flag4), + flag6 + }; + updates.apply(makeFullChangeSet(initialFlags)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + 
updates.apply(makePartialChangeSet(nextVersion(flag1))); + + expectEvents(eventSink, "flag1", "flag2", "flag4", "flag5"); + } + + @Test + public void applyFullChangeSetSendsEventsForFlagsWhoseSegmentsChanged() throws Exception { + Segment segment1 = segmentBuilder("segment1").version(1).build(); + Segment segment2 = segmentBuilder("segment2").version(1).build(); + Segment segment3 = segmentBuilder("segment3").version(1).build(); + Segment segment1WithSegment2Ref = segmentWithSegmentReference(segment1, segment2); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + FeatureFlag flag3 = flagBuilder("flag3").version(1).build(); + FeatureFlag flag4 = flagBuilder("flag4").version(1).build(); + + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + + FullDataSet initialData = new DataBuilder() + .addAny(FEATURES, + flag1, + flagWithSegmentReference(flag2, segment1), + flagWithSegmentReference(flag3, segment2), + flagWithPrerequisiteReference(flag4, flag2)) + .addAny(SEGMENTS, + segment1WithSegment2Ref, + segment2, + segment3) + .build(); + updates.init(initialData); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + Segment updatedSegment = nextVersion(segment1WithSegment2Ref); + FullDataSet updatedData = new DataBuilder() + .addAny(FEATURES, + flag1, + flagWithSegmentReference(flag2, segment1), + flagWithSegmentReference(flag3, segment2), + flagWithPrerequisiteReference(flag4, flag2)) + .addAny(SEGMENTS, + updatedSegment, + segment2, + segment3) + .build(); + + List>> changeSetData = new ArrayList<>(); + for (Map.Entry> kindEntry : updatedData.getData()) { + changeSetData.add(new AbstractMap.SimpleEntry<>( + kindEntry.getKey(), + kindEntry.getValue() + )); + } + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + changeSetData, + null + ); + updates.apply(changeSet); + + expectEvents(eventSink, "flag2", "flag4"); + } + + @Test + public void applyPartialChangeSetSendsEventsForFlagsWhoseSegmentsChanged() throws Exception { + Segment segment1 = segmentBuilder("segment1").version(1).build(); + Segment segment2 = segmentBuilder("segment2").version(1).build(); + Segment segment3 = segmentBuilder("segment3").version(1).build(); + Segment segment1WithSegment2Ref = segmentWithSegmentReference(segment1, segment2); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + FeatureFlag flag3 = flagBuilder("flag3").version(1).build(); + FeatureFlag flag4 = flagBuilder("flag4").version(1).build(); + + DataSourceUpdatesImpl updates = makeInstance(inMemoryDataStore()); + + FullDataSet initialData = new DataBuilder() + .addAny(FEATURES, + flag1, + flagWithSegmentReference(flag2, segment1), + flagWithSegmentReference(flag3, segment2), + flagWithPrerequisiteReference(flag4, flag2)) + .addAny(SEGMENTS, + segment1WithSegment2Ref, + segment2, + segment3) + .build(); + updates.init(initialData); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + Segment updatedSegment = nextVersion(segment1WithSegment2Ref); + Map segmentItems = ImmutableMap.of( + updatedSegment.getKey(), + new ItemDescriptor(updatedSegment.getVersion(), updatedSegment) + ); + List>> segmentData = ImmutableList.of( + new AbstractMap.SimpleEntry<>( + SEGMENTS, + new KeyedItems<>(ImmutableList.copyOf(segmentItems.entrySet())) + ) + 
); + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Partial, + Selector.make(1, "state1"), + segmentData, + null + ); + updates.apply(changeSet); + + expectEvents(eventSink, "flag2", "flag4"); + } + + // Tests for legacy (non-transactional) data store path + + private static class LegacyDataStore implements DataStore { + private final Map> data = new HashMap<>(); + + @Override + public void init(FullDataSet allData) { + data.clear(); + for (Map.Entry> kindEntry : allData.getData()) { + DataKind kind = kindEntry.getKey(); + Map items = new HashMap<>(); + for (Map.Entry itemEntry : kindEntry.getValue().getItems()) { + items.put(itemEntry.getKey(), itemEntry.getValue()); + } + data.put(kind, items); + } + } + + @Override + public boolean upsert(DataKind kind, String key, ItemDescriptor item) { + Map items = data.get(kind); + if (items == null) { + items = new HashMap<>(); + data.put(kind, items); + } + + ItemDescriptor oldItem = items.get(key); + if (oldItem != null && oldItem.getVersion() >= item.getVersion()) { + return false; + } + + items.put(key, item); + return true; + } + + @Override + public ItemDescriptor get(DataKind kind, String key) { + Map items = data.get(kind); + if (items != null) { + return items.get(key); + } + return null; + } + + @Override + public KeyedItems getAll(DataKind kind) { + Map items = data.get(kind); + if (items != null) { + return new KeyedItems<>(ImmutableList.copyOf(items.entrySet())); + } + return new KeyedItems<>(ImmutableList.of()); + } + + @Override + public boolean isInitialized() { + return !data.isEmpty(); + } + + @Override + public boolean isStatusMonitoringEnabled() { + return false; + } + + @Override + public DataStoreStatusProvider.CacheStats getCacheStats() { + return null; + } + + @Override + public void close() throws IOException { + // No-op + } + } + + @Test + public void applyFullChangeSetToLegacyStoreCallsInit() throws Exception { + LegacyDataStore legacyStore = new LegacyDataStore(); + DataSourceUpdatesImpl updates = new DataSourceUpdatesImpl( + legacyStore, + null, + flagChangeBroadcaster, + EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger), + sharedExecutor, + null, + nullLogger + ); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + updates.apply(makeFullChangeSet(flag1, flag2)); + + ItemDescriptor retrievedFlag1 = legacyStore.get(FEATURES, flag1.getKey()); + ItemDescriptor retrievedFlag2 = legacyStore.get(FEATURES, flag2.getKey()); + + assertThat(retrievedFlag1, is(org.hamcrest.Matchers.notNullValue())); + assertThat(retrievedFlag2, is(org.hamcrest.Matchers.notNullValue())); + assertThat(retrievedFlag1.getVersion(), is(flag1.getVersion())); + assertThat(retrievedFlag2.getVersion(), is(flag2.getVersion())); + } + + @Test + public void applyPartialChangeSetToLegacyStoreCallsUpsert() throws Exception { + LegacyDataStore legacyStore = new LegacyDataStore(); + DataSourceUpdatesImpl updates = new DataSourceUpdatesImpl( + legacyStore, + null, + flagChangeBroadcaster, + EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger), + sharedExecutor, + null, + nullLogger + ); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + updates.apply(makeFullChangeSet(flag1)); + updates.apply(makePartialChangeSet(flag2)); + + ItemDescriptor retrievedFlag1 = legacyStore.get(FEATURES, flag1.getKey()); + ItemDescriptor retrievedFlag2 = legacyStore.get(FEATURES, 
flag2.getKey()); + + assertThat(retrievedFlag1, is(org.hamcrest.Matchers.notNullValue())); + assertThat(retrievedFlag2, is(org.hamcrest.Matchers.notNullValue())); + assertThat(retrievedFlag1.getVersion(), is(flag1.getVersion())); + assertThat(retrievedFlag2.getVersion(), is(flag2.getVersion())); + } + + @Test + public void applyFullChangeSetToLegacyStoreSendsEvents() throws Exception { + LegacyDataStore legacyStore = new LegacyDataStore(); + DataSourceUpdatesImpl updates = new DataSourceUpdatesImpl( + legacyStore, + null, + flagChangeBroadcaster, + EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger), + sharedExecutor, + null, + nullLogger + ); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + updates.apply(makeFullChangeSet(flag1)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + updates.apply(makeFullChangeSet(flag1, flag2)); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyPartialChangeSetToLegacyStoreSendsEvents() throws Exception { + LegacyDataStore legacyStore = new LegacyDataStore(); + DataSourceUpdatesImpl updates = new DataSourceUpdatesImpl( + legacyStore, + null, + flagChangeBroadcaster, + EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger), + sharedExecutor, + null, + nullLogger + ); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + updates.apply(makeFullChangeSet(flag1)); + + BlockingQueue eventSink = new LinkedBlockingQueue<>(); + flagChangeBroadcaster.register(eventSink::add); + + FeatureFlag flag2 = flagBuilder("flag2").version(1).build(); + updates.apply(makePartialChangeSet(flag2)); + + expectEvents(eventSink, "flag2"); + } + + @Test + public void applyFullChangeSetToLegacyStoreWithEnvironmentId() throws Exception { + LegacyDataStore legacyStore = new LegacyDataStore(); + DataSourceUpdatesImpl updates = new DataSourceUpdatesImpl( + legacyStore, + null, + flagChangeBroadcaster, + EventBroadcasterImpl.forDataSourceStatus(sharedExecutor, nullLogger), + sharedExecutor, + null, + nullLogger + ); + + FeatureFlag flag1 = flagBuilder("flag1").version(1).build(); + Map flagItems = ImmutableMap.of( + flag1.getKey(), + new ItemDescriptor(flag1.getVersion(), flag1) + ); + List>> data = ImmutableList.of( + new AbstractMap.SimpleEntry<>( + FEATURES, + new KeyedItems<>(ImmutableList.copyOf(flagItems.entrySet())) + ) + ); + ChangeSet changeSet = new ChangeSet<>( + ChangeSetType.Full, + Selector.make(1, "state1"), + data, + "test-env-id" + ); + updates.apply(changeSet); + + // Note: Java SDK doesn't have InitMetadata/EnvironmentId support in the same way as C#, + // so this test just verifies the changeset is applied without error + ItemDescriptor retrievedFlag1 = legacyStore.get(FEATURES, flag1.getKey()); + assertThat(retrievedFlag1, is(org.hamcrest.Matchers.notNullValue())); + } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/TestComponents.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/TestComponents.java index 5719309..8982c36 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/TestComponents.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/TestComponents.java @@ -24,6 +24,8 @@ import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer; import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; +import 
com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; import com.launchdarkly.sdk.server.subsystems.DataStore; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; @@ -183,7 +185,7 @@ public void close() throws IOException { } }; - public static class MockDataSourceUpdates implements DataSourceUpdateSink { + public static class MockDataSourceUpdates implements DataSourceUpdateSink, DataSourceUpdateSinkV2 { public static class UpsertParams { public final DataKind kind; public final String key; @@ -204,6 +206,7 @@ public static class UpsertParams { statusBroadcaster; public final BlockingQueue> receivedInits = new LinkedBlockingQueue<>(); public final BlockingQueue receivedUpserts = new LinkedBlockingQueue<>(); + public final BlockingQueue> receivedApplies = new LinkedBlockingQueue<>(); public MockDataSourceUpdates(DataStore store, DataStoreStatusProvider dataStoreStatusProvider) { this.dataStoreStatusProvider = dataStoreStatusProvider; @@ -244,6 +247,13 @@ public void updateStatus(State newState, ErrorInfo newError) { wrappedInstance.updateStatus(newState, newError); } + @Override + public boolean apply(ChangeSet changeSet) { + boolean result = wrappedInstance.apply(changeSet); + receivedApplies.add(changeSet); + return result; + } + public DataSourceStatusProvider.Status getLastStatus() { return wrappedInstance.getLastStatus(); } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataTest.java index fe71166..e8f207d 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataTest.java @@ -11,6 +11,8 @@ import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; @@ -463,9 +465,10 @@ private static class UpsertParams { } } - private static class CapturingDataSourceUpdates implements DataSourceUpdateSink { + private static class CapturingDataSourceUpdates implements DataSourceUpdateSink, DataSourceUpdateSinkV2 { BlockingQueue> inits = new LinkedBlockingQueue<>(); BlockingQueue upserts = new LinkedBlockingQueue<>(); + BlockingQueue> applies = new LinkedBlockingQueue<>(); boolean valid; @Override @@ -489,5 +492,11 @@ public DataStoreStatusProvider getDataStoreStatusProvider() { public void updateStatus(State newState, ErrorInfo newError) { valid = newState == State.VALID; } + + @Override + public boolean apply(ChangeSet changeSet) { + applies.add(changeSet); + return true; + } } } From f3b09f5878f8382334ce6aefa12c0ec366dc9313 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Thu, 15 Jan 2026 16:36:16 -0500 Subject: [PATCH 2/2] self review --- .../sdk/server/DataSourceUpdatesImpl.java | 4 +- .../sdk/server/DataSourceUpdatesImplTest.java | 148 +++++++++--------- 2 files changed, 73 
insertions(+), 79 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java index 1b87534..17a4f98 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java @@ -368,9 +368,7 @@ private void onTimeout() { private static String describeErrorCount(Map.Entry entry) { return entry.getKey() + " (" + entry.getValue() + (entry.getValue() == 1 ? " time" : " times") + ")"; } - - // ===== ITransactionalDataSourceUpdates methods ===== - + @Override public boolean apply(ChangeSet changeSet) { if (store instanceof TransactionalDataStore) { diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java index d1aa416..a6f3744 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSourceUpdatesImplTest.java @@ -158,8 +158,78 @@ private static ChangeSet makePartialChangeSet(FeatureFlag... fla null ); } - - // ===== Tests ===== + + private static class LegacyDataStore implements DataStore { + private final Map> data = new HashMap<>(); + + @Override + public void init(FullDataSet allData) { + data.clear(); + for (Map.Entry> kindEntry : allData.getData()) { + DataKind kind = kindEntry.getKey(); + Map items = new HashMap<>(); + for (Map.Entry itemEntry : kindEntry.getValue().getItems()) { + items.put(itemEntry.getKey(), itemEntry.getValue()); + } + data.put(kind, items); + } + } + + @Override + public boolean upsert(DataKind kind, String key, ItemDescriptor item) { + Map items = data.get(kind); + if (items == null) { + items = new HashMap<>(); + data.put(kind, items); + } + + ItemDescriptor oldItem = items.get(key); + if (oldItem != null && oldItem.getVersion() >= item.getVersion()) { + return false; + } + + items.put(key, item); + return true; + } + + @Override + public ItemDescriptor get(DataKind kind, String key) { + Map items = data.get(kind); + if (items != null) { + return items.get(key); + } + return null; + } + + @Override + public KeyedItems getAll(DataKind kind) { + Map items = data.get(kind); + if (items != null) { + return new KeyedItems<>(ImmutableList.copyOf(items.entrySet())); + } + return new KeyedItems<>(ImmutableList.of()); + } + + @Override + public boolean isInitialized() { + return !data.isEmpty(); + } + + @Override + public boolean isStatusMonitoringEnabled() { + return false; + } + + @Override + public DataStoreStatusProvider.CacheStats getCacheStats() { + return null; + } + + @Override + public void close() throws IOException { + // No-op + } + } @Test public void sendsEventsOnInitForNewlyAddedFlags() throws Exception { @@ -823,80 +893,6 @@ public void applyPartialChangeSetSendsEventsForFlagsWhoseSegmentsChanged() throw expectEvents(eventSink, "flag2", "flag4"); } - // Tests for legacy (non-transactional) data store path - - private static class LegacyDataStore implements DataStore { - private final Map> data = new HashMap<>(); - - @Override - public void init(FullDataSet allData) { - data.clear(); - for (Map.Entry> kindEntry : allData.getData()) { - DataKind kind = kindEntry.getKey(); - Map items = new HashMap<>(); - for (Map.Entry itemEntry : kindEntry.getValue().getItems()) { - 
items.put(itemEntry.getKey(), itemEntry.getValue()); - } - data.put(kind, items); - } - } - - @Override - public boolean upsert(DataKind kind, String key, ItemDescriptor item) { - Map items = data.get(kind); - if (items == null) { - items = new HashMap<>(); - data.put(kind, items); - } - - ItemDescriptor oldItem = items.get(key); - if (oldItem != null && oldItem.getVersion() >= item.getVersion()) { - return false; - } - - items.put(key, item); - return true; - } - - @Override - public ItemDescriptor get(DataKind kind, String key) { - Map items = data.get(kind); - if (items != null) { - return items.get(key); - } - return null; - } - - @Override - public KeyedItems getAll(DataKind kind) { - Map items = data.get(kind); - if (items != null) { - return new KeyedItems<>(ImmutableList.copyOf(items.entrySet())); - } - return new KeyedItems<>(ImmutableList.of()); - } - - @Override - public boolean isInitialized() { - return !data.isEmpty(); - } - - @Override - public boolean isStatusMonitoringEnabled() { - return false; - } - - @Override - public DataStoreStatusProvider.CacheStats getCacheStats() { - return null; - } - - @Override - public void close() throws IOException { - // No-op - } - } - @Test public void applyFullChangeSetToLegacyStoreCallsInit() throws Exception { LegacyDataStore legacyStore = new LegacyDataStore();