Skip to content

Commit

Permalink
[wip] add tests for ServiceGeoserver with API route support
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Aug 30, 2023
1 parent 4ea3cbc commit 12e8a5c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 17 deletions.
2 changes: 1 addition & 1 deletion config/providers.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ providers:
title: geoserver
public: true
c4i: false
type: wfs
type: geoserver
sync_type: wfs

geoserver-web:
Expand Down
33 changes: 24 additions & 9 deletions tests/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -6902,19 +6902,34 @@ def test_GetResourceTypes_ServiceGeoserver(self):
override_resource_types=res_types
)

# nested file/dirs always allowed at any level expect under File
res_types_allowed = [Layer.resource_type_name, Process.resource_type_name, Workspace.resource_type_name]
struct_allowed = {
Service.resource_type_name: [Workspace.resource_type_name],
Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name],
Layer.resource_type_name: [],
Process.resource_type_name: [],
}
if TestVersion(self.version) >= TestVersion("3.35.0"):
res_types_allowed = [
Route.resource_type_name,
Layer.resource_type_name,
Process.resource_type_name,
Workspace.resource_type_name,
]
struct_allowed = {
Service.resource_type_name: [Workspace.resource_type_name, Route.resource_type_name],
Route.resource_type_name: [Workspace.resource_type_name, Route.resource_type_name],
Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name],
Layer.resource_type_name: [],
Process.resource_type_name: [],
}
else:
# nested file/dirs always allowed at any level expect under File
res_types_allowed = [Layer.resource_type_name, Process.resource_type_name, Workspace.resource_type_name]
struct_allowed = {
Service.resource_type_name: [Workspace.resource_type_name],
Workspace.resource_type_name: [Layer.resource_type_name, Process.resource_type_name],
Layer.resource_type_name: [],
Process.resource_type_name: [],
}

path = "/services/{}".format(self.test_service_name)
resp = utils.test_request(self, "GET", path, headers=self.json_headers, cookies=self.cookies)
body = utils.check_response_basic_info(resp)
svc = body["service"]
svc = body["service"] # type: JSON
utils.check_val_is_in("resource_child_allowed", svc)
utils.check_val_is_in("resource_types_allowed", svc)
utils.check_val_is_in("resource_structure_allowed", svc)
Expand Down
75 changes: 68 additions & 7 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,8 @@ def test_ServiceGeoserver_effective_permissions(self):
gi: GetFeatureInfo permission (WMS Layer)
gm: GetMap (WMS Layer)
dp: DescribeProcess permission (WPS Process)
r: Read permission (API Route)
w: Write permission (API Route)
A: allow
D: deny
M: match (makes sense only on layer for any permission except GetCapabilities on service itself)
Expand All @@ -1031,6 +1033,10 @@ def test_ServiceGeoserver_effective_permissions(self):
Layer21 (gf-D-M) dp-D, gf-D, gi-D (revoke access, user > group)
Layer22 (gf-D-M) dp-D, gf-D, gi-D (revoke access, both groups)
Layer23 (gf-A-M) (gf-D-M) dp-D, gf-A, gi-D (allowed access, user > group)
Route1 r-D, w-D (denied default)
Route2 (r-A-R) r-A, w-D
Route3 (r-A-R) (w-A-R) r-A, w-A
Route4 (w-D-M) r-A, w-D (revoked access)
.. note::
Permissions that do not applied to a given sub-:term:`OWS` implementation are automatically denied.
Expand All @@ -1052,6 +1058,10 @@ def test_ServiceGeoserver_effective_permissions(self):
p12_name = "process12"
p13_name = "process13"
p14_name = "process14"
r1_name = "route1"
r2_name = "route2"
r3_name = "route3"
r4_name = "route4"

utils.TestSetup.delete_TestService(self, svc1_name)
svc1_id, w1_id = utils.TestSetup.create_TestServiceResourceTree(
Expand Down Expand Up @@ -1122,6 +1132,34 @@ def test_ServiceGeoserver_effective_permissions(self):
override_resource_type=models.Layer.resource_type_name
)
l23_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=svc1_id,
override_resource_name=r1_name,
override_resource_type=models.Route.resource_type_name
)
r1_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=r1_id,
override_resource_name=r2_name,
override_resource_type=models.Route.resource_type_name
)
r2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=svc1_id,
override_resource_name=r3_name,
override_resource_type=models.Route.resource_type_name
)
r3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=r3_id,
override_resource_name=r4_name,
override_resource_type=models.Route.resource_type_name
)
r4_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]

