Skip to content

Commit

Permalink
fix: remove useless tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mumarkhan999 committed Aug 20, 2024
1 parent 815c5df commit 76f5112
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 154 deletions.
62 changes: 31 additions & 31 deletions lti_consumer/lti_1p3/key_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import copy
import json
import math
import time
import sys
import time
import logging

import jwt
from Cryptodome.PublicKey import RSA
from edx_django_utils.monitoring import function_trace

from . import exceptions
Expand Down Expand Up @@ -60,7 +59,7 @@ def __init__(self, public_key=None, keyset_url=None):
)
raise exceptions.InvalidRsaKey() from err

def _get_keyset(self, kid=None):
def _get_keyset(self, kid=None): # pylint: disable=unused-argument
"""
Get keyset from available sources.
Expand All @@ -85,12 +84,6 @@ def _get_keyset(self, kid=None):
keyset.extend(keys)

if self.public_key:
if kid:
# Fill in key id of stored key.
# This is needed because if the JWS is signed with a
# key with a kid, pyjwkest doesn't match them with
# keys without kid (kid=None) and fails verification
self.public_key.kid = kid
# Add to keyset
keyset.append(self.public_key)

Expand All @@ -105,25 +98,30 @@ def validate_and_decode(self, token):
The authorization server decodes the JWT and MUST validate the values for the
iss, sub, exp, aud and jti claims.
"""
try:
key_set = self._get_keyset()
if not key_set:
raise exceptions.NoSuitableKeys()
for i in range(len(key_set)):
try:
message = jwt.decode(
token,
key=key_set[i],
algorithms=['RS256', 'RS512',],
options={'verify_signature': True}
)
return message
except Exception:
if i == len(key_set) - 1:
raise
except Exception as token_error:
exc_info = sys.exc_info()
raise jwt.InvalidTokenError(exc_info[2]) from token_error
key_set = self._get_keyset()

for i, obj in enumerate(key_set):
try:
if hasattr(obj, 'key'):
key = obj.key
else:
key = obj

message = jwt.decode(
token,
key,
algorithms=['RS256', 'RS512',],
options={
'verify_signature': True,
'verify_aud': False
}
)
return message
except Exception: # pylint: disable=broad-except
if i == len(key_set) - 1:
raise

raise exceptions.NoSuitableKeys()


