Skip to content

Commit

Permalink
Fix Hub token permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
Xpirix committed Nov 21, 2024
1 parent 1bad3a0 commit cac8796
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 10 deletions.
35 changes: 35 additions & 0 deletions qgis-app/api/permissions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from rest_framework import permissions
from rest_framework.permissions import BasePermission
from rest_framework_simplejwt.authentication import JWTAuthentication
from django.contrib.auth.models import User
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken, OutstandingToken
import datetime
from api.models import UserOutstandingToken

MANAGER_GROUP = "Style Managers"

Expand All @@ -21,3 +28,31 @@ def has_object_permission(self, request, view, obj):
is_manager = user.groups.filter(name=MANAGER_GROUP).exists()

return user == obj.creator or user.is_staff or is_manager

class HasValidToken(BasePermission):
def has_permission(self, request, view):
auth_token = request.META.get("HTTP_AUTHORIZATION")
if not str(auth_token).startswith('Bearer'):
return False

# Validate JWT token
authentication = JWTAuthentication()
try:
validated_token = authentication.get_validated_token(auth_token[7:])
user_id = validated_token.payload.get('user_id')
jti = validated_token.payload.get('refresh_jti')
token_id = OutstandingToken.objects.get(jti=jti).pk
is_blacklisted = BlacklistedToken.objects.filter(token_id=token_id).exists()
if not user_id or is_blacklisted:
return False

user = User.objects.get(pk=user_id)
if not user:
return False
user_token = UserOutstandingToken.objects.get(token__pk=token_id, user=user)
user_token.last_used_on = datetime.datetime.now()
user_token.save()
request.user_token = user_token
return True
except (InvalidToken, TokenError):
return False
267 changes: 262 additions & 5 deletions qgis-app/api/tests/test_resources_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from django.core.files.uploadedfile import SimpleUploadedFile
from wavefronts.models import Wavefront

from rest_framework_simplejwt.token_blacklist.models import OutstandingToken
from api.models import UserOutstandingToken

GPKG_DIR = join(dirname(dirname(dirname(__file__))), "geopackages", "tests", "gpkgfiles")
LAYERDEFINITION_DIR = join(dirname(dirname(dirname(__file__))), "layerdefinitions", "tests", "testfiles")
MODELS_DIR = join(dirname(dirname(dirname(__file__))), "models", "tests", "modelfiles")
Expand Down Expand Up @@ -50,8 +53,21 @@ def setUp(self):
super().setUp()
self.client = APIClient()
self.user = User.objects.create_user(username='testuser', password='testpass')
self.token = RefreshToken.for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.token.access_token}')
self.client.login(username='testuser', password='testpass')
self.refresh = RefreshToken.for_user(self.user)
self.outstanding_token = OutstandingToken.objects.get(jti=self.refresh['jti'])
self.user_token = UserOutstandingToken.objects.create(
user=self.user,
token=self.outstanding_token,
is_blacklisted=False,
is_newly_created=True
)
self.url = reverse('user_token_detail', args=[self.user_token.pk])
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
self.client.logout()
access_token = response.context.get('access_token')
self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {access_token}')

def test_create_geopackage(self):
url = reverse('resource-create')
Expand Down Expand Up @@ -202,6 +218,44 @@ def test_create_unsupported_resource_type(self):
self.assertEqual(response.status_code, 400)
self.assertEqual(response.data, {"resource_type": "Resource type not supported"})

def test_create_with_invalid_token(self):
url = reverse('resource-create')
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
uploaded_thumbnail = SimpleUploadedFile(
self.thumbnail_content.name, self.thumbnail_content.read()
)
uploaded_gpkg = SimpleUploadedFile(
self.gpkg_file_content.name, self.gpkg_file_content.read()
)
data = {
"resource_type": "geopackage",
"name": "Test Geopackage",
"description": "A test geopackage",
"thumbnail_full": uploaded_thumbnail,
"file": uploaded_gpkg,
}
response = self.client.post(url, data, format='multipart')
self.assertEqual(response.status_code, 401)

