Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
views.py 8.97 KiB
# -*- coding: utf-8 -*-
"""
app.inventory.views
~~~~~~~~~~~~~~~~~~~

This module implements the inventory blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import sqlalchemy as sa
from flask import (Blueprint, render_template, jsonify, session,
                   request, redirect, url_for, flash, current_app)
from flask_login import login_required
from .forms import AttributeForm, ItemForm, CommentForm
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import utils, models

# Blueprint named 'inventory'; the views below register their routes on it
# with @bp.route and are referenced as 'inventory.<view name>' in url_for.
bp = Blueprint('inventory', __name__)


@bp.route('/_retrieve_items')
@login_required
def retrieve_items():
    """Return one page of items as JSON for a server-side datatables table.

    Query string parameters (as sent by datatables):

    - ``draw``: counter echoed back unchanged in the response (default 0)
    - ``start``: offset of the first row to return (default 0)
    - ``length``: number of rows to return, negative for no limit (default 20)
    - ``search[value]``: optional search string; it is wrapped in ``%``
      unless it already contains a ``%`` wildcard
    - ``order[0][column]``: index of the column to sort on (default 0)
    - ``order[0][dir]``: ``desc`` for descending order, anything else
      means ascending

    Missing, non-numeric or out-of-range numeric parameters fall back to
    their default instead of making the request fail.

    :returns: JSON object with the keys ``draw``, ``recordsTotal``,
              ``recordsFiltered`` and ``data`` (one list per item)
    """
    # Item attributes in the same order as the columns of the table
    columns = ('id',
               'ics_id',
               'created_at',
               'updated_at',
               'serial_number',
               'manufacturer',
               'model',
               'location',
               'status',
               'parent',
               )
    # Get the parameters from the query string sent by datatables.
    # With type=int a missing or invalid value yields the default.
    draw = request.args.get('draw', 0, type=int)
    # A negative offset is not valid: start from the first row instead
    start = max(request.args.get('start', 0, type=int), 0)
    per_page = request.args.get('length', 20, type=int)
    search = request.args.get('search[value]', '')
    order_column = request.args.get('order[0][column]', 0, type=int)
    if not 0 <= order_column < len(columns):
        # Avoid an IndexError (or silently sorting on a column picked
        # from the end of the tuple by a negative index)
        order_column = 0
    if request.args.get('order[0][dir]') == 'desc':
        order_dir = sa.desc
    else:
        order_dir = sa.asc
    # Total number of items before filtering
    nb_items_total = db.session.query(sa.func.count(models.Item.id)).scalar()
    # Construct the query
    query = models.Item.query
    if search:
        if '%' not in search:
            search = f'%{search}%'
        # One sub-query per searchable field; the union gives all the
        # items matching at least one of them
        q1 = query.filter(
            sa.or_(
                models.Item.ics_id.like(search),
                models.Item.serial_number.like(search),
            )
        )
        q2 = query.join(models.Item.manufacturer).filter(
            models.Manufacturer.name.like(search))
        q3 = query.join(models.Item.model).filter(
            models.Model.name.like(search))
        q4 = query.join(models.Item.location).filter(
            models.Location.name.like(search))
        q5 = query.join(models.Item.status).filter(
            models.Status.name.like(search))
        q6 = query.join(models.Item.comments).filter(
            models.ItemComment.body.like(search))
        query = q1.union(q2).union(q3).union(q4).union(q5).union(q6)
        nb_items_filtered = query.order_by(None).count()
    else:
        nb_items_filtered = nb_items_total
    # Construct the order_by query
    query = query.order_by(order_dir(getattr(models.Item, columns[order_column])))
    # Limit and offset the query (a negative length means "all the rows")
    if per_page >= 0:
        query = query.limit(per_page)
    query = query.offset(start)
    data = [
        [item.id,
         item.ics_id,
         utils.format_field(item.created_at),
         utils.format_field(item.updated_at),
         item.serial_number,
         utils.format_field(item.manufacturer),
         utils.format_field(item.model),
         utils.format_field(item.location),
         utils.format_field(item.status),
         utils.format_field(item.parent),
         ] for item in query.all()
    ]
    response = {
        'draw': draw,
        'recordsTotal': nb_items_total,
        'recordsFiltered': nb_items_filtered,
        'data': data
    }
    return jsonify(response)


@bp.route('/items')
@login_required
def list_items():
    """Render the ``inventory/items.html`` template."""
    template = 'inventory/items.html'
    return render_template(template)


@bp.route('/items/create', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def create_item():
    """Display the item creation form and create the item on submission.

    The values submitted for the manufacturer, model, location, status and
    parent are kept in the session and used to pre-fill the next form, so
    that several identical items can be created quickly.

    :returns: a redirect to this same view once a valid form has been
              processed (with a success or error flash message),
              otherwise the rendered ``inventory/create_item.html`` template
    """
    # Fields remembered in the session from one creation to the next
    sticky_fields = ('manufacturer_id', 'model_id', 'location_id', 'status_id', 'parent_id')
    defaults = {}
    for name in sticky_fields:
        defaults[name] = session.get(name, '')
    form = ItemForm(request.form, **defaults)
    if not form.validate_on_submit():
        return render_template('inventory/create_item.html', form=form)
    # Remember the submitted values for the next item
    for name in sticky_fields:
        session[name] = getattr(form, name).data
    sticky_values = {name: getattr(form, name).data for name in sticky_fields}
    item = models.Item(ics_id=form.ics_id.data,
                       serial_number=form.serial_number.data,
                       **sticky_values)
    # The MAC addresses are entered as one whitespace separated string
    submitted_addresses = form.mac_addresses.data.split()
    item.macs = [models.Mac(address=value) for value in submitted_addresses]
    current_app.logger.debug(f'Trying to create: {item!r}')
    db.session.add(item)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        current_app.logger.warning(f'{e}')
        flash(f'{e}', 'error')
    else:
        flash(f'Item {item} created!', 'success')
    return redirect(url_for('inventory.create_item'))


@bp.route('/items/view/<ics_id>')
@login_required
def view_item(ics_id):
    """Render the page of the item with the given ICS id.

    :param ics_id: ICS id of the item to display
    :returns: the rendered ``inventory/view_item.html`` template
              (404 if no item has this ICS id)
    """
    matching = models.Item.query.filter_by(ics_id=ics_id)
    found = matching.first_or_404()
    return render_template('inventory/view_item.html', item=found)


@bp.route('/items/comment/<ics_id>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def comment_item(ics_id):
    """Display the comment form and attach the submitted comment to an item.

    :param ics_id: ICS id of the item to comment (404 if it does not exist)
    :returns: a redirect to the item page once the comment is saved,
              otherwise the rendered ``inventory/comment_item.html`` template
    """
    item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
    form = CommentForm()
    if not form.validate_on_submit():
        # Nothing (valid) submitted: show the form
        return render_template('inventory/comment_item.html', item=item, form=form)
    new_comment = models.ItemComment(body=form.body.data, item_id=item.id)
    db.session.add(new_comment)
    db.session.commit()
    item_page = url_for('inventory.view_item', ics_id=ics_id)
    return redirect(item_page)


@bp.route('/items/edit/<ics_id>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def edit_item(ics_id):
    """Display the item edition form and update the item on submission.

    The ICS id is only updated when the current one starts with the
    ``TEMPORARY_ICS_ID`` prefix from the application configuration.
    The MAC addresses are synchronized with the whitespace separated list
    submitted in the form: addresses no longer listed are deleted and new
    ones are created.

    :param ics_id: ICS id of the item to edit (404 if it does not exist)
    :returns: a redirect to the item page once a valid form has been
              processed (with a success or error flash message),
              otherwise the rendered ``inventory/edit_item.html`` template
    """
    item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
    mac_addresses = ' '.join([str(mac) for mac in item.macs])
    form = ItemForm(request.form, obj=item, mac_addresses=mac_addresses)
    if form.validate_on_submit():
        # Only allow to update temporary ics_id
        if item.ics_id.startswith(current_app.config['TEMPORARY_ICS_ID']):
            item.ics_id = form.ics_id.data
        item.serial_number = form.serial_number.data
        for key in ('manufacturer_id', 'model_id', 'location_id', 'status_id', 'parent_id'):
            setattr(item, key, getattr(form, key).data)
        new_addresses = form.mac_addresses.data.split()
        # Delete the MAC addresses that have been removed.
        # Iterate over a copy: removing elements from item.macs while
        # iterating over it would skip the element following each removal.
        for mac in list(item.macs):
            if mac.address not in new_addresses:
                item.macs.remove(mac)
                db.session.delete(mac)
        # Add new MAC addresses.
        # Compare against the set of remaining addresses (exact match),
        # not against the space-joined string (substring match).
        known_addresses = {mac.address for mac in item.macs}
        for address in new_addresses:
            if address not in known_addresses:
                item.macs.append(models.Mac(address=address))
                # Do not add twice an address repeated in the form
                known_addresses.add(address)
        current_app.logger.debug(f'Trying to update: {item!r}')
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f'{e}')
            flash(f'{e}', 'error')
        else:
            flash(f'Item {item} updated!', 'success')
        return redirect(url_for('inventory.view_item', ics_id=item.ics_id))
    return render_template('inventory/edit_item.html', form=form)


@bp.route('/qrcodes/<kind>')
@login_required
def qrcodes(kind='Action'):
    """Render the QR codes page for all the objects of the given kind.

    :param kind: name of the class to look up in the ``models`` module
    :returns: the rendered ``inventory/qrcodes.html`` template
    :raises utils.CSEntryError: (status code 422) if ``models`` has no
                                attribute named ``kind``
    """
    try:
        model = getattr(models, kind)
    except AttributeError:
        raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
    # One entry per object, sorted by name
    images = []
    for entry in db.session.query(model).order_by(model.name):
        images.append({'name': entry.name, 'data': entry.base64_image()})
    return render_template('inventory/qrcodes.html', kind=kind, images=images)


@bp.route('/attributes/<kind>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def attributes(kind):
    """Display the attribute form and create a new attribute on submission.

    :param kind: name of the class to look up in the ``models`` module
    :returns: a redirect to this same view once a valid form has been
              processed (with a success or error flash message),
              otherwise the rendered ``inventory/attributes.html`` template
    :raises utils.CSEntryError: (status code 422) if a valid form is
                                submitted and ``models`` has no attribute
                                named ``kind``
    """
    form = AttributeForm()
    if form.validate_on_submit():
        # Report an unknown kind the same way as the other views do,
        # instead of failing with an unhandled AttributeError
        try:
            model = getattr(models, kind)
        except AttributeError:
            raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
        new_model = model(name=form.name.data,
                          description=form.description.data or None)
        db.session.add(new_model)
        try:
            db.session.commit()
        except sa.exc.IntegrityError:
            db.session.rollback()
            flash(f'{form.name.data} already exists! {kind} not created.', 'error')
        else:
            flash(f'{kind} {new_model} created!', 'success')
        return redirect(url_for('inventory.attributes', kind=kind))
    return render_template('inventory/attributes.html', kind=kind, form=form)


@bp.route('/_retrieve_attributes_name/<kind>')
@login_required
def retrieve_attributes_name(kind):
    """Return the name and description of all the objects of a kind as JSON.

    :param kind: name of the class to look up in the ``models`` module
    :returns: JSON object whose ``data`` key holds one (name, description)
              pair per object, sorted by name
    :raises utils.CSEntryError: (status code 422) if ``models`` has no
                                attribute named ``kind``
    """
    try:
        model = getattr(models, kind)
    except AttributeError:
        raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
    sorted_entries = db.session.query(model).order_by(model.name)
    pairs = []
    for entry in sorted_entries:
        pairs.append((entry.name, entry.description))
    return jsonify(data=pairs)