summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--curtin/storage_config.py26
-rw-r--r--tests/unittests/test_storage_config.py32
2 files changed, 52 insertions, 6 deletions
diff --git a/curtin/storage_config.py b/curtin/storage_config.py
index 5f96609c..ceae05f6 100644
--- a/curtin/storage_config.py
+++ b/curtin/storage_config.py
@@ -4,6 +4,7 @@ import copy
import operator
import os
import re
+from typing import Optional
import yaml
from curtin.log import LOG
@@ -470,6 +471,22 @@ class ProbertParser(object):
return multipath.is_mpath_partition(
blockdev.get('DEVNAME', ''), blockdev)
+ @staticmethod
+ def detect_partition_scheme(blockdev) -> Optional[str]:
+ ''' Return either:
+ * None if the blockdev is not partitioned
+ * A type of partition table (as a string) if it is supported
+ * "unsupported" if it is not supported.
+ '''
+ if 'ID_PART_TABLE_TYPE' not in blockdev:
+ return None
+
+ ptype = blockdev['ID_PART_TABLE_TYPE']
+ if ptype not in schemas._ptables:
+ return schemas._ptable_unsupported
+
+ return ptype
+
def blockdev_to_id(self, blockdev):
""" Examine a blockdev dictionary and return a tuple of curtin
storage type and name that can be used as a value for
@@ -752,12 +769,9 @@ class BlockdevParser(ProbertParser):
if dasd_size != "0":
entry['ptable'] = 'vtoc'
- if 'ID_PART_TABLE_TYPE' in blockdev_data:
- ptype = blockdev_data['ID_PART_TABLE_TYPE']
- if ptype in schemas._ptables:
- entry['ptable'] = ptype
- else:
- entry['ptable'] = schemas._ptable_unsupported
+ ptable = self.detect_partition_scheme(blockdev_data)
+ if ptable is not None:
+ entry['ptable'] = ptable
match = re.fullmatch(r'/dev/(?P<ctrler>nvme\d+)n\d', devname)
if match is not None:
diff --git a/tests/unittests/test_storage_config.py b/tests/unittests/test_storage_config.py
index f9321c2d..61c8017c 100644
--- a/tests/unittests/test_storage_config.py
+++ b/tests/unittests/test_storage_config.py
@@ -143,6 +143,38 @@ class TestProbertParser(CiTestCase):
self.assertIsNotNone(bdparser(probe_data))
+ def test_detect_partition_scheme__unpartitioned(self):
+ blockdev = {
+ "DEVNAME": "/dev/sda",
+ "DEVTYPE": "disk",
+ }
+ self.assertIsNone(baseparser.detect_partition_scheme(blockdev))
+
+ def test_detect_partition_scheme__gpt(self):
+ blockdev = {
+ "DEVNAME": "/dev/sda",
+ "DEVTYPE": "disk",
+ "ID_PART_TABLE_TYPE": "gpt",
+ }
+ self.assertEqual("gpt", baseparser.detect_partition_scheme(blockdev))
+
+ def test_detect_partition_scheme__dos(self):
+ blockdev = {
+ "DEVNAME": "/dev/sda",
+ "DEVTYPE": "disk",
+ "ID_PART_TABLE_TYPE": "dos",
+ }
+ self.assertEqual("dos", baseparser.detect_partition_scheme(blockdev))
+
+ def test_detect_partition_scheme__something_else(self):
+ blockdev = {
+ "DEVNAME": "/dev/sda",
+ "DEVTYPE": "disk",
+ "ID_PART_TABLE_TYPE": "foobar",
+ }
+ self.assertEqual(
+ "unsupported", baseparser.detect_partition_scheme(blockdev))
+
def _get_data(datafile):
data = util.load_file('tests/data/%s' % datafile)