Skip to content

Commit

Permalink
pref: data source sync (#1272)
Browse files Browse the repository at this point in the history
  • Loading branch information
narasux authored Oct 7, 2023
1 parent 1686cab commit 9a8a34b
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 136 deletions.
34 changes: 27 additions & 7 deletions src/bk-user/bkuser/apis/web/data_source/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

from bkuser.apps.data_source.constants import FieldMappingOperation
from bkuser.apps.data_source.models import DataSource, DataSourcePlugin
from bkuser.plugins.constants import DATA_SOURCE_PLUGIN_CONFIG_CLASS_MAP, DataSourcePluginEnum
from bkuser.apps.tenant.models import TenantUserCustomField, UserBuiltinField
from bkuser.plugins.base import get_plugin_cfg_cls
from bkuser.plugins.constants import DataSourcePluginEnum
from bkuser.utils.pydantic import stringify_pydantic_error

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,10 +61,7 @@ def get_updated_at(self, obj: DataSource) -> str:


class DataSourceFieldMappingSLZ(serializers.Serializer):
"""
单个数据源字段映射
FIXME (su) 自定义字段实现后,需要检查:target_field 需是租户定义的,source_field 需是插件允许的
"""
"""单个数据源字段映射"""

source_field = serializers.CharField(help_text="数据源原始字段")
mapping_operation = serializers.ChoiceField(help_text="映射关系", choices=FieldMappingOperation.get_choices())
Expand Down Expand Up @@ -90,12 +89,24 @@ def validate_plugin_id(self, plugin_id: str) -> str:

return plugin_id

def validate_field_mapping(self, field_mapping: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
target_fields = {m.get("target_field") for m in field_mapping}
allowed_target_fields = list(UserBuiltinField.objects.all().values_list("name", flat=True)) + list(
TenantUserCustomField.objects.filter(tenant_id=self.context["tenant_id"]).values_list("name", flat=True)
)
if not_allowed_fields := target_fields - set(allowed_target_fields):
raise ValidationError(
_("字段映射中的目标字段 {} 不属于用户自定义字段或内置字段").format(not_allowed_fields),
)

return field_mapping

def validate(self, attrs: Dict[str, Any]) -> Dict[str, Any]:
# 除本地数据源类型外,都需要配置字段映射
if attrs["plugin_id"] != DataSourcePluginEnum.LOCAL and not attrs["field_mapping"]:
raise ValidationError(_("当前数据源类型必须配置字段映射"))

PluginConfigCls = DATA_SOURCE_PLUGIN_CONFIG_CLASS_MAP.get(attrs["plugin_id"]) # noqa: N806
PluginConfigCls = get_plugin_cfg_cls(attrs["plugin_id"]) # noqa: N806
# 自定义插件,可能没有对应的配置类,不需要做格式检查
if not PluginConfigCls:
return attrs
Expand Down Expand Up @@ -141,7 +152,7 @@ class DataSourceUpdateInputSLZ(serializers.Serializer):
)

def validate_plugin_config(self, plugin_config: Dict[str, Any]) -> Dict[str, Any]:
PluginConfigCls = DATA_SOURCE_PLUGIN_CONFIG_CLASS_MAP.get(self.context["plugin_id"]) # noqa: N806
PluginConfigCls = get_plugin_cfg_cls(self.context["plugin_id"]) # noqa: N806
# 自定义插件,可能没有对应的配置类,不需要做格式检查
if not PluginConfigCls:
return plugin_config
Expand All @@ -161,6 +172,15 @@ def validate_field_mapping(self, field_mapping: List[Dict]) -> List[Dict]:
if not field_mapping:
raise ValidationError(_("当前数据源类型必须配置字段映射"))

target_fields = {m.get("target_field") for m in field_mapping}
allowed_target_fields = list(UserBuiltinField.objects.all().values_list("name", flat=True)) + list(
TenantUserCustomField.objects.filter(tenant_id=self.context["tenant_id"]).values_list("name", flat=True)
)
if not_allowed_fields := target_fields - set(allowed_target_fields):
raise ValidationError(
_("字段映射中的目标字段 {} 不属于用户自定义字段或内置字段").format(not_allowed_fields),
)

return field_mapping


Expand Down
27 changes: 16 additions & 11 deletions src/bk-user/bkuser/apis/web/data_source/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from bkuser.common.error_codes import error_codes
from bkuser.common.response import convert_workbook_to_response
from bkuser.common.views import ExcludePatchAPIViewMixin, ExcludePutAPIViewMixin
from bkuser.plugins.constants import DATA_SOURCE_PLUGIN_CONFIG_SCHEMA_MAP
from bkuser.plugins.base import get_plugin_cfg_schema_map

logger = logging.getLogger(__name__)

Expand All @@ -70,7 +70,7 @@ class DataSourcePluginDefaultConfigApi(generics.RetrieveAPIView):
operation_description="数据源插件默认配置",
responses={
status.HTTP_200_OK: DataSourcePluginDefaultConfigOutputSLZ(),
**DATA_SOURCE_PLUGIN_CONFIG_SCHEMA_MAP,
**get_plugin_cfg_schema_map(),
},
)
def get(self, request, *args, **kwargs):
Expand Down Expand Up @@ -114,27 +114,29 @@ def get(self, request, *args, **kwargs):
request_body=DataSourceCreateInputSLZ(),
responses={
status.HTTP_201_CREATED: DataSourceCreateOutputSLZ(),
**DATA_SOURCE_PLUGIN_CONFIG_SCHEMA_MAP,
**get_plugin_cfg_schema_map(),
},
)
def post(self, request, *args, **kwargs):
slz = DataSourceCreateInputSLZ(data=request.data)
current_tenant_id = self.get_current_tenant_id()
slz = DataSourceCreateInputSLZ(data=request.data, context={"tenant_id": current_tenant_id})
slz.is_valid(raise_exception=True)
data = slz.validated_data