def test_create_with_blacklisted_token(self):
url = reverse('resource-create')
self.refresh.blacklist()
uploaded_thumbnail = SimpleUploadedFile(
self.thumbnail_content.name, self.thumbnail_content.read()
)
uploaded_gpkg = SimpleUploadedFile(
self.gpkg_file_content.name, self.gpkg_file_content.read()
)
data = {
"resource_type": "geopackage",
"name": "Test Geopackage",
"description": "A test geopackage",
"thumbnail_full": uploaded_thumbnail,
"file": uploaded_gpkg,
}
response = self.client.post(url, data, format='multipart')
self.assertEqual(response.status_code, 403)


@override_settings(MEDIA_ROOT=tempfile.mkdtemp())
class TestResourceDetailView(SetUpTest, TestCase):
Expand All @@ -210,8 +264,21 @@ def setUp(self):
super().setUp()
self.client = APIClient()
self.user = User.objects.create_user(username='testuser', password='testpass')
self.token = RefreshToken.for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.token.access_token}')
self.client.login(username='testuser', password='testpass')
self.refresh = RefreshToken.for_user(self.user)
self.outstanding_token = OutstandingToken.objects.get(jti=self.refresh['jti'])
self.user_token = UserOutstandingToken.objects.create(
user=self.user,
token=self.outstanding_token,
is_blacklisted=False,
is_newly_created=True
)
self.url = reverse('user_token_detail', args=[self.user_token.pk])
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
self.client.logout()
access_token = response.context.get('access_token')
self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {access_token}')

uploaded_thumbnail = SimpleUploadedFile(
self.thumbnail_content.name, self.thumbnail_content.read()
Expand Down Expand Up @@ -406,4 +473,194 @@ def test_delete_wavefront(self):
url = reverse("resource-detail", kwargs={"uuid": self.wavefront.uuid, "resource_type": "3dmodel"})
response = self.client.delete(url)
self.assertEqual(response.status_code, 204)
self.assertFalse(Wavefront.objects.filter(uuid=self.wavefront.uuid).exists())
self.assertFalse(Wavefront.objects.filter(uuid=self.wavefront.uuid).exists())

def test_update_geopackage_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.geopackage.uuid, "resource_type": "geopackage"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
data = {
"name": "Updated Geopackage",
"description": "Updated description",
"thumbnail_full": self.geopackage.thumbnail_image,
"file": self.geopackage.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 401)

def test_update_geopackage_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.geopackage.uuid, "resource_type": "geopackage"})
self.refresh.blacklist()
data = {
"name": "Updated Geopackage",
"description": "Updated description",
"thumbnail_full": self.geopackage.thumbnail_image,
"file": self.geopackage.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 403)

def test_update_layerdefinition_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.layerdefinition.uuid, "resource_type": "layerdefinition"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
data = {
"name": "Updated Layer Definition",
"description": "Updated description",
"thumbnail_full": self.layerdefinition.thumbnail_image,
"file": self.layerdefinition.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 401)

def test_update_layerdefinition_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.layerdefinition.uuid, "resource_type": "layerdefinition"})
self.refresh.blacklist()
data = {
"name": "Updated Layer Definition",
"description": "Updated description",
"thumbnail_full": self.layerdefinition.thumbnail_image,
"file": self.layerdefinition.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 403)

def test_update_model_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.model.uuid, "resource_type": "model"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
data = {
"name": "Updated Model",
"description": "Updated description",
"thumbnail_full": self.model.thumbnail_image,
"file": self.model.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 401)

def test_update_model_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.model.uuid, "resource_type": "model"})
self.refresh.blacklist()
data = {
"name": "Updated Model",
"description": "Updated description",
"thumbnail_full": self.model.thumbnail_image,
"file": self.model.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 403)

def test_update_style_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.style.uuid, "resource_type": "style"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
data = {
"name": "Updated Style",
"description": "Updated description",
"thumbnail_full": self.style.thumbnail_image,
"file": self.style.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 401)

