# Source code for scidbpy.db

"""DB, Array, and Operator
=======================

Classes for connecting to SciDB and executing queries.

"""

import copy
import enum
import itertools
import logging
import numpy
import os
import pandas
import pyarrow
import re
import requests
import string
import threading
import uuid
import warnings

try:
    from weakref import finalize
except ImportError:
    from backports.weakref import finalize

from .meta import ops_hungry, string_args
from .schema import Attribute, Dimension, Schema


class Shim(enum.Enum):
    """Shim HTTP endpoints used by this module.

    The value of each member is the path component that is joined to
    the Shim URL when a request is made.
    """

    cancel = 'cancel'
    execute_query = 'execute_query'
    new_session = 'new_session'
    read_bytes = 'read_bytes'
    read_lines = 'read_lines'
    release_session = 'release_session'
    upload = 'upload'
class Password_Placeholder(object): def __repr__(self): return 'PASSWORD_PROVIDED' def _shim_release_session(scidb_url, http_auth, verify, id): """Make Shim release_session request""" url = requests.compat.urljoin(scidb_url, Shim.release_session.value) req = requests.get( url, params={'id': id}, auth=http_auth, verify=verify) req.reason = req.content req.url = _sanitize_url(req.url) req.raise_for_status()
[docs]class DB(object): """SciDB Shim connection object. >>> DB() ... # doctest: +NORMALIZE_WHITESPACE DB('http://localhost:8080', None, None, None, False, None, False, 256, False) >>> print(DB()) scidb_url = http://localhost:8080 scidb_auth = None http_auth = None verify = None admin = False namespace = None use_arrow = False result_size_limit = 256 no_ops = False Constructor parameters: :param string scidb_url: SciDB connection URL. The URL for the Shim server. If ``None``, use the value of the ``SCIDB_URL`` environment variable, if present (default ``http://localhost:8080``) :param tuple scidb_auth: Tuple with username and password for connecting to SciDB, if password authentication method is used (default ``None``) :param tuple http_auth: Tuple with username and password for connecting to Shim, if Shim authentication is used (default ``None``) :param bool verify: If ``False``, HTTPS certificates are not verified. This value is passed to the Python ``requests`` library. See Python `requests <http://docs.python-requests.org/en/master/>`_ library `SSL Cert Verification <http://docs.python-requests.org/en/master/user/advanced/ #ssl-cert-verification>`_ section for details on the ``verify`` argument (default ``None``) :param bool admin: Set to ``True`` to open a higher-priority session. This is identical with the ``--admin`` flag for the ``iquery`` SciDB client, see `SciDB Documentation <https://paradigm4.atlassian.net/wiki/spaces/scidb>`_ for details (default ``False``) :param string namespace: Initial namespace for the connection. Only applicable for SciDB Enterprise Edition. The namespace can changed at any time using the ``set_namespace`` SciDB operator (default ``None``) :param bool use_arrow: If ``True``, download SciDB array using Apache Arrow library. Requires ``accelerated_io_tools`` and ``aio`` enabled in ``Shim``. 
If ``True``, a Pandas DataFrame is returned (``as_dataframe`` has no effect) and null-able types are promoted as per Pandas `promotion scheme <http://pandas.pydata.org/pandas-docs/stable/gotchas.html #na-type-promotions>`_ (``dataframe_promo`` has no effect). It can be overriden for each ``iquery`` call (default ``False``) :param int result_size_limit: absolute limit of the output file in Megabytes. Effective only when the `accelerated_io_tools` plug-in is installed in SciDB and `aio` is enabled in Shim (default `256` MB) :param bool no_ops: If ``True``, the list of operators is not fetched at this time and the connection is not implicitly verified. This expedites the execution of the function but disallows for calling the SciDB operators directly from the ``DB`` instance e.g., ``db.scan`` (default ``False``) """ _show_query = "show('{}', 'afl')" def __init__( self, scidb_url=None, scidb_auth=None, http_auth=None, verify=None, admin=False, namespace=None, use_arrow=False, result_size_limit=256, no_ops=False): if scidb_url is None: scidb_url = os.getenv('SCIDB_URL', 'http://localhost:8080') self.scidb_url = scidb_url self.verify = verify self.admin = admin self.namespace = namespace self.use_arrow = use_arrow self.result_size_limit = result_size_limit self.no_ops = no_ops if http_auth: self._http_auth = requests.auth.HTTPDigestAuth(*http_auth) self.http_auth = (http_auth[0], Password_Placeholder()) else: self._http_auth = self.http_auth = None admin_shim = 1 if self.admin else 0 if scidb_auth: self._id = self._shim(Shim.new_session, user=scidb_auth[0], password=scidb_auth[1], admin=admin_shim).text self.scidb_auth = (scidb_auth[0], Password_Placeholder()) else: self._id = self._shim(Shim.new_session, admin=admin_shim).text self.scidb_auth = None finalize(self, _shim_release_session, self.scidb_url, self._http_auth, self.verify, self._id) self.arrays = Arrays(self) self._uid = uuid.uuid1().hex self._lock = threading.Lock() self._array_cnt = 0 self._formatter = 
string.Formatter() if self.no_ops: self.operators = None self._dir = None else: self.load_ops() def __iter__(self): return (i for i in ( self.scidb_url, self.scidb_auth, self.http_auth, self.verify, self.admin, self.namespace, self.use_arrow, self.result_size_limit, self.no_ops)) def __repr__(self): return ('{}({!r}, ' + '{!r}, ' + '{!r}, ' + '{!r}, ' + '{!r}, ' + '{!r}, ' + '{!r}, ' + '{!r}, ' + '{!r})').format( type(self).__name__, *self) def __str__(self): return '''\ scidb_url = {} scidb_auth = {} http_auth = {} verify = {} admin = {} namespace = {} use_arrow = {} result_size_limit = {} no_ops = {}'''.format(*self) def __getattr__(self, name): if self.operators and name in self.operators: return Operator(self, name) elif self.no_ops: raise AttributeError( ("Operators not loaded. Run 'load_ops()' or " + "use 'no_ops=False' (default) " + "at connection time (constructor)").format( type(self))) else: raise AttributeError( '{.__name__!r} object has no attribute {!r}'.format( type(self), name)) def __dir__(self): return self._dir
[docs] def iquery(self, query, fetch=False, use_arrow=None, atts_only=False, as_dataframe=True, dataframe_promo=True, schema=None, upload_data=None, upload_schema=None): """Execute query in SciDB :param string query: SciDB AFL query to execute :param bool fetch: If ``True``, download SciDB array (default ``False``) :param bool use_arrow: If ``True``, download SciDB array using Apache Arrow library. Requires ``accelerated_io_tools`` and ``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame is returned (``as_dataframe`` has no effect) and null-able types are promoted as per Pandas `promotion scheme <http://pandas.pydata.org/pandas-docs/stable/gotchas.html #na-type-promotions>`_ (``dataframe_promo`` has no effect). If ``None`` the ``use_arrow`` value set at connection time is used (default ``None``) :param bool atts_only: If ``True``, download only SciDB array attributes without dimensions (default ``False``) :param bool as_dataframe: If ``True``, return a Pandas DataFrame. If ``False``, return a NumPy array (default ``True``) :param bool dataframe_promo: If ``True``, null-able types are promoted as per Pandas `promotion scheme <http://pandas.pydata.org/pandas-docs/stable/gotchas.html #na-type-promotions>`_ If ``False``, object records are used for null-able types (default ``True``) :param schema: Schema of the SciDB array to use when downloading the array. Schema is not verified. If schema is a Schema instance, it is copied. Otherwise, a :py:class:``Schema`` object is built using :py:func:``Schema.fromstring`` (default ``None``) >>> DB().iquery('build(<x:int64>[i=0:1; j=0:1], i + j)', fetch=True) i j x 0 0 0 0.0 1 0 1 1.0 2 1 0 1.0 3 1 1 2.0 >>> DB().iquery("input({sch}, '{fn}', 0, '{fmt}')", ... fetch=True, ... 
upload_data=numpy.arange(3, 6)) i x 0 0 3 1 1 4 2 2 5 """ # Set use_arrow using local/global if use_arrow is None: use_arrow = self.use_arrow # Special case: -- - set_namespace - -- if query.startswith('set_namespace(') and query[-1] == ')': param = query[len('set_namespace('):-1] # Unquote if quoted. Will be quoted when set in prefix. if param[0] == "'" and param[-1] == "'": param = param[1:-1] self.namespace = param return if upload_data is not None: if isinstance(upload_data, numpy.ndarray): if upload_schema is None: try: upload_schema = Schema.fromdtype(upload_data.dtype) except Exception as e: warnings.warn( 'Mapping NumPy dtype to SciDB schema failed. ' + 'Try providing an explicit upload_schema') raise e # Convert upload data to bytes if upload_schema.is_fixsize(): upload_data = upload_data.tobytes() else: upload_data = upload_schema.tobytes(upload_data) # Check if placeholders are present place_holders = set( field_name for _1, field_name, _3, _4 in self._formatter.parse(query)) if 'fn' not in place_holders: warnings.warn( 'upload_data provided, but {fn} placeholder is missing', stacklevel=2) if 'fmt' in place_holders and upload_schema is None: warnings.warn( 'upload_data and {fmt} placeholder provided, ' + 'but upload_schema is None', stacklevel=2) # Check if upload data is bytes or file-like object if not (isinstance(upload_data, bytes) or isinstance(upload_data, bytearray) or hasattr(upload_data, 'read')): warnings.warn( 'upload_data is not bytes or file-like object', stacklevel=2) fn = self._shim(Shim.upload, data=upload_data).text query = query.format( sch=upload_schema, fn=fn, fmt=upload_schema.atts_fmt_scidb if upload_schema else None) if fetch: # Use provided schema or get schema from SciDB if schema: # Deep-copy schema since we might be mutating it if isinstance(schema, Schema): if not atts_only and not use_arrow: schema = copy.deepcopy(schema) else: schema = Schema.fromstring(schema) else: # Execute 'show(...)' and Download text self._shim( 
Shim.execute_query, query=DB._show_query.format(query.replace("'", "\\'")), save='tsv') schema = Schema.fromstring( self._shim(Shim.read_lines, n=0).text) # Attributes and dimensions can collide. Run make_unique to # remove any collisions. # # make_unique fixes any collision, but if we don't # download the dimensions, we don't need to fix collisions # between dimensions and attributes. So, we use # make_unique only if there are collisions within the # attribute names. if ((not atts_only or len(set((a.name for a in schema.atts))) < len(schema.atts)) and schema.make_unique()): # Dimensions or attributes were renamed due to # collisions. We need to cast. query = 'cast({}, {:h})'.format(query, schema) # Unpack if not atts_only and not use_arrow: # apply: add dimensions as attributes # project: place dimensions first query = 'project(apply({}, {}), {})'.format( query, ', '.join('{0}, {0}'.format(d.name) for d in schema.dims), ', '.join(i.name for i in itertools.chain( schema.dims, schema.atts))) # update schema after apply schema.make_dims_atts() # Execute Query and Download content self._shim(Shim.execute_query, query=query, save='arrow' if use_arrow else schema.atts_fmt_scidb, result_size_limit=self.result_size_limit, atts_only=1 if atts_only or not use_arrow else 0) buf = self._shim(Shim.read_bytes, n=0).content # Build result if use_arrow: data = pyarrow.RecordBatchStreamReader( pyarrow.BufferReader(buf)).read_pandas() # Place dimensions first if not atts_only: data = data[[i.name for i in itertools.chain( schema.dims, schema.atts)]] elif schema.is_fixsize(): data = numpy.frombuffer(buf, dtype=schema.atts_dtype) if as_dataframe: data = pandas.DataFrame.from_records(data) if dataframe_promo: schema.promote(data) else: # Parse binary buffer data = schema.frombytes(buf, as_dataframe, dataframe_promo) if as_dataframe: data = pandas.DataFrame.from_records(data) return data else: # fetch=False self._shim(Shim.execute_query, query=query) # Special case: -- - load_library - 
-- if query.startswith('load_library('): self.load_ops()
[docs] def iquery_readlines(self, query): """Execute query in SciDB >>> DB().iquery_readlines('build(<x:int64>[i=0:2], i * i)') ... # doctest: +ELLIPSIS [...'0', ...'1', ...'4'] >>> DB().iquery_readlines( ... 'apply(build(<x:int64>[i=0:2], i), y, i + 10)') ... # doctest: +ELLIPSIS [[...'0', ...'10'], [...'1', ...'11'], [...'2', ...'12']] """ self._shim(Shim.execute_query, query=query, save='tsv') ret = self._shim_readlines() return ret
[docs] def next_array_name(self): """Generate a uniqu array name. Keep track on these names using the _uid field and a counter """ # Thread-safe counter with self._lock: self._array_cnt += 1 return 'py_{}_{}'.format(self._uid, self._array_cnt)
[docs] def load_ops(self): """Get list of operators and macros. """ query_id = self._shim( Shim.execute_query, query="project(list('operators'), name)", save='tsv').text # set query ID as DB instance ID operators = self._shim_readlines() self._shim( Shim.execute_query, query="project(list('macros'), name)", save='tsv').content macros = self._shim_readlines() self.no_ops = False self.operators = operators + macros self._dir = (self.operators + ['arrays', 'gc', 'iquery', 'iquery_readlines', 'upload']) self._dir.sort()
def _cleanup(self, endpoint): """Attempt to cleanup in case of user interruption or error. Send a cancel request to the query associated with the current session """ if endpoint == Shim.execute_query: requests.get( requests.compat.urljoin(self.scidb_url, Shim.cancel.value), params={'id': self._id}, auth=self._http_auth, verify=self.verify) def _shim(self, endpoint, **kwargs): """Make request on Shim endpoint""" if endpoint != Shim.new_session: kwargs.update(id=self._id) if self.use_arrow and endpoint == Shim.execute_query: kwargs['result_size_limit'] = self.result_size_limit # Add prefix to request, if necessary if self.namespace and endpoint == Shim.execute_query: kwargs['prefix'] = "set_namespace('{}')".format(self.namespace) # Make request url = requests.compat.urljoin(self.scidb_url, endpoint.value) if endpoint == Shim.upload: # Post request req = requests.post( '{}?id={}'.format(url, kwargs['id']), data=kwargs['data'], auth=self._http_auth, verify=self.verify) else: # Get request try: req = requests.get( url, params=kwargs, auth=self._http_auth, verify=self.verify) except KeyboardInterrupt as e: self._cleanup(endpoint) raise e req.reason = req.content req.url = _sanitize_url(req.url) try: req.raise_for_status() except Exception as e: self._cleanup(endpoint) raise e return req def _shim_readlines(self): """Read data from Shim and parse as text lines""" return [line.split('\t') if '\t' in line else line for line in self._shim( Shim.read_lines, n=0).text.splitlines()]
class Arrays(object):
    """Access to arrays available in SciDB"""

    def __init__(self, db):
        self.db = db

    def __repr__(self):
        return '{}({!r})'.format(
            type(self).__name__, self.db)

    def __str__(self):
        # NOTE(review): the line break after "DB:" is reconstructed
        # from a whitespace-collapsed source (the literal is
        # triple-quoted there) -- confirm against the original file.
        return '''DB:
{}'''.format(self.db)

    def __getattr__(self, name):
        """db.arrays.foo"""
        # Special-method probes (copy, pickle, etc.) must fail like on
        # any other object instead of being handed an Array.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(
                '{.__name__!r} object has no attribute {!r}'.format(
                    type(self), name))
        return Array(self.db, name)

    def __getitem__(self, name):
        """db.arrays['foo']"""
        return Array(self.db, name)

    def __dir__(self):
        """Download the list of SciDB arrays. Use 'project(list(), name)' to
        download only the names
        """
        return self.db.iquery_readlines('project(list(), name)')
