apalis-amqp

Message queuing for Rust using apalis and AMQP.


Overview

apalis-amqp is a Rust crate that provides utilities for integrating apalis with AMQP message queuing systems. It includes an AmqpBackend implementation for pushing messages to and popping messages from an AMQP queue.

Features

  • Integration between apalis and AMQP message queuing systems.
  • Easy creation of AMQP-backed message queues.
  • Simple consumption of AMQP messages as apalis messages.
  • Supports message acknowledgement and rejection via tower layers.
  • Supports all apalis middleware such as rate-limiting, timeouts, filtering, sentry, prometheus etc.
  • Supports acknowledging messages and allows results to be saved to other backends in a custom way.

Getting started

Before attempting to connect, you need a working AMQP broker. You can easily set one up using Docker:

Set up RabbitMQ

docker run -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=my_user -e RABBITMQ_DEFAULT_PASS=******** rabbitmq:3.8.4-management

# Create a vhost
docker exec $(docker ps -q -f ancestor=rabbitmq:3.8.4-management) rabbitmqctl add_vhost my_vhost 

# Grant my_user full permissions on the vhost
docker exec $(docker ps -q -f ancestor=rabbitmq:3.8.4-management) rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
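The Rust example later in this README reads its connection string from an AMQP_ADDR environment variable. Assuming the usual amqp://user:password@host:port/vhost URI form, the user, password, and vhost from the Docker commands above could be assembled roughly as follows. The amqp_addr helper is hypothetical and not part of apalis-amqp, and the sketch assumes none of the parts need percent-encoding.

```rust
/// Hypothetical helper (not part of apalis-amqp): builds an AMQP URI of the
/// form amqp://user:password@host:port/vhost.
/// Assumption: no part contains characters that require percent-encoding.
fn amqp_addr(user: &str, password: &str, host: &str, port: u16, vhost: &str) -> String {
    format!("amqp://{}:{}@{}:{}/{}", user, password, host, port, vhost)
}
```

For example, amqp_addr("my_user", "secret", "127.0.0.1", 5672, "my_vhost") yields amqp://my_user:secret@127.0.0.1:5672/my_vhost, which can then be exported as AMQP_ADDR. The password "secret" is a placeholder.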

Enabling scheduling (Optional)

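# Print the plugin directories used by the broker
docker exec $(docker ps -q -f ancestor=rabbitmq:3.8.4-management) rabbitmq-plugins directories -s

# Download the delayed message exchange plugin
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

# Copy the downloaded plugin into the container
docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez \
  $(docker ps -q -f ancestor=rabbitmq:3.8.4-management):/opt/rabbitmq/plugins/

# Enable the plugin
docker exec $(docker ps -q -f ancestor=rabbitmq:3.8.4-management) rabbitmq-plugins enable rabbitmq_delayed_message_exchange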

Set up the Rust code

Add apalis-amqp to your Cargo.toml

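[dependencies]
apalis = "1"
apalis-amqp = "0.4"
# The derive feature is needed for #[derive(Serialize, Deserialize)]
serde = { version = "1", features = ["derive"] }
# Needed for #[tokio::main]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }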

Then add the following to your main.rs:

use apalis::prelude::*;
use apalis_amqp::AmqpBackend;
use serde::{Deserialize, Serialize};

// The message type carried over the queue
#[derive(Debug, Serialize, Deserialize)]
struct TestMessage(usize);

// The handler invoked for every message the worker receives
async fn test_message(message: TestMessage) {
    dbg!(message);
}

#[tokio::main]
async fn main() {
    // AMQP_ADDR holds the broker address, e.g. amqp://my_user:<password>@127.0.0.1:5672/my_vhost
    let env = std::env::var("AMQP_ADDR").unwrap();
    let mq = AmqpBackend::<TestMessage>::new_from_addr(&env).await.unwrap();

    // This can be in another place in the program
    mq.push(TestMessage(42)).await.unwrap();

    WorkerBuilder::new("rango-amigo")
        .backend(mq)
        .build(test_message)
        .run()
        .await
        .unwrap();
}

Run your code and profit!
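The example above calls unwrap() on the AMQP_ADDR lookup, so a missing variable causes a panic with a generic message. A minimal sketch of a friendlier check, using only the standard library, might look like this. The helper names validate_amqp_addr and amqp_addr_from_env are hypothetical and not part of apalis-amqp, and the scheme check only assumes that AMQP URIs start with amqp:// or amqps://.

```rust
use std::env;

/// Hypothetical helper: checks that a candidate address is present and uses
/// an AMQP URI scheme. The address itself is never echoed in error messages
/// because it may contain a password.
fn validate_amqp_addr(value: Option<String>) -> Result<String, String> {
    match value {
        None => Err("AMQP_ADDR is not set".to_string()),
        Some(addr) if addr.starts_with("amqp://") || addr.starts_with("amqps://") => Ok(addr),
        Some(_) => Err("AMQP_ADDR must start with amqp:// or amqps://".to_string()),
    }
}

/// Reads AMQP_ADDR from the environment and validates it.
fn amqp_addr_from_env() -> Result<String, String> {
    validate_amqp_addr(env::var("AMQP_ADDR").ok())
}
```

In main, the result of amqp_addr_from_env() could be matched on to print the error and exit instead of unwrapping, before the address is passed to new_from_addr.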

License

apalis-amqp is licensed under the Apache license. See the LICENSE file for details.
