Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: general data source sync plugin #1292

Merged
7 changes: 7 additions & 0 deletions src/bk-user/bin/start_celery_beat.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash
#
# Start the Celery beat scheduler for the bkuser Celery app, using the
# database-backed scheduler provided by django_celery_beat.

# Log level passed to celery beat; falls back to "info" when
# CELERY_LOG_LEVEL is unset or empty.
CELERY_LOG_LEVEL=${CELERY_LOG_LEVEL:-info}

# Run! The expansion is quoted so an unexpected value (spaces, glob
# characters) is passed to celery as a single argument.
celery -A bkuser.celery beat -l "${CELERY_LOG_LEVEL}" --scheduler django_celery_beat.schedulers:DatabaseScheduler
68 changes: 52 additions & 16 deletions src/bk-user/bkuser/apis/web/data_source/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
from bkuser.apps.data_source.constants import FieldMappingOperation
from bkuser.apps.data_source.models import DataSource, DataSourcePlugin
from bkuser.apps.tenant.models import TenantUserCustomField, UserBuiltinField
from bkuser.plugins.base import get_plugin_cfg_cls
from bkuser.plugins.base import get_plugin_cfg_cls, is_plugin_exists
from bkuser.plugins.constants import DataSourcePluginEnum
from bkuser.plugins.models import DataSourceSyncConfig
from bkuser.utils.pydantic import stringify_pydantic_error

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,6 +77,7 @@ class DataSourceCreateInputSLZ(serializers.Serializer):
field_mapping = serializers.ListField(
help_text="用户字段映射", child=DataSourceFieldMappingSLZ(), allow_empty=True, required=False, default=list
)
sync_config = serializers.JSONField(help_text="数据源同步配置", default=dict)

