From be6a492c6d216764be0e15059471be4df761ebf2 Mon Sep 17 00:00:00 2001
From: Benjamin Bertrand <benjamin.bertrand@esss.se>
Date: Sat, 30 Jun 2018 11:04:21 +0200
Subject: [PATCH] Add task table

Table used to keep track of the RQ background jobs
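
A minimal sketch of the flow this change introduces (the job template
name and extra_vars value below are illustrative only):

    user = utils.cse_current_user()
    task = user.launch_task(
        'trigger_vm_creation',
        func='launch_job_template',
        job_template='create-vm',      # illustrative value
        extra_vars=['vmname=my-vm'],   # illustrative value
    )
    # The TaskWorker subclass then updates task.status as the RQ job
    # moves from QUEUED to STARTED and finally FINISHED or FAILED.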

JIRA INFRA-403
---
 app/commands.py                               |  3 +-
 app/models.py                                 | 74 +++++++++++++++++
 app/network/views.py                          |  4 +-
 app/tasks.py                                  | 79 +++++++++++++++----
 .../versions/7d0d580cdb1a_add_task_table.py   | 37 +++++++++
 5 files changed, 178 insertions(+), 19 deletions(-)
 create mode 100644 migrations/versions/7d0d580cdb1a_add_task_table.py

diff --git a/app/commands.py b/app/commands.py
index 221ca71..d590302 100644
--- a/app/commands.py
+++ b/app/commands.py
@@ -17,6 +17,7 @@ from flask import current_app
 from .extensions import db, ldap_manager
 from .defaults import defaults
 from .models import User
+from .tasks import TaskWorker
 from . import utils, tokens
 
 
@@ -97,5 +98,5 @@ def register_cli(app):
         redis_url = current_app.config['REDIS_URL']
         redis_connection = redis.from_url(redis_url)
         with rq.Connection(redis_connection):
-            worker = rq.Worker(current_app.config['QUEUES'])
+            worker = TaskWorker(current_app.config['QUEUES'])
             worker.work()
diff --git a/app/models.py b/app/models.py
index fdc1f1a..1a3eb98 100644
--- a/app/models.py
+++ b/app/models.py
@@ -13,6 +13,7 @@ import ipaddress
 import string
 import qrcode
 import sqlalchemy as sa
+from enum import Enum
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.dialects import postgresql
 from sqlalchemy.orm import validates
@@ -21,6 +22,7 @@ from citext import CIText
 from flask import current_app
 from flask_login import UserMixin
 from wtforms import ValidationError
+from rq import Queue
 from .extensions import db, login_manager, ldap_manager, cache
 from .plugins import FlaskUserPlugin
 from .validators import (ICS_ID_RE, HOST_NAME_RE, VLAN_NAME_RE, MAC_ADDRESS_RE,
@@ -135,6 +137,7 @@ class User(db.Model, UserMixin):
     email = db.Column(db.Text)
     groups = db.Column(postgresql.ARRAY(db.Text), default=[])
     tokens = db.relationship("Token", backref="user")
+    tasks = db.relationship('Task', backref='user')
     # The favorites won't be accessed very often so we load them
     # only when necessary (lazy=True)
     favorite_manufacturers = db.relationship(
@@ -200,6 +203,41 @@ class User(db.Model, UserMixin):
                           self.favorite_actions]
         return [favorite for favorites in favorites_list for favorite in favorites]
 
+    def launch_task(self, name, func, *args, **kwargs):
+        """Launch a task in the background using RQ"""
+        q = Queue()
+        job = q.enqueue(f'app.tasks.{func}', *args, **kwargs)
+        # The status will be set to QUEUED or DEFERRED
+        task = Task(id=job.id, name=name, command=job.get_call_string(),
+                    status=JobStatus(job.status), user=self)
+        db.session.add(task)
+        db.session.commit()
+        return task
+
+    def get_tasks(self, all=False):
+        """Return all tasks created by the current user
+
+        If the user is an admin and all is set to True, return the tasks of all users
+        """
+        if all and self.is_admin:
+            return Task.query.order_by(Task.created_at).all()
+        return Task.query.filter_by(user=self).order_by(Task.created_at).all()
+
+    def get_tasks_in_progress(self, name):
+        """Return all the <name> tasks not finished or failed"""
+        return Task.query.filter_by(name=name).filter(
+            ~Task.status.in_([JobStatus.FINISHED, JobStatus.FAILED])).all()
+
+    def get_task_started(self, name):
+        """Return the <name> task currently running or None"""
+        return Task.query.filter_by(name=name, status=JobStatus.STARTED).first()
+
+    def is_task_waiting(self, name):
+        """Return True if a <name> task is waiting (queued or deferred)"""
+        count = Task.query.filter_by(name=name).filter(
+            Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED])).count()
+        return count > 0
+
     def __str__(self):
         return self.username
 
@@ -907,6 +945,42 @@ class NetworkScope(CreatedMixin, db.Model):
         return d
 
 
+# Define the RQ JobStatus as a proper Python enum
+# We can't use the one defined in rq/job.py as it's
+# not a real enum (just a class with string attributes)
+# and is not compatible with SQLAlchemy
+class JobStatus(Enum):
+    QUEUED = 'queued'
+    FINISHED = 'finished'
+    FAILED = 'failed'
+    STARTED = 'started'
+    DEFERRED = 'deferred'
+
+
+class Task(db.Model):
+    # Use job id generated by RQ
+    id = db.Column(postgresql.UUID, primary_key=True)
+    created_at = db.Column(db.DateTime, default=utcnow())
+    name = db.Column(db.Text, nullable=False, index=True)
+    command = db.Column(db.Text)
+    status = db.Column(db.Enum(JobStatus, name='job_status'))
+    user_id = db.Column(db.Integer, db.ForeignKey('user_account.id'),
+                        nullable=False, default=utils.fetch_current_user_id)
+
+    def __str__(self):
+        return str(self.id)
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'created_at': utils.format_field(self.created_at),
+            'name': self.name,
+            'command': self.command,
+            'status': self.status,
+            'user': str(self.user),
+        }
+
+
 # call configure_mappers after defining all the models
 # required by sqlalchemy_continuum
 sa.orm.configure_mappers()
diff --git a/app/network/views.py b/app/network/views.py
index b638449..bad768c 100644
--- a/app/network/views.py
+++ b/app/network/views.py
@@ -92,8 +92,8 @@ def view_host(name):
             flash(f'Only admin users are allowed to create a VM!', 'info')
         else:
             interface = host.interfaces[0]
-            job = tasks.trigger_vm_creation(name, interface, int(form.memory.data) * 1000, form.cores.data)
-            current_app.logger.info(f'Creation of {name} requested: job {job.get_id()}')
+            task = tasks.trigger_vm_creation(name, interface, int(form.memory.data) * 1000, form.cores.data)
+            current_app.logger.info(f'Creation of {name} requested: task {task.id}')
             flash(f'Creation of {name} requested!', 'success')
         return redirect(url_for('network.view_host', name=name))
     return render_template('network/view_host.html', host=host, form=form)
diff --git a/app/tasks.py b/app/tasks.py
index bea8ce4..d546806 100644
--- a/app/tasks.py
+++ b/app/tasks.py
@@ -9,9 +9,48 @@ This module implements tasks to run.
 :license: BSD 2-Clause, see LICENSE for more details.
 
 """
+import time
 import tower_cli
 from flask import current_app
-from rq import Queue
+from rq import Worker
+from .extensions import db
+from . import utils, models
+
+
+class TaskWorker(Worker):
+    """
+    Modified version of the rq worker which updates
+    the task status in the CSEntry database
+    """
+
+    def update_task_status(self, job, status):
+        # The task is created after enqueueing the job.
+        # If the job is processed very quickly, the task might
+        # not be found in the database. We wait up to 3 seconds.
+        for _ in range(3):
+            task = models.Task.query.get(job.id)
+            if task is not None:
+                break
+            else:
+                self.log.warning(f'Task {job.id} not found in the database yet, retrying...')
+                time.sleep(1)
+        else:
+            self.log.error(f'Task {job.id} not found! Task status not updated!')
+            return
+        task.status = status
+        db.session.commit()
+
+    def move_to_failed_queue(self, job, *exc_info):
+        self.update_task_status(job, status=models.JobStatus.FAILED)
+        super().move_to_failed_queue(job, *exc_info)
+
+    def handle_job_success(self, job, queue, started_job_registry):
+        self.update_task_status(job, status=models.JobStatus.FINISHED)
+        super().handle_job_success(job, queue, started_job_registry)
+
+    def prepare_job_execution(self, job):
+        self.update_task_status(job, status=models.JobStatus.STARTED)
+        super().prepare_job_execution(job)
 
 
 def trigger_vm_creation(name, interface, memory, cores):
@@ -26,8 +65,10 @@ def trigger_vm_creation(name, interface, memory, cores):
         f'mac={interface.mac.address}',
     ]
     if interface.is_ioc:
+        task_name = 'trigger_vioc_creation'
         job_template = current_app.config['AWX_CREATE_VIOC']
     else:
+        task_name = 'trigger_vm_creation'
         job_template = current_app.config['AWX_CREATE_VM']
         extra_vars.extend([
             f'ip_address={interface.ip}',
@@ -36,15 +77,16 @@ def trigger_vm_creation(name, interface, memory, cores):
             f'netmask={interface.network.netmask}',
             f'gateway={interface.network.gateway}',
         ])
-    q = Queue()
     current_app.logger.info(f'Launch new job to create the {name} VM: {job_template} with {extra_vars}')
-    job = q.enqueue(
-        launch_job_template,
+    user = utils.cse_current_user()
+    task = user.launch_task(
+        task_name,
+        func='launch_job_template',
         job_template=job_template,
         extra_vars=extra_vars,
         timeout=500,
     )
-    return job
+    return task
 
 
 def trigger_core_services_update():
@@ -56,19 +98,24 @@ def trigger_core_services_update():
     Make sure that we don't have more than one in queue.
     """
     job_template = current_app.config['AWX_CORE_SERVICES_UPDATE']
-    q = Queue()
-    # Only trigger a new job if none is already waiting in queue
-    for job in q.jobs:
-        if (job.func_name == 'app.tasks.launch_job_template' and
-                job.kwargs.get('job_template', '') == job_template):
-            current_app.logger.info(f'Already one {job_template} job in queue. No need to trigger a new one.')
-            return None
+    user = utils.cse_current_user()
+    if user.is_task_waiting('trigger_core_services_update'):
+        current_app.logger.info('Already one "trigger_core_services_update" task waiting. No need to trigger a new one.')
+        return None
+    kwargs = {
+        'func': 'launch_job_template',
+        'job_template': job_template
+    }
+    started = user.get_task_started('trigger_core_services_update')
+    if started:
+        # There is already one running task. Trigger a new one when it's done.
+        kwargs['depends_on'] = started.id
     current_app.logger.info(f'Launch new job to update core services: {job_template}')
-    job = q.enqueue(
-        launch_job_template,
-        job_template=job_template,
+    task = user.launch_task(
+        'trigger_core_services_update',
+        **kwargs,
     )
-    return job
+    return task
 
 
 def launch_job_template(job_template, monitor=True, **kwargs):
diff --git a/migrations/versions/7d0d580cdb1a_add_task_table.py b/migrations/versions/7d0d580cdb1a_add_task_table.py
new file mode 100644
index 0000000..33cdd67
--- /dev/null
+++ b/migrations/versions/7d0d580cdb1a_add_task_table.py
@@ -0,0 +1,37 @@
+"""add task table
+
+Revision ID: 7d0d580cdb1a
+Revises: f5a605c0c835
+Create Date: 2018-06-28 21:01:13.171328
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '7d0d580cdb1a'
+down_revision = 'f5a605c0c835'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.create_table(
+        'task',
+        sa.Column('id', postgresql.UUID(), nullable=False),
+        sa.Column('created_at', sa.DateTime(), nullable=True),
+        sa.Column('name', sa.Text(), nullable=False),
+        sa.Column('command', sa.Text(), nullable=True),
+        sa.Column('status', sa.Enum('QUEUED', 'FINISHED', 'FAILED', 'STARTED', 'DEFERRED', name='job_status'), nullable=True),
+        sa.Column('user_id', sa.Integer(), nullable=False),
+        sa.ForeignKeyConstraint(['user_id'], ['user_account.id'], name=op.f('fk_task_user_id_user_account')),
+        sa.PrimaryKeyConstraint('id', name=op.f('pk_task'))
+    )
+    op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False)
+
+
+def downgrade():
+    op.drop_index(op.f('ix_task_name'), table_name='task')
+    op.drop_table('task')
+    op.execute('DROP TYPE job_status')
-- 
GitLab