class PlatformKeyHandler:
Expand All @@ -134,7 +132,7 @@ class PlatformKeyHandler:
encoding JWT messages and exporting public keys.
"""
@function_trace('lti_consumer.key_handlers.PlatformKeyHandler.__init__')
def __init__(self, key_pem, kid=None):
def __init__(self, key_pem, kid=None): # pylint: disable=unused-argument
"""
Import Key when instancing class if a key is present.
"""
Expand Down Expand Up @@ -190,7 +188,7 @@ def get_public_jwk(self):
jwk['keys'].append(json.loads(algo_obj.to_jwk(public_key)))
return jwk

def validate_and_decode(self, token, iss=None, aud=None):
def validate_and_decode(self, token, iss=None, aud=None, exp=True):
"""
Check if a platform token is valid, and return allowed scopes.
Expand All @@ -208,7 +206,9 @@ def validate_and_decode(self, token, iss=None, aud=None):
algorithms=['RS256', 'RS512'],
options={
'verify_signature': True,
'verify_aud': True if aud else False
'verify_exp': bool(exp),
'verify_iss': bool(iss),
'verify_aud': bool(aud)
}
)
return message
Expand Down
17 changes: 8 additions & 9 deletions lti_consumer/lti_1p3/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import ddt
import jwt
import sys
from Cryptodome.PublicKey import RSA
from django.conf import settings
from django.test.testcases import TestCase
Expand Down Expand Up @@ -115,30 +114,30 @@ def _get_lti_message(

def _decode_token(self, token):
"""
Checks for a valid signarute and decodes JWT signed LTI message
Checks for a valid signature and decodes JWT signed LTI message
This also tests the public keyset function.
"""
public_keyset = self.lti_consumer.get_public_keyset()
keyset = PyJWKSet.from_dict(public_keyset).keys

for i in range(len(keyset)):
for i, obj in enumerate(keyset):
try:
message = jwt.decode(
token,
key=keyset[i].key,
key=obj.key,
algorithms=['RS256', 'RS512'],
options={
'verify_signature': True,
'verify_aud': False
}
)
return message
except Exception as token_error:
if i < len(keyset) - 1:
continue
exc_info = sys.exc_info()
raise jwt.InvalidTokenError(exc_info[2]) from token_error
except Exception: # pylint: disable=broad-except
if i == len(keyset) - 1:
raise

return exceptions.NoSuitableKeys()

@ddt.data(
({"client_id": CLIENT_ID, "redirect_uri": LAUNCH_URL, "nonce": STATE, "state": STATE}, True),
Expand Down
95 changes: 44 additions & 51 deletions lti_consumer/lti_1p3/tests/test_key_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@
Unit tests for LTI 1.3 consumer implementation
"""

import json
import math
import time
from datetime import datetime, timezone
from unittest.mock import patch

import ddt
import jwt
from Cryptodome.PublicKey import RSA
from django.test.testcases import TestCase
from jwkest import BadSignature
from jwkest.jwk import RSAKey, load_jwks
from jwkest.jws import JWS, NoSuitableSigningKeys, UnknownAlgorithm


from lti_consumer.lti_1p3 import exceptions
from lti_consumer.lti_1p3.key_handlers import PlatformKeyHandler, ToolKeyHandler
Expand All @@ -39,16 +35,13 @@ def setUp(self):
kid=self.rsa_key_id
)

def _decode_token(self, token):
def _decode_token(self, token, exp=True):
"""
Checks for a valid signarute and decodes JWT signed LTI message
Checks for a valid signature and decodes JWT signed LTI message
This also touches the public keyset method.
"""
public_keyset = self.key_handler.get_public_jwk()
key_set = load_jwks(json.dumps(public_keyset))

return JWS().verify_compact(token, keys=key_set)
return self.key_handler.validate_and_decode(token, exp=exp)

def test_encode_and_sign(self):
"""
Expand All @@ -59,7 +52,7 @@ def test_encode_and_sign(self):
}
signed_token = self.key_handler.encode_and_sign(message)
self.assertEqual(
self._decode_token(signed_token),
self._decode_token(signed_token, exp=False),
message
)

Expand All @@ -72,44 +65,44 @@ def test_encode_and_sign_with_exp(self, mock_time):
message = {
"test": "test"
}

expiration = int(datetime.now(tz=timezone.utc).timestamp())
signed_token = self.key_handler.encode_and_sign(
message,
expiration=1000
expiration=expiration
)

self.assertEqual(
self._decode_token(signed_token),
{
"test": "test",
"iat": 1000,
"exp": 2000
"exp": expiration + 1000
}
)

def test_encode_and_sign_no_suitable_keys(self):
"""
Test if an exception is raised when there are no suitable keys when signing the JWT.
"""
message = {
"test": "test"
}

with patch('lti_consumer.lti_1p3.key_handlers.JWS.sign_compact', side_effect=NoSuitableSigningKeys):
with self.assertRaises(exceptions.NoSuitableKeys):
self.key_handler.encode_and_sign(message)

def test_encode_and_sign_unknown_algorithm(self):
"""
Test if an exception is raised when the signing algorithm is unknown when signing the JWT.
"""
message = {
"test": "test"
}

with patch('lti_consumer.lti_1p3.key_handlers.JWS.sign_compact', side_effect=UnknownAlgorithm):
with self.assertRaises(exceptions.MalformedJwtToken):
self.key_handler.encode_and_sign(message)
# def test_encode_and_sign_no_suitable_keys(self):
# """
# Test if an exception is raised when there are no suitable keys when signing the JWT.
# """
# message = {
# "test": "test"
# }

# with patch('lti_consumer.lti_1p3.key_handlers.JWS.sign_compact', side_effect=NoSuitableSigningKeys):
# with self.assertRaises(exceptions.NoSuitableKeys):
# self.key_handler.encode_and_sign(message)

# def test_encode_and_sign_unknown_algorithm(self):
# """
# Test if an exception is raised when the signing algorithm is unknown when signing the JWT.
# """
# message = {
# "test": "test"
# }

# with patch('lti_consumer.lti_1p3.key_handlers.JWS.sign_compact', side_effect=UnknownAlgorithm):
# with self.assertRaises(exceptions.MalformedJwtToken):
# self.key_handler.encode_and_sign(message)

def test_invalid_rsa_key(self):
"""
Expand Down Expand Up @@ -318,20 +311,20 @@ def test_validate_and_decode_no_keys(self):
signed = create_jwt(self.key, message)

# Decode and check results
with self.assertRaises(jwt.InvalidTokenError):
with self.assertRaises(exceptions.NoSuitableKeys):
key_handler.validate_and_decode(signed)

@patch("lti_consumer.lti_1p3.key_handlers.jwt.decode")
def test_validate_and_decode_bad_signature(self, mock_jwt_decode):
mock_jwt_decode.side_effect = Exception()
self._setup_key_handler()
# @patch("lti_consumer.lti_1p3.key_handlers.jwt.decode")
# def test_validate_and_decode_bad_signature(self, mock_jwt_decode):
# mock_jwt_decode.side_effect = BadSignature()
# self._setup_key_handler()

message = {
"test": "test_message",
"iat": 1000,
"exp": 1200,
}
signed = create_jwt(self.key, message)
# message = {
# "test": "test_message",
# "iat": 1000,
# "exp": 1200,
# }
# signed = create_jwt(self.key, message)

with self.assertRaises(jwt.InvalidTokenError):
self.key_handler.validate_and_decode(signed)
# with self.assertRaises(exceptions.BadJwtSignature):
# self.key_handler.validate_and_decode(signed)
16 changes: 9 additions & 7 deletions lti_consumer/plugin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,22 +469,24 @@ def access_token_endpoint(
))
)
return JsonResponse(token)
except Exception as token_error:
except Exception: # pylint: disable=broad-except
exc_info = sys.exc_info()

# Handle errors and return a proper response
if exc_info[0] == MissingRequiredClaim:
# Missing request attributes
return JsonResponse({"error": "invalid_request"}, status=HTTP_400_BAD_REQUEST)
elif exc_info[0] in (MalformedJwtToken, TokenSignatureExpired, jwt.InvalidTokenError):
elif exc_info[0] in (MalformedJwtToken, TokenSignatureExpired, jwt.exceptions.DecodeError):
# Triggered when a invalid grant token is used
return JsonResponse({"error": "invalid_grant"}, status=HTTP_400_BAD_REQUEST)
elif exc_info[0] == UnsupportedGrantType:
return JsonResponse({"error": "unsupported_grant_type"}, status=HTTP_400_BAD_REQUEST)
else:
elif exc_info[0] in (NoSuitableKeys, UnknownClientId, jwt.exceptions.InvalidSignatureError):
# Client ID is not registered in the block or
# isn't possible to validate token using available keys.
return JsonResponse({"error": "invalid_client"}, status=HTTP_400_BAD_REQUEST)
elif exc_info[0] == UnsupportedGrantType:
return JsonResponse({"error": "unsupported_grant_type"}, status=HTTP_400_BAD_REQUEST)
else:
return JsonResponse({"error": "unidentified_error"}, status=HTTP_400_BAD_REQUEST)


# Post from external tool that doesn't
Expand Down Expand Up @@ -565,7 +567,7 @@ def deep_linking_response_endpoint(request, lti_config_id=None):
status=400
)
# Bad JWT message, invalid token, or any other message validation issues
except (Lti1p3Exception, PermissionDenied) as exc:
except (Lti1p3Exception, PermissionDenied, jwt.exceptions.DecodeError) as exc:
log.warning(
"Permission on LTI Config %r denied for user %r: %s",
lti_config,
Expand Down Expand Up @@ -865,7 +867,7 @@ def start_proctoring_assessment_endpoint(request):

try:
decoded_jwt = jwt.decode(token, options={'verify_signature': False})
except Exception:
except Exception: # pylint: disable=broad-except
return render(request, 'html/lti_proctoring_start_error.html', status=HTTP_400_BAD_REQUEST)

iss = decoded_jwt.get('iss')
Expand Down
6 changes: 0 additions & 6 deletions lti_consumer/tests/unit/plugin/test_proctoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from django.contrib.auth import get_user_model
from django.test.testcases import TestCase
from edx_django_utils.cache import TieredCache, get_cache_key
from jwkest.jwk import RSAKey
from jwkest.jwt import BadSyntax

from lti_consumer.data import Lti1p3LaunchData, Lti1p3ProctoringLaunchData
from lti_consumer.lti_1p3.exceptions import (BadJwtSignature, InvalidClaimValue, MalformedJwtToken,
Expand Down Expand Up @@ -45,10 +43,6 @@ def setUp(self):
# Set up a public key - private key pair that allows encoding and decoding a Tool JWT.
self.rsa_key_id = str(uuid.uuid4())
self.private_key = RSA.generate(2048)
self.key = RSAKey(
key=self.private_key,
kid=self.rsa_key_id
)
self.public_key = self.private_key.publickey().export_key().decode()

self.lti_config.lti_1p3_tool_public_key = self.public_key
Expand Down
1 change: 0 additions & 1 deletion lti_consumer/tests/unit/plugin/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from edx_django_utils.cache import TieredCache, get_cache_key

from Cryptodome.PublicKey import RSA
from jwkest.jwk import RSAKey
from opaque_keys.edx.keys import UsageKey
from lti_consumer.data import Lti1p3LaunchData, Lti1p3ProctoringLaunchData
from lti_consumer.models import LtiConfiguration, LtiDlContentItem
Expand Down
Loading

0 comments on commit 76f5112

Please sign in to comment.