From d9516a987bdd3e907613937493a9a685bdce0681 Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Mon, 9 Jul 2018 12:11:11 +0200 Subject: [PATCH] Use sqlalchemy events hook to trigger tasks Use the before_flush hook to trigger the core services update. It's much nicer than having to call a method in several places. The core services update is triggered on creation/modification/deletion of an Interface. In the before_flush we can manipulate the session (add objects) but we shouldn't call commit. The db.session.commit() was removed from the launch_task method. --- app/models.py | 23 +++++++++++++++++++++-- app/network/views.py | 5 +---- app/tasks.py | 25 ------------------------- app/utils.py | 25 +++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/app/models.py b/app/models.py index 80ccc1c..ed36729 100644 --- a/app/models.py +++ b/app/models.py @@ -12,6 +12,7 @@ This module implements the models used in the app. import ipaddress import string import qrcode +import itertools import urllib.parse import sqlalchemy as sa from enum import Enum @@ -243,7 +244,10 @@ class User(db.Model, UserMixin): return [favorite for favorites in favorites_list for favorite in favorites] def launch_task(self, name, func, *args, **kwargs): - """Launch a task in the background using RQ""" + """Launch a task in the background using RQ + + The task is added to the session but not committed. + """ q = Queue() job = q.enqueue(f"app.tasks.{func}", *args, **kwargs) # The status will be set to QUEUED or DEFERRED @@ -255,7 +259,6 @@ class User(db.Model, UserMixin): user=self, ) db.session.add(task) - db.session.commit() return task def get_tasks(self, all=False): @@ -1097,6 +1100,22 @@ class Task(db.Model): } +@sa.event.listens_for(db.session, "before_flush") +def before_flush(session, flush_context, instances): + """Before flush hook + + Used to trigger core services update on any Interface modification. + + See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush + """ + for instance in itertools.chain(session.new, session.dirty, session.deleted): + if isinstance(instance, Interface): + # In session.dirty, we could check session.is_modified(instance) + # but it's always True due to the updated_at column. + utils.trigger_core_services_update() + break + + # call configure_mappers after defining all the models # required by sqlalchemy_continuum sa.orm.configure_mappers() diff --git a/app/network/views.py b/app/network/views.py index 9cbbbd4..c0373ed 100644 --- a/app/network/views.py +++ b/app/network/views.py @@ -94,7 +94,6 @@ def create_host(): flash(f"{e}", "error") else: flash(f"Host {host} created!", "success") - tasks.trigger_core_services_update() # Save network_id to the session to retrieve it after the redirect session["network_id"] = network_id return redirect(url_for("network.view_host", name=host.name)) @@ -120,6 +119,7 @@ def view_host(name): task = tasks.trigger_vm_creation( name, interface, int(form.memory.data) * 1000, form.cores.data ) + db.session.commit() current_app.logger.info(f"Creation of {name} requested: task {task.id}") flash( f"Creation of {name} requested! Refresh the page to update the status.", @@ -147,7 +147,6 @@ def edit_host(name): flash(f"{e}", "error") else: flash(f"Host {host} updated!", "success") - tasks.trigger_core_services_update() return redirect(url_for("network.view_host", name=host.name)) return render_template("network/edit_host.html", form=form) @@ -187,7 +186,6 @@ def create_interface(hostname): flash(f"{e}", "error") else: flash(f"Host {interface} created!", "success") - tasks.trigger_core_services_update() return redirect(url_for("network.create_interface", hostname=hostname)) return render_template( "network/create_interface.html", form=form, hostname=hostname @@ -261,7 +259,6 @@ def edit_interface(name): flash(f"{e}", "error") else: flash(f"Interface {interface} updated!", "success") - tasks.trigger_core_services_update() return redirect(url_for("network.view_host", name=interface.host.name)) return render_template( "network/edit_interface.html", form=form, hostname=interface.host.name diff --git a/app/tasks.py b/app/tasks.py index 3c8c69e..97d938a 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -118,31 +118,6 @@ def trigger_vm_creation(name, interface, memory, cores): return task -def trigger_core_services_update(): - """Trigger a job to update the core services on the TN (DNS and DHCP) - - This function should be called every time an host or interface is created/edited - - We can have one running job + one in queue to apply the latest changes. - Make sure that we don't have more than one in queue. - """ - job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"] - user = utils.cse_current_user() - if user.is_task_waiting("trigger_core_services_update"): - current_app.logger.info( - 'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.' - ) - return None - kwargs = {"func": "launch_job_template", "job_template": job_template} - started = user.get_task_started("trigger_core_services_update") - if started: - # There is already one running task. Trigger a new one when it's done. - kwargs["depends_on"] = started.id - current_app.logger.info(f"Launch new job to update core services: {job_template}") - task = user.launch_task("trigger_core_services_update", **kwargs) - return task - - def launch_job_template(job_template, **kwargs): rq_job = get_current_job() if job_template in ( diff --git a/app/utils.py b/app/utils.py index 4493ae5..758e4d4 100644 --- a/app/utils.py +++ b/app/utils.py @@ -223,3 +223,28 @@ def format_datetime(value, format="%Y-%m-%d %H:%M"): Function used as a jinja2 filter """ return value.strftime(format) + + +def trigger_core_services_update(): + """Trigger a job to update the core services on the TN (DNS and DHCP) + + This function should be called every time an interface is created/edited + + We can have one running job + one in queue to apply the latest changes. + Make sure that we don't have more than one in queue. + """ + job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"] + user = cse_current_user() + if user.is_task_waiting("trigger_core_services_update"): + current_app.logger.info( + 'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.' + ) + return None + kwargs = {"func": "launch_job_template", "job_template": job_template} + started = user.get_task_started("trigger_core_services_update") + if started: + # There is already one running task. Trigger a new one when it's done. + kwargs["depends_on"] = started.id + current_app.logger.info(f"Launch new job to update core services: {job_template}") + task = user.launch_task("trigger_core_services_update", **kwargs) + return task -- GitLab