"""DB, Array, and Operator
=======================
Classes for connecting to SciDB and executing queries.
"""
import configparser
import copy
import errno
import itertools
import json
import os
import stat
import string
import threading
import uuid
import warnings
import numpy
import pandas
import pyarrow
import requests
try:
from weakref import finalize
except ImportError:
from backports.weakref import finalize
from .backend import get_backend
from .be_shim import Shim
from .meta import ops_eager, string_args
from .schema import Schema
# Default maximum cells per page of paged query results.
_DEFAULT_PAGE_SIZE = 1_000_000
class Password_Placeholder(object):
def __repr__(self):
return 'PASSWORD_PROVIDED'
[docs]
class DB(object):
"""SciDB connection object.
>>> DB()
... # doctest: +NORMALIZE_WHITESPACE
DB('https://...',
...,
None,
...,
False,
None,
False,
256,
False)
>>> print(DB())
scidb_url = https://...
scidb_auth = ...
http_auth = None
verify = ...
admin = False
namespace = None
use_arrow = False
result_size_limit = 256
no_ops = False
Constructor parameters:
:param string scidb_url: SciDB connection URL. The URL for the
Shim server or for the native client API. If ``None``, use the
value of the ``SCIDB_URL`` environment variable, if present
(default ``http://localhost:8080``)
:param tuple scidb_auth: Credentials for connecting to scidb,
if scidb is configured to use password authentication.
Either a (username, password) tuple, or the path to an
authentication file which can be in INI or JSON format
with the structure:
``{"user-name": "name", "user-password": "password"}``.
If not provided, credentials are read from a file in the first
location that exists among:
- ``$SCIDB_AUTH_FILE``
- ``$XDG_CONFIG_DIR/scidb/iquery.auth``
- ``~/.config/scidb/iquery.auth``
:param tuple http_auth: Tuple with username and password for
connecting to Shim, if Shim authentication is used (default
``None``)
:param bool verify: Either a bool, or a path to a cert file.
* If ``True``, the HTTPS certificate is verified against the system's
trusted CA store.
* If ``False``, the HTTPS certificate is not verified. This will
generate a warning.
* If a string, the string must be a path to a cert or ca-cert file.
The connection's HTTPS certificate is verified against that file.
* If omitted or ``None``, defaults to the setting in the
``SCIDBPY_VERIFY_HTTPS`` environment variable if present, otherwise
defaults to ``True``.
See Python `requests
<http://docs.python-requests.org/en/master/>`_ library `SSL Cert
Verification
<http://docs.python-requests.org/en/master/user/advanced/
#ssl-cert-verification>`_ section for details on the ``verify``
argument (default ``None``)
:param bool admin: Set to ``True`` to open a higher-priority
session. This is identical with the ``--admin`` flag for the
``iquery`` SciDB client, see `SciDB Documentation
<https://paradigm4.atlassian.net/wiki/spaces/scidb>`_ for
details (default ``False``)
:param string namespace: Initial namespace for the
connection. Only applicable for SciDB Enterprise Edition. The
namespace can changed at any time using the ``set_namespace``
SciDB operator (default ``None``)
:param bool use_arrow: If ``True``, download SciDB array using
Apache Arrow library. Requires ``accelerated_io_tools`` and
``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame is
returned (``as_dataframe`` has no effect) and null-able types
are promoted as per Pandas `promotion scheme
<http://pandas.pydata.org/pandas-docs/stable/gotchas.html
#na-type-promotions>`_ (``dataframe_promo`` has no effect). It
can be overridden for each ``iquery`` call (default ``False``)
:param int page_size: Maximum number of cells per page of output
when executing paged queries, that is, non-upload queries that
save their output. Client API only; ignored for Shim. (default
1,000,000)
:param int result_size_limit: absolute limit of the output file in
Megabytes. Effective only when the ``accelerated_io_tools``
plug-in is installed in SciDB and ``aio`` is enabled in Shim
(default ``256`` MB)
:param int inactivity_timeout: Seconds until SciDB server will
cancel a paged query, unless the client requests another page.
Should only need to be increased for multithreaded apps where a
thread holding the GIL may interfere with paged response
processing. Client API only; ignored for Shim. (default 60s)
:param bool no_ops: If ``True``, the list of operators is not
fetched at this time and the connection is not implicitly
verified. This expedites the execution of the function but
disallows for calling the SciDB operators directly from the
``DB`` instance e.g., ``db.scan`` (default ``False``)
:param int progress_check_secs: Client API only. After every
interval of this many seconds, interrupt and resume the HTTP request
so the client can check on the query's progress, and so the server
can confirm that this client is still active. The
``progress_check_callback`` is called, if provided. Setting this to
a lower value provides more frequent progress updates, but might
have the effect of slowing the query down because it gets
interrupted and resumed more often. (This "progress check" is only
enabled if the query is resumable.) (Default: 60 seconds)
:param func progress_check_callback: Client API only. A callback
function of the form: ``func(query_info, page_number,
response_number, cumulative_nbytes, ...)`` which returns None. This
is called once when the page is first requested, and again after
every ``progress_check_secs`` seconds while the query is
executing. If the callback raises an exception, the query gets
canceled.
The callback arguments include:
* query_info: an object with attributes that include ``id`` (the
ID of the query) and ``schema`` (the SciDB schema returned by
the query)
* page_number: the page number being fetched (1 for the first page)
* response_number: increments by 1 each time through the callback.
* cumulative_nbytes: the cumulative total number of content bytes
received for this page, including the current response and
all previous responses for the page.
The callback MUST have ``*args`` and ``**kwargs`` arguments to allow
for future changes to the callback interface. It must ignore any
arguments it doesn't understand. (This callback is only enabled if
the query is resumable.)
:param reauth_callback: An optional function matching the signature::
Callable[[requests.HTTPError], Optional[Tuple[str, str]]]
or::
Callable[[Exception], Optional[Tuple[str, str]]]
The return value should be a (username, password) tuple or None.
This function is called whenever a request fails with a 401
"Unauthorized" or 403 "Forbidden" response. The function can provide
stored credentials, display a login prompt, trigger 2FA, etc. If the
function returns None, the reauthentication is canceled and the
error response is returned as usual.
The function can inspect the HTTPError argument for details in order
to generate a message to show the user (e.g. "session on <host>
expired", "second factor required to log in to <domain>",
"incorrect password", etc.).
:param reauth_tries: The maximum number of times to call
``reauth_callback()`` if a request returns a 401/403 response
response and subsequent reauth attempts also return 401/403.
- Set this to 0 to disable reauthentication.
- Set this to 1 (default) if the callback returns stored credentials
(i.e. if repeating the callback won't change anything).
- Set this to >1 if the callback prompts the user for credentials
and you want to let the user try again after making a mistake.
:param backoff_fn: A callback function that waits some number of seconds.
It should have signature like::
def backoff_fn(err: requests.HTTPError, delay: int)
If we receive a 429 "Too Many Requests" response from the server,
this lets the application perform other tasks while waiting for the
server to become available.
The HTTPError argument gives the app information it can use to display a
message to the user, e.g. "the server is busy processing other queries".
Note that backoff_fn may choose to return earlier, or later, than
``delay``. To stop waiting, it can raise the HTTPError it received
as its first argument.
"""
_show_query = "show('{}', 'afl')"
def __init__(
self,
scidb_url=None,
scidb_auth=None,
http_auth=None,
verify=None,
admin=False,
namespace=None,
use_arrow=False,
page_size=_DEFAULT_PAGE_SIZE,
result_size_limit=256,
no_ops=False,
inactivity_timeout=None,
progress_check_secs=None,
progress_check_callback=None,
reauth_callback=None,
reauth_tries=None,
backoff_fn=None):
if scidb_url is None:
scidb_url = os.environ.get("SCIDB_URL", "http://localhost:8080")
if verify is None:
envvar = os.environ.get("SCIDBPY_VERIFY_HTTPS")
if envvar:
if envvar.lower() in ("false", "f"):
verify = False
elif envvar.lower() in ("true", "t"):
verify = True
elif os.path.exists(envvar):
verify = envvar
else:
warnings.warn(
f'Unrecognized value "{envvar}" for env variable'
' "SCIDBPY_VERIFY_HTTPS" (ignoring). It must be'
' "true", "false", or a path to an existing file.')
self.scidb_url = scidb_url
self.verify = verify
self.admin = admin
self.namespace = namespace
self.use_arrow = use_arrow
self.page_size = page_size
self.result_size_limit = result_size_limit
self.no_ops = no_ops
self.inactivity_timeout = inactivity_timeout
self.progress_check_secs = progress_check_secs
self.progress_check_callback = progress_check_callback
self.reauth_callback = reauth_callback
self.reauth_tries = reauth_tries
self.backoff_fn = backoff_fn
if http_auth:
self._http_auth = requests.auth.HTTPDigestAuth(*http_auth)
self.http_auth = (http_auth[0], Password_Placeholder())
else:
self._http_auth = self.http_auth = None
if scidb_auth is None:
scidb_auth = self._default_auth_file()
if isinstance(scidb_auth, str):
self.authfile = os.path.realpath(scidb_auth)
scidb_auth = self._auth_file_credentials(self.authfile)
# Create a reauth callback to automatically re-read the file
# and reauthenticate if the session's cookie expires.
# This must happen before get_backend(self).
if not self.reauth_callback:
def cb(httperror):
_ = httperror
return self._auth_file_credentials(self.authfile)
self.reauth_callback = cb
self.reauth_tries = 1
if scidb_auth:
self._scidb_auth = scidb_auth
self.scidb_auth = (scidb_auth[0], Password_Placeholder())
else:
self._scidb_auth = self.scidb_auth = None
self._backend = get_backend(self) # Copies attributes set so far.
self.arrays = Arrays(self)
self._uid = uuid.uuid1().hex
self._lock = threading.Lock()
self._array_cnt = 0
self._formatter = string.Formatter()
if self.no_ops:
self.operators = None
self._dir = None
else:
self.load_ops()
@staticmethod
def _default_auth_file():
"""Conjure up the default auth file name."""
authfile = os.environ.get(
"SCIDB_AUTH_FILE",
os.path.join(
os.environ.get("XDG_CONFIG_DIR",
os.path.expanduser("~/.config")),
"scidb/iquery.auth"))
return authfile
@staticmethod
def _auth_file_credentials(authfile):
"""Get SciDB credentials from an authentication file if possible."""
# Check file type and permissions.
try:
st = os.stat(authfile)
except OSError as e:
if e.errno != errno.ENOENT:
warnings.warn(f"{authfile}: stat: {e.strerror}")
return None
if not stat.S_ISREG(st.st_mode):
warnings.warn(f"{authfile}: Not a regular file")
return None
if (st.st_mode & (stat.S_IRWXG | stat.S_IRWXO)) != 0:
warnings.warn(f"{authfile}: Bad file permissions")
return None
# Open it and extract credentials. It can be either JSON or
# INI format.
with open(authfile) as F:
contents = F.read()
try:
doc = json.loads(contents)
user = doc['user-name']
passwd = doc['user-password']
return user, passwd
except Exception:
pass
config = configparser.ConfigParser()
config.read(authfile)
assert 'security_password' in config
section = config['security_password']
return section['user-name'], section['user-password']
def __iter__(self):
return (i for i in (
self.scidb_url,
self.scidb_auth,
self.http_auth,
self.verify,
self.admin,
self.namespace,
self.use_arrow,
self.result_size_limit,
self.no_ops))
def __repr__(self):
return ('{}({!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r}, ' +
'{!r})').format(
type(self).__name__, *self)
def __str__(self):
return '''\
scidb_url = {}
scidb_auth = {}
http_auth = {}
verify = {}
admin = {}
namespace = {}
use_arrow = {}
result_size_limit = {}
no_ops = {}'''.format(*self)
def __getattr__(self, name):
if self.operators and name in self.operators:
return Operator(self, name)
elif self.no_ops:
raise AttributeError(
("Operators not loaded. Run 'load_ops()' or " +
"use 'no_ops=False' (default) " +
"at connection time (constructor)").format(
type(self)))
else:
raise AttributeError(
'{.__name__!r} object has no attribute {!r}'.format(
type(self), name))
def __dir__(self):
return self._dir
[docs]
def uses_shim(self):
"""Return True if this connection goes through Shim.
False means the connection uses the SciDB Client HTTP API."""
return isinstance(self._backend, Shim)
[docs]
def iquery(self,
query,
fetch=False,
use_arrow=None,
use_arrow_stream=False,
atts_only=False,
as_dataframe=True,
dataframe_promo=True,
schema=None,
page_size=_DEFAULT_PAGE_SIZE,
upload_data=None,
upload_schema=None,
**kwargs):
"""Execute query in SciDB
:param string query: SciDB AFL query to execute
:param bool fetch: If ``True``, download SciDB array (default
``False``)
:param bool use_arrow: If ``True``, download SciDB array using
Apache Arrow library. Requires ``accelerated_io_tools`` and
``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame
is returned (``as_dataframe`` has no effect) and null-able
types are promoted as per Pandas `promotion scheme
<http://pandas.pydata.org/pandas-docs/stable/gotchas.html
#na-type-promotions>`_ (``dataframe_promo`` has no
effect). If ``None`` the ``use_arrow`` value set at
connection time is used (default ``None``)
:param bool use_arrow_stream: If ``True``, return a
``RecordBatchStreamReader`` object to the user. The user
will extract the records from the stream reader. This
parameter only had effect if ``use_arrow`` is set to
``True`` (default ``False``)
:param bool atts_only: If ``True``, download only SciDB array
attributes without dimensions (default ``False``)
:param bool as_dataframe: If ``True``, return a Pandas
DataFrame. If ``False``, return a NumPy array (default
``True``)
:param bool dataframe_promo: If ``True``, null-able types are
promoted as per Pandas `promotion scheme
<http://pandas.pydata.org/pandas-docs/stable/gotchas.html
#na-type-promotions>`_ If ``False``, object records are used
for null-able types (default ``True``)
:param schema: Schema of the SciDB array to use when
downloading the array. Schema is not verified. If schema is
a Schema instance, it is copied. Otherwise, a
:py:class:``Schema`` object is built using
:py:func:``Schema.fromstring`` (default ``None``)
:param int page_size: Maximum number of cells per page of output
when executing paged queries, that is, non-upload queries that
save their output. Client API only; ignored for Shim. (default
1,000,000)
>>> DB().iquery('build(<x:int64>[i=0:1; j=0:1], i + j)', fetch=True)
i j x
0 0 0 0.0
1 0 1 1.0
2 1 0 1.0
3 1 1 2.0
>>> DB().iquery("input({sch}, '{fn}', 0, '{fmt}')",
... fetch=True,
... upload_data=numpy.arange(3, 6))
i x
0 0 3
1 1 4
2 2 5
"""
# Set use_arrow using local/global
if use_arrow is None:
use_arrow = self.use_arrow
# Special case: -- - set_namespace - --
if query.startswith('set_namespace(') and query[-1] == ')':
param = query[len('set_namespace('):-1]
# Unquote if quoted. Will be quoted when set in prefix.
if param[0] == "'" and param[-1] == "'":
param = param[1:-1]
self.namespace = param
return
if upload_data is not None:
if isinstance(upload_data, numpy.ndarray):
if upload_schema is None:
try:
upload_schema = Schema.fromdtype(upload_data.dtype)
except Exception as e:
warnings.warn(
'Mapping NumPy dtype to SciDB schema failed. ' +
'Try providing an explicit upload_schema')
raise e
# Convert upload data to bytes
if upload_schema.is_fixsize():
upload_data = upload_data.tobytes()
else:
upload_data = upload_schema.tobytes(upload_data)
# Check if placeholders are present
place_holders = set(
field_name
for _1, field_name, _3, _4 in self._formatter.parse(query))
if 'fn' not in place_holders:
warnings.warn(
'upload_data provided, but {fn} placeholder is missing',
stacklevel=2)
if 'fmt' in place_holders and upload_schema is None:
warnings.warn(
'upload_data and {fmt} placeholder provided, ' +
'but upload_schema is None',
stacklevel=2)
# Check if upload data is bytes or file-like object
if not (isinstance(upload_data, bytes) or
isinstance(upload_data, bytearray) or
hasattr(upload_data, 'read')):
warnings.warn(
'upload_data is not bytes or file-like object',
stacklevel=2)
fn = self._backend.prepare_upload(upload_data)
query = query.format(
sch=upload_schema,
fn=fn,
fmt=upload_schema.atts_fmt_scidb if upload_schema else None)
if fetch:
# Use provided schema or get schema from SciDB
if schema:
# Deep-copy schema since we might be mutating it
if isinstance(schema, Schema):
if not atts_only and not use_arrow:
schema = copy.deepcopy(schema)
else:
schema = Schema.fromstring(schema)
else:
# Execute 'show(...)' and Download text
self._backend.runquery(
query=DB._show_query.format(query.replace("'", "\\'")),
page_size=page_size,
save='tsv')
schema = Schema.fromstring(self._backend.read())
# Attributes and dimensions can collide. Run make_unique to
# remove any collisions.
#
# make_unique fixes any collision, but if we don't
# download the dimensions, we don't need to fix collisions
# between dimensions and attributes. So, we use
# make_unique only if there are collisions within the
# attribute names.
if ((not atts_only or
len(set((a.name for a in schema.atts))) <
len(schema.atts)) and schema.make_unique()):
# Dimensions or attributes were renamed due to
# collisions. We need to cast. ({:h} is our
# custom format for Schema objects, see schema.py)
query = 'cast({}, {:h})'.format(query, schema)
# Unpack
if not atts_only:
# TODO If SDB-7905 is fixed use the following code
# query = f'flatten({query})'
query = f'unpack({query}, i)'
# update schema after unpacking/flattening
schema.make_dims_atts()
# Execute Query and Download content
self._backend.runquery(
query=query,
save='arrow' if use_arrow else schema.atts_fmt_scidb,
page_size=page_size,
result_size_limit=self.result_size_limit,
atts_only=1, # atts_only handled by unpack/flatten above
# backend always uses atts_only=1
**kwargs)
buf = self._backend.readbytes()
# Build result
if use_arrow:
# This is what gets returned if use_arrow_stream = True
data = pyarrow.RecordBatchStreamReader(
pyarrow.BufferReader(buf))
if not use_arrow_stream:
data = data.read_pandas()
elif schema.is_fixsize():
data = numpy.frombuffer(buf, dtype=schema.atts_dtype)
if as_dataframe:
data = pandas.DataFrame.from_records(data)
if dataframe_promo:
schema.promote(data)
else:
# Parse binary buffer
data = schema.frombytes(buf, as_dataframe, dataframe_promo)
if as_dataframe:
data = pandas.DataFrame.from_records(data)
return data
else: # fetch=False
self._backend.runquery(query=query, **kwargs)
# Special case: -- - load_library - --
if query.startswith('load_library('):
self.load_ops()
[docs]
def iquery_readlines(self, query, page_size=_DEFAULT_PAGE_SIZE, **kwargs):
"""Execute query in SciDB
>>> DB().iquery_readlines('build(<x:int64>[i=0:2], i * i)')
... # doctest: +ELLIPSIS
[...'0', ...'1', ...'4']
>>> DB().iquery_readlines(
... 'apply(build(<x:int64>[i=0:2], i), y, i + 10)')
... # doctest: +ELLIPSIS
[[...'0', ...'10'], [...'1', ...'11'], [...'2', ...'12']]
"""
self._backend.runquery(query=query,
save='tsv',
page_size=page_size,
**kwargs)
return self._backend.readlines(split='\t')
[docs]
def next_array_name(self):
"""Generate a unique array name. Keep track on these names using the
_uid field and a counter
"""
# Thread-safe counter
with self._lock:
self._array_cnt += 1
return 'py_{}_{}'.format(self._uid, self._array_cnt)
[docs]
def load_ops(self):
"""Get list of operators and macros.
"""
self._backend.runquery(
query="project(list('operators'), name)",
save='tsv')
operators = self._backend.readlines()
self._backend.runquery(
query="project(list('macros'), name)",
save='tsv')
macros = self._backend.readlines()
self.no_ops = False
self.operators = operators + macros
self._dir = (self.operators +
['arrays',
'gc',
'iquery',
'iquery_readlines',
'upload'])
self._dir.sort()
[docs]
class Arrays(object):
"""Access to arrays available in SciDB"""
def __init__(self, db):
self.db = db
def __repr__(self):
return '{}({!r})'.format(
type(self).__name__, self.db)
def __str__(self):
return '''DB:
{}'''.format(self.db)
def __getattr__(self, name):
"""db.arrays.foo"""
return Array(self.db, name)
def __getitem__(self, name):
"""db.arrays['foo']"""
return Array(self.db, name)
def __dir__(self):
"""Download the list of SciDB arrays. Use 'project(list(), name)' to
download only names and schemas
"""
return self.db.iquery_readlines('project(list(), name)')
[docs]
class Array(object):
"""Access to individual array"""
def __init__(self, db, name, gc=False):
self.db = db
self.name = name
if gc:
finalize(self,
self.db.iquery,
'remove({})'.format(self.name))
def __repr__(self):
return '{}({!r}, {!r})'.format(
type(self).__name__, self.db, self.name)
def __str__(self):
return self.name
def __getattr__(self, key):
return ArrayExp('{}.{}'.format(self.name, key))
def __getitem__(self, key):
return self.fetch()[key]
def __dir__(self):
"""Download the schema of the SciDB array, using ``show()``"""
sh = Schema.fromstring(
self.db.iquery_readlines('show({})'.format(self))[0][0])
ls = [i.name for i in itertools.chain(sh.atts, sh.dims)]
ls.sort()
return ls
def __mod__(self, alias):
"""Overloads ``%`` operator to add support for aliasing"""
return Array(self.db, '{} as {}'.format(self.name, alias))
def fetch(self, **kwargs):
return self.db.iquery('scan({})'.format(self),
fetch=True,
**kwargs)
[docs]
def head(self, n=5, **kwargs):
"""Similar to ``pandas.DataFrame.head``. Makes use of the ``limit``
operator, if available.
"""
if 'limit' in self.db.operators:
return self.db.iquery('limit({}, {})'.format(self, n),
fetch=True,
**kwargs)
else:
warnings.warn(
'"limit" operator not available. ' +
'Fetching the entire array. ' +
'See https://github.com/Paradigm4/limit.')
return self.fetch(**kwargs)[:n]
def schema(self):
return Schema.fromstring(
self.db.iquery_readlines("show({})".format(self))[0][0])
[docs]
class ArrayExp(object):
"""Access to individual attribute or dimension"""
def __init__(self, exp):
self.exp = exp
def __repr__(self):
return '{}({!r})'.format(type(self).__name__, self.exp)
def __str__(self):
return '{}'.format(self.exp)
def __add__(self, other):
return ArrayExp('{} + {}'.format(self, other))
[docs]
class Operator(object):
"""Store SciDB operator and arguments. Hungry operators (e.g., remove,
store, etc.) evaluate immediately. Lazy operators evaluate on data
fetch.
"""
def __init__(self, db, name, upload_data=None, upload_schema=None, *args):
self.db = db
self.name = name.lower()
self.upload_data = upload_data
self.upload_schema = upload_schema
self.args = list(args)
self.is_lazy = self.name not in ops_eager
self._dir = self.db.operators + ['fetch']
self._dir.sort()
def __repr__(self):
return '{}(db={!r}, name={!r}, args=[{}])'.format(
type(self).__name__,
self.db,
self.name,
', '.join('{!r}'.format(i) for i in self.args))
def __str__(self):
args_fmt = []
for (pos, arg) in enumerate(self.args):
# Format argument to string (possibly recursive)
arg_fmt = '{}'.format(arg)
# Special case: quote string argument if not quoted
if (pos < len(string_args) and
self.name in string_args[pos] and
len(arg) and
arg_fmt[0] != "'" and
arg_fmt[-1] != "'"):
arg_fmt = "'{}'".format(arg_fmt)
# Special case: -- - show - --
if (pos == 0 and
self.name == 'show' and
(isinstance(arg, Operator) or
len(self.args) > 1)):
arg_fmt = "'{}'".format(arg_fmt.replace("'", "\\'"))
# Add to arguments list
args_fmt.append(arg_fmt)
return '{}({})'.format(self.name, ', '.join(args_fmt))
def __call__(self, *args, **kwargs):
"""Returns self for lazy expressions. Executes immediate expressions.
"""
# Propagate "upload_data" and "upload_schema" from previous
# operators
for arg in args:
if isinstance(arg, Operator) and (
arg.upload_data is not None or
arg.upload_schema is not None):
if self.upload_data is not None or (self.upload_schema
is not None):
raise NotImplementedError(
'Queries with multiple "upload_data" or ' +
'"upload_schema" arguments are not supported')
self.upload_data = arg.upload_data
self.upload_schema = arg.upload_schema
self.args.extend(args)
# Special case: -- - create_array - --
if self.name == 'create_array' and len(self.args) < 3:
# Set "temporary"
self.args.append(False)
# Special case: -- - input & load - --
elif self.name in ('input', 'load'):
ln = len(self.args)
# Set upload data
if 'upload_data' in kwargs.keys():
self.upload_data = kwargs['upload_data']
# Set upload schema
if 'upload_schema' in kwargs.keys():
# Pass through if provided as argument
self.upload_schema = kwargs['upload_schema']
if self.upload_schema is None:
# If the upload_data is a NumPy array try to map the
# array dtype to upload schema
if (self.upload_data is not None and
isinstance(self.upload_data, numpy.ndarray)):
try:
self.upload_schema = Schema.fromdtype(
self.upload_data.dtype)
except Exception:
# Might fail if the dtype contains
# objects. The same type mapping is attempted
# later in iquery, but there the exception is
# propagated
pass
# If the operator is input, try to map first argument
# to upload schema
if (self.upload_schema is None and
self.name == 'input' and
ln >= 1):
try:
self.upload_schema = Schema.fromstring(args[0])
except Exception:
# Fails if the argument is an array name
pass
# Set required arguments if missing
# Check if "input_file" is present (2nd argument)
if ln < 2:
# Check if "existing_array|anonymous_schema"
# is present (1st argument)
if ln < 1:
self.args.append('{sch}') # anonymous_schema
self.args.append("'{fn}'") # input_file
# Set optional arguments if missing and necessary
# Check if "format" is present (4th argument)
if ln < 4 and self.upload_data is not None:
# Check if "instance_id" is present (3rd argument)
if ln < 3:
self.args.append(0) # instance_id
self.args.append("'{fmt}'") # format
# Special case: -- - store - --
elif self.name == 'store':
if len(self.args) < 2:
# Set "named_array"
self.args.append(self.db.next_array_name())
# Garbage collect (if not specified)
if 'gc' not in kwargs.keys():
kwargs['gc'] = True
# If temp=True in kwargs, create a temporary array first
if 'temp' in kwargs.keys() and kwargs['temp'] is True:
# Get the schema of the new array
try:
new_schema = Schema.fromstring(
self.db.iquery_readlines(
"show('{}', 'afl')".format(
str(self.args[0]).replace("'", "\\'")))[0][0])
except requests.HTTPError as e:
e.args = (
'"temp=True" not supported for complex queries\n' +
e.args[0],
)
raise
# Set array name
new_schema.name = self.args[1]
# Create temporary array
self.db.iquery('create temp array {}'.format(new_schema))
# Lazy or eager
if self.is_lazy: # Lazy
return self
else: # Hungry
# Execute query
self.db.iquery(str(self),
upload_data=self.upload_data,
upload_schema=self.upload_schema)
# Handle output
# Special case: -- - load - --
if self.name == 'load':
if isinstance(self.args[0], Array):
return self.args[0]
else:
return Array(self.db, self.args[0])
# Special case: -- - store - --
elif self.name == 'store':
if isinstance(self.args[1], Array):
return self.args[1]
else:
return Array(self.db,
self.args[1],
kwargs.get('gc', False))
def __getitem__(self, key):
return self.fetch()[key]
def __getattr__(self, name):
if name in self.db.operators:
return Operator(
self.db, name, self.upload_data, self.upload_schema, self)
else:
raise AttributeError(
'{.__name__!r} object has no attribute {!r}'.format(
type(self), name))
def __dir__(self):
return self._dir
def __mod__(self, alias):
"""Overloads ``%`` operator to add support for aliasing"""
return Array(self.db, '{} as {}'.format(self, alias))
def fetch(self, **kwargs):
if self.is_lazy:
return self.db.iquery(str(self),
fetch=True,
upload_data=self.upload_data,
upload_schema=self.upload_schema,
**kwargs)
def schema(self):
if self.is_lazy:
return Schema.fromstring(
self.db.iquery_readlines(
"show('{}', 'afl')".format(self))[0][0])
connect = DB
iquery = DB.iquery
if __name__ == "__main__":
# logging.basicConfig(level=logging.DEBUG)
os.environ["SCIDBPY_VERIFY_HTTPS"] = "False"
import doctest
doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)