Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 72 additions & 3 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,20 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};

static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
// Always copy them, because we do not want to actually send them.
// We merely need the side effect of the consume, if applicable.
auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, true, consume);

O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};

extern volatile int region_read_global_dummy_variable;
volatile int region_read_global_dummy_variable;

Expand Down Expand Up @@ -1680,6 +1694,53 @@ struct WaitBackpressurePolicy {
}
};

auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
{
O2_LOG_ENABLE(forwarding);
O2_SIGNPOST_ID_GENERATE(sid, forwarding);

auto& spec = ref.get<DeviceSpec const>();
auto& context = ref.get<DataProcessorContext>();
if (!context.canForwardEarly || spec.forwards.empty()) {
O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed.");
return;
}

O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer.");
auto& timesliceIndex = ref.get<TimesliceIndex>();
auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();

auto& proxy = ref.get<FairMQDeviceProxy>();

O2_SIGNPOST_START(forwarding, sid, "forwardInputs",
"Starting forwarding for incoming messages with oldestTimeslice %zu with copy",
oldestTimeslice.timeslice.value);
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());
DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false);

for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
if (forwardedParts[fi].Size() == 0) {
continue;
}
ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
auto& parts = forwardedParts[fi];
if (info.policy == nullptr) {
O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
continue;
}
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
info.policy->forward(parts, ChannelIndex{fi}, ref);
}
auto& asyncQueue = ref.get<AsyncQueue>();
auto& decongestion = ref.get<DecongestionService>();
O2_SIGNPOST_ID_GENERATE(aid, async_queue);
O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
.user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
O2_LOG_DISABLE(forwarding);
};

/// This is the inner loop of our framework. The actual implementation
/// is divided in two parts. In the first one we define a set of lambdas
/// which describe what is actually going to happen, hiding all the state
Expand Down Expand Up @@ -1854,12 +1915,13 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
VariableContextHelpers::getTimeslice(variables);
forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
};

auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
&parts.At(headerIndex),
input,
nMessages,
nPayloadsPerHeader,
nullptr,
forwardOnInsertion,
onDrop);
switch (relayed.type) {
case DataRelayer::RelayChoice::Type::Backpressured:
Expand Down Expand Up @@ -2274,9 +2336,16 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;

if (context.canForwardEarly && hasForwards && consumeSomething) {
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
// We used to do fowarding here, however we now do it much earlier.
// We still need to clean the inputs which were already consumed
// via ConsumeExisting and which still have an header to hold the slot.
// FIXME: do we? This should really happen when we do the forwarding on
// insertion, because otherwise we lose the relevant information on how to
// navigate the set of headers. We could actually rely on the messageset index,
// is that the right thing to do though?
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
auto& timesliceIndex = ref.get<TimesliceIndex>();
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
}
markInputsAsDone(action.slot);

Expand Down
9 changes: 6 additions & 3 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ DataRelayer::RelayChoice
// DataRelayer::relay
assert(nPayloads > 0);
size_t saved = 0;
// It's guaranteed we will see all these messages only once, so we can
// do the forwarding here.
auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
if (onInsertion) {
onInsertion(services, allMessages);
}
for (size_t mi = 0; mi < nMessages; ++mi) {
assert(mi + nPayloads < nMessages);
// We are in calibration mode and the data does not have the calibration bit set.
Expand All @@ -515,9 +521,6 @@ DataRelayer::RelayChoice
continue;
}
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
if (onInsertion) {
onInsertion(services, span);
}
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
mi += nPayloads;
saved += nPayloads;
Expand Down
Loading