Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_api.py 43.07 KiB
# -*- coding: utf-8 -*-
"""
tests.functional.test_api
~~~~~~~~~~~~~~~~~~~~~~~~~

This module defines API tests.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import datetime
import json
import pytest
from app import models


# Root path shared by every endpoint exercised in this module
API_URL = '/api/v1'
# Endpoint path (relative to API_URL) -> model class used by the tests for it
ENDPOINT_MODEL = {
    'inventory/actions': models.Action,
    'inventory/manufacturers': models.Manufacturer,
    'inventory/models': models.Model,
    'inventory/locations': models.Location,
    'inventory/statuses': models.Status,
    'inventory/items': models.Item,
    'network/networks': models.Network,
    'network/interfaces': models.Interface,
    'network/hosts': models.Host,
    'network/macs': models.Mac,
    'network/domains': models.Domain,
    'network/cnames': models.Cname,
}
# Every inventory endpoint except the items one
GENERIC_GET_ENDPOINTS = [
    endpoint for endpoint in ENDPOINT_MODEL
    if endpoint.startswith('inventory') and endpoint != 'inventory/items'
]
# Same list as above, with the actions endpoint removed as well
GENERIC_CREATE_ENDPOINTS = [
    endpoint for endpoint in GENERIC_GET_ENDPOINTS
    if endpoint != 'inventory/actions'
]
# All endpoints except the actions one
CREATE_AUTH_ENDPOINTS = [
    endpoint for endpoint in ENDPOINT_MODEL
    if endpoint != 'inventory/actions'
]


def get(client, url, token=None):
    """Send a JSON GET request with the test client and decode a JSON reply.

    :param client: object exposing ``get(url, headers=...)`` (the test client)
    :param url: URL to request
    :param token: optional bearer token; when ``None`` no ``Authorization``
                  header is sent, as :func:`post` and :func:`patch` already do
    :returns: the response returned by the client; when its Content-Type is
              exactly ``application/json`` the decoded body is stored on
              ``response.json``
    """
    headers = {'Content-Type': 'application/json'}
    if token is not None:
        # Only authenticate when a token was actually given: sending the
        # literal string 'Bearer None' is not the same as sending no header
        headers['Authorization'] = f'Bearer {token}'
    response = client.get(url, headers=headers)
    if response.headers['Content-Type'] == 'application/json':
        response.json = json.loads(response.data)
    return response


def post(client, url, data, token=None):
    """Send ``data`` as a JSON body in a POST request and decode a JSON reply.

    :param client: object exposing ``post(url, data=..., headers=...)``
    :param url: URL to post to
    :param data: object serialized with ``json.dumps`` to build the body
    :param token: optional bearer token; no ``Authorization`` header is sent
                  when it is ``None``
    :returns: the response returned by the client; when its Content-Type is
              exactly ``application/json`` the decoded body is stored on
              ``response.json``
    """
    auth = {} if token is None else {'Authorization': f'Bearer {token}'}
    request_headers = {'Content-Type': 'application/json', **auth}
    body = json.dumps(data)
    reply = client.post(url, data=body, headers=request_headers)
    is_json = reply.headers['Content-Type'] == 'application/json'
    if is_json:
        reply.json = json.loads(reply.data)
    return reply


def patch(client, url, data, token=None):
    """Send ``data`` as a JSON body in a PATCH request and decode a JSON reply.

    :param client: object exposing ``patch(url, data=..., headers=...)``
    :param url: URL to patch
    :param data: object serialized with ``json.dumps`` to build the body
    :param token: optional bearer token; no ``Authorization`` header is sent
                  when it is ``None``
    :returns: the response returned by the client; when its Content-Type is
              exactly ``application/json`` the decoded body is stored on
              ``response.json``
    """
    request_headers = {'Content-Type': 'application/json'}
    if token is not None:
        request_headers.update(Authorization=f'Bearer {token}')
    reply = client.patch(url, data=json.dumps(data), headers=request_headers)
    content_type = reply.headers['Content-Type']
    if content_type == 'application/json':
        decoded = json.loads(reply.data)
        reply.json = decoded
    return reply


def login(client, username, password):
    """Post the given credentials to the login endpoint.

    :returns: the response of the ``user/login`` POST request
    """
    credentials = dict(username=username, password=password)
    return post(client, f'{API_URL}/user/login', credentials)


def get_token(client, username, password):
    """Log in and return the ``access_token`` found in the JSON response."""
    return login(client, username, password).json['access_token']


@pytest.fixture()
def readonly_token(client):
    """Access token obtained by logging in as 'user_ro'."""
    username, password = 'user_ro', 'userro'
    return get_token(client, username, password)


@pytest.fixture()
def user_token(client):
    """Access token obtained by logging in as 'user_rw'."""
    username, password = 'user_rw', 'userrw'
    return get_token(client, username, password)


@pytest.fixture()
def consultant_token(client):
    """Access token obtained by logging in as 'consultant'."""
    username, password = 'consultant', 'consultantpwd'
    return get_token(client, username, password)


@pytest.fixture()
def admin_token(client):
    """Access token obtained by logging in as 'admin'."""
    username, password = 'admin', 'adminpasswd'
    return get_token(client, username, password)


def check_response_message(response, msg, status_code=400):
    """Assert the response status code and the beginning of its message.

    :param response: response whose JSON body holds a ``message`` (or ``msg``) key
    :param msg: prefix the message is expected to start with
    :param status_code: expected HTTP status code (400 by default)
    """
    assert response.status_code == status_code
    try:
        payload = response.json
    except AttributeError:
        # No decoded body attached to the response: decode the raw data
        payload = json.loads(response.data)
    # flask-jwt-extended is using "msg" instead of "message"
    # in its default callbacks
    text = payload['message'] if 'message' in payload else payload['msg']
    assert text.startswith(msg)


def check_names(response, names):
    """Assert that the ``name`` values in the JSON response are exactly ``names``.

    Both sides are compared as sets: order and duplicates are ignored.
    """
    found = {entry['name'] for entry in response.json}
    assert found == set(names)


def check_input_is_subset_of_response(response, inputs):
    """Assert that each input dict is contained in the matching response element.

    The response elements are sorted by ``id`` and paired with ``inputs``
    in order (pairing stops at the shorter of the two sequences).
    ``datetime`` input values are formatted as ``'%Y-%m-%d %H:%M'`` before
    being compared.
    """
    # Sort the response by id to match the inputs order
    ordered = sorted(response.json, key=lambda entry: entry['id'])
    for expected, actual in zip(inputs, ordered):
        for field in expected:
            wanted = expected[field]
            if isinstance(wanted, datetime.datetime):
                wanted = wanted.strftime('%Y-%m-%d %H:%M')
            assert actual[field] == wanted


def test_login(client):
    """Exercise the login endpoint with invalid and valid requests."""
    login_url = f'{API_URL}/user/login'

    # request sent without any body
    response = client.post(login_url)
    check_response_message(response, 'Body should be a JSON object')

    # 'passwd' is sent instead of 'password'
    response = post(client, login_url, data={'username': 'foo', 'passwd': ''})
    check_response_message(response, 'Missing mandatory field (username or password)', 422)

    # credentials that are rejected
    response = login(client, 'foo', 'invalid')
    check_response_message(response, 'Invalid credentials', 401)

    # successful login returns an access token
    response = login(client, 'user_ro', 'userro')
    assert response.status_code == 200
    assert 'access_token' in response.json


@pytest.mark.parametrize('endpoint', GENERIC_GET_ENDPOINTS)
def test_get_generic_model(endpoint, session, client, readonly_token):
    """Check authentication errors and the listing of a generic endpoint."""
    url = f'{API_URL}/{endpoint}'
    model_cls = ENDPOINT_MODEL[endpoint]
    expected_names = ('Foo', 'Bar', 'Alice')
    for expected_name in expected_names:
        session.add(model_cls(name=expected_name))
    session.commit()

    # no Authorization header at all
    response = client.get(url)
    check_response_message(response, 'Missing Authorization Header', 401)
    # malformed token
    response = get(client, url, 'xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)

    # the listing is requested twice with the same token
    # NOTE(review): the second request duplicates the first - confirm it is intended
    response = get(client, url, readonly_token)
    check_names(response, expected_names)
    response = get(client, url, readonly_token)
    check_names(response, expected_names)
    for entry in response.json:
        assert 'qrcode' in entry


@pytest.mark.parametrize('endpoint', CREATE_AUTH_ENDPOINTS)
def test_create_model_auth_fail(endpoint, client, readonly_token):
    """Check that unauthenticated or unauthorized POST requests create nothing."""
    url = f'{API_URL}/{endpoint}'

    # no Authorization header
    response = client.post(url)
    check_response_message(response, 'Missing Authorization Header', 401)
    # malformed token
    response = post(client, url, data={}, token='xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)
    # valid token of the 'user_ro' account
    response = post(client, url, data={}, token=readonly_token)
    check_response_message(response, "User doesn't have the required group", 403)

    # nothing was stored
    assert ENDPOINT_MODEL[endpoint].query.count() == 0


@pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model(endpoint, client, user_token):
    """Check creation of generic models: mandatory name and name uniqueness."""
    url = f'{API_URL}/{endpoint}'
    duplicate_error = '(psycopg2.IntegrityError) duplicate key value violates unique constraint'

    # name is mandatory
    response = post(client, url, data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'name'", 422)

    # first creation succeeds
    response = post(client, url, data={'name': 'Foo'}, token=user_token)
    assert response.status_code == 201
    assert {'id', 'name'} <= set(response.json)
    assert response.json['name'] == 'Foo'

    # the same name is rejected, whatever its case
    for name in ('Foo', 'foo', 'FOO'):
        response = post(client, url, data={'name': name}, token=user_token)
        check_response_message(response, duplicate_error, 422)

    # a description can be given
    payload = {'name': 'Bar', 'description': 'Bar description'}
    response = post(client, url, data=payload, token=user_token)
    assert response.status_code == 201
    assert response.json['description'] == 'Bar description'

    # only the two successful creations are stored and listed
    assert ENDPOINT_MODEL[endpoint].query.count() == 2
    response = get(client, url, user_token)
    check_names(response, ('Foo', 'Bar'))


@pytest.mark.parametrize('endpoint', GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model_invalid_param(endpoint, client, user_token):
    """Check that an unknown field is rejected with a 422."""
    model_name = ENDPOINT_MODEL[endpoint].__name__
    payload = {'name': 'foo', 'hello': 'world'}
    response = post(client, f'{API_URL}/{endpoint}', data=payload, token=user_token)
    expected = f"'hello' is an invalid keyword argument for {model_name}"
    check_response_message(response, expected, 422)


def test_create_item(client, user_token):
    """Check item creation: mandatory serial_number and unique ics_id."""
    items_url = f'{API_URL}/inventory/items'

    # check that serial_number is mandatory
    response = post(client, items_url, data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'serial_number'", 422)

    # check create with only serial_number
    serial_only = {'serial_number': '123456'}
    response = post(client, items_url, data=serial_only, token=user_token)
    assert response.status_code == 201
    expected_keys = {
        'id', 'ics_id', 'serial_number', 'manufacturer', 'model', 'quantity',
        'location', 'status', 'parent', 'children', 'macs', 'history', 'host',
        'updated_at', 'created_at', 'user', 'comments',
    }
    assert expected_keys == set(response.json)
    assert response.json['serial_number'] == '123456'

    # Check that serial_number doesn't have to be unique
    response = post(client, items_url, data=serial_only, token=user_token)
    assert response.status_code == 201

    # check that ics_id shall be unique
    with_ics_id = {'serial_number': '456789', 'ics_id': 'AAA001'}
    response = post(client, items_url, data=with_ics_id, token=user_token)
    assert response.status_code == 201
    response = post(client, items_url, data=with_ics_id, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # check all items that were created
    assert models.Item.query.count() == 3
    response = get(client, items_url, user_token)
    check_input_is_subset_of_response(response, (serial_only, serial_only, with_ics_id))


def test_create_item_with_host_id(client, host_factory, user_token):
    """Check that an item can be created with a host_id."""
    host = host_factory()
    # Check that we can pass an host_id
    payload = dict(serial_number='123456', host_id=host.id)
    response = post(client, f'{API_URL}/inventory/items', data=payload, token=user_token)
    assert response.status_code == 201
    created = models.Item.query.filter_by(serial_number=payload['serial_number']).first()
    assert created.host_id == host.id


def test_create_item_invalid_ics_id(client, user_token):
    """Check that ics_id values not matching the expected pattern are rejected."""
    items_url = f'{API_URL}/inventory/items'
    invalid_ids = ('foo', 'AAB1234', 'AZ02', 'WS007', 'AAA01')
    for invalid_id in invalid_ids:
        payload = dict(serial_number='123456', ics_id=invalid_id)
        response = post(client, items_url, data=payload, token=user_token)
        check_response_message(response, 'ICS id shall match [A-Z]{3}[0-9]{3}', 422)


def test_get_item_fail(client, session, readonly_token):
    """Check that requesting an unknown item returns a 404."""
    cases = (
        ('50', "Item id '50' not found"),
        ('bar', "Item id 'bar' not found"),
    )
    for item_id, message in cases:
        response = get(client, f'{API_URL}/inventory/items/{item_id}', token=readonly_token)
        check_response_message(response, message, 404)


def test_get_item(client, status_factory, item_factory, readonly_token):
    """Check that an item can be retrieved by id or by ics_id."""
    items_url = f'{API_URL}/inventory/items'
    # Create some items
    status_factory(name='Stock')
    by_id = item_factory(serial_number='123456')
    by_ics_id = item_factory(serial_number='234567', ics_id='AAA001', status='Stock')

    # we can get items by id...
    response = get(client, f'{items_url}/{by_id.id}', token=readonly_token)
    assert response.status_code == 200
    body = response.json
    assert body['id'] == by_id.id
    assert body['serial_number'] == by_id.serial_number

    # ...or ics_id
    response = get(client, f'{items_url}/{by_ics_id.ics_id}', token=readonly_token)
    assert response.status_code == 200
    body = response.json
    assert body['id'] == by_ics_id.id
    assert body['ics_id'] == by_ics_id.ics_id
    assert body['serial_number'] == by_ics_id.serial_number
    assert body['status'] == str(by_ics_id.status)


def test_patch_item_auth_fail(client, session, readonly_token):
    """Check that unauthenticated or unauthorized PATCH requests are rejected."""
    item_url = f'{API_URL}/inventory/items/50'

    # no Authorization header
    response = client.patch(item_url)
    check_response_message(response, 'Missing Authorization Header', 401)
    # malformed token
    response = patch(client, item_url, data={}, token='xxxxxxxxx')
    check_response_message(response, 'Not enough segments', 422)
    # valid token of the 'user_ro' account
    response = patch(client, item_url, data={}, token=readonly_token)
    check_response_message(response, "User doesn't have the required group", 403)


def test_patch_item_fail(client, item_factory, user_token):
    """Check the errors returned by invalid PATCH requests on items."""
    items_url = f'{API_URL}/inventory/items'
    unknown_url = f'{items_url}/50'

    # empty body
    response = patch(client, unknown_url, data={}, token=user_token)
    check_response_message(response, 'At least one field is required', 422)
    # unknown field
    response = patch(client, unknown_url, data={'location': 'ESS', 'foo': 'bar'}, token=user_token)
    check_response_message(response, "Invalid field 'foo'", 422)
    # valid body but unknown item
    location_only = {'location': 'ESS'}
    response = patch(client, unknown_url, data=location_only, token=user_token)
    check_response_message(response, "Item id '50' not found", 404)
    response = patch(client, f'{items_url}/bar', data=location_only, token=user_token)
    check_response_message(response, "Item id 'bar' not found", 404)

    # Create an item
    existing = item_factory(serial_number='234567', ics_id='AAA001')
    existing_url = f'{items_url}/{existing.id}'
    # check that we can't change the serial_number or ics_id
    response = patch(client, existing_url, data={'serial_number': '12345'}, token=user_token)
    check_response_message(response, "Invalid field 'serial_number'", 422)
    response = patch(client, existing_url, data={'ics_id': 'AAA002'}, token=user_token)
    check_response_message(response, "'ics_id' can't be changed", 422)


def test_patch_item(client, status, item_factory, user_token):
    """Check that an item can be patched using its id or its ics_id."""
    items_url = f'{API_URL}/inventory/items'
    # Create some items
    first = item_factory(ics_id='ZZZ001')
    second = item_factory()

    # we can patch items by id...
    new_ics_id = {'ics_id': 'AAB004'}
    response = patch(client, f'{items_url}/{first.id}', data=new_ics_id, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == first.id
    assert response.json['serial_number'] == first.serial_number
    assert response.json['ics_id'] == new_ics_id['ics_id']

    # ...or ics_id
    new_status = {'status': status.name}
    response = patch(client, f'{items_url}/{second.ics_id}', data=new_status, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == second.id
    assert response.json['ics_id'] == second.ics_id
    assert response.json['serial_number'] == second.serial_number
    assert response.json['status'] == new_status['status']


def test_patch_item_integrity_error(client, user_token, item_factory):
    """Check that patching an item with an ics_id already in use returns a 422."""
    # Create some items
    existing = item_factory()
    target = item_factory(ics_id='ZZZ001')
    # try to give the second item the ics_id of the first one
    payload = {'ics_id': existing.ics_id}
    target_url = f'{API_URL}/inventory/items/{target.id}'
    response = patch(client, target_url, data=payload, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)


def test_patch_item_parent(client, location_factory, manufacturer_factory,
                           status_factory, item_factory, user_token):
    """Check how patching an item interacts with its parent and children.

    The assertions verify that:

    - setting a parent gives the child the status and location of the parent
    - changing the location or status of a parent is reflected on its children
    - changing the manufacturer of a parent leaves the children untouched
    """
    # Create some items
    location_factory(name='ESS')
    manufacturer_factory(name='HP')
    status_factory(name='In service')
    status_factory(name='Stock')
    item1 = item_factory(ics_id='AAA001', status='In service')
    item2 = item_factory(ics_id='AAA002')
    item3 = item_factory(ics_id='AAA003')
    # item3 is attached to item1 directly on the object (not through the API)
    item3.parent_id = item1.id

    # set parent changes the status and location
    data1 = {'parent': item1.ics_id}
    response = patch(client, f'{API_URL}/inventory/items/{item2.ics_id}', data=data1, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item2.id
    assert response.json['ics_id'] == item2.ics_id
    assert response.json['serial_number'] == item2.serial_number
    assert response.json['parent'] == item1.ics_id
    assert response.json['status'] == str(item1.status)
    assert response.json['location'] == str(item1.location)

    # updating a parent, modifies the status and location of all children
    # check location
    data2 = {'location': 'ESS'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data2, token=user_token)
    assert response.status_code == 200
    assert response.json['id'] == item1.id
    assert response.json['ics_id'] == item1.ics_id
    assert response.json['serial_number'] == item1.serial_number
    assert response.json['status'] == str(item1.status)
    assert response.json['location'] == data2['location']
    # both children (item2 and item3) are fetched again through the API
    for ics_id in ('AAA002', 'AAA003'):
        response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token)
        assert response.json['location'] == data2['location']
        assert response.json['status'] == 'In service'
    # check status
    data3 = {'status': 'Stock'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data3, token=user_token)
    assert response.status_code == 200
    assert response.json['status'] == data3['status']
    # the location set previously is expected to be kept on the children
    for ics_id in ('AAA002', 'AAA003'):
        response = get(client, f'{API_URL}/inventory/items/{ics_id}', token=user_token)
        assert response.json['location'] == data2['location']
        assert response.json['status'] == data3['status']

    # manufacturer has no impact on children
    data4 = {'manufacturer': 'HP'}
    response = patch(client, f'{API_URL}/inventory/items/{item1.ics_id}', data=data4, token=user_token)
    assert response.status_code == 200
    assert response.json['manufacturer'] == 'HP'
    # Manufacturer didn't change on children
    response = get(client, f'{API_URL}/inventory/items/{item2.ics_id}', token=user_token)
    assert response.json['manufacturer'] == str(item2.manufacturer)
    assert str(item2.manufacturer) != 'HP'
    response = get(client, f'{API_URL}/inventory/items/{item3.ics_id}', token=user_token)
    assert response.json['manufacturer'] == str(item3.manufacturer)
    assert str(item3.manufacturer) != 'HP'


def test_get_items(client, location_factory, item_factory, readonly_token):
    """Check the listing of items and its query-string filtering."""
    items_url = f'{API_URL}/inventory/items'
    # Create some items
    location_factory(name='ESS')
    in_ess = item_factory(location='ESS')
    with_serial = item_factory(serial_number='234567')
    other = item_factory()

    # no filter: everything is returned
    response = get(client, items_url, token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (in_ess.to_dict(), with_serial.to_dict(), other.to_dict()))

    # test filtering
    response = get(client, f'{items_url}?serial_number=234567', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (with_serial.to_dict(),))
    # filtering on location_id works but not location (might want to change that)
    response = get(client, f'{items_url}?location_id={in_ess.location_id}', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (in_ess.to_dict(),))
    # 'location' is rejected and so is an unknown key: both raise a 422
    for query in ('location=ESS', 'foo=bar'):
        response = get(client, f'{items_url}?{query}', token=readonly_token)
        check_response_message(response, 'Invalid query arguments', 422)


def test_get_networks(client, network_factory, readonly_token):
    """Check the listing of networks and the filtering by address."""
    networks_url = f'{API_URL}/network/networks'
    # Create some networks
    net_a = network_factory(address='172.16.1.0/24', first_ip='172.16.1.1', last_ip='172.16.1.254')
    net_b = network_factory(address='172.16.20.0/22', first_ip='172.16.20.11', last_ip='172.16.20.250')
    net_c = network_factory(address='172.16.5.0/24', first_ip='172.16.5.10', last_ip='172.16.5.254')

    # no filter: everything is returned
    response = get(client, networks_url, token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (net_a.to_dict(), net_b.to_dict(), net_c.to_dict()))

    # test filtering by address
    response = get(client, f'{networks_url}?address=172.16.20.0/22', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (net_b.to_dict(),))


def test_create_network_auth_fail(client, session, user_token):
    """Check that the 'user_rw' token is not allowed to create a network."""
    # admin is required to create networks
    networks_url = f'{API_URL}/network/networks'
    response = post(client, networks_url, data={}, token=user_token)
    check_response_message(response, "User doesn't have the required group", 403)


def test_create_network(client, admin_token, network_scope_factory):
    """Check network creation: mandatory fields, returned fields and uniqueness."""
    scope = network_scope_factory(supernet='172.16.0.0/16')
    networks_url = f'{API_URL}/network/networks'
    duplicate_error = '(psycopg2.IntegrityError) duplicate key value violates unique constraint'

    # Incomplete payloads, each with the field reported as missing
    incomplete_payloads = (
        ({}, 'vlan_name'),
        ({'first_ip': '172.16.1.10', 'last_ip': '172.16.1.250'}, 'vlan_name'),
        ({'address': '172.16.1.0/24'}, 'vlan_name'),
        ({'vlan_name': 'network1'}, 'vlan_id'),
        ({'vlan_name': 'network1', 'vlan_id': 1600}, 'address'),
        ({'vlan_name': 'network1', 'vlan_id': 1600, 'address': '172.16.1.0/24', 'first_ip': '172.16.1.10'},
         'last_ip'),
    )
    for payload, missing in incomplete_payloads:
        response = post(client, networks_url, data=payload, token=admin_token)
        check_response_message(response, f"Missing mandatory field '{missing}'", 422)

    # Complete payload: the network is created
    data = {'vlan_name': 'network1',
            'vlan_id': 1600,
            'address': '172.16.1.0/24',
            'first_ip': '172.16.1.10',
            'last_ip': '172.16.1.250',
            'scope': scope.name}
    response = post(client, networks_url, data=data, token=admin_token)
    assert response.status_code == 201
    expected_keys = {
        'id', 'vlan_name', 'vlan_id', 'address', 'netmask',
        'first_ip', 'last_ip', 'description', 'admin_only',
        'scope', 'domain', 'interfaces', 'created_at',
        'updated_at', 'user',
    }
    assert expected_keys == set(response.json)
    expected_values = {
        'vlan_name': 'network1',
        'vlan_id': 1600,
        'address': '172.16.1.0/24',
        'first_ip': '172.16.1.10',
        'last_ip': '172.16.1.250',
        'netmask': '255.255.255.0',
    }
    for field, value in expected_values.items():
        assert response.json[field] == value

    # Check that address and name shall be unique
    response = post(client, networks_url, data=data, token=admin_token)
    check_response_message(response, duplicate_error, 422)
    # same address with another name
    data_same_address = dict(data, vlan_name='networkX')
    response = post(client, networks_url, data=data_same_address, token=admin_token)
    check_response_message(response, duplicate_error, 422)
    # same name with another address
    data_same_name = {'vlan_name': 'network1',
                      'vlan_id': '1600',
                      'address': '172.16.2.0/24',
                      'first_ip': '172.16.2.10',
                      'last_ip': '172.16.2.250',
                      'scope': scope.name}
    response = post(client, networks_url, data=data_same_name, token=admin_token)
    check_response_message(response, duplicate_error, 422)

    # Check that all parameters can be passed
    data2 = {'vlan_name': 'network2',
             'vlan_id': '1601',
             'address': '172.16.5.0/24',
             'first_ip': '172.16.5.11',
             'last_ip': '172.16.5.250',
             'description': 'long description',
             'scope': scope.name}
    response = post(client, networks_url, data=data2, token=admin_token)
    assert response.status_code == 201
    assert response.json['description'] == 'long description'

    # check all items that were created
    assert models.Network.query.count() == 2


def test_create_network_invalid_address(client, admin_token, network_scope):
    """Check that invalid network addresses are rejected with a 422."""
    networks_url = f'{API_URL}/network/networks'
    payload = {'vlan_name': 'network1',
               'vlan_id': '1600',
               'address': 'foo',
               'first_ip': '172.16.1.10',
               'last_ip': '172.16.1.250',
               'scope': network_scope.name}
    cases = (
        # invalid network address
        ('foo', "'foo' does not appear to be an IPv4 or IPv6 network"),
        ('172.16.1', "'172.16.1' does not appear to be an IPv4 or IPv6 network"),
        # address has host bits set
        ('172.16.1.1/24', '172.16.1.1/24 has host bits set'),
    )
    for address, message in cases:
        payload['address'] = address
        response = post(client, networks_url, data=payload, token=admin_token)
        check_response_message(response, message, 422)


@pytest.mark.parametrize('address', ('', 'foo', '192.168'))
def test_create_network_invalid_ip(address, client, session, admin_token, network_scope):
    """Check that an invalid first or last IP address is rejected with a 422."""
    networks_url = f'{API_URL}/network/networks'
    expected = f"'{address}' does not appear to be an IPv4 or IPv6 address"
    valid_ip = '192.168.0.250'
    # the invalid value is used as first IP address, then as last IP address
    for first_ip, last_ip in ((address, valid_ip), (valid_ip, address)):
        data = {'vlan_name': 'network1',
                'vlan_id': '1600',
                'address': '192.168.0.0/24',
                'first_ip': first_ip,
                'last_ip': last_ip,
                'scope': network_scope.name}
        response = post(client, networks_url, data=data, token=admin_token)
        check_response_message(response, expected, 422)


def test_create_network_invalid_range(client, session, admin_token, network_scope):
    """Check that an inconsistent first/last IP range is rejected with a 422."""
    networks_url = f'{API_URL}/network/networks'
    cases = (
        # first_ip not in network address
        ('172.16.2.10', '172.16.1.250',
         'IP address 172.16.2.10 is not in network 172.16.1.0/24'),
        # last_ip not in network address
        ('172.16.1.10', '172.16.5.250',
         'IP address 172.16.5.250 is not in network 172.16.1.0/24'),
        # first_ip > last_ip
        ('172.16.1.10', '172.16.1.9',
         'Last IP address 172.16.1.9 is less than the first address 172.16.1.10'),
    )
    for first_ip, last_ip, message in cases:
        data = {'vlan_name': 'network1',
                'vlan_id': '1600',
                'address': '172.16.1.0/24',
                'first_ip': first_ip,
                'last_ip': last_ip,
                'scope': network_scope.name}
        response = post(client, networks_url, data=data, token=admin_token)
        check_response_message(response, message, 422)


def test_get_interfaces(client, network_factory, interface_factory, readonly_token):
    """Check the listing of interfaces and the filtering by network_id."""
    interfaces_url = f'{API_URL}/network/interfaces'
    # Create some interfaces
    net_a = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    net_b = network_factory(address='192.168.2.0/24', first_ip='192.168.2.10', last_ip='192.168.2.250')
    first = interface_factory(network=net_a, ip='192.168.1.10')
    second = interface_factory(network=net_a, ip='192.168.1.11', name='interface2')
    third = interface_factory(network=net_b, ip='192.168.2.10')

    # no filter: everything is returned
    response = get(client, interfaces_url, token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 3
    check_input_is_subset_of_response(response, (first.to_dict(), second.to_dict(), third.to_dict()))

    # test filtering by network_id
    response = get(client, f'{interfaces_url}?network_id={net_b.id}', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (third.to_dict(),))


def test_get_interfaces_by_domain(client, domain_factory, network_factory, interface_factory, readonly_token):
    """Check that interfaces can be filtered by domain name."""
    interfaces_url = f'{API_URL}/network/interfaces'
    # Create some interfaces
    tn_domain = domain_factory(name='tn.esss.lu.se')
    ics_domain = domain_factory(name='ics.esss.lu.se')
    tn_network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250',
                                 domain=tn_domain)
    ics_network = network_factory(address='192.168.2.0/24', first_ip='192.168.2.10', last_ip='192.168.2.250',
                                  domain=ics_domain)
    tn_first = interface_factory(network=tn_network, ip='192.168.1.10')
    tn_second = interface_factory(network=tn_network, ip='192.168.1.11', name='interface2')
    ics_only = interface_factory(network=ics_network, ip='192.168.2.10')

    # test filtering by domain
    response = get(client, f'{interfaces_url}?domain=tn.esss.lu.se', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 2
    check_input_is_subset_of_response(response, (tn_first.to_dict(), tn_second.to_dict()))

    response = get(client, f'{interfaces_url}?domain=ics.esss.lu.se', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == 1
    check_input_is_subset_of_response(response, (ics_only.to_dict(),))


def test_create_interface(client, network_factory, user_token):
    """Check interface creation: mandatory fields, returned fields and uniqueness."""
    interfaces_url = f'{API_URL}/network/interfaces'
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')

    # check that network and ip are mandatory
    response = post(client, interfaces_url, data={}, token=user_token)
    check_response_message(response, "Missing mandatory field 'network'", 422)
    response = post(client, interfaces_url, data={'ip': '192.168.1.20'}, token=user_token)
    check_response_message(response, "Missing mandatory field 'network'", 422)
    response = post(client, interfaces_url, data={'network': network.address}, token=user_token)
    check_response_message(response, "Missing mandatory field 'ip'", 422)

    # valid payload: the interface is created
    first_payload = {'network': network.vlan_name,
                     'ip': '192.168.1.20',
                     'name': 'interface1'}
    response = post(client, interfaces_url, data=first_payload, token=user_token)
    assert response.status_code == 201
    expected_keys = {
        'id', 'network', 'ip', 'name', 'mac', 'domain',
        'host', 'device_type', 'cnames', 'tags', 'created_at',
        'updated_at', 'user',
    }
    assert expected_keys == set(response.json)
    assert response.json['network'] == network.vlan_name
    assert response.json['ip'] == '192.168.1.20'
    assert response.json['name'] == 'interface1'

    # Check that IP and name shall be unique
    response = post(client, interfaces_url, data=first_payload, token=user_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # A second interface with another ip and name can be created
    second_payload = {'network': network.vlan_name,
                      'ip': '192.168.1.21',
                      'name': 'myhostname'}
    response = post(client, interfaces_url, data=second_payload, token=user_token)
    assert response.status_code == 201

    # check all items that were created
    assert models.Interface.query.count() == 2


@pytest.mark.parametrize('ip', ('', 'foo', '192.168'))
def test_create_interface_invalid_ip(ip, client, network_factory, user_token):
    """A malformed ``ip`` value is expected to be rejected with a 422 and an explanatory message."""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    payload = dict(network=network.vlan_name, ip=ip, name='hostname')
    response = post(client, f'{API_URL}/network/interfaces', data=payload, token=user_token)
    expected_message = f"'{ip}' does not appear to be an IPv4 or IPv6 address"
    check_response_message(response, expected_message, 422)


def test_create_interface_ip_not_in_network(client, network_factory, user_token):
    """An IP that lies outside the network's address block is expected to yield a 422."""
    url = f'{API_URL}/network/interfaces'
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # 192.168.2.4 does not belong to 192.168.1.0/24
    outside_network = {
        'network': network.vlan_name,
        'ip': '192.168.2.4',
        'name': 'hostname',
    }
    response = post(client, url, data=outside_network, token=user_token)
    check_response_message(response, 'IP address 192.168.2.4 is not in network 192.168.1.0/24', 422)


