Skip to content
Snippets Groups Projects
Commit 6171c9f7 authored by Fahrudin Halilovic's avatar Fahrudin Halilovic
Browse files

INFRA-10659: Implement self-healing mechanism for Netbox jobs

parent ef80926c
No related branches found
No related tags found
1 merge request!9INFRA-10659: Implement self-healing mechanism for Netbox jobs
Pipeline #202607 failed
This commit is part of merge request !9. Comments created here will be created in the context of that merge request.
import json
from rest_framework import serializers
from dcim.models import Device, DeviceRole, DeviceType, Interface, Site
from ipam.models import Prefix, IPAddress
from virtualization.models import VirtualMachine, VMInterface
from extras.models import Tag
from dcim.choices import DeviceStatusChoices
from virtualization.choices import VirtualMachineStatusChoices
# AWX group-name prefixes, keyed by the NetBox model class a group is
# derived from (e.g. a Site "ams-01" becomes group "site_ams_01").
group_prefixes = {
Site: "site_",
DeviceRole: "devicerole_",
DeviceType: "devicetype_",
Prefix: "prefix_",
Tag: "tag_"
}
class TagSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Tag into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace('-', '_')
        variables = json.dumps({"netbox_tag_name": instance.name})
        return {"name": f"{group_prefixes[Tag]}{slug}", "variables": variables}
class SiteSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Site into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace('-', '_')
        variables = {"netbox_site_status": instance.status}
        return {
            "name": f"{group_prefixes[Site]}{slug}",
            "variables": json.dumps(variables),
        }
class DeviceRoleSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox DeviceRole into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace('-', '_')
        # Device-role groups carry no group variables.
        return {"name": f"{group_prefixes[DeviceRole]}{slug}", "variables": json.dumps({})}
class DeviceTypeSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox DeviceType into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace('-', '_')
        variables = {"netbox_devicetype_model": instance.model}
        return {
            "name": f"{group_prefixes[DeviceType]}{slug}",
            "description": instance.description,
            "variables": json.dumps(variables),
        }
class PrefixSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Prefix into an AWX group payload."""

    # Characters that are not valid in a group name, mapped to underscores.
    _SAFE = str.maketrans({".": "_", "/": "_"})

    def to_representation(self, instance):
        prefix = str(instance.prefix)
        return {
            "name": f"{group_prefixes[Prefix]}{prefix.translate(self._SAFE)}",
            "variables": json.dumps({"netbox_prefix": prefix}),
        }
class InterfaceSerializer(serializers.BaseSerializer):
    """Read-only serializer for a Device Interface with its IP addresses.

    The output is later embedded in a ``json.dumps`` call by
    ``DeviceSerializer``, so every value must be JSON-serializable.
    """

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        return {
            "name": instance.name,
            # mac_address is an EUI object in NetBox, which json.dumps cannot
            # handle; stringify it (None when unset), matching the task-side
            # copy of this serializer.
            "mac": str(instance.mac_address) if instance.mac_address else None,
            "ip_addresses": ip_addresses,
        }
class IPAddressSerializer(serializers.BaseSerializer):
    """Read-only serializer for a NetBox IPAddress (address + DNS name)."""

    def to_representation(self, instance):
        address = str(instance.address)
        return {"address": address, "dns_name": instance.dns_name}
class DeviceSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Device into an AWX host payload."""

    def to_representation(self, instance):
        interfaces = [
            InterfaceSerializer(iface).data for iface in instance.interfaces.all()
        ]
        # Prefer the DNS name of the primary IPv4 address; getattr falls back
        # to the device name when primary_ip4 is None.
        host_name = getattr(instance.primary_ip4, 'dns_name', instance.name)
        return {
            "name": host_name,
            "description": instance.description,
            "enabled": instance.status == DeviceStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps({"netbox_interfaces": interfaces}),
        }
class VMInterfaceSerializer(serializers.BaseSerializer):
    """Read-only serializer for a VM interface with its IP addresses.

    The output is later embedded in a ``json.dumps`` call by
    ``VMSerializer``, so every value must be JSON-serializable.
    """

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        return {
            "name": instance.name,
            # mac_address is an EUI object in NetBox, which json.dumps cannot
            # handle; stringify it (None when unset), matching the task-side
            # copy of this serializer.
            "mac": str(instance.mac_address) if instance.mac_address else None,
            "ip_addresses": ip_addresses,
        }
class VMSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox VirtualMachine into an AWX host payload."""

    def to_representation(self, instance):
        vcpus = float(instance.vcpus) if instance.vcpus is not None else 0.0
        variables = {
            "netbox_virtualmachine_name": instance.name,
            "netbox_virtualmachine_vcpus": vcpus,
            "netbox_virtualmachine_memory": instance.memory or 0,
            "netbox_virtualmachine_disk": instance.disk or 0,
            "netbox_interfaces": [
                VMInterfaceSerializer(iface).data
                for iface in instance.interfaces.all()
            ],
        }
        # Prefer the DNS name of the primary IPv4 address; getattr falls back
        # to the VM name when primary_ip4 is None.
        return {
            "name": getattr(instance.primary_ip4, 'dns_name', instance.name),
            "description": instance.description,
            "enabled": instance.status == VirtualMachineStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps(variables),
        }
class AWXHostSerializer(serializers.BaseSerializer):
    """Write-side serializer extracting the comparable fields of an AWX host."""

    _FIELDS = ("name", "description", "enabled", "variables")

    def to_internal_value(self, data):
        # Missing keys become None, mirroring dict.get defaults.
        return {field: data.get(field) for field in self._FIELDS}
class AWXGroupSerializer(serializers.BaseSerializer):
    """Write-side serializer extracting the comparable fields of an AWX group."""

    _FIELDS = ("name", "variables")

    def to_internal_value(self, data):
        # Missing keys become None, mirroring dict.get defaults.
        return {field: data.get(field) for field in self._FIELDS}
# Dispatch table: NetBox model class -> serializer that converts an instance
# of it into an AWX host/group payload. Imported by the sync tasks.
serializers_dict = {
Site: SiteSerializer,
DeviceRole: DeviceRoleSerializer,
DeviceType: DeviceTypeSerializer,
Prefix: PrefixSerializer,
Device: DeviceSerializer,
VirtualMachine: VMSerializer,
Tag: TagSerializer
}
import logging
from datetime import timedelta
from dcim.models import Device, DeviceRole, DeviceType, Site
from ipam.models import Prefix
from dcim.choices import DeviceStatusChoices
from virtualization.choices import VirtualMachineStatusChoices
from virtualization.models import VirtualMachine
from extras.models import Tag
from rest_framework import serializers
import json
import logging
from core.models import Job
from .models import AWXInventory
from .serializers import (
serializers_dict,
AWXHostSerializer,
AWXGroupSerializer,
)
from django_rq import get_queue
from rq import get_current_job, Queue
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from urllib3.exceptions import InsecureRequestWarning
import urllib3
# Set up logging
logger = logging.getLogger(__name__)
# AWX group-name prefixes keyed by the NetBox model a group is derived from.
group_prefixes = {Site: "site_", DeviceRole: "devicerole_", DeviceType: "devicetype_", Prefix: "prefix_", Tag:"tag_"}
class TagSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Tag into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace("-", "_")
        return {
            "name": "{}{}".format(group_prefixes[Tag], slug),
            "variables": json.dumps({"netbox_tag_name": instance.name}),
        }
class SiteSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Site into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace("-", "_")
        variables = {"netbox_site_status": instance.status}
        return {
            "name": "{}{}".format(group_prefixes[Site], slug),
            "variables": json.dumps(variables),
        }
class DeviceRoleSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox DeviceRole into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace("-", "_")
        # Device-role groups carry no group variables.
        return {
            "name": "{}{}".format(group_prefixes[DeviceRole], slug),
            "variables": json.dumps({}),
        }
class DeviceTypeSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox DeviceType into an AWX group payload."""

    def to_representation(self, instance):
        slug = instance.slug.replace("-", "_")
        variables = {"netbox_devicetype_model": instance.model}
        return {
            "name": "{}{}".format(group_prefixes[DeviceType], slug),
            "description": instance.description,
            "variables": json.dumps(variables),
        }
class PrefixSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Prefix into an AWX group payload."""

    def to_representation(self, instance):
        prefix = str(instance.prefix)
        safe_name = prefix.replace(".", "_").replace("/", "_")
        return {
            "name": "{}{}".format(group_prefixes[Prefix], safe_name),
            "variables": json.dumps({"netbox_prefix": "{}".format(prefix)}),
        }
class InterfaceSerializer(serializers.BaseSerializer):
    """Read-only serializer for a Device Interface with its IP addresses."""

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        # Stringify the EUI mac_address object so the result stays
        # JSON-serializable; None when no MAC is set.
        mac = str(instance.mac_address) if instance.mac_address else None
        return {
            "name": instance.name,
            "mac": mac,
            "ip_addresses": ip_addresses,
        }
class IPAddressSerializer(serializers.BaseSerializer):
    """Read-only serializer for a NetBox IPAddress (address + DNS name)."""

    def to_representation(self, instance):
        address = str(instance.address)
        return {"address": address, "dns_name": instance.dns_name}
class DeviceSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox Device into an AWX host payload."""

    def to_representation(self, instance):
        interfaces = [
            InterfaceSerializer(iface).data for iface in instance.interfaces.all()
        ]
        # Prefer the DNS name of the primary IPv4 address; getattr falls back
        # to the device name when primary_ip4 is None.
        host_name = getattr(instance.primary_ip4, 'dns_name', instance.name)
        return {
            "name": host_name,
            "description": instance.description,
            "enabled": instance.status == DeviceStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps({"netbox_interfaces": interfaces}),
        }
