Module hybridq.utils.dot
Author: Salvatore Mandra (salvatore.mandra@nasa.gov)
Copyright © 2021, United States Government, as represented by the Administrator of the National Aeronautics and Space Administration. All rights reserved.
The HybridQ: A Hybrid Simulator for Quantum Circuits platform is licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Expand source code
"""
Author: Salvatore Mandra (salvatore.mandra@nasa.gov)
Copyright © 2021, United States Government, as represented by the Administrator
of the National Aeronautics and Space Administration. All rights reserved.
The HybridQ: A Hybrid Simulator for Quantum Circuits platform is licensed under
the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from __future__ import annotations
from hybridq.utils.utils import load_library
from hybridq.utils.transpose import _swap_core
from warnings import warn
import numpy as np
import ctypes
# Define shorthand for defining functions
def _define_function(lib, fname, restype, *argtypes):
func = lib.__getattr__(fname)
func.argtypes = argtypes
func.restype = restype
return func
# Load library
_lib_dot = load_library('hybridq.so')
# If library isn't loaded properly, warn
if _lib_dot is None:
warn("Cannot find C++ HybridQ core. "
"Falling back to 'numpy.dot' instead.")
# Get c_type
_c_types_map = {
np.dtype('float32'): ctypes.c_float,
np.dtype('float64'): ctypes.c_double,
}
# Get pack size
_log2_pack_size = _define_function(_lib_dot, "get_log2_pack_size",
ctypes.c_uint32)() if _lib_dot else None
# Get dot core
_dot_core = {
np.dtype(t): _define_function(_lib_dot, f"apply_U_{t}", ctypes.c_int,
ctypes.POINTER(_c_types_map[np.dtype(t)]),
ctypes.POINTER(_c_types_map[np.dtype(t)]),
ctypes.POINTER(_c_types_map[np.dtype(t)]),
ctypes.POINTER(ctypes.c_uint32),
ctypes.c_uint, ctypes.c_uint)
for t in (t + str(b) for t in ['float'] for b in [32, 64])
} if _lib_dot else None
# Get to_complex core
_to_complex_core = {
np.dtype('complex' + str(2 * b)): _define_function(
_lib_dot, f"to_complex{2*b}", ctypes.c_int,
ctypes.POINTER(_c_types_map[np.dtype('float' + str(b))]),
ctypes.POINTER(_c_types_map[np.dtype('float' + str(b))]),
ctypes.POINTER(_c_types_map[np.dtype('float' + str(b))]), ctypes.c_uint)
for b in [32, 64]
} if _lib_dot else None
def _maybe_view(a: array_like):
"""
Determine if `a` is potentially a view of another array.
"""
return a.base is not None
def to_complex(a: array_like, b: array_like):
# Check that a and b have the same shape
if a.shape != b.shape:
raise ValueError("'a' and 'b' must have the same shape.")
# Check that neither a nor b are complex objects
if np.iscomplexobj(a) or np.iscomplexobj(b):
raise ValueError("Both 'a' and 'b' must be real valued.")
# Use faster C++ implementation
if not _maybe_view(a) and not _maybe_view(
b) and a.dtype == b.dtype and a.dtype in _c_types_map:
# Get complex_type
complex_type = (1j * np.array([0], dtype=a.dtype)).dtype
# Get new array
c = np.empty(a.shape, dtype=complex_type)
# Get c_float_type
c_float_type = _c_types_map[a.dtype]
# Get pointers
a_ptr = a.ctypes.data_as(ctypes.POINTER(c_float_type))
b_ptr = b.ctypes.data_as(ctypes.POINTER(c_float_type))
c_ptr = c.ctypes.data_as(ctypes.POINTER(c_float_type))
# Get size
size = np.prod(a.shape)
# Apply core
_to_complex_core[complex_type](a_ptr, b_ptr, c_ptr, size)
# Use slower numpy implementation
else:
c = a + 1j * b
# Return complex
return c
def to_complex_array(a: array_like):
# Check that a is complex
if not np.iscomplexobj(a):
raise ValueError("'a' must be an array of complex numbers.")
# Get real type
float_type = np.real(np.array([0], dtype=a.dtype)).dtype
# Get size
size = np.prod(a.shape)
# Split real and imaginary part
return np.reshape(
np.reshape(
np.asarray(a, order='C').view(float_type), (np.prod(a.shape), 2)).T,
(2,) + a.shape)
def dot(a: np.ndarray,
b: np.ndarray,
axes_b: iter[int] = None,
b_as_complex_array: bool = False,
inplace: bool = False,
backend: any = 'numpy',
**kwargs):
"""
"""
# Select the correct backend
if backend == 'numpy':
import numpy as backend
elif backend == 'jax':
import jax.numpy as backend
else:
raise ValueError(f"Backend {backend} is not supported.")
# set defaults
kwargs.setdefault('out', None)
kwargs.setdefault('force_numpy', False)
kwargs.setdefault('raise_if_hcore_fails', False)
kwargs.setdefault('swap_back', True)
kwargs.setdefault('alignment', 32)
# If axes are not provided, fall back to numpy.dot
if axes_b is None:
return backend.dot(a, b, out=kwargs['out'])
# Get array
def _get(x):
# Copy reference
_x = x
# Get array
x = np.asarray(x, order='C')
# Return array and True if new
return x, x is not _x
# Convert to numpy.ndarray
a, _new_a = _get(a)
b, _new_b = _get(b)
# Get number of axes
a_ndim = a.ndim
b_ndim = b.ndim - (1 if b_as_complex_array else 0)
# Get shapes
a_shape = np.asarray(a.shape)
b_shape = np.asarray(b.shape[:len(b.shape) -
(1 if b_as_complex_array else 0)])
# Get axes
axes_b = np.asarray(axes_b)
# Get real_type and complex_type
real_type = b.dtype if b_as_complex_array else np.real(
np.array([1], dtype=b.dtype)).dtype
complex_type = (1j * np.array([1], dtype=real_type)).dtype
# Check right format for b if b_as_complex_array==True
if b_as_complex_array:
if b.shape[0] != 2:
raise ValueError("'b' is in the wrong format.")
if np.iscomplexobj(b):
raise ValueError("'b' is expected to be real.")
# Check axes
if any(axes_b >= b_ndim):
raise IndexError("Index not in 'b'")
if a_shape[-1] != np.prod(b_shape[axes_b]):
raise ValueError("'a' and 'b' are incompatible.")
# Get positions
pos = (b_ndim - axes_b[::-1] - 1).astype('uint32')
# Get smallest swap size
swap_size = 0 if np.all(pos >= _log2_pack_size) else next(
k
for k in range(_log2_pack_size, 2 * max(len(axes_b), _log2_pack_size) +
1)
if sum(pos < k) <= k - _log2_pack_size)
# Check if it is possible to use HybridQ core
_use_hcore = not kwargs['force_numpy']
# Check library has been loaded properly
_use_hcore &= _lib_dot is not None
# Check if real_type is supported
_use_hcore &= real_type in _c_types_map
# Check that a is a matrix
_use_hcore &= a_ndim == 2 and a_shape[0] == a_shape[1]
# Check that the system isn't too small
_use_hcore &= b_ndim >= 6 and (b_ndim - len(axes_b)) >= _log2_pack_size
# Check that all axes for b have dimension 2
_use_hcore &= all(x == 2 for x in b_shape)
# Check that not too many axes are used
_use_hcore &= len(axes_b) <= 10
# Check that swap is not too large
_use_hcore &= swap_size <= 12
# For debug purposes
if not _use_hcore and not kwargs['force_numpy'] and kwargs[
'raise_if_hcore_fails']:
raise AssertionError("Cannot use HybridQ core.")
# Check if the HybridQ C++ core can be used
if _use_hcore:
from hybridq.utils.aligned import array, isaligned
# Convert
if a.dtype != complex_type:
warn(f"'a' is recast to '{complex_type}' to match 'b'.")
a = a.astype(complex_type)
# Convert and align
if b_as_complex_array:
b = array(b,
copy=not (inplace or _new_b),
alignment=kwargs['alignment'])
else:
b = array([np.real(b), np.imag(b)],
dtype=real_type,
alignment=kwargs['alignment'])
# Split in real and imaginary part
b_re = b[0]
b_im = b[1]
# Check
assert (b_re.ctypes.data % 32 == 0)
assert (b_im.ctypes.data % 32 == 0)
# Get pointers
a_ptr = a.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type]))
b_re_ptr = b_re.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type]))
b_im_ptr = b_im.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type]))
# Swap real and imaginary part if required
if swap_size:
# Get swap transposition
swap_pos = np.concatenate(
(np.setdiff1d(range(swap_size),
pos), np.intersect1d(range(swap_size),
pos))).astype('uint32')
swap_pos_ptr = swap_pos.ctypes.data_as(
ctypes.POINTER(ctypes.c_uint32))
swap_pos_inv = np.argsort(swap_pos).astype('uint32')
swap_pos_inv_ptr = swap_pos_inv.ctypes.data_as(
ctypes.POINTER(ctypes.c_uint32))
# Transpose real and imaginary part
if _swap_core[real_type](b_re_ptr, swap_pos_ptr, b_ndim, swap_size):
raise ValueError("Something went wrong with '_swap_core'.")
if _swap_core[real_type](b_im_ptr, swap_pos_ptr, b_ndim, swap_size):
raise ValueError("Something went wrong with '_swap_core'.")
# Swap positions
pos = np.array(
[swap_pos_inv[x] if x < swap_size else x for x in pos],
dtype='uint32')
# Get pointer
pos_ptr = pos.ctypes.data_as(ctypes.POINTER(ctypes.c_uint32))
# Call HybridQ C++ core
if _dot_core[real_type](b_re_ptr, b_im_ptr, a_ptr, pos_ptr, b_ndim,
len(pos)):
raise ValueError('Something went wrong.')
# Swap back
if swap_size and kwargs['swap_back']:
# Transpose real and imaginary part
if _swap_core[real_type](b_re_ptr, swap_pos_inv_ptr, b_ndim,
swap_size):
raise ValueError("Something went wrong with '_swap_core'.")
if _swap_core[real_type](b_im_ptr, swap_pos_inv_ptr, b_ndim,
swap_size):
raise ValueError("Something went wrong with '_swap_core'.")
# Get result
res = b if b_as_complex_array else b[0] + 1j * b[1]
# Get the right transposition to use in `hybridq.utils.transpose`
if swap_size and not kwargs['swap_back']:
tr = list(range(b_ndim - swap_size)) + (
b_ndim - swap_pos_inv[::-1] - 1).tolist()
# Return
return res if kwargs['swap_back'] is True else (
res, tr if swap_size else None)
# Otherwise, fallback to numpy.dot
else:
# Warn
if not kwargs['force_numpy']:
warn("Fallback to 'numpy.dot'")
# Convert to complex array if needed
if b_as_complex_array:
b = backend.reshape(b[0] + 1j * b[1], b_shape)
# Get right transposition
pos = axes_b.tolist() + [x for x in range(b_ndim) if x not in axes_b]
# Reshape and transpose
b = backend.reshape(backend.transpose(b, pos), (np.prod(
b_shape[axes_b]), np.prod(b_shape) // np.prod(b_shape[axes_b])))
# Invert posisions
pos = [pos.index(x) for x in range(len(pos))]
# Apply a.dot(b), reshape to the original shape and transpose back
b = backend.transpose(backend.reshape(backend.dot(a, b), b_shape), pos)
# Return
return backend.array([backend.real(b), backend.imag(b)
]) if b_as_complex_array else b
# It should never reach here
assert (False)
Functions
def dot(a: np.ndarray, b: np.ndarray, axes_b: iter[int] = None, b_as_complex_array: bool = False, inplace: bool = False, backend: any = 'numpy', **kwargs)
-
Expand source code
def dot(a: np.ndarray, b: np.ndarray, axes_b: iter[int] = None, b_as_complex_array: bool = False, inplace: bool = False, backend: any = 'numpy', **kwargs): """ """ # Select the correct backend if backend == 'numpy': import numpy as backend elif backend == 'jax': import jax.numpy as backend else: raise ValueError(f"Backend {backend} is not supported.") # set defaults kwargs.setdefault('out', None) kwargs.setdefault('force_numpy', False) kwargs.setdefault('raise_if_hcore_fails', False) kwargs.setdefault('swap_back', True) kwargs.setdefault('alignment', 32) # If axes are not provided, fall back to numpy.dot if axes_b is None: return backend.dot(a, b, out=kwargs['out']) # Get array def _get(x): # Copy reference _x = x # Get array x = np.asarray(x, order='C') # Return array and True if new return x, x is not _x # Convert to numpy.ndarray a, _new_a = _get(a) b, _new_b = _get(b) # Get number of axes a_ndim = a.ndim b_ndim = b.ndim - (1 if b_as_complex_array else 0) # Get shapes a_shape = np.asarray(a.shape) b_shape = np.asarray(b.shape[:len(b.shape) - (1 if b_as_complex_array else 0)]) # Get axes axes_b = np.asarray(axes_b) # Get real_type and complex_type real_type = b.dtype if b_as_complex_array else np.real( np.array([1], dtype=b.dtype)).dtype complex_type = (1j * np.array([1], dtype=real_type)).dtype # Check right format for b if b_as_complex_array==True if b_as_complex_array: if b.shape[0] != 2: raise ValueError("'b' is in the wrong format.") if np.iscomplexobj(b): raise ValueError("'b' is expected to be real.") # Check axes if any(axes_b >= b_ndim): raise IndexError("Index not in 'b'") if a_shape[-1] != np.prod(b_shape[axes_b]): raise ValueError("'a' and 'b' are incompatible.") # Get positions pos = (b_ndim - axes_b[::-1] - 1).astype('uint32') # Get smallest swap size swap_size = 0 if np.all(pos >= _log2_pack_size) else next( k for k in range(_log2_pack_size, 2 * max(len(axes_b), _log2_pack_size) + 1) if sum(pos < k) <= k - _log2_pack_size) # Check if it is possible to use HybridQ core _use_hcore = not kwargs['force_numpy'] # Check library has been loaded properly _use_hcore &= _lib_dot is not None # Check if real_type is supported _use_hcore &= real_type in _c_types_map # Check that a is a matrix _use_hcore &= a_ndim == 2 and a_shape[0] == a_shape[1] # Check that the system isn't too small _use_hcore &= b_ndim >= 6 and (b_ndim - len(axes_b)) >= _log2_pack_size # Check that all axes for b have dimension 2 _use_hcore &= all(x == 2 for x in b_shape) # Check that not too many axes are used _use_hcore &= len(axes_b) <= 10 # Check that swap is not too large _use_hcore &= swap_size <= 12 # For debug purposes if not _use_hcore and not kwargs['force_numpy'] and kwargs[ 'raise_if_hcore_fails']: raise AssertionError("Cannot use HybridQ core.") # Check if the HybridQ C++ core can be used if _use_hcore: from hybridq.utils.aligned import array, isaligned # Convert if a.dtype != complex_type: warn(f"'a' is recast to '{complex_type}' to match 'b'.") a = a.astype(complex_type) # Convert and align if b_as_complex_array: b = array(b, copy=not (inplace or _new_b), alignment=kwargs['alignment']) else: b = array([np.real(b), np.imag(b)], dtype=real_type, alignment=kwargs['alignment']) # Split in real and imaginary part b_re = b[0] b_im = b[1] # Check assert (b_re.ctypes.data % 32 == 0) assert (b_im.ctypes.data % 32 == 0) # Get pointers a_ptr = a.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type])) b_re_ptr = b_re.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type])) b_im_ptr = b_im.ctypes.data_as(ctypes.POINTER(_c_types_map[real_type])) # Swap real and imaginary part if required if swap_size: # Get swap transposition swap_pos = np.concatenate( (np.setdiff1d(range(swap_size), pos), np.intersect1d(range(swap_size), pos))).astype('uint32') swap_pos_ptr = swap_pos.ctypes.data_as( ctypes.POINTER(ctypes.c_uint32)) swap_pos_inv = np.argsort(swap_pos).astype('uint32') swap_pos_inv_ptr = swap_pos_inv.ctypes.data_as( ctypes.POINTER(ctypes.c_uint32)) # Transpose real and imaginary part if _swap_core[real_type](b_re_ptr, swap_pos_ptr, b_ndim, swap_size): raise ValueError("Something went wrong with '_swap_core'.") if _swap_core[real_type](b_im_ptr, swap_pos_ptr, b_ndim, swap_size): raise ValueError("Something went wrong with '_swap_core'.") # Swap positions pos = np.array( [swap_pos_inv[x] if x < swap_size else x for x in pos], dtype='uint32') # Get pointer pos_ptr = pos.ctypes.data_as(ctypes.POINTER(ctypes.c_uint32)) # Call HybridQ C++ core if _dot_core[real_type](b_re_ptr, b_im_ptr, a_ptr, pos_ptr, b_ndim, len(pos)): raise ValueError('Something went wrong.') # Swap back if swap_size and kwargs['swap_back']: # Transpose real and imaginary part if _swap_core[real_type](b_re_ptr, swap_pos_inv_ptr, b_ndim, swap_size): raise ValueError("Something went wrong with '_swap_core'.") if _swap_core[real_type](b_im_ptr, swap_pos_inv_ptr, b_ndim, swap_size): raise ValueError("Something went wrong with '_swap_core'.") # Get result res = b if b_as_complex_array else b[0] + 1j * b[1] # Get the right transposition to use in `hybridq.utils.transpose` if swap_size and not kwargs['swap_back']: tr = list(range(b_ndim - swap_size)) + ( b_ndim - swap_pos_inv[::-1] - 1).tolist() # Return return res if kwargs['swap_back'] is True else ( res, tr if swap_size else None) # Otherwise, fallback to numpy.dot else: # Warn if not kwargs['force_numpy']: warn("Fallback to 'numpy.dot'") # Convert to complex array if needed if b_as_complex_array: b = backend.reshape(b[0] + 1j * b[1], b_shape) # Get right transposition pos = axes_b.tolist() + [x for x in range(b_ndim) if x not in axes_b] # Reshape and transpose b = backend.reshape(backend.transpose(b, pos), (np.prod( b_shape[axes_b]), np.prod(b_shape) // np.prod(b_shape[axes_b]))) # Invert posisions pos = [pos.index(x) for x in range(len(pos))] # Apply a.dot(b), reshape to the original shape and transpose back b = backend.transpose(backend.reshape(backend.dot(a, b), b_shape), pos) # Return return backend.array([backend.real(b), backend.imag(b) ]) if b_as_complex_array else b # It should never reach here assert (False)
def to_complex(a: array_like, b: array_like)
-
Expand source code
def to_complex(a: array_like, b: array_like): # Check that a and b have the same shape if a.shape != b.shape: raise ValueError("'a' and 'b' must have the same shape.") # Check that neither a nor b are complex objects if np.iscomplexobj(a) or np.iscomplexobj(b): raise ValueError("Both 'a' and 'b' must be real valued.") # Use faster C++ implementation if not _maybe_view(a) and not _maybe_view( b) and a.dtype == b.dtype and a.dtype in _c_types_map: # Get complex_type complex_type = (1j * np.array([0], dtype=a.dtype)).dtype # Get new array c = np.empty(a.shape, dtype=complex_type) # Get c_float_type c_float_type = _c_types_map[a.dtype] # Get pointers a_ptr = a.ctypes.data_as(ctypes.POINTER(c_float_type)) b_ptr = b.ctypes.data_as(ctypes.POINTER(c_float_type)) c_ptr = c.ctypes.data_as(ctypes.POINTER(c_float_type)) # Get size size = np.prod(a.shape) # Apply core _to_complex_core[complex_type](a_ptr, b_ptr, c_ptr, size) # Use slower numpy implementation else: c = a + 1j * b # Return complex return c
def to_complex_array(a: array_like)
-
Expand source code
def to_complex_array(a: array_like): # Check that a is complex if not np.iscomplexobj(a): raise ValueError("'a' must be an array of complex numbers.") # Get real type float_type = np.real(np.array([0], dtype=a.dtype)).dtype # Get size size = np.prod(a.shape) # Split real and imaginary part return np.reshape( np.reshape( np.asarray(a, order='C').view(float_type), (np.prod(a.shape), 2)).T, (2,) + a.shape)