def test_update_style_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.style.uuid, "resource_type": "style"})
self.refresh.blacklist()
data = {
"name": "Updated Style",
"description": "Updated description",
"thumbnail_full": self.style.thumbnail_image,
"file": self.style.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 403)

def test_update_wavefront_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.wavefront.uuid, "resource_type": "3dmodel"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
data = {
"name": "Updated 3D Model",
"description": "Updated description",
"thumbnail_full": self.wavefront.thumbnail_image,
"file": self.wavefront.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 401)

def test_update_wavefront_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.wavefront.uuid, "resource_type": "3dmodel"})
self.refresh.blacklist()
data = {
"name": "Updated 3D Model",
"description": "Updated description",
"thumbnail_full": self.wavefront.thumbnail_image,
"file": self.wavefront.file,
}
response = self.client.put(url, data, format="multipart")
self.assertEqual(response.status_code, 403)

def test_delete_geopackage_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.geopackage.uuid, "resource_type": "geopackage"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
response = self.client.delete(url)
self.assertEqual(response.status_code, 401)
self.assertTrue(Geopackage.objects.filter(uuid=self.geopackage.uuid).exists())

def test_delete_geopackage_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.geopackage.uuid, "resource_type": "geopackage"})
self.refresh.blacklist()
response = self.client.delete(url)
self.assertEqual(response.status_code, 403)
self.assertTrue(Geopackage.objects.filter(uuid=self.geopackage.uuid).exists())

def test_delete_layerdefinition_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.layerdefinition.uuid, "resource_type": "layerdefinition"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
response = self.client.delete(url)
self.assertEqual(response.status_code, 401)
self.assertTrue(LayerDefinition.objects.filter(uuid=self.layerdefinition.uuid).exists())

def test_delete_layerdefinition_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.layerdefinition.uuid, "resource_type": "layerdefinition"})
self.refresh.blacklist()
response = self.client.delete(url)
self.assertEqual(response.status_code, 403)
self.assertTrue(LayerDefinition.objects.filter(uuid=self.layerdefinition.uuid).exists())

def test_delete_model_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.model.uuid, "resource_type": "model"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
response = self.client.delete(url)
self.assertEqual(response.status_code, 401)
self.assertTrue(Model.objects.filter(uuid=self.model.uuid).exists())

def test_delete_model_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.model.uuid, "resource_type": "model"})
self.refresh.blacklist()
response = self.client.delete(url)
self.assertEqual(response.status_code, 403)
self.assertTrue(Model.objects.filter(uuid=self.model.uuid).exists())

def test_delete_style_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.style.uuid, "resource_type": "style"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
response = self.client.delete(url)
self.assertEqual(response.status_code, 401)
self.assertTrue(Style.objects.filter(uuid=self.style.uuid).exists())

def test_delete_style_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.style.uuid, "resource_type": "style"})
self.refresh.blacklist()
response = self.client.delete(url)
self.assertEqual(response.status_code, 403)
self.assertTrue(Style.objects.filter(uuid=self.style.uuid).exists())

def test_delete_wavefront_with_invalid_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.wavefront.uuid, "resource_type": "3dmodel"})
self.client.credentials(HTTP_AUTHORIZATION='Bearer invalidtoken')
response = self.client.delete(url)
self.assertEqual(response.status_code, 401)
self.assertTrue(Wavefront.objects.filter(uuid=self.wavefront.uuid).exists())

def test_delete_wavefront_with_blacklisted_token(self):
url = reverse("resource-detail", kwargs={"uuid": self.wavefront.uuid, "resource_type": "3dmodel"})
self.refresh.blacklist()
response = self.client.delete(url)
self.assertEqual(response.status_code, 403)
self.assertTrue(Wavefront.objects.filter(uuid=self.wavefront.uuid).exists())
Loading

0 comments on commit cac8796

Please sign in to comment.