class Array(object):
    """Access to individual array

    :param db: ``DB`` connection used to run the queries

    :param string name: array name (or aliased expression, see ``%``)

    :param bool gc: If ``True``, a ``remove`` query is issued for the
      array when this object is garbage collected (default ``False``)
    """

    def __init__(self, db, name, gc=False):
        self.db = db
        self.name = name

        if gc:
            finalize(self,
                     self.db.iquery,
                     'remove({})'.format(self.name))

    def __repr__(self):
        return '{}({!r}, {!r})'.format(
            type(self).__name__, self.db, self.name)

    def __str__(self):
        return self.name

    def __getattr__(self, key):
        """array.foo: expression for an attribute or dimension"""
        # Special-method probes (copy, pickle, etc.) must fail like on
        # any other object instead of being handed an ArrayExp.
        if key.startswith('__') and key.endswith('__'):
            raise AttributeError(
                '{.__name__!r} object has no attribute {!r}'.format(
                    type(self), key))
        return ArrayExp('{}.{}'.format(self.name, key))

    def __getitem__(self, key):
        """Download the array and index the result with ``key``"""
        return self.fetch()[key]

    def __dir__(self):
        """Download the schema of the SciDB array, using ``show()``"""
        sh = self.schema()
        return sorted(i.name for i in itertools.chain(sh.atts, sh.dims))

    def __mod__(self, alias):
        """Overloads ``%`` operator to add support for aliasing"""
        return Array(self.db, '{} as {}'.format(self.name, alias))

    def fetch(self, **kwargs):
        """Download the array. Keyword arguments are passed to ``iquery``"""
        return self.db.iquery('scan({})'.format(self),
                              fetch=True,
                              **kwargs)

    def head(self, n=5, **kwargs):
        """Similar to ``pandas.DataFrame.head``. Makes use of the ``limit``
        operator, if available.

        If the operator is not available, or the list of operators
        was not loaded, the entire array is downloaded and truncated.
        """
        operators = self.db.operators
        if operators and 'limit' in operators:
            return self.db.iquery('limit({}, {})'.format(self, n),
                                  fetch=True,
                                  **kwargs)
        else:
            warnings.warn(
                '"limit" operator not available. ' +
                'Fetching the entire array. ' +
                'See https://github.com/Paradigm4/limit.')
            return self.fetch(**kwargs)[:n]

    def schema(self):
        """Download the schema of the SciDB array, using ``show()``"""
        first = self.db.iquery_readlines("show({})".format(self))[0]
        # A line is a list of fields only if it contains tabs;
        # otherwise it is the string itself and indexing it would
        # yield its first character.
        if not isinstance(first, list):
            return Schema.fromstring(first)
        return Schema.fromstring(first[0])
