# -*- coding: utf-8 -*-
"""
app.api.utils
~~~~~~~~~~~~~

This module implements useful functions for the API.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import urllib.parse

import sqlalchemy as sa
from flask import current_app, jsonify, request
from flask_sqlalchemy import Pagination

from ..extensions import db
from .. import utils
def commit():
    """Commit the current database session

    If the commit fails, the session is rolled back and the database
    error is re-raised as a CSEntryError carrying the status code 422.

    :raises utils.CSEntryError: if the commit raises a SQLAlchemyError
    """
    try:
        db.session.commit()
    except sa.exc.SQLAlchemyError as exc:
        # Discard the failed transaction before reporting the error
        db.session.rollback()
        # Chain explicitly so the database error is kept as the cause
        raise utils.CSEntryError(str(exc), status_code=422) from exc
def build_pagination_header(pagination, base_url, **kwargs):
    """Return the X-Total-Count and Link header information

    :param pagination: flask_sqlalchemy Pagination class instance
    :param base_url: request base_url
    :param kwargs: extra query string parameters (without page and per_page)
    :returns: dict with X-Total-Count and Link keys
              (Link is only set when there is at least one link)
    """

    def make_link(page, rel):
        # Every link repeats per_page and the extra parameters,
        # only the page number changes
        params = urllib.parse.urlencode(
            {"per_page": pagination.per_page, "page": page, **kwargs}
        )
        return f'<{base_url}?{params}>; rel="{rel}"'

    header = {"X-Total-Count": pagination.total}
    links = []
    # No link back to the first page when already on it
    # (mirrors the condition used for the "last" link below)
    if pagination.page > 1:
        links.append(make_link(1, "first"))
    if pagination.has_prev:
        links.append(make_link(pagination.prev_num, "prev"))
    if pagination.has_next:
        links.append(make_link(pagination.next_num, "next"))
    if pagination.pages > pagination.page:
        links.append(make_link(pagination.pages, "last"))
    if links:
        header["Link"] = ", ".join(links)
    return header
def get_generic_model(model, order_by=None, base_query=None, query=None):
    """Return data from model as json

    The query string of the current request is used as follows: page and
    per_page select the page to return, recursive is forwarded to to_dict
    and all remaining parameters are passed to utils.get_query.

    :param model: model class
    :param order_by: column to order the result by [default: model.name]
                     (not used if base_query or query is passed)
    :param base_query: optional base_query to use [default: model.query]
    :param query: optional query to use (for more complex queries)
    :returns: tuple (data from model as json, 200, pagination header)
    """
    kwargs = request.args.to_dict()
    page = int(kwargs.pop("page", 1))
    per_page = int(kwargs.pop("per_page", 20))
    # Remove recursive from kwargs so that it doesn't get passed
    # to query.filter_by in get_query
    recursive = kwargs.pop("recursive", "false").lower() == "true"
    if query is None:
        if base_query is None:
            # The default ordering only applies to the default base query
            if order_by is None:
                order_by = model.name
            base_query = model.query.order_by(order_by)
        query = utils.get_query(base_query, model, **kwargs)
    pagination = query.paginate(
        page=page,
        per_page=per_page,
        max_per_page=current_app.config["MAX_PER_PAGE"],
    )
    data = [item.to_dict(recursive=recursive) for item in pagination.items]
    # Re-add recursive to kwargs so that it's part of the pagination url
    kwargs["recursive"] = recursive
    header = build_pagination_header(pagination, request.base_url, **kwargs)
    return jsonify(data), 200, header
def search_generic_model(model, filter_sensitive=False):
    """Return filtered data from model as json

    The search expression, page and per_page are read from the query
    string of the current request. per_page is capped to MAX_PER_PAGE.

    :param model: model class
    :param bool filter_sensitive: filter out sensitive data if set to True
    :returns: filtered data from model as json
    """
    kwargs = request.args.to_dict()
    page = int(kwargs.pop("page", 1))
    per_page = int(kwargs.pop("per_page", 20))
    per_page = min(per_page, current_app.config["MAX_PER_PAGE"])
    # Read (and don't pop) the search expression so that it stays in
    # kwargs and is repeated in the pagination links
    # NOTE(review): the "*" fallback assumes model.search treats it as
    # "match everything" - confirm against the search implementation
    search = kwargs.get("search", "*")
    instances, nb_filtered = model.search(
        search, page=page, per_page=per_page, filter_sensitive=filter_sensitive
    )
    current_app.logger.debug(
        f'Found {nb_filtered} {model.__tablename__}(s) when searching "{search}"'
    )
    data = [instance.to_dict(recursive=True) for instance in instances]
    # There is no query to paginate here: the Pagination object is only
    # built from the counters to compute the header
    pagination = Pagination(None, page, per_page, nb_filtered, None)
    header = build_pagination_header(pagination, request.base_url, **kwargs)
    return jsonify(data), 200, header
def get_json_body():
    """Return the json body from the current request

    Raise a CSEntryError if the body is not a json object
    or if the object is empty

    :returns: the decoded json object (dict)
    :raises utils.CSEntryError: if the body is missing, is not a json
                                object or doesn't contain any field
    """
    data = request.get_json()
    # Callers unpack the body as keyword arguments: anything that is not
    # a json object (no body, list, string, number...) is rejected
    if not isinstance(data, dict):
        raise utils.CSEntryError("Body should be a JSON object")
    if not data:
        raise utils.CSEntryError("At least one field is required", status_code=422)
    return data
def create_generic_model(model, mandatory_fields=("name",), **kwargs):
    """Create an instance of the model

    The fields are taken from the json body of the current request
    and completed with the extra fields passed as kwargs.

    :param model: model class
    :param mandatory_fields: list of fields that shall be passed in the body
    :param kwargs: extra fields to use
    :returns: representation of the created instance as json
    :raises utils.CSEntryError: if a mandatory field is missing, if the
                                model can't be created from the fields
                                or if the commit fails
    """
    data = get_json_body()
    # Extra fields given by the caller take precedence over the body
    data.update(kwargs)
    for mandatory_field in mandatory_fields:
        if mandatory_field not in data:
            raise utils.CSEntryError(
                f"Missing mandatory field '{mandatory_field}'", status_code=422
            )
    try:
        instance = model(**data)
    except TypeError as e:
        # Only keep what follows "__init__() got an " so that the message
        # is the same whether or not the class name prefixes __init__
        message = str(e).split("__init__() got an ", 1)[-1]
        raise utils.CSEntryError(message, status_code=422) from e
    except ValueError as e:
        raise utils.CSEntryError(str(e), status_code=422) from e
    db.session.add(instance)
    commit()
    # NOTE(review): current_user is not among the imports visible in this
    # part of the file (presumably flask_login's proxy) - confirm
    current_app.logger.info(
        f"New {model.__tablename__} created by {current_user}: {instance.to_dict()}"
    )
    return jsonify(instance.to_dict()), 201
def delete_generic_model(model, primary_key):
    """Delete the model based on the primary_key

    A 404 is raised by get_or_404 if no instance matches the primary_key.

    :param model: model class
    :param primary_key: primary key of the instance to delete
    :returns: tuple (empty json response, 204)
    :raises utils.CSEntryError: if the commit fails
    """
    instance = model.query.get_or_404(primary_key)
    # Render the instance before deleting it so that the log message
    # doesn't depend on the state of the instance after the commit
    description = f"{instance}"
    db.session.delete(instance)
    # Without a commit the deletion is never persisted
    commit()
    # NOTE(review): current_user is not among the imports visible in this
    # part of the file (presumably flask_login's proxy) - confirm
    current_app.logger.info(
        f"{model.__tablename__} {description} deleted by {current_user}"
    )
    return jsonify(), 204


def update_generic_model(model, primary_key, allowed_fields):
    """Update the model based on the primary_key

    The new values are read from the json body of the current request.
    A body containing a field that is not allowed is rejected before
    the instance is even retrieved.

    :param model: model class
    :param primary_key: primary key of the instance to update
    :param allowed_fields: list of fields that can be updated
                           (the first item of each field is its name)
    :returns: representation of the updated model as json
    """
    body = get_json_body()
    # Keep a list (not a set): the fields are applied in this order
    ordered_names = [field[0] for field in allowed_fields]
    unknown = [name for name in body if name not in ordered_names]
    if unknown:
        # Only the first unknown field is reported
        raise utils.CSEntryError(f"Invalid field '{unknown[0]}'", status_code=422)
    instance = model.query.get_or_404(primary_key)
    values = utils.convert_to_models(body, allowed_fields)
    try:
        # Follow the order of the allowed fields, not the one of the body:
        # setting ip before network is important for Interface
        for name in ordered_names:
            if name not in values:
                continue
            setattr(instance, name, values[name])
    except Exception as e:
        raise utils.CSEntryError(str(e), status_code=422)
    commit()
    current_app.logger.info(
        f"{model.__tablename__} {primary_key} updated by {current_user}: {instance.to_dict()}"
    )
    return jsonify(instance.to_dict()), 200