# -*- coding: utf-8 -*- """ tests.functional.test_api ~~~~~~~~~~~~~~~~~~~~~~~~~ This module defines API tests. :copyright: (c) 2017 European Spallation Source ERIC :license: BSD 2-Clause, see LICENSE for more details. """ import datetime import json import pytest from app import models API_URL = "/api/v1" ENDPOINT_MODEL = { "inventory/actions": models.Action, "inventory/manufacturers": models.Manufacturer, "inventory/models": models.Model, "inventory/locations": models.Location, "inventory/statuses": models.Status, "inventory/items": models.Item, "inventory/macs": models.Mac, "network/networks": models.Network, "network/interfaces": models.Interface, "network/hosts": models.Host, "network/groups": models.AnsibleGroup, "network/domains": models.Domain, "network/cnames": models.Cname, } GENERIC_GET_ENDPOINTS = [ key for key in ENDPOINT_MODEL.keys() if key.startswith("inventory") and key not in ("inventory/items", "inventory/macs") ] GENERIC_CREATE_ENDPOINTS = [ key for key in ENDPOINT_MODEL.keys() if key.startswith("inventory") and key not in ("inventory/items", "inventory/actions", "inventory/macs") ] CREATE_AUTH_ENDPOINTS = [ key for key in ENDPOINT_MODEL.keys() if key != "inventory/actions" ] HOST_KEYS = { "id", "name", "fqdn", "is_ioc", "device_type", "model", "description", "items", "interfaces", "ansible_vars", "ansible_groups", "created_at", "updated_at", "user", } INTERFACE_KEYS = { "id", "is_main", "network", "ip", "netmask", "name", "mac", "domain", "host", "device_type", "model", "cnames", "created_at", "updated_at", "user", } def get(client, url, token=None): response = client.get( url, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {token}", }, ) return response def post(client, url, data, token=None): headers = {"Content-Type": "application/json"} if token is not None: headers["Authorization"] = f"Bearer {token}" response = client.post(url, data=json.dumps(data), headers=headers) return response def patch(client, url, data, token=None): headers = {"Content-Type": "application/json"} if token is not None: headers["Authorization"] = f"Bearer {token}" response = client.patch(url, data=json.dumps(data), headers=headers) return response def delete(client, url, token=None): headers = {"Content-Type": "application/json"} if token is not None: headers["Authorization"] = f"Bearer {token}" response = client.delete(url, headers=headers) return response def login(client, username, password): data = {"username": username, "password": password} return post(client, f"{API_URL}/user/login", data) def get_token(client, username, password): response = login(client, username, password) return response.get_json()["access_token"] @pytest.fixture() def readonly_token(client): return get_token(client, "user_ro", "userro") @pytest.fixture() def user_token(client): return get_token(client, "user_rw", "userrw") @pytest.fixture() def consultant_token(client): return get_token(client, "consultant", "consultantpwd") @pytest.fixture() def admin_token(client): return get_token(client, "admin", "adminpasswd") @pytest.fixture def no_login_check_token(request, app): app.config["LOGIN_DISABLED"] = True client = app.test_client() # We still need to login, otherwise an AnonymousUserMixin is returned # An AnonymousUser doesn't have all the User methods yield get_token(client, "user_ro", "userro") app.config["LOGIN_DISABLED"] = False def check_response_message(response, msg, status_code=400): assert response.status_code == status_code try: data = response.get_json() except AttributeError: data = json.loads(response.data) try: message = data["message"] except KeyError: # flask-jwt-extended is using "msg" instead of "message" # in its default callbacks message = data["msg"] assert message.startswith(msg) def check_names(response, names): response_names = set(item["name"] for item in response.get_json()) assert set(names) == response_names def check_input_is_subset_of_response(response, inputs): # Sort the response by id to match the inputs order response_elts = sorted(response.get_json(), key=lambda d: d["id"]) for d1, d2 in zip(inputs, response_elts): for key, value in d1.items(): if isinstance(value, datetime.datetime): value = value.strftime("%Y-%m-%d %H:%M") assert d2[key] == value def test_login(client): response = client.post(f"{API_URL}/user/login") check_response_message(response, "Body should be a JSON object") response = post( client, f"{API_URL}/user/login", data={"username": "foo", "passwd": ""} ) check_response_message( response, "Missing mandatory field (username or password)", 422 ) response = login(client, "foo", "invalid") check_response_message(response, "Invalid credentials", 401) response = login(client, "user_ro", "userro") assert response.status_code == 200 assert "access_token" in response.get_json() @pytest.mark.parametrize("endpoint", GENERIC_GET_ENDPOINTS) def test_get_generic_model(endpoint, session, client, readonly_token): model = ENDPOINT_MODEL[endpoint] names = ("Foo", "Bar", "Alice") for name in names: session.add(model(name=name)) session.commit() response = client.get(f"{API_URL}/{endpoint}") check_response_message(response, "Missing Authorization Header", 401) response = get(client, f"{API_URL}/{endpoint}", "xxxxxxxxx") check_response_message(response, "Not enough segments", 422) response = get(client, f"{API_URL}/{endpoint}", readonly_token) check_names(response, names) response = get(client, f"{API_URL}/{endpoint}", readonly_token) check_names(response, names) for item in response.get_json(): assert "qrcode" in item @pytest.mark.parametrize("endpoint", CREATE_AUTH_ENDPOINTS) def test_create_model_auth_fail(endpoint, client, readonly_token): response = client.post(f"{API_URL}/{endpoint}") check_response_message(response, "Missing Authorization Header", 401) response = post(client, f"{API_URL}/{endpoint}", data={}, token="xxxxxxxxx") check_response_message(response, "Not enough segments", 422) response = post(client, f"{API_URL}/{endpoint}", data={}, token=readonly_token) check_response_message(response, "User doesn't have the required group", 403) model = ENDPOINT_MODEL[endpoint] assert model.query.count() == 0 @pytest.mark.parametrize("endpoint", GENERIC_CREATE_ENDPOINTS) def test_create_generic_model(endpoint, client, user_token): response = post(client, f"{API_URL}/{endpoint}", data={}, token=user_token) check_response_message(response, "Missing mandatory field 'name'", 422) data = {"name": "Foo"} response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token) assert response.status_code == 201 assert {"id", "name"} <= set(response.get_json().keys()) assert response.get_json()["name"] == "Foo" response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) response = post( client, f"{API_URL}/{endpoint}", data={"name": "foo"}, token=user_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) response = post( client, f"{API_URL}/{endpoint}", data={"name": "FOO"}, token=user_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) data = {"name": "Bar", "description": "Bar description"} response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token) assert response.status_code == 201 assert response.get_json()["description"] == "Bar description" model = ENDPOINT_MODEL[endpoint] assert model.query.count() == 2 response = get(client, f"{API_URL}/{endpoint}", user_token) check_names(response, ("Foo", "Bar")) @pytest.mark.parametrize("endpoint", GENERIC_CREATE_ENDPOINTS) def test_create_generic_model_invalid_param(endpoint, client, user_token): model = ENDPOINT_MODEL[endpoint] response = post( client, f"{API_URL}/{endpoint}", data={"name": "foo", "hello": "world"}, token=user_token, ) check_response_message( response, f"'hello' is an invalid keyword argument for {model.__name__}", 422 ) def test_create_item(client, user_token): # check that serial_number is mandatory response = post(client, f"{API_URL}/inventory/items", data={}, token=user_token) check_response_message(response, "Missing mandatory field 'serial_number'", 422) # check create with only serial_number data = {"serial_number": "123456"} response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token) assert response.status_code == 201 assert { "id", "ics_id", "serial_number", "manufacturer", "model", "quantity", "location", "status", "parent", "children", "macs", "history", "host", "stack_member", "updated_at", "created_at", "user", "comments", } == set(response.get_json().keys()) assert response.get_json()["serial_number"] == "123456" # Check that serial_number doesn't have to be unique response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token) assert response.status_code == 201 # check that ics_id shall be unique data2 = {"serial_number": "456789", "ics_id": "AAA001"} response = post(client, f"{API_URL}/inventory/items", data=data2, token=user_token) assert response.status_code == 201 response = post(client, f"{API_URL}/inventory/items", data=data2, token=user_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) # check all items that were created assert models.Item.query.count() == 3 response = get(client, f"{API_URL}/inventory/items", user_token) check_input_is_subset_of_response(response, (data, data, data2)) def test_create_item_with_host_id(client, host_factory, user_token): host = host_factory() # Check that we can pass an host_id data = {"serial_number": "123456", "host_id": host.id} response = post(client, f"{API_URL}/inventory/items", data=data, token=user_token) assert response.status_code == 201 item = models.Item.query.filter_by(serial_number=data["serial_number"]).first() assert item.host_id == host.id def test_create_item_invalid_ics_id(client, user_token): for ics_id in ("foo", "AAB1234", "AZ02", "WS007", "AAA01"): data = {"serial_number": "123456", "ics_id": ics_id} response = post( client, f"{API_URL}/inventory/items", data=data, token=user_token ) check_response_message(response, "ICS id shall match [A-Z]{3}[0-9]{3}", 422) def test_get_item_fail(client, session, readonly_token): response = get(client, f"{API_URL}/inventory/items/50", token=readonly_token) check_response_message(response, "Item id '50' not found", 404) response = get(client, f"{API_URL}/inventory/items/bar", token=readonly_token) check_response_message(response, "Item id 'bar' not found", 404) def test_get_item(client, status_factory, item_factory, readonly_token): # Create some items status_factory(name="Stock") item1 = item_factory(serial_number="123456") item2 = item_factory(serial_number="234567", ics_id="AAA001", status="Stock") # we can get items by id... response = get( client, f"{API_URL}/inventory/items/{item1.id}", token=readonly_token ) assert response.status_code == 200 assert response.get_json()["id"] == item1.id assert response.get_json()["serial_number"] == item1.serial_number # ...or ics_id response = get( client, f"{API_URL}/inventory/items/{item2.ics_id}", token=readonly_token ) assert response.status_code == 200 assert response.get_json()["id"] == item2.id assert response.get_json()["ics_id"] == item2.ics_id assert response.get_json()["serial_number"] == item2.serial_number assert response.get_json()["status"] == str(item2.status) def test_patch_item_auth_fail(client, session, readonly_token): response = client.patch(f"{API_URL}/inventory/items/50") check_response_message(response, "Missing Authorization Header", 401) response = patch( client, f"{API_URL}/inventory/items/50", data={}, token="xxxxxxxxx" ) check_response_message(response, "Not enough segments", 422) response = patch( client, f"{API_URL}/inventory/items/50", data={}, token=readonly_token ) check_response_message(response, "User doesn't have the required group", 403) def test_patch_item_fail(client, item_factory, user_token): response = patch(client, f"{API_URL}/inventory/items/50", data={}, token=user_token) check_response_message(response, "At least one field is required", 422) data = {"location": "ESS", "foo": "bar"} response = patch( client, f"{API_URL}/inventory/items/50", data=data, token=user_token ) check_response_message(response, "Invalid field 'foo'", 422) data = {"location": "ESS"} response = patch( client, f"{API_URL}/inventory/items/50", data=data, token=user_token ) check_response_message(response, "Item id '50' not found", 404) response = patch( client, f"{API_URL}/inventory/items/bar", data=data, token=user_token ) check_response_message(response, "Item id 'bar' not found", 404) # Create an item item1 = item_factory(serial_number="234567", ics_id="AAA001") # check that we can't change the serial_number or ics_id response = patch( client, f"{API_URL}/inventory/items/{item1.id}", data={"serial_number": "12345"}, token=user_token, ) check_response_message(response, "Invalid field 'serial_number'", 422) response = patch( client, f"{API_URL}/inventory/items/{item1.id}", data={"ics_id": "AAA002"}, token=user_token, ) check_response_message(response, "'ics_id' can't be changed", 422) def test_patch_item(client, status, item_factory, user_token): # Create some items item1 = item_factory(ics_id="ZZZ001") item2 = item_factory() # we can patch items by id... data = {"ics_id": "AAB004"} response = patch( client, f"{API_URL}/inventory/items/{item1.id}", data=data, token=user_token ) assert response.status_code == 200 assert response.get_json()["id"] == item1.id assert response.get_json()["serial_number"] == item1.serial_number assert response.get_json()["ics_id"] == data["ics_id"] # ...or ics_id data = {"status": status.name} response = patch( client, f"{API_URL}/inventory/items/{item2.ics_id}", data=data, token=user_token ) assert response.status_code == 200 assert response.get_json()["id"] == item2.id assert response.get_json()["ics_id"] == item2.ics_id assert response.get_json()["serial_number"] == item2.serial_number assert response.get_json()["status"] == data["status"] def test_patch_item_integrity_error(client, user_token, item_factory): # Create some items item1 = item_factory() item2 = item_factory(ics_id="ZZZ001") data = {"ics_id": item1.ics_id} response = patch( client, f"{API_URL}/inventory/items/{item2.id}", data=data, token=user_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) def test_patch_item_parent( client, location_factory, manufacturer_factory, status_factory, item_factory, user_token, ): # Create some items location_factory(name="ESS") manufacturer_factory(name="HP") status_factory(name="In service") status_factory(name="Stock") item1 = item_factory(ics_id="AAA001", status="In service") item2 = item_factory(ics_id="AAA002") item3 = item_factory(ics_id="AAA003") item3.parent_id = item1.id # set parent changes the status and location data1 = {"parent": item1.ics_id} response = patch( client, f"{API_URL}/inventory/items/{item2.ics_id}", data=data1, token=user_token, ) assert response.status_code == 200 assert response.get_json()["id"] == item2.id assert response.get_json()["ics_id"] == item2.ics_id assert response.get_json()["serial_number"] == item2.serial_number assert response.get_json()["parent"] == item1.ics_id assert response.get_json()["status"] == str(item1.status) assert response.get_json()["location"] == str(item1.location) # updating a parent, modifies the status and location of all children # check location data2 = {"location": "ESS"} response = patch( client, f"{API_URL}/inventory/items/{item1.ics_id}", data=data2, token=user_token, ) assert response.status_code == 200 assert response.get_json()["id"] == item1.id assert response.get_json()["ics_id"] == item1.ics_id assert response.get_json()["serial_number"] == item1.serial_number assert response.get_json()["status"] == str(item1.status) assert response.get_json()["location"] == data2["location"] for ics_id in ("AAA002", "AAA003"): response = get(client, f"{API_URL}/inventory/items/{ics_id}", token=user_token) assert response.get_json()["location"] == data2["location"] assert response.get_json()["status"] == "In service" # check status data3 = {"status": "Stock"} response = patch( client, f"{API_URL}/inventory/items/{item1.ics_id}", data=data3, token=user_token, ) assert response.status_code == 200 assert response.get_json()["status"] == data3["status"] for ics_id in ("AAA002", "AAA003"): response = get(client, f"{API_URL}/inventory/items/{ics_id}", token=user_token) assert response.get_json()["location"] == data2["location"] assert response.get_json()["status"] == data3["status"] # manufacturer has no impact on children data4 = {"manufacturer": "HP"} response = patch( client, f"{API_URL}/inventory/items/{item1.ics_id}", data=data4, token=user_token, ) assert response.status_code == 200 assert response.get_json()["manufacturer"] == "HP" # Manufacturer didn't change on children response = get( client, f"{API_URL}/inventory/items/{item2.ics_id}", token=user_token ) assert response.get_json()["manufacturer"] == str(item2.manufacturer) assert str(item2.manufacturer) != "HP" response = get( client, f"{API_URL}/inventory/items/{item3.ics_id}", token=user_token ) assert response.get_json()["manufacturer"] == str(item3.manufacturer) assert str(item3.manufacturer) != "HP" def test_get_items(client, location_factory, item_factory, readonly_token): # Create some items location_factory(name="ESS") item1 = item_factory(location="ESS") item2 = item_factory(serial_number="234567") item3 = item_factory() response = get(client, f"{API_URL}/inventory/items", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 3 check_input_is_subset_of_response( response, (item1.to_dict(), item2.to_dict(), item3.to_dict()) ) # test filtering response = get( client, f"{API_URL}/inventory/items?serial_number=234567", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (item2.to_dict(),)) # filtering on location_id works but not location (might want to change that) response = get( client, f"{API_URL}/inventory/items?location_id={item1.location_id}", token=readonly_token, ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (item1.to_dict(),)) response = get( client, f"{API_URL}/inventory/items?location=ESS", token=readonly_token ) check_response_message(response, "Invalid query arguments", 422) # using an unknown key raises a 422 response = get(client, f"{API_URL}/inventory/items?foo=bar", token=readonly_token) check_response_message(response, "Invalid query arguments", 422) def test_get_networks(client, network_factory, readonly_token): # Create some networks network1 = network_factory( address="172.16.1.0/24", first_ip="172.16.1.1", last_ip="172.16.1.254" ) network2 = network_factory( address="172.16.20.0/22", first_ip="172.16.20.11", last_ip="172.16.20.250" ) network3 = network_factory( address="172.16.5.0/24", first_ip="172.16.5.10", last_ip="172.16.5.254" ) response = get(client, f"{API_URL}/network/networks", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 3 check_input_is_subset_of_response( response, (network1.to_dict(), network2.to_dict(), network3.to_dict()) ) # test filtering by address response = get( client, f"{API_URL}/network/networks?address=172.16.20.0/22", token=readonly_token, ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (network2.to_dict(),)) def test_create_network_auth_fail(client, session, user_token): # admin is required to create networks response = post(client, f"{API_URL}/network/networks", data={}, token=user_token) check_response_message(response, "User doesn't have the required group", 403) def test_create_network(client, admin_token, network_scope_factory): scope = network_scope_factory(supernet="172.16.0.0/16") # check that vlan_name, vlan_id, address, first_ip, last_ip, gateway and scope are mandatory response = post(client, f"{API_URL}/network/networks", data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) response = post( client, f"{API_URL}/network/networks", data={"first_ip": "172.16.1.10", "last_ip": "172.16.1.250"}, token=admin_token, ) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) response = post( client, f"{API_URL}/network/networks", data={"address": "172.16.1.0/24"}, token=admin_token, ) check_response_message(response, "Missing mandatory field 'vlan_name'", 422) response = post( client, f"{API_URL}/network/networks", data={"vlan_name": "network1"}, token=admin_token, ) check_response_message(response, "Missing mandatory field 'vlan_id'", 422) response = post( client, f"{API_URL}/network/networks", data={"vlan_name": "network1", "vlan_id": 1600}, token=admin_token, ) check_response_message(response, "Missing mandatory field 'address'", 422) response = post( client, f"{API_URL}/network/networks", data={ "vlan_name": "network1", "vlan_id": 1600, "address": "172.16.1.0/24", "first_ip": "172.16.1.10", }, token=admin_token, ) check_response_message(response, "Missing mandatory field 'last_ip'", 422) response = post( client, f"{API_URL}/network/networks", data={ "vlan_name": "network1", "vlan_id": 1600, "address": "172.16.1.0/24", "first_ip": "172.16.1.10", "last_ip": "172.16.1.250", "scope": scope.name, }, token=admin_token, ) check_response_message(response, "Missing mandatory field 'gateway'", 422) data = { "vlan_name": "network1", "vlan_id": 1600, "address": "172.16.1.0/24", "first_ip": "172.16.1.10", "last_ip": "172.16.1.250", "gateway": "172.16.1.254", "scope": scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) assert response.status_code == 201 assert { "id", "vlan_name", "vlan_id", "address", "netmask", "first_ip", "last_ip", "gateway", "description", "admin_only", "scope", "domain", "interfaces", "created_at", "updated_at", "user", } == set(response.get_json().keys()) assert response.get_json()["vlan_name"] == "network1" assert response.get_json()["vlan_id"] == 1600 assert response.get_json()["address"] == "172.16.1.0/24" assert response.get_json()["first_ip"] == "172.16.1.10" assert response.get_json()["last_ip"] == "172.16.1.250" assert response.get_json()["gateway"] == "172.16.1.254" assert response.get_json()["netmask"] == "255.255.255.0" # Check that address and name shall be unique response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) data_same_address = data.copy() data_same_address["vlan_name"] = "networkX" response = post( client, f"{API_URL}/network/networks", data=data_same_address, token=admin_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) data_same_name = { "vlan_name": "network1", "vlan_id": "1600", "address": "172.16.2.0/24", "first_ip": "172.16.2.10", "last_ip": "172.16.2.250", "gateway": "172.16.2.254", "scope": scope.name, } response = post( client, f"{API_URL}/network/networks", data=data_same_name, token=admin_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) # Check that all parameters can be passed data2 = { "vlan_name": "network2", "vlan_id": "1601", "address": "172.16.5.0/24", "first_ip": "172.16.5.11", "last_ip": "172.16.5.250", "gateway": "172.16.5.254", "description": "long description", "scope": scope.name, } response = post( client, f"{API_URL}/network/networks", data=data2, token=admin_token ) assert response.status_code == 201 assert response.get_json()["description"] == "long description" # check all items that were created assert models.Network.query.count() == 2 def test_create_network_invalid_address(client, admin_token, network_scope): # invalid network address data = { "vlan_name": "network1", "vlan_id": "1600", "address": "foo", "first_ip": "172.16.1.10", "last_ip": "172.16.1.250", "gateway": "172.16.1.254", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "'foo' does not appear to be an IPv4 or IPv6 network", 422 ) data["address"] = "172.16.1" response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "'172.16.1' does not appear to be an IPv4 or IPv6 network", 422 ) # address has host bits set data["address"] = "172.16.1.1/24" response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message(response, "172.16.1.1/24 has host bits set", 422) @pytest.mark.parametrize("address", ("", "foo", "192.168")) def test_create_network_invalid_ip( address, client, session, admin_token, network_scope ): # invalid first IP address data = { "vlan_name": "network1", "vlan_id": "1600", "address": "192.168.0.0/24", "first_ip": address, "last_ip": "192.168.0.250", "gateway": "192.168.0.254", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422 ) # invalid last IP address data = { "vlan_name": "network1", "vlan_id": "1600", "address": "192.168.0.0/24", "first_ip": "192.168.0.250", "last_ip": address, "gateway": "192.168.0.254", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, f"'{address}' does not appear to be an IPv4 or IPv6 address", 422 ) def test_create_network_invalid_range(client, session, admin_token, network_scope): # first_ip not in network address data = { "vlan_name": "network1", "vlan_id": "1600", "address": "172.16.1.0/24", "first_ip": "172.16.2.10", "last_ip": "172.16.1.250", "gateway": "172.16.1.254", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "IP address 172.16.2.10 is not in network 172.16.1.0/24", 422 ) # last_ip not in network address data = { "vlan_name": "network1", "vlan_id": "1600", "address": "172.16.1.0/24", "first_ip": "172.16.1.10", "last_ip": "172.16.5.250", "gateway": "172.16.1.1", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "IP address 172.16.5.250 is not in network 172.16.1.0/24", 422 ) # first_ip > last_ip data = { "vlan_name": "network1", "vlan_id": "1600", "address": "172.16.1.0/24", "first_ip": "172.16.1.10", "last_ip": "172.16.1.9", "gateway": "172.16.1.1", "scope": network_scope.name, } response = post(client, f"{API_URL}/network/networks", data=data, token=admin_token) check_response_message( response, "Last IP address 172.16.1.9 is less than the first address 172.16.1.10", 422, ) def test_get_interfaces(client, network_factory, interface_factory, readonly_token): # Create some interfaces network1 = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) network2 = network_factory( address="192.168.2.0/24", first_ip="192.168.2.10", last_ip="192.168.2.250" ) interface1 = interface_factory(network=network1, ip="192.168.1.10") interface2 = interface_factory( network=network1, ip="192.168.1.11", host=interface1.host ) interface3 = interface_factory(network=network2, ip="192.168.2.10") response = get(client, f"{API_URL}/network/interfaces", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 3 check_input_is_subset_of_response( response, (interface1.to_dict(), interface2.to_dict(), interface3.to_dict()) ) # test filtering by network_id response = get( client, f"{API_URL}/network/interfaces?network_id={network2.id}", token=readonly_token, ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (interface3.to_dict(),)) def test_get_interfaces_by_domain( client, domain_factory, network_factory, interface_factory, readonly_token ): # Create some interfaces domain1 = domain_factory(name="tn.esss.lu.se") domain2 = domain_factory(name="ics.esss.lu.se") network1 = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250", domain=domain1, ) network2 = network_factory( address="192.168.2.0/24", first_ip="192.168.2.10", last_ip="192.168.2.250", domain=domain2, ) interface1 = interface_factory(network=network1, ip="192.168.1.10") interface2 = interface_factory(network=network1, ip="192.168.1.11") interface3 = interface_factory(network=network2, ip="192.168.2.10") # test filtering by domain response = get( client, f"{API_URL}/network/interfaces?domain=tn.esss.lu.se", token=readonly_token, ) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response( response, (interface1.to_dict(), interface2.to_dict()) ) response = get( client, f"{API_URL}/network/interfaces?domain=ics.esss.lu.se", token=readonly_token, ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (interface3.to_dict(),)) def test_get_interfaces_by_network( client, network_factory, interface_factory, readonly_token ): # Create some interfaces network1 = network_factory( vlan_name="MyNetwork1", address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250", ) network2 = network_factory( vlan_name="MyNetwork2", address="192.168.2.0/24", first_ip="192.168.2.10", last_ip="192.168.2.250", ) interface1 = interface_factory(network=network1, ip="192.168.1.10") interface2 = interface_factory(network=network1, ip="192.168.1.11") interface3 = interface_factory(network=network2, ip="192.168.2.10") # test filtering by network name response = get( client, f"{API_URL}/network/interfaces?network=MyNetwork1", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response( response, (interface1.to_dict(), interface2.to_dict()) ) response = get( client, f"{API_URL}/network/interfaces?network=MyNetwork2", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 1 check_input_is_subset_of_response(response, (interface3.to_dict(),)) def test_get_interfaces_with_model( client, model_factory, item_factory, host_factory, interface_factory, readonly_token ): host1 = host_factory() model1 = model_factory(name="EX3400") item_factory(model=model1, host_id=host1.id) interface_factory(host=host1) response = get(client, f"{API_URL}/network/interfaces", token=readonly_token) assert response.get_json()[0]["model"] == "EX3400" def test_create_interface_fails(client, host, network_factory, no_login_check_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # check that network_id and ip are mandatory response = post( client, f"{API_URL}/network/interfaces", data={}, token=no_login_check_token ) check_response_message(response, "Missing mandatory field 'network'", 422) response = post( client, f"{API_URL}/network/interfaces", data={"ip": "192.168.1.20"}, token=no_login_check_token, ) check_response_message(response, "Missing mandatory field 'network'", 422) response = post( client, f"{API_URL}/network/interfaces", data={"network": network.address}, token=no_login_check_token, ) check_response_message(response, "Missing mandatory field 'ip'", 422) data = {"network": network.vlan_name, "ip": "192.168.1.20", "name": "interface1"} response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message(response, "Missing mandatory field 'host'", 422) data["host"] = host.name response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, f"Interface name shall start with the host name '{host.name}'", 422 ) def test_create_interface(client, host, network_factory, no_login_check_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) data = { "network": network.vlan_name, "ip": "192.168.1.20", "name": host.name, "host": host.name, } response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) assert response.status_code == 201 assert INTERFACE_KEYS == set(response.get_json().keys()) assert response.get_json()["network"] == network.vlan_name assert response.get_json()["ip"] == "192.168.1.20" assert response.get_json()["name"] == host.name # This is the main interface assert response.get_json()["is_main"] # Check that all parameters can be passed data2 = { "network": network.vlan_name, "ip": "192.168.1.21", "name": host.name + "-2", "host": host.name, "mac": "7c:e2:ca:64:d0:68", } response = post( client, f"{API_URL}/network/interfaces", data=data2, token=no_login_check_token ) assert response.status_code == 201 # This is not the main interface assert not response.get_json()["is_main"] # check all items that were created assert models.Interface.query.count() == 2 # Check that IP and name shall be unique response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) @pytest.mark.parametrize("ip", ("", "foo", "192.168")) def test_create_interface_invalid_ip( ip, client, host, network_factory, no_login_check_token ): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # invalid IP address data = { "network": network.vlan_name, "ip": ip, "name": host.name, "host": host.name, } response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, f"'{ip}' does not appear to be an IPv4 or IPv6 address", 422 ) def test_create_interface_ip_not_in_network( client, host, network_factory, no_login_check_token ): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range data = { "network": network.vlan_name, "ip": "192.168.2.4", "name": host.name, "host": host.name, } response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, "IP address 192.168.2.4 is not in network 192.168.1.0/24", 422 ) def test_create_interface_ip_not_in_range( client, host, network_factory, no_login_check_token ): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range data = { "network": network.vlan_name, "ip": "192.168.1.4", "name": host.name, "host": host.name, } response = post( client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, "IP address 192.168.1.4 is not in range 192.168.1.10 - 192.168.1.250", 422, ) def test_create_interface_ip_not_in_range_as_admin( client, host, network_factory, admin_token ): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range data = { "network": network.vlan_name, "ip": "192.168.1.4", "name": host.name, "host": host.name, } response = post( client, f"{API_URL}/network/interfaces", data=data, token=admin_token ) assert response.status_code == 201 def test_delete_interface_invalid_credentials(client, interface_factory, user_token): interface1 = interface_factory() response = delete( client, f"{API_URL}/network/interfaces/{interface1.id}", token=user_token ) assert response.status_code == 403 assert len(models.Interface.query.all()) == 1 def test_delete_interface_success(client, interface_factory, admin_token): interface1 = interface_factory() response = delete( client, f"{API_URL}/network/interfaces/{interface1.id}", token=admin_token ) assert response.status_code == 204 assert len(models.Interface.query.all()) == 0 def test_delete_interface_invalid_id(client, interface_factory, admin_token): interface1 = interface_factory() response = delete( client, f"{API_URL}/network/hosts/{interface1.id + 1}", token=admin_token ) assert response.status_code == 404 assert response.get_json() == {"message": "Resource not found"} assert len(models.Interface.query.all()) == 1 def test_get_macs(client, mac_factory, readonly_token): # Create some macs mac1 = mac_factory() mac2 = mac_factory() response = get(client, f"{API_URL}/inventory/macs", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response(response, (mac1.to_dict(), mac2.to_dict())) def test_create_mac(client, item_factory, user_token): item = item_factory() # check that address is mandatory response = post(client, f"{API_URL}/inventory/macs", data={}, token=user_token) check_response_message(response, "Missing mandatory field 'address'", 422) data = {"address": "b5:4b:7d:a4:23:43"} response = post(client, f"{API_URL}/inventory/macs", data=data, token=user_token) assert response.status_code == 201 assert {"id", "address", "item"} == set(response.get_json().keys()) assert response.get_json()["address"] == data["address"] # Check that address shall be unique response = post(client, f"{API_URL}/inventory/macs", data=data, token=user_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) # Check that all parameters can be passed data2 = {"address": "b5:4b:7d:a4:23:44", "item_id": item.id} response = post(client, f"{API_URL}/inventory/macs", data=data2, token=user_token) assert response.status_code == 201 # check that all items were created assert models.Mac.query.count() == 2 @pytest.mark.parametrize("address", ("", "foo", "b5:4b:7d:a4:23")) def test_create_mac_invalid_address(address, client, user_token): data = {"address": address} response = post(client, f"{API_URL}/inventory/macs", data=data, token=user_token) check_response_message( response, f"'{address}' does not appear to be a MAC address", 422 ) def test_get_ansible_groups(client, ansible_group_factory, readonly_token): # Create some Ansible groups group1 = ansible_group_factory(vars={"foo": "hello"}) group2 = ansible_group_factory() response = get(client, f"{API_URL}/network/groups", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response(response, (group1.to_dict(), group2.to_dict())) def test_create_ansible_group(client, admin_token): # check that name is mandatory response = post(client, f"{API_URL}/network/groups", data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'name'", 422) data = {"name": "mygroup"} response = post(client, f"{API_URL}/network/groups", data=data, token=admin_token) assert response.status_code == 201 assert { "id", "name", "vars", "type", "hosts", "children", "created_at", "updated_at", "user", } == set(response.get_json().keys()) assert response.get_json()["name"] == data["name"] # Check that name shall be unique response = post(client, f"{API_URL}/network/groups", data=data, token=admin_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) def test_create_ansible_group_with_vars(client, admin_token): data = {"name": "mygroup", "vars": {"foo": "hello", "mylist": [1, 2, 3]}} response = post(client, f"{API_URL}/network/groups", data=data, token=admin_token) assert response.status_code == 201 assert response.get_json()["vars"] == data["vars"] group = models.AnsibleGroup.query.filter_by(name="mygroup").first() assert group.vars == data["vars"] def test_get_hosts(client, host_factory, readonly_token): # Create some hosts host1 = host_factory() host2 = host_factory() response = get(client, f"{API_URL}/network/hosts", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 assert HOST_KEYS == set(response.get_json()[0].keys()) check_input_is_subset_of_response(response, (host1.to_dict(), host2.to_dict())) def test_get_hosts_with_ansible_vars(client, host_factory, readonly_token): vars = {"foo": "hello", "mylist": [1, 2, 3]} host_factory(ansible_vars=vars) response = get(client, f"{API_URL}/network/hosts", token=readonly_token) assert response.status_code == 200 assert response.get_json()[0]["ansible_vars"] == vars def test_get_hosts_with_model( client, model_factory, item_factory, host_factory, readonly_token ): host1 = host_factory() model1 = model_factory(name="EX3400") item_factory(model=model1, host_id=host1.id) response = get(client, f"{API_URL}/network/hosts", token=readonly_token) assert response.status_code == 200 assert response.get_json()[0]["model"] == "EX3400" def test_get_hosts_with_no_model(client, host_factory, readonly_token): host_factory() response = get(client, f"{API_URL}/network/hosts", token=readonly_token) assert response.status_code == 200 assert response.get_json()[0]["model"] is None def test_get_hosts_recursive(client, host_factory, interface_factory, readonly_token): # Create some hosts with interfaces host1 = host_factory() interface11 = interface_factory(name=host1.name, host=host1) interface12 = interface_factory(host=host1) host2 = host_factory() interface21 = interface_factory(host=host2) # Without recursive, we only get the name of the interfaces response = get(client, f"{API_URL}/network/hosts", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 rhost1, rhost2 = response.get_json() # Interfaces shall be sorted assert rhost1["interfaces"] == sorted([interface11.name, interface12.name]) assert rhost2["interfaces"] == [interface21.name] # With recursive, interfaces are expanded response = get( client, f"{API_URL}/network/hosts?recursive=true", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 2 rhost1, rhost2 = response.get_json() assert len(rhost1["interfaces"]) == 2 rinterface11, rinterface12 = rhost1["interfaces"] assert INTERFACE_KEYS == set(rinterface11.keys()) assert INTERFACE_KEYS == set(rinterface12.keys()) assert len(rhost2["interfaces"]) == 1 rinterface21 = rhost2["interfaces"][0] assert INTERFACE_KEYS == set(rinterface21.keys()) assert rinterface21["network"] == interface21.network.vlan_name def test_create_host(client, device_type_factory, user_token): device_type = device_type_factory(name="Virtual") # check that name and device_type are mandatory response = post(client, f"{API_URL}/network/hosts", data={}, token=user_token) check_response_message(response, "Missing mandatory field 'name'", 422) response = post( client, f"{API_URL}/network/hosts", data={"name": "myhost"}, token=user_token ) check_response_message(response, "Missing mandatory field 'device_type'", 422) response = post( client, f"{API_URL}/network/hosts", data={"device_type": "Physical"}, token=user_token, ) check_response_message(response, "Missing mandatory field 'name'", 422) data = {"name": "my-hostname", "device_type": device_type.name} response = post(client, f"{API_URL}/network/hosts", data=data, token=user_token) assert response.status_code == 201 assert { "id", "name", "fqdn", "is_ioc", "device_type", "model", "description", "items", "interfaces", "ansible_vars", "ansible_groups", "created_at", "updated_at", "user", } == set(response.get_json().keys()) assert response.get_json()["name"] == data["name"] # Check that name shall be unique response = post(client, f"{API_URL}/network/hosts", data=data, token=user_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) # check that the number of items created assert models.Host.query.count() == 1 def test_create_host_with_items(client, item_factory, device_type_factory, user_token): device_type = device_type_factory(name="Network") item1 = item_factory(ics_id="AAA001") item2 = item_factory(ics_id="AAA002") # Check that we can pass a list of items ics_id data = { "name": "my-switch", "device_type": device_type.name, "items": [item1.ics_id, item2.ics_id], } response = post(client, f"{API_URL}/network/hosts", data=data, token=user_token) assert response.status_code == 201 host = models.Host.query.filter_by(name="my-switch").first() assert models.Item.query.get(item1.id).host_id == host.id assert models.Item.query.get(item2.id).host_id == host.id def test_create_host_with_ansible_vars(client, device_type_factory, user_token): device_type = device_type_factory(name="VirtualMachine") data = { "name": "my-host", "device_type": device_type.name, "ansible_vars": {"foo": "hello", "mylist": [1, 2, 3]}, } response = post(client, f"{API_URL}/network/hosts", data=data, token=user_token) assert response.status_code == 201 assert response.get_json()["ansible_vars"] == data["ansible_vars"] host = models.Host.query.filter_by(name="my-host").first() assert host.ansible_vars == data["ansible_vars"] def test_create_host_with_ansible_groups( client, device_type_factory, ansible_group_factory, user_token ): device_type = device_type_factory(name="VirtualMachine") group1 = ansible_group_factory(name="mygroup") group2 = ansible_group_factory(name="another") data = { "name": "my-host", "device_type": device_type.name, "ansible_groups": [group1.name, group2.name], } response = post(client, f"{API_URL}/network/hosts", data=data, token=user_token) assert response.status_code == 201 host = models.Host.query.filter_by(name="my-host").first() assert host.ansible_groups == [group1, group2] def test_create_host_as_consultant( client, item_factory, device_type_factory, consultant_token ): device_type = device_type_factory() data = {"name": "my-hostname", "device_type": device_type.name} response = post( client, f"{API_URL}/network/hosts", data=data, token=consultant_token ) assert response.status_code == 201 def test_delete_host_invalid_credentials(client, host_factory, user_token): host1 = host_factory() response = delete(client, f"{API_URL}/network/hosts/{host1.id}", token=user_token) assert response.status_code == 403 assert len(models.Host.query.all()) == 1 def test_delete_host_success(client, host_factory, admin_token): host1 = host_factory() response = delete(client, f"{API_URL}/network/hosts/{host1.id}", token=admin_token) assert response.status_code == 204 assert len(models.Host.query.all()) == 0 def test_delete_host_invalid_id(client, host_factory, admin_token): host1 = host_factory() response = delete( client, f"{API_URL}/network/hosts/{host1.id + 1}", token=admin_token ) assert response.status_code == 404 assert response.get_json() == {"message": "Resource not found"} assert len(models.Host.query.all()) == 1 def test_delete_host_with_interfaces(client, interface_factory, host, admin_token): interface_factory(host=host) interface_factory(host=host) assert len(host.interfaces) == 2 assert len(models.Interface.query.all()) == 2 response = delete(client, f"{API_URL}/network/hosts/{host.id}", token=admin_token) assert response.status_code == 204 assert len(models.Host.query.all()) == 0 assert len(models.Interface.query.all()) == 0 def test_get_user_profile(client, readonly_token): response = get(client, f"{API_URL}/user/profile", token=readonly_token) assert response.status_code == 200 user = response.get_json() assert {"id", "username", "groups", "email", "display_name"} == set(user.keys()) assert user["username"] == "user_ro" assert user["display_name"] == "User RO" assert user["email"] == "user_ro@example.com" def test_get_domains(client, domain_factory, readonly_token): # Create some domains domain1 = domain_factory() domain2 = domain_factory() response = get(client, f"{API_URL}/network/domains", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response(response, (domain1.to_dict(), domain2.to_dict())) def test_create_domain(client, admin_token): # check that name is mandatory response = post(client, f"{API_URL}/network/domains", data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'name'", 422) data = {"name": "tn.esss.lu.se"} response = post(client, f"{API_URL}/network/domains", data=data, token=admin_token) assert response.status_code == 201 assert { "id", "name", "scopes", "networks", "created_at", "updated_at", "user", } == set(response.get_json().keys()) assert response.get_json()["name"] == data["name"] # Check that name shall be unique response = post(client, f"{API_URL}/network/domains", data=data, token=admin_token) check_response_message( response, "(psycopg2.IntegrityError) duplicate key value violates unique constraint", 422, ) def test_get_cnames(client, cname_factory, readonly_token): # Create some cnames cname1 = cname_factory() cname2 = cname_factory() response = get(client, f"{API_URL}/network/cnames", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response(response, (cname1.to_dict(), cname2.to_dict())) def test_get_cnames_by_domain( client, domain_factory, network_factory, interface_factory, cname_factory, readonly_token, ): # Create some cnames domain_a = domain_factory(name="a.esss.lu.se") domain_b = domain_factory(name="b.esss.lu.se") network_a = network_factory(domain=domain_a) network_b = network_factory(domain=domain_b) interface_a1 = interface_factory(network=network_a) interface_a2 = interface_factory(network=network_a) interface_b1 = interface_factory(network=network_b) cname_a1 = cname_factory(interface=interface_a1) cname_a2 = cname_factory(interface=interface_a1) cname_a3 = cname_factory(interface=interface_a2) cname_b1 = cname_factory(interface=interface_b1) cname_b2 = cname_factory(interface=interface_b1) response = get(client, f"{API_URL}/network/cnames", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 5 response = get( client, f"{API_URL}/network/cnames?domain=a.esss.lu.se", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 3 check_input_is_subset_of_response( response, (cname_a1.to_dict(), cname_a2.to_dict(), cname_a3.to_dict()) ) response = get( client, f"{API_URL}/network/cnames?domain=b.esss.lu.se", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 2 check_input_is_subset_of_response( response, (cname_b1.to_dict(), cname_b2.to_dict()) ) def test_create_cname(client, interface, admin_token): # check that name and interface_id are mandatory response = post(client, f"{API_URL}/network/cnames", data={}, token=admin_token) check_response_message(response, "Missing mandatory field 'name'", 422) response = post( client, f"{API_URL}/network/cnames", data={"name": "myhost"}, token=admin_token ) check_response_message(response, "Missing mandatory field 'interface_id'", 422) response = post( client, f"{API_URL}/network/cnames", data={"interface_id": interface.id}, token=admin_token, ) check_response_message(response, "Missing mandatory field 'name'", 422) data = {"name": "myhost", "interface_id": interface.id} response = post(client, f"{API_URL}/network/cnames", data=data, token=admin_token) assert response.status_code == 201 assert {"id", "name", "interface", "created_at", "updated_at", "user"} == set( response.get_json().keys() ) assert response.get_json()["name"] == data["name"] # Check that name shall be unique by domain response = post(client, f"{API_URL}/network/cnames", data=data, token=admin_token) check_response_message( response, f"Duplicate cname on the {interface.network.domain} domain", 422 ) def test_search_hosts(client, host_factory, readonly_token): # Create some hosts host1 = host_factory(name="test-beautiful", description="The Zen of Python") host_factory(name="test-explicit", description="Beautiful is better than ugly.") host_factory(name="another-host") # When no query is passed, all hosts are returned response = get(client, f"{API_URL}/network/hosts/search", token=readonly_token) assert response.status_code == 200 assert len(response.get_json()) == 3 # a keyword is searched in all fields by default response = get( client, f"{API_URL}/network/hosts/search?q=beautiful", token=readonly_token ) assert response.status_code == 200 assert len(response.get_json()) == 2 # a search can be restricted to a specific field response = get( client, f"{API_URL}/network/hosts/search?q=name:beautiful", token=readonly_token ) assert response.status_code == 200 r = response.get_json() assert len(r) == 1 assert HOST_KEYS == set(r[0].keys()) assert r[0]["name"] == host1.name assert r[0]["description"] == host1.description