From 8186142a296969b23b96a827c38562fbb4e610d8 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Tue, 10 Apr 2018 17:01:57 +0200 Subject: [PATCH 1/6] Add paramanter to RepublishBuried signature to filter jobs Signed-off-by: Manuel Carmona --- queue/amqp.go | 30 +++++++++++++++++++++--------- queue/common.go | 8 ++++++-- queue/common_test.go | 3 ++- queue/memory.go | 12 ++++++++++-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/queue/amqp.go b/queue/amqp.go index 6ed04b3..2b29e9a 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -288,9 +288,9 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error { ) } -// RepublishBuried will republish in the main queue all the jobs that timed out without Ack -// or were Rejected with requeue = False. -func (q *AMQPQueue) RepublishBuried() error { +// RepublishBuried will republish in the main queue those jobs that timed out without Ack +// or were Rejected with requeue = False and and makes complay return true. +func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { if q.buriedQueue == nil { return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?") } @@ -304,6 +304,7 @@ func (q *AMQPQueue) RepublishBuried() error { defer iter.Close() retries := 0 + notComplying := make([]*Job, 0, 3) for { j, err := iter.(*AMQPJobIter).nextNonBlocking() if err != nil { @@ -316,7 +317,7 @@ func (q *AMQPQueue) RepublishBuried() error { // if there is nothing after all the retries (meaning: BuriedQueue is surely // empty or any arriving jobs will have to wait to the next call). if retries > buriedNonBlockingRetries { - return nil + break } time.Sleep(50 * time.Millisecond) @@ -324,16 +325,27 @@ func (q *AMQPQueue) RepublishBuried() error { continue } - if err = j.Ack(); err != nil { - return err - } - retries = 0 + if comply(j) { + if err = j.Ack(); err != nil { + return err + } - if err = q.Publish(j); err != nil { + if err = q.Publish(j); err != nil { + return err + } + } else { + notComplying = append(notComplying, j) + } + } + + for _, job := range notComplying { + if err = job.Reject(true); err != nil { return err } } + + return nil } // Transaction executes the given callback inside a transaction. diff --git a/queue/common.go b/queue/common.go index afc614d..1cc0a3c 100644 --- a/queue/common.go +++ b/queue/common.go @@ -68,6 +68,9 @@ func NewBroker(uri string) (Broker, error) { // TxCallback is a function to be called in a transaction. type TxCallback func(q Queue) error +// RepublishConditionFunc is a function used to filter jobs to republish. +type RepublishConditionFunc func(job *Job) bool + // Queue represents a message queue. type Queue interface { // Publish publishes the given Job to the queue. @@ -80,8 +83,9 @@ type Queue interface { // number of undelivered jobs the iterator will allow at any given // time (see the Acknowledger interface). Consume(advertisedWindow int) (JobIter, error) - // RepublishBuried republish all jobs in the buried queue to the main one - RepublishBuried() error + // RepublishBuried republish to the main queue those jobs complying + // the condition, leaving the rest of them in the buried queue. + RepublishBuried(comply RepublishConditionFunc) error } // JobIter represents an iterator over a set of Jobs. diff --git a/queue/common_test.go b/queue/common_test.go index ec52e4b..434c29a 100644 --- a/queue/common_test.go +++ b/queue/common_test.go @@ -517,7 +517,8 @@ func (s *QueueSuite) TestRetryQueue() { assert.NoError(err) // 3. republish the jobs in the retry queue. - err = q.RepublishBuried() + testCondition := func(*Job) bool { return true } + err = q.RepublishBuried(testCondition) assert.NoError(err) // 4. re-read the jobs on the main queue. diff --git a/queue/memory.go b/queue/memory.go index 07421e1..8c25475 100644 --- a/queue/memory.go +++ b/queue/memory.go @@ -64,10 +64,18 @@ func (q *memoryQueue) PublishDelayed(j *Job, delay time.Duration) error { return nil } -func (q *memoryQueue) RepublishBuried() error { +// RepublishBuried implement the Queue interface. +func (q *memoryQueue) RepublishBuried(comply RepublishConditionFunc) error { for _, j := range q.buriedJobs { - q.Publish(j) + if comply(j) { + j.ErrorType = "" + if err := q.Publish(j); err != nil { + return err + } + // TODO: remove job fom q.buriedJobs + } } + return nil } From 4219edd27cf3dd9c521522f433b2162f25f03eae Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Thu, 12 Apr 2018 14:01:19 +0200 Subject: [PATCH 2/6] Handle republish errors Signed-off-by: Manuel Carmona --- queue/amqp.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/queue/amqp.go b/queue/amqp.go index 2b29e9a..48af681 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "os" + "strings" "sync" "sync/atomic" "time" @@ -18,6 +19,7 @@ var ( ErrConnectionFailed = errors.NewKind("failed to connect to RabbitMQ: %s") ErrOpenChannel = errors.NewKind("failed to open a channel: %s") ErrRetrievingHeader = errors.NewKind("error retrieving '%s' header from message %s") + ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s") ) const ( @@ -288,6 +290,11 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error { ) } +type jobErr struct { + job *Job + err error +} + // RepublishBuried will republish in the main queue those jobs that timed out without Ack // or were Rejected with requeue = False and and makes complay return true. func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { @@ -304,7 +311,8 @@ func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { defer iter.Close() retries := 0 - notComplying := make([]*Job, 0, 3) + var notComplying []*Job + var errorsPublishing []*jobErr for { j, err := iter.(*AMQPJobIter).nextNonBlocking() if err != nil { @@ -332,7 +340,7 @@ func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { } if err = q.Publish(j); err != nil { - return err + errorsPublishing = append(errorsPublishing, &jobErr{j, err}) } } else { notComplying = append(notComplying, j) @@ -345,6 +353,22 @@ func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { } } + return handleRepublishErrors(errorsPublishing) +} + +func handleRepublishErrors(list []*jobErr) error { + if len(list) > 1 { + stringErrors := []string{} + for _, je := range list { + stringErrors = append(stringErrors, je.err.Error()) + if err := je.job.Reject(true); err != nil { + return err + } + } + + return ErrRepublishingJobs.New(strings.Join(stringErrors, ": ")) + } + return nil } From 2b234614bae4c2610ddfcb963d1768d623e443bc Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Thu, 12 Apr 2018 14:38:44 +0200 Subject: [PATCH 3/6] Change RepublishBuried to accept '...RepublishConditionFunc' as the parameter Signed-off-by: Manuel Carmona --- queue/amqp.go | 5 +++-- queue/common.go | 21 +++++++++++++++++++-- queue/common_test.go | 3 +-- queue/memory.go | 12 +++++------- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/queue/amqp.go b/queue/amqp.go index 48af681..bc34adb 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -297,7 +297,7 @@ type jobErr struct { // RepublishBuried will republish in the main queue those jobs that timed out without Ack // or were Rejected with requeue = False and and makes complay return true. -func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { +func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { if q.buriedQueue == nil { return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?") } @@ -334,7 +334,8 @@ func (q *AMQPQueue) RepublishBuried(comply RepublishConditionFunc) error { } retries = 0 - if comply(j) { + + if republishConditions(conditions).comply(j) { if err = j.Ack(); err != nil { return err } diff --git a/queue/common.go b/queue/common.go index 1cc0a3c..8a2500a 100644 --- a/queue/common.go +++ b/queue/common.go @@ -71,6 +71,23 @@ type TxCallback func(q Queue) error // RepublishConditionFunc is a function used to filter jobs to republish. type RepublishConditionFunc func(job *Job) bool +type republishConditions []RepublishConditionFunc + +func (c republishConditions) comply(job *Job) bool { + if len(c) == 0 { + c = []RepublishConditionFunc{ + func(*Job) bool { return true }, + } + } + + var ok bool + for _, condition := range c { + ok = ok || condition(job) + } + + return ok +} + // Queue represents a message queue. type Queue interface { // Publish publishes the given Job to the queue. @@ -84,8 +101,8 @@ type Queue interface { // time (see the Acknowledger interface). Consume(advertisedWindow int) (JobIter, error) // RepublishBuried republish to the main queue those jobs complying - // the condition, leaving the rest of them in the buried queue. - RepublishBuried(comply RepublishConditionFunc) error + // one of the conditions, leaving the rest of them in the buried queue. + RepublishBuried(conditions ...RepublishConditionFunc) error } // JobIter represents an iterator over a set of Jobs. diff --git a/queue/common_test.go b/queue/common_test.go index 434c29a..ec52e4b 100644 --- a/queue/common_test.go +++ b/queue/common_test.go @@ -517,8 +517,7 @@ func (s *QueueSuite) TestRetryQueue() { assert.NoError(err) // 3. republish the jobs in the retry queue. - testCondition := func(*Job) bool { return true } - err = q.RepublishBuried(testCondition) + err = q.RepublishBuried() assert.NoError(err) // 4. re-read the jobs on the main queue. diff --git a/queue/memory.go b/queue/memory.go index 8c25475..26f717e 100644 --- a/queue/memory.go +++ b/queue/memory.go @@ -65,17 +65,15 @@ func (q *memoryQueue) PublishDelayed(j *Job, delay time.Duration) error { } // RepublishBuried implement the Queue interface. -func (q *memoryQueue) RepublishBuried(comply RepublishConditionFunc) error { - for _, j := range q.buriedJobs { - if comply(j) { - j.ErrorType = "" - if err := q.Publish(j); err != nil { +func (q *memoryQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { + for _, job := range q.buriedJobs { + if republishConditions(conditions).comply(job) { + job.ErrorType = "" + if err := q.Publish(job); err != nil { return err } - // TODO: remove job fom q.buriedJobs } } - return nil } From ffea2b344f454befd614c080319f5b63ee5759da Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Tue, 17 Apr 2018 11:11:50 +0200 Subject: [PATCH 4/6] fix handleRepublishErrors bug in len(list) checking Signed-off-by: Manuel Carmona --- queue/amqp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue/amqp.go b/queue/amqp.go index bc34adb..9ff7278 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -358,7 +358,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error } func handleRepublishErrors(list []*jobErr) error { - if len(list) > 1 { + if len(list) > 0 { stringErrors := []string{} for _, je := range list { stringErrors = append(stringErrors, je.err.Error()) From 886c2b084d7b02ad180b126912697592112d0a3c Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Fri, 20 Apr 2018 15:08:57 +0200 Subject: [PATCH 5/6] queue: fix republish bug Signed-off-by: Manuel Carmona --- queue/amqp.go | 17 +++++++++-------- queue/common.go | 11 +++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/queue/amqp.go b/queue/amqp.go index 9ff7278..de86a58 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -296,7 +296,7 @@ type jobErr struct { } // RepublishBuried will republish in the main queue those jobs that timed out without Ack -// or were Rejected with requeue = False and and makes complay return true. +// or were Rejected with requeue = False and makes comply return true. func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { if q.buriedQueue == nil { return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?") @@ -335,16 +335,17 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error retries = 0 - if republishConditions(conditions).comply(j) { - if err = j.Ack(); err != nil { - return err - } + if err = j.Ack(); err != nil { + return err + } + if republishConditions(conditions).comply(j) { if err = q.Publish(j); err != nil { errorsPublishing = append(errorsPublishing, &jobErr{j, err}) } } else { notComplying = append(notComplying, j) + } } @@ -354,15 +355,15 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error } } - return handleRepublishErrors(errorsPublishing) + return q.handleRepublishErrors(errorsPublishing) } -func handleRepublishErrors(list []*jobErr) error { +func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error { if len(list) > 0 { stringErrors := []string{} for _, je := range list { stringErrors = append(stringErrors, je.err.Error()) - if err := je.job.Reject(true); err != nil { + if err := q.buriedQueue.Publish(je.job); err != nil { return err } } diff --git a/queue/common.go b/queue/common.go index 8a2500a..0269dc2 100644 --- a/queue/common.go +++ b/queue/common.go @@ -75,17 +75,16 @@ type republishConditions []RepublishConditionFunc func (c republishConditions) comply(job *Job) bool { if len(c) == 0 { - c = []RepublishConditionFunc{ - func(*Job) bool { return true }, - } + return true } - var ok bool for _, condition := range c { - ok = ok || condition(job) + if condition(job) { + return true + } } - return ok + return false } // Queue represents a message queue. From e5c6eaf720f93b00a1652c8842c7c2ff903e105d Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Fri, 20 Apr 2018 15:09:42 +0200 Subject: [PATCH 6/6] queue: add test for amqp RepublishBuried Signed-off-by: Manuel Carmona --- queue/amqp_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/queue/amqp_test.go b/queue/amqp_test.go index 7967c80..f6f229c 100644 --- a/queue/amqp_test.go +++ b/queue/amqp_test.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -144,3 +145,54 @@ func TestAMQPHeaders(t *testing.T) { }) } } + +func TestAMQPRepublishBuried(t *testing.T) { + broker, err := NewBroker(testAMQPURI) + require.NoError(t, err) + defer func() { require.NoError(t, broker.Close()) }() + + queueName := newName() + queue, err := broker.Queue(queueName) + require.NoError(t, err) + + amqpQueue, ok := queue.(*AMQPQueue) + require.True(t, ok) + + buried := amqpQueue.buriedQueue + + tests := []struct { + name string + payload string + }{ + {name: "message 1", payload: "payload 1"}, + {name: "message 2", payload: "republish"}, + {name: "message 3", payload: "payload 3"}, + {name: "message 3", payload: "payload 4"}, + } + + for _, test := range tests { + job, err := NewJob() + require.NoError(t, err) + + job.raw = []byte(test.payload) + + err = buried.Publish(job) + require.NoError(t, err) + time.Sleep(1 * time.Second) + } + + var condition RepublishConditionFunc = func(j *Job) bool { + return string(j.raw) == "republish" + } + + err = queue.RepublishBuried(condition) + require.NoError(t, err) + + jobIter, err := queue.Consume(1) + require.NoError(t, err) + defer func() { require.NoError(t, jobIter.Close()) }() + + job, err := jobIter.Next() + require.NoError(t, err) + require.Equal(t, string(job.raw), "republish") +}