"""Serializers translating NetBox objects into AWX inventory payloads.

Each ``to_representation`` returns the plain dict that the AWX REST API
expects for a host or group.  Nested data is JSON-encoded into the
``variables`` field because AWX stores host/group variables as a string.
The ``AWXHostSerializer``/``AWXGroupSerializer`` do the inverse: they
normalize an AWX API response down to the fields we compare against.
"""
import json

from rest_framework import serializers

from dcim.choices import DeviceStatusChoices
from dcim.models import Device, DeviceRole, DeviceType, Interface, Site
from extras.models import Tag
from ipam.models import IPAddress, Prefix
from virtualization.choices import VirtualMachineStatusChoices
from virtualization.models import VirtualMachine, VMInterface

# Prefix prepended to generated AWX group names so a group can be traced
# back to the NetBox object type it was derived from.
group_prefixes = {
    Site: "site_",
    DeviceRole: "devicerole_",
    DeviceType: "devicetype_",
    Prefix: "prefix_",
    Tag: "tag_",
}


class TagSerializer(serializers.BaseSerializer):
    """Render a NetBox Tag as an AWX group payload."""

    def to_representation(self, instance):
        return {
            "name": f"{group_prefixes[Tag]}{instance.slug.replace('-', '_')}",
            "variables": json.dumps({"netbox_tag_name": instance.name}),
        }


class SiteSerializer(serializers.BaseSerializer):
    """Render a NetBox Site as an AWX group payload."""

    def to_representation(self, instance):
        return {
            "name": f"{group_prefixes[Site]}{instance.slug.replace('-', '_')}",
            "variables": json.dumps(
                {
                    "netbox_site_status": instance.status,
                }
            ),
        }


class DeviceRoleSerializer(serializers.BaseSerializer):
    """Render a NetBox DeviceRole as an AWX group payload."""

    def to_representation(self, instance):
        return {
            "name": f"{group_prefixes[DeviceRole]}{instance.slug.replace('-', '_')}",
            "variables": json.dumps({}),
        }


class DeviceTypeSerializer(serializers.BaseSerializer):
    """Render a NetBox DeviceType as an AWX group payload."""

    def to_representation(self, instance):
        return {
            "name": f"{group_prefixes[DeviceType]}{instance.slug.replace('-', '_')}",
            "description": instance.description,
            "variables": json.dumps(
                {
                    "netbox_devicetype_model": instance.model,
                }
            ),
        }


class PrefixSerializer(serializers.BaseSerializer):
    """Render a NetBox Prefix as an AWX group payload.

    Dots and slashes in the prefix are not valid in group names, so they
    are replaced with underscores (e.g. ``10.0.0.0/24`` -> ``10_0_0_0_24``).
    """

    def to_representation(self, instance):
        prefix_str = str(instance.prefix).replace(".", "_").replace("/", "_")
        return {
            "name": f"{group_prefixes[Prefix]}{prefix_str}",
            "variables": json.dumps({"netbox_prefix": str(instance.prefix)}),
        }


class InterfaceSerializer(serializers.BaseSerializer):
    """Render a device Interface, including its attached IP addresses."""

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        return {
            "name": instance.name,
            # mac_address is an EUI object, not a str; it must be stringified
            # here or DeviceSerializer's json.dumps(variables) raises TypeError
            # (the pre-refactor code in synchronization.py did exactly this).
            "mac": str(instance.mac_address) if instance.mac_address else None,
            "ip_addresses": ip_addresses,
        }


class IPAddressSerializer(serializers.BaseSerializer):
    """Render an IPAddress as its CIDR string plus DNS name."""

    def to_representation(self, instance):
        return {
            "address": str(instance.address),
            "dns_name": instance.dns_name,
        }


class DeviceSerializer(serializers.BaseSerializer):
    """Render a NetBox Device as an AWX host payload."""

    def to_representation(self, instance):
        variables = {"netbox_interfaces": []}
        for interface in instance.interfaces.all():
            variables["netbox_interfaces"].append(InterfaceSerializer(interface).data)
        return {
            # Prefer the primary IPv4's DNS name; fall back to the device
            # name when there is no primary IPv4 (getattr on None hits the
            # default).
            "name": getattr(instance.primary_ip4, 'dns_name', instance.name),
            "description": instance.description,
            "enabled": instance.status == DeviceStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps(variables),
        }


