From 94d8cc37f102799d9d6e14002e220eb35eb8644b Mon Sep 17 00:00:00 2001 From: nannan00 <17491932+nannan00@users.noreply.github.com> Date: Wed, 15 Nov 2023 12:14:40 +0800 Subject: [PATCH] feat(bkuser): idp cur api --- src/bk-user/bkuser/apis/login/views.py | 6 +- src/bk-user/bkuser/apis/web/idp/__init__.py | 10 + .../bkuser/apis/web/idp/serializers.py | 182 ++++++++++++++++++ src/bk-user/bkuser/apis/web/idp/urls.py | 22 +++ src/bk-user/bkuser/apis/web/idp/views.py | 180 +++++++++++++++++ src/bk-user/bkuser/apis/web/urls.py | 7 +- src/bk-user/bkuser/apps/idp/handlers.py | 4 +- src/bk-user/bkuser/apps/idp/models.py | 26 +++ src/bk-user/bkuser/common/error_codes.py | 3 + src/idp-plugins/idp_plugins/base.py | 10 + 10 files changed, 443 insertions(+), 7 deletions(-) create mode 100644 src/bk-user/bkuser/apis/web/idp/__init__.py create mode 100644 src/bk-user/bkuser/apis/web/idp/serializers.py create mode 100644 src/bk-user/bkuser/apis/web/idp/urls.py create mode 100644 src/bk-user/bkuser/apis/web/idp/views.py diff --git a/src/bk-user/bkuser/apis/login/views.py b/src/bk-user/bkuser/apis/login/views.py index c25e4148d..cc68ad862 100644 --- a/src/bk-user/bkuser/apis/login/views.py +++ b/src/bk-user/bkuser/apis/login/views.py @@ -16,7 +16,7 @@ from rest_framework.response import Response from bkuser.apps.data_source.models import DataSourceUser, LocalDataSourceIdentityInfo -from bkuser.apps.idp.data_models import DataSourceMatchRuleList, convert_match_rules_to_queryset_filter +from bkuser.apps.idp.data_models import convert_match_rules_to_queryset_filter from bkuser.apps.idp.models import Idp from bkuser.apps.tenant.models import Tenant, TenantUser from bkuser.common.error_codes import error_codes @@ -134,13 +134,11 @@ def post(self, request, *args, **kwargs): # 一般社会化登录都得通过绑定匹配方式,比如QQ,用户得先绑定后才能使用QQ登录 # 直接匹配,一般是企业身份登录方式, # 比如企业内部SAML2.0登录,认证后获取到的用户字段,能直接与数据源里的用户数据字段匹配 - # 认证源与数据源的匹配规则 - data_source_match_rules = DataSourceMatchRuleList.validate_python(idp.data_source_match_rules) # 将规则转换为Django Queryset 过滤条件, 不同用户之间过滤逻辑是OR conditions = [ condition for userinfo in data["idp_users"] - if (condition := convert_match_rules_to_queryset_filter(data_source_match_rules, userinfo)) + if (condition := convert_match_rules_to_queryset_filter(idp.data_source_match_rule_objs, userinfo)) ] # 查询数据源用户 diff --git a/src/bk-user/bkuser/apis/web/idp/__init__.py b/src/bk-user/bkuser/apis/web/idp/__init__.py new file mode 100644 index 000000000..1060b7bf4 --- /dev/null +++ b/src/bk-user/bkuser/apis/web/idp/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/src/bk-user/bkuser/apis/web/idp/serializers.py b/src/bk-user/bkuser/apis/web/idp/serializers.py new file mode 100644 index 000000000..acf84f85e --- /dev/null +++ b/src/bk-user/bkuser/apis/web/idp/serializers.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from typing import List, Dict, Any + +from django.utils.translation import gettext_lazy as _ +from drf_yasg.utils import swagger_serializer_method +from pydantic import ValidationError as PDValidationError +from rest_framework import serializers +from rest_framework.exceptions import ValidationError + +from bkuser.apps.idp.constants import IdpStatus +from bkuser.apps.idp.models import Idp, IdpPlugin +from bkuser.apps.data_source.models import DataSource +from bkuser.apps.tenant.models import TenantUserCustomField, UserBuiltinField +from bkuser.idp_plugins.constants import BuiltinIdpPluginEnum +from bkuser.idp_plugins.base import get_plugin_cfg_cls +from bkuser.utils.pydantic import stringify_pydantic_error + + +class IdpPluginOutputSLZ(serializers.Serializer): + id = serializers.CharField(help_text="认证源插件唯一标识") + name = serializers.CharField(help_text="认证源插件名称") + description = serializers.CharField(help_text="认证源插件描述") + logo = serializers.CharField(help_text="认证源插件 Logo") + + +class IdpSearchInputSLZ(serializers.Serializer): + keyword = serializers.CharField(help_text="搜索关键字", required=False) + + +class IdpSearchOutputSLZ(serializers.Serializer): + id = serializers.CharField(help_text="认证源唯一标识") + name = serializers.CharField(help_text="认证源名称") + status = serializers.ChoiceField(help_text="认证源状态", choices=IdpStatus.get_choices()) + updater = serializers.CharField(help_text="更新者") + updated_at = serializers.CharField(help_text="更新时间", source="updated_at_display") + plugin = IdpPluginOutputSLZ(help_text="认证源插件") + matched_data_sources = serializers.SerializerMethodField(help_text="匹配的数据源列表") + + @swagger_serializer_method( + serializer_or_field=serializers.ListField( + help_text="匹配的数据源", + child=serializers.CharField(), + allow_empty=True, + ) + ) + def get_matched_data_sources(self, obj: Idp) -> List[str]: + data_source_name_map = self.context["data_source_name_map"] + + return [ + data_source_name_map[r.data_source_id] + for r in obj.data_source_match_rule_objs + if r.data_source_id in data_source_name_map + ] + + +def _validate_duplicate_idp_name(name: str, tenant_id: str, idp_id: str = "") -> str: + """校验IDP 是否重名""" + queryset = Idp.objects.filter(name=name, owner_tenant_id=tenant_id) + # 过滤掉自身名称 + if idp_id: + queryset.exclude(id=idp_id) + + if queryset.exists(): + raise ValidationError(_("同名认证源已存在")) + + return name + + +class FieldCompareRuleSLZ(serializers.Serializer): + source_field = serializers.CharField(help_text="认证源原始字段") + target_field = serializers.CharField(help_text="匹配的数据源字段") + + +class DataSourceMatchRuleSLZ(serializers.Serializer): + data_source_id = serializers.IntegerField(help_text="数据源 ID") + field_compare_rules = serializers.ListField( + help_text="字段比较规则", child=FieldCompareRuleSLZ(), allow_empty=False, min_length=1 + ) + + def validate(self, attrs: Dict[str, Any]) -> Dict[str, Any]: + # 数据源是否当前租户的 + tenant_id = self.context["tenant_id"] + if not DataSource.objects.filter(id=attrs["data_source_id"], owner_tenant_id=tenant_id).exists(): + raise ValidationError(_("数据源必须是当前租户下的,{} 并不符合").format(attrs["data_source_id"])) + + # 匹配的数据源字段必须是当前租户的用户字段,包括内建字段和自定义字段 + builtin_fields = set(UserBuiltinField.objects.all().values_list("name", flat=True)) + custom_fields = set(TenantUserCustomField.objects.filter(tenant_id=tenant_id).values_list("name", flat=True)) + allowed_target_fields = builtin_fields | custom_fields + + target_fields = {r.get("target_field") for r in attrs["field_compare_rules"]} + if not_found_fields := target_fields - allowed_target_fields: + raise ValidationError(_("匹配的数据源字段 {} 不属于用户自定义字段或内置字段").format(not_found_fields)) + + return attrs + + +class IdpCreateInputSLZ(serializers.Serializer): + name = serializers.CharField(help_text="认证源名称", max_length=128) + plugin_id = serializers.CharField(help_text="认证源插件 ID") + plugin_config = serializers.JSONField(help_text="认证源插件配置") + data_source_match_rules = serializers.ListField( + help_text="数据源匹配规则", child=DataSourceMatchRuleSLZ(), allow_empty=True, default=list + ) + + def validate_name(self, name: str) -> str: + return _validate_duplicate_idp_name(name, self.context["tenant_id"]) + + def validate_plugin_id(self, plugin_id: str) -> str: + if not IdpPlugin.objects.filter(id=plugin_id).exists(): + raise ValidationError(_("认证源插件不存在")) + + if plugin_id == BuiltinIdpPluginEnum.LOCAL: + raise ValidationError(_("不允许创建本地账密认证源")) + + return plugin_id + + def validate(self, attrs: Dict[str, Any]) -> Dict[str, Any]: + plugin_id = attrs["plugin_id"] + + try: + cfg_cls = get_plugin_cfg_cls(plugin_id) + except NotImplementedError: + raise ValidationError(_("认证源插件 {} 不存在").format(plugin_id)) + + try: + attrs["plugin_config"] = cfg_cls(**attrs["plugin_config"]).model_dump() + except PDValidationError as e: + raise ValidationError(_("认证源插件配置不合法:{}").format(stringify_pydantic_error(e))) + + return attrs + + +class IdpCreateOutputSLZ(serializers.Serializer): + id = serializers.CharField(help_text="认证源 ID") + callback_uri = serializers.CharField(help_text="回调地址") + + +class IdpRetrieveOutputSLZ(serializers.Serializer): + id = serializers.CharField(help_text="认证源唯一标识") + name = serializers.CharField(help_text="认证源名称") + owner_tenant_id = serializers.CharField(help_text="认证源所属租户 ID") + status = serializers.ChoiceField(help_text="认证源状态", choices=IdpStatus.get_choices()) + plugin = IdpPluginOutputSLZ(help_text="认证源插件") + plugin_config = serializers.JSONField(help_text="认证源插件配置") + data_source_match_rules = serializers.JSONField(help_text="数据源匹配规则", default=list) + callback_uri = serializers.CharField(help_text="回调地址") + + +class IdpPartialUpdateInputSLZ(serializers.Serializer): + name = serializers.CharField(help_text="认证源名称") + + def validate_name(self, name: str) -> str: + return _validate_duplicate_idp_name(name, self.context["tenant_id"], self.context["idp_id"]) + + +class IdpUpdateInputSLZ(serializers.Serializer): + name = serializers.CharField(help_text="认证源名称", max_length=128) + plugin_config = serializers.JSONField(help_text="认证源插件配置") + data_source_match_rules = serializers.ListField( + help_text="数据源匹配规则", child=DataSourceMatchRuleSLZ(), allow_empty=True, default=list + ) + + def validate_name(self, name: str) -> str: + return _validate_duplicate_idp_name(name, self.context["tenant_id"], self.context["idp_id"]) + + def validate_plugin_config(self, plugin_config: Dict[str, Any]) -> Dict[str, Any]: + cfg_cls = get_plugin_cfg_cls(self.context["plugin_id"]) + + try: + return cfg_cls(**plugin_config).model_dump() + except PDValidationError as e: + raise ValidationError(_("认证源插件配置不合法:{}").format(stringify_pydantic_error(e))) diff --git a/src/bk-user/bkuser/apis/web/idp/urls.py b/src/bk-user/bkuser/apis/web/idp/urls.py new file mode 100644 index 000000000..5ea870f6a --- /dev/null +++ b/src/bk-user/bkuser/apis/web/idp/urls.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.urls import path + +from . import views + +urlpatterns = [ + # 认证源插件列表 + path("plugins/", views.IdpPluginListApi.as_view(), name="idp_plugin.list"), + # 认证源创建/获取列表 + path("", views.IdpListCreateApi.as_view(), name="idp.list_create"), + # 认证源获取/更新 + path("/", views.IdpRetrieveUpdateApi.as_view(), name="idp.retrieve_update"), +] diff --git a/src/bk-user/bkuser/apis/web/idp/views.py b/src/bk-user/bkuser/apis/web/idp/views.py new file mode 100644 index 000000000..e8cea0228 --- /dev/null +++ b/src/bk-user/bkuser/apis/web/idp/views.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.db import transaction +from django.utils.translation import gettext_lazy as _ +from drf_yasg.utils import swagger_auto_schema +from rest_framework import generics, status +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from bkuser.apps.idp.models import Idp, IdpPlugin +from bkuser.apps.data_source.models import DataSource +from bkuser.apis.web.mixins import CurrentUserTenantMixin +from bkuser.apps.permission.constants import PermAction +from bkuser.apps.permission.permissions import perm_class +from bkuser.idp_plugins.base import get_plugin_cfg_schema_map +from bkuser.common.error_codes import error_codes +from .serializers import ( + IdpPluginOutputSLZ, + IdpSearchInputSLZ, + IdpSearchOutputSLZ, + IdpCreateInputSLZ, + IdpCreateOutputSLZ, + IdpRetrieveOutputSLZ, + IdpPartialUpdateInputSLZ, + IdpUpdateInputSLZ, +) + + +class IdpPluginListApi(generics.ListAPIView): + queryset = IdpPlugin.objects.all() + pagination_class = None + serializer_class = IdpPluginOutputSLZ + + @swagger_auto_schema( + tags=["idp_plugin"], + operation_description="认证源插件列表", + responses={status.HTTP_200_OK: IdpPluginOutputSLZ(many=True)}, + ) + def get(self, request, *args, **kwargs): + return self.list(request, *args, **kwargs) + + +class IdpListCreateApi(CurrentUserTenantMixin, generics.ListCreateAPIView): + permission_classes = [IsAuthenticated, perm_class(PermAction.MANAGE_TENANT)] + + pagination_class = None + serializer_class = IdpSearchOutputSLZ + + def get_serializer_context(self): + # TODO 目前未支持数据源跨租户协助,所以只查询本租户数据源 + data_source_name_map = dict( + DataSource.objects.filter(owner_tenant_id=self.get_current_tenant_id()).values_list("id", "name") + ) + return {"data_source_name_map": data_source_name_map} + + def get_queryset(self): + slz = IdpSearchInputSLZ(data=self.request.query_params) + slz.is_valid(raise_exception=True) + data = slz.validated_data + + queryset = Idp.objects.filter(owner_tenant_id=self.get_current_tenant_id()) + if kw := data.get("keyword"): + queryset = queryset.filter(name__icontains=kw) + + # 关联查询插件 + queryset.select_related("plugin") + + return queryset + + @swagger_auto_schema( + tags=["idp"], + operation_description="认证源列表", + query_serializer=IdpSearchInputSLZ(), + responses={status.HTTP_200_OK: IdpSearchOutputSLZ(many=True)}, + ) + def get(self, request, *args, **kwargs): + return self.list(request, *args, **kwargs) + + @swagger_auto_schema( + tags=["idp"], + operation_description="新建认证源", + request_body=IdpCreateInputSLZ(), + responses={status.HTTP_201_CREATED: IdpCreateOutputSLZ(), **get_plugin_cfg_schema_map()}, + ) + def post(self, request, *args, **kwargs): + current_tenant_id = self.get_current_tenant_id() + slz = IdpCreateInputSLZ(data=request.data, context={"tenant_id": current_tenant_id}) + slz.is_valid(raise_exception=True) + data = slz.validated_data + + current_user = request.user.username + plugin = IdpPlugin.objects.get(id=data["plugin_id"]) + + with transaction.atomic(): + idp = Idp.objects.create( + name=data["name"], + owner_tenant_id=current_tenant_id, + plugin=plugin, + plugin_config=data["plugin_config"], + data_source_match_rules=data["data_source_match_rules"], + creator=current_user, + updater=current_user, + ) + + return Response(IdpCreateOutputSLZ(instance=idp).data, status=status.HTTP_201_CREATED) + + +class IdpRetrieveUpdateApi(CurrentUserTenantMixin, generics.RetrieveUpdateAPIView): + permission_classes = [IsAuthenticated, perm_class(PermAction.MANAGE_TENANT)] + + serializer_class = IdpRetrieveOutputSLZ + lookup_url_kwarg = "id" + + def get_queryset(self): + return Idp.objects.filter(owner_tenant_id=self.get_current_tenant_id()) + + @swagger_auto_schema( + tags=["idp"], + operation_description="认证源详情", + responses={ + status.HTTP_200_OK: IdpRetrieveOutputSLZ(), + **get_plugin_cfg_schema_map(), + }, + ) + def get(self, request, *args, **kwargs): + return self.retrieve(request, *args, **kwargs) + + @swagger_auto_schema( + tags=["idp"], + operation_description="更新认证源部分字段", + request_body=IdpPartialUpdateInputSLZ(), + responses={status.HTTP_204_NO_CONTENT: ""}, + ) + def patch(self, request, *args, **kwargs): + idp = self.get_object() + current_tenant_id = self.get_current_tenant_id() + slz = IdpPartialUpdateInputSLZ(data=request.data, context={"tenant_id": current_tenant_id, "idp_id": idp.id}) + slz.is_valid(raise_exception=True) + data = slz.validated_data + + idp.name = data["name"] + idp.updater = request.user.username + idp.save(update_fields=["name", "updater", "updated_at"]) + + return Response(status=status.HTTP_204_NO_CONTENT) + + @swagger_auto_schema( + tags=["idp"], + operation_description="更新认证源", + request_body=IdpUpdateInputSLZ(), + responses={status.HTTP_204_NO_CONTENT: "", **get_plugin_cfg_schema_map()}, + ) + def put(self, request, *args, **kwargs): + idp = self.get_object() + if idp.is_local: + raise error_codes.CANNOT_UPDATE_IDP.f(_("本地账密认证源不允许更新配置")) + + current_tenant_id = self.get_current_tenant_id() + slz = IdpUpdateInputSLZ( + data=request.data, + context={"tenant_id": current_tenant_id, "idp_id": idp.id, "plugin_id": idp.plugin_id}, + ) + slz.is_valid(raise_exception=True) + data = slz.validated_data + + idp.name = data["name"] + idp.plugin_config = data["plugin_config"] + idp.data_source_match_rules = data["data_source_match_rules"] + idp.updater = request.user.username + idp.save(update_fields=["name", "plugin_config", "data_source_match_rules", "updater", "updated_at"]) + + return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/src/bk-user/bkuser/apis/web/urls.py b/src/bk-user/bkuser/apis/web/urls.py index 6543bde2b..2991a7f10 100644 --- a/src/bk-user/bkuser/apis/web/urls.py +++ b/src/bk-user/bkuser/apis/web/urls.py @@ -15,10 +15,15 @@ path("basic/", include("bkuser.apis.web.basic.urls")), # 租户 path("tenants/", include("bkuser.apis.web.tenant.urls")), + # 租户组织架构 path("tenant-organization/", include("bkuser.apis.web.organization.urls")), + # 数据源 & 数据源用户/部门 path("data-sources/", include("bkuser.apis.web.data_source.urls")), path("data-sources/", include("bkuser.apis.web.data_source_organization.urls")), + # 认证源 + path("idps/", include("bkuser.apis.web.idp.urls")), + # 租户配置 + path("tenant-setting/", include("bkuser.apis.web.tenant_setting.urls")), # 个人中心 path("personal-center/", include("bkuser.apis.web.personal_center.urls")), - path("tenant-setting/", include("bkuser.apis.web.tenant_setting.urls")), ] diff --git a/src/bk-user/bkuser/apps/idp/handlers.py b/src/bk-user/bkuser/apps/idp/handlers.py index 10e51fd6a..7d72673e6 100644 --- a/src/bk-user/bkuser/apps/idp/handlers.py +++ b/src/bk-user/bkuser/apps/idp/handlers.py @@ -20,7 +20,7 @@ from bkuser.plugins.local.models import LocalDataSourcePluginConfig from .constants import IdpStatus -from .data_models import DataSourceMatchRuleList, gen_data_source_match_rule_of_local +from .data_models import gen_data_source_match_rule_of_local from .models import Idp, IdpPlugin @@ -54,7 +54,7 @@ def _update_local_idp_of_tenant(data_source: DataSource): # 根据数据源是否使用账密登录,修改认证源配置 idp_plugin_cfg = LocalIdpPluginConfig(**idp.plugin_config) - data_source_match_rules = DataSourceMatchRuleList.validate_python(idp.data_source_match_rules) + data_source_match_rules = idp.data_source_match_rule_objs # 对于启用登录,则需要添加进配置 if enable_login and data_source.id not in idp_plugin_cfg.data_source_ids: diff --git a/src/bk-user/bkuser/apps/idp/models.py b/src/bk-user/bkuser/apps/idp/models.py index 8edf5e864..f5865589f 100644 --- a/src/bk-user/bkuser/apps/idp/models.py +++ b/src/bk-user/bkuser/apps/idp/models.py @@ -8,12 +8,19 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from typing import List +from urllib.parse import urljoin + from django.db import models +from django.conf import settings from bkuser.common.models import AuditedModel from bkuser.utils.uuid import generate_uuid from .constants import IdpStatus +from .data_models import DataSourceMatchRuleList, DataSourceMatchRule +from bkuser.idp_plugins.base import get_plugin_type +from bkuser.idp_plugins.constants import PluginTypeEnum, BuiltinIdpPluginEnum class IdpPlugin(models.Model): @@ -45,3 +52,22 @@ class Meta: unique_together = [ ("name", "owner_tenant_id"), ] + + @property + def is_local(self) -> bool: + """检查类型是否为本地账密认证源""" + return self.plugin.id == BuiltinIdpPluginEnum.LOCAL + + @property + def data_source_match_rule_objs(self) -> List[DataSourceMatchRule]: + """转换为规则对象列表""" + return DataSourceMatchRuleList.validate_python(self.data_source_match_rules) + + @property + def callback_uri(self) -> str: + plugin_type = get_plugin_type(self.plugin.id) + # 联邦登录才有回调地址 + if plugin_type == PluginTypeEnum.FEDERATION: + return urljoin(settings.BK_LOGIN_URL, f"auth/idps/{self.id}/actions/callback/") + + return "" diff --git a/src/bk-user/bkuser/common/error_codes.py b/src/bk-user/bkuser/common/error_codes.py index 6770abeb0..27612e9d4 100644 --- a/src/bk-user/bkuser/common/error_codes.py +++ b/src/bk-user/bkuser/common/error_codes.py @@ -84,6 +84,9 @@ class ErrorCodes: DATA_SOURCE_IMPORT_FAILED = ErrorCode(_("数据源导入失败")) CREATE_DATA_SOURCE_SYNC_TASK_FAILED = ErrorCode(_("创建数据源同步任务失败")) + # 认证源 + CANNOT_UPDATE_IDP = ErrorCode(_("该认证源不允许更新配置")) + # 租户 CREATE_TENANT_FAILED = ErrorCode(_("租户创建失败")) UPDATE_TENANT_FAILED = ErrorCode(_("租户更新失败")) diff --git a/src/idp-plugins/idp_plugins/base.py b/src/idp-plugins/idp_plugins/base.py index 5c4e485f5..789abb0b0 100644 --- a/src/idp-plugins/idp_plugins/base.py +++ b/src/idp-plugins/idp_plugins/base.py @@ -12,7 +12,9 @@ from abc import ABC, abstractmethod from typing import Any, Dict, List, Type +from bkuser.utils.pydantic import gen_openapi_schema from django.http import HttpRequest, HttpResponse, HttpResponseNotFound +from drf_yasg import openapi from pydantic import BaseModel from .constants import CUSTOM_PLUGIN_ID_PREFIX, AllowedHttpMethodEnum, BuiltinIdpPluginIDs, PluginTypeEnum @@ -163,3 +165,11 @@ def get_plugin_type(plugin_id: str) -> PluginTypeEnum: f"plugin class({plugin_cls.__name__}) must is a subclass of " f"{BaseCredentialIdpPlugin.__name__} or {BaseFederationIdpPlugin.__name__}" ) + + +def get_plugin_cfg_schema_map() -> Dict[str, openapi.Schema]: + """获取插件配置类 JsonSchema 映射表""" + return { + f"plugin_config:{plugin_id}": gen_openapi_schema(plugin_cls.config_class) + for plugin_id, plugin_cls in _plugin_cls_map.items() + }