def validate_name(self, name: str) -> str:
if DataSource.objects.filter(name=name).exists():
Expand All @@ -101,16 +103,24 @@ def validate_field_mapping(self, field_mapping: List[Dict[str, Any]]) -> List[Di

return field_mapping

def validate_sync_config(self, sync_config: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the data source sync configuration.

    The value is checked by constructing a ``DataSourceSyncConfig`` from it;
    the original (unmodified) mapping is returned when construction succeeds.

    :param sync_config: deserialized JSON value of the ``sync_config`` field
    :returns: the same ``sync_config`` mapping that was passed in
    :raises ValidationError: if the value is not a JSON object, or if
        ``DataSourceSyncConfig`` rejects it
    """
    # The field is a JSONField, so any JSON value (list, string, number, ...)
    # can arrive here; unpacking a non-mapping with ** would raise TypeError
    # and surface as a server error instead of a validation error.
    if not isinstance(sync_config, dict):
        raise ValidationError(_("同步配置不合法:{}").format("sync_config must be a JSON object"))

    try:
        DataSourceSyncConfig(**sync_config)
    except PDValidationError as e:
        raise ValidationError(_("同步配置不合法:{}").format(stringify_pydantic_error(e))) from e

    return sync_config

def validate(self, attrs: Dict[str, Any]) -> Dict[str, Any]:
# 除本地数据源类型外,都需要配置字段映射
if attrs["plugin_id"] != DataSourcePluginEnum.LOCAL and not attrs["field_mapping"]:
plugin_id = attrs["plugin_id"]
if plugin_id != DataSourcePluginEnum.LOCAL and not attrs["field_mapping"]:
raise ValidationError(_("当前数据源类型必须配置字段映射"))

PluginConfigCls = get_plugin_cfg_cls(attrs["plugin_id"]) # noqa: N806
# 自定义插件,可能没有对应的配置类,不需要做格式检查
if not PluginConfigCls:
return attrs
if not is_plugin_exists(plugin_id):
raise ValidationError(_("数据源插件 {} 不存在").format(plugin_id))

PluginConfigCls = get_plugin_cfg_cls(plugin_id) # noqa: N806
try:
PluginConfigCls(**attrs["plugin_config"])
except PDValidationError as e:
Expand Down Expand Up @@ -150,13 +160,10 @@ class DataSourceUpdateInputSLZ(serializers.Serializer):
field_mapping = serializers.ListField(
help_text="用户字段映射", child=DataSourceFieldMappingSLZ(), allow_empty=True, required=False, default=list
)
sync_config = serializers.JSONField(help_text="数据源同步配置", default=dict)

def validate_plugin_config(self, plugin_config: Dict[str, Any]) -> Dict[str, Any]:
PluginConfigCls = get_plugin_cfg_cls(self.context["plugin_id"]) # noqa: N806
# 自定义插件,可能没有对应的配置类,不需要做格式检查
if not PluginConfigCls:
return plugin_config

try:
PluginConfigCls(**plugin_config)
except PDValidationError as e:
Expand All @@ -183,30 +190,59 @@ def validate_field_mapping(self, field_mapping: List[Dict]) -> List[Dict]:

return field_mapping

def validate_sync_config(self, sync_config: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the data source sync configuration supplied on update.

    The value is checked by constructing a ``DataSourceSyncConfig`` from it;
    the original (unmodified) mapping is returned when construction succeeds.

    :param sync_config: deserialized JSON value of the ``sync_config`` field
    :returns: the same ``sync_config`` mapping that was passed in
    :raises ValidationError: if the value is not a JSON object, or if
        ``DataSourceSyncConfig`` rejects it
    """
    # The field is a JSONField, so any JSON value (list, string, number, ...)
    # can arrive here; unpacking a non-mapping with ** would raise TypeError
    # and surface as a server error instead of a validation error.
    if not isinstance(sync_config, dict):
        raise ValidationError(_("同步配置不合法:{}").format("sync_config must be a JSON object"))

    try:
        DataSourceSyncConfig(**sync_config)
    except PDValidationError as e:
        raise ValidationError(_("同步配置不合法:{}").format(stringify_pydantic_error(e))) from e

    return sync_config


# Output serializer exposing a data source's status as a string.
# NOTE(review): presumably returned by the switch-status (enable/disable) API — confirm against the view.
class DataSourceSwitchStatusOutputSLZ(serializers.Serializer):
    status = serializers.CharField(help_text="数据源状态")


class RawDataSourceUserSLZ(serializers.Serializer):
id = serializers.CharField(help_text="用户 ID")
code = serializers.CharField(help_text="用户 ID")
narasux marked this conversation as resolved.
Show resolved Hide resolved
properties = serializers.JSONField(help_text="用户属性")
leaders = serializers.ListField(help_text="用户 leader ID 列表", child=serializers.CharField())
departments = serializers.ListField(help_text="用户部门 ID 列表", child=serializers.CharField())


class RawDataSourceDepartmentSLZ(serializers.Serializer):
id = serializers.CharField(help_text="部门 ID")
code = serializers.CharField(help_text="部门 ID")
name = serializers.CharField(help_text="部门名称")
parent = serializers.CharField(help_text="父部门 ID")


class DataSourceTestConnectionOutputSLZ(serializers.Serializer):
"""数据源连通性测试"""
class DataSourceTestConnectionInputSLZ(serializers.Serializer):
    """Input for the data source connectivity test.

    Takes a plugin ID and a raw plugin configuration. After validation,
    ``plugin_config`` in the validated data is replaced by an instance of the
    plugin's configuration class.
    """

    plugin_id = serializers.CharField(help_text="数据源插件 ID")
    plugin_config = serializers.JSONField(help_text="数据源插件配置")

    def validate(self, attrs: Dict[str, Any]) -> Dict[str, Any]:
        """Check the plugin is testable and parse its configuration.

        :param attrs: field-level validated data (``plugin_id``, ``plugin_config``)
        :returns: ``attrs`` with ``plugin_config`` converted to the plugin's config class instance
        :raises ValidationError: if the plugin is the local plugin, does not
            exist, or the configuration is not a JSON object / fails the
            config class validation
        """
        plugin_id = attrs["plugin_id"]

        if plugin_id == DataSourcePluginEnum.LOCAL:
            raise ValidationError(_("本地数据源不支持连通性测试"))

        if not is_plugin_exists(plugin_id):
            raise ValidationError(_("数据源插件 {} 不存在").format(plugin_id))

        plugin_config = attrs["plugin_config"]
        # JSONField accepts any JSON value; unpacking a non-mapping with **
        # would raise TypeError and surface as a server error.
        if not isinstance(plugin_config, dict):
            raise ValidationError(_("插件配置不合法:{}").format("plugin_config must be a JSON object"))

        PluginConfigCls = get_plugin_cfg_cls(plugin_id)  # noqa: N806
        try:
            attrs["plugin_config"] = PluginConfigCls(**plugin_config)
        except PDValidationError as e:
            raise ValidationError(_("插件配置不合法:{}").format(stringify_pydantic_error(e))) from e

        return attrs


# Output serializer for the data source connectivity test result.
class DataSourceTestConnectionOutputSLZ(serializers.Serializer):
    # Error message text (help_text: "错误信息").
    error_message = serializers.CharField(help_text="错误信息")
    # A raw data source user, serialized with RawDataSourceUserSLZ.
    user = RawDataSourceUserSLZ(help_text="用户")
    # A raw data source department, serialized with RawDataSourceDepartmentSLZ.
    department = RawDataSourceDepartmentSLZ(help_text="部门")
    # Free-form JSON with additional information; its schema is not defined here.
    extras = serializers.JSONField(help_text="额外信息")


class LocalDataSourceImportInputSLZ(serializers.Serializer):
Expand All @@ -217,8 +253,8 @@ class LocalDataSourceImportInputSLZ(serializers.Serializer):
incremental = serializers.BooleanField(help_text="是否使用增量同步", default=False)


class LocalDataSourceImportOutputSLZ(serializers.Serializer):
"""本地数据源导入结果"""
class DataSourceImportOrSyncOutputSLZ(serializers.Serializer):
"""数据源导入/同步结果"""

task_id = serializers.CharField(help_text="任务 ID")
status = serializers.CharField(help_text="任务状态")
Expand Down
12 changes: 6 additions & 6 deletions src/bk-user/bkuser/apis/web/data_source/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
),
# 数据源创建/获取列表
path("", views.DataSourceListCreateApi.as_view(), name="data_source.list_create"),
# 连通性测试
path(
"test-connection/",
views.DataSourceTestConnectionApi.as_view(),
name="data_source.test_connection",
),
narasux marked this conversation as resolved.
Show resolved Hide resolved
# 数据源更新/获取
path("<int:id>/", views.DataSourceRetrieveUpdateApi.as_view(), name="data_source.retrieve_update"),
# 数据源启/停
Expand All @@ -31,12 +37,6 @@
views.DataSourceSwitchStatusApi.as_view(),
name="data_source.switch_status",
),
# 连通性测试
path(
"<int:id>/operations/test_connection/",
views.DataSourceTestConnectionApi.as_view(),
name="data_source.test_connection",
),
# 获取用户信息导入模板
path(
"<int:id>/operations/download_template/",
Expand Down
86 changes: 54 additions & 32 deletions src/bk-user/bkuser/apis/web/data_source/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
from bkuser.apis.web.data_source.serializers import (
DataSourceCreateInputSLZ,
DataSourceCreateOutputSLZ,
DataSourceImportOrSyncOutputSLZ,
DataSourcePluginDefaultConfigOutputSLZ,
DataSourcePluginOutputSLZ,
DataSourceRetrieveOutputSLZ,
DataSourceSearchInputSLZ,
DataSourceSearchOutputSLZ,
DataSourceSwitchStatusOutputSLZ,
DataSourceTestConnectionInputSLZ,
DataSourceTestConnectionOutputSLZ,
DataSourceUpdateInputSLZ,
LocalDataSourceImportInputSLZ,
LocalDataSourceImportOutputSLZ,
)
from bkuser.apis.web.mixins import CurrentUserTenantMixin
from bkuser.apps.data_source.constants import DataSourceStatus
Expand All @@ -45,7 +46,8 @@
from bkuser.common.error_codes import error_codes
from bkuser.common.response import convert_workbook_to_response
from bkuser.common.views import ExcludePatchAPIViewMixin, ExcludePutAPIViewMixin
from bkuser.plugins.base import get_plugin_cfg_schema_map
from bkuser.plugins.base import get_plugin_cfg_schema_map, get_plugin_cls
from bkuser.plugins.constants import DataSourcePluginEnum

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,6 +133,7 @@ def post(self, request, *args, **kwargs):
plugin=DataSourcePlugin.objects.get(id=data["plugin_id"]),
plugin_config=data["plugin_config"],
field_mapping=data["field_mapping"],
sync_config=data["sync_config"],
creator=current_user,
updater=current_user,
)
Expand Down Expand Up @@ -185,6 +188,7 @@ def put(self, request, *args, **kwargs):
with transaction.atomic():
data_source.plugin_config = data["plugin_config"]
data_source.field_mapping = data["field_mapping"]
data_source.sync_config = data["sync_config"]
data_source.updater = request.user.username
data_source.save()