class ArrayExp(object):
    """Access to individual attribute or dimension

    Wraps the text of an expression. Adding a value to it builds a new
    expression with the two sides joined by ``+``.
    """

    def __init__(self, exp):
        self.exp = exp

    def __repr__(self):
        return '{}({!r})'.format(type(self).__name__, self.exp)

    def __str__(self):
        return '{}'.format(self.exp)

    def __add__(self, other):
        return ArrayExp('{} + {}'.format(self, other))
class Operator(object):
    """Store SciDB operator and arguments. Hungry operators (e.g., remove,
    store, etc.) evaluate immediately. Lazy operators evaluate on data
    fetch.

    :param db: ``DB`` connection used to run the queries

    :param string name: operator name (stored in lower case)

    :param upload_data: data to upload when the query is executed
      (default ``None``)

    :param upload_schema: schema of the uploaded data (default
      ``None``)

    :param args: initial operator arguments
    """

    def __init__(self, db, name, upload_data=None, upload_schema=None, *args):
        self.db = db
        self.name = name.lower()
        self.upload_data = upload_data
        self.upload_schema = upload_schema

        self.args = list(args)
        self.is_lazy = self.name not in ops_hungry

        self._dir = self.db.operators + ['fetch']
        self._dir.sort()

    def __repr__(self):
        return '{}(db={!r}, name={!r}, args=[{}])'.format(
            type(self).__name__,
            self.db,
            self.name,
            ', '.join('{!r}'.format(i) for i in self.args))

    def __str__(self):
        """Build the AFL text of the operator and its arguments"""
        args_fmt = []
        for (pos, arg) in enumerate(self.args):
            # Format argument to string (possibly recursive)
            arg_fmt = '{}'.format(arg)

            # Special case: quote string argument if not quoted
            # The emptiness check is done on the formatted text: the
            # argument itself may be a number or an Operator, which
            # have no length.
            if (pos < len(string_args) and
                    self.name in string_args[pos] and
                    arg_fmt and
                    arg_fmt[0] != "'" and
                    arg_fmt[-1] != "'"):
                arg_fmt = "'{}'".format(arg_fmt)

            # Special case: -- - show - --
            if (pos == 0 and
                    self.name == 'show' and
                    (isinstance(arg, Operator) or len(self.args) > 1)):
                arg_fmt = "'{}'".format(arg_fmt.replace("'", "\\'"))

            # Add to arguments list
            args_fmt.append(arg_fmt)

        return '{}({})'.format(self.name, ', '.join(args_fmt))

    def __call__(self, *args, **kwargs):
        """Returns self for lazy expressions. Executes immediate expressions.

        The arguments are appended to the ones already stored.
        Keywords used: ``upload_data`` and ``upload_schema`` (``input``
        and ``load``), ``gc`` and ``temp`` (``store``).
        """
        # Propagate "upload_data" and "upload_schema" from previous
        # operators
        for arg in args:
            if isinstance(arg, Operator) and (
                    arg.upload_data is not None or
                    arg.upload_schema is not None):
                if self.upload_data is not None or (
                        self.upload_schema is not None):
                    raise NotImplementedError(
                        'Queries with multiple "upload_data" or ' +
                        '"upload_schema" arguments are not supported')
                self.upload_data = arg.upload_data
                self.upload_schema = arg.upload_schema

        self.args.extend(args)

        # Special case: -- - create_array - --
        if self.name == 'create_array' and len(self.args) < 3:
            # Set "temporary"
            self.args.append(False)

        # Special case: -- - input & load - --
        elif self.name in ('input', 'load'):
            ln = len(self.args)

            # Set upload data
            if 'upload_data' in kwargs:
                self.upload_data = kwargs['upload_data']

            # Set upload schema
            if 'upload_schema' in kwargs:
                # Pass through if provided as argument
                self.upload_schema = kwargs['upload_schema']

            if self.upload_schema is None:
                # If the upload_data is a NumPy array try to map the
                # array dtype to upload schema
                if (self.upload_data is not None and
                        isinstance(self.upload_data, numpy.ndarray)):
                    try:
                        self.upload_schema = Schema.fromdtype(
                            self.upload_data.dtype)
                    except Exception:
                        # Might fail if the dtype contains
                        # objects. The same type mapping is attempted
                        # later in iquery, but there the exception is
                        # propagated
                        pass

                # If the operator is input, try to map first argument
                # to upload schema
                if (self.upload_schema is None and
                        self.name == 'input' and
                        ln >= 1):
                    try:
                        self.upload_schema = Schema.fromstring(args[0])
                    except Exception:
                        # Fails if the argument is an array name
                        pass

            # Set required arguments if missing
            # Check if "input_file" is present (2nd argument)
            if ln < 2:
                # Check if "existing_array|anonymous_schema"
                # is present (1st argument)
                if ln < 1:
                    self.args.append('{sch}')  # anonymous_schema
                self.args.append("'{fn}'")     # input_file

            # Set optional arguments if missing and necessary
            # Check if "format" is present (4th argument)
            if ln < 4 and self.upload_data is not None:
                # Check if "instance_id" is present (3rd argument)
                if ln < 3:
                    self.args.append(0)        # instance_id
                self.args.append("'{fmt}'")    # format

        # Special case: -- - store - --
        elif self.name == 'store':
            if len(self.args) < 2:
                # Set "named_array"
                self.args.append(self.db.next_array_name())
                # Garbage collect (if not specified)
                if 'gc' not in kwargs:
                    kwargs['gc'] = True

            # If temp=True in kwargs, create a temporary array first
            if kwargs.get('temp') is True:
                # Get the schema of the new array
                try:
                    new_schema = Schema.fromstring(
                        self.db.iquery_readlines(
                            "show('{}', 'afl')".format(
                                str(self.args[0]).replace("'", "\\'")))[0][0])
                except requests.HTTPError as e:
                    e.args = (
                        '"temp=True" not supported for complex queries\n' +
                        e.args[0],
                    )
                    raise

                # Set array name
                new_schema.name = self.args[1]
                # Create temporary array
                self.db.iquery('create temp array {}'.format(new_schema))

        # Lazy or hungry
        if self.is_lazy:        # Lazy
            return self
        else:                   # Hungry
            # Execute query
            self.db.iquery(str(self),
                           upload_data=self.upload_data,
                           upload_schema=self.upload_schema)

            # Handle output
            # Special case: -- - load - --
            if self.name == 'load':
                if isinstance(self.args[0], Array):
                    return self.args[0]
                else:
                    return Array(self.db, self.args[0])

            # Special case: -- - store - --
            elif self.name == 'store':
                if isinstance(self.args[1], Array):
                    return self.args[1]
                else:
                    return Array(self.db,
                                 self.args[1],
                                 kwargs.get('gc', False))

    def __getitem__(self, key):
        """Download the result and index it with ``key``"""
        return self.fetch()[key]

    def __getattr__(self, name):
        """Chain operators, e.g., ``db.scan(foo).filter``.

        The new operator gets this one as its first argument.
        """
        # Read "db" straight from the instance dictionary. Going
        # through "self.db" would re-enter __getattr__ and recurse
        # without end on an instance where it is not set yet.
        db = self.__dict__.get('db')
        if db is not None and db.operators and name in db.operators:
            return Operator(
                db, name, self.upload_data, self.upload_schema, self)
        else:
            raise AttributeError(
                '{.__name__!r} object has no attribute {!r}'.format(
                    type(self), name))

    def __dir__(self):
        return self._dir

    def __mod__(self, alias):
        """Overloads ``%`` operator to add support for aliasing"""
        return Array(self.db, '{} as {}'.format(self, alias))

    def fetch(self, **kwargs):
        """Execute the query and download the result.

        Keyword arguments are passed to ``iquery``. Nothing is done
        (``None`` is returned) for hungry operators.
        """
        if self.is_lazy:
            return self.db.iquery(str(self),
                                  fetch=True,
                                  upload_data=self.upload_data,
                                  upload_schema=self.upload_schema,
                                  **kwargs)

    def schema(self):
        """Download the schema of the query result, using ``show()``.

        Nothing is done (``None`` is returned) for hungry operators.
        """
        if self.is_lazy:
            # The query is passed to show as a quoted string, so quotes
            # inside it have to be escaped
            return Schema.fromstring(
                self.db.iquery_readlines(
                    "show('{}', 'afl')".format(
                        str(self).replace("'", "\\'")))[0][0])
# Matches "user=..." and "password=..." parameters up to the next "&"
# or the end of the string
_sanitize_url_re = re.compile(r'((user)|(password))=[^&]*(?=(&|$))')


def _sanitize_url(url):
    """Replace the values of the user and password parameters in ``url``
    with ``...``
    """
    return _sanitize_url_re.sub(r'\1=...', url)


connect = DB
iquery = DB.iquery

if __name__ == "__main__":
    # logging.basicConfig(level=logging.DEBUG)

    import doctest
    doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)