# Source code for scidbpy.db

"""DB, Array, and Operator
=======================

Classes for connecting to SciDB and executing queries.

"""

import configparser
import copy
import errno
import itertools
import json
import os
import stat
import string
import threading
import uuid
import warnings

import numpy
import pandas
import pyarrow
import requests

try:
    from weakref import finalize
except ImportError:
    from backports.weakref import finalize

from .backend import get_backend
from .be_shim import Shim
from .meta import ops_eager, string_args
from .schema import Schema

# Default maximum number of cells per page of paged query results.
# Used as the default ``page_size`` argument of ``DB()``, ``DB.iquery()``
# and ``DB.iquery_readlines()``.
_DEFAULT_PAGE_SIZE = 1_000_000


class Password_Placeholder(object):
    """Stand-in object displayed where a password would otherwise appear.

    ``DB`` stores an instance of this class next to the user name in its
    public ``scidb_auth`` / ``http_auth`` tuples, so printing a connection
    never reveals the real password.
    """

    _DISPLAY_TEXT = 'PASSWORD_PROVIDED'

    def __repr__(self):
        return self._DISPLAY_TEXT


class DB(object):
    """SciDB connection object.

    >>> DB()
    ... # doctest: +NORMALIZE_WHITESPACE
    DB('https://...',
       ...,
       None,
       ...,
       False,
       None,
       False,
       256,
       False)

    >>> print(DB())
    scidb_url = https://...
    scidb_auth = ...
    http_auth = None
    verify = ...
    admin = False
    namespace = None
    use_arrow = False
    result_size_limit = 256
    no_ops = False

    Constructor parameters:

    :param string scidb_url: SciDB connection URL. The URL for the
      Shim server or for the native client API. If ``None``, use the
      value of the ``SCIDB_URL`` environment variable, if present
      (default ``http://localhost:8080``)

    :param tuple scidb_auth: Credentials for connecting to scidb, if
      scidb is configured to use password authentication. Either a
      (username, password) tuple, or the path to an authentication
      file which can be in INI or JSON format with the structure:
      ``{"user-name": "name", "user-password": "password"}``. If not
      provided, credentials are read from a file in the first
      location that exists among:

      - ``$SCIDB_AUTH_FILE``
      - ``$XDG_CONFIG_DIR/scidb/iquery.auth``
      - ``~/.config/scidb/iquery.auth``

    :param tuple http_auth: Tuple with username and password for
      connecting to Shim, if Shim authentication is used (default
      ``None``)

    :param bool verify: Either a bool, or a path to a cert file.

      * If ``True``, the HTTPS certificate is verified against the
        system's trusted CA store.
      * If ``False``, the HTTPS certificate is not verified. This
        will generate a warning.
      * If a string, the string must be a path to a cert or ca-cert
        file. The connection's HTTPS certificate is verified against
        that file.
      * If omitted or ``None``, defaults to the setting in the
        ``SCIDBPY_VERIFY_HTTPS`` environment variable if present,
        otherwise defaults to ``True``.

      See Python `requests
      <http://docs.python-requests.org/en/master/>`_ library `SSL
      Cert Verification
      <http://docs.python-requests.org/en/master/user/advanced/
      #ssl-cert-verification>`_ section for details on the
      ``verify`` argument (default ``None``)

    :param bool admin: Set to ``True`` to open a higher-priority
      session. This is identical with the ``--admin`` flag for the
      ``iquery`` SciDB client, see `SciDB Documentation
      <https://paradigm4.atlassian.net/wiki/spaces/scidb>`_ for
      details (default ``False``)

    :param string namespace: Initial namespace for the connection.
      Only applicable for SciDB Enterprise Edition. The namespace can
      be changed at any time using the ``set_namespace`` SciDB
      operator (default ``None``)

    :param bool use_arrow: If ``True``, download SciDB array using
      Apache Arrow library. Requires ``accelerated_io_tools`` and
      ``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame is
      returned (``as_dataframe`` has no effect) and null-able types
      are promoted as per Pandas `promotion scheme
      <http://pandas.pydata.org/pandas-docs/stable/gotchas.html
      #na-type-promotions>`_ (``dataframe_promo`` has no effect). It
      can be overridden for each ``iquery`` call (default ``False``)

    :param int page_size: Maximum number of cells per page of output
      when executing paged queries, that is, non-upload queries that
      save their output. Client API only; ignored for Shim.
      (default 1,000,000)

    :param int result_size_limit: absolute limit of the output file
      in Megabytes. Effective only when the ``accelerated_io_tools``
      plug-in is installed in SciDB and ``aio`` is enabled in Shim
      (default ``256`` MB)

    :param int inactivity_timeout: Seconds until SciDB server will
      cancel a paged query, unless the client requests another page.
      Should only need to be increased for multithreaded apps where a
      thread holding the GIL may interfere with paged response
      processing. Client API only; ignored for Shim. (default 60s)

    :param bool no_ops: If ``True``, the list of operators is not
      fetched at this time and the connection is not implicitly
      verified. This expedites the execution of the function but
      disallows for calling the SciDB operators directly from the
      ``DB`` instance e.g., ``db.scan`` (default ``False``)

    :param int progress_check_secs: Client API only. After every
      interval of this many seconds, interrupt and resume the HTTP
      request so the client can check on the query's progress, and so
      the server can confirm that this client is still active. The
      ``progress_check_callback`` is called, if provided. Setting
      this to a lower value provides more frequent progress updates,
      but might have the effect of slowing the query down because it
      gets interrupted and resumed more often. (This "progress check"
      is only enabled if the query is resumable.)
      (Default: 60 seconds)

    :param func progress_check_callback: Client API only. A callback
      function of the form:
      ``func(query_info, page_number, response_number,
      cumulative_nbytes, ...)`` which returns None. This is called
      once when the page is first requested, and again after every
      ``progress_check_secs`` seconds while the query is executing.
      If the callback raises an exception, the query gets canceled.
      The callback arguments include:

      * query_info: an object with attributes that include ``id``
        (the ID of the query) and ``schema`` (the SciDB schema
        returned by the query)
      * page_number: the page number being fetched (1 for the first
        page)
      * response_number: increments by 1 each time through the
        callback.
      * cumulative_nbytes: the cumulative total number of content
        bytes received for this page, including the current response
        and all previous responses for the page.

      The callback MUST have ``*args`` and ``**kwargs`` arguments to
      allow for future changes to the callback interface. It must
      ignore any arguments it doesn't understand. (This callback is
      only enabled if the query is resumable.)

    :param reauth_callback: An optional function matching the
      signature::

        Callable[[requests.HTTPError], Optional[Tuple[str, str]]]

      or::

        Callable[[Exception], Optional[Tuple[str, str]]]

      The return value should be a (username, password) tuple or
      None. This function is called whenever a request fails with a
      401 "Unauthorized" or 403 "Forbidden" response. The function
      can provide stored credentials, display a login prompt, trigger
      2FA, etc. If the function returns None, the reauthentication is
      canceled and the error response is returned as usual. The
      function can inspect the HTTPError argument for details in
      order to generate a message to show the user (e.g. "session on
      <host> expired", "second factor required to log in to
      <domain>", "incorrect password", etc.).

    :param reauth_tries: The maximum number of times to call
      ``reauth_callback()`` if a request returns a 401/403 response
      and subsequent reauth attempts also return 401/403.

      - Set this to 0 to disable reauthentication.
      - Set this to 1 (default) if the callback returns stored
        credentials (i.e. if repeating the callback won't change
        anything).
      - Set this to >1 if the callback prompts the user for
        credentials and you want to let the user try again after
        making a mistake.

    :param backoff_fn: A callback function that waits some number of
      seconds. It should have signature like::

        def backoff_fn(err: requests.HTTPError, delay: int)

      If we receive a 429 "Too Many Requests" response from the
      server, this lets the application perform other tasks while
      waiting for the server to become available. The HTTPError
      argument gives the app information it can use to display a
      message to the user, e.g. "the server is busy processing other
      queries". Note that backoff_fn may choose to return earlier, or
      later, than ``delay``. To stop waiting, it can raise the
      HTTPError it received as its first argument.
    """

    _show_query = "show('{}', 'afl')"

    def __init__(
            self,
            scidb_url=None,
            scidb_auth=None,
            http_auth=None,
            verify=None,
            admin=False,
            namespace=None,
            use_arrow=False,
            page_size=_DEFAULT_PAGE_SIZE,
            result_size_limit=256,
            no_ops=False,
            inactivity_timeout=None,
            progress_check_secs=None,
            progress_check_callback=None,
            reauth_callback=None,
            reauth_tries=None,
            backoff_fn=None):
        if scidb_url is None:
            scidb_url = os.environ.get("SCIDB_URL", "http://localhost:8080")

        if verify is None:
            envvar = os.environ.get("SCIDBPY_VERIFY_HTTPS")
            if envvar:
                if envvar.lower() in ("false", "f"):
                    verify = False
                elif envvar.lower() in ("true", "t"):
                    verify = True
                elif os.path.exists(envvar):
                    verify = envvar
                else:
                    warnings.warn(
                        f'Unrecognized value "{envvar}" for env variable'
                        ' "SCIDBPY_VERIFY_HTTPS" (ignoring). It must be'
                        ' "true", "false", or a path to an existing file.')

        self.scidb_url = scidb_url
        self.verify = verify
        self.admin = admin
        self.namespace = namespace
        self.use_arrow = use_arrow
        self.page_size = page_size
        self.result_size_limit = result_size_limit
        self.no_ops = no_ops
        self.inactivity_timeout = inactivity_timeout
        self.progress_check_secs = progress_check_secs
        self.progress_check_callback = progress_check_callback
        self.reauth_callback = reauth_callback
        self.reauth_tries = reauth_tries
        self.backoff_fn = backoff_fn

        # The public ``*_auth`` attributes hold a placeholder instead of
        # the password; the real credentials live in the private ones.
        if http_auth:
            self._http_auth = requests.auth.HTTPDigestAuth(*http_auth)
            self.http_auth = (http_auth[0], Password_Placeholder())
        else:
            self._http_auth = self.http_auth = None

        if scidb_auth is None:
            scidb_auth = self._default_auth_file()

        if isinstance(scidb_auth, str):
            self.authfile = os.path.realpath(scidb_auth)
            scidb_auth = self._auth_file_credentials(self.authfile)

            # Create a reauth callback to automatically re-read the file
            # and reauthenticate if the session's cookie expires.
            # This must happen before get_backend(self).
            if not self.reauth_callback:
                def cb(httperror):
                    _ = httperror
                    return self._auth_file_credentials(self.authfile)
                self.reauth_callback = cb
                self.reauth_tries = 1

        if scidb_auth:
            self._scidb_auth = scidb_auth
            self.scidb_auth = (scidb_auth[0], Password_Placeholder())
        else:
            self._scidb_auth = self.scidb_auth = None

        self._backend = get_backend(self)  # Copies attributes set so far.

        self.arrays = Arrays(self)

        self._uid = uuid.uuid1().hex
        self._lock = threading.Lock()
        self._array_cnt = 0
        self._formatter = string.Formatter()

        if self.no_ops:
            self.operators = None
            self._dir = None
        else:
            self.load_ops()

    @staticmethod
    def _default_auth_file():
        """Conjure up the default auth file name.

        ``$SCIDB_AUTH_FILE`` wins if set; otherwise the result is
        ``scidb/iquery.auth`` under ``$XDG_CONFIG_DIR`` (or under
        ``~/.config`` when that variable is not set).
        """
        authfile = os.environ.get(
            "SCIDB_AUTH_FILE",
            os.path.join(
                os.environ.get("XDG_CONFIG_DIR",
                               os.path.expanduser("~/.config")),
                "scidb/iquery.auth"))
        return authfile

    @staticmethod
    def _auth_file_credentials(authfile):
        """Get SciDB credentials from an authentication file if possible.

        :param string authfile: Path of the authentication file.

        :return: A ``(user, password)`` tuple, or ``None`` if the file
          does not exist, cannot be stat-ed, is not a regular file, or
          is accessible by group/others (a warning is issued in every
          case except "does not exist").
        """
        # Check file type and permissions.
        try:
            st = os.stat(authfile)
        except OSError as e:
            # A missing file is the normal "no credentials" case: stay
            # quiet. Any other stat failure is worth a warning.
            if e.errno != errno.ENOENT:
                warnings.warn(f"{authfile}: stat: {e.strerror}")
            return None
        if not stat.S_ISREG(st.st_mode):
            warnings.warn(f"{authfile}: Not a regular file")
            return None
        if (st.st_mode & (stat.S_IRWXG | stat.S_IRWXO)) != 0:
            warnings.warn(f"{authfile}: Bad file permissions")
            return None

        # Open it and extract credentials.  It can be either JSON or
        # INI format.
        with open(authfile) as F:
            contents = F.read()

        try:
            doc = json.loads(contents)
            user = doc['user-name']
            passwd = doc['user-password']
            return user, passwd
        except Exception:
            # Deliberately best-effort: anything that is not a JSON
            # object with both keys falls through to the INI parser.
            pass

        # Parse the text that was already read (and permission-checked)
        # instead of opening the file a second time.
        config = configparser.ConfigParser()
        config.read_string(contents, source=authfile)
        # NOTE(review): ``assert`` is stripped under ``python -O``; in
        # that mode a missing section surfaces as KeyError on the next
        # line instead of AssertionError.
        assert 'security_password' in config
        section = config['security_password']
        return section['user-name'], section['user-password']

    def __iter__(self):
        """Iterate over the connection settings shown by repr/str."""
        return (i for i in (
            self.scidb_url,
            self.scidb_auth,
            self.http_auth,
            self.verify,
            self.admin,
            self.namespace,
            self.use_arrow,
            self.result_size_limit,
            self.no_ops))

    def __repr__(self):
        return ('{}({!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r}, ' +
                '{!r})').format(
                    type(self).__name__, *self)

    def __str__(self):
        return '''\
scidb_url = {}
scidb_auth = {}
http_auth = {}
verify = {}
admin = {}
namespace = {}
use_arrow = {}
result_size_limit = {}
no_ops = {}'''.format(*self)

    def __getattr__(self, name):
        """Resolve ``db.<operator>`` to an :class:`Operator`.

        Only called when normal attribute lookup fails.

        :raises AttributeError: if ``name`` is not a loaded operator.
        """
        # Read our own state straight from the instance dict. Going
        # through ``self.operators`` would re-enter __getattr__ whenever
        # that attribute is not set yet (e.g. a lookup made while
        # __init__ is still running, or on an instance created without
        # __init__ by copy/pickle) and recurse until RecursionError.
        state = self.__dict__
        operators = state.get('operators')
        if operators and name in operators:
            return Operator(self, name)
        elif state.get('no_ops'):
            raise AttributeError(
                "Operators not loaded. Run 'load_ops()' or " +
                "use 'no_ops=False' (default) " +
                "at connection time (constructor)")
        else:
            raise AttributeError(
                '{.__name__!r} object has no attribute {!r}'.format(
                    type(self), name))

    def __dir__(self):
        """List operator names plus a few methods (see ``load_ops``)."""
        names = self.__dict__.get('_dir')
        if names is None:
            # Operators not loaded (``no_ops=True``): returning None
            # would make ``dir(db)`` raise TypeError, so fall back to
            # the default listing.
            return super().__dir__()
        return names

    def uses_shim(self):
        """Return True if this connection goes through Shim.

        False means the connection uses the SciDB Client HTTP API."""
        return isinstance(self._backend, Shim)

    def iquery(self,
               query,
               fetch=False,
               use_arrow=None,
               use_arrow_stream=False,
               atts_only=False,
               as_dataframe=True,
               dataframe_promo=True,
               schema=None,
               page_size=_DEFAULT_PAGE_SIZE,
               upload_data=None,
               upload_schema=None,
               **kwargs):
        """Execute query in SciDB

        :param string query: SciDB AFL query to execute

        :param bool fetch: If ``True``, download SciDB array (default
          ``False``)

        :param bool use_arrow: If ``True``, download SciDB array using
          Apache Arrow library. Requires ``accelerated_io_tools`` and
          ``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame
          is returned (``as_dataframe`` has no effect) and null-able
          types are promoted as per Pandas `promotion scheme
          <http://pandas.pydata.org/pandas-docs/stable/gotchas.html
          #na-type-promotions>`_ (``dataframe_promo`` has no effect).
          If ``None`` the ``use_arrow`` value set at connection time
          is used (default ``None``)

        :param bool use_arrow_stream: If ``True``, return a
          ``RecordBatchStreamReader`` object to the user. The user
          will extract the records from the stream reader. This
          parameter only has effect if ``use_arrow`` is set to
          ``True`` (default ``False``)

        :param bool atts_only: If ``True``, download only SciDB array
          attributes without dimensions (default ``False``)

        :param bool as_dataframe: If ``True``, return a Pandas
          DataFrame. If ``False``, return a NumPy array (default
          ``True``)

        :param bool dataframe_promo: If ``True``, null-able types are
          promoted as per Pandas `promotion scheme
          <http://pandas.pydata.org/pandas-docs/stable/gotchas.html
          #na-type-promotions>`_ If ``False``, object records are used
          for null-able types (default ``True``)

        :param schema: Schema of the SciDB array to use when
          downloading the array. Schema is not verified. If schema is
          a Schema instance, it is copied. Otherwise, a
          :py:class:``Schema`` object is built using
          :py:func:``Schema.fromstring`` (default ``None``)

        :param int page_size: Maximum number of cells per page of
          output when executing paged queries, that is, non-upload
          queries that save their output. Client API only; ignored
          for Shim. (default 1,000,000)

        :param upload_data: Data to upload before running the query:
          a NumPy array, ``bytes``/``bytearray``, or a file-like
          object. The query should contain a ``{fn}`` placeholder
          (and optionally ``{sch}`` and ``{fmt}``) (default ``None``)

        :param upload_schema: Schema describing ``upload_data``. If
          ``None`` and ``upload_data`` is a NumPy array, it is derived
          from the array dtype (default ``None``)

        :return: The downloaded data if ``fetch`` is ``True``,
          otherwise ``None``.

        >>> DB().iquery('build(<x:int64>[i=0:1; j=0:1], i + j)', fetch=True)
           i  j    x
        0  0  0  0.0
        1  0  1  1.0
        2  1  0  1.0
        3  1  1  2.0

        >>> DB().iquery("input({sch}, '{fn}', 0, '{fmt}')",
        ...             fetch=True,
        ...             upload_data=numpy.arange(3, 6))
           i  x
        0  0  3
        1  1  4
        2  2  5
        """
        # Set use_arrow using local/global
        if use_arrow is None:
            use_arrow = self.use_arrow

        # Special case: -- - set_namespace - --
        # Handled client-side: only the namespace attribute is updated.
        if query.startswith('set_namespace(') and query.endswith(')'):
            param = query[len('set_namespace('):-1]
            # Unquote if quoted. Will be quoted when set in prefix.
            # startswith/endswith (rather than indexing) so that an
            # empty parameter does not raise IndexError.
            if param.startswith("'") and param.endswith("'"):
                param = param[1:-1]
            self.namespace = param
            return

        if upload_data is not None:
            if isinstance(upload_data, numpy.ndarray):
                if upload_schema is None:
                    try:
                        upload_schema = Schema.fromdtype(upload_data.dtype)
                    except Exception:
                        warnings.warn(
                            'Mapping NumPy dtype to SciDB schema failed. ' +
                            'Try providing an explicit upload_schema')
                        raise

                # Convert upload data to bytes
                if upload_schema.is_fixsize():
                    upload_data = upload_data.tobytes()
                else:
                    upload_data = upload_schema.tobytes(upload_data)

            # Check if placeholders are present
            place_holders = set(
                field_name
                for _1, field_name, _3, _4 in self._formatter.parse(query))
            if 'fn' not in place_holders:
                warnings.warn(
                    'upload_data provided, but {fn} placeholder is missing',
                    stacklevel=2)
            if 'fmt' in place_holders and upload_schema is None:
                warnings.warn(
                    'upload_data and {fmt} placeholder provided, ' +
                    'but upload_schema is None',
                    stacklevel=2)

            # Check if upload data is bytes or file-like object
            if not (isinstance(upload_data, bytes) or
                    isinstance(upload_data, bytearray) or
                    hasattr(upload_data, 'read')):
                warnings.warn(
                    'upload_data is not bytes or file-like object',
                    stacklevel=2)

            fn = self._backend.prepare_upload(upload_data)
            query = query.format(
                sch=upload_schema,
                fn=fn,
                fmt=upload_schema.atts_fmt_scidb if upload_schema else None)

        if fetch:
            # Use provided schema or get schema from SciDB
            if schema:
                if isinstance(schema, Schema):
                    # Always work on a private copy: make_unique() and
                    # make_dims_atts() below may modify the schema, and
                    # the caller's object must stay untouched whatever
                    # the atts_only / use_arrow combination is.
                    schema = copy.deepcopy(schema)
                else:
                    schema = Schema.fromstring(schema)
            else:
                # Execute 'show(...)' and Download text
                self._backend.runquery(
                    query=DB._show_query.format(query.replace("'", "\\'")),
                    page_size=page_size,
                    save='tsv')
                schema = Schema.fromstring(self._backend.read())

            # Attributes and dimensions can collide. Run make_unique to
            # remove any collisions.
            #
            # make_unique fixes any collision, but if we don't
            # download the dimensions, we don't need to fix collisions
            # between dimensions and attributes. So, we use
            # make_unique only if there are collisions within the
            # attribute names.
            if ((not atts_only or
                 len(set((a.name for a in schema.atts))) <
                 len(schema.atts)) and
                    schema.make_unique()):
                # Dimensions or attributes were renamed due to
                # collisions. We need to cast. ({:h} is our
                # custom format for Schema objects, see schema.py)
                query = 'cast({}, {:h})'.format(query, schema)

            # Unpack
            if not atts_only:
                # TODO If SDB-7905 is fixed use the following code
                # query = f'flatten({query})'
                query = f'unpack({query}, i)'

                # update schema after unpacking/flattening
                schema.make_dims_atts()

            # Execute Query and Download content
            self._backend.runquery(
                query=query,
                save='arrow' if use_arrow else schema.atts_fmt_scidb,
                page_size=page_size,
                result_size_limit=self.result_size_limit,
                atts_only=1,  # atts_only handled by unpack/flatten above
                              # backend always uses atts_only=1
                **kwargs)
            buf = self._backend.readbytes()

            # Build result
            if use_arrow:
                # This is what gets returned if use_arrow_stream = True
                data = pyarrow.RecordBatchStreamReader(
                    pyarrow.BufferReader(buf))
                if not use_arrow_stream:
                    data = data.read_pandas()
            elif schema.is_fixsize():
                data = numpy.frombuffer(buf, dtype=schema.atts_dtype)
                if as_dataframe:
                    data = pandas.DataFrame.from_records(data)
                    if dataframe_promo:
                        schema.promote(data)
            else:
                # Parse binary buffer
                data = schema.frombytes(buf, as_dataframe, dataframe_promo)
                if as_dataframe:
                    data = pandas.DataFrame.from_records(data)

            return data

        else:  # fetch=False
            self._backend.runquery(query=query, **kwargs)

            # Special case: -- - load_library - --
            # A newly loaded library may bring new operators.
            if query.startswith('load_library('):
                self.load_ops()

    def iquery_readlines(self, query, page_size=_DEFAULT_PAGE_SIZE, **kwargs):
        """Execute query in SciDB and return the TSV output lines

        >>> DB().iquery_readlines('build(<x:int64>[i=0:2], i * i)')
        ... # doctest: +ELLIPSIS
        [...'0', ...'1', ...'4']

        >>> DB().iquery_readlines(
        ...   'apply(build(<x:int64>[i=0:2], i), y, i + 10)')
        ... # doctest: +ELLIPSIS
        [[...'0', ...'10'], [...'1', ...'11'], [...'2', ...'12']]
        """
        self._backend.runquery(query=query,
                               save='tsv',
                               page_size=page_size,
                               **kwargs)
        return self._backend.readlines(split='\t')

    def next_array_name(self):
        """Generate a unique array name. Keep track on these names using the
        _uid field and a counter
        """
        # Thread-safe counter
        with self._lock:
            self._array_cnt += 1
            return 'py_{}_{}'.format(self._uid, self._array_cnt)

    def load_ops(self):
        """Get list of operators and macros.

        Sets ``operators`` and the name list used by ``dir()``, and
        resets ``no_ops`` to ``False``.
        """
        self._backend.runquery(
            query="project(list('operators'), name)", save='tsv')
        operators = self._backend.readlines()

        self._backend.runquery(
            query="project(list('macros'), name)", save='tsv')
        macros = self._backend.readlines()

        self.no_ops = False
        self.operators = operators + macros
        self._dir = (self.operators +
                     ['arrays',
                      'gc',
                      'iquery',
                      'iquery_readlines',
                      'upload'])
        self._dir.sort()