with transaction.atomic():
current_user = request.user.username
ds = DataSource.objects.create(
name=data["name"],
owner_tenant_id=self.get_current_tenant_id(),
owner_tenant_id=current_tenant_id,
plugin=DataSourcePlugin.objects.get(id=data["plugin_id"]),
plugin_config=data["plugin_config"],
field_mapping=data["field_mapping"],
creator=current_user,
updater=current_user,
)
# 数据源创建后,发送信号用于登录认证,用户初始化等相关工作
post_create_data_source.send(sender=self.__class__, data_source=ds)

# 数据源创建后,发送信号用于登录认证,用户初始化等相关工作
post_create_data_source.send(sender=self.__class__, data_source=ds)

return Response(
DataSourceCreateOutputSLZ(instance={"id": ds.id}).data,
Expand All @@ -153,7 +155,7 @@ class DataSourceRetrieveUpdateApi(
operation_description="数据源详情",
responses={
status.HTTP_200_OK: DataSourceRetrieveOutputSLZ(),
**DATA_SOURCE_PLUGIN_CONFIG_SCHEMA_MAP,
**get_plugin_cfg_schema_map(),
},
)
def get(self, request, *args, **kwargs):
Expand All @@ -165,14 +167,17 @@ def get(self, request, *args, **kwargs):
request_body=DataSourceUpdateInputSLZ(),
responses={
status.HTTP_204_NO_CONTENT: "",
**DATA_SOURCE_PLUGIN_CONFIG_SCHEMA_MAP,
**get_plugin_cfg_schema_map(),
},
)
def put(self, request, *args, **kwargs):
data_source = self.get_object()
slz = DataSourceUpdateInputSLZ(
data=request.data,
context={"plugin_id": data_source.plugin_id},
context={
"plugin_id": data_source.plugin_id,
"tenant_id": self.get_current_tenant_id(),
},
)
slz.is_valid(raise_exception=True)
data = slz.validated_data
Expand All @@ -183,7 +188,7 @@ def put(self, request, *args, **kwargs):
data_source.updater = request.user.username
data_source.save()

post_update_data_source.send(sender=self.__class__, data_source=data_source)
post_update_data_source.send(sender=self.__class__, data_source=data_source)

return Response(status=status.HTTP_204_NO_CONTENT)

Expand Down
24 changes: 18 additions & 6 deletions src/bk-user/bkuser/apps/data_source/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,26 @@
"""
from django.dispatch import receiver

from bkuser.apps.data_source.models import DataSource
from bkuser.apps.data_source.signals import post_update_data_source
from bkuser.apps.data_source.models import DataSource, DataSourceUser
from bkuser.apps.data_source.signals import (
post_batch_create_data_source_user,
post_create_data_source_user,
post_update_data_source,
)
from bkuser.apps.data_source.tasks import initialize_identity_info_and_send_notification


@receiver(post_update_data_source)
def initial_local_data_source_user_identity_info(sender, data_source: DataSource, **kwargs):
@receiver(post_batch_create_data_source_user)
def sync_identity_infos_and_notify(sender, data_source: DataSource, **kwargs):
"""
TODO (su) 数据源更新后,需要检查是否是本地数据源,若是本地数据源且启用账密登录,
则需要对没有账密信息的用户,进行密码的初始化 & 发送通知
数据源更新后,需要检查是否是本地数据源,若是本地数据源且启用账密登录,
则需要对没有账密信息的用户,进行密码的初始化 & 发送通知,批量创建数据源用户同理
"""
...
initialize_identity_info_and_send_notification.delay(data_source.id)


@receiver(post_create_data_source_user)
def initialize_identity_info_and_notify(sender, data_source: DataSource, user: DataSourceUser, **kwargs):
"""在创建完数据源用户后,需要初始化账密信息,并发送通知"""
initialize_identity_info_and_send_notification.delay(data_source.id, user.id)
44 changes: 23 additions & 21 deletions src/bk-user/bkuser/apps/data_source/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"""
import datetime
import logging
from typing import List
from typing import Dict, List, Optional, Tuple

