Skip to content
Snippets Groups Projects
Commit d9516a98 authored by Benjamin Bertrand's avatar Benjamin Bertrand
Browse files

Use sqlalchemy events hook to trigger tasks

Use the before_flush hook to trigger the core services update.
It's much nicer than having to call a method in several places.

The core services update is triggered on creation/modification/deletion
of an Interface.

In the before_flush we can manipulate the session (add objects) but we
shouldn't call commit.
The db.session.commit() was removed from the launch_task method.
parent ff3295e4
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,7 @@ This module implements the models used in the app. ...@@ -12,6 +12,7 @@ This module implements the models used in the app.
import ipaddress import ipaddress
import string import string
import qrcode import qrcode
import itertools
import urllib.parse import urllib.parse
import sqlalchemy as sa import sqlalchemy as sa
from enum import Enum from enum import Enum
...@@ -243,7 +244,10 @@ class User(db.Model, UserMixin): ...@@ -243,7 +244,10 @@ class User(db.Model, UserMixin):
return [favorite for favorites in favorites_list for favorite in favorites] return [favorite for favorites in favorites_list for favorite in favorites]
def launch_task(self, name, func, *args, **kwargs): def launch_task(self, name, func, *args, **kwargs):
"""Launch a task in the background using RQ""" """Launch a task in the background using RQ
The task is added to the session but not committed.
"""
q = Queue() q = Queue()
job = q.enqueue(f"app.tasks.{func}", *args, **kwargs) job = q.enqueue(f"app.tasks.{func}", *args, **kwargs)
# The status will be set to QUEUED or DEFERRED # The status will be set to QUEUED or DEFERRED
...@@ -255,7 +259,6 @@ class User(db.Model, UserMixin): ...@@ -255,7 +259,6 @@ class User(db.Model, UserMixin):
user=self, user=self,
) )
db.session.add(task) db.session.add(task)
db.session.commit()
return task return task
def get_tasks(self, all=False): def get_tasks(self, all=False):
...@@ -1097,6 +1100,22 @@ class Task(db.Model): ...@@ -1097,6 +1100,22 @@ class Task(db.Model):
} }
@sa.event.listens_for(db.session, "before_flush")
def before_flush(session, flush_context, instances):
"""Before flush hook
Used to trigger core services update on any Interface modification.
See http://docs.sqlalchemy.org/en/latest/orm/session_events.html#before-flush
"""
for instance in itertools.chain(session.new, session.dirty, session.deleted):
if isinstance(instance, Interface):
# In session.dirty, we could check session.is_modified(instance)
# but it's always True due to the updated_at column.
utils.trigger_core_services_update()
break
# call configure_mappers after defining all the models # call configure_mappers after defining all the models
# required by sqlalchemy_continuum # required by sqlalchemy_continuum
sa.orm.configure_mappers() sa.orm.configure_mappers()
......
...@@ -94,7 +94,6 @@ def create_host(): ...@@ -94,7 +94,6 @@ def create_host():
flash(f"{e}", "error") flash(f"{e}", "error")
else: else:
flash(f"Host {host} created!", "success") flash(f"Host {host} created!", "success")
tasks.trigger_core_services_update()
# Save network_id to the session to retrieve it after the redirect # Save network_id to the session to retrieve it after the redirect
session["network_id"] = network_id session["network_id"] = network_id
return redirect(url_for("network.view_host", name=host.name)) return redirect(url_for("network.view_host", name=host.name))
...@@ -120,6 +119,7 @@ def view_host(name): ...@@ -120,6 +119,7 @@ def view_host(name):
task = tasks.trigger_vm_creation( task = tasks.trigger_vm_creation(
name, interface, int(form.memory.data) * 1000, form.cores.data name, interface, int(form.memory.data) * 1000, form.cores.data
) )
db.session.commit()
current_app.logger.info(f"Creation of {name} requested: task {task.id}") current_app.logger.info(f"Creation of {name} requested: task {task.id}")
flash( flash(
f"Creation of {name} requested! Refresh the page to update the status.", f"Creation of {name} requested! Refresh the page to update the status.",
...@@ -147,7 +147,6 @@ def edit_host(name): ...@@ -147,7 +147,6 @@ def edit_host(name):
flash(f"{e}", "error") flash(f"{e}", "error")
else: else:
flash(f"Host {host} updated!", "success") flash(f"Host {host} updated!", "success")
tasks.trigger_core_services_update()
return redirect(url_for("network.view_host", name=host.name)) return redirect(url_for("network.view_host", name=host.name))
return render_template("network/edit_host.html", form=form) return render_template("network/edit_host.html", form=form)
...@@ -187,7 +186,6 @@ def create_interface(hostname): ...@@ -187,7 +186,6 @@ def create_interface(hostname):
flash(f"{e}", "error") flash(f"{e}", "error")
else: else:
flash(f"Host {interface} created!", "success") flash(f"Host {interface} created!", "success")
tasks.trigger_core_services_update()
return redirect(url_for("network.create_interface", hostname=hostname)) return redirect(url_for("network.create_interface", hostname=hostname))
return render_template( return render_template(
"network/create_interface.html", form=form, hostname=hostname "network/create_interface.html", form=form, hostname=hostname
...@@ -261,7 +259,6 @@ def edit_interface(name): ...@@ -261,7 +259,6 @@ def edit_interface(name):
flash(f"{e}", "error") flash(f"{e}", "error")
else: else:
flash(f"Interface {interface} updated!", "success") flash(f"Interface {interface} updated!", "success")
tasks.trigger_core_services_update()
return redirect(url_for("network.view_host", name=interface.host.name)) return redirect(url_for("network.view_host", name=interface.host.name))
return render_template( return render_template(
"network/edit_interface.html", form=form, hostname=interface.host.name "network/edit_interface.html", form=form, hostname=interface.host.name
......
...@@ -118,31 +118,6 @@ def trigger_vm_creation(name, interface, memory, cores): ...@@ -118,31 +118,6 @@ def trigger_vm_creation(name, interface, memory, cores):
return task return task
def trigger_core_services_update():
"""Trigger a job to update the core services on the TN (DNS and DHCP)
This function should be called every time an host or interface is created/edited
We can have one running job + one in queue to apply the latest changes.
Make sure that we don't have more than one in queue.
"""
job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"]
user = utils.cse_current_user()
if user.is_task_waiting("trigger_core_services_update"):
current_app.logger.info(
'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.'
)
return None
kwargs = {"func": "launch_job_template", "job_template": job_template}
started = user.get_task_started("trigger_core_services_update")
if started:
# There is already one running task. Trigger a new one when it's done.
kwargs["depends_on"] = started.id
current_app.logger.info(f"Launch new job to update core services: {job_template}")
task = user.launch_task("trigger_core_services_update", **kwargs)
return task
def launch_job_template(job_template, **kwargs): def launch_job_template(job_template, **kwargs):
rq_job = get_current_job() rq_job = get_current_job()
if job_template in ( if job_template in (
......
...@@ -223,3 +223,28 @@ def format_datetime(value, format="%Y-%m-%d %H:%M"): ...@@ -223,3 +223,28 @@ def format_datetime(value, format="%Y-%m-%d %H:%M"):
Function used as a jinja2 filter Function used as a jinja2 filter
""" """
return value.strftime(format) return value.strftime(format)
def trigger_core_services_update():
"""Trigger a job to update the core services on the TN (DNS and DHCP)
This function should be called every time an interface is created/edited
We can have one running job + one in queue to apply the latest changes.
Make sure that we don't have more than one in queue.
"""
job_template = current_app.config["AWX_CORE_SERVICES_UPDATE"]
user = cse_current_user()
if user.is_task_waiting("trigger_core_services_update"):
current_app.logger.info(
'Already one "trigger_core_services_update" task waiting. No need to trigger a new one.'
)
return None
kwargs = {"func": "launch_job_template", "job_template": job_template}
started = user.get_task_started("trigger_core_services_update")
if started:
# There is already one running task. Trigger a new one when it's done.
kwargs["depends_on"] = started.id
current_app.logger.info(f"Launch new job to update core services: {job_template}")
task = user.launch_task("trigger_core_services_update", **kwargs)
return task
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment