Using Ansible to consume and publish to RabbitMQ

Ansible has had support for managing the setup of RabbitMQ servers for some time now. However, being new to managing RabbitMQ with Ansible, I was surprised to find that there were not any modules to publish to, or, consume messages from RabbitMQ.

It seemed that an Ansible lookup plugin would be ideal for a RabbitMQ basic blocking consumer, which, would allow consuming messages off a queue and iterating over the resulting messages using with_items or loop.

In Ansible 2.8 you will be able to do this with the lookup rabbitmq plugin. (Before running this you will require the python pika module, and, Ansible 2.8).

    - name: Lookup queue
      set_fact:
        contents: "{{ lookup('rabbitmq', url='amqp://guest:[email protected]:5672/%2F', queue='simple_test') }}"

    - name: Picked up message from the queue
      debug:
        msg: "the queue contained {{ contents }}"

Output from running:


TASK [Picked up message from the queue] ********************************************************************************************************************************************************************
ok: [localhost] => {
    "msg": "the queue contained [{'exchange': u'', 'delivery_mode': 1, 'routing_key': u'simple_test', 'message_count': 0, 'content_type': u'text/plain', 'msg': u'Test message to queue simple_test', 'redelivered': False, 'delivery_tag': 1}]"
}

Note: In a latest update, the headers will be returned in the output as well.

Publishing to a queue

You’ll probably also want to publish some messages onto a queue from Ansible. Maybe you’ll want to process something with Ansible and publish the results to a queue?

In 2.8 you will also be able to do this with the rabbitmq_publish module:

    - name: Publish some text to the hello queue
      rabbitmq_publish:
        url: "amqp://guest:[email protected]:5672/%2F"
        queue: 'hello'
        body: "Hello world from ansible module rabitmq_publish"
        content_type: "text/plain"
      register: output
      delegate_to: localhost

    - debug:
        var: output

    # Testing random queue
    - name: Publish to a random queue
      rabbitmq_publish:
        url: "amqp://guest:[email protected]:5672/%2F"
        body: "RANDOM QUEUE POST"
        content_type: "text/plain"
      register: output_random
      delegate_to: localhost

    - debug:
        var: output_random

    - name: Publish an image to a queue
      rabbitmq_publish:
        url: "amqp://guest:[email protected]:5672/%2F"
        queue: 'hello'
        src: 'ajax-loader.gif'
      register: output2
      delegate_to: localhost

     - debug:
         var: output2

Output from publish play run

TASK [Task 1] **********************************************************************************************************************************************************************************************
changed: [localhost -> localhost]

TASK [debug] ***********************************************************************************************************************************************************************************************
ok: [localhost] => {
    "output": {
        "changed": true, 
        "failed": false, 
        "result": {
            "content_type": "text/plain", 
            "msg": "Successfully published to queue hello", 
            "queue": "hello"
        }
    }
}

TASK [Post to random queue] ********************************************************************************************************************************************************************************
changed: [localhost -> localhost]

TASK [debug] ***********************************************************************************************************************************************************************************************
ok: [localhost] => {
    "output_random": {
        "changed": true, 
        "failed": false, 
        "result": {
            "content_type": "text/plain", 
            "msg": "Successfully published to queue amq.gen-Hkzn8BdgS98PZtSaM3Mdhg", 
            "queue": "amq.gen-Hkzn8BdgS98PZtSaM3Mdhg"
        }
    }
}

TASK [rabbitmq_publish] ************************************************************************************************************************************************************************************
changed: [localhost -> localhost]

TASK [debug] ***********************************************************************************************************************************************************************************************
ok: [localhost] => {
    "output2": {
        "changed": true, 
        "failed": false, 
        "result": {
            "content_type": "image/gif", 
            "msg": "Successfully published to queue hello", 
            "queue": "hello"
        }
    }
}

