Skip to content
Snippets Groups Projects
  • Benjamin Bertrand's avatar
    d9516a98
    Use sqlalchemy events hook to trigger tasks · d9516a98
    Benjamin Bertrand authored
    Use the before_flush hook to trigger the core services update.
    It's much nicer than having to call a method in several places.
    
    The core services update is triggered on creation/modification/deletion
    of an Interface.
    
    In the before_flush we can manipulate the session (add objects) but we
    shouldn't call commit.
    The db.session.commit() was removed from the launch_task method.
    d9516a98
    History
    Use sqlalchemy events hook to trigger tasks
    Benjamin Bertrand authored
    Use the before_flush hook to trigger the core services update.
    It's much nicer than having to call a method in several places.
    
    The core services update is triggered on creation/modification/deletion
    of an Interface.
    
    In the before_flush we can manipulate the session (add objects) but we
    shouldn't call commit.
    The db.session.commit() was removed from the launch_task method.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tasks.py 4.97 KiB
# -*- coding: utf-8 -*-
"""
app.tasks
~~~~~~~~~

This module implements tasks to run.

:copyright: (c) 2018 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import time
import traceback
import tower_cli
from flask import current_app
from rq import Worker, get_current_job
from .extensions import db
from . import utils, models


class TaskWorker(Worker):
    """
    Modified version of the rq worker which updates
    the task status and end time in the CSEntry database
    """

    def save_exception(self, job, *exc_info):
        """Save the exception to the database

        The exception is only saved if it occurred before the AWX job was triggered.
        If the AWX job failed, we can refer to the logs on AWX.

        :param job: job that failed (the task is looked up using the job id)
        :param exc_info: exception information passed to traceback.format_exception
        """
        task = models.Task.query.get(job.id)
        if task is None:
            # The task is created after enqueueing the job and might be missing.
            # Don't raise from the failure handler: log and give up instead.
            self.log.error(f"Task {job.id} not found! Exception not saved!")
            return
        if task.awx_job_id is None:
            # No AWX job was triggered. An exception occurred before. Save it.
            task.exception = self._get_safe_exception_string(
                traceback.format_exception(*exc_info)
            )
            db.session.commit()

    def update_task_attributes(self, job, attributes):
        """Update the attributes of the task linked to the given job

        Nothing is updated (an error is logged) if the task can't be found.

        :param job: job whose id is used to look up the task
        :param attributes: dictionary mapping task attribute names to their new value
        """
        # The task is created after enqueueing the job.
        # If the job is processed very quickly, the task might
        # not be found in the database. We wait up to 3 seconds.
        task = models.Task.query.get(job.id)
        for _ in range(3):
            if task is not None:
                break
            self.log.warning("task not found...")
            time.sleep(1)
            # Look again after every wait so that the last sleep isn't wasted
            task = models.Task.query.get(job.id)
        if task is None:
            self.log.error(f"Task {job.id} not found! Task attribute not updated!")
            return
        for name, value in attributes.items():
            setattr(task, name, value)
        db.session.commit()

    def move_to_failed_queue(self, job, *exc_info):
        """Mark the task as failed and save the exception before delegating to rq

        :param job: job that failed
        :param exc_info: exception information forwarded to the parent handler
        """
        # This could be achieved by passing a custom exception handler
        # when initializing the worker. As we already subclass it, it's
        # easier to override the default handler in case of failure
        self.update_task_attributes(
            job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED}
        )
        self.save_exception(job, *exc_info)
        super().move_to_failed_queue(job, *exc_info)

    def handle_job_success(self, job, queue, started_job_registry):
        """Mark the task as finished before delegating to the rq worker

        :param job: job that succeeded
        :param queue: forwarded unchanged to the parent implementation
        :param started_job_registry: forwarded unchanged to the parent implementation
        """
        self.update_task_attributes(
            job, {"ended_at": job.ended_at, "status": models.JobStatus.FINISHED}
        )
        super().handle_job_success(job, queue, started_job_registry)

    def prepare_job_execution(self, job):
        """Mark the task as started before delegating to the rq worker

        :param job: job about to be executed
        """
        self.update_task_attributes(job, {"status": models.JobStatus.STARTED})
        super().prepare_job_execution(job)


def trigger_vm_creation(name, interface, memory, cores):
    """Trigger a job to create a virtual machine or virtual IOC

    The AWX job template and the task name depend on ``interface.is_ioc``.
    Network settings (ip, domain, dns, netmask, gateway) are only passed
    as extra variables when creating a virtual machine.

    :param name: name of the machine to create (sent as ``vmname``)
    :param interface: interface providing the network, mac and ip information
    :param memory: value sent as the ``memory`` extra variable
    :param cores: value sent as both the ``cores`` and ``vcpus`` extra variables
    :returns: the task returned by the current user's ``launch_task`` method
    """
    # Variables common to both the VM and the virtual IOC job templates
    net = interface.network
    extra_vars = [
        f"vmname={name}",
        f"memory={memory}",
        f"cores={cores}",
        f"vcpus={cores}",
        f"vlan_name={net.vlan_name}",
        f"vlan_id={net.vlan_id}",
        f"mac={interface.mac.address}",
    ]
    if not interface.is_ioc:
        task_name = "trigger_vm_creation"
        job_template = current_app.config["AWX_CREATE_VM"]
        # A virtual machine requires its full network configuration
        extra_vars += [
            f"ip_address={interface.ip}",
            f"domain={interface.network.domain.name}",
            f"dns={current_app.config['VM_DEFAULT_DNS']}",
            f"netmask={interface.network.netmask}",
            f"gateway={interface.network.gateway}",
        ]
    else:
        task_name = "trigger_vioc_creation"
        job_template = current_app.config["AWX_CREATE_VIOC"]
    current_app.logger.info(
        f"Launch new job to create the {name} VM: {job_template} with {extra_vars}"
    )
    return utils.cse_current_user().launch_task(
        task_name,
        func="launch_job_template",
        job_template=job_template,
        extra_vars=extra_vars,
        timeout=500,
    )


def launch_job_template(job_template, **kwargs):
    """Launch the given AWX job template and monitor it until done

    No request is sent to AWX when the matching feature is disabled in the
    application configuration (``AWX_VM_CREATION_ENABLED`` for the VM and
    virtual IOC creation templates, ``AWX_JOB_ENABLED`` for any template).
    The id of the launched AWX job is saved in the task linked to the
    current rq job.

    :param job_template: AWX job template to launch
    :param kwargs: extra keyword arguments passed to the tower_cli ``launch`` call
    :returns: the result of monitoring the AWX job, or a message string
              when the job was not triggered
    """
    rq_job = get_current_job()
    config = current_app.config
    is_vm_creation = job_template in (
        config["AWX_CREATE_VIOC"],
        config["AWX_CREATE_VM"],
    )
    if is_vm_creation and not config.get("AWX_VM_CREATION_ENABLED", False):
        current_app.logger.info("AWX VM creation is disabled. Not sending any request.")
        return "AWX VM creation not triggered"
    if not config.get("AWX_JOB_ENABLED", False):
        current_app.logger.info("AWX job is disabled. Not sending any request.")
        return "AWX job not triggered"
    # Send the launch request to AWX
    job_resource = tower_cli.get_resource("job")
    launched = job_resource.launch(job_template=job_template, **kwargs)
    # Keep a reference to the AWX job in the task
    task = models.Task.query.get(rq_job.id)
    task.awx_job_id = launched["id"]
    db.session.commit()
    # Block until the AWX job is done
    return job_resource.monitor(pk=launched["id"])