class VMInterfaceSerializer(serializers.BaseSerializer):
    """Read-only serializer for a VM interface with its IP addresses."""

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        # Stringify the EUI mac_address object so the result stays
        # JSON-serializable; None when no MAC is set.
        mac = str(instance.mac_address) if instance.mac_address else None
        return {
            "name": instance.name,
            "mac": mac,
            "ip_addresses": ip_addresses,
        }
class VMSerializer(serializers.BaseSerializer):
    """Read-only serializer turning a NetBox VirtualMachine into an AWX host payload."""

    def to_representation(self, instance):
        vcpus = float(instance.vcpus) if instance.vcpus is not None else 0.0
        variables = {
            "netbox_virtualmachine_name": instance.name,
            "netbox_virtualmachine_vcpus": vcpus,
            "netbox_virtualmachine_memory": instance.memory or 0,
            "netbox_virtualmachine_disk": instance.disk or 0,
            "netbox_interfaces": [
                VMInterfaceSerializer(iface).data
                for iface in instance.interfaces.all()
            ],
        }
        # Prefer the DNS name of the primary IPv4 address; getattr falls back
        # to the VM name when primary_ip4 is None.
        return {
            "name": getattr(instance.primary_ip4, 'dns_name', instance.name),
            "description": instance.description,
            "enabled": instance.status == VirtualMachineStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps(variables),
        }
class AWXHostSerializer(serializers.BaseSerializer):
    """Write-side serializer extracting the comparable fields of an AWX host."""

    _FIELDS = ("name", "description", "enabled", "variables")

    def to_internal_value(self, data):
        # Missing keys become None, mirroring dict.get defaults.
        return {field: data.get(field) for field in self._FIELDS}
class AWXGroupSerializer(serializers.BaseSerializer):
    """Write-side serializer extracting the comparable fields of an AWX group."""

    _FIELDS = ("name", "variables")

    def to_internal_value(self, data):
        # Missing keys become None, mirroring dict.get defaults.
        return {field: data.get(field) for field in self._FIELDS}
# Suppress only the single InsecureRequestWarning from urllib3
urllib3.disable_warnings(InsecureRequestWarning)
# Configuration for retry mechanism
MAX_RETRIES = 5  # maximum number of attempts per task before failing the job
INITIAL_DELAY = 1  # seconds before the first retry
BACKOFF_FACTOR = 2  # exponential backoff multiplier between retries
# Dispatch table: NetBox model class -> serializer producing its AWX payload.
# NOTE(review): this name shadows the `rest_framework.serializers` module
# imported above; consider renaming (the companion module uses
# `serializers_dict` for the same mapping).
serializers = {
    Site: SiteSerializer,
    DeviceRole: DeviceRoleSerializer,
    DeviceType: DeviceTypeSerializer,
    Prefix: PrefixSerializer,
    Device: DeviceSerializer,
    VirtualMachine: VMSerializer,
    Tag: TagSerializer,
}
def reschedule_job(job, delay_seconds, attempt):
    """
    Re-enqueue the given RQ job under the same job ID after a delay.

    Stores the attempt counter in the job's meta so the retry decorator can
    read it back on the next run. Re-raises any failure to reschedule.
    """
    try:
        job.meta['attempt'] = attempt
        job.save_meta()
        # Re-enqueue on the job's original queue with the requested delay,
        # keeping the same job ID and meta.
        origin_queue = Queue(job.origin, connection=job.connection)
        origin_queue.enqueue_in(
            timedelta(seconds=delay_seconds),
            job.func,
            *job.args,
            **job.kwargs,
            job_id=job.id,
            meta=job.meta,
        )
        logger.info(f"Job {job.id} rescheduled with delay of {delay_seconds} seconds for attempt {attempt}.")
    except Exception as e:
        logger.error(f"Failed to reschedule job {job.id}: {e}")
        raise
