Advanced Ansible Topics
Custom Modules
Create your own Ansible modules for custom functionality.
Simple Custom Module
#!/usr/bin/python
# library/my_module.py
from ansible.module_utils.basic import AnsibleModule


def main():
    module = AnsibleModule(
        argument_spec=dict(
            name=dict(type='str', required=True),
            state=dict(type='str', choices=['present', 'absent'], default='present')
        )
    )
    name = module.params['name']
    state = module.params['state']
    # Your logic here
    result = {
        'changed': False,
        'message': f'Processing {name} with state {state}'
    }
    module.exit_json(**result)


if __name__ == '__main__':
    main()
Custom Plugins
Filter Plugin
# filter_plugins/custom_filters.py
class FilterModule(object):
    def filters(self):
        # Map the filter names used in templates to their implementations
        return {
            'reverse_string': self.reverse_string,
            'multiply': self.multiply
        }

    def reverse_string(self, string):
        return string[::-1]

    def multiply(self, value, factor=2):
        return value * factor
Use in playbook:
- name: Use custom filter
  debug:
    msg: "{{ 'hello' | reverse_string }}"
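The filter methods are ordinary Python callables, so their behavior can be checked without running a playbook. The sketch below re-declares the two filters as plain functions (a simplification for illustration, not how Ansible loads a FilterModule) and shows what each returns.

```python
def reverse_string(string):
    """Return the input string reversed, like the reverse_string filter."""
    return string[::-1]


def multiply(value, factor=2):
    """Multiply value by factor; factor defaults to 2, matching the filter."""
    return value * factor


if __name__ == "__main__":
    print(reverse_string("hello"))  # olleh
    print(multiply(4))              # 8
    print(multiply(4, 3))           # 12
```

In a template, the extra argument is passed in parentheses after the filter name, for example "{{ 4 | multiply(3) }}".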
Callback Plugins
# callback_plugins/timing.py
from ansible.plugins.callback import CallbackBase
import time


class CallbackModule(CallbackBase):
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    CALLBACK_NAME = 'timing'

    def __init__(self):
        super(CallbackModule, self).__init__()
        self.start_time = time.time()

    def v2_playbook_on_stats(self, stats):
        end_time = time.time()
        # CallbackBase exposes the display object as self._display
        self._display.display(
            f"Playbook execution time: {end_time - self.start_time:.2f} seconds"
        )
Dynamic Inventory
Custom Inventory Script
#!/usr/bin/env python3
# Ansible runs this script with --list and reads the JSON printed to stdout
import json
import requests


def get_inventory():
    # Fetch from API, database, etc.
    response = requests.get('https://api.example.com/hosts', timeout=10)
    response.raise_for_status()
    api_data = response.json()
    inventory = {
        'webservers': {
            'hosts': [],
            'vars': {}
        },
        '_meta': {
            'hostvars': {}
        }
    }
    for host in api_data:
        inventory['webservers']['hosts'].append(host['hostname'])
        inventory['_meta']['hostvars'][host['hostname']] = {
            'ansible_host': host['ip_address'],
            'custom_var': host['custom_field']
        }
    return inventory


if __name__ == '__main__':
    print(json.dumps(get_inventory()))
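Ansible calls an inventory script with --list to get the whole inventory and, when the output has no _meta section, with --host <name> for each host. The following is a minimal sketch of handling both arguments. SAMPLE_HOSTS is hypothetical data standing in for the API response, and the helper names are illustrative rather than part of any Ansible API.

```python
import argparse
import json

# Hypothetical sample records standing in for the API response.
SAMPLE_HOSTS = [
    {"hostname": "web1", "ip_address": "10.0.1.10", "custom_field": "a"},
    {"hostname": "web2", "ip_address": "10.0.1.11", "custom_field": "b"},
]


def build_inventory(records):
    """Build the --list structure: one group plus per-host vars under _meta."""
    inventory = {"webservers": {"hosts": [], "vars": {}}, "_meta": {"hostvars": {}}}
    for record in records:
        name = record["hostname"]
        inventory["webservers"]["hosts"].append(name)
        inventory["_meta"]["hostvars"][name] = {
            "ansible_host": record["ip_address"],
            "custom_var": record["custom_field"],
        }
    return inventory


def host_vars(records, hostname):
    """Return the variables for one host, or an empty dict if it is unknown."""
    return build_inventory(records)["_meta"]["hostvars"].get(hostname, {})


def main(argv=None):
    """Return the JSON document for the requested mode (--list is the default)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--list", action="store_true")
    parser.add_argument("--host")
    args = parser.parse_args(argv)
    if args.host:
        return json.dumps(host_vars(SAMPLE_HOSTS, args.host))
    return json.dumps(build_inventory(SAMPLE_HOSTS))


if __name__ == "__main__":
    print(main())
```

Returning all host variables under _meta lets Ansible skip the per-host calls, which matters when each call would hit a remote API.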
Ansible Lookup Plugins
---
- name: Use lookup plugins
  hosts: localhost
  tasks:
    # Read from environment
    - debug:
        msg: "{{ lookup('env', 'HOME') }}"
    # Read file
    - debug:
        msg: "{{ lookup('file', '/etc/hostname') }}"
    # Password generation
    - debug:
        msg: "{{ lookup('password', '/tmp/passwordfile length=15') }}"
    # DNS lookup (dig ships in the community.general collection and needs dnspython)
    - debug:
        msg: "{{ lookup('dig', 'example.com') }}"
Ansible Strategies
Linear Strategy (Default)
- hosts: all
  strategy: linear
  tasks:
    - name: Task runs on all hosts before moving to next
      debug:
        msg: "Hello"
Free Strategy
- hosts: all
  strategy: free
  tasks:
    - name: Each host runs through all tasks independently
      debug:
        msg: "Hello"
Debug Strategy
- hosts: all
  strategy: debug
  tasks:
    - name: Drops into debugger on failure
      command: /bin/false
Asynchronous Actions
---
- name: Async tasks
  hosts: all
  tasks:
    - name: Long running task
      command: /usr/local/bin/long-script.sh
      async: 3600  # Run for up to 1 hour
      poll: 0      # Fire and forget
      register: long_task
    - name: Do other work
      debug:
        msg: "Doing other things..."
    # Polls for roughly retries x delay = 300 seconds in total;
    # raise these values if the job may run longer than that
    - name: Check on async task
      async_status:
        jid: "{{ long_task.ansible_job_id }}"
      register: job_result
      until: job_result.finished
      retries: 30
      delay: 10
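The until/retries/delay combination on the last task is a bounded polling loop. The sketch below is a simplified model of that idea in plain Python, not Ansible's implementation (the exact number of attempts Ansible makes may differ by one). The function name and the injected sleep parameter are hypothetical, chosen so the loop can be exercised without waiting.

```python
def poll_until(check, retries, delay, sleep):
    """Call check() until it returns True, pausing `delay` seconds after each miss.

    Returns (finished, waited_seconds).
    """
    waited = 0
    for _ in range(retries):
        if check():
            return True, waited
        sleep(delay)
        waited += delay
    return False, waited


if __name__ == "__main__":
    no_sleep = lambda seconds: None  # skip real waiting in this demo

    # A job that reports finished on the fourth check.
    answers = iter([False, False, False, True])
    print(poll_until(lambda: next(answers), retries=30, delay=10, sleep=no_sleep))
    # (True, 30)

    # A job that never finishes exhausts the budget of 30 x 10 seconds.
    print(poll_until(lambda: False, retries=30, delay=10, sleep=no_sleep))
    # (False, 300)
```

With retries: 30 and delay: 10, the polling budget is about five minutes, far less than the one-hour async limit, so the two values should be sized together.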
Ansible Mitogen
Mitogen is a third-party extension whose strategy plugins can substantially speed up Ansible execution; the gain depends on the workload.
# Install
pip install mitogen
# ansible.cfg
[defaults]
strategy_plugins = /path/to/mitogen/ansible_mitogen/plugins/strategy
strategy = mitogen_linear
Testing with Molecule
# Install
pip install molecule molecule-docker
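# Initialize (Molecule 6 and later removed `init role`; there, create the role
# with ansible-galaxy and run `molecule init scenario` inside it instead)
molecule init role my-role --driver-name docker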
# Test
molecule test
Performance Optimization
Ansible Configuration Tuning
# ansible.cfg
[defaults]
# Increase parallelism
forks = 50
# Enable pipelining (requires sudo without requiretty)
pipelining = True
# Disable fact gathering when not needed
gathering = explicit
# Cache facts to speed up subsequent runs
fact_caching = jsonfile
fact_caching_connection = /tmp/ansible_facts
fact_caching_timeout = 3600
# SSH connection pooling
[ssh_connection]
ssh_args = -o ControlMaster=auto -o ControlPersist=3600s
pipelining = True
control_path = ~/.ssh/ansible-%%r@%%h:%%p
Profiling Playbook Performance
# Enable callback plugin for profiling
# ansible.cfg
[defaults]
callbacks_enabled = ansible.posix.profile_tasks, ansible.posix.timer
# Run playbook
ansible-playbook site.yml
# Output shows task execution times:
# TASK [Install packages] ************************************************ 45.32s
# TASK [Start service] *************************************************** 2.15s
Optimizing Task Execution
---
- name: Optimized playbook patterns
  hosts: all
  gather_facts: no  # Disable if not needed
  tasks:
    # Use package module instead of apt/yum for portability
    - name: Install packages efficiently
      package:
        name: "{{ packages }}"
        state: present
      vars:
        packages:
          - nginx
          - postgresql
          - redis
    # Gather only specific facts when needed
    - name: Gather minimal facts
      setup:
        gather_subset:
          - '!all'
          - '!min'
          - network
    # Use async for independent tasks
    - name: Run tasks in parallel
      command: "{{ item }}"
      async: 300
      poll: 0
      loop:
        - /usr/local/bin/task1.sh
        - /usr/local/bin/task2.sh
        - /usr/local/bin/task3.sh
      register: async_tasks
    - name: Wait for all tasks
      async_status:
        jid: "{{ item.ansible_job_id }}"
      loop: "{{ async_tasks.results }}"
      register: async_results
      until: async_results.finished
      retries: 30
      delay: 10
Advanced Error Handling
Comprehensive Error Handling Strategy
---
- name: Advanced error handling
  hosts: webservers
  any_errors_fatal: false  # Continue despite failures
  tasks:
    - name: Risky operation with multiple fallbacks
      block:
        - name: Try primary method
          command: /usr/local/bin/deploy.sh
          register: primary_result
      rescue:
        - name: Primary failed, try alternative
          command: /usr/local/bin/deploy_alt.sh
          register: alt_result
          failed_when: false
        - name: Check alternative result
          fail:
            msg: "Alternative deployment also failed"
          when: alt_result.rc != 0
      always:
        - name: Cleanup regardless of success/failure
          file:
            path: /tmp/deploy_lock
            state: absent
        - name: Send notification
          uri:
            url: "https://webhook.example.com/notify"
            method: POST
            body_format: json
            body:
              # Report success if either the primary or the alternative deployment worked
              status: "{{ 'success' if (primary_result.rc | default(1)) == 0 or (alt_result.rc | default(1)) == 0 else 'failed' }}"
              host: "{{ inventory_hostname }}"
    - name: Ignore errors for specific tasks
      command: /usr/bin/optional-command
      ignore_errors: yes
    - name: Custom failure conditions
      shell: check_status.sh
      register: status_check
      failed_when: "'ERROR' in status_check.stdout or status_check.rc > 5"
      changed_when: "'CHANGED' in status_check.stdout"
Advanced Debugging Techniques
Interactive Debugging
---
- name: Debug playbook interactively
  hosts: localhost
  debugger: on_failed  # or 'always', 'never', 'on_unreachable', 'on_skipped'
  tasks:
    - name: This might fail
      command: /bin/false

# When failure occurs:
# [localhost] TASK: This might fail (debug)> p task
# [localhost] TASK: This might fail (debug)> p task_vars
# [localhost] TASK: This might fail (debug)> p result
# [localhost] TASK: This might fail (debug)> c  # continue
# [localhost] TASK: This might fail (debug)> r  # redo
# [localhost] TASK: This might fail (debug)> q  # quit
Advanced Logging
# ansible.cfg
[defaults]
log_path = /var/log/ansible/ansible.log
display_args_to_stdout = True
display_skipped_hosts = False
# Custom logging callback plugin
# callback_plugins/detailed_logging.py
from ansible.plugins.callback import CallbackBase
import json
import logging


class CallbackModule(CallbackBase):
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'detailed_logging'

    def __init__(self):
        super(CallbackModule, self).__init__()
        logging.basicConfig(
            filename='/var/log/ansible/detailed.log',
            level=logging.INFO,
            format='%(asctime)s - %(message)s'
        )

    def v2_runner_on_ok(self, result):
        # One JSON object per successful task result
        logging.info(json.dumps({
            'host': result._host.name,
            'task': result.task_name,
            'status': 'OK',
            'changed': result._result.get('changed', False)
        }))

    def v2_runner_on_failed(self, result, ignore_errors=False):
        logging.error(json.dumps({
            'host': result._host.name,
            'task': result.task_name,
            'status': 'FAILED',
            'error': result._result.get('msg', 'Unknown error')
        }))
Advanced Jinja2 Templating
Complex Template Patterns
# templates/nginx_config.j2
{% set app_servers = groups['webservers'] | map('extract', hostvars) | list %}
{% set ssl_enabled = ssl_certificate is defined and ssl_certificate %}
upstream {{ app_name }} {
{% for server in app_servers %}
server {{ server.ansible_host }}:{{ server.app_port | default(8080) }} weight={{ server.weight | default(1) }};
{% endfor %}
{% if keepalive_connections is defined %}
keepalive {{ keepalive_connections }};
{% endif %}
}
server {
listen {{ listen_port | default(80) }};
server_name {{ server_name }};
{% if ssl_enabled %}
listen 443 ssl http2;
ssl_certificate {{ ssl_certificate }};
ssl_certificate_key {{ ssl_certificate_key }};
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
{% endif %}
{% for location in locations | default([]) %}
location {{ location.path }} {
{% if location.type == 'proxy' %}
proxy_pass http://{{ location.backend | default(app_name) }};
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
{% elif location.type == 'static' %}
alias {{ location.root }};
expires {{ location.expires | default('30d') }};
{% endif %}
}
{% endfor %}
}
# Advanced filter usage in playbooks
---
- name: Complex Jinja2 examples
  hosts: localhost
  vars:
    servers:
      - { name: web1, ip: 10.0.1.10, roles: [web, app] }
      - { name: web2, ip: 10.0.1.11, roles: [web] }
      - { name: db1, ip: 10.0.2.10, roles: [db] }
  tasks:
    - name: Filter servers by role
      debug:
        msg: "{{ servers | selectattr('roles', 'contains', 'web') | map(attribute='name') | list }}"
    - name: Group by attribute
      debug:
        msg: "{{ servers | groupby('roles') }}"
    - name: Complex transformation
      set_fact:
        web_config: >-
          {{
            servers
            | selectattr('roles', 'contains', 'web')
            | map(attribute='ip')
            | product([8080, 8443])
            | map('join', ':')
            | list
          }}
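To make the "Complex transformation" pipeline easier to follow, here is a plain-Python sketch of the same steps: keep servers whose roles include web, take their IPs, pair each IP with both ports, and join each pair with a colon. The function name web_endpoints is hypothetical; the data mirrors the servers variable above.

```python
from itertools import product

servers = [
    {"name": "web1", "ip": "10.0.1.10", "roles": ["web", "app"]},
    {"name": "web2", "ip": "10.0.1.11", "roles": ["web"]},
    {"name": "db1", "ip": "10.0.2.10", "roles": ["db"]},
]


def web_endpoints(servers, ports=(8080, 8443)):
    """Return 'ip:port' strings for every server that has the 'web' role."""
    # selectattr('roles', 'contains', 'web') | map(attribute='ip')
    web_ips = [server["ip"] for server in servers if "web" in server["roles"]]
    # product([8080, 8443]) | map('join', ':')
    return [f"{ip}:{port}" for ip, port in product(web_ips, ports)]


if __name__ == "__main__":
    print(web_endpoints(servers))
    # ['10.0.1.10:8080', '10.0.1.10:8443', '10.0.1.11:8080', '10.0.1.11:8443']
```

The order follows the Cartesian product: every port for the first matching server, then every port for the next.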
Action Plugins
Custom Action Plugin
# action_plugins/deploy_with_verification.py
from ansible.plugins.action import ActionBase
import time


class ActionModule(ActionBase):
    def run(self, tmp=None, task_vars=None):
        result = super(ActionModule, self).run(tmp, task_vars)
        # Get module args
        source = self._task.args.get('source')
        dest = self._task.args.get('dest')
        verify_url = self._task.args.get('verify_url')
        # Deploy file. Note: this runs the copy module directly, which expects
        # src to already exist on the managed host; transferring a file from
        # the control node is normally handled by the copy action plugin.
        copy_result = self._execute_module(
            module_name='copy',
            module_args={'src': source, 'dest': dest},
            task_vars=task_vars,
            tmp=tmp
        )
        if copy_result.get('failed'):
            return copy_result
        # Wait and verify
        time.sleep(5)
        verify_result = self._execute_module(
            module_name='uri',
            module_args={'url': verify_url, 'status_code': 200},
            task_vars=task_vars,
            tmp=tmp
        )
        # Fail the task when verification fails instead of reporting success
        result.update({
            'changed': copy_result.get('changed', False),
            'failed': verify_result.get('failed', False),
            'deploy': copy_result,
            'verify': verify_result
        })
        return result
# Use in playbook:
- name: Deploy with automatic verification
  deploy_with_verification:
    source: /local/app.jar
    dest: /opt/app/app.jar
    verify_url: http://localhost:8080/health
Inventory Plugins
Custom Inventory Plugin
# inventory_plugins/custom_cmdb.py
from ansible.plugins.inventory import BaseInventoryPlugin
import requests

DOCUMENTATION = r'''
    name: custom_cmdb
    plugin_type: inventory
    short_description: CMDB inventory source
    description:
        - Get inventory from custom CMDB
    options:
        plugin:
            description: Name of the plugin
            required: true
            choices: ['custom_cmdb']
        cmdb_url:
            description: CMDB API URL
            required: true
        api_key:
            description: API authentication key
            required: true
'''


class InventoryModule(BaseInventoryPlugin):
    NAME = 'custom_cmdb'

    def verify_file(self, path):
        return super(InventoryModule, self).verify_file(path) and path.endswith(('cmdb.yml', 'cmdb.yaml'))

    def parse(self, inventory, loader, path, cache=True):
        super(InventoryModule, self).parse(inventory, loader, path, cache)
        config = self._read_config_data(path)
        cmdb_url = config.get('cmdb_url')
        # Config values are read as-is, so render Jinja2 expressions such as
        # "{{ lookup('env', ...) }}" explicitly before using them
        api_key = self.templar.template(config.get('api_key'))
        # Fetch from CMDB
        response = requests.get(
            cmdb_url,
            headers={'Authorization': f'Bearer {api_key}'},
            timeout=30
        )
        response.raise_for_status()
        for host_data in response.json():
            hostname = host_data['hostname']
            # Add host
            self.inventory.add_host(hostname)
            # Set variables
            for key, value in host_data.items():
                self.inventory.set_variable(hostname, key, value)
            # Add to groups
            for group in host_data.get('groups', []):
                self.inventory.add_group(group)
                self.inventory.add_child(group, hostname)
# Usage: inventory/cmdb.yml
plugin: custom_cmdb
cmdb_url: https://cmdb.example.com/api/inventory
api_key: "{{ lookup('env', 'CMDB_API_KEY') }}"
Advanced Module Development
Module with Return Values and Documentation
#!/usr/bin/python
# library/advanced_service.py
DOCUMENTATION = r'''
---
module: advanced_service
short_description: Manage services with advanced features
description:
    - Start, stop, restart services with health checks
options:
    name:
        description: Service name
        required: true
        type: str
    state:
        description: Desired state
        choices: [started, stopped, restarted]
        default: started
        type: str
    health_check_url:
        description: URL to verify service health
        type: str
    timeout:
        description: Health check timeout in seconds
        default: 30
        type: int
'''

EXAMPLES = r'''
- name: Start nginx with health check
  advanced_service:
    name: nginx
    state: started
    health_check_url: http://localhost/health
'''

RETURN = r'''
name:
    description: Service name
    returned: always
    type: str
state:
    description: Service state
    returned: always
    type: str
health_check:
    description: Health check result
    returned: when health_check_url provided
    type: dict
'''
from ansible.module_utils.basic import AnsibleModule
import time
import requests  # Third-party; must be installed on the managed host


def check_health(url, timeout):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                return {'status': 'healthy', 'code': 200}
        except requests.RequestException:
            pass
        # Pause before the next attempt, whether the request failed
        # or returned a non-200 status
        time.sleep(2)
    return {'status': 'unhealthy', 'error': 'Timeout'}


def main():
    module = AnsibleModule(
        argument_spec=dict(
            name=dict(type='str', required=True),
            state=dict(type='str', default='started', choices=['started', 'stopped', 'restarted']),
            health_check_url=dict(type='str', required=False),
            timeout=dict(type='int', default=30)
        ),
        supports_check_mode=True
    )
    result = {
        'changed': False,
        'name': module.params['name'],
        'state': module.params['state']
    }
    # In check mode, report without touching the service
    if module.check_mode:
        module.exit_json(**result)
    # Service management logic here
    # ... systemctl commands ...
    result['changed'] = True
    # Health check if URL provided
    if module.params['health_check_url']:
        health = check_health(
            module.params['health_check_url'],
            module.params['timeout']
        )
        result['health_check'] = health
        if health['status'] != 'healthy':
            module.fail_json(msg='Health check failed', **result)
    module.exit_json(**result)


if __name__ == '__main__':
    main()
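The health check in this module is a poll-until-deadline loop. The sketch below isolates that loop so it can be exercised without a network or a running service. It is an illustration under assumptions, not the module's code: wait_until_healthy, probe, clock, and sleep are hypothetical names, with the probe standing in for the HTTP request and returning a status code.

```python
import time


def wait_until_healthy(probe, timeout, interval=2, clock=time.monotonic, sleep=time.sleep):
    """Call probe() until it returns 200 or `timeout` seconds have elapsed."""
    deadline = clock() + timeout
    last_error = "Timeout"
    while clock() < deadline:
        try:
            code = probe()
            if code == 200:
                return {"status": "healthy", "code": 200}
            last_error = f"HTTP {code}"
        except Exception as exc:  # a real probe may raise on connection errors
            last_error = str(exc)
        # Always pause between attempts so a failing endpoint is not hammered
        sleep(interval)
    return {"status": "unhealthy", "error": last_error}


if __name__ == "__main__":
    # The service answers 503 twice, then becomes healthy.
    codes = iter([503, 503, 200])
    print(wait_until_healthy(lambda: next(codes), timeout=30, sleep=lambda s: None))
    # {'status': 'healthy', 'code': 200}
```

Passing the clock and sleep functions as parameters is a design choice for testability; in the module itself, time.time and time.sleep are called directly.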
Best Practices Summary
Advanced Ansible Best Practices
- Performance: Enable pipelining, increase forks, cache facts
- Profiling: Use profile_tasks callback to identify bottlenecks
- Custom Code: Write modules/plugins for complex logic and reusability
- Error Handling: Use block/rescue/always for comprehensive error management
- Debugging: Use interactive debugger and detailed logging
- Async: Use async with poll (and async_status) for long-running independent tasks
- Templating: Master Jinja2 filters for complex data transformations
- Testing: Use Molecule for thorough testing of roles and playbooks
- Documentation: Document custom modules with DOCUMENTATION blocks
- Monitoring: Implement callback plugins for execution tracking
- Optimization: Disable fact gathering when not needed, use minimal facts
- Version Control: Track all Ansible code, configurations, and inventories