class VMInterfaceSerializer(serializers.BaseSerializer):
    """Render a VM interface, including its attached IP addresses."""

    def to_representation(self, instance):
        ip_addresses = [
            IPAddressSerializer(ip).data for ip in instance.ip_addresses.all()
        ]
        return {
            "name": instance.name,
            # See InterfaceSerializer: stringify the EUI object so that
            # VMSerializer's json.dumps(variables) does not raise TypeError.
            "mac": str(instance.mac_address) if instance.mac_address else None,
            "ip_addresses": ip_addresses,
        }


class VMSerializer(serializers.BaseSerializer):
    """Render a NetBox VirtualMachine as an AWX host payload."""

    def to_representation(self, instance):
        variables = {
            "netbox_virtualmachine_name": instance.name,
            # Normalize optional resource fields so variables are always
            # numeric and comparable across syncs.
            "netbox_virtualmachine_vcpus": float(instance.vcpus) if instance.vcpus is not None else 0.0,
            "netbox_virtualmachine_memory": instance.memory or 0,
            "netbox_virtualmachine_disk": instance.disk or 0,
            "netbox_interfaces": [],
        }
        for interface in instance.interfaces.all():
            variables["netbox_interfaces"].append(VMInterfaceSerializer(interface).data)
        return {
            "name": getattr(instance.primary_ip4, 'dns_name', instance.name),
            "description": instance.description,
            "enabled": instance.status == VirtualMachineStatusChoices.STATUS_ACTIVE,
            "variables": json.dumps(variables),
        }


class AWXHostSerializer(serializers.BaseSerializer):
    """Normalize an AWX host API response to the comparable field subset."""

    def to_internal_value(self, data):
        return {
            "name": data.get('name'),
            "description": data.get('description'),
            "enabled": data.get('enabled'),
            "variables": data.get('variables'),
        }


class AWXGroupSerializer(serializers.BaseSerializer):
    """Normalize an AWX group API response to the comparable field subset."""

    def to_internal_value(self, data):
        return {
            "name": data.get('name'),
            "variables": data.get('variables'),
        }


# Dispatch table: NetBox model class -> serializer producing its AWX payload.
serializers_dict = {
    Site: SiteSerializer,
    DeviceRole: DeviceRoleSerializer,
    DeviceType: DeviceTypeSerializer,
    Prefix: PrefixSerializer,
    Device: DeviceSerializer,
    VirtualMachine: VMSerializer,
    Tag: TagSerializer,
}
"""Synchronization of NetBox objects into an AWX inventory.

Implements the self-healing mechanism: transient network failures while
talking to the AWX API cause the running RQ job to be rescheduled with
exponential backoff instead of failing immediately.
"""
import functools
import logging
from datetime import timedelta

import urllib3
from urllib3.exceptions import InsecureRequestWarning
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from django_rq import get_queue
from rq import Queue, get_current_job

from core.models import Job
from dcim.models import Device, DeviceRole, DeviceType, Site
from extras.models import Tag
from ipam.models import Prefix
from virtualization.models import VirtualMachine

from .models import AWXInventory
from .serializers import (
    serializers_dict,
    AWXHostSerializer,
    AWXGroupSerializer,
)

logger = logging.getLogger(__name__)

# AWX instances are often reached over self-signed TLS; silence the
# per-request InsecureRequestWarning noise from urllib3.
urllib3.disable_warnings(InsecureRequestWarning)

# Retry/backoff configuration for the self-healing mechanism.
MAX_RETRIES = 5       # total attempts before the job is marked failed
INITIAL_DELAY = 1     # seconds before the first retry
BACKOFF_FACTOR = 2    # exponential multiplier applied per attempt


