From a3e9fb88c5139dad2597c6d67b77ab1bcd6abf33 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Mon, 4 Nov 2024 12:21:19 -0300 Subject: [PATCH 01/14] Add implementation --- aggregator/internal/pkg/server.go | 69 ++++++++++++++++++++++++------- core/chainio/avs_reader.go | 45 ++++++++++++++++++++ 2 files changed, 99 insertions(+), 15 deletions(-) diff --git a/aggregator/internal/pkg/server.go b/aggregator/internal/pkg/server.go index b4f1867cf6..a65a37ee33 100644 --- a/aggregator/internal/pkg/server.go +++ b/aggregator/internal/pkg/server.go @@ -35,6 +35,48 @@ func (agg *Aggregator) ServeOperators() error { return err } +// Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure +// If the task is not present in the internal map, it will try to fetch it from logs and retry. +// The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds` +func (agg *Aggregator) waitForTask(signedTaskResponse *types.SignedTaskResponse) bool { + for i := 0; i < waitForEventRetries; i++ { + // Lock + agg.taskMutex.Lock() + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") + _, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] + // Unlock + agg.logger.Info("- Unlocked Resources: Task not found in the internal map") + agg.taskMutex.Unlock() + if ok { + return true + } + + // Task was not found in internal map, let's try to fetch it from logs + agg.logger.Info("Trying to fetch missed task from logs...") + batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot) + + if err == nil && batch != nil { + agg.logger.Info("Found missed task in logs with merkle root 0x%e", batch.BatchMerkleRoot) + // Adding new task will fail only if it already exists + agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) + return true + } + + if err != nil { + agg.logger.Warn("Error fetching task from logs: %v", err) + } + + if batch == nil { + agg.logger.Info("Task not found in logs") + } + + // Task was not found, wait and retry + time.Sleep(waitForEventSleepSeconds) + } + + return false +} + // Aggregator Methods // This is the list of methods that the Aggregator exposes to the Operator // The Operator can call these methods to interact with the Aggregator @@ -49,27 +91,25 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]), "BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]), "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) - taskIndex := uint32(0) - ok := false - for i := 0; i < waitForEventRetries; i++ { - agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") - taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] - if !ok { - agg.taskMutex.Unlock() - agg.logger.Info("- Unlocked Resources: Task not found in the internal map") - time.Sleep(waitForEventSleepSeconds) - } else { - break - } + if !agg.waitForTask(signedTaskResponse) { + agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") + *reply = 1 + return nil } + agg.taskMutex.Lock() + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") + taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] if !ok { - agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") + agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot) *reply = 1 return nil } + // Unlock + agg.logger.Info("- Unlocked Resources: Task not found in the internal map") + agg.taskMutex.Unlock() + agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId) // Don't wait infinitely if it can't answer @@ -110,7 +150,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t } agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished") - agg.taskMutex.Unlock() return nil } diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 69b8f281b1..41191c85f0 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -17,6 +17,10 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" ) +const ( + BatchFetchBlocksRange uint64 = 1000 +) + type AvsReader struct { *sdkavsregistry.ChainReader AvsContractBindings *AvsServiceBindings @@ -150,3 +154,44 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier)) return &batchIdentifierHash, nil } + +// Returns a pending batch from its merkle root or nil if it doesn't exist +// Searches the last `BatchFetchBlocksRange` blocks at most +func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) { + latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background()) + if err != nil { + latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get latest block number: %w", err) + } + } + + var fromBlock uint64 = 0 + + if latestBlock > BatchFetchBlocksRange { + fromBlock = latestBlock - BatchFetchBlocksRange + } + + logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot}) + if err != nil { + return nil, err + } + if err := logs.Error(); err != nil { + return nil, err + } + + if !logs.Next() { + return nil, nil //not an error, but no tasks found + } + + batch := logs.Event + + batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...) + batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier)) + state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash) + if state.Responded { + return nil, nil + } + + return batch, nil +} From 4ee91c92531e3f0a455d633df0113dd066445790 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Mon, 4 Nov 2024 13:10:37 -0300 Subject: [PATCH 02/14] Add missed error handling --- core/chainio/avs_reader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 41191c85f0..be3f3c49dc 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -189,6 +189,9 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*service batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...) batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier)) state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash) + if err != nil { + return nil, err + } if state.Responded { return nil, nil } From ad522cd4920003d66a069607ad311e117c1986f9 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Mon, 4 Nov 2024 16:17:18 -0300 Subject: [PATCH 03/14] Requested changes --- aggregator/internal/pkg/server.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/aggregator/internal/pkg/server.go b/aggregator/internal/pkg/server.go index a65a37ee33..bbd5d1334e 100644 --- a/aggregator/internal/pkg/server.go +++ b/aggregator/internal/pkg/server.go @@ -38,14 +38,14 @@ func (agg *Aggregator) ServeOperators() error { // Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure // If the task is not present in the internal map, it will try to fetch it from logs and retry. // The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds` -func (agg *Aggregator) waitForTask(signedTaskResponse *types.SignedTaskResponse) bool { +func (agg *Aggregator) waitForTaskAndFetchIfLost(signedTaskResponse *types.SignedTaskResponse) bool { for i := 0; i < waitForEventRetries; i++ { // Lock agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Check if task is present") _, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] // Unlock - agg.logger.Info("- Unlocked Resources: Task not found in the internal map") + agg.logger.Info("- Unlocked Resources: Check if task is present") agg.taskMutex.Unlock() if ok { return true @@ -92,23 +92,23 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]), "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) - if !agg.waitForTask(signedTaskResponse) { + if !agg.waitForTaskAndFetchIfLost(signedTaskResponse) { agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") *reply = 1 return nil } agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task taskIndex") taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] + // Unlock + agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Get task taskIndex") + agg.taskMutex.Unlock() if !ok { agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot) *reply = 1 return nil } - // Unlock - agg.logger.Info("- Unlocked Resources: Task not found in the internal map") - agg.taskMutex.Unlock() agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId) @@ -149,8 +149,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t *reply = 0 } - agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished") - return nil } From 9c964bbe2a746c006f336d0af3b7e3eab20abcfa Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Wed, 13 Nov 2024 10:51:29 -0300 Subject: [PATCH 04/14] Refactor with retries --- aggregator/pkg/server.go | 94 +++++++++++--------------------------- core/chainio/avs_reader.go | 17 ++++--- core/chainio/retryable.go | 67 +++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 76 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 7e0c39562b..326468db1d 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -4,15 +4,14 @@ import ( "context" "encoding/hex" "fmt" - "net/http" - "net/rpc" - "strings" - "time" - "github.com/Layr-Labs/eigensdk-go/crypto/bls" eigentypes "github.com/Layr-Labs/eigensdk-go/types" retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/core/types" + "net/http" + "net/rpc" + "strings" + "time" ) func (agg *Aggregator) ServeOperators() error { @@ -37,53 +36,13 @@ func (agg *Aggregator) ServeOperators() error { return err } -// Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure -// If the task is not present in the internal map, it will try to fetch it from logs and retry. -// The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds` -func (agg *Aggregator) waitForTaskAndFetchIfLost(signedTaskResponse *types.SignedTaskResponse) bool { - for i := 0; i < waitForEventRetries; i++ { - // Lock - agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Check if task is present") - _, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] - // Unlock - agg.logger.Info("- Unlocked Resources: Check if task is present") - agg.taskMutex.Unlock() - if ok { - return true - } - - // Task was not found in internal map, let's try to fetch it from logs - agg.logger.Info("Trying to fetch missed task from logs...") - batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot) - - if err == nil && batch != nil { - agg.logger.Info("Found missed task in logs with merkle root 0x%e", batch.BatchMerkleRoot) - // Adding new task will fail only if it already exists - agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) - return true - } - - if err != nil { - agg.logger.Warn("Error fetching task from logs: %v", err) - } - - if batch == nil { - agg.logger.Info("Task not found in logs") - } - - // Task was not found, wait and retry - time.Sleep(waitForEventSleepSeconds) - } - - return false -} - -// Aggregator Methods +// ~~ AGGREGATOR METHODS ~~ // This is the list of methods that the Aggregator exposes to the Operator // The Operator can call these methods to interact with the Aggregator // This methods are automatically registered by the RPC server -// This takes a response an adds it to the internal. If reaching the quorum, it sends the aggregated signatures to ethereum + +// Takes a response from an operator and process it. After processing the response, the associated task may reach quorum, triggering a BLS service response. +// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the logs. // Returns: // - 0: Success // - 1: Error @@ -97,21 +56,22 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { - agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") - *reply = 1 - return nil - } - - agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task taskIndex") - taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] - // Unlock - agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Get task taskIndex") - agg.taskMutex.Unlock() - if !ok { - agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot) - *reply = 1 - return nil + agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch it from logs") + batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot) + if err != nil { + agg.logger.Warn("Pending task with merkle root 0x%x not found in logs") + *reply = 1 + return nil + } + agg.logger.Info("Task was found in the logs, adding it to the internal map") + agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) + taskIndex, err = agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) + if err != nil { + // This shouldn't happen, since we just added the task + agg.logger.Error("Unexpected error trying to get taskIndex from internal map") + *reply = 1 + return nil + } } agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId) @@ -194,11 +154,11 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index") taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash] + agg.taskMutex.Unlock() + agg.logger.Info("- Unlocked Resources: Get task index") if !ok { - agg.taskMutex.Unlock() - agg.logger.Info("- Unlocked Resources: Task not found in the internal map") return taskIndex, fmt.Errorf("Task not found in the internal map") } else { return taskIndex, nil diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index be3f3c49dc..93bfc9e100 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -158,21 +158,18 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt // Returns a pending batch from its merkle root or nil if it doesn't exist // Searches the last `BatchFetchBlocksRange` blocks at most func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) { - latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background()) + latestBlock, err := r.BlockNumberRetryable(context.Background()) if err != nil { - latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get latest block number: %w", err) - } + return nil, fmt.Errorf("Failed to get latest block number: %w", err) } var fromBlock uint64 = 0 if latestBlock > BatchFetchBlocksRange { - fromBlock = latestBlock - BatchFetchBlocksRange + fromBlock = latestBlock - BatchFetchBlocksRange // TODO: Add this to config } - logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot}) + logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot}) if err != nil { return nil, err } @@ -181,17 +178,19 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*service } if !logs.Next() { - return nil, nil //not an error, but no tasks found + return nil, nil //Not an error, but no tasks found } batch := logs.Event batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...) batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier)) - state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash) + state, err := r.BatchesStateRetryable(nil, batchIdentifierHash) + if err != nil { return nil, err } + if state.Responded { return nil, nil } diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 09c53fb4ae..a6c46a902a 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -202,3 +202,70 @@ func SubscribeToNewTasksV3Retryable( } return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) } + +// |---AVS_READER---| + +// TODO: These functions are being copied from AvsSubscriber and should be refactorized +// we don't actually need access to the AvsReader, AvsSubscriber or AbsWriter, but instead to the AvsContractBindings + +// TODO: We should also add the fallback calls to the functions which are missing it + +/* +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) { + latestBlock_func := func() (uint64, error) { + // Try with main connection + latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx) + if err != nil { + // If error try with fallback connection + latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx) + } + return latestBlock, err + } + return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) +} + +/* +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + // Try with main connection + batch, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + if err != nil { + // If error try with fallback connection + batch, err = r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot) + } + return batch, err + } + return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) +} + +/* +- All errors are considered Transient Errors +- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) +*/ +func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + batchState_func := func() (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int + }, error) { + // Try with main connection + state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + if err != nil { + // If error try with fallback connection + state, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + } + return state, err + } + + return retry.RetryWithData(batchState_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) +} From c8cb8e7794700eb7ad44e111656d722a0c82252f Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Wed, 13 Nov 2024 11:41:04 -0300 Subject: [PATCH 05/14] Add block fetch range to config, fix related bug --- aggregator/pkg/server.go | 6 +++--- config-files/config-aggregator.yaml | 1 + core/chainio/avs_reader.go | 12 ++++-------- core/config/aggregator.go | 3 +++ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 326468db1d..34a2822f00 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -57,9 +57,9 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t if err != nil { agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch it from logs") - batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot) - if err != nil { - agg.logger.Warn("Pending task with merkle root 0x%x not found in logs") + batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot, agg.AggregatorConfig.Aggregator.PendingBatchFetchBlockRange) + if err != nil || batch == nil { + agg.logger.Warnf("Pending task with merkle root 0x%x not found in logs", signedTaskResponse.BatchMerkleRoot) *reply = 1 return nil } diff --git a/config-files/config-aggregator.yaml b/config-files/config-aggregator.yaml index e5ab14eee5..d21fc9c9e2 100644 --- a/config-files/config-aggregator.yaml +++ b/config-files/config-aggregator.yaml @@ -38,6 +38,7 @@ aggregator: garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days) garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours) bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days) + pending_batch_fetch_block_range: 1000 #The interval of queried blocks to get a pending batch by logs. Suggested valued for prod `1000` ## Operator Configurations # operator: diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 93bfc9e100..6688553d7d 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -17,10 +17,6 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" ) -const ( - BatchFetchBlocksRange uint64 = 1000 -) - type AvsReader struct { *sdkavsregistry.ChainReader AvsContractBindings *AvsServiceBindings @@ -156,8 +152,8 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt } // Returns a pending batch from its merkle root or nil if it doesn't exist -// Searches the last `BatchFetchBlocksRange` blocks at most -func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) { +// Searches the last `blockRange` blocks at most +func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte, blockRange uint64) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) { latestBlock, err := r.BlockNumberRetryable(context.Background()) if err != nil { return nil, fmt.Errorf("Failed to get latest block number: %w", err) @@ -165,8 +161,8 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*service var fromBlock uint64 = 0 - if latestBlock > BatchFetchBlocksRange { - fromBlock = latestBlock - BatchFetchBlocksRange // TODO: Add this to config + if latestBlock > blockRange { + fromBlock = latestBlock - blockRange } logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot}) diff --git a/core/config/aggregator.go b/core/config/aggregator.go index a9dea503c6..df825c88d2 100644 --- a/core/config/aggregator.go +++ b/core/config/aggregator.go @@ -25,6 +25,7 @@ type AggregatorConfig struct { GarbageCollectorTasksAge uint64 GarbageCollectorTasksInterval uint64 BlsServiceTaskTimeout time.Duration + PendingBatchFetchBlockRange uint64 } } @@ -40,6 +41,7 @@ type AggregatorConfigFromYaml struct { GarbageCollectorTasksAge uint64 `yaml:"garbage_collector_tasks_age"` GarbageCollectorTasksInterval uint64 `yaml:"garbage_collector_tasks_interval"` BlsServiceTaskTimeout time.Duration `yaml:"bls_service_task_timeout"` + PendingBatchFetchBlockRange uint64 `yaml:"pending_batch_fetch_block_range"` } `yaml:"aggregator"` } @@ -85,6 +87,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig { GarbageCollectorTasksAge uint64 GarbageCollectorTasksInterval uint64 BlsServiceTaskTimeout time.Duration + PendingBatchFetchBlockRange uint64 }(aggregatorConfigFromYaml.Aggregator), } } From 775999622cf398ea0871adf102aabca844a4db41 Mon Sep 17 00:00:00 2001 From: Mauro Toscano <12560266+MauroToscano@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:18:03 -0300 Subject: [PATCH 06/14] Update aggregator/pkg/server.go --- aggregator/pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 34a2822f00..6372c25453 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -56,7 +56,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { - agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch it from logs") + agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch task data from Ethereum") batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot, agg.AggregatorConfig.Aggregator.PendingBatchFetchBlockRange) if err != nil || batch == nil { agg.logger.Warnf("Pending task with merkle root 0x%x not found in logs", signedTaskResponse.BatchMerkleRoot) From 9072d8b4123819c1dc9cfbd65ff3cef7637f3c29 Mon Sep 17 00:00:00 2001 From: Mauro Toscano <12560266+MauroToscano@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:28:15 -0300 Subject: [PATCH 07/14] Update aggregator/pkg/server.go --- aggregator/pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 6372c25453..21c290292b 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -63,7 +63,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t *reply = 1 return nil } - agg.logger.Info("Task was found in the logs, adding it to the internal map") + agg.logger.Info("Task was found in Ethereum, adding it to the internal map") agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) taskIndex, err = agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { From cfe283d51d0c06a8c6544bf51be8b3f25f592f69 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Wed, 13 Nov 2024 15:33:50 -0300 Subject: [PATCH 08/14] Restore taskMutex behavior on operator response handler --- aggregator/pkg/server.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 21c290292b..1627df2400 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -53,6 +53,14 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]), "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) + agg.taskMutex.Lock() + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Process operator response") + // Unlock mutex after returning + defer func() { + agg.taskMutex.Unlock() + agg.logger.Info("- Unlocked Resources: Process operator response") + }() + taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { @@ -153,11 +161,7 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { - agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index") taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash] - agg.taskMutex.Unlock() - agg.logger.Info("- Unlocked Resources: Get task index") if !ok { return taskIndex, fmt.Errorf("Task not found in the internal map") } else { From 4773b0a691f99d2a559367961b63b01652fc51b1 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Wed, 13 Nov 2024 18:52:51 -0300 Subject: [PATCH 09/14] Revert "Restore taskMutex behavior on operator response handler" This reverts commit cfe283d51d0c06a8c6544bf51be8b3f25f592f69. --- aggregator/pkg/server.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 1627df2400..21c290292b 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -53,14 +53,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]), "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) - agg.taskMutex.Lock() - agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Process operator response") - // Unlock mutex after returning - defer func() { - agg.taskMutex.Unlock() - agg.logger.Info("- Unlocked Resources: Process operator response") - }() - taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) if err != nil { @@ -161,7 +153,11 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { + agg.taskMutex.Lock() + agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index") taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash] + agg.taskMutex.Unlock() + agg.logger.Info("- Unlocked Resources: Get task index") if !ok { return taskIndex, fmt.Errorf("Task not found in the internal map") } else { From 009375fc576d5ce42e0bbf9b29d83694ff53a6e5 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:41:42 -0300 Subject: [PATCH 10/14] chore: detail in comment --- aggregator/pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 21c290292b..4ab42fd620 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -42,7 +42,7 @@ func (agg *Aggregator) ServeOperators() error { // This methods are automatically registered by the RPC server // Takes a response from an operator and process it. After processing the response, the associated task may reach quorum, triggering a BLS service response. -// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the logs. +// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the contract's Events. // Returns: // - 0: Success // - 1: Error From 9d56887ec4764bc5b3e68c33be8813122a45e7d1 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:58:04 -0300 Subject: [PATCH 11/14] chore: detail comments --- aggregator/pkg/server.go | 4 ++-- core/chainio/avs_reader.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 4ab42fd620..b880de0f68 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -59,9 +59,9 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch task data from Ethereum") batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot, agg.AggregatorConfig.Aggregator.PendingBatchFetchBlockRange) if err != nil || batch == nil { - agg.logger.Warnf("Pending task with merkle root 0x%x not found in logs", signedTaskResponse.BatchMerkleRoot) + agg.logger.Warnf("Pending task with merkle root 0x%x not found in the contract", signedTaskResponse.BatchMerkleRoot) *reply = 1 - return nil + return nil // TODO non urgent nice to have: return an error. With it, the Operator would know that his signature corresponded to a not found task } agg.logger.Info("Task was found in Ethereum, adding it to the internal map") agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 6688553d7d..2732872066 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -174,7 +174,7 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte, blockRang } if !logs.Next() { - return nil, nil //Not an error, but no tasks found + return nil, nil // Not an error, but no tasks found } batch := logs.Event @@ -188,7 +188,7 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte, blockRang } if state.Responded { - return nil, nil + return nil, nil // Task found but already responded } return batch, nil From 4b3eef0088044b0bd3aa99345ec1d9be7354da6b Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Wed, 20 Nov 2024 19:59:50 -0300 Subject: [PATCH 12/14] feat: add batcher_start_local_no_fund make target (again) --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index 6988298b41..cbea91c66f 100644 --- a/Makefile +++ b/Makefile @@ -297,6 +297,11 @@ batcher_start_local: user_fund_payment_service @$(MAKE) run_storage & @cargo run --manifest-path ./batcher/aligned-batcher/Cargo.toml --release -- --config ./config-files/config-batcher.yaml --env-file ./batcher/aligned-batcher/.env.dev +batcher_start_local_no_fund: + @echo "Starting Batcher..." + @$(MAKE) run_storage & + @cargo run --manifest-path ./batcher/aligned-batcher/Cargo.toml --release -- --config ./config-files/config-batcher.yaml --env-file ./batcher/aligned-batcher/.env.dev + install_batcher: @cargo install --path batcher/aligned-batcher From 9392336917e3870d0c9e8a23766face48612877d Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Thu, 21 Nov 2024 00:04:02 -0300 Subject: [PATCH 13/14] Remove unused imports --- aggregator/pkg/server.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 64a014a601..e059653d6e 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -4,13 +4,10 @@ import ( "context" "encoding/hex" "fmt" - "github.com/Layr-Labs/eigensdk-go/crypto/bls" - eigentypes "github.com/Layr-Labs/eigensdk-go/types" retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/core/types" "net/http" "net/rpc" - "strings" "time" ) From 2c5a7d165ba29aa1aad1def397dbb6e8f730b4d6 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Fri, 22 Nov 2024 12:59:35 -0300 Subject: [PATCH 14/14] Fix merge --- aggregator/pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index d332daf3e2..247902c0b3 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -66,7 +66,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t } agg.logger.Info("Task was found in Ethereum, adding it to the internal map") agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) - taskIndex, err = agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) + taskIndex, err = agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash) if err != nil { // This shouldn't happen, since we just added the task agg.logger.Error("Unexpected error trying to get taskIndex from internal map")