From 17456b5c45e8a6b4d1c31214c79a59693d70f243 Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Mon, 26 Nov 2018 17:38:00 +0100 Subject: [PATCH] Add network permissions per domain JIRA INFRA-578 #action In Progress --- app/api/network.py | 15 ++++++- app/decorators.py | 4 ++ app/models.py | 77 ++++++++++++++++++++++++++------- app/network/forms.py | 13 +++--- app/network/views.py | 26 +++++++++-- app/settings.py | 10 ++++- app/templates/user/profile.html | 2 + tests/functional/conftest.py | 14 +++++- tests/functional/test_api.py | 50 ++++++++++++++------- tests/functional/test_models.py | 33 ++++++++++++++ tests/functional/test_web.py | 34 ++++++++++++--- 11 files changed, 229 insertions(+), 49 deletions(-) diff --git a/app/api/network.py b/app/api/network.py index 93b2dcb..706cbce 100644 --- a/app/api/network.py +++ b/app/api/network.py @@ -10,9 +10,10 @@ This module implements the network API. """ from flask import Blueprint, request -from flask_login import login_required +from flask_login import login_required, current_user from .. import models from ..decorators import login_groups_accepted +from ..utils import CSEntryError from . import utils bp = Blueprint("network_api", __name__) @@ -136,6 +137,18 @@ def create_interface(): # setting interface.network. This is why we don't pass network_id here # but network (as vlan_name string) # Same for host + data = request.get_json() + # Check that the user has the permissions to create an interface on this network + try: + network = models.Network.query.filter_by( + vlan_name=data["network"] + ).first_or_404() + except Exception: + # If we can't get a network, an error will be raised in create_generic_model + pass + else: + if not current_user.has_access_to_network(network): + raise CSEntryError("User doesn't have the required group", status_code=403) return utils.create_generic_model( models.Interface, mandatory_fields=("network", "ip", "name", "host") ) diff --git a/app/decorators.py b/app/decorators.py index 6f0a716..66048e2 100644 --- a/app/decorators.py +++ b/app/decorators.py @@ -38,6 +38,10 @@ def login_groups_accepted(*groups): def wrapper(fn): @wraps(fn) def decorated_view(*args, **kwargs): + # LOGIN_DISABLED can be set to True to turn off authentication check when testing. + # This variable is also used by flask-login + if current_app.config.get("LOGIN_DISABLED"): + return fn(*args, **kwargs) if not current_user.is_authenticated: return current_app.login_manager.unauthorized() if not current_user.is_member_of_one_group(groups): diff --git a/app/models.py b/app/models.py index 491198b..89d11fe 100644 --- a/app/models.py +++ b/app/models.py @@ -213,26 +213,65 @@ class User(db.Model, UserMixin): @property def csentry_groups(self): - groups = [] - for key, values in current_app.config["CSENTRY_LDAP_GROUPS"].items(): - for value in values: - if value in self.groups: - groups.append(key) - return groups + """Return the list of CSEntry groups the user belong to + + Groups are assigned based on the CSENTRY_LDAP_GROUPS mapping with LDAP groups + """ + if not hasattr(self, "_csentry_groups"): + self._csentry_groups = [] + for csentry_group, ldap_groups in current_app.config[ + "CSENTRY_LDAP_GROUPS" + ].items(): + if set(self.groups) & set(ldap_groups): + self._csentry_groups.append(csentry_group) + # Add the network group based on CSENTRY_DOMAINS_LDAP_GROUPS + network_ldap_groups = set( + itertools.chain( + *current_app.config["CSENTRY_DOMAINS_LDAP_GROUPS"].values() + ) + ) + if set(self.groups) & network_ldap_groups: + self._csentry_groups.append("network") + return self._csentry_groups + + @property + def csentry_domains(self): + """Return the list of CSEntry domains the user has access to + + Domains are assigned based on the CSENTRY_DOMAINS_LDAP_GROUPS mapping with LDAP groups + """ + if not hasattr(self, "_csentry_domains"): + self._csentry_domains = [] + for domain, ldap_groups in current_app.config[ + "CSENTRY_DOMAINS_LDAP_GROUPS" + ].items(): + if set(self.groups) & set(ldap_groups): + self._csentry_domains.append(domain) + return self._csentry_domains @property def is_admin(self): - for group in current_app.config["CSENTRY_LDAP_GROUPS"]["admin"]: - if group in self.groups: - return True - return False + return "admin" in self.csentry_groups def is_member_of_one_group(self, groups): - """Return True if the user is at least member of one of the given groups""" - names = [] - for group in groups: - names.extend(current_app.config["CSENTRY_LDAP_GROUPS"].get(group)) - return bool(set(self.groups) & set(names)) + """Return True if the user is at least member of one of the given CSEntry groups""" + return bool(set(groups) & set(self.csentry_groups)) + + def has_access_to_network(self, network): + """Return True if the user has access to the network + + - admin users have access to all networks + - normal users must have access to the network domain + - normal users don't have access to admin_only networks (whatever the domain) + - LOGIN_DISABLED can be set to True to turn off authentication check when testing. + In this case, this function always returns True. + """ + if current_app.config.get("LOGIN_DISABLED") or self.is_admin: + return True + if network is None or network.admin_only: + # True is already returned for admin users + return False + return str(network.domain) in self.csentry_domains def favorite_attributes(self): """Return all user's favorite attributes""" @@ -1081,6 +1120,14 @@ class Host(CreatedMixin, SearchableMixin, db.Model): except IndexError: return None + @property + def main_network(self): + """Return the host main interface network""" + try: + return self.main_interface.network + except AttributeError: + return None + @property def fqdn(self): """Return the host fully qualified domain name diff --git a/app/network/forms.py b/app/network/forms.py index e218479..a8ea9de 100644 --- a/app/network/forms.py +++ b/app/network/forms.py @@ -209,19 +209,18 @@ class InterfaceForm(CSEntryForm): super().__init__(*args, **kwargs) self.host_id.choices = utils.get_model_choices(models.Host) if current_user.is_admin: - network_query = models.Network.query tags_query = models.Tag.query else: - network_query = models.Network.query.filter( - models.Network.admin_only.is_(False) - ) tags_query = models.Tag.query.filter(models.Tag.admin_only.is_(False)) - self.network_id.choices = utils.get_model_choices( - models.Network, allow_none=False, attr="vlan_name", query=network_query - ) self.tags.choices = utils.get_model_choices( models.Tag, attr="name", query=tags_query ) + # Only return the networks the user has access to + self.network_id.choices = [ + (str(network.id), network.vlan_name) + for network in models.Network.query.order_by(models.Network.vlan_name).all() + if current_user.has_access_to_network(network) + ] class HostInterfaceForm(HostForm, InterfaceForm): diff --git a/app/network/views.py b/app/network/views.py index 0b57c1e..cf617b2 100644 --- a/app/network/views.py +++ b/app/network/views.py @@ -21,6 +21,7 @@ from flask import ( request, flash, current_app, + abort, ) from flask_login import login_required, current_user from wtforms import ValidationError @@ -74,6 +75,9 @@ def create_host(): del form.interface_name if form.validate_on_submit(): network_id = form.network_id.data + network = models.Network.query.get(network_id) + if not current_user.has_access_to_network(network): + abort(403) ansible_groups = [ models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data ] @@ -95,7 +99,7 @@ def create_host(): name=form.name.data, ip=form.ip.data, mac=form.mac.data, - network=models.Network.query.get(network_id), + network=network, tags=tags, ) interface.cnames = [ @@ -194,6 +198,8 @@ def view_host(name): @login_groups_accepted("admin", "network") def edit_host(name): host = models.Host.query.filter_by(name=name).first_or_404() + if not current_user.has_access_to_network(host.main_network): + abort(403) form = HostForm(request.form, obj=host) # Passing ansible_groups as kwarg to the HostForm doesn't work because # obj takes precedence (but host.ansible_groups contain AnsibleGroup instances and not id) @@ -235,11 +241,18 @@ def edit_host(name): @login_groups_accepted("admin", "network") def create_interface(hostname): host = models.Host.query.filter_by(name=hostname).first_or_404() + # User shall have access to the host main interface domain + if not current_user.has_access_to_network(host.main_network): + abort(403) random_mac = host.device_type.name.startswith("Virtual") form = InterfaceForm( request.form, host_id=host.id, interface_name=host.name, random_mac=random_mac ) if form.validate_on_submit(): + # User shall have access to the new interface domain + network = (models.Network.query.get(form.network_id.data),) + if not current_user.has_access_to_network(network): + abort(403) # The total number of tags will always be quite small # It's more efficient to retrieve all of them in one query # and do the filtering here @@ -251,7 +264,7 @@ def create_interface(hostname): name=form.interface_name.data, ip=form.ip.data, mac=form.mac.data, - network=models.Network.query.get(form.network_id.data), + network=network, tags=tags, ) interface.cnames = [ @@ -284,6 +297,8 @@ def create_interface(hostname): @login_groups_accepted("admin", "network") def edit_interface(name): interface = models.Interface.query.filter_by(name=name).first_or_404() + if not current_user.has_access_to_network(interface.network): + abort(403) cnames_string = " ".join([str(cname) for cname in interface.cnames]) form = InterfaceForm( request.form, @@ -303,13 +318,16 @@ def edit_interface(name): form.tags.default = [tag.id for tag in interface.tags] form.tags.process(request.form) if form.validate_on_submit(): + network = models.Network.query.get(form.network_id.data) + if not current_user.has_access_to_network(network): + abort(403) try: interface.name = form.interface_name.data interface.ip = form.ip.data interface.mac = form.mac.data # Setting directly network_id doesn't update the relationship and bypass the checks # performed on the model - interface.network = models.Network.query.get(form.network_id.data) + interface.network = network # Delete the cnames that have been removed new_cnames_string = form.cnames_string.data.split() for (index, cname) in enumerate(interface.cnames): @@ -357,6 +375,8 @@ def edit_interface(name): @login_groups_accepted("admin", "network") def delete_interface(): interface = models.Interface.query.get_or_404(request.form["interface_id"]) + if not current_user.has_access_to_network(interface.network): + abort(403) hostname = interface.host.name # Explicitely remove the interface from the host to make sure # it will be re-indexed diff --git a/app/settings.py b/app/settings.py index 640fd8f..15becaf 100644 --- a/app/settings.py +++ b/app/settings.py @@ -60,10 +60,18 @@ LDAP_GROUP_MEMBERS_ATTR = "member" LDAP_GET_USER_ATTRIBUTES = ["cn", "sAMAccountName", "mail"] LDAP_GET_GROUP_ATTRIBUTES = ["cn"] +# Mapping between CSEntry groups and LDAP groups +# The generic "network" group is automatically added based +# on all CSENTRY_DOMAINS_LDAP_GROUPS CSENTRY_LDAP_GROUPS = { "admin": ["ICS Control System Infrastructure group"], "inventory": ["ICS Employees", "ICS Consultants"], - "network": ["ICS Employees", "ICS Consultants"], +} +# Domains the user has access to based on LDAP groups +CSENTRY_DOMAINS_LDAP_GROUPS = { + "esss.lu.se": ["ICS Control System Infrastructure group"], + "tn.esss.lu.se": ["ICS Employees", "ICS Consultants"], + "cslab.esss.lu.se": ["ICS Employees", "ICS Consultants"], } NETWORK_DEFAULT_PREFIX = 24 diff --git a/app/templates/user/profile.html b/app/templates/user/profile.html index f91800c..479c0ae 100644 --- a/app/templates/user/profile.html +++ b/app/templates/user/profile.html @@ -27,6 +27,8 @@ <dd>{{user.email}}</dd> <dt>CSEntry Groups</dt> <dd>{{ user.csentry_groups | join(', ') }}</dd> + <dt>CSEntry Domains</dt> + <dd>{{ user.csentry_domains | join(', ') }}</dd> </dl> <h3>Personal access tokens</h3> diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index 35c5313..2d3d276 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -51,7 +51,11 @@ def app(request): "CSENTRY_LDAP_GROUPS": { "admin": ["CSEntry Admin"], "inventory": ["CSEntry User"], - "network": ["CSEntry User", "CSEntry Consultant"], + }, + "CSENTRY_DOMAINS_LDAP_GROUPS": { + "prod.example.org": ["CSEntry Prod"], + "lab.example.org": ["CSEntry Lab"], + "foo.example.org": ["CSEntry User", "CSEntry Consultant"], }, } app = create_app(config=config) @@ -156,6 +160,14 @@ def patch_ldap_authenticate(monkeypatch): response.status = AuthenticationResponseStatus.success response.user_info = {"cn": "User RO", "mail": "user_ro@example.com"} response.user_groups = [{"cn": "ESS Employees"}] + elif username == "user_prod" and password == "userprod": + response.status = AuthenticationResponseStatus.success + response.user_info = {"cn": "User Prod", "mail": "user_prod@example.com"} + response.user_groups = [{"cn": "CSEntry Prod"}] + elif username == "user_lab" and password == "userlab": + response.status = AuthenticationResponseStatus.success + response.user_info = {"cn": "User Lab", "mail": "user_lab@example.com"} + response.user_groups = [{"cn": "CSEntry Lab"}] else: response.status = AuthenticationResponseStatus.fail return response diff --git a/tests/functional/test_api.py b/tests/functional/test_api.py index c678b84..8c2b5ae 100644 --- a/tests/functional/test_api.py +++ b/tests/functional/test_api.py @@ -112,6 +112,16 @@ def admin_token(client): return get_token(client, "admin", "adminpasswd") +@pytest.fixture +def no_login_check_token(request, app): + app.config["LOGIN_DISABLED"] = True + client = app.test_client() + # We still need to login, otherwise an AnonymousUserMixin is returned + # An AnonymousUser doesn't have all the User methods + yield get_token(client, "user_ro", "userro") + app.config["LOGIN_DISABLED"] = False + + def check_response_message(response, msg, status_code=400): assert response.status_code == status_code try: @@ -963,44 +973,46 @@ def test_get_interfaces_with_model( assert response.get_json()[0]["model"] == "EX3400" -def test_create_interface_fails(client, host, network_factory, user_token): +def test_create_interface_fails(client, host, network_factory, no_login_check_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # check that network_id and ip are mandatory - response = post(client, f"{API_URL}/network/interfaces", data={}, token=user_token) + response = post( + client, f"{API_URL}/network/interfaces", data={}, token=no_login_check_token + ) check_response_message(response, "Missing mandatory field 'network'", 422) response = post( client, f"{API_URL}/network/interfaces", data={"ip": "192.168.1.20"}, - token=user_token, + token=no_login_check_token, ) check_response_message(response, "Missing mandatory field 'network'", 422) response = post( client, f"{API_URL}/network/interfaces", data={"network": network.address}, - token=user_token, + token=no_login_check_token, ) check_response_message(response, "Missing mandatory field 'ip'", 422) data = {"network": network.vlan_name, "ip": "192.168.1.20", "name": "interface1"} response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message(response, "Missing mandatory field 'host'", 422) data["host"] = host.name response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, f"Interface name shall start with the host name '{host.name}'", 422 ) -def test_create_interface(client, host, network_factory, user_token): +def test_create_interface(client, host, network_factory, no_login_check_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) @@ -1011,7 +1023,7 @@ def test_create_interface(client, host, network_factory, user_token): "host": host.name, } response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) assert response.status_code == 201 assert { @@ -1043,7 +1055,7 @@ def test_create_interface(client, host, network_factory, user_token): "mac": "7c:e2:ca:64:d0:68", } response = post( - client, f"{API_URL}/network/interfaces", data=data2, token=user_token + client, f"{API_URL}/network/interfaces", data=data2, token=no_login_check_token ) assert response.status_code == 201 @@ -1052,7 +1064,7 @@ def test_create_interface(client, host, network_factory, user_token): # Check that IP and name shall be unique response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, @@ -1062,7 +1074,9 @@ def test_create_interface(client, host, network_factory, user_token): @pytest.mark.parametrize("ip", ("", "foo", "192.168")) -def test_create_interface_invalid_ip(ip, client, host, network_factory, user_token): +def test_create_interface_invalid_ip( + ip, client, host, network_factory, no_login_check_token +): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) @@ -1074,14 +1088,16 @@ def test_create_interface_invalid_ip(ip, client, host, network_factory, user_tok "host": host.name, } response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, f"'{ip}' does not appear to be an IPv4 or IPv6 address", 422 ) -def test_create_interface_ip_not_in_network(client, host, network_factory, user_token): +def test_create_interface_ip_not_in_network( + client, host, network_factory, no_login_check_token +): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) @@ -1093,14 +1109,16 @@ def test_create_interface_ip_not_in_network(client, host, network_factory, user_ "host": host.name, } response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, "IP address 192.168.2.4 is not in network 192.168.1.0/24", 422 ) -def test_create_interface_ip_not_in_range(client, host, network_factory, user_token): +def test_create_interface_ip_not_in_range( + client, host, network_factory, no_login_check_token +): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) @@ -1112,7 +1130,7 @@ def test_create_interface_ip_not_in_range(client, host, network_factory, user_to "host": host.name, } response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token + client, f"{API_URL}/network/interfaces", data=data, token=no_login_check_token ) check_response_message( response, diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index 02e69e7..0f2186c 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -44,6 +44,39 @@ def test_user_is_member_of_one_group(user_factory): assert user.is_member_of_one_group(["admin"]) +def test_user_domains(user_factory): + user = user_factory(groups=["CSEntry Prod", "CSEntry User"]) + assert user.csentry_domains == ["prod.example.org", "foo.example.org"] + user = user_factory(groups=["foo", "CSEntry Lab"]) + assert user.csentry_domains == ["lab.example.org"] + + +def test_user_has_access_to_network(user_factory, domain_factory, network_factory): + domain_prod = domain_factory(name="prod.example.org") + domain_lab = domain_factory(name="lab.example.org") + network_prod = network_factory(domain=domain_prod) + network_lab = network_factory(domain=domain_lab) + network_lab_admin = network_factory(domain=domain_lab, admin_only=True) + user = user_factory(groups=["CSEntry Prod", "CSEntry Lab"]) + assert user.has_access_to_network(network_prod) + assert user.has_access_to_network(network_lab) + assert not user.has_access_to_network(network_lab_admin) + assert not user.has_access_to_network(None) + user = user_factory(groups=["foo", "CSEntry Lab"]) + assert not user.has_access_to_network(network_prod) + assert user.has_access_to_network(network_lab) + assert not user.has_access_to_network(network_lab_admin) + user = user_factory(groups=["one", "two"]) + assert not user.has_access_to_network(network_prod) + assert not user.has_access_to_network(network_lab) + assert not user.has_access_to_network(network_lab_admin) + user = user_factory(groups=["CSEntry Admin"]) + assert user.has_access_to_network(network_prod) + assert user.has_access_to_network(network_lab) + assert user.has_access_to_network(network_lab_admin) + assert user.has_access_to_network(None) + + def test_network_ip_properties(network_factory): # Create some networks network1 = network_factory( diff --git a/tests/functional/test_web.py b/tests/functional/test_web.py index 8d4267f..2c39473 100644 --- a/tests/functional/test_web.py +++ b/tests/functional/test_web.py @@ -35,6 +35,17 @@ def logged_rw_client(client): return client +@pytest.fixture +def no_login_check_client(request, app): + app.config["LOGIN_DISABLED"] = True + client = app.test_client() + # We still need to login, otherwise an AnonymousUserMixin is returned + # An AnonymousUser doesn't have all the User methods + login(client, "user_ro", "userro") + yield client + app.config["LOGIN_DISABLED"] = False + + def test_login_logout(client): response = login(client, "unknown", "invalid") assert b"<title>Login - CSEntry</title>" in response.data @@ -201,7 +212,9 @@ def test_retrieve_hosts_by_ip(logged_client, interface_factory): assert r["data"][0]["name"] == interface1.host.name -def test_delete_interface_from_index(logged_rw_client, interface_factory, host_factory): +def test_delete_interface_from_index( + no_login_check_client, interface_factory, host_factory +): host1 = host_factory(name="host1") interface_factory(name="host1", host=host1) interface2 = interface_factory(name="host1b", host=host1) @@ -210,7 +223,7 @@ def test_delete_interface_from_index(logged_rw_client, interface_factory, host_f assert list(instances) == [host1] assert nb == 1 # Delete the interface - response = logged_rw_client.post( + response = no_login_check_client.post( "/network/interfaces/delete", data={"interface_id": interface2.id} ) assert response.status_code == 302 @@ -246,9 +259,13 @@ def test_edit_item_comment_in_index( assert list(instances) == [item1] -def test_create_host(logged_rw_client, network_factory, device_type): +def test_create_host(client, domain_factory, network_factory, device_type): + domain = domain_factory(name="prod.example.org") network = network_factory( - address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" + address="192.168.1.0/24", + first_ip="192.168.1.10", + last_ip="192.168.1.250", + domain=domain, ) name = "myhost" ip = "192.168.1.11" @@ -266,7 +283,14 @@ def test_create_host(logged_rw_client, network_factory, device_type): "random_mac": False, "cnames_string": "", } - response = logged_rw_client.post(f"/network/hosts/create", data=form) + # Permission denied with user_lab user + login(client, "user_lab", "userlab") + response = client.post(f"/network/hosts/create", data=form) + assert response.status_code == 403 + logout(client) + # Success with user_prod user + login(client, "user_prod", "userprod") + response = client.post(f"/network/hosts/create", data=form) assert response.status_code == 302 # The host was created host = models.Host.query.filter_by(name=name).first() -- GitLab