diff --git a/src/bk-user/bkuser/plugins/local/exceptions.py b/src/bk-user/bkuser/plugins/local/exceptions.py index dc56d7ec5..444cac211 100644 --- a/src/bk-user/bkuser/plugins/local/exceptions.py +++ b/src/bk-user/bkuser/plugins/local/exceptions.py @@ -35,5 +35,13 @@ class RequiredFieldIsEmpty(LocalDataSourcePluginError): """待导入文件中必填字段为空""" +class InvalidLeader(LocalDataSourcePluginError): + """待导入的用不上级信息不合法""" + + +class InvalidUsername(LocalDataSourcePluginError): + """待导入的用户名非法""" + + class DuplicateUsername(LocalDataSourcePluginError): """待导入文件中存在重复用户""" diff --git a/src/bk-user/bkuser/plugins/local/parser.py b/src/bk-user/bkuser/plugins/local/parser.py index 8a87b4bf5..d6586cc0f 100644 --- a/src/bk-user/bkuser/plugins/local/parser.py +++ b/src/bk-user/bkuser/plugins/local/parser.py @@ -16,10 +16,13 @@ from django.utils.translation import gettext_lazy as _ from openpyxl.workbook import Workbook +from bkuser.plugins.local.constants import USERNAME_REGEX from bkuser.plugins.local.exceptions import ( CustomColumnNameInvalid, DuplicateColumnName, DuplicateUsername, + InvalidLeader, + InvalidUsername, RequiredFieldIsEmpty, SheetColumnsNotMatch, UserSheetNotExists, @@ -119,18 +122,29 @@ def _validate_and_prepare(self): # noqa: C901 if duplicate_col_names := [n for n, cnt in Counter(sheet_col_names).items() if cnt > 1]: raise DuplicateColumnName(_("待导入文件中存在重复列名:{}").format(", ".join(duplicate_col_names))) - usernames = [] - # 5. 检查所有必填字段是否有值 + all_usernames = [] for row in self.sheet.iter_rows(min_row=self.user_data_min_row_idx): info = dict(zip(self.all_field_names, [cell.value for cell in row], strict=True)) + # 5. 检查所有必填字段是否有值 for field_name in self.required_field_names: if not info.get(field_name): raise RequiredFieldIsEmpty(_("待导入文件中必填字段 {} 存在空值").format(field_name)) - usernames.append(info["username"]) + username = info["username"] + # 6. 检查用户名是否合法 + if not USERNAME_REGEX.fullmatch(username): + raise InvalidUsername( + _("用户名 {} 不符合命名规范: 由3-32位字母、数字、下划线(_)、点(.)、连接符(-)字符组成,以字母或数字开头").format(username) # noqa: E501 + ) + + # 7. 检查用户不能是自己的 leader + if (leaders := info.get("leaders")) and username in [ld.strip() for ld in leaders.split(",")]: + raise InvalidLeader(_("待导入文件中用户 {} 不能是自己的直接上级").format(username)) + + all_usernames.append(username) - # 6. 检查用户名是否有重复的 - if duplicate_usernames := [n for n, cnt in Counter(usernames).items() if cnt > 1]: + # 8. 检查用户名是否有重复的 + if duplicate_usernames := [n for n, cnt in Counter(all_usernames).items() if cnt > 1]: raise DuplicateUsername(_("待导入文件中存在重复用户名:{}").format(", ".join(duplicate_usernames))) def _parse_departments(self): diff --git a/src/bk-user/tests/plugins/local/test_parser.py b/src/bk-user/tests/plugins/local/test_parser.py index c4108ed1f..6e8ea62f2 100644 --- a/src/bk-user/tests/plugins/local/test_parser.py +++ b/src/bk-user/tests/plugins/local/test_parser.py @@ -15,6 +15,8 @@ CustomColumnNameInvalid, DuplicateColumnName, DuplicateUsername, + InvalidLeader, + InvalidUsername, RequiredFieldIsEmpty, SheetColumnsNotMatch, UserSheetNotExists, @@ -55,9 +57,27 @@ def test_validate_case_required_field_is_empty(self, user_wk): with pytest.raises(RequiredFieldIsEmpty): LocalDataSourceDataParser(user_wk).parse() + def test_validate_case_invalid_username_chinese(self, user_wk): + # 修改表格数据,导致用户名非法 + user_wk["users"]["A4"].value = "张三" + with pytest.raises(InvalidUsername): + LocalDataSourceDataParser(user_wk).parse() + + def test_validate_case_invalid_username_punctuation(self, user_wk): + # 修改表格数据,导致用户名非法 + user_wk["users"]["A4"].value = "zhangsan@m.com" + with pytest.raises(InvalidUsername): + LocalDataSourceDataParser(user_wk).parse() + + def test_validate_case_invalid_leader(self, user_wk): + # 修改表格数据,导致用户是自己的 leader + user_wk["users"]["F4"].value = "zhangsan, lisi,wangwu" + with pytest.raises(InvalidLeader): + LocalDataSourceDataParser(user_wk).parse() + def test_validate_case_duplicate_username(self, user_wk): # 修改表格数据,导致用户名重复 - user_wk["users"]["A4"].value = "zhangsan" + user_wk["users"]["A6"].value = "zhangsan" with pytest.raises(DuplicateUsername): LocalDataSourceDataParser(user_wk).parse()