# -*- coding: utf-8 -*-
"""
app.api.utils
~~~~~~~~~~~~~

This module implements useful functions for the API.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import urllib.parse

import sqlalchemy as sa
from flask import current_app, jsonify, request
from flask_login import current_user
from flask_sqlalchemy import Pagination

from ..extensions import db
from .. import utils


def commit():
    """Commit the current database session

    If SQLAlchemy reports an error, the session is rolled back and the
    error is re-raised as a CSEntryError (status code 422) carrying the
    original error message.

    :raises utils.CSEntryError: when the commit fails
    """
    try:
        db.session.commit()
    except sa.exc.SQLAlchemyError as error:
        # Leave the session in a usable state before reporting the failure
        db.session.rollback()
        reason = str(error)
        raise utils.CSEntryError(reason, status_code=422)


def build_pagination_header(pagination, base_url, **kwargs):
    """Return the X-Total-Count and Link header information

    The Link header contains, when applicable, the "first", "prev", "next"
    and "last" relations (in that order), separated by ", ". It is omitted
    when no relation applies.

    :param pagination: flask_sqlalchemy Pagination class instance
    :param base_url: request base_url
    :param kwargs: extra query string parameters (without page and per_page)
    :returns: dict with X-Total-Count and Link keys
    """

    def make_link(page, rel):
        # per_page and page always come first in the query string,
        # followed by the extra parameters in the order they were given
        params = urllib.parse.urlencode(
            {"per_page": pagination.per_page, "page": page, **kwargs}
        )
        return f'<{base_url}?{params}>; rel="{rel}"'

    header = {"X-Total-Count": pagination.total}
    links = []
    if pagination.page > 1:
        links.append(make_link(1, "first"))
    if pagination.has_prev:
        links.append(make_link(pagination.prev_num, "prev"))
    if pagination.has_next:
        links.append(make_link(pagination.next_num, "next"))
    if pagination.pages > pagination.page:
        links.append(make_link(pagination.pages, "last"))
    if links:
        header["Link"] = ", ".join(links)
    return header


def get_generic_model(model, order_by=None, base_query=None, query=None):
    """Return data from model as json

    The page, per_page and recursive parameters are read from the request
    query string. All other query string parameters are passed to
    utils.get_query as filters (unless query is given).

    :param model: model class
    :param order_by: column to order the result by (not used if base_query or query is passed)
    :param base_query: optional base_query to use [default: model.query]
    :param query: optional query to use (for more complex queries)
    :returns: data from model as json, with the pagination headers
    :raises utils.CSEntryError: if page or per_page is not an integer
    """
    kwargs = request.args.to_dict()
    try:
        page = int(kwargs.pop("page", 1))
        per_page = int(kwargs.pop("per_page", 20))
    except ValueError as e:
        # Query string values are user input: report a client error
        # instead of letting the ValueError escape as a server error
        raise utils.CSEntryError(
            "'page' and 'per_page' shall be integers", status_code=422
        ) from e
    # Remove recursive from kwargs so that it doesn't get passed
    # to query.filter_by in get_query
    recursive = kwargs.pop("recursive", "false").lower() == "true"
    if query is None:
        if base_query is None:
            base_query = model.query
            if order_by is None:
                order_by = model.name
            base_query = base_query.order_by(order_by)
        query = utils.get_query(base_query, model, **kwargs)
    # Keyword arguments are accepted by every Flask-SQLAlchemy version
    pagination = query.paginate(
        page=page,
        per_page=per_page,
        max_per_page=current_app.config["MAX_PER_PAGE"],
    )
    data = [item.to_dict(recursive=recursive) for item in pagination.items]
    # Re-add recursive to kwargs so that it's part of the pagination url
    kwargs["recursive"] = recursive
    header = build_pagination_header(pagination, request.base_url, **kwargs)
    return jsonify(data), 200, header
def search_generic_model(model, filter_sensitive=False):
    """Return filtered data from model as json

    The search string is read from the "q" query string parameter
    (default: "*"). The page and per_page parameters are read from the
    query string as well; per_page is capped to the MAX_PER_PAGE setting.

    :param model: model class
    :param bool filter_sensitive: filter out sensitive data if set to True
    :returns: filtered data from model as json, with the pagination headers
    :raises utils.CSEntryError: if page or per_page is not an integer
    """
    kwargs = request.args.to_dict()
    try:
        page = int(kwargs.pop("page", 1))
        per_page = int(kwargs.pop("per_page", 20))
    except ValueError as e:
        # Query string values are user input: report a client error
        # instead of letting the ValueError escape as a server error
        raise utils.CSEntryError(
            "'page' and 'per_page' shall be integers", status_code=422
        ) from e
    per_page = min(per_page, current_app.config["MAX_PER_PAGE"])
    # "q" is read with get (not pop) so that it stays in the pagination urls
    search = kwargs.get("q", "*")
    instances, nb_filtered = model.search(
        search, page=page, per_page=per_page, filter_sensitive=filter_sensitive
    )
    current_app.logger.debug(
        f'Found {nb_filtered} {model.__tablename__}(s) when searching "{search}"'
    )
    data = [instance.to_dict(recursive=True) for instance in instances]
    # Only used to compute the pagination header: no query nor items needed
    pagination = Pagination(None, page, per_page, nb_filtered, None)
    header = build_pagination_header(pagination, request.base_url, **kwargs)
    return jsonify(data), 200, header


def get_json_body():
    """Return the json body from the current request

    Raise a CSEntryError if the body is not a json object
    or if the object is empty

    :returns: the decoded body as a dict
    :raises utils.CSEntryError: if the body is missing, is not a JSON
                                object or is an empty object (422)
    """
    data = request.get_json()
    # Callers treat the body as a dict (update, key lookup): reject
    # a missing body as well as lists, strings, numbers...
    if not isinstance(data, dict):
        raise utils.CSEntryError("Body should be a JSON object")
    if not data:
        raise utils.CSEntryError("At least one field is required", status_code=422)
    return data


def create_generic_model(model, mandatory_fields=("name",), **kwargs):
    """Create an instance of the model

    The fields are taken from the JSON body of the current request,
    updated with the extra kwargs.

    :param model: model class
    :param mandatory_fields: list of fields that shall be passed in the body
    :param kwargs: extra fields to use
    :returns: representation of the created instance as json
    :raises utils.CSEntryError: if a mandatory field is missing, if the
                                model can't be created from the given
                                fields or if the commit fails
    """
    data = get_json_body()
    data.update(kwargs)
    for mandatory_field in mandatory_fields:
        if mandatory_field not in data:
            raise utils.CSEntryError(
                f"Missing mandatory field '{mandatory_field}'", status_code=422
            )
    try:
        instance = model(**data)
    except TypeError as e:
        # Only keep what follows "__init__() got an " so that the message
        # is meaningful to the API user. Python >= 3.10 prefixes the
        # message with the class name ("Model.__init__() got an ..."):
        # drop that prefix as well.
        message = str(e)
        _, marker, remainder = message.partition("__init__() got an ")
        if marker:
            message = remainder
        raise utils.CSEntryError(message, status_code=422) from e
    except ValueError as e:
        raise utils.CSEntryError(str(e), status_code=422) from e
    db.session.add(instance)
    commit()
    current_app.logger.info(
        f"New {model.__tablename__} created by {current_user}: {instance.to_dict()}"
    )
    return jsonify(instance.to_dict()), 201


def delete_generic_model(model, primary_key):
    """Delete the model based on the primary_key

    A 404 error is raised (by get_or_404) if no instance matches
    the primary key.

    :param model: model class
    :param primary_key: primary key of the instance to delete
    :returns: empty json response with the 204 status code
    :raises utils.CSEntryError: if the commit fails
    """
    instance = model.query.get_or_404(primary_key)
    # Build the log message while the instance is still attached
    # to the session
    message = f"{model.__tablename__} {instance} deleted by {current_user}"
    db.session.delete(instance)
    # Persist the deletion, like create_generic_model and
    # update_generic_model do, and only log once it succeeded
    commit()
    current_app.logger.info(message)
    return jsonify(), 204


def update_generic_model(model, primary_key, allowed_fields):
    """Update the model based on the primary_key

    The new values are taken from the JSON body of the current request.
    Every key of the body shall be the first element of one of the
    allowed_fields entries.

    :param model: model class
    :param primary_key: primary key of the instance to update
    :param allowed_fields: list of fields that can be updated
    :returns: representation of the updated model as json
    """
    payload = get_json_body()
    # A list (and not a set) is used on purpose: the order matters
    ordered_keys = [entry[0] for entry in allowed_fields]
    for name in payload:
        if name in ordered_keys:
            continue
        raise utils.CSEntryError(f"Invalid field '{name}'", status_code=422)
    instance = model.query.get_or_404(primary_key)
    converted = utils.convert_to_models(payload, allowed_fields)
    try:
        # Iterate over the allowed keys (and not over the body) so that
        # the attributes are set in the declared order:
        # setting ip before network is important for Interface
        for name in ordered_keys:
            if name not in converted:
                continue
            setattr(instance, name, converted[name])
    except Exception as error:
        raise utils.CSEntryError(str(error), status_code=422)
    commit()
    current_app.logger.info(
        f"{model.__tablename__} {primary_key} updated by {current_user}: {instance.to_dict()}"
    )
    return jsonify(instance.to_dict()), 200