Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.google.common.collect.Iterables;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.server.DataModel.Operator;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
Expand Down Expand Up @@ -132,6 +134,23 @@ public static FullDataSet<ItemDescriptor> sortAllCollections(FullDataSet<ItemDes
return new FullDataSet<>(builder.build().entrySet());
}

/**
* Sort the data in the changeset in dependency order. If there are any duplicates, then the highest version
* of the duplicate item will be retained.
*
* @param inSet the changeset to sort
* @return a sorted copy of the changeset
*/
public static ChangeSet<ItemDescriptor> sortChangeset(ChangeSet<ItemDescriptor> inSet) {
ImmutableSortedMap.Builder<DataKind, KeyedItems<ItemDescriptor>> builder =
ImmutableSortedMap.orderedBy(dataKindPriorityOrder);
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> entry: inSet.getData()) {
DataKind kind = entry.getKey();
builder.put(kind, sortCollection(kind, entry.getValue()));
}
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId());
}

private static KeyedItems<ItemDescriptor> sortCollection(DataKind kind, KeyedItems<ItemDescriptor> input) {
if (!isDependencyOrdered(kind) || isEmpty(input.getItems())) {
return input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.StatusListener;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
import com.launchdarkly.sdk.server.subsystems.DataStore;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent;
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;
Expand Down Expand Up @@ -48,7 +51,7 @@
*
* @since 4.11.0
*/
final class DataSourceUpdatesImpl implements DataSourceUpdateSink {
final class DataSourceUpdatesImpl implements DataSourceUpdateSink, DataSourceUpdateSinkV2 {
private final DataStore store;
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
private final EventBroadcasterImpl<StatusListener, Status> dataSourceStatusNotifier;
Expand Down Expand Up @@ -365,4 +368,176 @@ private void onTimeout() {
private static String describeErrorCount(Map.Entry<ErrorInfo, Integer> entry) {
return entry.getKey() + " (" + entry.getValue() + (entry.getValue() == 1 ? " time" : " times") + ")";
}

@Override
public boolean apply(ChangeSet<ItemDescriptor> changeSet) {
if (store instanceof TransactionalDataStore) {
return applyToTransactionalStore((TransactionalDataStore) store, changeSet);
}

// Legacy update path for non-transactional stores
return applyToLegacyStore(changeSet);
}

private boolean applyToTransactionalStore(TransactionalDataStore transactionalDataStore,
ChangeSet<ItemDescriptor> changeSet) {
Map<DataKind, Map<String, ItemDescriptor>> oldData;
// Getting the old values requires accessing the store, which can fail.
// If there is a failure to read the store, then we stop treating it as a failure.
try {
oldData = getOldDataIfFlagChangeListeners();
} catch (RuntimeException e) {
reportStoreFailure(e);
return false;
}

ChangeSet<ItemDescriptor> sortedChangeSet = DataModelDependencies.sortChangeset(changeSet);

try {
transactionalDataStore.apply(sortedChangeSet);
lastStoreUpdateFailed = false;
} catch (RuntimeException e) {
reportStoreFailure(e);
return false;
}

// Calling Apply implies that the data source is now in a valid state.
updateStatus(State.VALID, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be duplicating this notification, but we can sort the correct layering once we have things connected together.


Set<KindAndKey> changes = updateDependencyTrackerForChangesetAndDetermineChanges(oldData, sortedChangeSet);

// Now, if we previously queried the old data because someone is listening for flag change events, compare
// the versions of all items and generate events for those (and any other items that depend on them)
if (changes != null) {
sendChangeEvents(changes);
}

return true;
}

private boolean applyToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
switch (sortedChangeSet.getType()) {
case Full:
return applyFullChangeSetToLegacyStore(sortedChangeSet);
case Partial:
return applyPartialChangeSetToLegacyStore(sortedChangeSet);
case None:
default:
return true;
}
}

private boolean applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> unsortedChangeset) {
// Convert ChangeSet to FullDataSet for legacy init path
return init(new FullDataSet<>(unsortedChangeset.getData()));
}

private boolean applyPartialChangeSetToLegacyStore(ChangeSet<ItemDescriptor> changeSet) {
// Sorting isn't strictly required here, as upsert behavior didn't traditionally have it,
// but it also doesn't hurt, and there could be cases where it results in slightly
// greater store consistency for persistent stores.
ChangeSet<ItemDescriptor> sortedChangeset = DataModelDependencies.sortChangeset(changeSet);

for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindItemsPair: sortedChangeset.getData()) {
for (Map.Entry<String, ItemDescriptor> item: kindItemsPair.getValue().getItems()) {
boolean applySuccess = upsert(kindItemsPair.getKey(), item.getKey(), item.getValue());
if (!applySuccess) {
return false;
}
}
}
// The upsert will update the store status in the case of a store failure.
// The application of the upserts does not set the store initialized.

// Considering the store will be the same for the duration of the application
// lifecycle we will not be applying a partial update to a store that didn't
// already get a full update. The non-transactional store will also not support a selector.

return true;
}

private Map<DataKind, Map<String, ItemDescriptor>> getOldDataIfFlagChangeListeners() {
if (hasFlagChangeEventListeners()) {
// Query the existing data if any, so that after the update we can send events for
// whatever was changed
Map<DataKind, Map<String, ItemDescriptor>> oldData = new HashMap<>();
for (DataKind kind: ALL_DATA_KINDS) {
KeyedItems<ItemDescriptor> items = store.getAll(kind);
oldData.put(kind, ImmutableMap.copyOf(items.getItems()));
}
return oldData;
} else {
return null;
}
}

private Map<DataKind, Map<String, ItemDescriptor>> changeSetToMap(ChangeSet<ItemDescriptor> changeSet) {
Map<DataKind, Map<String, ItemDescriptor>> ret = new HashMap<>();
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> e: changeSet.getData()) {
ret.put(e.getKey(), ImmutableMap.copyOf(e.getValue().getItems()));
}
return ret;
}

private Set<KindAndKey> updateDependencyTrackerForChangesetAndDetermineChanges(
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
ChangeSet<ItemDescriptor> changeSet) {
switch (changeSet.getType()) {
case Full:
return handleFullChangeset(oldDataMap, changeSet);
case Partial:
return handlePartialChangeset(oldDataMap, changeSet);
case None:
return null;
default:
return null;
}
}

private Set<KindAndKey> handleFullChangeset(
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
ChangeSet<ItemDescriptor> changeSet) {
dependencyTracker.reset();
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
DataKind kind = kindEntry.getKey();
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
String key = itemEntry.getKey();
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
}
}

if (oldDataMap == null) {
return null;
}

Map<DataKind, Map<String, ItemDescriptor>> newDataMap = changeSetToMap(changeSet);
return computeChangedItemsForFullDataSet(oldDataMap, newDataMap);
}