# create permissions
gcAR = PermissionSet(Permission.GET_CAPABILITIES, Access.ALLOW, Scope.RECURSIVE) # noqa
Expand All @@ -1134,6 +1172,9 @@ def test_ServiceGeoserver_effective_permissions(self):
gmDR = PermissionSet(Permission.GET_MAP, Access.DENY, Scope.RECURSIVE) # noqa
dpAR = PermissionSet(Permission.DESCRIBE_PROCESS, Access.ALLOW, Scope.RECURSIVE) # noqa
dpDM = PermissionSet(Permission.DESCRIBE_PROCESS, Access.DENY, Scope.MATCH) # noqa
rAR = PermissionSet(Permission.READ, Access.ALLOW, Scope.RECURSIVE) # noqa
wAR = PermissionSet(Permission.WRITE, Access.ALLOW, Scope.RECURSIVE) # noqa
wDM = PermissionSet(Permission.WRITE, Access.DENY, Scope.MATCH) # noqa
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=svc1_id, override_permission=gcAR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=svc1_id, override_permission=gmDR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w1_id, override_permission=gfAR)
Expand All @@ -1149,6 +1190,10 @@ def test_ServiceGeoserver_effective_permissions(self):
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l22_id, override_permission=gfDM)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l23_id, override_permission=gfAM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l23_id, override_permission=gfDM)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=r2_id, override_permission=rAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=r3_id, override_permission=rAR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=r3_id, override_permission=wAR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=r4_id, override_permission=wDM)

# login test user for which the permissions were set
self.login_test_user()
Expand All @@ -1166,9 +1211,9 @@ def _scope(workspace, layer):
# type: (Str, Str) -> Str
return "{}:{}".format(workspace, layer)

def _test(_path, _params, allow):
# type: (Str, Dict[Str, Str], bool) -> None
req = self.mock_request(_path, method="GET", params=_params)
def _test(_path, _params, allow, method="GET"):
# type: (Str, Dict[Str, Str], bool, Str) -> None
req = self.mock_request(_path, method=method, params=_params)
if allow:
utils.check_no_raise(lambda: self.ows.check_request(req), msg=_msg(_path, _params))
else:
Expand Down Expand Up @@ -1205,6 +1250,22 @@ def _test(_path, _params, allow):
w2_l2 = _scope(w2_name, l22_name)
w2_l3 = _scope(w2_name, l23_name)

# API endpoints only valid with exact paths
r1_path = "{}/{}".format(svc_path, r1_name)
r2_path = "{}/{}".format(svc_path, r2_name)
r3_path = "{}/{}".format(svc_path, r3_name)
r4_path = "{}/{}".format(svc_path, r4_name)
_test(r1_path, {}, method="GET", allow=False)
_test(r2_path, {}, method="GET", allow=True)
_test(r3_path, {}, method="GET", allow=True)
_test(r4_path, {}, method="GET", allow=True)
_test(r1_path, {}, method="POST", allow=False)
_test(r2_path, {}, method="POST", allow=False)
_test(r3_path, {}, method="POST", allow=True)
_test(r4_path, {}, method="POST", allow=False)
_test("/random", {}, method="GET", allow=False)
_test("/random", {}, method="POST", allow=False)

# Layer1, mismatching permission for WMS
_test(svc_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False)
Expand Down Expand Up @@ -1402,13 +1463,13 @@ def _add_both(value):
layer_permutes = itertools.permutations([w1_l1, w1_l2, w1_l3, w2_l1, w2_l2, w2_l3], quantity)
for layer_combo in layer_permutes:
query = ",".join(layer_combo)
allow = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2])
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow)
allow_layer = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2])
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_layer)

# multiple layers nested under same workspace will work for request with Workspace isolated path
# otherwise, automatic deny regardless if they were allowed during request without workspace in path
allow_w1 = allow and all(layer.startswith(w1_name) for layer in layer_combo)
allow_w2 = allow and all(layer.startswith(w2_name) for layer in layer_combo)
allow_w1 = allow_layer and all(layer.startswith(w1_name) for layer in layer_combo)
allow_w2 = allow_layer and all(layer.startswith(w2_name) for layer in layer_combo)
_test(w1_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w1)
_test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w2)

Expand Down

0 comments on commit 12e8a5c

Please sign in to comment.