From e000c2f06486960b1192fde0a904e5a55d2a1498 Mon Sep 17 00:00:00 2001
From: Benjamin Bertrand <benjamin.bertrand@esss.se>
Date: Fri, 8 Mar 2019 23:01:10 +0100
Subject: [PATCH] Trigger AWX inventory update on database change

A job running in AWX prevents an inventory sync (of the same
inventory). This restricts AWX to run only one job at a time as
the inventory is always updated on launch.
To prevent this issue, the inventory is updated from AWX when needed.

- The update is triggered on any model modification that is used in the
inventory (AnsibleGroup, Cname, Domain, Host, Network, NetworkScope)
- For Interface, the inventory sync is triggered by the
trigger_core_services function as it depends on it
- The inventory update job is put on the "high" queue so that it takes
precedence over all other jobs.

JIRA INFRA-887 #action In Progress
---
 app/models.py                   | 86 ++++++++++++++++++++++++++++-----
 app/settings.py                 |  3 ++
 app/tasks.py                    | 50 +++++++++++++++----
 app/utils.py                    | 60 ++++++++++++++++++-----
 tests/functional/test_models.py |  2 +
 5 files changed, 170 insertions(+), 31 deletions(-)

diff --git a/app/models.py b/app/models.py
index b5df4e8..965bb63 100644
--- a/app/models.py
+++ b/app/models.py
@@ -9,7 +9,6 @@ This module implements the models used in the app.
 :license: BSD 2-Clause, see LICENSE for more details.
 
 """
-import datetime
 import ipaddress
 import string
 import qrcode
@@ -39,6 +38,10 @@ from .validators import (
 from . import utils, search
 
 
+# Number of minutes to wait before to consider a deferrred job lost
+WAITING_DELAY = 30
+
+
 make_versioned(plugins=[FlaskUserPlugin()])
 
 
@@ -354,24 +357,45 @@ class User(db.Model, UserMixin):
 
         Waiting means:
             - queued
-            - deferred if not older than 30 minutes
+            - deferred if not older than WAITING_DELAY minutes
 
         A deferred task can stay deferred forever if the task it depends on fails.
         """
-        thirty_minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
         count = (
             Task.query.filter_by(name=name)
             .filter(
                 (Task.status == JobStatus.QUEUED)
                 | (
                     (Task.status == JobStatus.DEFERRED)
-                    & (Task.created_at > thirty_minutes_ago)
+                    & (Task.created_at > utils.minutes_ago(WAITING_DELAY))
                 )
             )
             .count()
         )
         return count > 0
 
+    def get_task_waiting(self, name):
+        """Return the latest <name> task currently waiting or None
+
+        Waiting means:
+            - queued
+            - deferred if not older than WAITING_DELAY minutes
+
+        A deferred task can stay deferred forever if the task it depends on fails.
+        """
+        return (
+            Task.query.filter_by(name=name)
+            .filter(
+                (Task.status == JobStatus.QUEUED)
+                | (
+                    (Task.status == JobStatus.DEFERRED)
+                    & (Task.created_at > utils.minutes_ago(WAITING_DELAY))
+                )
+            )
+            .order_by(Task.created_at.desc())
+            .first()
+        )
+
     def __str__(self):
         return self.username
 
@@ -1626,6 +1650,8 @@ class Task(db.Model):
             route = "jobs/playbook"
         elif self.awx_resource == "workflow_job":
             route = "workflows"
+        elif self.awx_resource == "inventory_source":
+            route = "jobs/inventory"
         else:
             current_app.logger.warning(f"Unknown AWX resource: {self.awx_resource}")
             return None
@@ -1652,13 +1678,10 @@ class Task(db.Model):
         }
 
 
-@sa.event.listens_for(db.session, "before_flush")
-def before_flush(session, flush_context, instances):
-    """Before flush hook
-
-    Used to trigger core services update on any Interface modification.
+def trigger_core_services_update(session):
+    """Trigger core services update on any Interface modification.
 