def reschedule_job(job, delay_seconds, attempt):
    """Re-enqueue *job* under the same job id after *delay_seconds* seconds.

    The attempt counter is persisted in ``job.meta`` so that the
    ``retry_on_failure`` decorator knows how many attempts were made.
    Raises whatever the queue layer raises if rescheduling fails.
    """
    try:
        job.meta['attempt'] = attempt
        job.save_meta()

        # Re-enqueue on the job's original queue with a delay; reusing the
        # same job_id continues the retry chain rather than forking a new job.
        queue = Queue(job.origin, connection=job.connection)
        queue.enqueue_in(
            timedelta(seconds=delay_seconds),
            job.func,
            *job.args,
            **job.kwargs,
            job_id=job.id,
            meta=job.meta,
        )

        logger.info(f"Job {job.id} rescheduled with delay of {delay_seconds} seconds for attempt {attempt}.")
    except Exception as e:
        logger.error(f"Failed to reschedule job {job.id}: {e}")
        raise


def retry_on_failure(task_name):
    """Decorator retrying the wrapped task on network-related exceptions.

    Retries up to ``MAX_RETRIES`` times with exponential backoff by
    rescheduling the *current RQ job*; on the final attempt the exception
    propagates so Django RQ marks the job as failed.  When no RQ job is
    active (shell, tests, direct calls) the function runs undecorated.
    """
    def decorator(func):
        @functools.wraps(func)  # preserve name/docstring for logging and RQ
        def wrapper(*args, **kwargs):
            job = get_current_job()
            if job is None:
                # Not running inside an RQ worker: no job to reschedule,
                # so execute directly and let exceptions propagate.
                return func(*args, **kwargs)

            attempt = job.meta.get('attempt', 1)
            try:
                return func(*args, **kwargs)
            # NOTE: HTTPError/ConnectionError/Timeout are subclasses of
            # RequestException; listed explicitly for readability.
            except (RequestException, HTTPError, ConnectionError, Timeout) as e:
                logger.error(f"Error occurred during {task_name} on attempt {attempt}: {e}")
                if attempt < MAX_RETRIES:
                    delay_seconds = INITIAL_DELAY * (BACKOFF_FACTOR ** (attempt - 1))
                    job.timeout = delay_seconds + 60  # widen timeout for the retry
                    reschedule_job(job, delay_seconds, attempt + 1)
                else:
                    logger.error(f"Task {task_name} failed on the last attempt: {e}")
                    # Let the exception propagate so the job is marked as failed.
                    raise
        return wrapper
    return decorator


# Prefix prepended to generated AWX group names, keyed by NetBox model.
# Kept in sync with netbox_awx_plugin.serializers.group_prefixes.
group_prefixes = {
    Site: "site_",
    DeviceRole: "devicerole_",
    DeviceType: "devicetype_",
    Prefix: "prefix_",
    Tag: "tag_",
}


@retry_on_failure('sync_host')
def sync_host(inventory, sender, instance):
    """Create or update the AWX host for *instance* and sync its groups.

    ``sender`` is the NetBox model class (Device or VirtualMachine) used to
    pick the serializer; ``inventory`` is the AWXInventory wrapper.
    """
    serializer = serializers_dict[sender](instance)
    host = inventory.get_host(serializer.data["name"])
    if host is None:
        inventory.create_host(serializer.data)
        # Re-fetch so we have the AWX-assigned id and group summary below.
        host = inventory.get_host(serializer.data["name"])
    else:
        host_serializer = AWXHostSerializer(data=host)
        if host_serializer.is_valid():
            # Only push an update when AWX's view of the host differs.
            if host_serializer.validated_data != serializer.data:
                inventory.update_host(host["id"], serializer.data)
        else:
            logger.error(f"Invalid data for host_serializer: {host_serializer.errors}")

    current_groups = host["summary_fields"]["groups"]["results"]

    # hasattr guards: VirtualMachine and Device expose different attributes.
    if hasattr(instance, 'site') and instance.site:
        sync_host_group_association(inventory, host, Site, instance.site, current_groups)
    if hasattr(instance, 'role') and instance.role:
        sync_host_group_association(inventory, host, DeviceRole, instance.role, current_groups)
    if isinstance(instance, Device) and instance.device_type:
        sync_host_group_association(inventory, host, DeviceType, instance.device_type, current_groups)
    if hasattr(instance, 'tags'):
        for tag in instance.tags.all():
            sync_host_group_association(inventory, host, Tag, tag, current_groups)

    disassociate_removed_groups(inventory, host, instance, current_groups)