Want to see more examples of how the lookup plugin and publish module works, check out the integration tests.

  • RabbitMQ lookup plugin
  • Ansible triggered by a consumed RabbitMQ message

    At work we settled on Ansible Tower to take care of scheduling/triggering Ansible playbook runs, however, during the evaluation of Tower the question was always there:  “Do we need Tower?  Can we trigger Ansible playbooks another way with existing systems?”

    Tower is the right choice for us at the moment due to all the features it brings, such as, role based access control (RBAC), API interfaces, logging and much more…

    At home the question still intrigued me,  how to trigger an Ansible playbook run in a eloquent manner.

    Whilst it would be simple and effective enough to SSH into a box and execute an ansible-playbook command line (ssh user@box ‘ansible-playbook playbook.yml’), it didn’t feel eloquent.

    Under the hood Tower uses RabbitMQ, celery, postgresql and Django.  I wondered what it would take to trigger an Ansible playbook run via a RabbitMQ message.

    Ansible-runner to the rescue

    https://github.com/ansible/ansible-runner states:

    A tool and python library that helps when interfacing with Ansible directly or as part of another system whether that be through a container image interface, as a standalone tool, or as a Python module that can be imported. The goal is to provide a stable and consistent interface abstraction to Ansible.

    Fantastic!  From the documentation site: https://ansible-runner.readthedocs.io/en/latest/

    Ansible Runner represents the modularization of the part of Ansible Tower/AWX that is responsible for running ansible and ansible-playbook tasks and gathers the output from it. It does this by presenting a common interface that doesn’t change, even as Ansible itself grows and evolves.

    Workflow

    Breaking down the requirement, I wanted to:

    1. Connect to a RabbitMQ server and subscribe to a queue.
    2. Upon receiving a message on the queue, trigger an Ansible runner playbook execution, or, ad-hoc command.
    3. Print out results to console.
    4. Sit and wait for the next message on the queue.

    Python packages required

    The following packages make it very easy to do this.

    pika is a python package written in python for talking to RabbitMQ (link):

    Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

    ansible-runner is as discussed above… checkout the documentation (here).

    The script below basically smashes together two examples from:

    You obviously need a RabbitMQ server and queue setup to run this.

     

    Using pika and ansible-runner to execute ansible via a message queue

    The example code is available here: https://github.com/Im0/ansible-runner-rabbitmq

    import ansible_runner
    import pika
    
    '''
    Just a rough a ready example of combining ansible_runner with pika.  Pika subscribes to a
    channel and when a message is present, it grabs the message and fires ansible-runner.
    
    Basically smashing together examples from:
    * https://ansible-runner.readthedocs.io/en/latest/python_interface.html#usage-examples
    * https://pika.readthedocs.io/en/stable/examples/blocking_consume.html
    
    Consider:
    * If content-type of the rabbitmq message is JSON... load that into a dict.
    * Using data from the queue in the ansible-runner (extravars).
    * How to pass large data to ansible-runner.
    * Security considerations
    '''
    
    def on_message(channel, method_frame, header_frame, body):
        print(method_frame.delivery_tag)
        print(body)
        print()
        exec_ansible_runner()
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    
    def exec_ansible_runner():
        # Use private_data_dir if you want the output of the ansible run saved
        #r = ansible_runner.run(private_data_dir='/tmp/demo', host_pattern='localhost', module='shell', module_args='whoami')
        r = ansible_runner.run(json_mode=True, host_pattern='localhost', module='shell', module_args='whoami')
        print("{}: {}".format(r.status, r.rc))
        # successful: 0
        for each_host_event in r.events:
            print(each_host_event['event'])
        print("Final status:")
        print(r.stats)
    
    def main():
        url = 'amqp://guest:[email protected]:5672/%2F'
        parameters = pika.URLParameters(url)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.basic_consume(on_message, 'hello')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
        connection.close()
    
    if __name__ == "__main__":
        main()
    
    
    

    This was only intended as a basic example, which, could be extended.