diff --git a/queue/amqp.go b/queue/amqp.go index 6ed04b3..de86a58 100644 --- a/queue/amqp.go +++ b/queue/amqp.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "os" + "strings" "sync" "sync/atomic" "time" @@ -18,6 +19,7 @@ var ( ErrConnectionFailed = errors.NewKind("failed to connect to RabbitMQ: %s") ErrOpenChannel = errors.NewKind("failed to open a channel: %s") ErrRetrievingHeader = errors.NewKind("error retrieving '%s' header from message %s") + ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s") ) const ( @@ -288,9 +290,14 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error { ) } -// RepublishBuried will republish in the main queue all the jobs that timed out without Ack -// or were Rejected with requeue = False. -func (q *AMQPQueue) RepublishBuried() error { +type jobErr struct { + job *Job + err error +} + +// RepublishBuried will republish in the main queue those jobs that timed out without Ack +// or were Rejected with requeue = False and makes comply return true. +func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { if q.buriedQueue == nil { return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?") } @@ -304,6 +311,8 @@ func (q *AMQPQueue) RepublishBuried() error { defer iter.Close() retries := 0 + var notComplying []*Job + var errorsPublishing []*jobErr for { j, err := iter.(*AMQPJobIter).nextNonBlocking() if err != nil { @@ -316,7 +325,7 @@ func (q *AMQPQueue) RepublishBuried() error { // if there is nothing after all the retries (meaning: BuriedQueue is surely // empty or any arriving jobs will have to wait to the next call). if retries > buriedNonBlockingRetries { - return nil + break } time.Sleep(50 * time.Millisecond) @@ -324,16 +333,45 @@ func (q *AMQPQueue) RepublishBuried() error { continue } + retries = 0 + if err = j.Ack(); err != nil { return err } - retries = 0 + if republishConditions(conditions).comply(j) { + if err = q.Publish(j); err != nil { + errorsPublishing = append(errorsPublishing, &jobErr{j, err}) + } + } else { + notComplying = append(notComplying, j) + + } + } - if err = q.Publish(j); err != nil { + for _, job := range notComplying { + if err = job.Reject(true); err != nil { return err } } + + return q.handleRepublishErrors(errorsPublishing) +} + +func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error { + if len(list) > 0 { + stringErrors := []string{} + for _, je := range list { + stringErrors = append(stringErrors, je.err.Error()) + if err := q.buriedQueue.Publish(je.job); err != nil { + return err + } + } + + return ErrRepublishingJobs.New(strings.Join(stringErrors, ": ")) + } + + return nil } // Transaction executes the given callback inside a transaction. diff --git a/queue/amqp_test.go b/queue/amqp_test.go index 7967c80..f6f229c 100644 --- a/queue/amqp_test.go +++ b/queue/amqp_test.go @@ -3,6 +3,7 @@ package queue import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -144,3 +145,54 @@ func TestAMQPHeaders(t *testing.T) { }) } } + +func TestAMQPRepublishBuried(t *testing.T) { + broker, err := NewBroker(testAMQPURI) + require.NoError(t, err) + defer func() { require.NoError(t, broker.Close()) }() + + queueName := newName() + queue, err := broker.Queue(queueName) + require.NoError(t, err) + + amqpQueue, ok := queue.(*AMQPQueue) + require.True(t, ok) + + buried := amqpQueue.buriedQueue + + tests := []struct { + name string + payload string + }{ + {name: "message 1", payload: "payload 1"}, + {name: "message 2", payload: "republish"}, + {name: "message 3", payload: "payload 3"}, + {name: "message 3", payload: "payload 4"}, + } + + for _, test := range tests { + job, err := NewJob() + require.NoError(t, err) + + job.raw = []byte(test.payload) + + err = buried.Publish(job) + require.NoError(t, err) + time.Sleep(1 * time.Second) + } + + var condition RepublishConditionFunc = func(j *Job) bool { + return string(j.raw) == "republish" + } + + err = queue.RepublishBuried(condition) + require.NoError(t, err) + + jobIter, err := queue.Consume(1) + require.NoError(t, err) + defer func() { require.NoError(t, jobIter.Close()) }() + + job, err := jobIter.Next() + require.NoError(t, err) + require.Equal(t, string(job.raw), "republish") +} diff --git a/queue/common.go b/queue/common.go index afc614d..0269dc2 100644 --- a/queue/common.go +++ b/queue/common.go @@ -68,6 +68,25 @@ func NewBroker(uri string) (Broker, error) { // TxCallback is a function to be called in a transaction. type TxCallback func(q Queue) error +// RepublishConditionFunc is a function used to filter jobs to republish. +type RepublishConditionFunc func(job *Job) bool + +type republishConditions []RepublishConditionFunc + +func (c republishConditions) comply(job *Job) bool { + if len(c) == 0 { + return true + } + + for _, condition := range c { + if condition(job) { + return true + } + } + + return false +} + // Queue represents a message queue. type Queue interface { // Publish publishes the given Job to the queue. @@ -80,8 +99,9 @@ type Queue interface { // number of undelivered jobs the iterator will allow at any given // time (see the Acknowledger interface). Consume(advertisedWindow int) (JobIter, error) - // RepublishBuried republish all jobs in the buried queue to the main one - RepublishBuried() error + // RepublishBuried republish to the main queue those jobs complying + // one of the conditions, leaving the rest of them in the buried queue. + RepublishBuried(conditions ...RepublishConditionFunc) error } // JobIter represents an iterator over a set of Jobs. diff --git a/queue/memory.go b/queue/memory.go index 07421e1..26f717e 100644 --- a/queue/memory.go +++ b/queue/memory.go @@ -64,9 +64,15 @@ func (q *memoryQueue) PublishDelayed(j *Job, delay time.Duration) error { return nil } -func (q *memoryQueue) RepublishBuried() error { - for _, j := range q.buriedJobs { - q.Publish(j) +// RepublishBuried implement the Queue interface. +func (q *memoryQueue) RepublishBuried(conditions ...RepublishConditionFunc) error { + for _, job := range q.buriedJobs { + if republishConditions(conditions).comply(job) { + job.ErrorType = "" + if err := q.Publish(job); err != nil { + return err + } + } } return nil }