-    See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush
+    Called by before flush hook
     """
     # In session.dirty, we need to check session.is_modified(instance) because when updating a Host,
     # the interface is added to the session even if not modified.
@@ -1671,7 +1694,48 @@ def before_flush(session, flush_context, instances):
                 or (kind in ("new", "deleted"))
             ):
                 utils.trigger_core_services_update()
-                return
+                return True
+    return False
+
+
+def trigger_inventory_update(session):
+    """Trigger an inventory update in AWX
+
+    Update on any AnsibleGroup/Cname/Domain/Host/Network/NetworkScope
+    modification, but not on Interface as it's triggered with core
+    services update.
+
+    Called by before flush hook
+    """
+    # In session.dirty, we need to check session.is_modified(instance) because the instance
+    # could have been added to the session without being modified
+    # In session.deleted, session.is_modified(instance) is usually False (we shouldn't check it).
+    # In session.new, it will always be True and we don't need to check it.
+    for kind in ("new", "dirty", "deleted"):
+        for instance in getattr(session, kind):
+            if isinstance(
+                instance, (AnsibleGroup, Cname, Domain, Host, Network, NetworkScope)
+            ) and (
+                (kind == "dirty" and session.is_modified(instance))
+                or (kind in ("new", "deleted"))
+            ):
+                utils.trigger_inventory_update()
+                return True
+    return False
+
+
+@sa.event.listens_for(db.session, "before_flush")
+def before_flush(session, flush_context, instances):
+    """Before flush hook
+
+    Used to trigger core services and inventory update.
+
+    See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush
+    """
+    if trigger_core_services_update(session):
+        # This will also trigger an inventory update
+        return
+    trigger_inventory_update(session)
 
 
 # call configure_mappers after defining all the models
diff --git a/app/settings.py b/app/settings.py
index e3d5916..b0f759b 100644
--- a/app/settings.py
+++ b/app/settings.py
@@ -90,6 +90,9 @@ DOCUMENTATION_URL = "http://ics-infrastructure.pages.esss.lu.se/csentry/index.ht
 CSENTRY_ENVIRONMENT = "staging"
 
 AWX_URL = "https://torn.tn.esss.lu.se"
+# AWX dynamic inventory source to update
+# Use the id because resource.update requires a number
+AWX_INVENTORY_SOURCE = 41
 # AWX job templates
 AWX_CORE_SERVICES_UPDATE = "ics-ans-core @ DHCP test"
 # Shall be set to job or workflow_job
diff --git a/app/tasks.py b/app/tasks.py
index e4631c5..0707379 100644
--- a/app/tasks.py
+++ b/app/tasks.py
@@ -109,7 +109,7 @@ def trigger_vm_creation(
         task_name,
         resource="job",
         queue_name="low",
-        func="launch_job_template",
+        func="launch_awx_job",
         job_template=job_template,
         extra_vars=extra_vars,
         timeout=500,
@@ -119,7 +119,7 @@ def trigger_vm_creation(
             "trigger_post_install_job",
             resource="job",
             queue_name="low",
-            func="launch_job_template",
+            func="launch_awx_job",
             job_template=post_job_template,
             limit=f"{host.fqdn}",
             depends_on=task.id,
@@ -150,7 +150,7 @@ def trigger_ztp_configuration(host):
         "trigger_ztp_configuration",
         resource="job",
         queue_name="low",
-        func="launch_job_template",
+        func="launch_awx_job",
         job_template=job_template,
         extra_vars=extra_vars,
         timeout=500,
@@ -158,15 +158,21 @@ def trigger_ztp_configuration(host):
     return task
 
 
-def launch_job_template(job_template, resource="job", **kwargs):
-    """Launch an AWX job or workflow job
+def launch_awx_job(resource="job", **kwargs):
+    """Launch an AWX job
 
-    :param job_template: name or id of the job template
-    :param resource: job|workflow_job
+    job_template or inventory_source shall be passed as keyword argument
+
+    :param resource: job|workflow_job|inventory_source
     :param **kwargs: keyword arguments passed to launch the job
     :returns: A dictionary with information from resource.monitor
     """
     rq_job = get_current_job()