class Arrays(object):
    """Entry point for reaching SciDB arrays by name (``db.arrays``)."""

    def __init__(self, db):
        self.db = db

    def __repr__(self):
        return f'{type(self).__name__}({self.db!r})'

    def __str__(self):
        return f'DB: {self.db}'

    def __getattr__(self, name):
        """db.arrays.foo"""
        return Array(self.db, name)

    def __getitem__(self, name):
        """db.arrays['foo']"""
        return Array(self.db, name)

    def __dir__(self):
        """Ask SciDB for the array names.

        Runs ``project(list(), name)`` and returns the resulting lines.
        """
        list_query = 'project(list(), name)'
        return self.db.iquery_readlines(list_query)
class Array(object):
    """Handle on a single SciDB array, identified by its name."""

    def __init__(self, db, name, gc=False):
        self.db = db
        self.name = name

        if not gc:
            return
        # Remove the array in SciDB once this Python object goes away.
        finalize(self, self.db.iquery, f'remove({self.name})')

    def __repr__(self):
        cls_name = type(self).__name__
        return f'{cls_name}({self.db!r}, {self.name!r})'

    def __str__(self):
        return self.name

    def __getattr__(self, key):
        # ``array.foo`` becomes the expression "<array name>.foo"
        return ArrayExp(f'{self.name}.{key}')

    def __getitem__(self, key):
        downloaded = self.fetch()
        return downloaded[key]

    def __dir__(self):
        """Download the schema of the SciDB array, using ``show()``"""
        first_line = self.db.iquery_readlines(f'show({self})')[0]
        array_schema = Schema.fromstring(first_line[0])
        members = itertools.chain(array_schema.atts, array_schema.dims)
        return sorted(member.name for member in members)

    def __mod__(self, alias):
        """Overloads ``%`` operator to add support for aliasing"""
        return Array(self.db, f'{self.name} as {alias}')

    def fetch(self, **kwargs):
        """Download the array with ``scan()``; kwargs go to ``iquery``."""
        return self.db.iquery(f'scan({self})', fetch=True, **kwargs)

    def head(self, n=5, **kwargs):
        """Similar to ``pandas.DataFrame.head``. Makes use of the
        ``limit`` operator, if available.
        """
        if 'limit' not in self.db.operators:
            warnings.warn(
                '"limit" operator not available. '
                'Fetching the entire array. '
                'See https://github.com/Paradigm4/limit.')
            return self.fetch(**kwargs)[:n]

        return self.db.iquery(f'limit({self}, {n})', fetch=True, **kwargs)

    def schema(self):
        """Return the array's schema, obtained with ``show()``."""
        first_line = self.db.iquery_readlines(f"show({self})")[0]
        return Schema.fromstring(first_line[0])
