diff --git a/app/models.py b/app/models.py index b5df4e86418dcf646aa158b2d8de32fabd0fb276..965bb638b9dce981caf43d112f1af01c82be4188 100644 --- a/app/models.py +++ b/app/models.py @@ -9,7 +9,6 @@ This module implements the models used in the app. :license: BSD 2-Clause, see LICENSE for more details. """ -import datetime import ipaddress import string import qrcode @@ -39,6 +38,10 @@ from .validators import ( from . import utils, search +# Number of minutes to wait before to consider a deferrred job lost +WAITING_DELAY = 30 + + make_versioned(plugins=[FlaskUserPlugin()]) @@ -354,24 +357,45 @@ class User(db.Model, UserMixin): Waiting means: - queued - - deferred if not older than 30 minutes + - deferred if not older than WAITING_DELAY minutes A deferred task can stay deferred forever if the task it depends on fails. """ - thirty_minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) count = ( Task.query.filter_by(name=name) .filter( (Task.status == JobStatus.QUEUED) | ( (Task.status == JobStatus.DEFERRED) - & (Task.created_at > thirty_minutes_ago) + & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) ) ) .count() ) return count > 0 + def get_task_waiting(self, name): + """Return the latest <name> task currently waiting or None + + Waiting means: + - queued + - deferred if not older than WAITING_DELAY minutes + + A deferred task can stay deferred forever if the task it depends on fails. + """ + return ( + Task.query.filter_by(name=name) + .filter( + (Task.status == JobStatus.QUEUED) + | ( + (Task.status == JobStatus.DEFERRED) + & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) + ) + ) + .order_by(Task.created_at.desc()) + .first() + ) + def __str__(self): return self.username @@ -1626,6 +1650,8 @@ class Task(db.Model): route = "jobs/playbook" elif self.awx_resource == "workflow_job": route = "workflows" + elif self.awx_resource == "inventory_source": + route = "jobs/inventory" else: current_app.logger.warning(f"Unknown AWX resource: {self.awx_resource}") return None @@ -1652,13 +1678,10 @@ class Task(db.Model): } -@sa.event.listens_for(db.session, "before_flush") -def before_flush(session, flush_context, instances): - """Before flush hook - - Used to trigger core services update on any Interface modification. +def trigger_core_services_update(session): + """Trigger core services update on any Interface modification. - See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush + Called by before flush hook """ # In session.dirty, we need to check session.is_modified(instance) because when updating a Host, # the interface is added to the session even if not modified. @@ -1671,7 +1694,48 @@ def before_flush(session, flush_context, instances): or (kind in ("new", "deleted")) ): utils.trigger_core_services_update() - return + return True + return False + + +def trigger_inventory_update(session): + """Trigger an inventory update in AWX + + Update on any AnsibleGroup/Cname/Domain/Host/Network/NetworkScope + modification, but not on Interface as it's triggered with core + services update. + + Called by before flush hook + """ + # In session.dirty, we need to check session.is_modified(instance) because the instance + # could have been added to the session without being modified + # In session.deleted, session.is_modified(instance) is usually False (we shouldn't check it). + # In session.new, it will always be True and we don't need to check it. + for kind in ("new", "dirty", "deleted"): + for instance in getattr(session, kind): + if isinstance( + instance, (AnsibleGroup, Cname, Domain, Host, Network, NetworkScope) + ) and ( + (kind == "dirty" and session.is_modified(instance)) + or (kind in ("new", "deleted")) + ): + utils.trigger_inventory_update() + return True + return False + + +@sa.event.listens_for(db.session, "before_flush") +def before_flush(session, flush_context, instances): + """Before flush hook + + Used to trigger core services and inventory update. + + See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush + """ + if trigger_core_services_update(session): + # This will also trigger an inventory update + return + trigger_inventory_update(session) # call configure_mappers after defining all the models diff --git a/app/settings.py b/app/settings.py index e3d59165507f3063ec8676f2bad5d1025db0fa0d..b0f759bb95257d1ca389d4236a57e8e0ecc5cc59 100644 --- a/app/settings.py +++ b/app/settings.py @@ -90,6 +90,9 @@ DOCUMENTATION_URL = "http://ics-infrastructure.pages.esss.lu.se/csentry/index.ht CSENTRY_ENVIRONMENT = "staging" AWX_URL = "https://torn.tn.esss.lu.se" +# AWX dynamic inventory source to update +# Use the id because resource.update requires a number +AWX_INVENTORY_SOURCE = 41 # AWX job templates AWX_CORE_SERVICES_UPDATE = "ics-ans-core @ DHCP test" # Shall be set to job or workflow_job diff --git a/app/tasks.py b/app/tasks.py index e4631c5251ee3a55f00efca512abbe98cc9fd206..070737965be6d02f9824efc8fdb1a95e83d561ab 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -109,7 +109,7 @@ def trigger_vm_creation( task_name, resource="job", queue_name="low", - func="launch_job_template", + func="launch_awx_job", job_template=job_template, extra_vars=extra_vars, timeout=500, @@ -119,7 +119,7 @@ def trigger_vm_creation( "trigger_post_install_job", resource="job", queue_name="low", - func="launch_job_template", + func="launch_awx_job", job_template=post_job_template, limit=f"{host.fqdn}", depends_on=task.id, @@ -150,7 +150,7 @@ def trigger_ztp_configuration(host): "trigger_ztp_configuration", resource="job", queue_name="low", - func="launch_job_template", + func="launch_awx_job", job_template=job_template, extra_vars=extra_vars, timeout=500, @@ -158,15 +158,21 @@ def trigger_ztp_configuration(host): return task -def launch_job_template(job_template, resource="job", **kwargs): - """Launch an AWX job or workflow job +def launch_awx_job(resource="job", **kwargs): + """Launch an AWX job - :param job_template: name or id of the job template - :param resource: job|workflow_job + job_template or inventory_source shall be passed as keyword argument + + :param resource: job|workflow_job|inventory_source :param **kwargs: keyword arguments passed to launch the job :returns: A dictionary with information from resource.monitor """ rq_job = get_current_job() + job_template = kwargs.pop("job_template", None) + inventory_source = kwargs.pop("inventory_source", None) + if job_template is None and inventory_source is None: + current_app.logger.warning("No job_template nor inventory_source passed!") + return "No job_template nor inventory_source passed!" if job_template in ( current_app.config["AWX_CREATE_VIOC"], current_app.config["AWX_CREATE_VM"], @@ -177,8 +183,34 @@ def launch_job_template(job_template, resource="job", **kwargs): current_app.logger.info("AWX job is disabled. Not sending any request.") return "AWX job not triggered" # Launch the AWX job - resource = tower_cli.get_resource(resource) - result = resource.launch(job_template, **kwargs) + tower_resource = tower_cli.get_resource(resource) + if resource == "inventory_source": + result = tower_resource.update(inventory_source, **kwargs) + else: + result = tower_resource.launch(job_template, **kwargs) + # Save the AWX job id in the task + task = models.Task.query.get(rq_job.id) + task.awx_job_id = result["id"] + db.session.commit() + # Monitor the job until done + result = tower_resource.monitor(pk=result["id"]) + return result + + +def update_inventory(inventory_source, **kwargs): + """Update an inventory source in AWX + + :param inventory_source: name or id of the inventory source to be updated + :param **kwargs: keyword arguments passed to update the inventory + :returns: A dictionary with information from resource.monitor + """ + rq_job = get_current_job() + if not current_app.config.get("AWX_JOB_ENABLED", False): + current_app.logger.info("AWX job is disabled. Not sending any request.") + return "AWX inventory update not triggered" + # Launch the AWX job + resource = tower_cli.get_resource("inventory_source") + result = resource.update(inventory_source, **kwargs) # Save the AWX job id in the task task = models.Task.query.get(rq_job.id) task.awx_job_id = result["id"] diff --git a/app/utils.py b/app/utils.py index 803e69fcedc1d359b556de322d41802a64224c2c..29740a00979d07e80c3635c4056eff86e1772300 100644 --- a/app/utils.py +++ b/app/utils.py @@ -236,28 +236,61 @@ def trigger_core_services_update(): This function should be called every time an interface is created/edited - We can have one running job + one in queue to apply the latest changes. - Make sure that we don't have more than one in queue. + Always trigger an inventory update first. """ + # Start by triggering an inventory update + inventory_task = trigger_inventory_update() + # If there is already a core service update in queue, we could probably skip starting a new + # one but we would have to be sure that the inventory sync it depends on didn't already + # start + # Without this optimization, we might trigger a job that won't change anything. + # If jobs can run in parallel this shouldn't be an issue. job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"] resource = current_app.config.get("AWX_CORE_SERVICES_UPDATE_RESOURCE", "job") - if current_user.is_task_waiting("trigger_core_services_update"): - current_app.logger.info( - 'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.' - ) - return None kwargs = { - "func": "launch_job_template", + "func": "launch_awx_job", "job_template": job_template, "resource": resource, + "depends_on": inventory_task.id, + } + current_app.logger.info(f"Launch new job to update core services: {job_template}") + task = current_user.launch_task( + "trigger_core_services_update", queue_name="normal", **kwargs + ) + return task + + +def trigger_inventory_update(): + """Trigger a job to update the inventory in AWX + + This function should be called every time something impacting the inventory is updated + (AnsibleGroup, Host, Interface, Network...) + + We can have one running job + one in queue to apply the latest changes. + Make sure that we don't have more than one in queue. + """ + inventory_source = current_app.config["AWX_INVENTORY_SOURCE"] + waiting_task = current_user.get_task_waiting("trigger_inventory_update") + if waiting_task is not None: + current_app.logger.info( + 'Already one "trigger_inventory_update" task waiting. No need to trigger a new one.' + ) + return waiting_task + kwargs = { + "func": "launch_awx_job", + "resource": "inventory_source", + "inventory_source": inventory_source, } - started = current_user.get_task_started("trigger_core_services_update") + started = current_user.get_task_started("trigger_inventory_update") if started: # There is already one running task. Trigger a new one when it's done. kwargs["depends_on"] = started.id - current_app.logger.info(f"Launch new job to update core services: {job_template}") + current_app.logger.info( + f"Launch new job to update the inventory: {inventory_source}" + ) + # Put it on the "high" queue so that it has higher priority than all other jobs task = current_user.launch_task( - "trigger_core_services_update", queue_name="normal", **kwargs + "trigger_inventory_update", queue_name="high", **kwargs ) return task @@ -336,3 +369,8 @@ def retrieve_data_for_datatables(values, model): "data": data, } return jsonify(response) + + +def minutes_ago(minutes): + """Return the datetime x minutes ago""" + return datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index 18c491c9188e903fe291f4de1e035c920f0be87e..71ebc48dcc35f306ee6e21f534ce6de354d23920 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -621,3 +621,5 @@ def test_task_awx_job_url(db, task_factory): assert task3.awx_job_url is None task4 = task_factory(awx_job_id=45) assert task4.awx_job_url is None + task5 = task_factory(awx_resource="inventory_source", awx_job_id=12) + assert task5.awx_job_url == "https://awx.example.org/#/jobs/inventory/12"