+    job_template = kwargs.pop("job_template", None)
+    inventory_source = kwargs.pop("inventory_source", None)
+    if job_template is None and inventory_source is None:
+        current_app.logger.warning("No job_template nor inventory_source passed!")
+        return "No job_template nor inventory_source passed!"
     if job_template in (
         current_app.config["AWX_CREATE_VIOC"],
         current_app.config["AWX_CREATE_VM"],
@@ -177,8 +183,34 @@ def launch_job_template(job_template, resource="job", **kwargs):
         current_app.logger.info("AWX job is disabled. Not sending any request.")
         return "AWX job not triggered"
     # Launch the AWX job
-    resource = tower_cli.get_resource(resource)
-    result = resource.launch(job_template, **kwargs)
+    tower_resource = tower_cli.get_resource(resource)
+    if resource == "inventory_source":
+        result = tower_resource.update(inventory_source, **kwargs)
+    else:
+        result = tower_resource.launch(job_template, **kwargs)
+    # Save the AWX job id in the task
+    task = models.Task.query.get(rq_job.id)
+    task.awx_job_id = result["id"]
+    db.session.commit()
+    # Monitor the job until done
+    result = tower_resource.monitor(pk=result["id"])
+    return result
+
+
+def update_inventory(inventory_source, **kwargs):
+    """Update an inventory source in AWX
+
+    :param inventory_source: name or id of the inventory source to be updated
+    :param **kwargs: keyword arguments passed to update the inventory
+    :returns: A dictionary with information from resource.monitor
+    """
+    rq_job = get_current_job()
+    if not current_app.config.get("AWX_JOB_ENABLED", False):
+        current_app.logger.info("AWX job is disabled. Not sending any request.")
+        return "AWX inventory update not triggered"
+    # Launch the AWX job
+    resource = tower_cli.get_resource("inventory_source")
+    result = resource.update(inventory_source, **kwargs)
     # Save the AWX job id in the task
     task = models.Task.query.get(rq_job.id)
     task.awx_job_id = result["id"]
diff --git a/app/utils.py b/app/utils.py
index 803e69f..29740a0 100644
--- a/app/utils.py
+++ b/app/utils.py
@@ -236,28 +236,61 @@ def trigger_core_services_update():
 
     This function should be called every time an interface is created/edited
 
-    We can have one running job + one in queue to apply the latest changes.
-    Make sure that we don't have more than one in queue.
+    Always trigger an inventory update first.
     """
+    # Start by triggering an inventory update
+    inventory_task = trigger_inventory_update()
+    # If there is already a core service update in queue, we could probably skip starting a new
+    # one but we would have to be sure that the inventory sync it depends on didn't already
+    # start
+    # Without this optimization, we might trigger a job that won't change anything.
+    # If jobs can run in parallel this shouldn't be an issue.
     job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"]
     resource = current_app.config.get("AWX_CORE_SERVICES_UPDATE_RESOURCE", "job")
-    if current_user.is_task_waiting("trigger_core_services_update"):
-        current_app.logger.info(
-            'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.'
-        )
-        return None
     kwargs = {
-        "func": "launch_job_template",
+        "func": "launch_awx_job",
         "job_template": job_template,
         "resource": resource,
+        "depends_on": inventory_task.id,
+    }
+    current_app.logger.info(f"Launch new job to update core services: {job_template}")
+    task = current_user.launch_task(
+        "trigger_core_services_update", queue_name="normal", **kwargs
+    )
+    return task
+
+
+def trigger_inventory_update():
+    """Trigger a job to update the inventory in AWX
+
+    This function should be called every time something impacting the inventory is updated
+    (AnsibleGroup, Host, Interface, Network...)
+
+    We can have one running job + one in queue to apply the latest changes.
+    Make sure that we don't have more than one in queue.
+    """
+    inventory_source = current_app.config["AWX_INVENTORY_SOURCE"]
+    waiting_task = current_user.get_task_waiting("trigger_inventory_update")
+    if waiting_task is not None:
+        current_app.logger.info(
+            'Already one "trigger_inventory_update" task waiting. No need to trigger a new one.'
+        )
+        return waiting_task
+    kwargs = {
+        "func": "launch_awx_job",
+        "resource": "inventory_source",
+        "inventory_source": inventory_source,
     }
-    started = current_user.get_task_started("trigger_core_services_update")
+    started = current_user.get_task_started("trigger_inventory_update")
     if started:
         # There is already one running task. Trigger a new one when it's done.
         kwargs["depends_on"] = started.id
-    current_app.logger.info(f"Launch new job to update core services: {job_template}")
+    current_app.logger.info(
+        f"Launch new job to update the inventory: {inventory_source}"
+    )
+    # Put it on the "high" queue so that it has higher priority than all other jobs
     task = current_user.launch_task(
-        "trigger_core_services_update", queue_name="normal", **kwargs
+        "trigger_inventory_update", queue_name="high", **kwargs
     )
     return task
 
@@ -336,3 +369,8 @@ def retrieve_data_for_datatables(values, model):
         "data": data,
     }
     return jsonify(response)
+
+
+def minutes_ago(minutes):
+    """Return the datetime x minutes ago"""
+    return datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes)
diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py
index 18c491c..71ebc48 100644
--- a/tests/functional/test_models.py
+++ b/tests/functional/test_models.py
@@ -621,3 +621,5 @@ def test_task_awx_job_url(db, task_factory):
     assert task3.awx_job_url is None
     task4 = task_factory(awx_job_id=45)
     assert task4.awx_job_url is None
+    task5 = task_factory(awx_resource="inventory_source", awx_job_id=12)
+    assert task5.awx_job_url == "https://awx.example.org/#/jobs/inventory/12"
-- 
GitLab