Newer
Older
tests.functional.test_api
~~~~~~~~~~~~~~~~~~~~~~~~~
This module defines API tests.
:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
"inventory/actions": models.Action,
"inventory/manufacturers": models.Manufacturer,
"inventory/models": models.Model,
"inventory/locations": models.Location,
"inventory/statuses": models.Status,
"inventory/items": models.Item,
"network/networks": models.Network,
"network/interfaces": models.Interface,
"network/hosts": models.Host,
"network/domains": models.Domain,
"network/cnames": models.Cname,
GENERIC_GET_ENDPOINTS = [
key
for key in ENDPOINT_MODEL.keys()
if key.startswith("inventory") and key not in ("inventory/items", "inventory/macs")
]
GENERIC_CREATE_ENDPOINTS = [
key
for key in ENDPOINT_MODEL.keys()
if key.startswith("inventory")
and key not in ("inventory/items", "inventory/actions", "inventory/macs")
]
CREATE_AUTH_ENDPOINTS = [
key for key in ENDPOINT_MODEL.keys() if key != "inventory/actions"
]
HOST_KEYS = {
"id",
"name",
"fqdn",
"is_ioc",
"device_type",
"model",
"description",
"items",
"interfaces",
"ansible_vars",
"ansible_groups",
"created_at",
"updated_at",
"user",
}
INTERFACE_KEYS = {
"id",
"is_main",
"network",
"ip",
"netmask",
"name",
"mac",
"domain",
"host",
"device_type",
"model",
"cnames",
"created_at",
"updated_at",
"user",
}
def get(client, url, token=None):
response = client.get(
url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
return response
def post(client, url, data, token=None):
response = client.post(url, data=json.dumps(data), headers=headers)
return response
def patch(client, url, data, token=None):
response = client.patch(url, data=json.dumps(data), headers=headers)
return response
def delete(client, url, token=None):
headers = {"Content-Type": "application/json"}
if token is not None:
headers["Authorization"] = f"Bearer {token}"
response = client.delete(url, headers=headers)
return response
data = {"username": username, "password": password}
return post(client, f"{API_URL}/user/login", data)
def get_token(client, username, password):
response = login(client, username, password)
return response.get_json()["access_token"]
@pytest.fixture()
def readonly_token(client):
@pytest.fixture()
def user_token(client):
@pytest.fixture()
def consultant_token(client):
return get_token(client, "consultant", "consultantpwd")
return get_token(client, "admin", "adminpasswd")
@pytest.fixture
def no_login_check_token(request, app):
app.config["LOGIN_DISABLED"] = True
client = app.test_client()
# We still need to login, otherwise an AnonymousUserMixin is returned
# An AnonymousUser doesn't have all the User methods
yield get_token(client, "user_ro", "userro")
app.config["LOGIN_DISABLED"] = False
def check_response_message(response, msg, status_code=400):
assert response.status_code == status_code
try:
except AttributeError:
data = json.loads(response.data)
try:
except KeyError:
# flask-jwt-extended is using "msg" instead of "message"
# in its default callbacks
response_names = set(item["name"] for item in response.get_json())
def check_input_is_subset_of_response(response, inputs):
# Sort the response by id to match the inputs order
response_elts = sorted(response.get_json(), key=lambda d: d["id"])
for key, value in d1.items():
if isinstance(value, datetime.datetime):
response = client.post(f"{API_URL}/user/login")
check_response_message(response, "Body should be a JSON object")
response = post(
client, f"{API_URL}/user/login", data={"username": "foo", "passwd": ""}
)
check_response_message(
response, "Missing mandatory field (username or password)", 422
)
response = login(client, "foo", "invalid")
check_response_message(response, "Invalid credentials", 401)
response = login(client, "user_ro", "userro")
assert "access_token" in response.get_json()
@pytest.mark.parametrize("endpoint", GENERIC_GET_ENDPOINTS)
def test_get_generic_model(endpoint, session, client, readonly_token):
model = ENDPOINT_MODEL[endpoint]
for name in names:
session.add(model(name=name))
session.commit()
response = client.get(f"{API_URL}/{endpoint}")
check_response_message(response, "Missing Authorization Header", 401)
response = get(client, f"{API_URL}/{endpoint}", "xxxxxxxxx")
check_response_message(response, "Not enough segments", 422)
response = get(client, f"{API_URL}/{endpoint}", readonly_token)
response = get(client, f"{API_URL}/{endpoint}", readonly_token)
@pytest.mark.parametrize("endpoint", CREATE_AUTH_ENDPOINTS)
def test_create_model_auth_fail(endpoint, client, readonly_token):
response = client.post(f"{API_URL}/{endpoint}")
check_response_message(response, "Missing Authorization Header", 401)
response = post(client, f"{API_URL}/{endpoint}", data={}, token="xxxxxxxxx")
check_response_message(response, "Not enough segments", 422)
response = post(client, f"{API_URL}/{endpoint}", data={}, token=readonly_token)
check_response_message(response, "User doesn't have the required group", 403)
model = ENDPOINT_MODEL[endpoint]
assert model.query.count() == 0
@pytest.mark.parametrize("endpoint", GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model(endpoint, client, user_token):
response = post(client, f"{API_URL}/{endpoint}", data={}, token=user_token)
check_response_message(response, "Missing mandatory field 'name'", 422)
data = {"name": "Foo"}
response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token)
assert {"id", "name"} <= set(response.get_json().keys())
assert response.get_json()["name"] == "Foo"
response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
response = post(
client, f"{API_URL}/{endpoint}", data={"name": "foo"}, token=user_token
)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
response = post(
client, f"{API_URL}/{endpoint}", data={"name": "FOO"}, token=user_token
)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
data = {"name": "Bar", "description": "Bar description"}
response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token)
assert response.get_json()["description"] == "Bar description"
model = ENDPOINT_MODEL[endpoint]
assert model.query.count() == 2
response = get(client, f"{API_URL}/{endpoint}", user_token)
check_names(response, ("Foo", "Bar"))
@pytest.mark.parametrize("endpoint", GENERIC_CREATE_ENDPOINTS)
def test_create_generic_model_invalid_param(endpoint, client, user_token):
response = post(
client,
f"{API_URL}/{endpoint}",
data={"name": "foo", "hello": "world"},
token=user_token,
)
check_response_message(
response, f"'hello' is an invalid keyword argument for {model.__name__}", 422
)
def test_create_item(client, user_token):
# check that serial_number is mandatory
response = post(client, f"{API_URL}/inventory/items", data={}, token=user_token)
check_response_message(response, "Missing mandatory field 'serial_number'", 422)
# check create with only serial_number
data = {"serial_number": "123456"}
response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token)
assert {
"id",
"ics_id",
"serial_number",
"manufacturer",
"model",
"quantity",
"location",
"status",
"parent",
"children",
"macs",
"history",
"host",
"stack_member",
"updated_at",
"created_at",
"user",
"comments",
} == set(response.get_json().keys())
assert response.get_json()["serial_number"] == "123456"
# Check that serial_number doesn't have to be unique
response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token)
assert response.status_code == 201
# check that ics_id shall be unique
data2 = {"serial_number": "456789", "ics_id": "AAA001"}
response = post(client, f"{API_URL}/inventory/items", data=data2, token=user_token)
response = post(client, f"{API_URL}/inventory/items", data=data2, token=user_token)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
# check all items that were created
assert models.Item.query.count() == 3
response = get(client, f"{API_URL}/inventory/items", user_token)
check_input_is_subset_of_response(response, (data, data, data2))
def test_create_item_with_host_id(client, host_factory, user_token):
host = host_factory()
# Check that we can pass an host_id
data = {"serial_number": "123456", "host_id": host.id}
response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token)
assert response.status_code == 201
item = models.Item.query.filter_by(serial_number=data["serial_number"]).first()
assert item.host_id == host.id
def test_create_item_invalid_ics_id(client, user_token):
for ics_id in ("foo", "AAB1234", "AZ02", "WS007", "AAA01"):
data = {"serial_number": "123456", "ics_id": ics_id}
response = post(
client, f"{API_URL}/inventory/items", data=data, token=user_token
)
check_response_message(response, "ICS id shall match [A-Z]{3}[0-9]{3}", 422)
def test_get_item_fail(client, session, readonly_token):
response = get(client, f"{API_URL}/inventory/items/50", token=readonly_token)
check_response_message(response, "Item id '50' not found", 404)
response = get(client, f"{API_URL}/inventory/items/bar", token=readonly_token)
check_response_message(response, "Item id 'bar' not found", 404)
def test_get_item(client, status_factory, item_factory, readonly_token):
status_factory(name="Stock")
item1 = item_factory(serial_number="123456")
item2 = item_factory(serial_number="234567", ics_id="AAA001", status="Stock")
response = get(
client, f"{API_URL}/inventory/items/{item1.id}", token=readonly_token
)
assert response.get_json()["id"] == item1.id
assert response.get_json()["serial_number"] == item1.serial_number
response = get(
client, f"{API_URL}/inventory/items/{item2.ics_id}", token=readonly_token
)
assert response.get_json()["id"] == item2.id
assert response.get_json()["ics_id"] == item2.ics_id
assert response.get_json()["serial_number"] == item2.serial_number
assert response.get_json()["status"] == str(item2.status)
def test_patch_item_auth_fail(client, session, readonly_token):
response = client.patch(f"{API_URL}/inventory/items/50")
check_response_message(response, "Missing Authorization Header", 401)
response = patch(
client, f"{API_URL}/inventory/items/50", data={}, token="xxxxxxxxx"
)
check_response_message(response, "Not enough segments", 422)
response = patch(
client, f"{API_URL}/inventory/items/50", data={}, token=readonly_token
)
check_response_message(response, "User doesn't have the required group", 403)
def test_patch_item_fail(client, item_factory, user_token):
response = patch(client, f"{API_URL}/inventory/items/50", data={}, token=user_token)
check_response_message(response, "At least one field is required", 422)
data = {"location": "ESS", "foo": "bar"}
response = patch(
client, f"{API_URL}/inventory/items/50", data=data, token=user_token
)
check_response_message(response, "Invalid field 'foo'", 422)
data = {"location": "ESS"}
response = patch(
client, f"{API_URL}/inventory/items/50", data=data, token=user_token
)
check_response_message(response, "Item id '50' not found", 404)
response = patch(
client, f"{API_URL}/inventory/items/bar", data=data, token=user_token
)
check_response_message(response, "Item id 'bar' not found", 404)
# Create an item
item1 = item_factory(serial_number="234567", ics_id="AAA001")
# check that we can't change the serial_number or ics_id
response = patch(
client,
f"{API_URL}/inventory/items/{item1.id}",
data={"serial_number": "12345"},
token=user_token,
)
check_response_message(response, "Invalid field 'serial_number'", 422)
response = patch(
client,
f"{API_URL}/inventory/items/{item1.id}",
data={"ics_id": "AAA002"},
token=user_token,
)
check_response_message(response, "'ics_id' can't be changed", 422)
def test_patch_item(client, status, item_factory, user_token):
data = {"ics_id": "AAB004"}
response = patch(
client, f"{API_URL}/inventory/items/{item1.id}", data=data, token=user_token
)
assert response.get_json()["id"] == item1.id
assert response.get_json()["serial_number"] == item1.serial_number
assert response.get_json()["ics_id"] == data["ics_id"]
data = {"status": status.name}
response = patch(
client, f"{API_URL}/inventory/items/{item2.ics_id}", data=data, token=user_token
)
assert response.get_json()["id"] == item2.id
assert response.get_json()["ics_id"] == item2.ics_id
assert response.get_json()["serial_number"] == item2.serial_number
assert response.get_json()["status"] == data["status"]
def test_patch_item_integrity_error(client, user_token, item_factory):
item2 = item_factory(ics_id="ZZZ001")
data = {"ics_id": item1.ics_id}
response = patch(
client, f"{API_URL}/inventory/items/{item2.id}", data=data, token=user_token
)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
def test_patch_item_parent(
client,
location_factory,
manufacturer_factory,
status_factory,
item_factory,
user_token,
):
location_factory(name="ESS")
manufacturer_factory(name="HP")
status_factory(name="In service")
status_factory(name="Stock")
item1 = item_factory(ics_id="AAA001", status="In service")
item2 = item_factory(ics_id="AAA002")
item3 = item_factory(ics_id="AAA003")
item3.parent_id = item1.id
# set parent changes the status and location
data1 = {"parent": item1.ics_id}
response = patch(
client,
f"{API_URL}/inventory/items/{item2.ics_id}",
data=data1,
token=user_token,
)
assert response.get_json()["id"] == item2.id
assert response.get_json()["ics_id"] == item2.ics_id
assert response.get_json()["serial_number"] == item2.serial_number
assert response.get_json()["parent"] == item1.ics_id
assert response.get_json()["status"] == str(item1.status)
assert response.get_json()["location"] == str(item1.location)
# updating a parent, modifies the status and location of all children
# check location
data2 = {"location": "ESS"}
response = patch(
client,
f"{API_URL}/inventory/items/{item1.ics_id}",
data=data2,
token=user_token,
)
assert response.get_json()["id"] == item1.id
assert response.get_json()["ics_id"] == item1.ics_id
assert response.get_json()["serial_number"] == item1.serial_number
assert response.get_json()["status"] == str(item1.status)
assert response.get_json()["location"] == data2["location"]
for ics_id in ("AAA002", "AAA003"):
response = get(client, f"{API_URL}/inventory/items/{ics_id}", token=user_token)
assert response.get_json()["location"] == data2["location"]
assert response.get_json()["status"] == "In service"
data3 = {"status": "Stock"}
response = patch(
client,
f"{API_URL}/inventory/items/{item1.ics_id}",
data=data3,
token=user_token,
)
assert response.get_json()["status"] == data3["status"]
for ics_id in ("AAA002", "AAA003"):
response = get(client, f"{API_URL}/inventory/items/{ics_id}", token=user_token)
assert response.get_json()["location"] == data2["location"]
assert response.get_json()["status"] == data3["status"]
# manufacturer has no impact on children
data4 = {"manufacturer": "HP"}
response = patch(
client,
f"{API_URL}/inventory/items/{item1.ics_id}",
data=data4,
token=user_token,
)
assert response.get_json()["manufacturer"] == "HP"
# Manufacturer didn't change on children
response = get(
client, f"{API_URL}/inventory/items/{item2.ics_id}", token=user_token
)
assert response.get_json()["manufacturer"] == str(item2.manufacturer)
assert str(item2.manufacturer) != "HP"
response = get(
client, f"{API_URL}/inventory/items/{item3.ics_id}", token=user_token
)
assert response.get_json()["manufacturer"] == str(item3.manufacturer)
def test_get_items(client, location_factory, item_factory, readonly_token):
# Create some items
location_factory(name="ESS")
item1 = item_factory(location="ESS")
item2 = item_factory(serial_number="234567")
response = get(client, f"{API_URL}/inventory/items", token=readonly_token)
assert response.status_code == 200
check_input_is_subset_of_response(
response, (item1.to_dict(), item2.to_dict(), item3.to_dict())
)
# test filtering
response = get(
client, f"{API_URL}/inventory/items?serial_number=234567", token=readonly_token
)
assert response.status_code == 200
check_input_is_subset_of_response(response, (item2.to_dict(),))
# filtering on location_id works but not location (might want to change that)
response = get(
client,
f"{API_URL}/inventory/items?location_id={item1.location_id}",
token=readonly_token,
)
assert response.status_code == 200
check_input_is_subset_of_response(response, (item1.to_dict(),))
response = get(
client, f"{API_URL}/inventory/items?location=ESS", token=readonly_token
)
check_response_message(response, "Invalid query arguments", 422)
# using an unknown key raises a 422
response = get(client, f"{API_URL}/inventory/items?foo=bar", token=readonly_token)
check_response_message(response, "Invalid query arguments", 422)
def test_get_networks(client, network_factory, readonly_token):
network1 = network_factory(
address="172.16.1.0/24", first_ip="172.16.1.1", last_ip="172.16.1.254"
)
network2 = network_factory(
address="172.16.20.0/22", first_ip="172.16.20.11", last_ip="172.16.20.250"
)
network3 = network_factory(
address="172.16.5.0/24", first_ip="172.16.5.10", last_ip="172.16.5.254"
)
response = get(client, f"{API_URL}/network/networks", token=readonly_token)
check_input_is_subset_of_response(
response, (network1.to_dict(), network2.to_dict(), network3.to_dict())
)
response = get(
client,
f"{API_URL}/network/networks?address=172.16.20.0/22",
token=readonly_token,
)
check_input_is_subset_of_response(response, (network2.to_dict(),))
def test_create_network_auth_fail(client, session, user_token):
# admin is required to create networks
response = post(client, f"{API_URL}/network/networks", data={}, token=user_token)
check_response_message(response, "User doesn't have the required group", 403)
def test_create_network(client, admin_token, network_scope_factory):
scope = network_scope_factory(supernet="172.16.0.0/16")
# check that vlan_name, vlan_id, address, first_ip, last_ip, gateway and scope are mandatory
response = post(client, f"{API_URL}/network/networks", data={}, token=admin_token)
check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={"first_ip": "172.16.1.10", "last_ip": "172.16.1.250"},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={"address": "172.16.1.0/24"},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'vlan_name'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={"vlan_name": "network1"},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'vlan_id'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={"vlan_name": "network1", "vlan_id": 1600},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'address'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={
"vlan_name": "network1",
"vlan_id": 1600,
"address": "172.16.1.0/24",
"first_ip": "172.16.1.10",
},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'last_ip'", 422)
response = post(
client,
f"{API_URL}/network/networks",
data={
"vlan_name": "network1",
"vlan_id": 1600,
"address": "172.16.1.0/24",
"first_ip": "172.16.1.10",
"last_ip": "172.16.1.250",
"scope": scope.name,
},
token=admin_token,
)
check_response_message(response, "Missing mandatory field 'gateway'", 422)
data = {
"vlan_name": "network1",
"vlan_id": 1600,
"address": "172.16.1.0/24",
"first_ip": "172.16.1.10",
"last_ip": "172.16.1.250",
"scope": scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
assert {
"id",
"vlan_name",
"vlan_id",
"address",
"netmask",
"first_ip",
"last_ip",
"description",
"admin_only",
"scope",
"domain",
"interfaces",
"created_at",
"updated_at",
"user",
} == set(response.get_json().keys())
assert response.get_json()["vlan_name"] == "network1"
assert response.get_json()["vlan_id"] == 1600
assert response.get_json()["address"] == "172.16.1.0/24"
assert response.get_json()["first_ip"] == "172.16.1.10"
assert response.get_json()["last_ip"] == "172.16.1.250"
assert response.get_json()["gateway"] == "172.16.1.254"
assert response.get_json()["netmask"] == "255.255.255.0"
# Check that address and name shall be unique
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
data_same_address["vlan_name"] = "networkX"
response = post(
client, f"{API_URL}/network/networks", data=data_same_address, token=admin_token
)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
data_same_name = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "172.16.2.0/24",
"first_ip": "172.16.2.10",
"last_ip": "172.16.2.250",
"scope": scope.name,
}
response = post(
client, f"{API_URL}/network/networks", data=data_same_name, token=admin_token
)
check_response_message(
response,
"(psycopg2.IntegrityError) duplicate key value violates unique constraint",
422,
)
data2 = {
"vlan_name": "network2",
"vlan_id": "1601",
"address": "172.16.5.0/24",
"first_ip": "172.16.5.11",
"last_ip": "172.16.5.250",
"description": "long description",
"scope": scope.name,
}
response = post(
client, f"{API_URL}/network/networks", data=data2, token=admin_token
)
assert response.get_json()["description"] == "long description"
# check all items that were created
assert models.Network.query.count() == 2
def test_create_network_invalid_address(client, admin_token, network_scope):
# invalid network address
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "foo",
"first_ip": "172.16.1.10",
"last_ip": "172.16.1.250",
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, "'foo' does not appear to be an IPv4 or IPv6 network", 422
)
data["address"] = "172.16.1"
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, "'172.16.1' does not appear to be an IPv4 or IPv6 network", 422
)
data["address"] = "172.16.1.1/24"
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(response, "172.16.1.1/24 has host bits set", 422)
@pytest.mark.parametrize("address", ("", "foo", "192.168"))
def test_create_network_invalid_ip(
address, client, session, admin_token, network_scope
):
# invalid first IP address
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "192.168.0.0/24",
"first_ip": address,
"last_ip": "192.168.0.250",
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422
)
# invalid last IP address
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "192.168.0.0/24",
"first_ip": "192.168.0.250",
"last_ip": address,
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422
)
def test_create_network_invalid_range(client, session, admin_token, network_scope):
# first_ip not in network address
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "172.16.1.0/24",
"first_ip": "172.16.2.10",
"last_ip": "172.16.1.250",
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, "IP address 172.16.2.10 is not in network 172.16.1.0/24", 422
)
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "172.16.1.0/24",
"first_ip": "172.16.1.10",
"last_ip": "172.16.5.250",
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response, "IP address 172.16.5.250 is not in network 172.16.1.0/24", 422
)
data = {
"vlan_name": "network1",
"vlan_id": "1600",
"address": "172.16.1.0/24",
"first_ip": "172.16.1.10",
"last_ip": "172.16.1.9",
"scope": network_scope.name,
}
response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token)
check_response_message(
response,
"Last IP address 172.16.1.9 is less than the first address 172.16.1.10",
422,
)
def test_get_interfaces(client, network_factory, interface_factory, readonly_token):
# Create some interfaces
network1 = network_factory(
address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250"
)
network2 = network_factory(
address="192.168.2.0/24", first_ip="192.168.2.10", last_ip="192.168.2.250"
)
interface1 = interface_factory(network=network1, ip="192.168.1.10")
interface2 = interface_factory(
network=network1, ip="192.168.1.11", host=interface1.host
)
interface3 = interface_factory(network=network2, ip="192.168.2.10")
response = get(client, f"{API_URL}/network/interfaces", token=readonly_token)
check_input_is_subset_of_response(
response, (interface1.to_dict(), interface2.to_dict(), interface3.to_dict())
)
response = get(
client,
f"{API_URL}/network/interfaces?network_id={network2.id}",
token=readonly_token,
)
check_input_is_subset_of_response(response, (interface3.to_dict(),))
def test_get_interfaces_by_domain(
client, domain_factory, network_factory, interface_factory, readonly_token
):
domain1 = domain_factory(name="tn.esss.lu.se")
domain2 = domain_factory(name="ics.esss.lu.se")
network1 = network_factory(
address="192.168.1.0/24",
first_ip="192.168.1.10",
last_ip="192.168.1.250",
domain=domain1,
)
network2 = network_factory(
address="192.168.2.0/24",
first_ip="192.168.2.10",
last_ip="192.168.2.250",
domain=domain2,
)
interface1 = interface_factory(network=network1, ip="192.168.1.10")
interface2 = interface_factory(network=network1, ip="192.168.1.11")
interface3 = interface_factory(network=network2, ip="192.168.2.10")
response = get(
client,
f"{API_URL}/network/interfaces?domain=tn.esss.lu.se",
token=readonly_token,
)
assert response.status_code == 200
check_input_is_subset_of_response(
response, (interface1.to_dict(), interface2.to_dict())
)
response = get(
client,
f"{API_URL}/network/interfaces?domain=ics.esss.lu.se",
token=readonly_token,
)
assert response.status_code == 200
check_input_is_subset_of_response(response, (interface3.to_dict(),))
def test_get_interfaces_by_network(
client, network_factory, interface_factory, readonly_token
):
# Create some interfaces
network1 = network_factory(
vlan_name="MyNetwork1",
address="192.168.1.0/24",
first_ip="192.168.1.10",
last_ip="192.168.1.250",
)
network2 = network_factory(
vlan_name="MyNetwork2",
address="192.168.2.0/24",
first_ip="192.168.2.10",
last_ip="192.168.2.250",
)