Skip to content
This repository was archived by the owner on Sep 28, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions queue/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -18,6 +19,7 @@ var (
ErrConnectionFailed = errors.NewKind("failed to connect to RabbitMQ: %s")
ErrOpenChannel = errors.NewKind("failed to open a channel: %s")
ErrRetrievingHeader = errors.NewKind("error retrieving '%s' header from message %s")
ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s")
)

const (
Expand Down Expand Up @@ -288,9 +290,14 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
)
}

// RepublishBuried will republish in the main queue all the jobs that timed out without Ack
// or were Rejected with requeue = False.
func (q *AMQPQueue) RepublishBuried() error {
type jobErr struct {
job *Job
err error
}

// RepublishBuried will republish in the main queue those jobs that timed out without Ack
// or were Rejected with requeue = False and makes comply return true.
func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error {
if q.buriedQueue == nil {
return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?")
}
Expand All @@ -304,6 +311,8 @@ func (q *AMQPQueue) RepublishBuried() error {
defer iter.Close()

retries := 0
var notComplying []*Job
var errorsPublishing []*jobErr
for {
j, err := iter.(*AMQPJobIter).nextNonBlocking()
if err != nil {
Expand All @@ -316,24 +325,53 @@ func (q *AMQPQueue) RepublishBuried() error {
// if there is nothing after all the retries (meaning: BuriedQueue is surely
// empty or any arriving jobs will have to wait to the next call).
if retries > buriedNonBlockingRetries {
return nil
break
}

time.Sleep(50 * time.Millisecond)
retries++
continue
}

retries = 0

if err = j.Ack(); err != nil {
return err
}

retries = 0
if republishConditions(conditions).comply(j) {
if err = q.Publish(j); err != nil {
errorsPublishing = append(errorsPublishing, &jobErr{j, err})
}
} else {
notComplying = append(notComplying, j)

}
}

if err = q.Publish(j); err != nil {
for _, job := range notComplying {
if err = job.Reject(true); err != nil {
return err
}
}

return q.handleRepublishErrors(errorsPublishing)
}

func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error {
if len(list) > 0 {
stringErrors := []string{}
for _, je := range list {
stringErrors = append(stringErrors, je.err.Error())
if err := q.buriedQueue.Publish(je.job); err != nil {
return err
}
}

return ErrRepublishingJobs.New(strings.Join(stringErrors, ": "))
}

return nil
}

// Transaction executes the given callback inside a transaction.
Expand Down
52 changes: 52 additions & 0 deletions queue/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -144,3 +145,54 @@ func TestAMQPHeaders(t *testing.T) {
})
}
}

func TestAMQPRepublishBuried(t *testing.T) {
broker, err := NewBroker(testAMQPURI)
require.NoError(t, err)
defer func() { require.NoError(t, broker.Close()) }()

queueName := newName()
queue, err := broker.Queue(queueName)
require.NoError(t, err)

amqpQueue, ok := queue.(*AMQPQueue)
require.True(t, ok)

buried := amqpQueue.buriedQueue

tests := []struct {
name string
payload string
}{
{name: "message 1", payload: "payload 1"},
{name: "message 2", payload: "republish"},
{name: "message 3", payload: "payload 3"},
{name: "message 3", payload: "payload 4"},
}

for _, test := range tests {
job, err := NewJob()
require.NoError(t, err)

job.raw = []byte(test.payload)

err = buried.Publish(job)
require.NoError(t, err)
time.Sleep(1 * time.Second)
}

var condition RepublishConditionFunc = func(j *Job) bool {
return string(j.raw) == "republish"
}

err = queue.RepublishBuried(condition)
require.NoError(t, err)

jobIter, err := queue.Consume(1)
require.NoError(t, err)
defer func() { require.NoError(t, jobIter.Close()) }()

job, err := jobIter.Next()
require.NoError(t, err)
require.Equal(t, string(job.raw), "republish")
}
24 changes: 22 additions & 2 deletions queue/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ func NewBroker(uri string) (Broker, error) {
// TxCallback is a function to be called in a transaction.
type TxCallback func(q Queue) error

// RepublishConditionFunc is a function used to filter jobs to republish.
type RepublishConditionFunc func(job *Job) bool

type republishConditions []RepublishConditionFunc

func (c republishConditions) comply(job *Job) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this be simplified?

if len(c) == 0 {
  return true
}

for _, condition := range c {
  if condition(job) {
    return true
  }
}

return false

Otherwise LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it looks like much better 👍

if len(c) == 0 {
return true
}

for _, condition := range c {
if condition(job) {
return true
}
}

return false
}

// Queue represents a message queue.
type Queue interface {
// Publish publishes the given Job to the queue.
Expand All @@ -80,8 +99,9 @@ type Queue interface {
// number of undelivered jobs the iterator will allow at any given
// time (see the Acknowledger interface).
Consume(advertisedWindow int) (JobIter, error)
// RepublishBuried republish all jobs in the buried queue to the main one
RepublishBuried() error
// RepublishBuried republish to the main queue those jobs complying
// one of the conditions, leaving the rest of them in the buried queue.
RepublishBuried(conditions ...RepublishConditionFunc) error
}

// JobIter represents an iterator over a set of Jobs.
Expand Down
12 changes: 9 additions & 3 deletions queue/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@ func (q *memoryQueue) PublishDelayed(j *Job, delay time.Duration) error {
return nil
}

func (q *memoryQueue) RepublishBuried() error {
for _, j := range q.buriedJobs {
q.Publish(j)
// RepublishBuried implement the Queue interface.
func (q *memoryQueue) RepublishBuried(conditions ...RepublishConditionFunc) error {
for _, job := range q.buriedJobs {
if republishConditions(conditions).comply(job) {
job.ErrorType = ""
if err := q.Publish(job); err != nil {
return err
}
}
}
return nil
}
Expand Down