From 12e8a5c9057d057b98281521032be41881a1d4ea Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 30 Aug 2023 15:55:18 -0400 Subject: [PATCH] [wip] add tests for ServiceGeoserver with API route support --- config/providers.cfg | 2 +- tests/interfaces.py | 33 ++++++++++++++----- tests/test_services.py | 75 ++++++++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 17 deletions(-) diff --git a/config/providers.cfg b/config/providers.cfg index 66ef921d5..519d38054 100644 --- a/config/providers.cfg +++ b/config/providers.cfg @@ -139,7 +139,7 @@ providers: title: geoserver public: true c4i: false - type: wfs + type: geoserver sync_type: wfs geoserver-web: diff --git a/tests/interfaces.py b/tests/interfaces.py index 2722bfaf3..0a0b86363 100644 --- a/tests/interfaces.py +++ b/tests/interfaces.py @@ -6902,19 +6902,34 @@ def test_GetResourceTypes_ServiceGeoserver(self): override_resource_types=res_types ) - # nested file/dirs always allowed at any level expect under File - res_types_allowed = [Layer.resource_type_name, Process.resource_type_name, Workspace.resource_type_name] - struct_allowed = { - Service.resource_type_name: [Workspace.resource_type_name], - Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name], - Layer.resource_type_name: [], - Process.resource_type_name: [], - } + if TestVersion(self.version) >= TestVersion("3.35.0"): + res_types_allowed = [ + Route.resource_type_name, + Layer.resource_type_name, + Process.resource_type_name, + Workspace.resource_type_name, + ] + struct_allowed = { + Service.resource_type_name: [Workspace.resource_type_name, Route.resource_type_name], + Route.resource_type_name: [Workspace.resource_type_name, Route.resource_type_name], + Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name], + Layer.resource_type_name: [], + Process.resource_type_name: [], + } + else: + # nested file/dirs always allowed at any level expect under File + res_types_allowed = [Layer.resource_type_name, Process.resource_type_name, Workspace.resource_type_name] + struct_allowed = { + Service.resource_type_name: [Workspace.resource_type_name], + Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name], + Layer.resource_type_name: [], + Process.resource_type_name: [], + } path = "/services/{}".format(self.test_service_name) resp = utils.test_request(self, "GET", path, headers=self.json_headers, cookies=self.cookies) body = utils.check_response_basic_info(resp) - svc = body["service"] + svc = body["service"] # type: JSON utils.check_val_is_in("resource_child_allowed", svc) utils.check_val_is_in("resource_types_allowed", svc) utils.check_val_is_in("resource_structure_allowed", svc) diff --git a/tests/test_services.py b/tests/test_services.py index 91abdf010..b0d29e83d 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -1010,6 +1010,8 @@ def test_ServiceGeoserver_effective_permissions(self): gi: GetFeatureInfo permission (WMS Layer) gm: GetMap (WMS Layer) dp: DescribeProcess permission (WPS Process) + r: Read permission (API Route) + w: Write permission (API Route) A: allow D: deny M: match (makes sense only on layer for any permission except GetCapabilities on service itself) @@ -1031,6 +1033,10 @@ def test_ServiceGeoserver_effective_permissions(self): Layer21 (gf-D-M) dp-D, gf-D, gi-D (revoke access, user > group) Layer22 (gf-D-M) dp-D, gf-D, gi-D (revoke access, both groups) Layer23 (gf-A-M) (gf-D-M) dp-D, gf-A, gi-D (allowed access, user > group) + Route1 r-D, w-D (denied default) + Route2 (r-A-R) r-A, w-D + Route3 (r-A-R) (w-A-R) r-A, w-A + Route4 (w-D-M) r-A, w-D (revoked access) .. note:: Permissions that do not applied to a given sub-:term:`OWS` implementation are automatically denied. @@ -1052,6 +1058,10 @@ def test_ServiceGeoserver_effective_permissions(self): p12_name = "process12" p13_name = "process13" p14_name = "process14" + r1_name = "route1" + r2_name = "route2" + r3_name = "route3" + r4_name = "route4" utils.TestSetup.delete_TestService(self, svc1_name) svc1_id, w1_id = utils.TestSetup.create_TestServiceResourceTree( @@ -1122,6 +1132,34 @@ def test_ServiceGeoserver_effective_permissions(self): override_resource_type=models.Layer.resource_type_name ) l23_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=svc1_id, + override_resource_name=r1_name, + override_resource_type=models.Route.resource_type_name + ) + r1_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=r1_id, + override_resource_name=r2_name, + override_resource_type=models.Route.resource_type_name + ) + r2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=svc1_id, + override_resource_name=r3_name, + override_resource_type=models.Route.resource_type_name + ) + r3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=r3_id, + override_resource_name=r4_name, + override_resource_type=models.Route.resource_type_name + ) + r4_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] # create permissions gcAR = PermissionSet(Permission.GET_CAPABILITIES, Access.ALLOW, Scope.RECURSIVE) # noqa @@ -1134,6 +1172,9 @@ def test_ServiceGeoserver_effective_permissions(self): gmDR = PermissionSet(Permission.GET_MAP, Access.DENY, Scope.RECURSIVE) # noqa dpAR = PermissionSet(Permission.DESCRIBE_PROCESS, Access.ALLOW, Scope.RECURSIVE) # noqa dpDM = PermissionSet(Permission.DESCRIBE_PROCESS, Access.DENY, Scope.MATCH) # noqa + rAR = PermissionSet(Permission.READ, Access.ALLOW, Scope.RECURSIVE) # noqa + wAR = PermissionSet(Permission.WRITE, Access.ALLOW, Scope.RECURSIVE) # noqa + wDM = PermissionSet(Permission.WRITE, Access.DENY, Scope.MATCH) # noqa utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=svc1_id, override_permission=gcAR) utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=svc1_id, override_permission=gmDR) utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w1_id, override_permission=gfAR) @@ -1149,6 +1190,10 @@ def test_ServiceGeoserver_effective_permissions(self): utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l22_id, override_permission=gfDM) utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l23_id, override_permission=gfAM) utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l23_id, override_permission=gfDM) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=r2_id, override_permission=rAR) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=r3_id, override_permission=rAR) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=r3_id, override_permission=wAR) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=r4_id, override_permission=wDM) # login test user for which the permissions were set self.login_test_user() @@ -1166,9 +1211,9 @@ def _scope(workspace, layer): # type: (Str, Str) -> Str return "{}:{}".format(workspace, layer) - def _test(_path, _params, allow): - # type: (Str, Dict[Str, Str], bool) -> None - req = self.mock_request(_path, method="GET", params=_params) + def _test(_path, _params, allow, method="GET"): + # type: (Str, Dict[Str, Str], bool, Str) -> None + req = self.mock_request(_path, method=method, params=_params) if allow: utils.check_no_raise(lambda: self.ows.check_request(req), msg=_msg(_path, _params)) else: @@ -1205,6 +1250,22 @@ def _test(_path, _params, allow): w2_l2 = _scope(w2_name, l22_name) w2_l3 = _scope(w2_name, l23_name) + # API endpoints only valid with exact paths + r1_path = "{}/{}".format(svc_path, r1_name) + r2_path = "{}/{}".format(svc_path, r2_name) + r3_path = "{}/{}".format(svc_path, r3_name) + r4_path = "{}/{}".format(svc_path, r4_name) + _test(r1_path, {}, method="GET", allow=False) + _test(r2_path, {}, method="GET", allow=True) + _test(r3_path, {}, method="GET", allow=True) + _test(r4_path, {}, method="GET", allow=True) + _test(r1_path, {}, method="POST", allow=False) + _test(r2_path, {}, method="POST", allow=False) + _test(r3_path, {}, method="POST", allow=True) + _test(r4_path, {}, method="POST", allow=False) + _test("/random", {}, method="GET", allow=False) + _test("/random", {}, method="POST", allow=False) + # Layer1, mismatching permission for WMS _test(svc_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False) _test(w1_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False) @@ -1402,13 +1463,13 @@ def _add_both(value): layer_permutes = itertools.permutations([w1_l1, w1_l2, w1_l3, w2_l1, w2_l2, w2_l3], quantity) for layer_combo in layer_permutes: query = ",".join(layer_combo) - allow = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2]) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow) + allow_layer = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2]) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_layer) # multiple layers nested under same workspace will work for request with Workspace isolated path # otherwise, automatic deny regardless if they were allowed during request without workspace in path - allow_w1 = allow and all(layer.startswith(w1_name) for layer in layer_combo) - allow_w2 = allow and all(layer.startswith(w2_name) for layer in layer_combo) + allow_w1 = allow_layer and all(layer.startswith(w1_name) for layer in layer_combo) + allow_w2 = allow_layer and all(layer.startswith(w2_name) for layer in layer_combo) _test(w1_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w1) _test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w2)