def retry_on_failure(task_name):
    """
    Decorator that retries a task when network-related exceptions occur.

    Retries up to MAX_RETRIES times with exponential backoff by rescheduling
    the current RQ job; on the final attempt the exception propagates so
    Django RQ marks the job as failed.

    :param task_name: Human-readable task name used in log messages.
    """
    from functools import wraps

    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name/docstring for RQ
        def wrapper(*args, **kwargs):
            job = get_current_job()
            # Outside a worker (e.g. direct call in a shell) there is no job;
            # run once with no retry bookkeeping.
            attempt = job.meta.get('attempt', 1) if job is not None else 1
            try:
                return func(*args, **kwargs)
            except (RequestException, HTTPError, ConnectionError, Timeout) as e:
                logger.error(f"Error occurred during {task_name} on attempt {attempt}: {e}")
                if job is not None and attempt < MAX_RETRIES:
                    delay_seconds = INITIAL_DELAY * (BACKOFF_FACTOR ** (attempt - 1))
                    job.timeout = delay_seconds + 60  # Adjust the timeout for the next attempt
                    reschedule_job(job, delay_seconds, attempt + 1)
                else:
                    logger.error(f"Task {task_name} failed on the last attempt: {e}")
                    # Let the exception propagate so the job is marked as failed
                    raise
        return wrapper
    return decorator
# AWX group-name prefixes, keyed by the NetBox model class a group is
# derived from (e.g. a Site "ams-01" becomes group "site_ams_01").
group_prefixes = {
Site: "site_",
DeviceRole: "devicerole_",
DeviceType: "devicetype_",
Prefix: "prefix_",
Tag: "tag_"
}
@retry_on_failure('sync_host')
def sync_host(inventory, sender, instance):
    """
    Synchronizes a host in AWX inventory.

    Creates the host if it does not exist, updates it when the serialized
    payload differs, then reconciles its group memberships (Site, role,
    DeviceType and Tags) against the instance's current attributes.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param sender: NetBox model class (Device or VirtualMachine).
    :param instance: The NetBox object to synchronize.
    """
    serializer_class = serializers_dict[sender]
    serializer = serializer_class(instance)
    host = inventory.get_host(serializer.data["name"])
    if host is None:
        # The host doesn't exist yet: create it, then re-fetch to obtain its id.
        inventory.create_host(serializer.data)
        host = inventory.get_host(serializer.data["name"])
    else:
        # The host exists: update it only when the stored payload differs.
        host_serializer = AWXHostSerializer(data=host)
        if host_serializer.is_valid():
            if host_serializer.validated_data != serializer.data:
                inventory.update_host(host["id"], serializer.data)
        else:
            logger.error(f"Invalid data for host_serializer: {host_serializer.errors}")
    current_groups = host["summary_fields"]["groups"]["results"]
    # Handle associations for each type (Site, DeviceRole, DeviceType, and Tags).
    # hasattr guards cover the attribute differences between Devices and VMs.
    if hasattr(instance, 'site') and instance.site:
        sync_host_group_association(inventory, host, Site, instance.site, current_groups)
    if hasattr(instance, 'role') and instance.role:
        sync_host_group_association(inventory, host, DeviceRole, instance.role, current_groups)
    if isinstance(instance, Device) and instance.device_type:
        sync_host_group_association(inventory, host, DeviceType, instance.device_type, current_groups)
    if hasattr(instance, 'tags'):
        for tag in instance.tags.all():
            sync_host_group_association(inventory, host, Tag, tag, current_groups)
    # Disassociate groups that are no longer relevant
    disassociate_removed_groups(inventory, host, instance, current_groups)
@retry_on_failure('sync_host_group_association')
def sync_host_group_association(inventory, host, object_type, instance, current_groups):
    """
    Associates the host with the AWX group derived from *instance*.

    Only groups whose names carry the prefix for *object_type* are considered;
    if the host is not already in the desired group and that group exists in
    AWX, the association is created.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param host: AWX host dict (must contain "id" and "name").
    :param object_type: NetBox model class (key into group_prefixes).
    :param instance: NetBox object the group is derived from.
    :param current_groups: List of the host's current AWX group dicts.
    """
    serializer_class = serializers_dict[object_type]
    serializer = serializer_class(instance)
    group_name = serializer.data["name"]
    # Only compare against groups of the same kind (e.g. site_*, tag_*).
    prefix = group_prefixes[object_type]
    relevant_names = {
        group["name"] for group in current_groups if group["name"].startswith(prefix)
    }
    if group_name not in relevant_names:
        group = inventory.get_group(group_name)
        if group:
            inventory.associate_host_group(host["id"], group["id"])
            logger.info(f"Host {host['name']} associated with group {group_name}.")
