From 9b6daaff5f380d532364704c93493850adbc5fd5 Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Mon, 28 Oct 2019 14:57:10 +0100 Subject: [PATCH] Add endpoints to patch host and interface Endpoints required to update host and interface via the API JIRA INFRA-1406 #action In Progress --- app/api/network.py | 83 ++++++- app/api/utils.py | 51 +++- app/models.py | 34 +-- app/network/forms.py | 13 +- app/utils.py | 74 +++++- tests/functional/test_api.py | 454 ++++++++++++++++++++++++++++++++++- 6 files changed, 645 insertions(+), 64 deletions(-) diff --git a/app/api/network.py b/app/api/network.py index 585bb82..0373fa0 100644 --- a/app/api/network.py +++ b/app/api/network.py @@ -11,9 +11,10 @@ This module implements the network API. """ from flask import Blueprint, request from flask_login import login_required, current_user +from wtforms import ValidationError from .. import models from ..decorators import login_groups_accepted -from ..utils import CSEntryError +from ..utils import CSEntryError, validate_ip from . import utils bp = Blueprint("network_api", __name__) @@ -156,6 +157,54 @@ def create_interface(): ) +@bp.route("/interfaces/<int:interface_id>", methods=["PATCH"]) +@login_groups_accepted("admin", "network") +def patch_interface(interface_id): + r"""Patch an existing interface + + .. :quickref: Network; Update an existing interface + + :param interface_id: interface primary key + :jsonparam ip: interface IP + :jsonparam name: interface name + :jsonparam network: network name + :jsonparam mac: MAC address + """ + interface = models.Interface.query.get_or_404(interface_id) + # User shall have access to both the current and new network (if provided) + if not current_user.has_access_to_network(interface.network): + raise CSEntryError("User doesn't have the required group", status_code=403) + data = utils.get_json_body() + if "network" in data: + new_network = models.Network.query.filter_by( + vlan_name=data["network"] + ).first_or_404() + if not current_user.has_access_to_network(new_network): + raise CSEntryError("User doesn't have the required group", status_code=403) + # Ensure that the IP is in the network (current one or new one if passed) + # This is required because validate_interfaces is only called when a new network + # is assigned. So the check is needed even when new_network == interface.network + # (same network given in argument) + if "ip" in data: + try: + if "network" in data: + validate_ip(data["ip"], new_network) + else: + validate_ip(data["ip"], interface.network) + except ValidationError as e: + raise CSEntryError(str(e), status_code=422) + # The method doesn't allow to update the interface host + # I don't think it makes much sense (and an interface shall start by the host name) + # To add a new cname, use create_cname + allowed_fields = ( + ("ip", str, None), + ("name", str, None), + ("network", models.Network, "vlan_name"), + ("mac", str, None), + ) + return utils.update_generic_model(models.Interface, interface_id, allowed_fields) + + @bp.route("/interfaces/<int:interface_id>", methods=["DELETE"]) @login_groups_accepted("admin") def delete_interface(interface_id): @@ -235,6 +284,38 @@ def create_host(): ) +@bp.route("/hosts/<int:host_id>", methods=["PATCH"]) +@login_groups_accepted("admin", "network") +def patch_host(host_id): + r"""Patch an existing host + + .. :quickref: Network; Update an existing host + + :param host_id: host primary key + :jsonparam device_type: Physical|Virtual|... + :jsonparam is_ioc: True|False + :jsonparam description: description + :jsonparam items: list of items ICS id linked to the host + :jsonparam ansible_vars: Ansible variables + :jsonparam ansible_groups: list of Ansible groups names + """ + host = models.Host.query.get_or_404(host_id) + if not current_user.has_access_to_network(host.main_network): + raise CSEntryError("User doesn't have the required group", status_code=403) + # The method currently doesn't allow to update the host name + # If we do, we have to update all the linked interface name as well! + # Interfaces shall always start by the host name + allowed_fields = ( + ("device_type", models.DeviceType, "name"), + ("is_ioc", bool, None), + ("description", str, None), + ("items", [models.Item], "ics_id"), + ("ansible_vars", dict, None), + ("ansible_groups", [models.AnsibleGroup], "name"), + ) + return utils.update_generic_model(models.Host, host_id, allowed_fields) + + @bp.route("/hosts/<int:host_id>", methods=["DELETE"]) @login_groups_accepted("admin") def delete_host(host_id): diff --git a/app/api/utils.py b/app/api/utils.py index 9a2269c..15f389b 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -20,7 +20,7 @@ from .. import utils def commit(): try: db.session.commit() - except (sa.exc.IntegrityError, sa.exc.DataError) as e: + except sa.exc.SQLAlchemyError as e: db.session.rollback() raise utils.CSEntryError(str(e), status_code=422) @@ -110,11 +110,29 @@ def search_generic_model(model): return jsonify(data), 200, header -def create_generic_model(model, mandatory_fields=("name",), **kwargs): +def get_json_body(): + """Return the json body from the current request + + Raise a CSEntryError if the body is not a json object + """ data = request.get_json() if data is None: raise utils.CSEntryError("Body should be a JSON object") current_app.logger.debug(f"Received: {data}") + if not data: + raise utils.CSEntryError("At least one field is required", status_code=422) + return data + + +def create_generic_model(model, mandatory_fields=("name",), **kwargs): + """Create an instance of the model + + :param model: model class + :param mandatory_fields: list of fields that shall be passed in the body + :param kwargs: extra fields to use + :returns: representation of the created instance as json + """ + data = get_json_body() data.update(kwargs) for mandatory_field in mandatory_fields: if mandatory_field not in data: @@ -143,3 +161,32 @@ def delete_generic_model(model, primary_key): db.session.delete(instance) db.session.commit() return jsonify(), 204 + + +def update_generic_model(model, primary_key, allowed_fields): + """Update the model based on the primary_key + + :param model: model class + :param primary_key: primary key of the instance to update + :param allowed_fields: list of fields that can be updated + :returns: representation of the updated model as json + """ + data = get_json_body() + # Use a list and not a set because the order is important + allowed_keys = [field[0] for field in allowed_fields] + for key in data: + if key not in allowed_keys: + raise utils.CSEntryError(f"Invalid field '{key}'", status_code=422) + instance = model.query.get_or_404(primary_key) + data = utils.convert_to_models(data, allowed_fields) + try: + # Loop on allowed_keys and not data to respect + # the order in which the fields are set + # setting ip before network is important for Interface + for key in allowed_keys: + if key in data: + setattr(instance, key, data[key]) + except Exception as e: + raise utils.CSEntryError(str(e), status_code=422) + commit() + return jsonify(instance.to_dict()), 200 diff --git a/app/models.py b/app/models.py index 8408cc6..87be9fc 100644 --- a/app/models.py +++ b/app/models.py @@ -901,30 +901,16 @@ class Network(CreatedMixin, db.Model): """Return the list of IP addresses available""" return [addr for addr in self.ip_range() if addr not in self.used_ips()] - @staticmethod - def ip_in_network(ip, address): - """Ensure the IP is in the network - - :param str user_id: unicode ID of a user - :returns: a tuple with the IP and network as (IPv4Address, IPv4Network) - :raises: ValidationError if the IP is not in the network - """ - addr = ipaddress.ip_address(ip) - net = ipaddress.ip_network(address) - if addr not in net: - raise ValidationError(f"IP address {ip} is not in network {address}") - return (addr, net) - @validates("first_ip") def validate_first_ip(self, key, ip): """Ensure the first IP is in the network""" - self.ip_in_network(ip, self.address) + utils.ip_in_network(ip, self.address) return ip @validates("last_ip") def validate_last_ip(self, key, ip): """Ensure the last IP is in the network and greater than first_ip""" - addr, net = self.ip_in_network(ip, self.address) + addr, net = utils.ip_in_network(ip, self.address) if addr < self.first: raise ValidationError( f"Last IP address {ip} is less than the first address {self.first}" @@ -934,19 +920,7 @@ class Network(CreatedMixin, db.Model): @validates("interfaces") def validate_interfaces(self, key, interface): """Ensure the interface IP is in the network range""" - addr, net = self.ip_in_network(interface.ip, self.address) - # Admin user can create IP outside the defined range - try: - # current_user is a local proxy and is not - # valid outside of a request context. - is_admin = current_user.is_admin - except AttributeError: - is_admin = False - if not is_admin: - if addr < self.first or addr > self.last: - raise ValidationError( - f"IP address {interface.ip} is not in range {self.first} - {self.last}" - ) + utils.validate_ip(interface.ip, self) return interface @validates("vlan_name") @@ -1228,7 +1202,7 @@ class Host(CreatedMixin, SearchableMixin, db.Model): # Automatically convert items to a list of instances if passed as a list of ics_id if "items" in kwargs: kwargs["items"] = [ - utils.convert_to_model(item, Item, filter="ics_id") + utils.convert_to_model(item, Item, filter_by="ics_id") for item in kwargs["items"] ] # Automatically convert ansible groups to a list of instances if passed as a list of strings diff --git a/app/network/forms.py b/app/network/forms.py index 30743fe..d1d7a1e 100644 --- a/app/network/forms.py +++ b/app/network/forms.py @@ -9,7 +9,6 @@ This module defines the network blueprint forms. :license: BSD 2-Clause, see LICENSE for more details. """ -import ipaddress from flask import current_app from flask_login import current_user from wtforms import ( @@ -62,17 +61,7 @@ def ip_in_network(form, field): "Can't validate the IP. No network was selected." ) network = models.Network.query.get(network_id_field.data) - ip = ipaddress.ip_address(field.data) - if ip not in network.network_ip: - raise validators.ValidationError( - f"IP address {ip} is not in network {network.address}" - ) - # Admin user can create IP outside the defined range - if current_user.is_authenticated and not current_user.is_admin: - if ip < network.first or ip > network.last: - raise validators.ValidationError( - f"IP address {ip} is not in range {network.first} - {network.last}" - ) + utils.validate_ip(field.data, network) class DomainForm(CSEntryForm): diff --git a/app/utils.py b/app/utils.py index a8a44a4..b053226 100644 --- a/app/utils.py +++ b/app/utils.py @@ -11,6 +11,7 @@ This module implements utility functions. """ import base64 import datetime +import ipaddress import io import random import sqlalchemy as sa @@ -20,6 +21,7 @@ from pathlib import Path from flask import current_app, jsonify, url_for from flask.globals import _app_ctx_stack, _request_ctx_stack from flask_login import current_user +from wtforms import ValidationError from .extensions import db @@ -79,7 +81,7 @@ def format_field(field): return str(field) -def convert_to_model(item, model, filter="name"): +def convert_to_model(item, model, filter_by="name"): """Convert item to an instance of model Allow to convert a string to an instance of model @@ -87,15 +89,37 @@ def convert_to_model(item, model, filter="name"): :returns: an instance of model """ - if item is None: - return None - if not isinstance(item, model): - kwarg = {filter: item} - instance = model.query.filter_by(**kwarg).first() - if instance is None: - raise CSEntryError(f"{item} is not a valid {model.__name__.lower()}") - return instance - return item + if item is None or isinstance(item, model): + return item + kwarg = {filter_by: item} + instance = model.query.filter_by(**kwarg).first() + if instance is None: + raise CSEntryError(f"{item} is not a valid {model.__name__.lower()}") + return instance + + +def convert_to_models(d, fields): + """Convert the values of the dictionary to the given type + + :param d: dictionary with the values to update + :param fields: list of tuple (key, type_, filter_by) + :returns: new updated dictionary + """ + new = d.copy() + for key, type_, filter_by in fields: + if filter_by is None or key not in d: + # This is not an instance of db.Model but a standard type + continue + if isinstance(type_, list): + values = new[key] + if not isinstance(values, list): + values = [values] + new[key] = [ + convert_to_model(value, type_[0], filter_by) for value in values + ] + else: + new[key] = convert_to_model(new[key], type_, filter_by) + return new def attribute_to_string(value): @@ -493,3 +517,33 @@ def update_ansible_vars(host, vars): else: host.ansible_vars = vars return True + + +def ip_in_network(ip, address): + """Ensure the IP is in the network + + :returns: a tuple with the IP and network as (IPv4Address, IPv4Network) + :raises: ValidationError if the IP is not in the network + """ + addr = ipaddress.ip_address(ip) + net = ipaddress.ip_network(address) + if addr not in net: + raise ValidationError(f"IP address {ip} is not in network {address}") + return (addr, net) + + +def validate_ip(ip, network): + """Ensure the IP is in the network range""" + addr, net = ip_in_network(ip, network.address) + # Admin user can create IP outside the defined range + try: + # current_user is a local proxy and is not + # valid outside of a request context. + is_admin = current_user.is_admin + except AttributeError: + is_admin = False + if not is_admin: + if addr < network.first or addr > network.last: + raise ValidationError( + f"IP address {ip} is not in range {network.first} - {network.last}" + ) diff --git a/tests/functional/test_api.py b/tests/functional/test_api.py index 41cc3b0..c5988c9 100644 --- a/tests/functional/test_api.py +++ b/tests/functional/test_api.py @@ -235,7 +235,7 @@ def test_create_model_auth_fail(endpoint, client, readonly_token): @pytest.mark.parametrize("endpoint", GENERIC_CREATE_ENDPOINTS) def test_create_generic_model(endpoint, client, user_token): response = post(client, f"{API_URL}/{endpoint}", data={}, token=user_token) - check_response_message(response, "Missing mandatory field 'name'", 422) + check_response_message(response, "At least one field is required", 422) data = {"name": "Foo"} response = post(client, f"{API_URL}/{endpoint}", data=data, token=user_token) assert response.status_code == 201 @@ -290,7 +290,7 @@ def test_create_generic_model_invalid_param(endpoint, client, user_token): def test_create_item(client, user_token): # check that serial_number is mandatory response = post(client, f"{API_URL}/inventory/items", data={}, token=user_token) - check_response_message(response, "Missing mandatory field 'serial_number'", 422) + check_response_message(response, "At least one field is required", 422) # check create with only serial_number data = {"serial_number": "123456"} @@ -650,7 +650,7 @@ def test_create_network(client, admin_token, network_scope_factory): scope = network_scope_factory(supernet="172.16.0.0/16") # check that vlan_name, vlan_id, address, first_ip, last_ip, gateway and scope are mandatory response = post(client, f"{API_URL}/network/networks", data={}, token=admin_token) - check_response_message(response, "Missing mandatory field 'vlan_name'", 422) + check_response_message(response, "At least one field is required", 422) response = post( client, f"{API_URL}/network/networks", @@ -1041,7 +1041,7 @@ def test_create_interface_fails(client, host, network_factory, no_login_check_to response = post( client, f"{API_URL}/network/interfaces", data={}, token=no_login_check_token ) - check_response_message(response, "Missing mandatory field 'network'", 422) + check_response_message(response, "At least one field is required", 422) response = post( client, f"{API_URL}/network/interfaces", @@ -1250,7 +1250,7 @@ def test_create_mac(client, item_factory, user_token): item = item_factory() # check that address is mandatory response = post(client, f"{API_URL}/inventory/macs", data={}, token=user_token) - check_response_message(response, "Missing mandatory field 'address'", 422) + check_response_message(response, "At least one field is required", 422) data = {"address": "b5:4b:7d:a4:23:43"} response = post(client, f"{API_URL}/inventory/macs", data=data, token=user_token) @@ -1297,7 +1297,7 @@ def test_get_ansible_groups(client, ansible_group_factory, readonly_token): def test_create_ansible_group(client, admin_token): # check that name is mandatory response = post(client, f"{API_URL}/network/groups", data={}, token=admin_token) - check_response_message(response, "Missing mandatory field 'name'", 422) + check_response_message(response, "At least one field is required", 422) data = {"name": "mygroup"} response = post(client, f"{API_URL}/network/groups", data=data, token=admin_token) @@ -1459,7 +1459,7 @@ def test_create_host(client, device_type_factory, user_token): device_type = device_type_factory(name="Virtual") # check that name and device_type are mandatory response = post(client, f"{API_URL}/network/hosts", data={}, token=user_token) - check_response_message(response, "Missing mandatory field 'name'", 422) + check_response_message(response, "At least one field is required", 422) response = post( client, f"{API_URL}/network/hosts", data={"name": "myhost"}, token=user_token ) @@ -1622,7 +1622,7 @@ def test_get_domains(client, domain_factory, readonly_token): def test_create_domain(client, admin_token): # check that name is mandatory response = post(client, f"{API_URL}/network/domains", data={}, token=admin_token) - check_response_message(response, "Missing mandatory field 'name'", 422) + check_response_message(response, "At least one field is required", 422) data = {"name": "tn.esss.lu.se"} response = post(client, f"{API_URL}/network/domains", data=data, token=admin_token) @@ -1702,7 +1702,7 @@ def test_get_cnames_by_domain( def test_create_cname(client, interface, admin_token): # check that name and interface_id are mandatory response = post(client, f"{API_URL}/network/cnames", data={}, token=admin_token) - check_response_message(response, "Missing mandatory field 'name'", 422) + check_response_message(response, "At least one field is required", 422) response = post( client, f"{API_URL}/network/cnames", data={"name": "myhost"}, token=admin_token ) @@ -1822,3 +1822,439 @@ def test_pagination(endpoint, client, host_factory, readonly_token): f'{API_URL}/{endpoint}?per_page=25&page=2{extra_args}>; rel="last"' in response.headers["link"] ) + + +def test_patch_host_no_data(client, host_factory, admin_token): + host = host_factory() + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data={}, token=admin_token + ) + check_response_message(response, "At least one field is required", 422) + + +@pytest.mark.parametrize("field,value", [("foo", "xxxx"), ("name", "myhost")]) +def test_patch_host_invalid_fields(client, host_factory, admin_token, field, value): + host = host_factory() + response = patch( + client, + f"{API_URL}/network/hosts/{host.id}", + data={field: value}, + token=admin_token, + ) + check_response_message(response, f"Invalid field '{field}'", 422) + + +@pytest.mark.parametrize( + "field,value", + [ + ("description", "This is a test"), + ("ansible_vars", {"myvar": "hello", "another": "world"}), + ("is_ioc", False), + ("is_ioc", True), + ], +) +def test_patch_host(client, host_factory, admin_token, field, value): + # Create a host + host = host_factory() + data = {field: value} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()[field] == value + updated_host = models.Host.query.get(host.id) + assert getattr(updated_host, field) == value + + +def test_patch_host_device_type(client, host_factory, device_type_factory, admin_token): + host = host_factory() + device_type = device_type_factory(name="MyDevice") + data = {"device_type": device_type.name} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()["device_type"] == device_type.name + updated_host = models.Host.query.get(host.id) + assert updated_host.device_type == device_type + + +def test_patch_host_invalid_device_type(client, host_factory, admin_token): + host = host_factory() + data = {"device_type": "foo"} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + check_response_message(response, f"foo is not a valid devicetype", 400) + + +@pytest.mark.parametrize("groups", (["group1"], ["group1", "group2"])) +def test_patch_host_ansible_groups( + client, host_factory, ansible_group_factory, admin_token, groups +): + host = host_factory() + for group_name in groups: + ansible_group_factory(name=group_name) + data = {"ansible_groups": groups} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()["ansible_groups"] == groups + updated_host = models.Host.query.get(host.id) + for group_name in groups: + group = models.AnsibleGroup.query.filter_by(name=group_name).first() + assert group.hosts == [updated_host] + + +def test_patch_host_single_ansible_group( + client, host_factory, ansible_group_factory, admin_token +): + host = host_factory() + group = ansible_group_factory(name="my_group") + data = {"ansible_groups": group.name} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()["ansible_groups"] == [group.name] + updated_host = models.Host.query.get(host.id) + assert updated_host.ansible_groups == [group] + + +def test_patch_host_invalid_ansible_group(client, host_factory, admin_token): + host = host_factory() + data = {"ansible_groups": "unknown_group"} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + check_response_message(response, f"unknown_group is not a valid ansiblegroup", 400) + + +@pytest.mark.parametrize("items", (["AAA001"], ["AAB001", "AAB002"])) +def test_patch_host_items(client, host_factory, item_factory, admin_token, items): + host = host_factory() + for ics_id in items: + item_factory(ics_id=ics_id) + data = {"items": items} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()["items"] == items + updated_host = models.Host.query.get(host.id) + for ics_id in items: + item = models.Item.query.filter_by(ics_id=ics_id).first() + assert item.host == updated_host + + +def test_patch_host_single_item(client, host_factory, item_factory, admin_token): + host = host_factory() + item = item_factory(ics_id="BBB001") + data = {"items": item.ics_id} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + assert response.status_code == 200 + assert response.get_json()["items"] == [item.ics_id] + updated_host = models.Host.query.get(host.id) + assert updated_host.items == [item] + + +def test_patch_host_invalid_item(client, host_factory, admin_token): + host = host_factory() + data = {"items": "ABC002"} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=admin_token + ) + check_response_message(response, f"ABC002 is not a valid item", 400) + + +def test_patch_host_network_permission( + client, + network_scope_factory, + network_factory, + host_factory, + interface_factory, + user_token, +): + scope = network_scope_factory(name="FooNetworks") + network = network_factory( + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + scope=scope, + ) + host = host_factory() + interface_factory(ip="192.168.1.11", host=host, network=network) + data = {"description": "Hello world"} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=user_token + ) + assert response.status_code == 200 + + +def test_patch_host_invalid_network_permission( + client, + network_scope_factory, + network_factory, + host_factory, + interface_factory, + user_token, +): + scope = network_scope_factory(name="ProdNetworks") + network = network_factory( + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + scope=scope, + ) + host = host_factory() + interface_factory(ip="192.168.1.11", host=host, network=network) + data = {"description": "Hello world"} + response = patch( + client, f"{API_URL}/network/hosts/{host.id}", data=data, token=user_token + ) + check_response_message(response, "User doesn't have the required group", 403) + + +def test_patch_interface_no_data(client, interface_factory, admin_token): + interface = interface_factory() + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data={}, + token=admin_token, + ) + check_response_message(response, "At least one field is required", 422) + + +@pytest.mark.parametrize( + "field,value", [("foo", "xxxx"), ("host", "myhost"), ("cnames", "alias")] +) +def test_patch_interface_invalid_fields( + client, interface_factory, admin_token, field, value +): + interface = interface_factory() + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data={field: value}, + token=admin_token, + ) + check_response_message(response, f"Invalid field '{field}'", 422) + + +def test_patch_interface_mac(client, interface_factory, admin_token): + interface = interface_factory() + data = {"mac": "02:42:42:b2:01:c6"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + assert response.status_code == 200 + assert response.get_json()["mac"] == data["mac"] + updated_interface = models.Interface.query.get(interface.id) + assert updated_interface.mac == data["mac"] + + +def test_patch_interface_ip(client, interface_factory, network_factory, admin_token): + network = network_factory( + address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" + ) + interface = interface_factory(network=network, ip="192.168.1.11") + data = {"ip": "192.168.2.12"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + check_response_message( + response, f"IP address {data['ip']} is not in network {network.address}", 422 + ) + data = {"ip": "192.168.1.12"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + assert response.status_code == 200 + assert response.get_json()["ip"] == data["ip"] + updated_interface = models.Interface.query.get(interface.id) + assert updated_interface.ip == data["ip"] + + +def test_patch_interface_name(client, host_factory, interface_factory, admin_token): + host = host_factory(name="myhost") + interface = interface_factory(host=host) + data = {"name": "foo"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + check_response_message( + response, f"Interface name shall start with the host name '{host.name}'", 422 + ) + data = {"name": host.name + "-2"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + assert response.status_code == 200 + assert response.get_json()["name"] == data["name"] + updated_interface = models.Interface.query.get(interface.id) + assert updated_interface.name == data["name"] + + +def test_patch_interface_network( + client, network_factory, interface_factory, admin_token +): + network1 = network_factory( + vlan_name="mynetwork", + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + ) + network2 = network_factory( + vlan_name="new-network", + address="192.168.2.0/24", + first_ip="192.168.2.10", + last_ip="192.168.2.250", + ) + interface = interface_factory(network=network1, ip="192.168.1.20") + data = {"network": "unknown"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + check_response_message(response, "Resource not found", 404) + data = {"network": network2.vlan_name} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + check_response_message( + response, f"IP address {interface.ip} is not in network {network2.address}", 422 + ) + data = {"network": network2.vlan_name, "ip": "192.168.5.10"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + check_response_message( + response, f"IP address {data['ip']} is not in network {network2.address}", 422 + ) + data = {"network": network2.vlan_name, "ip": "192.168.2.10"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface.id}", + data=data, + token=admin_token, + ) + assert response.status_code == 200 + assert response.get_json()["network"] == data["network"] + assert response.get_json()["ip"] == data["ip"] + updated_interface = models.Interface.query.get(interface.id) + assert updated_interface.network == network2 + + +def test_patch_interface_current_network_permission( + client, network_scope_factory, network_factory, interface_factory, user_token +): + scope_prod = network_scope_factory(name="ProdNetworks") + scope_foo = network_scope_factory(name="FooNetworks") + network_prod = network_factory( + vlan_name="prod-network", + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + scope=scope_prod, + ) + network_foo = network_factory( + vlan_name="foo-network", + address="192.168.2.0/24", + first_ip="192.168.2.10", + last_ip="192.168.2.250", + scope=scope_foo, + ) + # User can't update an interface part of the ProdNetworks + interface_prod = interface_factory(network=network_prod, ip="192.168.1.20") + data = {"ip": "192.168.1.21"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface_prod.id}", + data=data, + token=user_token, + ) + check_response_message(response, "User doesn't have the required group", 403) + # but can on the FooNetworks + interface_foo = interface_factory(network=network_foo, ip="192.168.2.20") + data = {"ip": "192.168.2.21"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface_foo.id}", + data=data, + token=user_token, + ) + assert response.status_code == 200 + + +def test_patch_interface_new_network_permission( + client, network_scope_factory, network_factory, interface_factory, user_token +): + scope_prod = network_scope_factory(name="ProdNetworks") + scope_foo = network_scope_factory(name="FooNetworks") + network_prod = network_factory( + vlan_name="prod-network", + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + scope=scope_prod, + ) + network_foo1 = network_factory( + vlan_name="foo-network1", + address="192.168.2.0/24", + first_ip="192.168.2.10", + last_ip="192.168.2.250", + scope=scope_foo, + ) + network_foo2 = network_factory( + vlan_name="foo-network2", + address="192.168.3.0/24", + first_ip="192.168.3.10", + last_ip="192.168.3.250", + scope=scope_foo, + ) + interface_foo = interface_factory(network=network_foo1, ip="192.168.2.20") + # User can't change the network to the ProdNetworks + data = {"network": network_prod.vlan_name} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface_foo.id}", + data=data, + token=user_token, + ) + # but can on the same scope it has access to + check_response_message(response, "User doesn't have the required group", 403) + data = {"network": network_foo2.vlan_name, "ip": "192.168.3.10"} + response = patch( + client, + f"{API_URL}/network/interfaces/{interface_foo.id}", + data=data, + token=user_token, + ) + assert response.status_code == 200 -- GitLab