private Set<KindAndKey> handlePartialChangeset(
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
ChangeSet<ItemDescriptor> changeSet) {
if (oldDataMap == null) {
// Update dependencies but don't track changes when no listeners
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
DataKind kind = kindEntry.getKey();
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
dependencyTracker.updateDependenciesFrom(kind, itemEntry.getKey(), itemEntry.getValue());
}
}
return null;
}

Set<KindAndKey> affectedItems = new HashSet<>();
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
DataKind kind = kindEntry.getKey();
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
String key = itemEntry.getKey();
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
dependencyTracker.addAffectedItems(affectedItems, new KindAndKey(kind, key));
}
}

return affectedItems;
}
}
Copy link
Contributor Author

@tanderson-ld tanderson-ld Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: I had accidentally removed these previously.

Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ abstract class Version {
private Version() {}

// This constant is updated automatically by our Gradle script during a release, if the project version has changed
// x-release-please-start-version
static final String SDK_VERSION = "7.10.2";
// x-release-please-end
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.launchdarkly.sdk.server.subsystems;

import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;

/**
* Interfaces required by data source updates implementations in FDv2.
* <p>
* This interface extends {@link TransactionalDataSourceUpdateSink} to add status tracking
* and status update capabilities required for FDv2 data sources.
* <p>
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
*
* @since 5.0.0
* @see TransactionalDataSourceUpdateSink
* @see DataSource
*/
public interface DataSourceUpdateSinkV2 extends TransactionalDataSourceUpdateSink {
/**
* An object that provides status tracking for the data store, if applicable.
* <p>
* This may be useful if the data source needs to be aware of storage problems that might require it
* to take some special action: for instance, if a database outage may have caused some data to be
* lost and therefore the data should be re-requested from LaunchDarkly.
*
* @return a {@link DataStoreStatusProvider}
*/
DataStoreStatusProvider getDataStoreStatusProvider();

/**
* Informs the SDK of a change in the data source's status.
* <p>
* Data source implementations should use this method if they have any concept of being in a valid
* state, a temporarily disconnected state, or a permanently stopped state.
* <p>
* If {@code newState} is different from the previous state, and/or {@code newError} is non-null, the
* SDK will start returning the new status (adding a timestamp for the change) from
* {@link DataSourceStatusProvider#getStatus()}, and will trigger status change events to any
* registered listeners.
* <p>
* A special case is that if {@code newState} is {@link State#INTERRUPTED},
* but the previous state was {@link State#INITIALIZING}, the state will
* remain at {@link State#INITIALIZING} because {@link State#INTERRUPTED}
* is only meaningful after a successful startup.
*
* @param newState the data source state
* @param newError information about a new error, if any
* @see DataSourceStatusProvider
*/
void updateStatus(State newState, ErrorInfo newError);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.launchdarkly.sdk.server.subsystems;

import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;

/**
* Interface that an implementation of {@link DataSource} will use to push data into the SDK transactionally.
* <p>
* The data source interacts with this object, rather than manipulating the data store directly, so
* that the SDK can perform any other necessary operations that must happen when data is updated. This
* object also provides a mechanism to report status changes.
* <p>
* Component factories for {@link DataSource} implementations will receive an implementation of this
* interface in the {@link ClientContext#getDataSourceUpdateSink()} property of {@link ClientContext}.
* <p>
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
*
* @since 5.0.0
* @see DataSource
* @see ClientContext
*/
public interface TransactionalDataSourceUpdateSink {
/**
* Apply the given change set to the store. This should be done atomically if possible.
*
* @param changeSet the changeset to apply
* @return true if the update succeeded, false if it failed
*/
boolean apply(ChangeSet<ItemDescriptor> changeSet);
}

Loading