from django.utils import timezone

Expand Down Expand Up @@ -56,32 +56,31 @@ def __init__(self, data_source: DataSource):
self.plugin_cfg.password_rule, self.plugin_cfg.password_initial # type: ignore
)

def sync(self) -> List[DataSourceUser]:
"""检查指定数据源的所有用户,对没有账密信息的,做初始化,适用于批量同步(导入)的情况"""
def initialize(self, users: Optional[List[DataSourceUser]] = None) -> Tuple[List[DataSourceUser], Dict[int, str]]:
"""
初始化指定用户的身份信息,若没有指定,则初始化该数据源所有没有初始化过的
:returns: (数据源用户列表, {user_id: password})
"""
if self._can_skip():
return []
return [], {}

exists_info_user_ids = LocalDataSourceIdentityInfo.objects.filter(
data_source=self.data_source,
).values_list("user_id", flat=True)
# NOTE:已经存在的账密信息,不会按照最新规则重新生成!不然用户密码就失效了!
waiting_init_users = DataSourceUser.objects.filter(
data_source=self.data_source,
).exclude(id__in=exists_info_user_ids)

self._init_users_identity_info(waiting_init_users)
return waiting_init_users

def initialize(self, user: DataSourceUser) -> None:
"""初始化用户身份信息,适用于单个用户创建的情况"""
if self._can_skip():
return
if users:
waiting_init_users = [u for u in users if u.id not in exists_info_user_ids]
else:
waiting_init_users = DataSourceUser.objects.filter(
data_source=self.data_source,
).exclude(id__in=exists_info_user_ids)

if LocalDataSourceIdentityInfo.objects.filter(user=user).exists():
logger.warning("local data source user %s identity info exists, skip initialize", user.id)
return
if not waiting_init_users:
logger.warning("not users need initialize, skip...")
return [], {}

self._init_users_identity_info([user])
return waiting_init_users, self._init_users_identity_info(waiting_init_users)

def _can_skip(self) -> bool:
"""预先判断能否直接跳过"""
Expand All @@ -96,8 +95,8 @@ def _can_skip(self) -> bool:

return False

def _init_users_identity_info(self, users: List[DataSourceUser]):
"""初始化用户身份信息"""
def _init_users_identity_info(self, users: List[DataSourceUser]) -> Dict[int, str]:
"""初始化用户身份信息,返回 {user_id: password} 映射表"""
time_now = timezone.now()
expired_at = self._get_password_expired_at()

Expand All @@ -114,6 +113,9 @@ def _init_users_identity_info(self, users: List[DataSourceUser]):
]
LocalDataSourceIdentityInfo.objects.bulk_create(waiting_create_infos, batch_size=self.BATCH_SIZE)

# 由于用户密码采用 HASH 加密,因此只有在初始化的时候才能获取到明文密码
return {info.user.id: info.password for info in waiting_create_infos}

def _get_password_expired_at(self) -> datetime.datetime:
"""获取密码过期的具体时间"""
valid_time: int = self.plugin_cfg.password_rule.valid_time # type: ignore
Expand Down
50 changes: 31 additions & 19 deletions src/bk-user/bkuser/apps/data_source/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import logging
from typing import Dict, List, Optional

from django.conf import settings
from django.template import Context, Template
from django.utils.translation import gettext_lazy as _

from bkuser.apps.data_source.models import DataSource, DataSourceUser, LocalDataSourceIdentityInfo
from bkuser.apps.data_source.models import DataSource, DataSourceUser
from bkuser.component import cmsi
from bkuser.plugins.local.constants import NotificationMethod, NotificationScene
from bkuser.plugins.local.models import LocalDataSourcePluginConfig, NotificationTemplate
Expand All @@ -25,9 +26,10 @@
class NotificationTmplContextGenerator:
"""生成通知模板使用的上下文"""

def __init__(self, user: DataSourceUser, scene: NotificationScene):
def __init__(self, user: DataSourceUser, scene: NotificationScene, **scene_kwargs):
self.user = user
self.scene = scene
self.scene_kwargs = scene_kwargs