@retry_on_failure('disassociate_removed_groups')
def disassociate_removed_groups(inventory, host, instance, current_groups):
    """
    Disassociates the host from AWX groups that are no longer relevant.

    Rebuilds the set of group names the instance should belong to (site,
    role, device type, tags) and removes the host from every current group
    not in that set.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param host: AWX host dict (must contain "id" and "name").
    :param instance: NetBox Device or VirtualMachine.
    :param current_groups: List of the host's current AWX group dicts.
    """
    # Collect the group names that should remain, based on the instance's
    # attributes. hasattr guards cover Device/VM attribute differences.
    valid_group_names = set()
    if hasattr(instance, 'site') and instance.site:
        valid_group_names.add(f"{group_prefixes[Site]}{instance.site.slug.replace('-', '_')}")
    if hasattr(instance, 'role') and instance.role:
        valid_group_names.add(f"{group_prefixes[DeviceRole]}{instance.role.slug.replace('-', '_')}")
    if isinstance(instance, Device) and instance.device_type:
        valid_group_names.add(f"{group_prefixes[DeviceType]}{instance.device_type.slug.replace('-', '_')}")
    if hasattr(instance, 'tags'):
        valid_group_names.update(
            f"{group_prefixes[Tag]}{tag.slug.replace('-', '_')}" for tag in instance.tags.all()
        )
    # Disassociate from every group that is no longer valid.
    # NOTE(review): this also removes groups of kinds not rebuilt above
    # (e.g. prefix_*) — confirm that is intended.
    for group in current_groups:
        if group["name"] not in valid_group_names:
            inventory.disassociate_host_group(host["id"], group["id"])
            logger.info(f"Host {host['name']} disassociated from group {group['name']}.")
@retry_on_failure('delete_host')
def delete_host(inventory, sender, instance):
    """
    Deletes a host from AWX inventory.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param sender: NetBox model class (key into serializers_dict).
    :param instance: The NetBox object whose AWX host should be removed.
    """
    serializer_class = serializers_dict[sender]
    serializer = serializer_class(instance)
    inventory.delete_host(serializer.data["name"])
@retry_on_failure('sync_group')
def sync_group(inventory, sender, instance):
    """
    Synchronizes a group in AWX inventory.

    Creates the group if it does not exist, otherwise updates it when the
    serialized payload differs from what AWX currently stores.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param sender: NetBox model class (key into serializers_dict).
    :param instance: The NetBox object to synchronize as a group.
    """
    serializer_class = serializers_dict[sender]
    serializer = serializer_class(instance)
    group = inventory.get_group(serializer.data["name"])
    if group is None:
        # The group doesn't exist yet: create it, then re-fetch it.
        inventory.create_group(serializer.data)
        group = inventory.get_group(serializer.data["name"])
    else:
        group_serializer = AWXGroupSerializer(data=group)
        if group_serializer.is_valid():
            if group_serializer.validated_data != serializer.data:
                inventory.update_group(serializer.data["name"], serializer.data)
        else:
            logger.error(f"Invalid data for group_serializer: {group_serializer.errors}")
@retry_on_failure('delete_group')
def delete_group(inventory, sender, instance):
    """
    Deletes a group from AWX inventory.

    :param inventory: AWXInventory wrapper used for all AWX API calls.
    :param sender: NetBox model class (key into serializers_dict).
    :param instance: The NetBox object whose AWX group should be removed.
    """
    serializer_class = serializers_dict[sender]
    serializer = serializer_class(instance)
    inventory.delete_group(serializer.data["name"])
def sync_all(job):
"""
Performs a full synchronization of the AWX inventory.
"""
inventory = job.object
logger.info("Performing full inventory sync for inventory {}".format(inventory.inventory_id))
logger.info(f"Performing full inventory sync for inventory {inventory.inventory_id}")
job.start()
for site in Site.objects.all():
sync_group(inventory, Site, site)
......@@ -274,9 +209,9 @@ def sync_all(job):
sync_group(inventory, Tag, tag)
for device in Device.objects.all():
if not device.primary_ip4 is None and device.primary_ip4.dns_name:
if device.primary_ip4 and device.primary_ip4.dns_name:
sync_host(inventory, Device, device)
for vm in VirtualMachine.objects.all():
if not vm.primary_ip4 is None and vm.primary_ip4.dns_name:
if vm.primary_ip4 and vm.primary_ip4.dns_name:
sync_host(inventory, VirtualMachine, vm)
job.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment