Forked from
ICS Control System Infrastructure / csentry
442 commits behind the upstream repository.
-
Benjamin Bertrand authored
Sorting on manufacturer/model/location/status requires a join which removes the row with empty field. Disable it until we find a way to sort properly including null values.
Benjamin Bertrand authoredSorting on manufacturer/model/location/status requires a join which removes the row with empty field. Disable it until we find a way to sort properly including null values.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
views.py 8.97 KiB
# -*- coding: utf-8 -*-
"""
app.inventory.views
~~~~~~~~~~~~~~~~~~~
This module implements the inventory blueprint.
:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import sqlalchemy as sa
from flask import (Blueprint, render_template, jsonify, session,
request, redirect, url_for, flash, current_app)
from flask_login import login_required
from .forms import AttributeForm, ItemForm, CommentForm
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import utils, models
bp = Blueprint('inventory', __name__)
@bp.route('/_retrieve_items')
@login_required
def retrieve_items():
# Get the parameters from the query string sent by datatables
draw = int(request.args.get('draw'))
start = int(request.args.get('start', 0))
per_page = int(request.args.get('length', 20))
search = request.args.get('search[value]', '')
order_column = int(request.args.get('order[0][column]'))
if request.args.get('order[0][dir]') == 'desc':
order_dir = sa.desc
else:
order_dir = sa.asc
# Total number of items before filtering
nb_items_total = db.session.query(sa.func.count(models.Item.id)).scalar()
# Construct the query
query = models.Item.query
if search:
if '%' not in search:
search = f'%{search}%'
q1 = query.filter(
sa.or_(
models.Item.ics_id.like(search),
models.Item.serial_number.like(search),
)
)
q2 = query.join(models.Item.manufacturer).filter(
models.Manufacturer.name.like(search))
q3 = query.join(models.Item.model).filter(
models.Model.name.like(search))
q4 = query.join(models.Item.location).filter(
models.Location.name.like(search))
q5 = query.join(models.Item.status).filter(
models.Status.name.like(search))
q6 = query.join(models.Item.comments).filter(
models.ItemComment.body.like(search))
query = q1.union(q2).union(q3).union(q4).union(q5).union(q6)
nb_items_filtered = query.order_by(None).count()
else:
nb_items_filtered = nb_items_total
# Construct the order_by query
columns = ('id',
'ics_id',
'created_at',
'updated_at',
'serial_number',
'manufacturer',
'model',
'location',
'status',
'parent',
)
query = query.order_by(order_dir(getattr(models.Item, columns[order_column])))
# Limit and offset the query
if per_page != -1:
query = query.limit(per_page)
query = query.offset(start)
data = [
[item.id,
item.ics_id,
utils.format_field(item.created_at),
utils.format_field(item.updated_at),
item.serial_number,
utils.format_field(item.manufacturer),
utils.format_field(item.model),
utils.format_field(item.location),
utils.format_field(item.status),
utils.format_field(item.parent),
] for item in query.all()
]
response = {
'draw': draw,
'recordsTotal': nb_items_total,
'recordsFiltered': nb_items_filtered,
'data': data
}
return jsonify(response)
@bp.route('/items')
@login_required
def list_items():
return render_template('inventory/items.html')
@bp.route('/items/create', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def create_item():
# The following keys are stored in the session to easily create
# several identical items
keys = ('manufacturer_id', 'model_id', 'location_id', 'status_id', 'parent_id')
settings = {key: session.get(key, '') for key in keys}
form = ItemForm(request.form, **settings)
if form.validate_on_submit():
for key in keys:
session[key] = getattr(form, key).data
item = models.Item(ics_id=form.ics_id.data,
serial_number=form.serial_number.data,
manufacturer_id=form.manufacturer_id.data,
model_id=form.model_id.data,
location_id=form.location_id.data,
status_id=form.status_id.data,
parent_id=form.parent_id.data)
item.macs = [models.Mac(address=address) for address in form.mac_addresses.data.split()]
current_app.logger.debug(f'Trying to create: {item!r}')
db.session.add(item)
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
current_app.logger.warning(f'{e}')
flash(f'{e}', 'error')
else:
flash(f'Item {item} created!', 'success')
return redirect(url_for('inventory.create_item'))
return render_template('inventory/create_item.html', form=form)
@bp.route('/items/view/<ics_id>')
@login_required
def view_item(ics_id):
item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
return render_template('inventory/view_item.html', item=item)
@bp.route('/items/comment/<ics_id>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def comment_item(ics_id):
item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
form = CommentForm()
if form.validate_on_submit():
comment = models.ItemComment(body=form.body.data,
item_id=item.id)
db.session.add(comment)
db.session.commit()
return redirect(url_for('inventory.view_item', ics_id=ics_id))
return render_template('inventory/comment_item.html', item=item, form=form)
@bp.route('/items/edit/<ics_id>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def edit_item(ics_id):
item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
mac_addresses = ' '.join([str(mac) for mac in item.macs])
form = ItemForm(request.form, obj=item, mac_addresses=mac_addresses)
if form.validate_on_submit():
# Only allow to update temporary ics_id
if item.ics_id.startswith(current_app.config['TEMPORARY_ICS_ID']):
item.ics_id = form.ics_id.data
item.serial_number = form.serial_number.data
for key in ('manufacturer_id', 'model_id', 'location_id', 'status_id', 'parent_id'):
setattr(item, key, getattr(form, key).data)
new_addresses = form.mac_addresses.data.split()
# Delete the MAC addresses that have been removed
for (index, mac) in enumerate(item.macs):
if mac.address not in new_addresses:
item.macs.pop(index)
db.session.delete(mac)
# Add new MAC addresseses
for address in new_addresses:
if address not in mac_addresses:
mac = models.Mac(address=address)
item.macs.append(mac)
current_app.logger.debug(f'Trying to update: {item!r}')
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
current_app.logger.warning(f'{e}')
flash(f'{e}', 'error')
else:
flash(f'Item {item} updated!', 'success')
return redirect(url_for('inventory.view_item', ics_id=item.ics_id))
return render_template('inventory/edit_item.html', form=form)
@bp.route('/qrcodes/<kind>')
@login_required
def qrcodes(kind='Action'):
try:
model = getattr(models, kind)
except AttributeError:
raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
items = db.session.query(model).order_by(model.name)
images = [{'name': item.name,
'data': item.base64_image()}
for item in items]
return render_template('inventory/qrcodes.html', kind=kind, images=images)
@bp.route('/attributes/<kind>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def attributes(kind):
form = AttributeForm()
if form.validate_on_submit():
model = getattr(models, kind)
new_model = model(name=form.name.data,
description=form.description.data or None)
db.session.add(new_model)
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
flash(f'{form.name.data} already exists! {kind} not created.', 'error')
else:
flash(f'{kind} {new_model} created!', 'success')
return redirect(url_for('inventory.attributes', kind=kind))
return render_template('inventory/attributes.html', kind=kind, form=form)
@bp.route('/_retrieve_attributes_name/<kind>')
@login_required
def retrieve_attributes_name(kind):
try:
model = getattr(models, kind)
except AttributeError:
raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
items = db.session.query(model).order_by(model.name)
data = [(item.name, item.description) for item in items]
return jsonify(data=data)