# -*- coding: utf-8 -*- """ app.api.utils ~~~~~~~~~~~~~ This module implements useful functions for the API. :copyright: (c) 2017 European Spallation Source ERIC :license: BSD 2-Clause, see LICENSE for more details. """ import urllib.parse import sqlalchemy as sa from flask import current_app, jsonify, request from flask_login import current_user from flask_sqlalchemy import Pagination from ..extensions import db from .. import utils def commit(): try: db.session.commit() except sa.exc.SQLAlchemyError as e: db.session.rollback() raise utils.CSEntryError(str(e), status_code=422) def build_pagination_header(pagination, base_url, **kwargs): """Return the X-Total-Count and Link header information :param pagination: flask_sqlalchemy Pagination class instance :param base_url: request base_url :param kwargs: extra query string parameters (without page and per_page) :returns: dict with X-Total-Count and Link keys """ header = {"X-Total-Count": pagination.total} links = [] if pagination.page > 1: params = urllib.parse.urlencode( {"per_page": pagination.per_page, "page": 1, **kwargs} ) links.append(f'<{base_url}?{params}>; rel="first"') if pagination.has_prev: params = urllib.parse.urlencode( {"per_page": pagination.per_page, "page": pagination.prev_num, **kwargs} ) links.append(f'<{base_url}?{params}>; rel="prev"') if pagination.has_next: params = urllib.parse.urlencode( {"per_page": pagination.per_page, "page": pagination.next_num, **kwargs} ) links.append(f'<{base_url}?{params}>; rel="next"') if pagination.pages > pagination.page: params = urllib.parse.urlencode( {"per_page": pagination.per_page, "page": pagination.pages, **kwargs} ) links.append(f'<{base_url}?{params}>; rel="last"') if links: header["Link"] = ", ".join(links) return header def get_generic_model(model, order_by=None, base_query=None, query=None): """Return data from model as json :param model: model class :param order_by: column to order the result by (not used if base_query or query is passed) :param query: optional base_query to use [default: model.query] :param query: optional query to use (for more complex queries) :returns: data from model as json """ kwargs = request.args.to_dict() page = int(kwargs.pop("page", 1)) per_page = int(kwargs.pop("per_page", 20)) # Remove recursive from kwargs so that it doesn't get passed # to query.filter_by in get_query recursive = kwargs.pop("recursive", "false").lower() == "true" if query is None: if base_query is None: base_query = model.query if order_by is None: order_by = getattr(model, "name") base_query = base_query.order_by(order_by) query = utils.get_query(base_query, model, **kwargs) pagination = query.paginate( page, per_page, max_per_page=current_app.config["MAX_PER_PAGE"] ) data = [item.to_dict(recursive=recursive) for item in pagination.items] # Re-add recursive to kwargs so that it's part of the pagination url kwargs["recursive"] = recursive header = build_pagination_header(pagination, request.base_url, **kwargs) return jsonify(data), 200, header def search_generic_model(model, filter_sensitive=False): """Return filtered data from model as json :param model: model class :param bool filter_sensitive: filter out sensitive data if set to True :returns: filtered data from model as json """ kwargs = request.args.to_dict() page = int(kwargs.pop("page", 1)) per_page = int(kwargs.pop("per_page", 20)) per_page = min(per_page, current_app.config["MAX_PER_PAGE"]) search = kwargs.get("q", "*") instances, nb_filtered = model.search( search, page=page, per_page=per_page, filter_sensitive=filter_sensitive ) current_app.logger.debug( f'Found {nb_filtered} {model.__tablename__}(s) when searching "{search}"' ) data = [instance.to_dict(recursive=True) for instance in instances] pagination = Pagination(None, page, per_page, nb_filtered, None) header = build_pagination_header(pagination, request.base_url, **kwargs) return jsonify(data), 200, header def get_json_body(): """Return the json body from the current request Raise a CSEntryError if the body is not a json object """ data = request.get_json() if data is None: raise utils.CSEntryError("Body should be a JSON object") if not data: raise utils.CSEntryError("At least one field is required", status_code=422) return data def create_generic_model(model, mandatory_fields=("name",), **kwargs): """Create an instance of the model :param model: model class :param mandatory_fields: list of fields that shall be passed in the body :param kwargs: extra fields to use :returns: representation of the created instance as json """ data = get_json_body() data.update(kwargs) for mandatory_field in mandatory_fields: if mandatory_field not in data: raise utils.CSEntryError( f"Missing mandatory field '{mandatory_field}'", status_code=422 ) try: instance = model(**data) except TypeError as e: message = str(e).replace("__init__() got an ", "") raise utils.CSEntryError(message, status_code=422) except ValueError as e: raise utils.CSEntryError(str(e), status_code=422) db.session.add(instance) commit() current_app.logger.info( f"New {model.__tablename__} created by {current_user}: {instance.to_dict()}" ) return jsonify(instance.to_dict()), 201 def delete_generic_model(model, primary_key): """Delete the model based on the primary_key :param model: model class :param primary_key: primary key of the instance to delete """ instance = model.query.get_or_404(primary_key) db.session.delete(instance) commit() current_app.logger.info( f"{model.__tablename__} {instance} deleted by {current_user}" ) return jsonify(), 204 def update_generic_model(model, primary_key, allowed_fields): """Update the model based on the primary_key :param model: model class :param primary_key: primary key of the instance to update :param allowed_fields: list of fields that can be updated :returns: representation of the updated model as json """ data = get_json_body() # Use a list and not a set because the order is important allowed_keys = [field[0] for field in allowed_fields] for key in data: if key not in allowed_keys: raise utils.CSEntryError(f"Invalid field '{key}'", status_code=422) instance = model.query.get_or_404(primary_key) data = utils.convert_to_models(data, allowed_fields) try: # Loop on allowed_keys and not data to respect # the order in which the fields are set # setting ip before network is important for Interface for key in allowed_keys: if key in data: setattr(instance, key, data[key]) except Exception as e: raise utils.CSEntryError(str(e), status_code=422) commit() current_app.logger.info( f"{model.__tablename__} {primary_key} updated by {current_user}: {instance.to_dict()}" ) return jsonify(instance.to_dict()), 200