def gen(self) -> Dict[str, str]:
"""生成通知模板使用的上下文
Expand All @@ -54,10 +56,12 @@ def _gen_base_ctx(self) -> Dict[str, str]:

def _gen_user_initialize_ctx(self) -> Dict[str, str]:
"""用户初始化"""
info = LocalDataSourceIdentityInfo.objects.get(user=self.user)
# FIXME (su) 密码修改为对称加密后,无法通过该方式直接获取到
# FIXME (su) 提供修改密码的 URL(settings.BK_USER_URL + xxxx)
return {"password": info.password, "reset_url": "https://example.com/reset-password", **self._gen_base_ctx()}
return {
"password": self.scene_kwargs["passwd"],
"reset_url": settings.BK_USER_URL + "/reset-password",
**self._gen_base_ctx(),
}

def _gen_reset_passwd_ctx(self) -> Dict[str, str]:
"""重置密码"""
Expand Down Expand Up @@ -90,17 +94,25 @@ def __init__(self, data_source: DataSource, scene: NotificationScene):

self.templates = self._get_tmpls_with_scene(plugin_cfg, scene)

def send(self, users: Optional[List[DataSourceUser]] = None):
def send(self, users: List[DataSourceUser], user_passwd_map: Optional[Dict[int, str]] = None) -> None:
"""根据数据源插件配置,发送对应的通知信息"""
if users is None:
users = DataSourceUser.objects.filter(data_source=self.data_source)
user_passwd_map = user_passwd_map or {}

# 预先检查,避免出现发送部分通知后,出现异常中断的情况
if self.scene == NotificationScene.USER_INITIALIZE: # noqa: SIM102
if not_passwd_users := {u.id for u in users} - set(user_passwd_map.keys()):
raise ValueError(f"users {not_passwd_users} not found in user passwd map")

try:
for u in users:
self._send_notifications(u)
scene_kwargs = {}
if self.scene == NotificationScene.USER_INITIALIZE:
scene_kwargs["passwd"] = user_passwd_map[u.id]

self._send_notifications(u, **scene_kwargs)
# TODO (su) 细化异常处理
except Exception:
logger.exception(_("send notification failed"))
logger.exception("send notification failed")

def _get_tmpls_with_scene(
self, plugin_cfg: LocalDataSourcePluginConfig, scene: NotificationScene
Expand All @@ -122,26 +134,26 @@ def _get_tmpls_with_scene(
# 返回场景匹配,且被声明启用的模板列表
return [tmpl for tmpl in cfg.templates if tmpl.scene == scene and tmpl.method in cfg.enabled_methods]

def _send_notifications(self, user: DataSourceUser):
def _send_notifications(self, user: DataSourceUser, **scene_kwargs):
"""根据配置的通知模板,逐个用户发送通知"""
for tmpl in self.templates:
if tmpl.method == NotificationMethod.EMAIL:
self._send_email(user, tmpl)
self._send_email(user, tmpl, **scene_kwargs)
elif tmpl.method == NotificationMethod.SMS:
self._send_sms(user, tmpl)
self._send_sms(user, tmpl, **scene_kwargs)

def _send_email(self, user: DataSourceUser, tmpl: NotificationTemplate):
def _send_email(self, user: DataSourceUser, tmpl: NotificationTemplate, **scene_kwargs):
logger.info("send email to user %s, scene %s, title: %s", user.username, tmpl.scene, tmpl.title)
content = self._render_tmpl(user, tmpl.content_html)
content = self._render_tmpl(user, tmpl.content_html, **scene_kwargs)
# FIXME (su) 修改为指定用户名
cmsi.send_mail([user.email], tmpl.sender, tmpl.title, content) # type: ignore

def _send_sms(self, user: DataSourceUser, tmpl: NotificationTemplate):
def _send_sms(self, user: DataSourceUser, tmpl: NotificationTemplate, **scene_kwargs):
logger.info("send sms to user %s, scene %s", user.username, tmpl.scene)
content = self._render_tmpl(user, tmpl.content)
content = self._render_tmpl(user, tmpl.content, **scene_kwargs)
# FIXME (su) 修改为指定用户名
cmsi.send_sms([user.phone], content)

def _render_tmpl(self, user: DataSourceUser, content: str) -> str:
ctx = NotificationTmplContextGenerator(user=user, scene=self.scene).gen()
def _render_tmpl(self, user: DataSourceUser, content: str, **scene_kwargs) -> str:
ctx = NotificationTmplContextGenerator(user=user, scene=self.scene, **scene_kwargs).gen()
return Template(content).render(Context(ctx))
4 changes: 4 additions & 0 deletions src/bk-user/bkuser/apps/data_source/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@
post_create_data_source = django.dispatch.Signal(providing_args=["data_source"])

post_update_data_source = django.dispatch.Signal(providing_args=["data_source"])

post_batch_create_data_source_user = django.dispatch.Signal(providing_args=["data_source", "usernames"])

post_create_data_source_user = django.dispatch.Signal(providing_args=["data_source", "user"])
Loading

0 comments on commit 9a8a34b

Please sign in to comment.