class ArrayExp(object):
    """Expression referring to one attribute or dimension of an array."""

    def __init__(self, exp):
        self.exp = exp

    def __repr__(self):
        return f'{type(self).__name__}({self.exp!r})'

    def __str__(self):
        return f'{self.exp}'

    def __add__(self, other):
        # Build the textual expression "<self> + <other>"
        return ArrayExp(f'{self} + {other}')
class Operator(object):
    """Store SciDB operator and arguments. Hungry operators (e.g., remove,
    store, etc.) evaluate immediately. Lazy operators evaluate on data
    fetch.

    :param db: ``DB`` connection used to run the query.

    :param string name: Operator name (stored lower-cased).

    :param upload_data: Upload data carried over from an earlier
      operator in the chain (default ``None``)

    :param upload_schema: Upload schema carried over from an earlier
      operator in the chain (default ``None``)

    :param args: Initial operator arguments.
    """

    def __init__(self, db, name, upload_data=None, upload_schema=None, *args):
        self.db = db
        self.name = name.lower()
        self.upload_data = upload_data
        self.upload_schema = upload_schema

        self.args = list(args)
        self.is_lazy = self.name not in ops_eager

        self._dir = self.db.operators + ['fetch']
        self._dir.sort()

    def __repr__(self):
        return '{}(db={!r}, name={!r}, args=[{}])'.format(
            type(self).__name__,
            self.db,
            self.name,
            ', '.join('{!r}'.format(i) for i in self.args))

    def __str__(self):
        """Render the operator and its arguments as an AFL expression."""
        args_fmt = []
        for (pos, arg) in enumerate(self.args):
            # Format argument to string (possibly recursive)
            arg_fmt = '{}'.format(arg)

            # Special case: quote string argument if not quoted.
            # The emptiness check is done on the formatted text, not on
            # the raw argument: ``len(arg)`` raises TypeError for
            # arguments without a length (numbers, Operator, Array).
            if (pos < len(string_args) and
                    self.name in string_args[pos] and
                    len(arg_fmt) and
                    arg_fmt[0] != "'" and
                    arg_fmt[-1] != "'"):
                arg_fmt = "'{}'".format(arg_fmt)

            # Special case: -- - show - --
            if (pos == 0 and
                    self.name == 'show' and
                    (isinstance(arg, Operator) or len(self.args) > 1)):
                arg_fmt = "'{}'".format(arg_fmt.replace("'", "\\'"))

            # Add to arguments list
            args_fmt.append(arg_fmt)

        return '{}({})'.format(self.name, ', '.join(args_fmt))

    def __call__(self, *args, **kwargs):
        """Returns self for lazy expressions. Executes immediate expressions.

        Recognized keyword arguments: ``upload_data`` and
        ``upload_schema`` (for ``input`` and ``load``), ``gc`` and
        ``temp`` (for ``store``).

        :return: ``self`` for lazy operators. For the eager ``load``
          and ``store`` operators, the target ``Array``. ``None`` for
          other eager operators.
        """
        # Propagate "upload_data" and "upload_schema" from previous
        # operators
        for arg in args:
            if isinstance(arg, Operator) and (
                    arg.upload_data is not None or
                    arg.upload_schema is not None):
                if self.upload_data is not None or (
                        self.upload_schema is not None):
                    raise NotImplementedError(
                        'Queries with multiple "upload_data" or ' +
                        '"upload_schema" arguments are not supported')
                self.upload_data = arg.upload_data
                self.upload_schema = arg.upload_schema

        self.args.extend(args)

        # Special case: -- - create_array - --
        if self.name == 'create_array' and len(self.args) < 3:
            # Set "temporary"
            self.args.append(False)

        # Special case: -- - input & load - --
        elif self.name in ('input', 'load'):
            ln = len(self.args)

            # Set upload data
            if 'upload_data' in kwargs:
                self.upload_data = kwargs['upload_data']

            # Set upload schema
            if 'upload_schema' in kwargs:
                # Pass through if provided as argument
                self.upload_schema = kwargs['upload_schema']

            if self.upload_schema is None:
                # If the upload_data is a NumPy array try to map the
                # array dtype to upload schema
                if (self.upload_data is not None and
                        isinstance(self.upload_data, numpy.ndarray)):
                    try:
                        self.upload_schema = Schema.fromdtype(
                            self.upload_data.dtype)
                    except Exception:
                        # Might fail if the dtype contains
                        # objects. The same type mapping is attempted
                        # later in iquery, but there the exception is
                        # propagated
                        pass

                # If the operator is input, try to map first argument
                # to upload schema
                if (self.upload_schema is None and
                        self.name == 'input' and
                        ln >= 1):
                    try:
                        self.upload_schema = Schema.fromstring(args[0])
                    except Exception:
                        # Fails if the argument is an array name
                        pass

            # Set required arguments if missing
            # Check if "input_file" is present (2nd argument)
            if ln < 2:
                # Check if "existing_array|anonymous_schema"
                # is present (1st argument)
                if ln < 1:
                    self.args.append('{sch}')  # anonymous_schema
                self.args.append("'{fn}'")     # input_file

            # Set optional arguments if missing and necessary
            # Check if "format" is present (4th argument)
            if ln < 4 and self.upload_data is not None:
                # Check if "instance_id" is present (3rd argument)
                if ln < 3:
                    self.args.append(0)        # instance_id
                self.args.append("'{fmt}'")    # format

        # Special case: -- - store - --
        elif self.name == 'store':
            if len(self.args) < 2:
                # Set "named_array"
                self.args.append(self.db.next_array_name())
                # Garbage collect (if not specified)
                if 'gc' not in kwargs:
                    kwargs['gc'] = True

            # If temp=True in kwargs, create a temporary array first
            if 'temp' in kwargs and kwargs['temp'] is True:
                # Get the schema of the new array
                try:
                    new_schema = Schema.fromstring(
                        self.db.iquery_readlines(
                            "show('{}', 'afl')".format(
                                str(self.args[0]).replace("'", "\\'")))[0][0])
                except requests.HTTPError as e:
                    e.args = (
                        '"temp=True" not supported for complex queries\n' +
                        e.args[0],
                    )
                    raise
                # Set array name
                new_schema.name = self.args[1]
                # Create temporary array
                self.db.iquery('create temp array {}'.format(new_schema))

        # Lazy or eager
        if self.is_lazy:  # Lazy
            return self
        else:  # Hungry
            # Execute query
            self.db.iquery(str(self),
                           upload_data=self.upload_data,
                           upload_schema=self.upload_schema)

            # Handle output
            # Special case: -- - load - --
            if self.name == 'load':
                if isinstance(self.args[0], Array):
                    return self.args[0]
                else:
                    return Array(self.db, self.args[0])

            # Special case: -- - store - --
            elif self.name == 'store':
                if isinstance(self.args[1], Array):
                    return self.args[1]
                else:
                    return Array(self.db,
                                 self.args[1],
                                 kwargs.get('gc', False))

    def __getitem__(self, key):
        return self.fetch()[key]

    def __getattr__(self, name):
        """Chain operators: ``op.<name>`` wraps ``op`` as first argument.

        Only called when normal attribute lookup fails.

        :raises AttributeError: if ``name`` is not a known operator.
        """
        # Read ``db`` from the instance dict: on an instance that has
        # no ``db`` yet (created without __init__ by copy/pickle),
        # ``self.db`` would re-enter __getattr__ and recurse until
        # RecursionError instead of raising AttributeError.
        db = self.__dict__.get('db')
        operators = getattr(db, 'operators', None)
        if operators and name in operators:
            return Operator(
                db, name, self.upload_data, self.upload_schema, self)
        else:
            raise AttributeError(
                '{.__name__!r} object has no attribute {!r}'.format(
                    type(self), name))

    def __dir__(self):
        return self._dir

    def __mod__(self, alias):
        """Overloads ``%`` operator to add support for aliasing"""
        return Array(self.db, '{} as {}'.format(self, alias))

    def fetch(self, **kwargs):
        """Run a lazy operator and download its result.

        Extra keyword arguments are passed to ``DB.iquery``. Returns
        ``None`` for eager operators.
        """
        if self.is_lazy:
            return self.db.iquery(str(self),
                                  fetch=True,
                                  upload_data=self.upload_data,
                                  upload_schema=self.upload_schema,
                                  **kwargs)

    def schema(self):
        """Return the result schema of a lazy operator, using ``show()``.

        Returns ``None`` for eager operators.
        """
        if self.is_lazy:
            # The query text is embedded in a single-quoted string, so
            # its own single quotes must be escaped (same as DB.iquery
            # and the ``temp=True`` path of ``store``).
            lines = self.db.iquery_readlines(
                "show('{}', 'afl')".format(str(self).replace("'", "\\'")))
            return Schema.fromstring(lines[0][0])
# Module-level aliases: ``connect(...)`` builds a ``DB`` connection and
# ``iquery`` is the plain ``DB.iquery`` function (it takes the ``DB``
# instance as its first argument).
connect = DB
iquery = DB.iquery

if __name__ == "__main__":
    # logging.basicConfig(level=logging.DEBUG)

    # DB.__init__ reads this variable when ``verify`` is not given;
    # "False" turns HTTPS certificate verification off for the doctests.
    os.environ["SCIDBPY_VERIFY_HTTPS"] = "False"

    # Run the doctests embedded in this module's docstrings.
    import doctest
    doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)