From 10a7d5761a31c550759a11482d8e1699c80c30c7 Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Wed, 20 Dec 2017 15:55:43 +0100 Subject: [PATCH] Fix tests after refactoring --- app/api/inventory.py | 18 +-- app/api/network.py | 35 +++-- app/api/utils.py | 18 ++- app/users/views.py | 2 +- tests/functional/conftest.py | 1 + tests/functional/factories.py | 20 ++- tests/functional/test_api.py | 266 ++++++++++++++++---------------- tests/functional/test_models.py | 16 +- tests/functional/test_web.py | 17 +- 9 files changed, 208 insertions(+), 185 deletions(-) diff --git a/app/api/inventory.py b/app/api/inventory.py index f9cf9a1..e18d61a 100644 --- a/app/api/inventory.py +++ b/app/api/inventory.py @@ -13,7 +13,7 @@ from flask import Blueprint, jsonify, request, current_app from flask_jwt_extended import jwt_required from .. import utils, models from ..decorators import jwt_groups_accepted -from .utils import commit, create_generic_model, get_generic_model +from .utils import commit, create_generic_model, get_qrcode_model, get_generic_model bp = Blueprint('inventory_api', __name__) @@ -36,10 +36,8 @@ def get_item_by_id_or_ics_id(id_): @jwt_required def get_items(): # TODO: add pagination - query = utils.get_query(models.Item.query, request.args) - items = query.order_by(models.Item._created) - data = [item.to_dict() for item in items] - return jsonify(data) + return get_generic_model(models.Item, request.args, + order_by=models.Item.created_at) @bp.route('/items/<id_>') @@ -118,13 +116,13 @@ def patch_item(id_): @bp.route('/actions') @jwt_required def get_actions(): - return get_generic_model(models.Action, request.args) + return get_qrcode_model(models.Action, request.args) @bp.route('/manufacturers') @jwt_required def get_manufacturers(): - return get_generic_model(models.Manufacturer, request.args) + return get_qrcode_model(models.Manufacturer, request.args) @bp.route('/manufacturers', methods=['POST']) @@ -137,7 +135,7 @@ def create_manufacturer(): @bp.route('/models') @jwt_required def get_models(): - return get_generic_model(models.Model, request.args) + return get_qrcode_model(models.Model, request.args) @bp.route('/models', methods=['POST']) @@ -150,7 +148,7 @@ def create_model(): @bp.route('/locations') @jwt_required def get_locations(): - return get_generic_model(models.Location, request.args) + return get_qrcode_model(models.Location, request.args) @bp.route('/locations', methods=['POST']) @@ -163,7 +161,7 @@ def create_locations(): @bp.route('/status') @jwt_required def get_status(): - return get_generic_model(models.Status, request.args) + return get_qrcode_model(models.Status, request.args) @bp.route('/status', methods=['POST']) diff --git a/app/api/network.py b/app/api/network.py index 835ce90..a07fbd8 100644 --- a/app/api/network.py +++ b/app/api/network.py @@ -9,23 +9,38 @@ This module implements the network API. :license: BSD 2-Clause, see LICENSE for more details. """ -from flask import Blueprint, jsonify, request +from flask import Blueprint, request from flask_jwt_extended import jwt_required -from .. import utils, models +from .. import models from ..decorators import jwt_groups_accepted -from .utils import create_generic_model +from .utils import get_generic_model, create_generic_model bp = Blueprint('network_api', __name__) +@bp.route('/scopes') +@jwt_required +def get_scopes(): + # TODO: add pagination + return get_generic_model(models.NetworkScope, request.args, + order_by=models.NetworkScope.name) + + +@bp.route('/scopes', methods=['POST']) +@jwt_required +@jwt_groups_accepted('admin') +def create_scope(): + """Create a new network scope""" + return create_generic_model(models.NetworkScope, mandatory_fields=( + 'name', 'first_vlan', 'last_vlan', 'supernet')) + + @bp.route('/networks') @jwt_required def get_networks(): # TODO: add pagination - query = utils.get_query(models.Network.query, request.args) - networks = query.order_by(models.Network.address) - data = [network.to_dict() for network in networks] - return jsonify(data) + return get_generic_model(models.Network, request.args, + order_by=models.Network.address) @bp.route('/networks', methods=['POST']) @@ -41,10 +56,8 @@ def create_network(): @jwt_required def get_interfaces(): # TODO: add pagination - query = utils.get_query(models.Interface.query, request.args) - interfaces = query.order_by(models.Interface.ip) - data = [interface.to_dict() for interface in interfaces] - return jsonify(data) + return get_generic_model(models.Interface, request.args, + order_by=models.Interface.ip) @bp.route('/interfaces', methods=['POST']) diff --git a/app/api/utils.py b/app/api/utils.py index ffa9da2..be89909 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -23,7 +23,23 @@ def commit(): raise utils.CSEntryError(str(e), status_code=422) -def get_generic_model(model, args): +def get_generic_model(model, args, order_by=None): + """Return data from model as json + + :param model: model class + :param MultiDict args: args from the request + :param order_by: column to order the result by + :returns: data from model as json + """ + query = utils.get_query(model.query, request.args) + if order_by is None: + order_by = getattr(model, 'created_at') + instances = query.order_by(order_by) + data = [instance.to_dict() for instance in instances] + return jsonify(data) + + +def get_qrcode_model(model, args): """Return data from model as json :param model: model class diff --git a/app/users/views.py b/app/users/views.py index 1ea5679..66cadcd 100644 --- a/app/users/views.py +++ b/app/users/views.py @@ -24,7 +24,7 @@ def login(): form = LDAPLoginForm(request.form) if form.validate_on_submit(): login_user(form.user, remember=form.remember_me.data) - return redirect(request.args.get('next') or url_for('items.index')) + return redirect(request.args.get('next') or url_for('main.index')) return render_template('users/login.html', form=form) diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index bd589a0..c20fe75 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -25,6 +25,7 @@ register(factories.StatusFactory) register(factories.ItemFactory) register(factories.NetworkScopeFactory) register(factories.NetworkFactory) +register(factories.InterfaceFactory) register(factories.HostFactory) diff --git a/tests/functional/factories.py b/tests/functional/factories.py index 006cc31..318d37e 100644 --- a/tests/functional/factories.py +++ b/tests/functional/factories.py @@ -87,7 +87,7 @@ class NetworkScopeFactory(factory.alchemy.SQLAlchemyModelFactory): name = factory.Sequence(lambda n: f'scope{n}') first_vlan = factory.Sequence(lambda n: 1600 + 10 * n) last_vlan = factory.Sequence(lambda n: 1609 + 10 * n) - subnet = factory.Faker('ipv4', network=True) + supernet = factory.Faker('ipv4', network=True) class NetworkFactory(factory.alchemy.SQLAlchemyModelFactory): @@ -97,8 +97,8 @@ class NetworkFactory(factory.alchemy.SQLAlchemyModelFactory): sqlalchemy_session_persistence = 'commit' vlan_name = factory.Sequence(lambda n: f'vlan{n}') - address = factory.Faker('ipv4', network=True) vlan_id = factory.Sequence(lambda n: 1600 + n) + address = factory.Faker('ipv4', network=True) scope = factory.SubFactory(NetworkScopeFactory) @factory.lazy_attribute @@ -113,11 +113,16 @@ class NetworkFactory(factory.alchemy.SQLAlchemyModelFactory): hosts = list(net.hosts()) return str(hosts[-5]) - @factory.lazy_attribute - def gateway(self): - net = ipaddress.ip_network(self.address) - hosts = list(net.hosts()) - return str(hosts[-1]) + +class InterfaceFactory(factory.alchemy.SQLAlchemyModelFactory): + class Meta: + model = models.Interface + sqlalchemy_session = common.Session + sqlalchemy_session_persistence = 'commit' + + name = factory.Sequence(lambda n: f'host{n}') + network = factory.SubFactory(NetworkFactory) + ip = factory.LazyAttributeSequence(lambda o, n: str(ipaddress.ip_address(o.network.first_ip) + n)) class HostFactory(factory.alchemy.SQLAlchemyModelFactory): @@ -127,4 +132,3 @@ class HostFactory(factory.alchemy.SQLAlchemyModelFactory): sqlalchemy_session_persistence = 'commit' name = factory.Sequence(lambda n: f'host{n}') - network = factory.SubFactory(NetworkFactory) diff --git a/tests/functional/test_api.py b/tests/functional/test_api.py index 75111e6..29a2ec5 100644 --- a/tests/functional/test_api.py +++ b/tests/functional/test_api.py @@ -14,19 +14,22 @@ import pytest from app import models +API_URL = '/api/v1' ENDPOINT_MODEL = { - 'actions': models.Action, - 'manufacturers': models.Manufacturer, - 'models': models.Model, - 'locations': models.Location, - 'status': models.Status, - 'items': models.Item, - 'networks': models.Network, - 'hosts': models.Host, + 'inventory/actions': models.Action, + 'inventory/manufacturers': models.Manufacturer, + 'inventory/models': models.Model, + 'inventory/locations': models.Location, + 'inventory/status': models.Status, + 'inventory/items': models.Item, + 'network/networks': models.Network, + 'network/interfaces': models.Interface, } -GENERIC_GET_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'networks', 'hosts')] -GENERIC_CREATE_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'actions', 'networks', 'hosts')] -CREATE_AUTH_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key != 'actions'] +GENERIC_GET_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() + if key not in ('inventory/items', 'network/networks', 'network/interfaces')] +GENERIC_CREATE_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() + if key not in ('inventory/items', 'inventory/actions', 'network/networks', 'network/interfaces')] +CREATE_AUTH_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key != 'inventory/actions'] def get(client, url, token=None): @@ -65,7 +68,7 @@ def login(client, username, password): 'username': username, 'password': password } - return post(client, '/api/login', data) + return post(client, f'{API_URL}/users/login', data) def get_token(client, username, password): @@ -116,9 +119,9 @@ def check_input_is_subset_of_response(response, inputs): def test_login(client): - response = client.post('/api/login') + response = client.post(f'{API_URL}/users/login') check_response_message(response, 'Body should be a JSON object') - response = post(client, '/api/login', data={'username': 'foo', 'passwd': ''}) + response = post(client, f'{API_URL}/users/login', data={'username': 'foo', 'passwd': ''}) check_response_message(response, 'Missing mandatory field (username or password)', 422) response = login(client, 'foo', 'invalid') check_response_message(response, 'Invalid credentials', 401) @@ -134,17 +137,17 @@ def test_get_generic_model(endpoint, session, client, readonly_token): for name in names: session.add(model(name=name)) session.commit() - response = client.get(f'/api/{endpoint}') + response = client.get(f'{API_URL}/{endpoint}') check_response_message(response, 'Missing Authorization Header', 401) - response = get(client, f'/api/{endpoint}', 'xxxxxxxxx') + response = get(client, f'{API_URL}/{endpoint}', 'xxxxxxxxx') check_response_message(response, 'Not enough segments', 422) - response = get(client, f'/api/{endpoint}', readonly_token) + response = get(client, f'{API_URL}/{endpoint}', readonly_token) check_names(response, names) - response = get(client, f'/api/{endpoint}?qrcode=true', readonly_token) + response = get(client, f'{API_URL}/{endpoint}?qrcode=true', readonly_token) check_names(response, names) for item in response.json: assert 'qrcode' in item - response = get(client, f'/api/{endpoint}?qrcode=false', readonly_token) + response = get(client, f'{API_URL}/{endpoint}?qrcode=false', readonly_token) check_names(response, names) for item in response.json: assert 'qrcode' not in item @@ -152,11 +155,11 @@ def test_get_generic_model(endpoint, session, client, readonly_token): @pytest.mark.parametrize('endpoint', CREATE_AUTH_ENDPOINTS) def test_create_model_auth_fail(endpoint, client, readonly_token): - response = client.post(f'/api/{endpoint}') + response = client.post(f'{API_URL}/{endpoint}') check_response_message(response, 'Missing Authorization Header', 401) - response = post(client, f'/api/{endpoint}', data={}, token='xxxxxxxxx') + response = post(client, f'{API_URL}/{endpoint}', data={}, token='xxxxxxxxx') check_response_message(response, 'Not enough segments', 422) - response = post(client, f'/api/{endpoint}', data={}, token=readonly_token) + response = post(client, f'{API_URL}/{endpoint}', data={}, token=readonly_token) check_response_message(response, "User doesn't have the required group", 403) model = ENDPOINT_MODEL[endpoint] assert model.query.count() == 0 @@ -164,76 +167,76 @@ def test_create_model_auth_fail(endpoint, client, readonly_token): @pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS) def test_create_generic_model(endpoint, client, user_token): - response = post(client, f'/api/{endpoint}', data={}, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data={}, token=user_token) check_response_message(response, "Missing mandatory field 'name'", 422) data = {'name': 'Foo'} - response = post(client, f'/api/{endpoint}', data=data, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token) assert response.status_code == 201 assert {'id', 'name'} <= set(response.json.keys()) assert response.json['name'] == 'Foo' - response = post(client, f'/api/{endpoint}', data=data, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) - response = post(client, f'/api/{endpoint}', data={'name': 'foo'}, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data={'name': 'foo'}, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) - response = post(client, f'/api/{endpoint}', data={'name': 'FOO'}, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data={'name': 'FOO'}, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) data = {'name': 'Bar'} - response = post(client, f'/api/{endpoint}', data=data, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token) assert response.status_code == 201 model = ENDPOINT_MODEL[endpoint] assert model.query.count() == 2 - response = get(client, f'/api/{endpoint}', user_token) + response = get(client, f'{API_URL}/{endpoint}', user_token) check_names(response, ('Foo', 'Bar')) @pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS) def test_create_generic_model_invalid_param(endpoint, client, user_token): model = ENDPOINT_MODEL[endpoint] - response = post(client, f'/api/{endpoint}', data={'name': 'foo', 'hello': 'world'}, token=user_token) + response = post(client, f'{API_URL}/{endpoint}', data={'name': 'foo', 'hello': 'world'}, token=user_token) check_response_message(response, f"'hello' is an invalid keyword argument for {model.__name__}", 422) def test_create_item(client, user_token): # check that serial_number is mandatory - response = post(client, '/api/items', data={}, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data={}, token=user_token) check_response_message(response, "Missing mandatory field 'serial_number'", 422) # check create with only serial_number data = {'serial_number': '123456'} - response = post(client, '/api/items', data=data, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token) assert response.status_code == 201 assert {'id', 'ics_id', 'serial_number', 'manufacturer', 'model', - 'location', 'status', 'updated', 'created', 'parent'} == set(response.json.keys()) + 'location', 'status', 'updated_at', 'created_at', 'parent'} == set(response.json.keys()) assert response.json['serial_number'] == '123456' # Check that serial_number doesn't have to be unique - response = post(client, '/api/items', data=data, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token) assert response.status_code == 201 # check that ics_id shall be unique data2 = {'serial_number': '456789', 'ics_id': 'AAA001'} - response = post(client, '/api/items', data=data2, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data=data2, token=user_token) assert response.status_code == 201 - response = post(client, '/api/items', data=data2, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data=data2, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) # check all items that were created assert models.Item.query.count() == 3 - response = get(client, '/api/items', user_token) + response = get(client, f'{API_URL}/inventory/items', user_token) check_input_is_subset_of_response(response, (data, data, data2)) def test_create_item_invalid_ics_id(client, user_token): for ics_id in ('foo', 'AAB1234', 'AZ02', 'WS007', 'AAA01'): data = {'serial_number': '123456', 'ics_id': ics_id} - response = post(client, '/api/items', data=data, token=user_token) + response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token) check_response_message(response, 'ICS id shall match [A-Z]{3}[0-9]{3}', 422) def test_get_item_fail(client, session, readonly_token): - response = get(client, '/api/items/50', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items/50', token=readonly_token) check_response_message(response, "Item id '50' not found", 404) - response = get(client, '/api/items/bar', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items/bar', token=readonly_token) check_response_message(response, "Item id 'bar' not found", 404) @@ -244,12 +247,12 @@ def test_get_item(client, status_factory, item_factory, readonly_token): item2 = item_factory(serial_number='234567', ics_id='AAA001', status='Stock') # we can get items by id... - response = get(client, f'/api/items/{item1.id}', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items/{item1.id}', token=readonly_token) assert response.status_code == 200 assert response.json['id'] == item1.id assert response.json['serial_number'] == item1.serial_number # ...or ics_id - response = get(client, f'/api/items/{item2.ics_id}', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items/{item2.ics_id}', token=readonly_token) assert response.status_code == 200 assert response.json['id'] == item2.id assert response.json['ics_id'] == item2.ics_id @@ -258,56 +261,51 @@ def test_get_item(client, status_factory, item_factory, readonly_token): def test_patch_item_auth_fail(client, session, readonly_token): - response = client.patch('/api/items/50') + response = client.patch(f'{API_URL}/inventory/items/50') check_response_message(response, 'Missing Authorization Header', 401) - response = patch(client, '/api/items/50', data={}, token='xxxxxxxxx') + response = patch(client, f'{API_URL}/inventory/items/50', data={}, token='xxxxxxxxx') check_response_message(response, 'Not enough segments', 422) - response = patch(client, '/api/items/50', data={}, token=readonly_token) + response = patch(client, f'{API_URL}/inventory/items/50', data={}, token=readonly_token) check_response_message(response, "User doesn't have the required group", 403) def test_patch_item_fail(client, item_factory, user_token): - response = patch(client, '/api/items/50', data={}, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/50', data={}, token=user_token) check_response_message(response, 'At least one field is required', 422) data = {'location': 'ESS', 'foo': 'bar'} - response = patch(client, '/api/items/50', data=data, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/50', data=data, token=user_token) check_response_message(response, "Invalid field 'foo'", 422) data = {'location': 'ESS'} - response = patch(client, '/api/items/50', data=data, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/50', data=data, token=user_token) check_response_message(response, "Item id '50' not found", 404) - response = patch(client, '/api/items/bar', data=data, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/bar', data=data, token=user_token) check_response_message(response, "Item id 'bar' not found", 404) # Create an item item1 = item_factory(serial_number='234567', ics_id='AAA001') # check that we can't change the serial_number or ics_id - response = patch(client, f'/api/items/{item1.id}', data={'serial_number': '12345'}, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data={'serial_number': '12345'}, token=user_token) check_response_message(response, "Invalid field 'serial_number'", 422) - response = patch(client, f'/api/items/{item1.id}', data={'ics_id': 'AAA002'}, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data={'ics_id': 'AAA002'}, token=user_token) check_response_message(response, "'ics_id' can't be changed", 422) -def test_patch_item(client, session, user_token): +def test_patch_item(client, status_factory, item_factory, user_token): # Create some items - session.add(models.Status(name='Stock')) - session.add(models.Status(name='In service')) - session.flush() - item1 = models.Item(serial_number='123456') - item2 = models.Item(serial_number='234567', ics_id='AAA001', status='Stock') - for item in (item1, item2): - session.add(item) - session.commit() + status = status_factory(name='In service') + item1 = item_factory(ics_id='ZZZ001') + item2 = item_factory() # we can patch items by id... - data = {'ics_id': 'AAA004'} - response = patch(client, f'/api/items/{item1.id}', data=data, token=user_token) + data = {'ics_id': 'AAB004'} + response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data=data, token=user_token) assert response.status_code == 200 assert response.json['id'] == item1.id assert response.json['serial_number'] == item1.serial_number assert response.json['ics_id'] == data['ics_id'] # ...or ics_id - data = {'status': 'In service'} - response = patch(client, f'/api/items/{item2.ics_id}', data=data, token=user_token) + data = {'status': status.name} + response = patch(client, f'{API_URL}/inventory/items/{item2.ics_id}', data=data, token=user_token) assert response.status_code == 200 assert response.json['id'] == item2.id assert response.json['ics_id'] == item2.ics_id @@ -315,15 +313,12 @@ def test_patch_item(client, session, user_token): assert response.json['status'] == data['status'] -def test_patch_item_integrity_error(client, session, user_token): +def test_patch_item_integrity_error(client, user_token, item_factory): # Create some items - item1 = models.Item(serial_number='123456', ics_id='AAA001') - item2 = models.Item(serial_number='234567') - for item in (item1, item2): - session.add(item) - session.commit() - data = {'ics_id': 'AAA001'} - response = patch(client, f'/api/items/{item2.id}', data=data, token=user_token) + item1 = item_factory() + item2 = item_factory(ics_id='ZZZ001') + data = {'ics_id': item1.ics_id} + response = patch(client, f'{API_URL}/inventory/items/{item2.id}', data=data, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) @@ -348,7 +343,7 @@ def test_patch_item_parent(client, session, user_token): # set parent changes the status and location data1 = {'parent': item1.ics_id} - response = patch(client, f'/api/items/{item2.ics_id}', data=data1, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item2.ics_id}', data=data1, token=user_token) assert response.status_code == 200 assert response.json['id'] == item2.id assert response.json['ics_id'] == item2.ics_id @@ -360,7 +355,7 @@ def test_patch_item_parent(client, session, user_token): # updating a parent, modifies the status and location of all children # check location data2 = {'location': 'ESS'} - response = patch(client, f'/api/items/{item1.ics_id}', data=data2, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data2, token=user_token) assert response.status_code == 200 assert response.json['id'] == item1.id assert response.json['ics_id'] == item1.ics_id @@ -368,28 +363,28 @@ def test_patch_item_parent(client, session, user_token): assert response.json['status'] == str(item1.status) assert response.json['location'] == data2['location'] for ics_id in ('AAA002', 'AAA003'): - response = get(client, f'/api/items/{ics_id}', token=user_token) + response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token) assert response.json['location'] == data2['location'] assert response.json['status'] == 'In service' # check status data3 = {'status': 'Stock'} - response = patch(client, f'/api/items/{item1.ics_id}', data=data3, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data3, token=user_token) assert response.status_code == 200 assert response.json['status'] == data3['status'] for ics_id in ('AAA002', 'AAA003'): - response = get(client, f'/api/items/{ics_id}', token=user_token) + response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token) assert response.json['location'] == data2['location'] assert response.json['status'] == data3['status'] # manufacturer has no impact on children data4 = {'manufacturer': 'HP'} - response = patch(client, f'/api/items/{item1.ics_id}', data=data4, token=user_token) + response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data4, token=user_token) assert response.status_code == 200 assert response.json['manufacturer'] == 'HP' # Manufacturer didn't change on children - response = get(client, f'/api/items/{item2.ics_id}', token=user_token) + response = get(client, f'{API_URL}/inventory/items/{item2.ics_id}', token=user_token) assert response.json['manufacturer'] is None - response = get(client, f'/api/items/{item3.ics_id}', token=user_token) + response = get(client, f'{API_URL}/inventory/items/{item3.ics_id}', token=user_token) assert response.json['manufacturer'] == 'Dell' @@ -409,25 +404,25 @@ def test_get_items(client, session, readonly_token): session.add(item) session.commit() - response = get(client, '/api/items', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 3 check_input_is_subset_of_response(response, (item1.to_dict(), item2.to_dict(), item3.to_dict())) # test filtering - response = get(client, '/api/items?serial_number=234567', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items?serial_number=234567', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 1 check_input_is_subset_of_response(response, (item2.to_dict(),)) # filtering on location_id works but not location (might want to change that) - response = get(client, f'/api/items?location_id={item1.location_id}', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items?location_id={item1.location_id}', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 1 check_input_is_subset_of_response(response, (item1.to_dict(),)) - response = get(client, '/api/items?location=ESS', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items?location=ESS', token=readonly_token) check_response_message(response, 'Invalid query arguments', 422) # using an unknown key raises a 422 - response = get(client, '/api/items?foo=bar', token=readonly_token) + response = get(client, f'{API_URL}/inventory/items?foo=bar', token=readonly_token) check_response_message(response, 'Invalid query arguments', 422) @@ -437,13 +432,13 @@ def test_get_networks(client, network_factory, readonly_token): network2 = network_factory(address='172.16.20.0/22', first_ip='172.16.20.11', last_ip='172.16.20.250') network3 = network_factory(address='172.16.5.0/24', first_ip='172.16.5.10', last_ip='172.16.5.254') - response = get(client, '/api/networks', token=readonly_token) + response = get(client, f'{API_URL}/network/networks', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 3 check_input_is_subset_of_response(response, (network1.to_dict(), network2.to_dict(), network3.to_dict())) # test filtering by address - response = get(client, '/api/networks?address=172.16.20.0/22', token=readonly_token) + response = get(client, f'{API_URL}/network/networks?address=172.16.20.0/22', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 1 check_input_is_subset_of_response(response, (network2.to_dict(),)) @@ -451,24 +446,24 @@ def test_get_networks(client, network_factory, readonly_token): def test_create_network_auth_fail(client, session, user_token): # admin is required to create networks - response = post(client, '/api/networks', data={}, token=user_token) + response = post(client, f'{API_URL}/network/networks', data={}, token=user_token) check_response_message(response, "User doesn't have the required group", 403) def test_create_network(client, admin_token, network_scope_factory): - scope = network_scope_factory(subnet='172.16.0.0/16') + scope = network_scope_factory(supernet='172.16.0.0/16') # check that vlan_name, vlan_id, address, first_ip, last_ip and scope are mandatory - response = post(client, '/api/networks', data={}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) - response = post(client, '/api/networks', data={'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250'}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250'}, token=admin_token) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) - response = post(client, '/api/networks', data={'address': '172.16.1.0/24'}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={'address': '172.16.1.0/24'}, token=admin_token) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) - response = post(client, '/api/networks', data={'vlan_name': 'network1'}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={'vlan_name': 'network1'}, token=admin_token) check_response_message(response, "Missing mandatory field 'vlan_id'", 422) - response = post(client, '/api/networks', data={'vlan_name': 'network1', 'vlan_id': 1600}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={'vlan_name': 'network1', 'vlan_id': 1600}, token=admin_token) check_response_message(response, "Missing mandatory field 'address'", 422) - response = post(client, '/api/networks', data={'vlan_name': 'network1', 'vlan_id': 1600, 'address': '172.16.1.0/24', 'first_ip': '172.16.1.10'}, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data={'vlan_name': 'network1', 'vlan_id': 1600, 'address': '172.16.1.0/24', 'first_ip': '172.16.1.10'}, token=admin_token) check_response_message(response, "Missing mandatory field 'last_ip'", 422) data = {'vlan_name': 'network1', @@ -477,10 +472,10 @@ def test_create_network(client, admin_token, network_scope_factory): 'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250', 'scope': scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) assert response.status_code == 201 assert {'id', 'vlan_name', 'vlan_id', 'address', 'first_ip', - 'last_ip', 'gateway', 'description', 'is_admin', 'scope'} == set(response.json.keys()) + 'last_ip', 'description', 'admin_only', 'scope'} == set(response.json.keys()) assert response.json['vlan_name'] == 'network1' assert response.json['vlan_id'] == 1600 assert response.json['address'] == '172.16.1.0/24' @@ -488,11 +483,11 @@ def test_create_network(client, admin_token, network_scope_factory): assert response.json['last_ip'] == '172.16.1.250' # Check that address and name shall be unique - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) data_same_address = data.copy() data_same_address['vlan_name'] = 'networkX' - response = post(client, '/api/networks', data=data_same_address, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data_same_address, token=admin_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) data_same_name = {'vlan_name': 'network1', 'vlan_id': '1600', @@ -500,7 +495,7 @@ def test_create_network(client, admin_token, network_scope_factory): 'first_ip': '172.16.2.10', 'last_ip': '172.16.2.250', 'scope': scope.name} - response = post(client, '/api/networks', data=data_same_name, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data_same_name, token=admin_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) # Check that all parameters can be passed @@ -509,10 +504,9 @@ def test_create_network(client, admin_token, network_scope_factory): 'address': '172.16.5.0/24', 'first_ip': '172.16.5.11', 'last_ip': '172.16.5.250', - 'gateway': '172.16.5.10', 'description': 'long description', 'scope': scope.name} - response = post(client, '/api/networks', data=data2, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data2, token=admin_token) assert response.status_code == 201 assert response.json['description'] == 'long description' @@ -528,14 +522,14 @@ def test_create_network_invalid_address(client, session, admin_token, network_sc 'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250', 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, "'foo' does not appear to be an IPv4 or IPv6 network", 422) data['address'] = '172.16.1' - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, "'172.16.1' does not appear to be an IPv4 or IPv6 network", 422) # address has host bits set data['address'] = '172.16.1.1/24' - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, '172.16.1.1/24 has host bits set', 422) @@ -548,7 +542,7 @@ def test_create_network_invalid_ip(address, client, session, admin_token, networ 'first_ip': address, 'last_ip': '192.168.0.250', 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422) # invalid last IP address data = {'vlan_name': 'network1', @@ -557,7 +551,7 @@ def test_create_network_invalid_ip(address, client, session, admin_token, networ 'first_ip': '192.168.0.250', 'last_ip': address, 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422) @@ -569,7 +563,7 @@ def test_create_network_invalid_range(client, session, admin_token, network_scop 'first_ip': '172.16.2.10', 'last_ip': '172.16.1.250', 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, 'IP address 172.16.2.10 is not in network 172.16.1.0/24', 422) # last_ip not in network address data = {'vlan_name': 'network1', @@ -578,7 +572,7 @@ def test_create_network_invalid_range(client, session, admin_token, network_scop 'first_ip': '172.16.1.10', 'last_ip': '172.16.5.250', 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, 'IP address 172.16.5.250 is not in network 172.16.1.0/24', 422) # first_ip > last_ip data = {'vlan_name': 'network1', @@ -587,83 +581,81 @@ def test_create_network_invalid_range(client, session, admin_token, network_scop 'first_ip': '172.16.1.10', 'last_ip': '172.16.1.9', 'scope': network_scope.name} - response = post(client, '/api/networks', data=data, token=admin_token) + response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token) check_response_message(response, 'Last IP address 172.16.1.9 is less than the first address 172.16.1.10', 422) -def test_get_hosts(client, network_factory, host_factory, readonly_token): - # Create some hosts +def test_get_interfaces(client, network_factory, interface_factory, readonly_token): + # Create some interfaces network1 = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250') network2 = network_factory(address='192.168.2.0/24', first_ip='192.168.2.10', last_ip='192.168.2.250') - host1 = host_factory(network=network1, ip='192.168.1.10') - host2 = host_factory(network=network1, ip='192.168.1.11', name='hostname2') - host3 = host_factory(network=network2, ip='192.168.2.10') + interface1 = interface_factory(network=network1, ip='192.168.1.10') + interface2 = interface_factory(network=network1, ip='192.168.1.11', name='interface2') + interface3 = interface_factory(network=network2, ip='192.168.2.10') - response = get(client, '/api/hosts', token=readonly_token) + response = get(client, f'{API_URL}/network/interfaces', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 3 - check_input_is_subset_of_response(response, (host1.to_dict(), host2.to_dict(), host3.to_dict())) + check_input_is_subset_of_response(response, (interface1.to_dict(), interface2.to_dict(), interface3.to_dict())) # test filtering by network_id - response = get(client, f'/api/hosts?network_id={network2.id}', token=readonly_token) + response = get(client, f'{API_URL}/network/interfaces?network_id={network2.id}', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 1 - check_input_is_subset_of_response(response, (host3.to_dict(),)) + check_input_is_subset_of_response(response, (interface3.to_dict(),)) -def test_create_host(client, network_factory, user_token): +def test_create_interface(client, network_factory, user_token): network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250') # check that network_id and ip are mandatory - response = post(client, '/api/hosts', data={}, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data={}, token=user_token) check_response_message(response, "Missing mandatory field 'network'", 422) - response = post(client, '/api/hosts', data={'ip': '192.168.1.20'}, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data={'ip': '192.168.1.20'}, token=user_token) check_response_message(response, "Missing mandatory field 'network'", 422) - response = post(client, '/api/hosts', data={'network': network.address}, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data={'network': network.address}, token=user_token) check_response_message(response, "Missing mandatory field 'ip'", 422) data = {'network': network.address, 'ip': '192.168.1.20', - 'name': 'hostname1'} - response = post(client, '/api/hosts', data=data, token=user_token) + 'name': 'interface1'} + response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token) assert response.status_code == 201 - assert {'id', 'network_id', 'ip', 'name', 'description'} == set(response.json.keys()) + assert {'id', 'network_id', 'ip', 'name'} == set(response.json.keys()) assert response.json['network_id'] == network.id assert response.json['ip'] == '192.168.1.20' - assert response.json['name'] == 'hostname1' + assert response.json['name'] == 'interface1' # Check that IP and name shall be unique - response = post(client, '/api/hosts', data=data, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422) # Check that all parameters can be passed data2 = {'network': network.address, 'ip': '192.168.1.21', - 'name': 'myhostname', - 'description': 'my description'} - response = post(client, '/api/hosts', data=data2, token=user_token) + 'name': 'myhostname'} + response = post(client, f'{API_URL}/network/interfaces', data=data2, token=user_token) assert response.status_code == 201 - assert response.json['description'] == 'my description' # check all items that were created - assert models.Host.query.count() == 2 + assert models.Interface.query.count() == 2 @pytest.mark.parametrize('ip', ('', 'foo', '192.168')) -def test_create_host_invalid_ip(ip, client, network_factory, user_token): +def test_create_interface_invalid_ip(ip, client, network_factory, user_token): network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250') # invalid IP address data = {'network': network.address, 'ip': ip, 'name': 'hostname'} - response = post(client, '/api/hosts', data=data, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token) check_response_message(response, f"'{ip}' does not appear to be an IPv4 or IPv6 address", 422) -def test_create_host_ip_not_in_network(client, network_factory, user_token): +def test_create_interface_ip_not_in_network(client, network_factory, user_token): network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250') # IP address not in range data = {'network': network.address, 'ip': '192.168.2.4', 'name': 'hostname'} - response = post(client, '/api/hosts', data=data, token=user_token) + response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token) check_response_message(response, 'IP address 192.168.2.4 is not in network 192.168.1.0/24', 422) diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index 1009f7b..83a2fc8 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -34,13 +34,13 @@ def test_network_ip_properties(network_factory): assert network2.used_ips() == [] -def test_network_available_and_used_ips(network_factory, host_factory): - # Create some networks and hosts +def test_network_available_and_used_ips(network_factory, interface_factory): + # Create some networks and interfaces network1 = network_factory(address='172.16.1.0/24', first_ip='172.16.1.10', last_ip='172.16.1.250') network2 = network_factory(address='172.16.20.0/26', first_ip='172.16.20.11', last_ip='172.16.20.14') for i in range(10, 20): - host_factory(network=network1, ip=f'172.16.1.{i}') - host_factory(network=network2, ip='172.16.20.13') + interface_factory(network=network1, ip=f'172.16.1.{i}') + interface_factory(network=network2, ip='172.16.20.13') # Check available and used IPs assert network1.used_ips() == [ipaddress.ip_address(f'172.16.1.{i}') for i in range(10, 20)] assert network1.available_ips() == [ipaddress.ip_address(f'172.16.1.{i}') for i in range(20, 251)] @@ -49,9 +49,9 @@ def test_network_available_and_used_ips(network_factory, host_factory): ipaddress.ip_address('172.16.20.12'), ipaddress.ip_address('172.16.20.14')] - # Add more hosts - host_factory(network=network2, ip='172.16.20.11') - host_factory(network=network2, ip='172.16.20.14') + # Add more interfaces + interface_factory(network=network2, ip='172.16.20.11') + interface_factory(network=network2, ip='172.16.20.14') assert len(network2.used_ips()) == 3 assert network2.used_ips() == [ipaddress.ip_address('172.16.20.11'), ipaddress.ip_address('172.16.20.13'), @@ -59,6 +59,6 @@ def test_network_available_and_used_ips(network_factory, host_factory): assert network2.available_ips() == [ipaddress.ip_address('172.16.20.12')] # Add last available IP - host_factory(network=network2, ip='172.16.20.12') + interface_factory(network=network2, ip='172.16.20.12') assert network2.used_ips() == [ipaddress.ip_address(f'172.16.20.{i}') for i in range(11, 15)] assert list(network2.available_ips()) == [] diff --git a/tests/functional/test_web.py b/tests/functional/test_web.py index 9233a07..b298c41 100644 --- a/tests/functional/test_web.py +++ b/tests/functional/test_web.py @@ -11,7 +11,6 @@ This module defines basic web tests. """ import json import pytest -from app import models def get(client, url): @@ -26,11 +25,11 @@ def login(client, username, password): 'username': username, 'password': password } - return client.post('/login', data=data, follow_redirects=True) + return client.post('/users/login', data=data, follow_redirects=True) def logout(client): - return client.get('/logout', follow_redirects=True) + return client.get('/users/logout', follow_redirects=True) @pytest.fixture @@ -60,26 +59,26 @@ def test_index(logged_client): @pytest.mark.parametrize('url', [ '/', - '/items', - '/qrcodes', - '_retrieve_items', + '/inventory/items', + '/inventory/_retrieve_items', + '/network/networks', ]) def test_protected_url(url, client): response = client.get(url) assert response.status_code == 302 - assert '/login' in response.headers['Location'] + assert '/users/login' in response.headers['Location'] login(client, 'user_ro', 'userro') response = client.get(url) assert response.status_code == 200 def test_retrieve_items(logged_client, item_factory): - response = get(logged_client, '/_retrieve_items') + response = get(logged_client, '/inventory/_retrieve_items') assert response.json['data'] == [] serial_numbers = ('12345', '45678') for sn in serial_numbers: item_factory(serial_number=sn) - response = get(logged_client, '/_retrieve_items') + response = get(logged_client, '/inventory/_retrieve_items') items = response.json['data'] assert set(serial_numbers) == set(item[4] for item in items) assert len(items[0]) == 10 -- GitLab