def test_create_interface_ip_not_in_range(client, network_factory, user_token):
    """With user_token, an IP inside the network but below first_ip is expected to yield a 422."""
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # 192.168.1.4 is part of the /24 but outside 192.168.1.10 - 192.168.1.250
    out_of_range = dict(network=network.vlan_name, ip='192.168.1.4', name='hostname')
    response = post(client, f'{API_URL}/network/interfaces', data=out_of_range, token=user_token)
    expected_message = 'IP address 192.168.1.4 is not in range 192.168.1.10 - 192.168.1.250'
    check_response_message(response, expected_message, 422)


def test_create_interface_ip_not_in_range_as_admin(client, network_factory, admin_token):
    """With admin_token, an IP outside first_ip - last_ip is expected to be accepted (201)."""
    url = f'{API_URL}/network/interfaces'
    network = network_factory(address='192.168.1.0/24', first_ip='192.168.1.10', last_ip='192.168.1.250')
    # Same out-of-range address that the user_token test expects to be rejected
    out_of_range = {
        'network': network.vlan_name,
        'ip': '192.168.1.4',
        'name': 'hostname',
    }
    response = post(client, url, data=out_of_range, token=admin_token)
    assert response.status_code == 201


def test_get_macs(client, mac_factory, readonly_token):
    """GET on network/macs is expected to return every mac created by the factory."""
    # Two macs are created before querying the endpoint
    macs = [mac_factory() for _ in range(2)]

    response = get(client, f'{API_URL}/network/macs', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == len(macs)
    expected = tuple(mac.to_dict() for mac in macs)
    check_input_is_subset_of_response(response, expected)


def test_create_mac(client, item_factory, user_token):
    """Exercise POST on network/macs: mandatory address, creation, uniqueness, optional item_id."""
    url = f'{API_URL}/network/macs'
    item = item_factory()

    # An empty payload is rejected because 'address' is mandatory
    rejected = post(client, url, data={}, token=user_token)
    check_response_message(rejected, "Missing mandatory field 'address'", 422)

    # The address alone is enough to create a mac
    payload = {'address': 'b5:4b:7d:a4:23:43'}
    created = post(client, url, data=payload, token=user_token)
    assert created.status_code == 201
    assert {'id', 'address', 'item', 'interfaces'} == set(created.json.keys())
    assert created.json['address'] == payload['address']

    # Re-posting the same address must hit the unique constraint
    duplicate = post(client, url, data=payload, token=user_token)
    check_response_message(duplicate, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # An item_id can be supplied together with the address
    payload_with_item = {'address': 'b5:4b:7d:a4:23:44',
                         'item_id': item.id}
    with_item = post(client, url, data=payload_with_item, token=user_token)
    assert with_item.status_code == 201

    # Only the two successful requests resulted in stored macs
    assert models.Mac.query.count() == 2


@pytest.mark.parametrize('address', ('', 'foo', 'b5:4b:7d:a4:23'))
def test_create_mac_invalid_address(address, client, user_token):
    """A malformed ``address`` value is expected to be rejected with a 422."""
    url = f'{API_URL}/network/macs'
    response = post(client, url, data=dict(address=address), token=user_token)
    expected_message = f"'{address}' does not appear to be a MAC address"
    check_response_message(response, expected_message, 422)


def test_get_hosts(client, host_factory, readonly_token):
    """GET on network/hosts is expected to return every host created by the factory."""
    # Two hosts are created before querying the endpoint
    hosts = [host_factory() for _ in range(2)]
    response = get(client, f'{API_URL}/network/hosts', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == len(hosts)
    expected = tuple(host.to_dict() for host in hosts)
    check_input_is_subset_of_response(response, expected)


def test_create_host(client, device_type_factory, user_token):
    """Exercise POST on network/hosts: mandatory fields, creation and name uniqueness."""
    url = f'{API_URL}/network/hosts'
    device_type = device_type_factory(name='Virtual')

    # 'name' and 'device_type' are both mandatory; a missing 'name' is reported first
    incomplete_payloads = (
        ({}, "Missing mandatory field 'name'"),
        ({'name': 'myhost'}, "Missing mandatory field 'device_type'"),
        ({'device_type': 'Physical'}, "Missing mandatory field 'name'"),
    )
    for payload, message in incomplete_payloads:
        rejected = post(client, url, data=payload, token=user_token)
        check_response_message(rejected, message, 422)

    # A complete payload creates the host
    host_payload = {'name': 'my-hostname',
                    'device_type': device_type.name}
    created = post(client, url, data=host_payload, token=user_token)
    assert created.status_code == 201
    expected_keys = {'id', 'name', 'device_type', 'description',
                     'items', 'interfaces', 'created_at',
                     'updated_at', 'user'}
    assert expected_keys == set(created.json.keys())
    assert created.json['name'] == host_payload['name']

    # Re-posting the same name must hit the unique constraint
    duplicate = post(client, url, data=host_payload, token=user_token)
    check_response_message(duplicate, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)

    # Only the single successful request resulted in a stored host
    assert models.Host.query.count() == 1


def test_create_host_with_items(client, item_factory, device_type_factory, user_token):
    """Check that a host can be created with a list of item ics_id values.

    After a successful POST, both items must reference the new host
    through their ``host_id``.
    """
    device_type = device_type_factory(name='Switch')
    item1 = item_factory(ics_id='AAA001')
    item2 = item_factory(ics_id='AAA002')
    # Items are referenced in the payload by their ics_id
    data = {'name': 'my-switch',
            'device_type': device_type.name,
            'items': [item1.ics_id, item2.ics_id]}
    response = post(client, f'{API_URL}/network/hosts', data=data, token=user_token)
    assert response.status_code == 201
    host = models.Host.query.filter_by(name='my-switch').first()
    # first() yields None when nothing matches; fail with a clear assertion
    # instead of an AttributeError on host.id below
    assert host is not None, "host 'my-switch' was not found in the database"
    for item in (item1, item2):
        assert models.Item.query.get(item.id).host_id == host.id


def test_create_host_as_consultant(client, item_factory, device_type_factory, consultant_token):
    """A POST on network/hosts with consultant_token is expected to succeed (201)."""
    # NOTE: the item_factory fixture is requested but not used in this test body
    device_type = device_type_factory()
    payload = dict(name='my-hostname', device_type=device_type.name)
    response = post(client, f'{API_URL}/network/hosts', data=payload, token=consultant_token)
    assert response.status_code == 201


def test_get_user_profile(client, readonly_token):
    """GET on user/profile with readonly_token is expected to describe the 'user_ro' account."""
    response = get(client, f'{API_URL}/user/profile', token=readonly_token)
    assert response.status_code == 200
    profile = response.json
    expected_keys = {'id', 'username', 'groups', 'email', 'display_name'}
    assert expected_keys == set(profile.keys())
    # Values expected for the account behind readonly_token
    assert profile['username'] == 'user_ro'
    assert profile['display_name'] == 'User RO'
    assert profile['email'] == 'user_ro@example.com'


def test_get_domains(client, domain_factory, readonly_token):
    """GET on network/domains is expected to return every domain created by the factory."""
    # Two domains are created before querying the endpoint
    domains = [domain_factory() for _ in range(2)]
    response = get(client, f'{API_URL}/network/domains', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == len(domains)
    expected = tuple(domain.to_dict() for domain in domains)
    check_input_is_subset_of_response(response, expected)


def test_create_domain(client, admin_token):
    """Exercise POST on network/domains: mandatory name, creation and uniqueness."""
    url = f'{API_URL}/network/domains'

    # An empty payload is rejected because 'name' is mandatory
    rejected = post(client, url, data={}, token=admin_token)
    check_response_message(rejected, "Missing mandatory field 'name'", 422)

    # The name alone is enough to create a domain
    payload = {'name': 'tn.esss.lu.se'}
    created = post(client, url, data=payload, token=admin_token)
    assert created.status_code == 201
    expected_keys = {'id', 'name', 'scopes',
                     'networks', 'created_at',
                     'updated_at', 'user'}
    assert expected_keys == set(created.json.keys())
    assert created.json['name'] == payload['name']

    # Re-posting the same name must hit the unique constraint
    duplicate = post(client, url, data=payload, token=admin_token)
    check_response_message(duplicate, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)


def test_get_cnames(client, cname_factory, readonly_token):
    """GET on network/cnames is expected to return every cname created by the factory."""
    # Two cnames are created before querying the endpoint
    cnames = [cname_factory() for _ in range(2)]
    response = get(client, f'{API_URL}/network/cnames', token=readonly_token)
    assert response.status_code == 200
    assert len(response.json) == len(cnames)
    expected = tuple(cname.to_dict() for cname in cnames)
    check_input_is_subset_of_response(response, expected)


def test_get_cnames_by_domain(client, domain_factory, network_factory, interface_factory, cname_factory, readonly_token):
    """GET on network/cnames with a ``domain`` query argument is expected to filter by domain."""
    # Two domains, one network per domain
    domain_a = domain_factory(name='a.esss.lu.se')
    domain_b = domain_factory(name='b.esss.lu.se')
    network_a = network_factory(domain=domain_a)
    network_b = network_factory(domain=domain_b)
    # Two interfaces on network a, one on network b
    interface_a1 = interface_factory(network=network_a)
    interface_a2 = interface_factory(network=network_a)
    interface_b1 = interface_factory(network=network_b)
    # Three cnames end up under domain a and two under domain b
    cnames_a = (
        cname_factory(interface=interface_a1),
        cname_factory(interface=interface_a1),
        cname_factory(interface=interface_a2),
    )
    cnames_b = (
        cname_factory(interface=interface_b1),
        cname_factory(interface=interface_b1),
    )

    # Without any filter all five cnames are returned
    unfiltered = get(client, f'{API_URL}/network/cnames', token=readonly_token)
    assert unfiltered.status_code == 200
    assert len(unfiltered.json) == 5

    # With a domain filter only the cnames of that domain are returned
    for domain_name, cnames in (('a.esss.lu.se', cnames_a), ('b.esss.lu.se', cnames_b)):
        filtered = get(client, f'{API_URL}/network/cnames?domain={domain_name}', token=readonly_token)
        assert filtered.status_code == 200
        assert len(filtered.json) == len(cnames)
        check_input_is_subset_of_response(filtered, tuple(cname.to_dict() for cname in cnames))


def test_create_cname(client, interface, admin_token):
    """Exercise POST on network/cnames: mandatory fields, creation and name uniqueness."""
    url = f'{API_URL}/network/cnames'

    # 'name' and 'interface_id' are both mandatory; a missing 'name' is reported first
    incomplete_payloads = (
        ({}, "Missing mandatory field 'name'"),
        ({'name': 'myhost'}, "Missing mandatory field 'interface_id'"),
        ({'interface_id': interface.id}, "Missing mandatory field 'name'"),
    )
    for payload, message in incomplete_payloads:
        response = post(client, url, data=payload, token=admin_token)
        check_response_message(response, message, 422)

    # A complete payload creates the cname
    data = {'name': 'myhost.tn.esss.lu.se',
            'interface_id': interface.id}
    response = post(client, url, data=data, token=admin_token)
    assert response.status_code == 201
    expected_keys = {'id', 'name', 'interface',
                     'created_at', 'updated_at', 'user'}
    assert expected_keys == set(response.json.keys())
    assert response.json['name'] == data['name']

    # Re-posting the same name must hit the unique constraint
    response = post(client, url, data=data, token=admin_token)
    check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)