Skip to content
Snippets Groups Projects
Commit 1332808d authored by Benjamin Bertrand's avatar Benjamin Bertrand
Browse files

Improve task attributes update

Avoid calling the database several times.
Several attributes can be updated in one call.
parent 523f2cb5
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ from . import utils, models
class TaskWorker(Worker):
"""
Modified version of the rq worker which updates
the task status in the CSEntry database
the task status and end time in the CSEntry database
"""
def save_exception(self, job, *exc_info):
......@@ -37,7 +37,8 @@ class TaskWorker(Worker):
traceback.format_exception(*exc_info))
db.session.commit()
def update_task_attribute(self, job, name, value):
def update_task_attributes(self, job, attributes):
"""Update the attributes of the task linked to the given job"""
# The task is created after enqueueing the job.
# If the job is processed very quickly, the task might
# not be found in the database. We wait up to 3 seconds.
......@@ -51,22 +52,32 @@ class TaskWorker(Worker):
else:
self.log.error(f'Task {job.id} not found! Task attribute not updated!')
return
setattr(task, name, value)
for name, value in attributes.items():
setattr(task, name, value)
db.session.commit()
def move_to_failed_queue(self, job, *exc_info):
self.update_task_attribute(job, 'ended_at', job.ended_at)
self.update_task_attribute(job, 'status', models.JobStatus.FAILED)
# This could be achieved by passing a custom exception handler
# when initializing the worker. As we already subclass it, it's
# easier to override the default handler in case of failure
self.update_task_attributes(job, {
'ended_at': job.ended_at,
'status': models.JobStatus.FAILED,
})
self.save_exception(job, *exc_info)
super().move_to_failed_queue(job, *exc_info)
def handle_job_success(self, job, queue, started_job_registry):
self.update_task_attribute(job, 'ended_at', job.ended_at)
self.update_task_attribute(job, 'status', models.JobStatus.FINISHED)
self.update_task_attributes(job, {
'ended_at': job.ended_at,
'status': models.JobStatus.FINISHED,
})
super().handle_job_success(job, queue, started_job_registry)
def prepare_job_execution(self, job):
self.update_task_attribute(job, 'status', models.JobStatus.STARTED)
self.update_task_attributes(job, {
'status': models.JobStatus.STARTED,
})
super().prepare_job_execution(job)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment