Skip to content

Commit

Permalink
AMQP: added custom delayed queue config (#796)
Browse files Browse the repository at this point in the history
* AMQP: added custom delayed queue config

* added documentation for delayed queue
  • Loading branch information
surendratiwari3 authored Jan 10, 2024
1 parent 26776f9 commit 233ea99
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 36 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ RabbitMQ related configuration. Not necessary if you are using other broker/back
* `QueueBindingArguments`: an optional map of additional arguments used when binding to an AMQP queue
* `BindingKey`: The queue is bind to the exchange with this key, e.g. `machinery_task`
* `PrefetchCount`: How many tasks to prefetch (set to `1` if you have long running tasks)
* `DelayedQueue`: delayed queue name to be used for task retry or delayed task (if empty it will follow auto create and delate delayed queues)

#### DynamoDB

Expand Down
54 changes: 36 additions & 18 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,24 +354,47 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
return fmt.Errorf("JSON marshal error: %s", err)
}

// It's necessary to redeclare the queue each time (to zero its TTL timer).
queueName := fmt.Sprintf(
"delay.%d.%s.%s",
delayMs, // delay duration in mileseconds
b.GetConfig().AMQP.Exchange,
signature.RoutingKey, // routing key
)
queueName := b.GetConfig().AMQP.DelayedQueue
declareQueueArgs := amqp.Table{
// Exchange where to send messages after TTL expiration.
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
// Routing key which use when resending expired messages.
"x-dead-letter-routing-key": signature.RoutingKey,
// Time in milliseconds
// after that message will expire and be sent to destination.
"x-message-ttl": delayMs,
// Time after that the queue will be deleted.
"x-expires": delayMs * 2,
}
messageProperties := amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
Expiration: fmt.Sprint(delayMs),
}
if queueName == "" {
// It's necessary to redeclare the queue each time (to zero its TTL timer).
queueName = fmt.Sprintf(
"delay.%d.%s.%s",
delayMs, // delay duration in mileseconds
b.GetConfig().AMQP.Exchange,
signature.RoutingKey, // routing key
)
declareQueueArgs = amqp.Table{
// Exchange where to send messages after TTL expiration.
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
// Routing key which use when resending expired messages.
"x-dead-letter-routing-key": signature.RoutingKey,
// Time in milliseconds
// after that message will expire and be sent to destination.
"x-message-ttl": delayMs,
// Time after that the queue will be deleted.
"x-expires": delayMs * 2,
}
messageProperties = amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
}
}

conn, channel, _, _, _, err := b.Connect(
b.GetConfig().Broker,
b.GetConfig().MultipleBrokerSeparator,
Expand All @@ -397,12 +420,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
},
messageProperties,
); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions v1/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type AMQPConfig struct {
BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"`
PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"`
AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"`
DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"`
}

// DynamoDBConfig wraps DynamoDB related configuration
Expand Down
55 changes: 37 additions & 18 deletions v2/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,24 +354,48 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
return fmt.Errorf("JSON marshal error: %s", err)
}

// It's necessary to redeclare the queue each time (to zero its TTL timer).
queueName := fmt.Sprintf(
"delay.%d.%s.%s",
delayMs, // delay duration in mileseconds
b.GetConfig().AMQP.Exchange,
signature.RoutingKey, // routing key
)
queueName := b.GetConfig().AMQP.DelayedQueue
declareQueueArgs := amqp.Table{
// Exchange where to send messages after TTL expiration.
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
// Routing key which use when resending expired messages.
"x-dead-letter-routing-key": signature.RoutingKey,
// Time in milliseconds
// after that message will expire and be sent to destination.
"x-message-ttl": delayMs,
// Time after that the queue will be deleted.
"x-expires": delayMs * 2,
}
messageProperties := amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
Expiration: fmt.Sprint(delayMs),
}

if queueName == "" {
// It's necessary to redeclare the queue each time (to zero its TTL timer).
queueName = fmt.Sprintf(
"delay.%d.%s.%s",
delayMs, // delay duration in mileseconds
b.GetConfig().AMQP.Exchange,
signature.RoutingKey, // routing key
)
declareQueueArgs = amqp.Table{
// Exchange where to send messages after TTL expiration.
"x-dead-letter-exchange": b.GetConfig().AMQP.Exchange,
// Routing key which use when resending expired messages.
"x-dead-letter-routing-key": signature.RoutingKey,
// Time in milliseconds
// after that message will expire and be sent to destination.
"x-message-ttl": delayMs,
// Time after that the queue will be deleted.
"x-expires": delayMs * 2,
}
messageProperties = amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
}
}

conn, channel, _, _, _, err := b.Connect(
b.GetConfig().Broker,
b.GetConfig().MultipleBrokerSeparator,
Expand All @@ -397,12 +421,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
},
messageProperties,
); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions v2/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type AMQPConfig struct {
BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"`
PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"`
AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"`
DelayedQueue string `yaml:"delayed_queue" envconfig:"AMQP_DELAYED_QUEUE"`
}

// DynamoDBConfig wraps DynamoDB related configuration
Expand Down

0 comments on commit 233ea99

Please sign in to comment.