From f610257b8caf766a359d324f4f4643fdee87fb4a Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Tue, 4 Jun 2019 13:34:23 +0200 Subject: [PATCH] Update reverse dependencies of failed tasks A deferred task is now set to failed when the task it depends on fails. There is no longer any need for an arbitrary time limit to disregard "old" deferred tasks. JIRA INFRA-1051 #action In Progress --- app/commands.py | 20 +++++++++++++ app/models.py | 51 +++++++++++++++++---------------- app/tasks.py | 8 ++++++ tests/functional/test_models.py | 29 ++++++++++++------- 4 files changed, 73 insertions(+), 35 deletions(-) diff --git a/app/commands.py b/app/commands.py index 164cbb0..4a80ad0 100644 --- a/app/commands.py +++ b/app/commands.py @@ -87,6 +87,21 @@ def sync_users(): db.session.commit() +def clean_deferred_tasks(): + """Set all deferred tasks to failed""" + for task in ( + models.Task.query.filter_by(status=models.JobStatus.DEFERRED) + .order_by(models.Task.created_at) + .all() + ): + if task.depends_on is None or task.depends_on.status == models.JobStatus.FAILED: + current_app.logger.info( + f"Set deferred task {task.id} ({task.name}) to failed" + ) + task.status = models.JobStatus.FAILED + db.session.commit() + + def register_cli(app): @app.cli.command() def create_defaults(): @@ -99,6 +114,11 @@ def register_cli(app): db.session.rollback() app.logger.debug(f"{instance} already exists") + @app.cli.command() + def clean_deferred(): + """Set deferred tasks to failed if the task it depends on failed""" + clean_deferred_tasks() + @app.cli.command() def syncusers(): """Synchronize all users from the database with information the LDAP server""" diff --git a/app/models.py b/app/models.py index 8ce4e5d..1d23a02 100644 --- a/app/models.py +++ b/app/models.py @@ -38,10 +38,6 @@ from .validators import ( from . 
import utils, search -# Number of minutes to wait before to consider a deferrred job lost -WAITING_DELAY = 30 - - make_versioned(plugins=[FlaskUserPlugin()]) @@ -392,19 +388,13 @@ class User(db.Model, UserMixin): Waiting means: - queued - - deferred if not older than WAITING_DELAY minutes + - deferred - A deferred task can stay deferred forever if the task it depends on fails. + A deferred task will be set to failed if the task it depends on fails. """ count = ( Task.query.filter_by(name=name) - .filter( - (Task.status == JobStatus.QUEUED) - | ( - (Task.status == JobStatus.DEFERRED) - & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) - ) - ) + .filter(Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED])) .count() ) return count > 0 @@ -414,19 +404,13 @@ class User(db.Model, UserMixin): Waiting means: - queued - - deferred if not older than WAITING_DELAY minutes + - deferred - A deferred task can stay deferred forever if the task it depends on fails. + A deferred task will be set to failed if the task it depends on fails. 
""" return ( Task.query.filter_by(name=name) - .filter( - (Task.status == JobStatus.QUEUED) - | ( - (Task.status == JobStatus.DEFERRED) - & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) - ) - ) + .filter(Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED])) .order_by(Task.created_at.desc()) .first() ) @@ -497,7 +481,7 @@ class SearchableMixin(object): @classmethod def after_flush_postexec(cls, session, flush_context): """Retrieve the new and updated objects representation""" - if session._changes is None: + if not hasattr(session, "_changes") or session._changes is None: return # - We can't call obj.to_dict() in the before_flush event because the id # hasn't been allocated yet (for new objects) and other fields haven't been updated @@ -513,7 +497,7 @@ class SearchableMixin(object): @classmethod def after_commit(cls, session): """Update the elasticsearch index""" - if session._changes is None: + if not hasattr(session, "_changes") or session._changes is None: return for index, body in session._changes["add"]: search.add_to_index(index, body) @@ -1703,6 +1687,25 @@ class Task(db.Model): current_app.config["AWX_URL"], f"/#/{route}/{self.awx_job_id}" ) + def update_reverse_dependencies(self): + """Recursively set all reverse dependencies to FAILED + + When a RQ job is set to FAILED, its reverse dependencies will stay to DEFERRED. + This method allows to easily update the corresponding tasks status. + + The tasks are modified but the session is not committed. 
+ """ + + def set_reverse_dependencies_to_failed(task): + for dependency in task.reverse_dependencies: + current_app.logger.info( + f"Setting {dependency.id} ({dependency.name}) to FAILED due to failed dependency" + ) + dependency.status = JobStatus.FAILED + set_reverse_dependencies_to_failed(dependency) + + set_reverse_dependencies_to_failed(self) + def __str__(self): return str(self.id) diff --git a/app/tasks.py b/app/tasks.py index a0b00d7..c771426 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -64,6 +64,13 @@ class TaskWorker(Worker): setattr(task, name, value) db.session.commit() + def update_reverse_dependencies(self, job): + task = models.Task.query.get(job.id) + if task is None: + return + task.update_reverse_dependencies() + db.session.commit() + def move_to_failed_queue(self, job, *exc_info): # This could be achieved by passing a custom exception handler # when initializing the worker. As we already subclass it, it's @@ -71,6 +78,7 @@ class TaskWorker(Worker): self.update_task_attributes( job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED} ) + self.update_reverse_dependencies(job) self.save_exception(job, *exc_info) super().move_to_failed_queue(job, *exc_info) diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index d3448e0..f8eaaf9 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -500,8 +500,8 @@ def test_task_waiting(status, user, task_factory): assert user.is_task_waiting("my-task") -@pytest.mark.parametrize("minutes", [5, 10, 29]) -def test_task_waiting_with_recent_deferred(minutes, user, task_factory): +@pytest.mark.parametrize("minutes", [5, 10, 29, 31, 60, 7200]) +def test_task_waiting_with_recent_or_old_deferred(minutes, user, task_factory): minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) task_factory( created_at=minutes_ago, name="my-task", status=models.JobStatus.DEFERRED @@ -509,15 +509,6 @@ def 
test_task_waiting_with_recent_deferred(minutes, user, task_factory): assert user.is_task_waiting("my-task") -@pytest.mark.parametrize("minutes", [31, 60, 7200]) -def test_no_task_waiting_with_old_deferred(minutes, user, task_factory): - minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) - task_factory( - created_at=minutes_ago, name="my-task", status=models.JobStatus.DEFERRED - ) - assert not user.is_task_waiting("my-task") - - @pytest.mark.parametrize("minutes", [5, 30, 7200]) def test_task_waiting_with_old_queued(minutes, user, task_factory): minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) @@ -549,6 +540,22 @@ def test_get_tasks_in_progress(user, task_factory): assert user.get_tasks_in_progress("my-task") == [task1, task2, task3] +def test_update_task_reverse_dependencies(user, task_factory): + task1 = task_factory(name="my-task", status=models.JobStatus.STARTED) + task2 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task1 + ) + task3 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task1 + ) + task4 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task3 + ) + task1.update_reverse_dependencies() + for task in (task2, task3, task4): + assert task.status == models.JobStatus.FAILED + + def test_host_indexed(db, host_factory): host1 = host_factory(name="myhost") res = db.app.elasticsearch.search(index="host-test", q="*") -- GitLab