diff --git a/tests/storage_tests/devices_test/md_test.py b/tests/storage_tests/devices_test/md_test.py index f719ddcbe..1b7b3d9b0 100644 --- a/tests/storage_tests/devices_test/md_test.py +++ b/tests/storage_tests/devices_test/md_test.py @@ -209,6 +209,128 @@ def test_mdraid_members_add_remove(self): self.assertIsNone(part.format.md_uuid) +class MDLUKSTestCase(StorageTestCase): + + raidname = "blivetTestRAIDLUKS" + passphrase = "passphrase" + + def setUp(self): + super().setUp() + + disks = [os.path.basename(vdev) for vdev in self.vdevs] + self.storage = blivet.Blivet() + self.storage.exclusive_disks = disks + self.storage.reset() + + # make sure only the targetcli disks are in the devicetree + for disk in self.storage.disks: + self.assertTrue(disk.path in self.vdevs) + self.assertIsNone(disk.format.type) + self.assertFalse(disk.children) + + def _clean_up(self): + self.storage.reset() + for disk in self.storage.disks: + if disk.path not in self.vdevs: + raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % disk.name) + self.storage.recursive_remove(disk) + + self.storage.do_it() + + return super()._clean_up() + + def _prepare_members(self, members): + luks_devs = [] + for i in range(members): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[i]) + self.assertIsNotNone(disk) + self.storage.initialize_disk(disk) + + part = self.storage.new_partition(size=blivet.size.Size("100 MiB"), fmt_type="luks", + fmt_args={"passphrase": self.passphrase}, + parents=[disk]) + self.storage.create_device(part) + + luks_dev = blivet.devices.LUKSDevice("luks-%s" % part.name, size=part.size, + parents=[part]) + self.storage.create_device(luks_dev) + self.storage.format_device(luks_dev, blivet.formats.get_format("mdmember")) + luks_devs.append(luks_dev) + + blivet.partitioning.do_partitioning(self.storage) + + return luks_devs + + def _test_mdraid(self, raid_level, members): + luks_devs = self._prepare_members(members) + array = self.storage.new_mdarray(name=self.raidname, parents=luks_devs, + level=raid_level, total_devices=members, + member_devices=members) + self.storage.create_device(array) + + with wait_for_resync(): + self.storage.do_it() + self.storage.reset() + + array = self.storage.devicetree.get_device_by_name(self.raidname) + self.assertIsNotNone(array) + self.assertEqual(array.level, raid_level) + self.assertEqual(array.member_devices, members) + self.assertEqual(array.spares, 0) + self.assertCountEqual([m.name for m in array.members], + [p.name for p in luks_devs]) + for member in array.members: + self.assertEqual(member.format.md_uuid, array.uuid) + + array.teardown() + for member in array.members: + member.teardown() + + parts_names = [parent.parents[0].name for parent in array.members] + + self.storage.reset() + + parts = [] + for name in parts_names: + part = self.storage.devicetree.get_device_by_name(name) + self.assertIsNotNone(part) + self.assertEqual(part.format.type, "luks") + parts.append(part) + + # unlock just one of the LUKS partitions + parts[0].format.passphrase = self.passphrase + parts[0].format.setup() + self.assertTrue(parts[0].format.status) + + self.storage.devicetree.populate() + + # array should not be in the tree + array = self.storage.devicetree.get_device_by_name(self.raidname) + + self.assertIsNone(array) + + # unlock rest of the LUKS devices + for part in parts[1:]: + part.format.passphrase = self.passphrase + part.format.setup() + self.assertTrue(parts[0].format.status) + + # now the array should be in the tree and we should be able to activate it + self.storage.devicetree.populate() + array = self.storage.devicetree.get_device_by_name(self.raidname) + self.assertIsNotNone(array) + + array.setup() + self.assertTrue(array.status) + self.assertEqual(len(array.members), members) + + def test_luks_mdraid_raid0(self): + self._test_mdraid(blivet.devicelibs.raid.RAID0, 2) + + def test_luks_mdraid_raid1(self): + self._test_mdraid(blivet.devicelibs.raid.RAID1, 2) + + class BIOSRAIDTestCase(StorageTestCase): def setUp(self):