# -*- coding: utf-8 -*-
"""
tests.functional.test_api
~~~~~~~~~~~~~~~~~~~~~~~~~

This module defines API tests.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import datetime
import json
import pytest
from app import models


API_URL = '/api/v1'
# Maps each REST endpoint (relative to API_URL) to the model class it exposes
ENDPOINT_MODEL = {
    'inventory/actions': models.Action,
    'inventory/manufacturers': models.Manufacturer,
    'inventory/models': models.Model,
    'inventory/locations': models.Location,
    'inventory/statuses': models.Status,
    'inventory/items': models.Item,
    'network/networks': models.Network,
    'network/interfaces': models.Interface,
    'network/hosts': models.Host,
    'network/macs': models.Mac,
    'network/domains': models.Domain,
    'network/cnames': models.Cname,
}
# Inventory endpoints exercised by the generic "get" test (items have their own tests)
GENERIC_GET_ENDPOINTS = [key for key in ENDPOINT_MODEL
                         if key.startswith('inventory') and key != 'inventory/items']
# Inventory endpoints exercised by the generic "create" tests
GENERIC_CREATE_ENDPOINTS = [key for key in ENDPOINT_MODEL
                            if key.startswith('inventory') and key not in ('inventory/items', 'inventory/actions')]
# Endpoints exercised by the create authorization failure test
CREATE_AUTH_ENDPOINTS = [key for key in ENDPOINT_MODEL if key != 'inventory/actions']


def _json_headers(token=None):
    """Return JSON request headers, with a Bearer token only when one is given"""
    headers = {'Content-Type': 'application/json'}
    if token is not None:
        headers['Authorization'] = f'Bearer {token}'
    return headers


def _decode_json(response):
    """Set ``response.json`` to the decoded body when the Content-Type is JSON

    The response is returned unchanged when the Content-Type header is
    missing or is not exactly 'application/json'.
    """
    if response.headers.get('Content-Type') == 'application/json':
        response.json = json.loads(response.data)
    return response


def get(client, url, token=None):
    """Send a GET request and return the response

    :param client: test client used to perform the request
    :param url: URL to request
    :param token: optional access token; the Authorization header is only
                  sent when a token is given (same behaviour as post and patch)
    :returns: the response, with a ``json`` attribute when the body is JSON
    """
    response = client.get(url, headers=_json_headers(token))
    return _decode_json(response)


def post(client, url, data, token=None):
    """Send *data* as a JSON encoded POST request and return the response

    :param client: test client used to perform the request
    :param url: URL to request
    :param data: object serialized with json.dumps as request body
    :param token: optional access token sent as Bearer Authorization header
    :returns: the response, with a ``json`` attribute when the body is JSON
    """
    response = client.post(url, data=json.dumps(data), headers=_json_headers(token))
    return _decode_json(response)


def patch(client, url, data, token=None):
    """Send *data* as a JSON encoded PATCH request and return the response

    :param client: test client used to perform the request
    :param url: URL to request
    :param data: object serialized with json.dumps as request body
    :param token: optional access token sent as Bearer Authorization header
    :returns: the response, with a ``json`` attribute when the body is JSON
    """
    response = client.patch(url, data=json.dumps(data), headers=_json_headers(token))
    return _decode_json(response)


def login(client, username, password):
    """Post the given credentials to the login endpoint and return the response"""
    data = {
        'username': username,
        'password': password
    }
    return post(client, f'{API_URL}/user/login', data)


def get_token(client, username, password):
    """Log in and return the 'access_token' field of the JSON response"""
    response = login(client, username, password)
    return response.json['access_token']


@pytest.fixture()
def readonly_token(client):
    """Access token obtained by logging in as 'user_ro'"""
    return get_token(client, 'user_ro', 'userro')


@pytest.fixture()
def user_token(client):
    """Access token obtained by logging in as 'user_rw'"""
    return get_token(client, 'user_rw', 'userrw')


@pytest.fixture()
def consultant_token(client):
    """Access token obtained by logging in as 'consultant'"""
    return get_token(client, 'consultant', 'consultantpwd')


@pytest.fixture()
def admin_token(client):
    """Access token obtained by logging in as 'admin'"""
    return get_token(client, 'admin', 'adminpasswd')


def check_response_message(response, msg, status_code=400):
    """Assert the response status code and that its message starts with *msg*

    :param response: response to check
    :param msg: expected prefix of the error message
    :param status_code: expected HTTP status code
    """
    assert response.status_code == status_code
    try:
        data = response.json
    except AttributeError:
        # The response has no json attribute: decode the raw body instead
        data = json.loads(response.data)
    try:
        message = data['message']
    except KeyError:
        # flask-jwt-extended is using "msg" instead of "message"
        # in its default callbacks
        message = data['msg']
    assert message.startswith(msg)


def check_names(response, names):
    """Assert that the 'name' values of the response elements are exactly *names*

    The comparison is done on sets: order and duplicates are ignored.
    """
    response_names = {item['name'] for item in response.json}
    assert set(names) == response_names


def check_input_is_subset_of_response(response, inputs):
    """Assert that every key/value of each input is found in the matching response element

    The response elements are sorted by id and paired with *inputs* in order,
    so *inputs* shall be given in ascending id order.
    datetime values are compared using their '%Y-%m-%d %H:%M' representation.
    The pairing is done with zip: extra elements on either side are not checked.
    """
    # Sort the response by id to match the inputs order
    response_elts = sorted(response.json, key=lambda d: d['id'])
    for d1, d2 in zip(inputs, response_elts):
        for key, value in d1.items():
            if isinstance(value, datetime.datetime):
                value = value.strftime('%Y-%m-%d %H:%M')
            assert d2[key] == value


def test_login(client):
    """Check the login error messages and that valid credentials return an access token"""
    response = client.post(f'{API_URL}/user/login')
    check_response_message(response, 'Body should be a JSON object')
    response = post(client, f'{API_URL}/user/login', data={'username': 'foo', 'passwd': ''})
    check_response_message(response, 'Missing mandatory field (username or password)', 422)
    response = login(client, 'foo', 'invalid')
    check_response_message(response, 'Invalid credentials', 401)
    response = login(client, 'user_ro', 'userro')
    assert response.status_code == 200
    assert 'access_token' in response.json


@pytest.mark.parametrize('endpoint', GENERIC_GET_ENDPOINTS)
def test_get_generic_model(endpoint, session, client, readonly_token):
    """Check authentication failures and the list returned by the generic endpoints"""
    model = ENDPOINT_MODEL[endpoint]
    names = ('Foo', 'Bar', 'Alice')
    for name in names:
        session.add(model(name=name))
    session.commit()
    response = client.get(f'{API_URL}/{endpoint}')
    check_response_message(response, 'Missing Authorization Header', 401)
    response = get(client, f'{API_URL}/{endpoint}', 'xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)
    response = get(client, f'{API_URL}/{endpoint}', readonly_token)
    check_names(response, names)
    for item in response.json:
        assert 'qrcode' in item


@pytest.mark.parametrize('endpoint', CREATE_AUTH_ENDPOINTS)
def test_create_model_auth_fail(endpoint, client, readonly_token):
    """Check that creation is refused without a valid token or with the 'user_ro' token"""
    response = client.post(f'{API_URL}/{endpoint}')
    check_response_message(response, 'Missing Authorization Header', 401)
    response = post(client, f'{API_URL}/{endpoint}', data={}, token='xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)
    response = post(client, f'{API_URL}/{endpoint}', data={}, token=readonly_token)
    check_response_message(response, "User doesn't have the required group", 403)
    model = ENDPOINT_MODEL[endpoint]
    assert model.query.count() == 0


@pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model(endpoint, client, user_token):
    """Check creation on the generic endpoints: mandatory name and case insensitive uniqueness"""
    response = post(client, f'{API_URL}/{endpoint}', data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'name'", 422)
    data = {'name': 'Foo'}
    response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token)
    assert response.status_code == 201
    assert {'id', 'name'} <= set(response.json.keys())
    assert response.json['name'] == 'Foo'
    response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
    response = post(client, f'{API_URL}/{endpoint}', data={'name': 'foo'}, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
    response = post(client, f'{API_URL}/{endpoint}', data={'name': 'FOO'}, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
    data = {'name': 'Bar', 'description': 'Bar description'}
    response = post(client, f'{API_URL}/{endpoint}', data=data, token=user_token)
    assert response.status_code == 201
    assert response.json['description'] == 'Bar description'
    model = ENDPOINT_MODEL[endpoint]
    assert model.query.count() == 2
    response = get(client, f'{API_URL}/{endpoint}', user_token)
    check_names(response, ('Foo', 'Bar'))


@pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model_invalid_param(endpoint, client, user_token):
    """Check that an unknown field is rejected by the generic endpoints"""
    model = ENDPOINT_MODEL[endpoint]
    response = post(client, f'{API_URL}/{endpoint}', data={'name': 'foo', 'hello': 'world'}, token=user_token)
    check_response_message(response, f"'hello' is an invalid keyword argument for {model.__name__}", 422)


def test_create_item(client, user_token):
    """Check item creation: mandatory serial_number, returned fields and unique ics_id"""
    # check that serial_number is mandatory
    response = post(client, f'{API_URL}/inventory/items', data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'serial_number'", 422)

    # check create with only serial_number
    data = {'serial_number': '123456'}
    response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token)
    assert response.status_code == 201
    assert {'id', 'ics_id', 'serial_number', 'manufacturer', 'model', 'quantity',
            'location', 'status', 'parent', 'children', 'macs', 'history', 'host',
            'updated_at', 'created_at', 'user', 'comments'} == set(response.json.keys())
    assert response.json['serial_number'] == '123456'

    # Check that serial_number doesn't have to be unique
    response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token)
    assert response.status_code == 201

    # check that ics_id shall be unique
    data2 = {'serial_number': '456789', 'ics_id': 'AAA001'}
    response = post(client, f'{API_URL}/inventory/items', data=data2, token=user_token)
    assert response.status_code == 201
    response = post(client, f'{API_URL}/inventory/items', data=data2, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # check all items that were created
    assert models.Item.query.count() == 3
    response = get(client, f'{API_URL}/inventory/items', user_token)
    check_input_is_subset_of_response(response, (data, data, data2))


def test_create_item_with_host_id(client, host_factory, user_token):
    """Check that an item can be created with a host_id"""
    host = host_factory()
    # Check that we can pass an host_id
    data = {'serial_number': '123456', 'host_id': host.id}
    response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token)
    assert response.status_code == 201
    item = models.Item.query.filter_by(serial_number=data['serial_number']).first()
    assert item.host_id == host.id


def test_create_item_invalid_ics_id(client, user_token):
    """Check that an ics_id not matching the expected format is rejected"""
    for ics_id in ('foo', 'AAB1234', 'AZ02', 'WS007', 'AAA01'):
        data = {'serial_number': '123456', 'ics_id': ics_id}
        response = post(client, f'{API_URL}/inventory/items', data=data, token=user_token)
        check_response_message(response, 'ICS id shall match [A-Z]{3}[0-9]{3}', 422)


def test_get_item_fail(client, session, readonly_token):
    """Check that requesting an unknown item returns a 404"""
    response = get(client, f'{API_URL}/inventory/items/50', token=readonly_token)
    check_response_message(response, "Item id '50' not found", 404)
    response = get(client, f'{API_URL}/inventory/items/bar', token=readonly_token)
    check_response_message(response, "Item id 'bar' not found", 404)


def test_get_item(client, status_factory, item_factory, readonly_token):
    """Check that an item can be retrieved by id or by ics_id"""
    # Create some items
    status_factory(name='Stock')
    item1 = item_factory(serial_number='123456')
    item2 = item_factory(serial_number='234567', ics_id='AAA001', status='Stock')

    # we can get items by id...
    response = get(client, f'{API_URL}/inventory/items/{item1.id}', token=readonly_token)
    assert response.status_code == 200
    assert response.json['id'] == item1.id
    assert response.json['serial_number'] == item1.serial_number
    # ...or ics_id
    response = get(client, f'{API_URL}/inventory/items/{item2.ics_id}', token=readonly_token)
    assert response.status_code == 200
    assert response.json['id'] == item2.id
    assert response.json['ics_id'] == item2.ics_id
    assert response.json['serial_number'] == item2.serial_number
    assert response.json['status'] == str(item2.status)


def test_patch_item_auth_fail(client, session, readonly_token):
    """Check that patching is refused without a valid token or with the 'user_ro' token"""
    response = client.patch(f'{API_URL}/inventory/items/50')
    check_response_message(response, 'Missing Authorization Header', 401)
    response = patch(client, f'{API_URL}/inventory/items/50', data={}, token='xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)
    response = patch(client, f'{API_URL}/inventory/items/50', data={}, token=readonly_token)
    check_response_message(response, "User doesn't have the required group", 403)


def test_patch_item_fail(client, item_factory, user_token):
    """Check the errors returned on invalid patch requests"""
    response = patch(client, f'{API_URL}/inventory/items/50', data={}, token=user_token)
    check_response_message(response, 'At least one field is required', 422)
    data = {'location': 'ESS', 'foo': 'bar'}
    response = patch(client, f'{API_URL}/inventory/items/50', data=data, token=user_token)
    check_response_message(response, "Invalid field 'foo'", 422)
    data = {'location': 'ESS'}
    response = patch(client, f'{API_URL}/inventory/items/50', data=data, token=user_token)
    check_response_message(response, "Item id '50' not found", 404)
    response = patch(client, f'{API_URL}/inventory/items/bar', data=data, token=user_token)
    check_response_message(response, "Item id 'bar' not found", 404)

    # Create an item
    item1 = item_factory(serial_number='234567', ics_id='AAA001')
    # check that we can't change the serial_number or ics_id
    response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data={'serial_number': '12345'}, token=user_token)
    check_response_message(response, "Invalid field 'serial_number'", 422)
    response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data={'ics_id': 'AAA002'}, token=user_token)
    check_response_message(response, "'ics_id' can't be changed", 422)


def test_patch_item(client, status, item_factory, user_token):
    """Check that an item can be patched by id or by ics_id"""
    # Create some items
    item1 = item_factory(ics_id='ZZZ001')
    item2 = item_factory()

    # we can patch items by id...
    data = {'ics_id': 'AAB004'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.id}', data=data, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item1.id
    assert response.json['serial_number'] == item1.serial_number
    assert response.json['ics_id'] == data['ics_id']
    # ...or ics_id
    data = {'status': status.name}
    response = patch(client, f'{API_URL}/inventory/items/{item2.ics_id}', data=data, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item2.id
    assert response.json['ics_id'] == item2.ics_id
    assert response.json['serial_number'] == item2.serial_number
    assert response.json['status'] == data['status']


def test_patch_item_integrity_error(client, user_token, item_factory):
    """Check that patching an item with the ics_id of another item is rejected"""
    # Create some items
    item1 = item_factory()
    item2 = item_factory(ics_id='ZZZ001')

    data = {'ics_id': item1.ics_id}
    response = patch(client, f'{API_URL}/inventory/items/{item2.id}', data=data, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)


def test_patch_item_parent(client, location_factory, manufacturer_factory,
                           status_factory, item_factory, user_token):
    """Check how parent, location, status and manufacturer changes affect the children"""
    # Create some items
    location_factory(name='ESS')
    manufacturer_factory(name='HP')
    status_factory(name='In service')
    status_factory(name='Stock')
    item1 = item_factory(ics_id='AAA001', status='In service')
    item2 = item_factory(ics_id='AAA002')
    item3 = item_factory(ics_id='AAA003')
    item3.parent_id = item1.id

    # set parent changes the status and location
    data1 = {'parent': item1.ics_id}
    response = patch(client, f'{API_URL}/inventory/items/{item2.ics_id}', data=data1, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item2.id
    assert response.json['ics_id'] == item2.ics_id
    assert response.json['serial_number'] == item2.serial_number
    assert response.json['parent'] == item1.ics_id
    assert response.json['status'] == str(item1.status)
    assert response.json['location'] == str(item1.location)

    # updating a parent, modifies the status and location of all children
    # check location
    data2 = {'location': 'ESS'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data2, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item1.id
    assert response.json['ics_id'] == item1.ics_id
    assert response.json['serial_number'] == item1.serial_number
    assert response.json['status'] == str(item1.status)
    assert response.json['location'] == data2['location']
    for ics_id in ('AAA002', 'AAA003'):
        response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token)
        assert response.json['location'] == data2['location']
        assert response.json['status'] == 'In service'
    # check status
    data3 = {'status': 'Stock'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data3, token=user_token)
    assert response.status_code == 200
    assert response.json['status'] == data3['status']
    for ics_id in ('AAA002', 'AAA003'):
        response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token)
        assert response.json['location'] == data2['location']
        assert response.json['status'] == data3['status']

    # manufacturer has no impact on children
    data4 = {'manufacturer': 'HP'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data4, token=user_token)
    assert response.status_code == 200
    assert response.json['manufacturer'] == 'HP'
    # Manufacturer didn't change on children
    response = get(client, f'{API_URL}/inventory/items/{item2.ics_id}', token=user_token)
    assert response.json['manufacturer'] == str(item2.manufacturer)
    assert str(item2.manufacturer) != 'HP'
    response = get(client, f'{API_URL}/inventory/items/{item3.ics_id}', token=user_token)
    assert response.json['manufacturer'] == str(item3.manufacturer)
    assert str(item3.manufacturer) != 'HP'


def test_get_items(client, location_factory, item_factory, readonly_token):
    """Check the list of items and the filtering by query arguments"""
    # Create some items
    location_factory(name='ESS')
    item1 = item_factory(location='ESS')
    item2 = item_factory(serial_number='234567')
    item3 = item_factory()

    response = get(client, f'{API_URL}/inventory/items', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (item1.to_dict(), item2.to_dict(), item3.to_dict()))

    # test filtering
    response = get(client, f'{API_URL}/inventory/items?serial_number=234567', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (item2.to_dict(),))
    # filtering on location_id works but not location (might want to change that)
    response = get(client, f'{API_URL}/inventory/items?location_id={item1.location_id}', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (item1.to_dict(),))
    response = get(client, f'{API_URL}/inventory/items?location=ESS', token=readonly_token)
    check_response_message(response, 'Invalid query arguments', 422)
    # using an unknown key raises a 422
    response = get(client, f'{API_URL}/inventory/items?foo=bar', token=readonly_token)
    check_response_message(response, 'Invalid query arguments', 422)


def test_get_networks(client, network_factory, readonly_token):
    """Check the list of networks and the filtering by address"""
    # Create some networks
    network1 = network_factory(address='172.16.1.0/24', first_ip='172.16.1.1', last_ip='172.16.1.254')
    network2 = network_factory(address='172.16.20.0/22', first_ip='172.16.20.11', last_ip='172.16.20.250')
    network3 = network_factory(address='172.16.5.0/24', first_ip='172.16.5.10', last_ip='172.16.5.254')

    response = get(client, f'{API_URL}/network/networks', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (network1.to_dict(), network2.to_dict(), network3.to_dict()))

    # test filtering by address
    response = get(client, f'{API_URL}/network/networks?address=172.16.20.0/22', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (network2.to_dict(),))


def test_create_network_auth_fail(client, session, user_token):
    """Check that the 'user_rw' token is not allowed to create a network"""
    # admin is required to create networks
    response = post(client, f'{API_URL}/network/networks', data={}, token=user_token)
    check_response_message(response, "User doesn't have the required group", 403)


def test_create_network(client, admin_token, network_scope_factory):
    """Check network creation: mandatory fields, returned fields and uniqueness"""
    scope = network_scope_factory(supernet='172.16.0.0/16')
    # check that vlan_name, vlan_id, address, first_ip, last_ip and scope are mandatory
    response = post(client, f'{API_URL}/network/networks', data={}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
    response = post(client, f'{API_URL}/network/networks',
                    data={'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250'}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
    response = post(client, f'{API_URL}/network/networks', data={'address': '172.16.1.0/24'}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
    response = post(client, f'{API_URL}/network/networks', data={'vlan_name': 'network1'}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'vlan_id'", 422)
    response = post(client, f'{API_URL}/network/networks',
                    data={'vlan_name': 'network1', 'vlan_id': 1600}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'address'", 422)
    response = post(client, f'{API_URL}/network/networks',
                    data={'vlan_name': 'network1',
                          'vlan_id': 1600,
                          'address': '172.16.1.0/24',
                          'first_ip': '172.16.1.10'},
                    token=admin_token)
    check_response_message(response, "Missing mandatory field 'last_ip'", 422)

    data = {'vlan_name': 'network1',
            'vlan_id': 1600,
            'address': '172.16.1.0/24',
            'first_ip': '172.16.1.10',
            'last_ip': '172.16.1.250',
            'scope': scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    assert response.status_code == 201
    assert {'id', 'vlan_name', 'vlan_id', 'address', 'netmask',
            'first_ip', 'last_ip', 'description', 'admin_only',
            'scope', 'domain', 'interfaces', 'created_at',
            'updated_at', 'user'} == set(response.json.keys())
    assert response.json['vlan_name'] == 'network1'
    assert response.json['vlan_id'] == 1600
    assert response.json['address'] == '172.16.1.0/24'
    assert response.json['first_ip'] == '172.16.1.10'
    assert response.json['last_ip'] == '172.16.1.250'
    assert response.json['netmask'] == '255.255.255.0'

    # Check that address and name shall be unique
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
    data_same_address = data.copy()
    data_same_address['vlan_name'] = 'networkX'
    response = post(client, f'{API_URL}/network/networks', data=data_same_address, token=admin_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
    data_same_name = {'vlan_name': 'network1',
                      'vlan_id': '1600',
                      'address': '172.16.2.0/24',
                      'first_ip': '172.16.2.10',
                      'last_ip': '172.16.2.250',
                      'scope': scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data_same_name, token=admin_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # Check that all parameters can be passed
    data2 = {'vlan_name': 'network2',
             'vlan_id': '1601',
             'address': '172.16.5.0/24',
             'first_ip': '172.16.5.11',
             'last_ip': '172.16.5.250',
             'description': 'long description',
             'scope': scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data2, token=admin_token)
    assert response.status_code == 201
    assert response.json['description'] == 'long description'

    # check all items that were created
    assert models.Network.query.count() == 2


def test_create_network_invalid_address(client, admin_token, network_scope):
    """Check that an invalid network address is rejected"""
    # invalid network address
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': 'foo',
            'first_ip': '172.16.1.10',
            'last_ip': '172.16.1.250',
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, "'foo' does not appear to be an IPv4 or IPv6 network", 422)
    data['address'] = '172.16.1'
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, "'172.16.1' does not appear to be an IPv4 or IPv6 network", 422)
    # address has host bits set
    data['address'] = '172.16.1.1/24'
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, '172.16.1.1/24 has host bits set', 422)


@pytest.mark.parametrize('address', ('', 'foo', '192.168'))
def test_create_network_invalid_ip(address, client, session, admin_token, network_scope):
    """Check that an invalid first or last IP address is rejected"""
    # invalid first IP address
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': '192.168.0.0/24',
            'first_ip': address,
            'last_ip': '192.168.0.250',
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422)
    # invalid last IP address
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': '192.168.0.0/24',
            'first_ip': '192.168.0.250',
            'last_ip': address,
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422)


def test_create_network_invalid_range(client, session, admin_token, network_scope):
    """Check that first_ip and last_ip shall be in the network and in the right order"""
    # first_ip not in network address
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': '172.16.1.0/24',
            'first_ip': '172.16.2.10',
            'last_ip': '172.16.1.250',
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, 'IP address 172.16.2.10 is not in network 172.16.1.0/24', 422)
    # last_ip not in network address
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': '172.16.1.0/24',
            'first_ip': '172.16.1.10',
            'last_ip': '172.16.5.250',
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, 'IP address 172.16.5.250 is not in network 172.16.1.0/24', 422)
    # first_ip > last_ip
    data = {'vlan_name': 'network1',
            'vlan_id': '1600',
            'address': '172.16.1.0/24',
            'first_ip': '172.16.1.10',
            'last_ip': '172.16.1.9',
            'scope': network_scope.name}
    response = post(client, f'{API_URL}/network/networks', data=data, token=admin_token)
    check_response_message(response, 'Last IP address 172.16.1.9 is less than the first address 172.16.1.10', 422)


def test_get_interfaces(client, network_factory, interface_factory, readonly_token):
    """Check the list of interfaces and the filtering by network_id"""
    # Create some interfaces
    network1 = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    network2 = network_factory(address='192.168.2.0/24', first_ip='192.168.2.10', last_ip='192.168.2.250')
    interface1 = interface_factory(network=network1, ip='192.168.1.10')
    interface2 = interface_factory(network=network1, ip='192.168.1.11', name='interface2')
    interface3 = interface_factory(network=network2, ip='192.168.2.10')

    response = get(client, f'{API_URL}/network/interfaces', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (interface1.to_dict(), interface2.to_dict(), interface3.to_dict()))

    # test filtering by network_id
    response = get(client, f'{API_URL}/network/interfaces?network_id={network2.id}', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (interface3.to_dict(),))


def test_get_interfaces_by_domain(client, domain_factory, network_factory, interface_factory, readonly_token):
    """Check the filtering of interfaces by domain"""
    # Create some interfaces
    domain1 = domain_factory(name='tn.esss.lu.se')
    domain2 = domain_factory(name='ics.esss.lu.se')
    network1 = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10',
                               last_ip='192.168.1.250', domain=domain1)
    network2 = network_factory(address='192.168.2.0/24', first_ip='192.168.2.10',
                               last_ip='192.168.2.250', domain=domain2)
    interface1 = interface_factory(network=network1, ip='192.168.1.10')
    interface2 = interface_factory(network=network1, ip='192.168.1.11', name='interface2')
    interface3 = interface_factory(network=network2, ip='192.168.2.10')

    # test filtering by domain
    response = get(client, f'{API_URL}/network/interfaces?domain=tn.esss.lu.se', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (interface1.to_dict(), interface2.to_dict()))

    response = get(client, f'{API_URL}/network/interfaces?domain=ics.esss.lu.se', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (interface3.to_dict(),))


def test_create_interface(client, network_factory, user_token):
    """Check interface creation: mandatory fields, returned fields and uniqueness"""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # check that network and ip are mandatory
    response = post(client, f'{API_URL}/network/interfaces', data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'network'", 422)
    response = post(client, f'{API_URL}/network/interfaces', data={'ip': '192.168.1.20'}, token=user_token)
    check_response_message(response, "Missing mandatory field 'network'", 422)
    response = post(client, f'{API_URL}/network/interfaces', data={'network': network.address}, token=user_token)
    check_response_message(response, "Missing mandatory field 'ip'", 422)

    data = {'network': network.vlan_name,
            'ip': '192.168.1.20',
            'name': 'interface1'}
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token)
    assert response.status_code == 201
    assert {'id', 'network', 'ip', 'name', 'mac', 'domain',
            'host', 'device_type', 'cnames', 'tags', 'created_at',
            'updated_at', 'user'} == set(response.json.keys())
    assert response.json['network'] == network.vlan_name
    assert response.json['ip'] == '192.168.1.20'
    assert response.json['name'] == 'interface1'

    # Check that IP and name shall be unique
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # Check that all parameters can be passed
    data2 = {'network': network.vlan_name,
             'ip': '192.168.1.21',
             'name': 'myhostname'}
    response = post(client, f'{API_URL}/network/interfaces', data=data2, token=user_token)
    assert response.status_code == 201

    # check all items that were created
    assert models.Interface.query.count() == 2


@pytest.mark.parametrize('ip', ('', 'foo', '192.168'))
def test_create_interface_invalid_ip(ip, client, network_factory, user_token):
    """Check that an invalid IP address is rejected"""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # invalid IP address
    data = {'network': network.vlan_name,
            'ip': ip,
            'name': 'hostname'}
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token)
    check_response_message(response, f"'{ip}' does not appear to be an IPv4 or IPv6 address", 422)


def test_create_interface_ip_not_in_network(client, network_factory, user_token):
    """Check that an IP address outside of the network is rejected"""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # IP address not in range
    data = {'network': network.vlan_name,
            'ip': '192.168.2.4',
            'name': 'hostname'}
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token)
    check_response_message(response, 'IP address 192.168.2.4 is not in network 192.168.1.0/24', 422)


def test_create_interface_ip_not_in_range(client, network_factory, user_token):
    """Check that an IP address outside of the first_ip - last_ip range is rejected"""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # IP address not in range
    data = {'network': network.vlan_name,
            'ip': '192.168.1.4',
            'name': 'hostname'}
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=user_token)
    check_response_message(response, 'IP address 192.168.1.4 is not in range 192.168.1.10 - 192.168.1.250', 422)


def test_create_interface_ip_not_in_range_as_admin(client, network_factory, admin_token):
    """Check that the admin token can use an IP address outside of the first_ip - last_ip range"""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # IP address not in range
    data = {'network': network.vlan_name,
            'ip': '192.168.1.4',
            'name': 'hostname'}
    response = post(client, f'{API_URL}/network/interfaces', data=data, token=admin_token)
    assert response.status_code == 201


def test_get_macs(client, mac_factory, readonly_token):
    """Check the list of MAC addresses"""
    # Create some macs
    mac1 = mac_factory()
    mac2 = mac_factory()

    response = get(client, f'{API_URL}/network/macs', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (mac1.to_dict(), mac2.to_dict()))


def test_create_mac(client, item_factory, user_token):
    """Check MAC creation: mandatory address, returned fields and uniqueness"""
    item = item_factory()
    # check that address is mandatory
    response = post(client, f'{API_URL}/network/macs', data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'address'", 422)

    data = {'address': 'b5:4b:7d:a4:23:43'}
    response = post(client, f'{API_URL}/network/macs', data=data, token=user_token)
    assert response.status_code == 201
    assert {'id', 'address', 'item', 'interfaces'} == set(response.json.keys())
    assert response.json['address'] == data['address']

    # Check that address shall be unique
    response = post(client, f'{API_URL}/network/macs', data=data, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # Check that all parameters can be passed
    data2 = {'address': 'b5:4b:7d:a4:23:44',
             'item_id': item.id}
    response = post(client, f'{API_URL}/network/macs', data=data2, token=user_token)
    assert response.status_code == 201

    # check that all items were created
    assert models.Mac.query.count() == 2


@pytest.mark.parametrize('address', ('', 'foo', 'b5:4b:7d:a4:23'))
def test_create_mac_invalid_address(address, client, user_token):
    """Check that an invalid MAC address is rejected"""
    data = {'address': address}
    response = post(client, f'{API_URL}/network/macs', data=data, token=user_token)
    check_response_message(response, f"'{address}' does not appear to be a MAC address", 422)


def test_get_hosts(client, host_factory, readonly_token):
    """Check the list of hosts"""
    # Create some hosts
    host1 = host_factory()
    host2 = host_factory()
    response = get(client, f'{API_URL}/network/hosts', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (host1.to_dict(), host2.to_dict()))


def test_create_host(client, device_type_factory, user_token):
    """Check host creation: mandatory fields, returned fields and unique name"""
    device_type = device_type_factory(name='Virtual')
    # check that name and device_type are mandatory
    response = post(client, f'{API_URL}/network/hosts', data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'name'", 422)
    response = post(client, f'{API_URL}/network/hosts', data={'name': 'myhost'}, token=user_token)
    check_response_message(response, "Missing mandatory field 'device_type'", 422)
    response = post(client, f'{API_URL}/network/hosts', data={'device_type': 'Physical'}, token=user_token)
    check_response_message(response, "Missing mandatory field 'name'", 422)

    data = {'name': 'my-hostname',
            'device_type': device_type.name}
    response = post(client, f'{API_URL}/network/hosts', data=data, token=user_token)
    assert response.status_code == 201
    assert {'id', 'name', 'device_type', 'description',
            'items', 'interfaces', 'created_at',
            'updated_at', 'user'} == set(response.json.keys())
    assert response.json['name'] == data['name']

    # Check that name shall be unique
    response = post(client, f'{API_URL}/network/hosts', data=data, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # check the number of hosts created
    assert models.Host.query.count() == 1


def test_create_host_with_items(client, item_factory, device_type_factory, user_token):
    """Check that a host can be created with a list of items"""
    device_type = device_type_factory(name='Switch')
    item1 = item_factory(ics_id='AAA001')
    item2 = item_factory(ics_id='AAA002')
    # Check that we can pass a list of items ics_id
    data = {'name': 'my-switch',
            'device_type': device_type.name,
            'items': [item1.ics_id, item2.ics_id]}
    response = post(client, f'{API_URL}/network/hosts', data=data, token=user_token)
    assert response.status_code == 201
    host = models.Host.query.filter_by(name='my-switch').first()
    assert models.Item.query.get(item1.id).host_id == host.id
    assert models.Item.query.get(item2.id).host_id == host.id


def test_create_host_as_consultant(client, item_factory, device_type_factory, consultant_token):
    """Check that the 'consultant' token is allowed to create a host"""
    device_type = device_type_factory()
    data = {'name': 'my-hostname',
            'device_type': device_type.name}
    response = post(client, f'{API_URL}/network/hosts', data=data, token=consultant_token)
    assert response.status_code == 201


def test_get_user_profile(client, readonly_token):
    """Check the fields returned by the user profile endpoint"""
    response = get(client, f'{API_URL}/user/profile', token=readonly_token)
    assert response.status_code == 200
    user = response.json
    assert {'id', 'username', 'groups', 'email', 'display_name'} == set(user.keys())
    assert user['username'] == 'user_ro'
    assert user['display_name'] == 'User RO'
    assert user['email'] == 'user_ro@example.com'


def test_get_domains(client, domain_factory, readonly_token):
    """Check the list of domains"""
    # Create some domains
    domain1 = domain_factory()
    domain2 = domain_factory()
    response = get(client, f'{API_URL}/network/domains', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (domain1.to_dict(), domain2.to_dict()))


def test_create_domain(client, admin_token):
    """Check domain creation: mandatory name, returned fields and unique name"""
    # check that name is mandatory
    response = post(client, f'{API_URL}/network/domains', data={}, token=admin_token)
    check_response_message(response, "Missing mandatory field 'name'", 422)

    data = {'name': 'tn.esss.lu.se'}
    response = post(client, f'{API_URL}/network/domains', data=data, token=admin_token)
    assert response.status_code == 201
    assert {'id', 'name', 'scopes',
            'networks', 'created_at',
            'updated_at', 'user'} == set(response.json.keys())
    assert response.json['name'] == data['name']

    # Check that name shall be unique
    response = post(client, f'{API_URL}/network/domains', data=data, token=admin_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)


def test_get_cnames(client, cname_factory, readonly_token):
    """Check the list of cnames"""
    # Create some cnames
    cname1 = cname_factory()
    cname2 = cname_factory()
    response = get(client, f'{API_URL}/network/cnames', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (cname1.to_dict(), cname2.to_dict()))
test_get_cnames_by_domain(client, domain_factory, network_factory, interface_factory, cname_factory, readonly_token): # Create some cnames domain_a = domain_factory(name='a.esss.lu.se') domain_b = domain_factory(name='b.esss.lu.se') network_a = network_factory(domain=domain_a) network_b = network_factory(domain=domain_b) interface_a1 = interface_factory(network=network_a) interface_a2 = interface_factory(network=network_a) interface_b1 = interface_factory(network=network_b) cname_a1 = cname_factory(interface=interface_a1) cname_a2 = cname_factory(interface=interface_a1) cname_a3 = cname_factory(interface=interface_a2) cname_b1 = cname_factory(interface=interface_b1) cname_b2 = cname_factory(interface=interface_b1) response = get(client, f'{API_URL}/network/cnames', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 5 response = get(client, f'{API_URL}/network/cnames?domain=a.esss.lu.se', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 3 check_input_is_subset_of_response(response, (cname_a1.to_dict(), cname_a2.to_dict(), cname_a3.to_dict())) response = get(client, f'{API_URL}/network/cnames?domain=b.esss.lu.se', token=readonly_token) assert response.status_code == 200 assert len(response.json) == 2 check_input_is_subset_of_response(response, (cname_b1.to_dict(), cname_b2.to_dict())) def test_create_cname(client, interface, admin_token): # check that name and interface_id are mandatory response = post(client, f'{API_URL}/network/cnames', data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'name'", 422) response = post(client, f'{API_URL}/network/cnames', data={'name': 'myhost'}, token=admin_token) check_response_message(response, "Missing mandatory field 'interface_id'", 422) response = post(client, f'{API_URL}/network/cnames', data={'interface_id': interface.id}, token=admin_token) check_response_message(response, "Missing mandatory field 'name'", 422) data = 
{'name': 'myhost.tn.esss.lu.se', 'interface_id': interface.id} response = post(client, f'{API_URL}/network/cnames', data=data, token=admin_token) assert response.status_code == 201 assert {'id', 'name', 'interface', 'created_at', 'updated_at', 'user'} == set(response.json.keys()) assert response.json['name'] == data['name'] # Check that name shall be unique response = post(client, f'{API_URL}/network/cnames', data=data, token=admin_token) check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)