Drivers for Solaris ZFS operations in local and iSCSI modes
import abc
import fcntl
import os
import subprocess
import time
from eventlet.green.OpenSSL import SSL
from eventlet.green import socket
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
import paramiko
from cinder import exception
from cinder.i18n import _, _LE, _LI
from cinder.image import image_utils
from cinder.volume import driver
from cinder.volume.drivers.san.san import SanDriver
import rad.auth as rada
import rad.bindings.com.oracle.solaris.rad.zfsmgr_1 as zfsmgr
import rad.client as radc
import rad.connect as radcon
from solaris_install.target.size import Size
LOG = logging.getLogger(__name__)
solaris_zfs_opts = [
help='The base dataset for ZFS volumes.'),
help='iSCSI target group name.'), ]
def connect_tls(host, port=12302, locale=None, ca_certs=None):
"""Connect to a RAD instance over TLS.
host string, target host
port int, target port (RAD_PORT_TLS = 12302)
locale string, locale
ca_certs string, path to file containing CA certificates
RadConnection: a connection to RAD
# We don't want SSL 2.0, SSL 3.0 nor TLS 1.0 in RAD
context = SSL.Context(SSL.SSLv23_METHOD)
if ca_certs is not None:
context.set_verify(SSL.VERIFY_PEER, _tls_verify_cb)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = SSL.Connection(context, sock)
sock.connect((host, port))
return radcon.RadConnection(sock, locale=locale)
class ZFSVolumeDriver(SanDriver):
"""OpenStack Cinder ZFS volume driver for generic ZFS volumes.
Version history:
1.0.0 - Initial driver with basic functionalities in Havana
1.1.0 - Support SAN for the remote storage nodes access in Juno
1.1.1 - Add support for the volume backup
1.1.2 - Add support for the volume migration
1.2.0 - Add support for the volume management in Kilo
1.2.1 - Enable the connect_tls by importing eventlet.green.socket
1.2.2 - Introduce the ZFS RAD for volume migration enhancement
1.2.3 - Replace volume-specific targets with one shared target in
the ZFSISCSIDriver
1.3.0 - Support the option iscsi_secondary_ip_addresses and then
return target_iqns, target_portals, target_luns in Mitaka
version = "1.3.0"
protocol = 'local'
def __init__(self, *args, **kwargs):
super(ZFSVolumeDriver, self).__init__(execute=self.solaris_execute,
*args, **kwargs)
self.run_local = self.configuration.san_is_local
self.hostname = socket.gethostname()
def solaris_execute(self, *cmd, **kwargs):
"""Execute the command locally or remotely."""
if self.run_local:
return processutils.execute(*cmd, **kwargs)
return super(ZFSVolumeDriver, self)._run_ssh(cmd,
def check_for_setup_error(self):
"""Check the setup error."""
def create_volume(self, volume):
"""Create a volume."""
size = '%sG' % volume['size']
zfs_volume = self._get_zfs_volume_name(volume['name'])
# Create a ZFS volume
cmd = ['/usr/sbin/zfs', 'create', '-V', size, zfs_volume]
LOG.debug(_("Created ZFS volume '%s'") % volume['name'])
def create_volume_from_snapshot(self, volume, snapshot):
"""Create a cloned volume from a snapshot."""
if volume['size'] != snapshot['volume_size']:
exception_message = (_("Could not create volume '%s' because "
"its volume size of '%s' is different "
"from that of the snapshot, '%s'.")
% (volume['name'], volume['size'],
raise exception.InvalidInput(reason=exception_message)
# Create a ZFS clone
zfs_snapshot = self._get_zfs_snap_name(snapshot)
zfs_volume = self._get_zfs_volume_name(volume['name'])
cmd = ['/usr/sbin/zfs', 'clone', zfs_snapshot, zfs_volume]
LOG.debug(_("Created cloned volume '%s'") % volume['name'])
def create_cloned_volume(self, volume, src_vref):
"""Create a clone of the specified volume."""
if volume['size'] != src_vref['size']:
exception_message = (_("Could not clone volume '%s' because "
"its volume size of '%s' is different "
"from that of the source volume, '%s'.")
% (volume['name'], volume['size'],
raise exception.VolumeBackendAPIException(data=exception_message)
LOG.debug(_("Created cloned volume '%s' from source volume '%s'")
% (volume['name'], src_vref['name']))
def delete_volume(self, volume):
"""Delete a volume.
Firstly, the volume should be checked if it is a cloned one. If yes,
its parent snapshot with prefix 'tmp-snapshot-' should be deleted as
well after it is removed.
zvol = self._get_zvol_path(volume)
(out, _err) = self._execute('/usr/bin/ls', zvol)
except processutils.ProcessExecutionError:
LOG.debug(_("The volume path '%s' doesn't exist") % zvol)
zfs_volume = self._get_zfs_volume_name(volume['name'])
origin_snapshot = self._get_zfs_property('origin', zfs_volume)
tmp_cloned_vol = False
# Check if it is the temporary snapshot created for the cloned volume
if origin_snapshot.startswith(self.configuration.zfs_volume_base):
prop_type = self._get_zfs_property('type', origin_snapshot)
tmp_snap_prefix = 'tmp-snapshot-%s' % volume['id']
if prop_type == 'snapshot' and tmp_snap_prefix in origin_snapshot:
tmp_cloned_vol = True
cmd = ['/usr/sbin/zfs', 'destroy', zfs_volume]
LOG.debug(_("Deleted volume '%s'") % volume['name'])
if tmp_cloned_vol:
self._execute('/usr/sbin/zfs', 'destroy', origin_snapshot)
LOG.debug(_("Deleted parent snapshot '%s' of volume '%s'")
% (origin_snapshot, volume['name']))
def create_snapshot(self, snapshot):
"""Create a snapshot."""
cmd = ['/usr/sbin/zfs', 'snapshot', self._get_zfs_snap_name(snapshot)]
LOG.debug(_("Created snapshot '%s'") % snapshot['name'])
def delete_snapshot(self, snapshot):
"""Delete a snapshot."""
cmd = ['/usr/sbin/zfs', 'destroy', self._get_zfs_snap_name(snapshot)]
LOG.debug(_("Deleted snapshot '%s'") % snapshot['name'])
def ensure_export(self, context, volume):
"""Synchronously recreate an export for a logical volume."""
def create_export(self, context, volume, connector):
"""Export the volume."""
def remove_export(self, context, volume):
"""Remove an export for a volume."""
def initialize_connection(self, volume, connector):
"""Initialize the connection and returns connection info."""
volume_path = '%s/volume-%s' % (self.configuration.zfs_volume_base,
properties = {}
properties['device_path'] = self._get_zvol_path(volume)
return {
'driver_volume_type': 'local',
'volume_path': volume_path,
'data': properties
def terminate_connection(self, volume, connector, **kwargs):
"""Disconnection from the connector."""
def attach_volume(self, context, volume, instance_uuid, host_name,
"""Callback for volume attached to instance or host."""
def detach_volume(self, context, volume, attachment):
""" Callback for volume detached."""
def get_volume_stats(self, refresh=False):
"""Get volume status."""
if refresh:
return self._stats
def _get_zfs_property(self, prop, dataset):
"""Get the value of property for the dataset."""
(out, _err) = self._execute('/usr/sbin/zfs', 'get', '-H', '-o',
'value', prop, dataset)
return out.rstrip()
except processutils.ProcessExecutionError:
LOG.info(_LI("Failed to get the property '%s' of the dataset '%s'")
% (prop, dataset))
return None
def _get_zfs_snap_name(self, snapshot):
"""Get the snapshot path."""
return "%s/%s@%s" % (self.configuration.zfs_volume_base,
snapshot['volume_name'], snapshot['name'])
def _get_zfs_volume_name(self, volume_name):
"""Add the pool name to get the ZFS volume."""
return "%s/%s" % (self.configuration.zfs_volume_base,
def _remote_piped_execute(self, cmd1, cmd2, ip, username, password):
"""Piped execute on a remote host."""
LOG.debug(_("Piping cmd1='%s' into cmd='%s' on host '%s'") %
(' '.join(cmd1), ' '.join(cmd2), ip))
client = paramiko.SSHClient()
client.connect(ip, username=username, password=password)
cmd = ' '.join(cmd1) + '|' + ' '.join(cmd2)
stdin, stdout, stderr = client.exec_command(cmd)
channel = stdout.channel
exit_status = channel.recv_exit_status()
if exit_status != 0:
LOG.error(_("_remote_piped_execute: failed to host '%s' with "
"stdout '%s' and stderr '%s'")
% (ip, stdout.read(), stderr.read()))
msg = (_("Remote piped execution failed to host '%s'.") % ip)
raise exception.VolumeBackendAPIException(data=msg)
def _piped_execute(self, cmd1, cmd2):
"""Pipe output of cmd1 into cmd2."""
LOG.debug(_("Piping cmd1='%s' into cmd2='%s'") %
(' '.join(cmd1), ' '.join(cmd2)))
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE,
LOG.error(_LE("_piped_execute '%s' failed.") % (cmd1))
# Set the pipe to be blocking because evenlet.green.subprocess uses
# the non-blocking pipe.
flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK)
fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout,
stdout, stderr = p2.communicate()
if p2.returncode:
msg = (_("_piped_execute failed with the info '%s' and '%s'.") %
(stdout, stderr))
raise exception.VolumeBackendAPIException(data=msg)
def _zfs_send_recv(self, src, dst):
"""Replicate the ZFS dataset by calling zfs send/recv cmd"""
src_snapshot = {'volume_name': src['name'],
'name': 'tmp-snapshot-%s' % src['id']}
src_snapshot_name = self._get_zfs_snap_name(src_snapshot)
prop_type = self._get_zfs_property('type', src_snapshot_name)
# Delete the temporary snapshot if it already exists
if prop_type == 'snapshot':
# Create a temporary snapshot of volume
src_snapshot_name = self._get_zfs_snap_name(src_snapshot)
cmd1 = ['/usr/sbin/zfs', 'send', src_snapshot_name]
cmd2 = ['/usr/sbin/zfs', 'receive', dst]
# Due to pipe injection protection in the ssh utils method,
# cinder.utils.check_ssh_injection(), the piped commands must be passed
# through via paramiko. These commands take no user defined input
# other than the names of the zfs datasets which are already protected
# against the special characters of concern.
if not self.run_local:
ip = self.configuration.san_ip
username = self.configuration.san_login
password = self.configuration.san_password
self._remote_piped_execute(cmd1, cmd2, ip, username, password)
self._piped_execute(cmd1, cmd2)
# Delete the temporary src snapshot and dst snapshot
dst_snapshot_name = "%s@tmp-snapshot-%s" % (dst, src['id'])
cmd = ['/usr/sbin/zfs', 'destroy', dst_snapshot_name]
def _get_rc_connect(self, san_info=None):
"""Connect the RAD server."""
if san_info is not None:
san_ip = san_info.split(';')[0]
san_login = san_info.split(';')[1]
san_password = san_info.split(';')[2]
san_ip = self.configuration.san_ip
san_login = self.configuration.san_login
san_password = self.configuration.san_password
rc = connect_tls(san_ip)
auth = rada.RadAuth(rc)
auth.pam_login(san_login, san_password)
return rc
def _rad_zfs_send_recv(self, src, dst, dst_san_info=None):
"""Replicate the ZFS dataset stream."""
src_snapshot = {'volume_name': src['name'],
'name': 'tmp-send-snapshot-%s' % src['id']}
src_snapshot_name = self._get_zfs_snap_name(src_snapshot)
prop_type = self._get_zfs_property('type', src_snapshot_name)
# Delete the temporary snapshot if it already exists
if prop_type == 'snapshot':
# Create the temporary snapshot of src volume
src_rc = self._get_rc_connect()
dst_rc = self._get_rc_connect(dst_san_info)
src_pat = self._get_zfs_volume_name(src['name'])
src_vol_obj = src_rc.get_object(zfsmgr.ZfsDataset(),
dst_pat = dst.rsplit('/', 1)[0]
dst_vol_obj = dst_rc.get_object(zfsmgr.ZfsDataset(),
send_sock_info = src_vol_obj.get_send_socket(
name=src_snapshot_name, socket_type=zfsmgr.SocketType.AF_INET)
send_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
send_sock.connect((self.hostname, int(send_sock_info.socket)))
dst_san_ip = dst_san_info.split(';')[0]
remote_host, alias, addresslist = socket.gethostbyaddr(dst_san_ip)
recv_sock_info = dst_vol_obj.get_receive_socket(
name=dst, socket_type=zfsmgr.SocketType.AF_INET,
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
recv_sock.connect((remote_host, int(recv_sock_info.socket)))
# Set 4mb buffer size
buf_size = 4194304
while True:
# Read the data from the send stream
buf = send_sock.recv(buf_size)
if not buf:
# Write the data to the receive steam
# Delete the temporary dst snapshot
pat = radc.ADRGlobPattern({"name": dst})
dst_zvol_obj = dst_rc.get_object(zfsmgr.ZfsDataset(), pat)
snapshot_list = dst_zvol_obj.get_snapshots()
for snap in snapshot_list:
if 'tmp-send-snapshot'in snap:
# Delete the temporary src snapshot
LOG.debug(("Transfered src'%s' to dst'%s' on the host'%s'") %
(src_snapshot_name, dst, self.hostname))
def _get_zvol_path(self, volume):
"""Get the ZFS volume path."""
return "/dev/zvol/rdsk/%s" % self._get_zfs_volume_name(volume['name'])
def _update_volume_stats(self):
"""Retrieve volume status info."""
LOG.debug(_("Updating volume status"))
stats = {}
backend_name = self.configuration.safe_get('volume_backend_name')
stats["volume_backend_name"] = backend_name or self.__class__.__name__
stats["storage_protocol"] = self.protocol
stats["driver_version"] = self.version
stats["vendor_name"] = 'Oracle'
stats['QoS_support'] = False
dataset = self.configuration.zfs_volume_base
used_size = self._get_zfs_property('used', dataset)
avail_size = self._get_zfs_property('avail', dataset)
stats['total_capacity_gb'] = \
(Size(used_size) + Size(avail_size)).get(Size.gb_units)
stats['free_capacity_gb'] = Size(avail_size).get(Size.gb_units)
stats['reserved_percentage'] = self.configuration.reserved_percentage
stats['location_info'] =\
('ZFSVolumeDriver:%(hostname)s:%(zfs_volume_base)s:local' %
{'hostname': self.hostname,
'zfs_volume_base': self.configuration.zfs_volume_base})
self._stats = stats
def extend_volume(self, volume, new_size):
"""Extend an existing volume's size."""
volsize_str = 'volsize=%sg' % new_size
zfs_volume = self._get_zfs_volume_name(volume['name'])
self._execute('/usr/sbin/zfs', 'set', volsize_str, zfs_volume)
except Exception:
msg = (_("Failed to extend volume size to %(new_size)s GB.")
% {'new_size': new_size})
raise exception.VolumeBackendAPIException(data=msg)
def rename_volume(self, src, dst):
"""Rename the volume from src to dst in the same zpool."""
cmd = ['/usr/sbin/zfs', 'rename', src, dst]
LOG.debug(_("Rename the volume '%s' to '%s'") % (src, dst))
def _get_existing_volume_ref_name(self, existing_ref):
"""Returns the volume name of an existing reference.
And Check if an existing volume reference has a source-name
if 'source-name' in existing_ref:
vol_name = existing_ref['source-name']
return vol_name
reason = _("Reference must contain source-name.")
raise exception.ManageExistingInvalidReference(
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of volume to be managed by manage_existing.
existing_ref is a dictionary of the form:
{'source-name': <name of the volume>}
target_vol_name = self._get_existing_volume_ref_name(existing_ref)
volsize = self._get_zfs_property('volsize', target_vol_name)
return Size(volsize).get(Size.gb_units)
def manage_existing(self, volume, existing_ref):
"""Brings an existing zfs volume object under Cinder management.
:param volume: Cinder volume to manage
:param existing_ref: Driver-specific information used to identify a
# Check the existence of the ZFS volume
target_vol_name = self._get_existing_volume_ref_name(existing_ref)
prop_type = self._get_zfs_property('type', target_vol_name)
if prop_type != 'volume':
msg = (_("Failed to identify the volume '%s'.")
% target_vol_name)
raise exception.InvalidInput(reason=msg)
if volume['name']:
volume_name = volume['name']
volume_name = 'new_zvol'
# rename the volume
dst_volume = "%s/%s" % (self.configuration.zfs_volume_base,
self.rename_volume(target_vol_name, dst_volume)
def unmanage(self, volume):
"""Removes the specified volume from Cinder management."""
# Rename the volume's name to cinder-unm-* format.
volume_name = self._get_zfs_volume_name(volume['name'])
tmp_volume_name = "cinder-unm-%s" % volume['name']
new_volume_name = "%s/%s" % (self.configuration.zfs_volume_base,
self.rename_volume(volume_name, new_volume_name)
def migrate_volume(self, context, volume, host):
"""Migrate the volume from one backend to another one.
The backends should be in the same volume type.
:param context: context
:param volume: a dictionary describing the volume to migrate
:param host: a dictionary describing the host to migrate to
false_ret = (False, None)
if volume['status'] != 'available':
LOG.debug(_("Status of volume '%s' is '%s', not 'available'.") %
(volume['name'], volume['status']))
return false_ret
if 'capabilities' not in host:
LOG.debug(("No 'capabilities' is reported in the host'%s'") %
return false_ret
if 'location_info' not in host['capabilities']:
LOG.debug(("No 'location_info' is reported in the host'%s'") %
return false_ret
info = host['capabilities']['location_info']
dst_volume = "%s/%s" % (info.split(':')[2], volume['name'])
src_volume = self._get_zfs_volume_name(volume['name'])
# check if the src and dst volume are under the same zpool
dst_san_info = info.split(':')[3]
if dst_san_info == 'local':
self._zfs_send_recv(volume, dst_volume)
self._rad_zfs_send_recv(volume, dst_volume, dst_san_info)
# delete the source volume
provider_location = {}
return (True, provider_location)
class STMFDriver(ZFSVolumeDriver):
"""Abstract base class for common COMSTAR operations."""
__metaclass__ = abc.ABCMeta
def __init__(self, *args, **kwargs):
super(STMFDriver, self).__init__(*args, **kwargs)
def _stmf_execute(self, *cmd):
"""Handle the possible race during the local execution."""
tries = 0
while True:
except processutils.ProcessExecutionError as ex:
tries = tries + 1
if tries >= self.configuration.num_shell_tries or \
'resource busy' not in ex.stderr:
time.sleep(tries ** 2)
def _check_target(self, target, protocol):
"""Verify the target and check its status."""
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-target',
'-v', target)
tmp_protocol = None
status = None
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("Operational"):
status = line.split()[-1]
if line.startswith("Protocol"):
tmp_protocol = line.split()[-1]
if tmp_protocol == protocol:
return status
err_msg = (_("'%s' does not match the listed protocol '%s'"
" for target '%s'.")
% (protocol, tmp_protocol, target))
except processutils.ProcessExecutionError as error:
if 'not found' in error.stderr:
LOG.debug(_("The target '%s' is not found.") % target)
return None
err_msg = (_("Failed to list the target '%s': '%s'")
% (target, error.stderr))
raise exception.VolumeBackendAPIException(data=err_msg)
def _online_target(self, target, protocol):
"""Online the target in the offline state."""
self._execute('/usr/sbin/stmfadm', 'online-target',
assert self._check_target(target, protocol) == 'Online'
def _check_tg(self, tg):
"""Check if the target group exists."""
self._execute('/usr/sbin/stmfadm', 'list-tg', tg)
return True
except processutils.ProcessExecutionError as error:
if 'not found' in error.stderr:
LOG.debug(_("The target group '%s' is not found.") % tg)
return False
err_msg = (_("Failed to list the target group '%s': '%s'")
% (tg, error.stderr))
raise exception.VolumeBackendAPIException(data=err_msg)
def _get_luid(self, volume):
"""Get the LU corresponding to the volume."""
zvol = self._get_zvol_path(volume)
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-lu', '-v')
luid = None
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("LU Name:"):
luid = line.split()[-1]
if line.startswith("Alias") and line.split()[-1] == zvol:
luid = None
if luid is not None:
LOG.debug(_("Got the LU '%s'") % luid)
LOG.debug(_("Failed to get LU for volume '%s'")
% volume['name'])
return luid
def _get_view_and_lun(self, lu):
"""Check the view entry of the LU and then get the lun and view."""
view_and_lun = {}
view_and_lun['view'] = view_and_lun['lun'] = None
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-view',
'-l', lu, '-v')
except processutils.ProcessExecutionError as error:
if 'no views found' in error.stderr:
LOG.debug(_("No view is found for LU '%s'") % lu)
return view_and_lun
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("View Entry:"):
view_and_lun['view'] = line.split()[-1]
if line.startswith("LUN") and 'Auto' not in line.split()[-1]:
view_and_lun['lun'] = int(line.split()[-1])
if line.startswith("Lun"):
view_and_lun['lun'] = int(line.split()[2])
if view_and_lun['view'] is None or view_and_lun['lun'] is None:
err_msg = (_("Failed to get the view_entry or LUN of the LU '%s'.")
% lu)
raise exception.VolumeBackendAPIException(data=err_msg)
LOG.debug(_("The view_entry and LUN of LU '%s' are '%s' and '%d'.")
% (lu, view_and_lun['view'], view_and_lun['lun']))
return view_and_lun
class ZFSISCSIDriver(STMFDriver, driver.ISCSIDriver):
"""ZFS volume operations in iSCSI mode."""
protocol = 'iSCSI'
def __init__(self, *args, **kwargs):
super(ZFSISCSIDriver, self).__init__(*args, **kwargs)
if not self.configuration.san_is_local:
self.hostname, alias, addresslist = \
def get_volume_stats(self, refresh=False):
"""Get volume status."""
status = super(ZFSISCSIDriver, self).get_volume_stats(refresh)
status["storage_protocol"] = self.protocol
backend_name = self.configuration.safe_get('volume_backend_name')
status["volume_backend_name"] = backend_name or self.__class__.__name__
if not self.configuration.san_is_local:
san_info = "%s;%s;%s" % (self.configuration.san_ip,
status['location_info'] = \
'%(san_info)s' %
{'hostname': self.hostname,
'zfs_volume_base': self.configuration.zfs_volume_base,
'san_info': san_info})
return status
def _add_tg_member(self, target, tg, tpg):
"""Create the target and then add it to the target group."""
self._stmf_execute('/usr/sbin/itadm', 'create-target', '-n',
target, '-t', tpg)
self._stmf_execute('/usr/sbin/stmfadm', 'offline-target',
self._stmf_execute('/usr/sbin/stmfadm', 'add-tg-member', '-g',
tg, target)
def _get_target_portal(self, target):
"""Get the current target IP address."""
(out, _err) = self._execute('/usr/sbin/itadm', 'list-target',
'-v', target)
portal = None
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("Target Address:"):
portal = line.split()[-1]
return portal
def _check_tpg(self, tpg):
"""Verify the tpg."""
(out, _err) = self._execute('/usr/sbin/itadm', 'list-tpg',
'-v', tpg)
return True
except processutils.ProcessExecutionError as error:
if 'not found' in error.stderr:
return False
err_msg = (_("Failed to list the tpg '%s': '%s'")
% (tpg, error.stderr))
raise exception.VolumeBackendAPIException(data=err_msg)
def _create_tpg(self, tpg, ip):
"""Create the TPG for the IP address."""
self._execute('/usr/sbin/itadm', 'create-tpg', tpg, ip)
except processutils.ProcessExecutionError as error:
err_msg = (_("Failed to create the tpg '%s': '%s'") %
(tpg, error.stderr))
raise exception.VolumeBackendAPIException(data=err_msg)
def _setup_targets(self, target_ips, target_group):
"""Setup targets for the IP addresses."""
for ip in target_ips:
tpg_name = "tpg-%s" % ip
if not self._check_tpg(tpg_name):
self._create_tpg(tpg_name, ip)
target_name = '%s%s-%s-%s-target' % \
target_status = self._check_target(target_name, 'iSCSI')
if target_status == 'Online':
if target_status is None:
self._add_tg_member(target_name, target_group, tpg_name)
self._online_target(target_name, 'iSCSI')
def do_setup(self, context):
"""Setup the target and target group."""
target_group = self.configuration.zfs_target_group
if not self._check_tg(target_group):
self._stmf_execute('/usr/sbin/stmfadm', 'create-tg', target_group)
target_name = '%s%s-%s-target' % \
target_status = self._check_target(target_name, 'iSCSI')
secondary_interfaces = self.configuration.iscsi_secondary_ip_addresses
if target_status == 'Online' and not secondary_interfaces:
if target_status is None:
# Create the primary target
if self.configuration.san_is_local:
primary_ip = self.configuration.iscsi_ip_address
primary_ip = self.configuration.san_ip
tpg_name = "tpg-%s" % primary_ip
if not self._check_tpg(tpg_name):
self._create_tpg(tpg_name, primary_ip)
self._add_tg_member(target_name, target_group, tpg_name)
# Online the target from the 'Offline' status
self._online_target(target_name, 'iSCSI')
if secondary_interfaces:
secondary_ips = [ip for ip in secondary_interfaces if ip.strip()]
self._setup_targets(secondary_ips, target_group)
def _get_tg_secondary_members(self, primary_target):
"""Get target members of the target group."""
tg = self.configuration.zfs_target_group
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-tg', '-v', tg)
targets = []
target_portals = []
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("Member:"):
target = line.split()[-1]
if target == primary_target:
target_portal = self._get_target_portal(target)
if target_portal:
return targets, target_portals
def create_export(self, context, volume, conncetor):
"""Export the volume."""
# If the volume is already exported there is nothing to do, as we
# simply export volumes and they are universally available.
luid = self._get_luid(volume)
if luid:
view_lun = self._get_view_and_lun(luid)
if view_lun['view'] is not None:
msg = (_("Failed to create logical unit for volume '%s' due "
"to an existing LU id but no view.") % volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
zvol = self._get_zvol_path(volume)
# Create a Logical Unit (LU)
self._stmf_execute('/usr/sbin/stmfadm', 'create-lu', zvol)
luid = self._get_luid(volume)
if not luid:
msg = (_("Failed to create LU for volume '%s'")
% volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
# Add a view entry to the logical unit
target_group = self.configuration.zfs_target_group
self._stmf_execute('/usr/sbin/stmfadm', 'add-view',
'-t', target_group, luid)
def remove_export(self, context, volume):
"""Remove an export for a volume.
All of the related elements about the volume, including the
target, target group, view entry and lu, are deleted.
luid = self._get_luid(volume)
# Remove the LU
if luid is not None:
self._stmf_execute('/usr/sbin/stmfadm', 'delete-lu', luid)
# Remove the target and its target group if they were created by
# earlier versions of the volume driver
target_group = 'tg-%s' % volume['name']
target_name = '%s%s' % (self.configuration.iscsi_target_prefix,
if self._check_target(target_name, 'iSCSI') is not None:
self._stmf_execute('/usr/sbin/itadm', 'delete-target', '-f',
if self._check_tg(target_group):
self._stmf_execute('/usr/sbin/stmfadm', 'delete-tg', target_group)
def _get_iscsi_properties(self, volume):
"""Get iSCSI configuration
Now we use the discovery address as the default approach to add
objects into the initiator. A discovery address is an IP address:port
combination used in a SendTargets discovery session in the initiator.
:target_discovered: boolean indicating whether discovery was used
:target_iqn: the IQN of the iSCSI target
:target_portal: the portal of the iSCSI target
:target_lun: the lun of the iSCSI target
:volume_id: the id of the volume
:auth_method:, :auth_username:, :auth_password:
the authentication details. Right now, either auth_method is not
present meaning no authentication, or auth_method == `CHAP`
meaning use CHAP with the specified credentials.
If multiple IP addresses are configured, the returns will include
:target_iqns, :target_portals, :target_luns, which contain lists of
multiple values. The main portal information is also returned in
:target_iqn, :target_portal, :target_lun for backward compatibility.
luid = self._get_luid(volume)
if not luid:
msg = (_("Failed to get LU for volume '%s'") % volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
old_target_name = True
target_name = '%s%s' % (self.configuration.iscsi_target_prefix,
if self._check_target(target_name, 'iSCSI') is None:
target_name = '%s%s-%s-target' % \
old_target_name = False
properties = {}
properties['target_discovered'] = True
properties['target_iqn'] = target_name
# Here the san_is_local means that the cinder-volume runs in the
# iSCSI target with iscsi_ip_address.
if self.configuration.san_is_local:
target_ip = self.configuration.iscsi_ip_address
target_ip = self.configuration.san_ip
properties['target_portal'] = ('%s:%d' %
view_lun = self._get_view_and_lun(luid)
if view_lun['lun'] is not None:
properties['target_lun'] = view_lun['lun']
properties['volume_id'] = volume['id']
secondary_ifs = self.configuration.iscsi_secondary_ip_addresses
# The multipathing doesn't apply to the old volume-specific target
if not old_target_name and secondary_ifs:
target_portals = []
target_iqns = []
target_luns = []
secondary_iqns, secondary_portals = self._get_tg_secondary_members(
properties['target_portals'] = target_portals + secondary_portals
properties['target_iqns'] = target_iqns + secondary_iqns
properties['target_luns'] = (len(secondary_iqns) + 1) * target_luns
auth = volume['provider_auth']
if auth:
(auth_method, auth_username, auth_secret) = auth.split()
properties['auth_method'] = auth_method
properties['auth_username'] = auth_username
properties['auth_password'] = auth_secret
return properties
def initialize_connection(self, volume, connector):
"""Initialize the connection and returns connection info.
The iSCSI driver returns a driver_volume_type of 'iscsi'.
The format of the driver data is defined in _get_iscsi_properties.
Example return value::
'driver_volume_type': 'iscsi'
'data': {
'target_discovered': True,
'target_portal': '',
'volume_id': 1,
initiator_name = connector['initiator']
volume_name = volume['name']
LOG.debug(_('Connecting the initiator %(initiator_name)s '
'for volume %(volume_name)s')
% {'initiator_name': initiator_name,
'volume_name': volume_name})
iscsi_properties = self._get_iscsi_properties(volume)
return {
'driver_volume_type': 'iscsi',
'data': iscsi_properties
def terminate_connection(self, volume, connector, **kwargs):
"""Disconnection from the connector."""
initiator_name = connector['initiator']
volume_name = volume['name']
LOG.debug(_('Disconnecting the initiator %(initiator_name)s '
'for volume %(volume_name)s')
% {'initiator_name': initiator_name,
'volume_name': volume_name})
class ZFSFCDriver(STMFDriver, driver.FibreChannelDriver):
"""ZFS volume operations in FC mode."""
protocol = 'FC'
def __init__(self, *args, **kwargs):
super(ZFSFCDriver, self).__init__(*args, **kwargs)
if not self.configuration.san_is_local:
self.hostname, alias, addresslist = \
def get_volume_stats(self, refresh=False):
"""Get volume status."""
status = super(ZFSFCDriver, self).get_volume_stats(refresh)
status["storage_protocol"] = self.protocol
backend_name = self.configuration.safe_get('volume_backend_name')
status["volume_backend_name"] = backend_name or self.__class__.__name__
if not self.configuration.san_is_local:
san_info = "%s;%s;%s" % (self.configuration.san_ip,
status['location_info'] = \
'%(san_info)s' %
{'hostname': self.hostname,
'zfs_volume_base': self.configuration.zfs_volume_base,
'san_info': san_info})
return status
def do_setup(self, context):
"""Check wwns and setup the target group."""
self.wwns = self._get_wwns()
if not self.wwns:
msg = (_("Could not determine fibre channel world wide "
"node names."))
raise exception.VolumeBackendAPIException(data=msg)
self.tg = 'tg-wwn-%s' % self.wwns[0]
if not self._check_tg(self.tg):
def _get_wwns(self):
"""Get the FC port WWNs of the host."""
(out, _err) = self._execute('/usr/sbin/fcinfo', 'hba-port', '-t')
wwns = []
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("HBA Port WWN:"):
wwn = line.split()[-1]
LOG.debug(_("Got the FC port WWN '%s'") % wwn)
return wwns
def _get_target_wwns(self, tg):
"""Get the target members in the tg."""
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-tg',
'-v', tg)
wwns = []
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("Member:"):
wwn = line.split()[-1]
target_wwn = wwn.split('.')[-1]
return wwns
def _setup_tg(self, tg):
"""Setup the target group."""
self._stmf_execute('/usr/sbin/stmfadm', 'create-tg', tg)
# Add free target wwns into the target group
for wwn in self.wwns:
if not self._target_in_tg(wwn, None):
target_wwn = 'wwn.%s' % wwn
self._stmf_execute('/usr/sbin/stmfadm', 'offline-target',
self._stmf_execute('/usr/sbin/stmfadm', 'add-tg-member',
'-g', tg, target_wwn)
self._online_target(target_wwn, 'Channel')
LOG.error(_LE("Failed to add and online the target '%s'.")
% (target_wwn))
target_wwns = self._get_target_wwns(tg)
if not target_wwns:
msg = (_("No target members exist in the target group '%s'.")
% tg)
raise exception.VolumeBackendAPIException(data=msg)
def _only_lu(self, lu):
"""Check if the LU is the only one."""
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-lu', '-v')
linecount = 0
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("LU Name:"):
luid = line.split()[-1]
linecount += 1
if linecount == 1 and luid == lu:
LOG.debug(_("The LU '%s' is the only one.") % lu)
return True
return False
def _target_in_tg(self, wwn, tg):
"""Check if the target has been added into a target group."""
target = 'wwn.%s' % wwn.upper()
if tg is not None:
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-tg',
'-v', tg)
(out, _err) = self._execute('/usr/sbin/stmfadm', 'list-tg', '-v')
for line in [l.strip() for l in out.splitlines()]:
if line.startswith("Member:") and target in line:
return True
LOG.debug(_("The target '%s' is not in %s target group.") %
(target, tg if tg else 'any'))
return False
def _force_lip_wwn(self):
"""Force the link to reinitialize."""
target_wwns = self._get_target_wwns(self.tg)
for target_wwn in target_wwns:
self._stmf_execute('/usr/sbin/fcadm', 'force-lip', target_wwn)
def create_export(self, context, volume, connector):
"""Export the volume."""
# If the volume is already exported there is nothing to do, as we
# simply export volumes and they are universally available.
luid = self._get_luid(volume)
if luid:
view_lun = self._get_view_and_lun(luid)
if view_lun['view'] is not None:
msg = (_("Failed to create logical unit for volume '%s' due "
"to an existing LU id but no view.") % volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
zvol = self._get_zvol_path(volume)
# Create a Logical Unit (LU)
self._stmf_execute('/usr/sbin/stmfadm', 'create-lu', zvol)
luid = self._get_luid(volume)
if not luid:
msg = (_("Failed to create logic unit for volume '%s'")
% volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
# setup the target group if it doesn't exist.
if not self._check_tg(self.tg):
# Add a logical unit view entry
self._stmf_execute('/usr/sbin/stmfadm', 'add-view', '-t',
self.tg, luid)
def remove_export(self, context, volume):
"""Remove an export for a volume."""
luid = self._get_luid(volume)
if luid is not None:
target_group = self.tg
view_lun = self._get_view_and_lun(luid)
if view_lun['view']:
self._stmf_execute('/usr/sbin/stmfadm', 'remove-view', '-l',
luid, view_lun['view'])
# Remove the target group when the LU to be deleted is last one
# exposed by the target group.
if self._only_lu(luid):
if self._check_tg(target_group):
self._stmf_execute('/usr/sbin/stmfadm', 'delete-tg',
# Remove the LU
self._stmf_execute('/usr/sbin/stmfadm', 'delete-lu', luid)
def _get_fc_properties(self, volume):
"""Get Fibre Channel configuration.
:target_discovered: boolean indicating whether discovery was used
:target_wwn: the world wide name of the FC port target
:target_lun: the lun assigned to the LU for the view entry
luid = self._get_luid(volume)
if not luid:
msg = (_("Failed to get logic unit for volume '%s'")
% volume['name'])
raise exception.VolumeBackendAPIException(data=msg)
properties = {}
properties['target_discovered'] = True
properties['target_wwn'] = self._get_target_wwns(self.tg)
view_lun = self._get_view_and_lun(luid)
if view_lun['lun'] is not None:
properties['target_lun'] = view_lun['lun']
return properties
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns connection info.
The driver returns a driver_volume_type of 'fibre_channel'.
The target_wwn can be a single entry or a list of wwns that
correspond to the list of remote wwn(s) that will export the volume.
Example return values:
'driver_volume_type': 'fibre_channel'
'data': {
'target_discovered': True,
'target_lun': 1,
'target_wwn': '1234567890123',
'driver_volume_type': 'fibre_channel'
'data': {
'target_discovered': True,
'target_lun': 1,
'target_wwn': ['1234567890123', '0987654321321'],
fc_properties = self._get_fc_properties(volume)
return {
'driver_volume_type': 'fibre_channel',
'data': fc_properties