@retry_on_failure('sync_host_group_association')
def sync_host_group_association(inventory, host, object_type, instance, current_groups):
    """Associate *host* with the AWX group derived from *instance*.

    Only groups whose names carry the prefix for *object_type* are
    considered; the association is skipped if it already exists.
    """
    serializer = serializers_dict[object_type](instance)
    group_name = serializer.data["name"]

    relevant_groups = [
        group for group in current_groups
        if group["name"].startswith(group_prefixes[object_type])
    ]

    if group_name not in [group["name"] for group in relevant_groups]:
        group = inventory.get_group(group_name)
        if group:
            inventory.associate_host_group(host["id"], group["id"])
            logger.info(f"Host {host['name']} associated with group {group_name}.")


@retry_on_failure('disassociate_removed_groups')
def disassociate_removed_groups(inventory, host, instance, current_groups):
    """Remove *host* from AWX groups that no longer match *instance*.

    Builds the set of group names the instance should belong to (site,
    role, device type, tags) and disassociates the host from every current
    group outside that set.
    NOTE(review): prefix_-named groups are never in valid_group_names, so
    any prefix group membership is dropped here — confirm this is intended.
    """
    valid_group_names = set()

    if hasattr(instance, 'site') and instance.site:
        valid_group_names.add(f"{group_prefixes[Site]}{instance.site.slug.replace('-', '_')}")
    if hasattr(instance, 'role') and instance.role:
        valid_group_names.add(f"{group_prefixes[DeviceRole]}{instance.role.slug.replace('-', '_')}")
    if isinstance(instance, Device) and instance.device_type:
        valid_group_names.add(f"{group_prefixes[DeviceType]}{instance.device_type.slug.replace('-', '_')}")
    if hasattr(instance, 'tags'):
        valid_group_names.update(
            f"{group_prefixes[Tag]}{tag.slug.replace('-', '_')}"
            for tag in instance.tags.all()
        )

    for group in current_groups:
        if group["name"] not in valid_group_names:
            inventory.disassociate_host_group(host["id"], group["id"])
            logger.info(f"Host {host['name']} disassociated from group {group['name']}.")


@retry_on_failure('delete_host')
def delete_host(inventory, sender, instance):
    """Delete the AWX host corresponding to *instance*."""
    serializer = serializers_dict[sender](instance)
    inventory.delete_host(serializer.data["name"])


@retry_on_failure('sync_group')
def sync_group(inventory, sender, instance):
    """Create or update the AWX group corresponding to *instance*."""
    serializer = serializers_dict[sender](instance)
    group = inventory.get_group(serializer.data["name"])
    if group is None:
        inventory.create_group(serializer.data)
    else:
        group_serializer = AWXGroupSerializer(data=group)
        if group_serializer.is_valid():
            # Only push an update when AWX's view of the group differs.
            if group_serializer.validated_data != serializer.data:
                inventory.update_group(serializer.data["name"], serializer.data)
        else:
            logger.error(f"Invalid data for group_serializer: {group_serializer.errors}")


@retry_on_failure('delete_group')
def delete_group(inventory, sender, instance):
    """Delete the AWX group corresponding to *instance*."""
    serializer = serializers_dict[sender](instance)
    inventory.delete_group(serializer.data["name"])


def sync_all(job):
    """Perform a full synchronization of the AWX inventory for *job*.

    Syncs every group-producing object (sites, roles, device types,
    prefixes, tags), then every device and VM that has a primary IPv4
    address with a DNS name — hosts are named by that DNS name.
    """
    inventory = job.object
    logger.info(f"Performing full inventory sync for inventory {inventory.inventory_id}")
    job.start()
    for site in Site.objects.all():
        sync_group(inventory, Site, site)
    for role in DeviceRole.objects.all():
        sync_group(inventory, DeviceRole, role)
    for device_type in DeviceType.objects.all():
        sync_group(inventory, DeviceType, device_type)
    for prefix in Prefix.objects.all():
        sync_group(inventory, Prefix, prefix)
    for tag in Tag.objects.all():
        sync_group(inventory, Tag, tag)

    for device in Device.objects.all():
        if device.primary_ip4 and device.primary_ip4.dns_name:
            sync_host(inventory, Device, device)
    for vm in VirtualMachine.objects.all():
        if vm.primary_ip4 and vm.primary_ip4.dns_name:
            sync_host(inventory, VirtualMachine, vm)
    job.terminate()