Expand All @@ -193,40 +197,30 @@ def put(self, request, *args, **kwargs):
return Response(status=status.HTTP_204_NO_CONTENT)


class DataSourceTestConnectionApi(CurrentUserTenantDataSourceMixin, generics.RetrieveAPIView):
class DataSourceTestConnectionApi(generics.CreateAPIView):
"""数据源连通性测试"""

serializer_class = DataSourceTestConnectionOutputSLZ

@swagger_auto_schema(
tags=["data_source"],
operation_description="数据源连通性测试",
request_body=DataSourceTestConnectionInputSLZ(),
responses={status.HTTP_200_OK: DataSourceTestConnectionOutputSLZ()},
)
def get(self, request, *args, **kwargs):
data_source = self.get_object()
if data_source.is_local:
raise error_codes.DATA_SOURCE_OPERATION_UNSUPPORTED

# TODO (su) 实现代码逻辑,需调用数据源插件以确认连通性
mock_data = {
"error_message": "",
"user": {
"id": "uid_2",
"properties": {
"username": "zhangSan",
},
"leaders": ["uid_0", "uid_1"],
"departments": ["dept_id_1"],
},
"department": {
"id": "dept_id_1",
"name": "dept_name",
"parent": "dept_id_0",
},
}
def post(self, request, *args, **kwargs):
slz = DataSourceTestConnectionInputSLZ(data=request.data)
slz.is_valid(raise_exception=True)
data = slz.validated_data

