import logging
import os
import re
+import subprocess
def _list_mounts():
def _list_disks():
- logging.debug('listing disks...')
+ if os.path.exists('/dev/disk/by-id/'):
+ return _list_disks_dev_by_id()
+
+ else: # fall back to fdisk -l
+ return _list_disks_fdisk()
+
+
+def _list_disks_dev_by_id():
+ logging.debug('listing disks using /dev/disk/by-id/')
disks_by_dev = {}
partitions_by_dev = {}
return disks
+def _list_disks_fdisk():
+ try:
+ output = subprocess.check_output('fdisk -l 2>/dev/null', shell=True)
+
+ except Exception as e:
+ logging.error('failed to list disks using "fdisk -l": %s' % e, exc_info=True)
+
+ return []
+
+ disks = []
+ disk = None
+
+ def add_disk(disk):
+ logging.debug('found disk at "%s" on bus "%s": "%s %s"' %
+ (disk['target'], disk['bus'], disk['vendor'], disk['model']))
+
+ for part in disk['partitions']:
+ logging.debug('found partition "%s" at "%s" on bus "%s": "%s %s"' %
+ (part['part_no'], part['target'], part['bus'], part['vendor'], part['model']))
+
+ disks.append(disk)
+
+ for line in output.split('\n'):
+ line = line.replace('*', '')
+ line = re.sub('\s+', ' ', line.strip())
+ if not line:
+ continue
+
+ if line.startswith('Disk /dev/'):
+ if disk and disk['partitions']:
+ add_disk(disk)
+
+ parts = line.split()
+
+ disk = {
+ 'target': parts[1].strip(':'),
+ 'bus': '',
+ 'vendor': '',
+ 'model': parts[2] + ' ' + parts[3].strip(','),
+ 'partitions': []
+ }
+
+ elif line.startswith('/dev/') and disk:
+ parts = line.split()
+ part_no = re.findall('\d+$', parts[0])
+ partition = {
+ 'part_no': int(part_no[0]) if part_no else None,
+ 'target': parts[0],
+ 'bus': '',
+ 'vendor': '',
+ 'model': parts[4] + ' ' + ' '.join(parts[6:]),
+ }
+
+ disk['partitions'].append(partition)
+
+ if disk and disk['partitions']:
+ add_disk(disk)
+
+ disks.sort(key=lambda d: d['target'])
+
+ for disk in disks:
+ disk['partitions'].sort(key=lambda p: p['part_no'])
+
+ return disks
+
+
def list_mounted_disks():
mounted_disks = []