From 39faa80b8c696f4550bfbcf7c35e823c71b28a73 Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@ess.eu> Date: Wed, 21 Oct 2020 17:01:25 +0200 Subject: [PATCH] Add auditor group Give read-only access to all resources. No write access. JIRA INFRA-2728 #action In Progress --- app/api/network.py | 10 ++--- app/api/user.py | 3 +- app/models.py | 16 ++++--- app/network/views.py | 10 ++--- app/settings.py | 1 + tests/functional/conftest.py | 5 +++ tests/functional/test_api.py | 33 +++++++++++--- tests/functional/test_models.py | 10 +++++ tests/functional/test_web.py | 76 ++++++++++++++++++--------------- 9 files changed, 107 insertions(+), 57 deletions(-) diff --git a/app/api/network.py b/app/api/network.py index 10d601f..4aa81b2 100644 --- a/app/api/network.py +++ b/app/api/network.py @@ -21,7 +21,7 @@ bp = Blueprint("network_api", __name__) @bp.route("/scopes") -@login_groups_accepted("admin") +@login_groups_accepted("admin", "auditor") def get_scopes(): """Return network scopes @@ -91,14 +91,14 @@ def patch_scope(scope_id): @bp.route("/networks") -@login_groups_accepted("admin", "network") +@login_groups_accepted("admin", "auditor", "network") def get_networks(): """Return networks .. :quickref: Network; Get networks """ query = models.Network.query - if not current_user.is_admin: + if not (current_user.is_admin or current_user.is_auditor): sensitive_networks = ( query.filter(models.Network.sensitive.is_(True)) .join(models.Network.scope) @@ -206,7 +206,7 @@ def get_interfaces(): """ query = models.Interface.query query = query.join(models.Interface.network).order_by(models.Interface.ip) - if not current_user.is_admin: + if not (current_user.is_admin or current_user.is_auditor): sensitive_interfaces = ( query.filter(models.Network.sensitive.is_(True)) .join(models.Network.scope) @@ -369,7 +369,7 @@ def get_hosts(): .. :quickref: Network; Get hosts """ query = models.Host.query - if not current_user.is_admin: + if not (current_user.is_admin or current_user.is_auditor): # Note that hosts without interface will be filtered out # by this query. This is not an issue as hosts should always # have an interface. They are useless otherwise. diff --git a/app/api/user.py b/app/api/user.py index fe17353..4fe437d 100644 --- a/app/api/user.py +++ b/app/api/user.py @@ -13,6 +13,7 @@ from flask import current_app, Blueprint, jsonify, request from flask_ldap3_login import AuthenticationResponseStatus from flask_login import login_required, current_user from ..extensions import ldap_manager +from ..decorators import login_groups_accepted from .. import utils, tokens, models from .utils import get_generic_model @@ -20,7 +21,7 @@ bp = Blueprint("user_api", __name__) @bp.route("/users") -@login_required +@login_groups_accepted("admin", "auditor") def get_users(): """Return users information diff --git a/app/models.py b/app/models.py index 3b4005d..fffec88 100644 --- a/app/models.py +++ b/app/models.py @@ -252,6 +252,10 @@ class User(db.Model, UserMixin): def is_admin(self): return "admin" in self.csentry_groups + @property + def is_auditor(self): + return "auditor" in self.csentry_groups + def is_member_of_one_group(self, groups): """Return True if the user is at least member of one of the given CSEntry groups""" return bool(set(groups) & set(self.csentry_groups)) @@ -275,7 +279,7 @@ class User(db.Model, UserMixin): def can_view_network(self, network): """Return True if the user can view the network - - admin users can view all networks + - admin and auditor users can view all networks - non sensitive networks can be viewed by anyone - normal users must have access to the network scope to view sensitive networks - LOGIN_DISABLED can be set to True to turn off authentication check when testing. @@ -284,6 +288,7 @@ class User(db.Model, UserMixin): if ( current_app.config.get("LOGIN_DISABLED") or self.is_admin + or self.is_auditor or not network.sensitive ): return True @@ -303,7 +308,7 @@ class User(db.Model, UserMixin): def can_view_host(self, host): """Return True if the user can view the host - - admin users can view all hosts + - admin and auditor users can view all hosts - non sensitive hosts can be viewed by anyone - normal users must have access to the network scope to view sensitive hosts - LOGIN_DISABLED can be set to True to turn off authentication check when testing. @@ -312,6 +317,7 @@ class User(db.Model, UserMixin): if ( current_app.config.get("LOGIN_DISABLED") or self.is_admin + or self.is_auditor or not host.sensitive ): return True @@ -414,9 +420,9 @@ class User(db.Model, UserMixin): def get_tasks(self, all=False): """Return all tasks created by the current user - If the user is admin and all is set to True, will return all tasks + If the user is admin or auditor and all is set to True, will return all tasks """ - if all and self.is_admin: + if all and (self.is_admin or self.is_auditor): return Task.query.order_by(Task.created_at).all() return Task.query.filter_by(user=self).order_by(Task.created_at).all() @@ -482,7 +488,7 @@ class SearchableMixin(object): @classmethod def search(cls, query, page=1, per_page=20, sort=None, filter_sensitive=False): - if filter_sensitive and not current_user.is_admin: + if filter_sensitive and not (current_user.is_admin or current_user.is_auditor): if query == "*": query = current_user.sensitive_filter else: diff --git a/app/network/views.py b/app/network/views.py index d36e2b7..c193c6d 100644 --- a/app/network/views.py +++ b/app/network/views.py @@ -580,14 +580,14 @@ def create_domain(): @bp.route("/scopes") -@login_groups_accepted("admin") +@login_groups_accepted("admin", "auditor") def list_scopes(): scopes = models.NetworkScope.query.all() return render_template("network/scopes.html", scopes=scopes) @bp.route("/scopes/view/<name>") -@login_groups_accepted("admin") +@login_groups_accepted("admin", "auditor") def view_scope(name): scope = models.NetworkScope.query.filter_by(name=name).first_or_404() return render_template("network/view_scope.html", scope=scope) @@ -674,10 +674,10 @@ def retrieve_first_available_ip(network_id): @bp.route("/networks") -@login_groups_accepted("admin", "network") +@login_groups_accepted("admin", "auditor", "network") def list_networks(): networks = models.Network.query.all() - if not current_user.is_admin: + if not (current_user.is_admin or current_user.is_auditor): networks = [ network for network in networks if current_user.can_view_network(network) ] @@ -685,7 +685,7 @@ def list_networks(): @bp.route("/networks/view/<vlan_name>") -@login_groups_accepted("admin", "network") +@login_groups_accepted("admin", "auditor", "network") def view_network(vlan_name): network = models.Network.query.filter_by(vlan_name=vlan_name).first_or_404() if not current_user.can_view_network(network): diff --git a/app/settings.py b/app/settings.py index 3910352..cbdced7 100644 --- a/app/settings.py +++ b/app/settings.py @@ -74,6 +74,7 @@ LDAP_GET_GROUP_ATTRIBUTES = os.environ.get("LDAP_GET_GROUP_ATTRIBUTES", "cn").sp # on all CSENTRY_NETWORK_SCOPES_LDAP_GROUPS CSENTRY_LDAP_GROUPS = { "admin": ["ICS Control System Infrastructure group"], + "auditor": ["ICS Control System Infrastructure group"], "inventory": ["ICS Employees", "ICS Consultants"], } # Network scopes the user has access to based on LDAP groups diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index de73920..7d89ab6 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -51,6 +51,7 @@ def app(request): "CACHE_NO_NULL_WARNING": True, "CSENTRY_LDAP_GROUPS": { "admin": ["CSEntry Admin"], + "auditor": ["CSEntry Auditor"], "inventory": ["CSEntry User"], }, "CSENTRY_NETWORK_SCOPES_LDAP_GROUPS": { @@ -160,6 +161,10 @@ def patch_ldap_authenticate(monkeypatch): response.status = AuthenticationResponseStatus.success response.user_info = {"cn": "Admin User", "mail": "admin@example.com"} response.user_groups = [{"cn": "CSEntry Admin"}] + elif username == "audit" and password == "auditpasswd": + response.status = AuthenticationResponseStatus.success + response.user_info = {"cn": "Auditor User", "mail": "audit@example.com"} + response.user_groups = [{"cn": "CSEntry Auditor"}] elif username == "user_rw" and password == "userrw": response.status = AuthenticationResponseStatus.success response.user_info = {"cn": "User RW", "mail": "user_rw@example.com"} diff --git a/tests/functional/test_api.py b/tests/functional/test_api.py index 79b70b1..c9f1a71 100644 --- a/tests/functional/test_api.py +++ b/tests/functional/test_api.py @@ -1189,11 +1189,12 @@ def test_get_interfaces_with_model( @pytest.mark.parametrize( - "user, password, expected_interfaces, expected_host3_result", + "username, password, expected_interfaces, expected_host3_result", [ ("user_ro", "userro", ["host2", "host1"], 0), ("user_prod", "userprod", ["host2", "host1", "host3"], 1), ("admin", "adminpasswd", ["host2", "host1", "host3"], 1), + ("audit", "auditpasswd", ["host2", "host1", "host3"], 1), ], ) def test_get_interfaces_with_sensitive_network( @@ -1202,7 +1203,7 @@ def test_get_interfaces_with_sensitive_network( network_factory, interface_factory, host_factory, - user, + username, password, expected_interfaces, expected_host3_result, @@ -1229,7 +1230,7 @@ def test_get_interfaces_with_sensitive_network( interface_factory(name=host2.name, network=network1, ip="192.168.1.10", host=host2) interface_factory(name=host3.name, network=network2, ip="192.168.2.10", host=host3) # Retrieve interfaces - token = get_token(client, user, password) + token = get_token(client, username, password) response = get(client, f"{API_URL}/network/interfaces", token=token) assert response.status_code == 200 interfaces = [interface["name"] for interface in response.get_json()] @@ -1702,9 +1703,10 @@ def test_get_hosts_recursive_items(client, item_factory, host_factory, admin_tok @pytest.mark.parametrize( - "user, password, expected_hosts, expected_host3_result", + "username, password, expected_hosts, expected_host3_result", [ ("admin", "adminpasswd", ["host1", "host2", "host3", "host4"], 1), + ("audit", "auditpasswd", ["host1", "host2", "host3", "host4"], 1), ("user_prod", "userprod", ["host1", "host2", "host3"], 1), ("user_ro", "userro", ["host1", "host2"], 0), ], @@ -1715,7 +1717,7 @@ def test_get_hosts_sensitive( network_factory, interface_factory, host_factory, - user, + username, password, expected_hosts, expected_host3_result, @@ -1744,7 +1746,7 @@ def test_get_hosts_sensitive( interface_factory(name=host2.name, network=network1, ip="192.168.1.11", host=host2) interface_factory(name=host3.name, network=network2, ip="192.168.2.10", host=host3) # Retrieve the hosts - token = get_token(client, user, password) + token = get_token(client, username, password) response = get(client, f"{API_URL}/network/hosts", token=token) assert response.status_code == 200 hosts = [host["name"] for host in response.get_json()] @@ -2053,6 +2055,9 @@ def test_search_hosts(client, host_factory, readonly_token): ("admin", "adminpasswd", "", 3), ("admin", "adminpasswd", "?q=beautiful", 2), ("admin", "adminpasswd", "?q=description:beautiful", 1), + ("audit", "auditpasswd", "", 3), + ("audit", "auditpasswd", "?q=beautiful", 2), + ("audit", "auditpasswd", "?q=description:beautiful", 1), ("user_ro", "userro", "", 1), ("user_ro", "userro", "?q=beautiful", 1), ("user_ro", "userro", "?q=description:beautiful", 0), @@ -2975,3 +2980,19 @@ def test_delete_cname_success(client, admin_token, cname): "network/cnames", models.Cname, ) + + +@pytest.mark.parametrize("url", ["/network/scopes", "/user/users"]) +@pytest.mark.parametrize( + "username, password, status_code", + [ + ("user_rw", "userrw", 403), + ("user_prod", "userprod", 403), + ("admin", "adminpasswd", 200), + ("audit", "auditpasswd", 200), + ], +) +def test_get_admin_protected_url(client, url, username, password, status_code): + token = get_token(client, username, password) + response = get(client, f"{API_URL}{url}", token=token) + assert response.status_code == status_code diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index 5b8f0dc..559db1c 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -31,6 +31,13 @@ def test_user_is_admin(user_factory): assert user.is_admin +def test_user_is_auditor(user_factory): + user = user_factory(groups=["foo", "CSEntry User"]) + assert not user.is_auditor + user = user_factory(groups=["foo", "CSEntry Auditor"]) + assert user.is_auditor + + def test_user_is_member_of_one_group(user_factory): user = user_factory(groups=["one", "two"]) assert not user.is_member_of_one_group(["network", "admin"]) @@ -42,6 +49,9 @@ def test_user_is_member_of_one_group(user_factory): assert not user.is_member_of_one_group(["network"]) assert user.is_member_of_one_group(["network", "admin"]) assert user.is_member_of_one_group(["admin"]) + user = user_factory(groups=["CSEntry Auditor"]) + assert not user.is_member_of_one_group(["network", "admin"]) + assert user.is_member_of_one_group(["auditor"]) def test_user_network_scopes(user_factory): diff --git a/tests/functional/test_web.py b/tests/functional/test_web.py index ecb5f9a..7e4da7b 100644 --- a/tests/functional/test_web.py +++ b/tests/functional/test_web.py @@ -85,14 +85,20 @@ def test_protected_url_get(url, client): @pytest.mark.parametrize("url", ["/network/scopes"]) -def test_admin_protected_url_get(url, client): - login(client, "user_rw", "userrw") +@pytest.mark.parametrize( + "username, password, status_code", + [ + ("user_rw", "userrw", 403), + ("user_prod", "userprod", 403), + ("admin", "adminpasswd", 200), + ("audit", "auditpasswd", 200), + ], +) +def test_admin_protected_url_get(client, url, username, password, status_code): + login(client, username, password) response = client.get(url) - assert response.status_code == 403 + assert response.status_code == status_code logout(client) - login(client, "admin", "adminpasswd") - response = client.get(url) - assert response.status_code == 200 @pytest.mark.parametrize( @@ -297,23 +303,21 @@ def test_retrieve_sensitive_hosts( assert r["data"][0]["name"] == host1.name logout(client) # Users member of the scope can see sensitive hosts - login(client, "user_prod", "userprod") - response = client.post( - "/network/_retrieve_hosts", - ) - r = response.get_json() - assert r["recordsTotal"] == 3 - assert r["recordsFiltered"] == 3 - assert len(r["data"]) == 3 - # admin users can see all hosts - login(client, "admin", "adminpasswd") - response = client.post( - "/network/_retrieve_hosts", - ) - r = response.get_json() - assert r["recordsTotal"] == 3 - assert r["recordsFiltered"] == 3 - assert len(r["data"]) == 3 + # Same for admin and auditor users + for (user, passwd) in ( + ("user_prod", "userprod"), + ("admin", "adminpasswd"), + ("audit", "auditpasswd"), + ): + login(client, user, passwd) + response = client.post( + "/network/_retrieve_hosts", + ) + r = response.get_json() + assert r["recordsTotal"] == 3 + assert r["recordsFiltered"] == 3 + assert len(r["data"]) == 3 + logout(client) def test_delete_interface_from_index( @@ -967,6 +971,8 @@ def test_create_network_scope_overlapping(network_scope_factory, logged_admin_cl ("user_lab", "userlab", True, 403), ("user_prod", "userprod", False, 200), ("user_prod", "userprod", True, 200), + ("audit", "auditpasswd", False, 200), + ("audit", "auditpasswd", True, 200), ], ) @pytest.mark.parametrize("admin_only", [False, True]) @@ -1028,18 +1034,18 @@ def test_view_networks(client, network_scope_factory, network_factory): assert network3.vlan_name not in str(response.data) logout(client) # user_prod user can see all networks - login(client, "user_prod", "userprod") - response = client.get("/network/networks") - assert network1.vlan_name in str(response.data) - assert network2.vlan_name in str(response.data) - assert network3.vlan_name in str(response.data) - logout(client) - # admin can see all networks - login(client, "admin", "adminpasswd") - response = client.get("/network/networks") - assert network1.vlan_name in str(response.data) - assert network2.vlan_name in str(response.data) - assert network3.vlan_name in str(response.data) + # Same for admin and auditor users + for (user, passwd) in ( + ("user_prod", "userprod"), + ("admin", "adminpasswd"), + ("audit", "auditpasswd"), + ): + login(client, "user_prod", "userprod") + response = client.get("/network/networks") + assert network1.vlan_name in str(response.data) + assert network2.vlan_name in str(response.data) + assert network3.vlan_name in str(response.data) + logout(client) def test_retrieve_groups(logged_client, ansible_group_factory): -- GitLab