return Response(DataSourceTestConnectionOutputSLZ(instance=mock_data).data)
plugin_id = data["plugin_id"]
if plugin_id == DataSourcePluginEnum.LOCAL:
raise error_codes.DATA_SOURCE_OPERATION_UNSUPPORTED.f("本地数据源插件不支持连通性测试")

PluginCls = get_plugin_cls(plugin_id) # noqa: N806
result = PluginCls(data["plugin_config"]).test_connection()

return Response(DataSourceTestConnectionOutputSLZ(instance=result).data)


class DataSourceSwitchStatusApi(CurrentUserTenantDataSourceMixin, ExcludePutAPIViewMixin, generics.UpdateAPIView):
Expand Down Expand Up @@ -300,7 +294,7 @@ class DataSourceImportApi(CurrentUserTenantDataSourceMixin, generics.CreateAPIVi
tags=["data_source"],
operation_description="本地数据源用户数据导入",
request_body=LocalDataSourceImportInputSLZ(),
responses={status.HTTP_200_OK: LocalDataSourceImportOutputSLZ()},
responses={status.HTTP_200_OK: DataSourceImportOrSyncOutputSLZ()},
)
def post(self, request, *args, **kwargs):
"""从 Excel 导入数据源用户数据"""
Expand All @@ -316,13 +310,14 @@ def post(self, request, *args, **kwargs):
try:
workbook = openpyxl.load_workbook(data["file"])
except Exception: # pylint: disable=broad-except
logger.exception("本地数据源导入失败")
logger.exception("本地数据源 %s 导入失败", data_source.id)
raise error_codes.DATA_SOURCE_IMPORT_FAILED.f(_("文件格式异常"))

options = DataSourceSyncOptions(
operator=request.user.username,
overwrite=data["overwrite"],
incremental=data["incremental"],
# FIXME (su) 本地数据源导入也要改成异步行为,但是要解决 excel 如何传递的问题
async_run=False,
trigger=SyncTaskTrigger.MANUAL,
)
Expand All @@ -335,16 +330,43 @@ def post(self, request, *args, **kwargs):
raise error_codes.DATA_SOURCE_IMPORT_FAILED.f(str(e))

return Response(
LocalDataSourceImportOutputSLZ(
DataSourceImportOrSyncOutputSLZ(
instance={"task_id": task.id, "status": task.status, "summary": task.summary}
).data
)


class DataSourceSyncApi(generics.CreateAPIView):
class DataSourceSyncApi(CurrentUserTenantDataSourceMixin, generics.CreateAPIView):
"""数据源同步"""

@swagger_auto_schema(
tags=["data_source"],
operation_description="数据源数据同步",
responses={status.HTTP_200_OK: DataSourceImportOrSyncOutputSLZ()},
)
def post(self, request, *args, **kwargs):
"""触发数据源同步任务"""
# TODO (su) 实现代码逻辑,注意:本地数据源应该使用导入,而不是同步
return Response()
data_source = self.get_object()
if data_source.is_local:
raise error_codes.DATA_SOURCE_OPERATION_UNSUPPORTED.f(_("本地数据源不支持同步,请使用导入功能"))

# 同步策略:手动点击页面按钮,会触发全量覆盖的同步,且该同步是异步行为
options = DataSourceSyncOptions(
operator=request.user.username,
overwrite=True,
incremental=False,
async_run=True,
trigger=SyncTaskTrigger.MANUAL,
)

try:
task = DataSourceSyncManager(data_source, options).execute()
except Exception as e: # pylint: disable=broad-except
logger.exception("数据源 %s 同步失败", data_source.id)
raise error_codes.DATA_SOURCE_IMPORT_FAILED.f(str(e))

return Response(
DataSourceImportOrSyncOutputSLZ(
instance={"task_id": task.id, "status": task.status, "summary": task.summary}
).data
)
10 changes: 10 additions & 0 deletions src/bk-user/bkuser/apps/data_source/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
Loading
Loading