# -*- coding: utf-8 -*-
# Copyright (c) 2017-2025, Paradigm4 Inc. All Rights Reserved.
"""DB, Array, and Operator
=======================
Classes for connecting to SciDB and executing queries.
"""
import configparser
import copy
import errno
import itertools
import json
import os
import ssl
import stat
import string
import threading
import uuid
import warnings
import weakref
import httpx
import numpy
import pandas
import pyarrow
from .backend import get_backend
from .be_shim import Shim
from .meta import ops_eager, string_args
from .schema import Schema
# Default maximum cells per page of paged query results.
_DEFAULT_PAGE_SIZE = 1_000_000
class Password_Placeholder(object):
def __repr__(self):
return "PASSWORD_PROVIDED"
class DB(object):
"""SciDB connection object.
>>> DB()
... # doctest: +NORMALIZE_WHITESPACE
DB('https://...',
...,
None,
...,
False,
None,
False,
256,
False)
>>> print(DB())
scidb_url = https://...
scidb_auth = ...
http_auth = None
verify = ...
admin = False
namespace = None
use_arrow = False
result_size_limit = 256
no_ops = False
You can also create a DB object in a ``with`` clause; this closes
the SciDB connection at the end of the scope::
with DB() as db:
db.iquery(...)
Constructor parameters:
:param string scidb_url: SciDB connection URL. The URL for the Shim
server or for the native client API. If ``None``, use the value of
the ``SCIDB_URL`` environment variable, if present (default
``http://localhost:8080``)
:param tuple scidb_auth: Credentials for connecting to SciDB, if
SciDB is configured to use password authentication. Either a
(username, password) tuple, or the path to an authentication file
which can be in INI or JSON format with the structure:
``{"user-name": "name", "user-password": "password"}``.
If not provided, credentials are read from a file in the first
location that exists among:
- ``$SCIDB_AUTH_FILE``
- ``$XDG_CONFIG_DIR/scidb/iquery.auth``
- ``~/.config/scidb/iquery.auth``
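If the file is INI-format, credentials are read from a
``security_password`` section. A minimal example (values are
illustrative)::

    [security_password]
    user-name = name
    user-password = password

The file must be a regular file without group or other
permissions (e.g., mode ``0600``); otherwise it is ignored with
a warning.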
:param tuple http_auth: Tuple with username and password for
connecting to Shim, if Shim authentication is used (default
``None``)
:param bool verify: Either a bool, or a path to a cert file.
* If ``True``, the HTTPS certificate is verified against the
system's trusted CA store.
* If ``False``, the HTTPS certificate is not verified. This will
generate a warning.
* If a string, the string must be a path to a cert or ca-cert
file. The connection's HTTPS certificate is verified against
that file.
* If omitted or ``None``, defaults to the setting in the
``SCIDBPY_VERIFY_HTTPS`` environment variable if present,
otherwise defaults to ``True``.
See the Python `httpx <https://www.python-httpx.org>`_ library's
`SSL Cert Verification <https://www.python-httpx.org/advanced/ssl/>`_
section for details on the ``verify`` argument (default ``None``)
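For example, to verify against a private CA certificate (URL and
path are illustrative)::

    db = DB('https://example.org:8083', verify='/path/to/cacert.pem')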
:param bool admin: Set to ``True`` to open a higher-priority
session. This is identical to the ``--admin`` flag for the
``iquery`` SciDB client, see `SciDB Documentation
<https://paradigm4.atlassian.net/wiki/spaces/scidb>`_ for details
(default ``False``)
:param string namespace: Initial namespace for the connection. Only
applicable for SciDB Enterprise Edition. The namespace can be changed
at any time using the ``set_namespace`` SciDB operator (default
``None``)
:param bool use_arrow: If ``True``, download SciDB array using
Apache Arrow library. Requires ``accelerated_io_tools`` and
``aio`` enabled in ``Shim``. If ``True``, a Pandas DataFrame is
returned (``as_dataframe`` has no effect) and null-able types are
promoted as per Pandas `promotion scheme
<https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#na-type-promotions-for-numpy-types>`_
(``dataframe_promo`` has no effect). It can be overridden for each
``iquery`` call (default ``False``)
:param int page_size: Maximum number of cells per page of output
when executing paged queries, that is, non-upload queries that
save their output. Client API only; ignored for Shim. (default
1,000,000)
:param int result_size_limit: Absolute limit of the output file in
megabytes. Effective only when the ``accelerated_io_tools``
plug-in is installed in SciDB and ``aio`` is enabled in Shim
(default ``256`` MB)
:param int inactivity_timeout: Seconds until the SciDB server
cancels a paged query unless the client requests another page.
Should only need to be increased for multithreaded apps where a
thread holding the GIL may interfere with paged response
processing. Client API only; ignored for Shim. (default 60s)
:param bool no_ops: If ``True``, the list of operators is not
fetched at this time and the connection is not implicitly
verified. This expedites construction of the ``DB`` object but
prevents calling the SciDB operators directly from the ``DB``
instance, e.g., ``db.scan`` (default ``False``)
:param int progress_check_secs: Client API only. After every
interval of this many seconds, interrupt and resume the HTTP
request so the client can check on the query's progress, and so
the server can confirm that this client is still active. The
``progress_check_callback`` is called, if provided. Setting this
to a lower value provides more frequent progress updates, but
might have the effect of slowing the query down because it gets
interrupted and resumed more often. (This "progress check" is only
enabled if the query is resumable.) (Default: 60 seconds)
:param func progress_check_callback: Client API only. A callback
function of the form: ``func(query_info, page_number,
response_number, cumulative_nbytes, ...)`` which returns None.
This is called once when the page is first requested, and again
after every ``progress_check_secs`` seconds while the query is
executing. If the callback raises an exception, the query gets
canceled.
The callback arguments include:
* query_info: an object with attributes that include ``id`` (the
ID of the query) and ``schema`` (the SciDB schema returned by
the query)
* page_number: the page number being fetched (1 for the first
page)
* response_number: increments by 1 each time through the
callback.
* cumulative_nbytes: the cumulative total number of content
bytes received for this page, including the current response
and all previous responses for the page.
The callback MUST have ``*args`` and ``**kwargs`` arguments to
allow for future changes to the callback interface. It must ignore
any arguments it doesn't understand. (This callback is only
enabled if the query is resumable.)
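A minimal sketch of a conforming callback (names are
illustrative)::

    def on_progress(query_info, page_number, response_number,
                    cumulative_nbytes, *args, **kwargs):
        print('query', query_info.id,
              'page', page_number,
              'response', response_number,
              'bytes', cumulative_nbytes)

    db = DB(progress_check_callback=on_progress,
            progress_check_secs=10)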
:param reauth_callback: An optional function matching the
signature::
Callable[[httpx.HTTPStatusError], Optional[Tuple[str, str]]]
or::
Callable[[Exception], Optional[Tuple[str, str]]]
The return value should be a (username, password) tuple or None.
This function is called whenever a request fails with a 401
"Unauthorized" or 403 "Forbidden" response. The functioan
provide stored credentials, display a login prompt, trigger 2FA,
etc. If the function returns None, the reauthentication is
canceled and the error response is returned as usual.
The function can inspect the HTTPError argument for details in
order to generate a message to show the user (e.g. "session on
<host> expired", "second factor required to log in to <domain>",
"incorrect password", etc.).
:param reauth_tries: The maximum number of times to call
``reauth_callback()`` if a request returns a 401/403 response
and subsequent reauth attempts also return 401/403.
* Set this to 0 to disable reauthentication.
* Set this to 1 (default) if the callback returns stored
credentials (i.e. if repeating the callback won't change
anything).
* Set this to >1 if the callback prompts the user for credentials
and you want to let the user try again after making a mistake.
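A minimal sketch of a prompting callback, used with
``reauth_tries=3`` (names are illustrative)::

    import getpass

    def reauth(err):
        print('Reauthentication needed:', err)
        user = input('Username: ')
        if not user:
            return None  # cancel reauthentication
        return user, getpass.getpass('Password: ')

    db = DB(reauth_callback=reauth, reauth_tries=3)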
:param backoff_fn: A callback function that waits some number of
seconds. It should have a signature like::
def backoff_fn(err: httpx.HTTPStatusError, delay: int)
If we receive a 429 "Too Many Requests" response from the server,
this lets the application perform other tasks while waiting for
the server to become available.
The HTTPError argument gives the app information it can use to
display a message to the user, e.g. "the server is busy processing
other queries".
Note that backoff_fn may choose to return earlier, or later, than
``delay``. To stop waiting, it can raise the HTTPError it received
as its first argument.
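A minimal sketch of such a function::

    import time

    def backoff(err, delay):
        print('Server busy:', err, '- waiting', delay, 'seconds')
        time.sleep(delay)  # or do other application work here

    db = DB(backoff_fn=backoff)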
"""
_show_query = "show('{}', 'afl')"
# httpx module connection timeouts
# https://www.python-httpx.org/advanced/timeouts/
#
# Per the requests docs, use a timeout slightly larger than a
# multiple of 3 seconds (the TCP packet retransmission window).
# https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
_connect_timeout = 31
_read_write_timeout = 60
_pool_timeout = 5
def __init__(
self,
scidb_url=None,
scidb_auth=None,
http_auth=None,
verify=None,
admin=False,
namespace=None,
use_arrow=False,
page_size=_DEFAULT_PAGE_SIZE,
result_size_limit=256,
no_ops=False,
inactivity_timeout=None,
progress_check_secs=None,
progress_check_callback=None,
reauth_callback=None,
reauth_tries=None,
backoff_fn=None,
):
if scidb_url is None:
scidb_url = os.environ.get("SCIDB_URL", "http://localhost:8080")
if verify is None:
envvar = os.environ.get("SCIDBPY_VERIFY_HTTPS")
if envvar:
if envvar.lower() in ("false", "f"):
verify = False
elif envvar.lower() in ("true", "t"):
verify = True
elif os.path.exists(envvar):
verify = envvar
else:
warnings.warn(
f'Unrecognized value "{envvar}" for env variable'
' "SCIDBPY_VERIFY_HTTPS" (ignoring). It must be'
' "true", "false", or a path to an existing file.'
)
if isinstance(verify, str):
try:
if os.path.isdir(verify):
verify = ssl.create_default_context(capath=verify)
else:
verify = ssl.create_default_context(cafile=verify)
except FileNotFoundError as err:
raise FileNotFoundError(
f"Certificate path provided to verify, {verify}, not found."
) from err
self.scidb_url = scidb_url
self.verify = verify
self.admin = admin
self.namespace = namespace
self.use_arrow = use_arrow
self.page_size = page_size
self.result_size_limit = result_size_limit
self.no_ops = no_ops
self.inactivity_timeout = inactivity_timeout
self.progress_check_secs = progress_check_secs
self.progress_check_callback = progress_check_callback
self.reauth_callback = reauth_callback
self.reauth_tries = reauth_tries
self.backoff_fn = backoff_fn
if http_auth:
self._http_auth = httpx.DigestAuth(*http_auth)
self.http_auth = (http_auth[0], Password_Placeholder())
else:
self._http_auth = self.http_auth = None
if scidb_auth is None:
scidb_auth = self._default_auth_file()
if isinstance(scidb_auth, str):
self.authfile = os.path.realpath(scidb_auth)
scidb_auth = self._auth_file_credentials(self.authfile)
# Create a reauth callback to automatically re-read the file
# and reauthenticate if the session's cookie expires.
# This must happen before get_backend(self).
if not self.reauth_callback:
def cb(httperror):
_ = httperror
return self._auth_file_credentials(self.authfile)
self.reauth_callback = cb
self.reauth_tries = 1
if scidb_auth:
self._scidb_auth = scidb_auth
self.scidb_auth = (scidb_auth[0], Password_Placeholder())
else:
self._scidb_auth = self.scidb_auth = None
# Timeout object for the `httpx` module calls. The read
# timeout must be larger than the SciDB responseTimeout (our
# "progress check" interval for long-running paged queries).
self.timeout = httpx.Timeout(
connect=self._connect_timeout,
read=self._read_write_timeout,
write=self._read_write_timeout,
pool=self._pool_timeout,
)
self._backend = get_backend(self) # Copies attributes set so far.
self.arrays = Arrays(self)
self._uid = uuid.uuid1().hex
self._lock = threading.Lock()
self._array_cnt = 0
self._formatter = string.Formatter()
if self.no_ops:
self.operators = None
self._dir = None
else:
self.load_ops()
def close(self):
"""Close the SciDB session explicitly."""
self._backend.close()
def __enter__(self):
"""Enable use in a ``with`` clause."""
return self
def __exit__(self, *args, **kwargs):
"""Enable use in a ``with`` clause."""
self.close()
@staticmethod
def _default_auth_file():
"""Conjure up the default auth file name."""
authfile = os.environ.get(
"SCIDB_AUTH_FILE",
os.path.join(
os.environ.get("XDG_CONFIG_DIR", os.path.expanduser("~/.config")),
"scidb/iquery.auth",
),
)
return authfile
@staticmethod
def _auth_file_credentials(authfile):
"""Get SciDB credentials from an authentication file if possible."""
# Check file type and permissions.
try:
st = os.stat(authfile)
except OSError as e:
if e.errno != errno.ENOENT:
warnings.warn(f"{authfile}: stat: {e.strerror}")
return None
if not stat.S_ISREG(st.st_mode):
warnings.warn(f"{authfile}: Not a regular file")
return None
if (st.st_mode & (stat.S_IRWXG | stat.S_IRWXO)) != 0:
warnings.warn(f"{authfile}: Bad file permissions")
return None
# Open it and extract credentials. It can be either JSON or
# INI format.
with open(authfile) as F:
contents = F.read()
try:
doc = json.loads(contents)
user = doc["user-name"]
passwd = doc["user-password"]
return user, passwd
except Exception:
pass
config = configparser.ConfigParser()
config.read(authfile)
if "security_password" not in config:
    warnings.warn(f"{authfile}: Not a valid JSON or INI credentials file")
    return None
section = config["security_password"]
return section["user-name"], section["user-password"]
def __iter__(self):
return (
i
for i in (
self.scidb_url,
self.scidb_auth,
self.http_auth,
self.verify,
self.admin,
self.namespace,
self.use_arrow,
self.result_size_limit,
self.no_ops,
)
)
def __repr__(self):
return (
"{}({!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r}, "
+ "{!r})"
).format(type(self).__name__, *self)
def __str__(self):
return """\
scidb_url = {}
scidb_auth = {}
http_auth = {}
verify = {}
admin = {}
namespace = {}
use_arrow = {}
result_size_limit = {}
no_ops = {}""".format(
*self
)
def __getattr__(self, name):
if self.operators and name in self.operators:
return Operator(self, name)
elif self.no_ops:
raise AttributeError(
"Operators not loaded. Run 'load_ops()' or "
+ "use 'no_ops=False' (default) "
+ "at connection time (constructor)"
)
else:
raise AttributeError(
"{.__name__!r} object has no attribute {!r}".format(type(self), name)
)
def __dir__(self):
return self._dir
def uses_shim(self):
"""Return True if this connection goes through Shim.
False means the connection uses the SciDB Client HTTP API."""
return isinstance(self._backend, Shim)
def iquery(
self,
query,
fetch=False,
use_arrow=None,
use_arrow_stream=False,
atts_only=False,
as_dataframe=True,
dataframe_promo=True,
schema=None,
page_size=_DEFAULT_PAGE_SIZE,
upload_data=None,
upload_schema=None,
**kwargs,
):
"""Execute query in SciDB
:param string query: SciDB AFL query to execute
:param bool fetch: If ``True``, download SciDB array (default
``False``)
:param bool use_arrow: If ``True``, download SciDB array using Apache
Arrow library. Requires ``accelerated_io_tools`` and ``aio`` enabled
in ``Shim``. If ``True``, a Pandas DataFrame is returned
(``as_dataframe`` has no effect) and null-able types are promoted as
per Pandas `promotion scheme
<https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#na-type-promotions-for-numpy-types>`_
(``dataframe_promo`` has no effect). If ``None`` the ``use_arrow``
value set at connection time is used (default ``None``)
:param bool use_arrow_stream: If ``True``, return a
``RecordBatchStreamReader`` object to the user. The user will extract
the records from the stream reader. This parameter only has an effect
if ``use_arrow`` is set to ``True`` (default ``False``)
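A sketch of stream usage (illustrative; assumes ``db = DB()``, an
existing array ``foo``, and ``aio`` enabled in Shim)::

    reader = db.iquery('scan(foo)',
                       fetch=True,
                       use_arrow=True,
                       use_arrow_stream=True)
    table = reader.read_all()  # pyarrow.Table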
:param bool atts_only: If ``True``, download only SciDB array
attributes without dimensions (default ``False``)
:param bool as_dataframe: If ``True``, return a Pandas DataFrame. If
``False``, return a NumPy array (default ``True``)
:param bool dataframe_promo: If ``True``, null-able types are promoted
as per Pandas `promotion scheme
<https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#na-type-promotions-for-numpy-types>`_
If ``False``, object records are used for null-able types (default
``True``)
:param schema: Schema of the SciDB array to use when downloading the
array. Schema is not verified. If schema is a Schema instance, it is
copied. Otherwise, a :py:class:`Schema` object is built using
:py:func:`Schema.fromstring` (default ``None``)
:param int page_size: Maximum number of cells per page of output when
executing paged queries, that is, non-upload queries that save their
output. Client API only; ignored for Shim. (default 1,000,000)
:param upload_data: Data to upload to SciDB. If provided, the query
must contain the ``{fn}`` placeholder. The data can be a NumPy
array, a ``bytes`` or ``bytearray`` object, or a file-like object
that supports the ``read()`` method (default ``None``)
:param upload_schema: Schema of the SciDB array to use when uploading
the data. If the ``upload_data`` parameter is a NumPy array, its
dtype can be used to determine the schema (default ``None``)
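A sketch of uploading raw bytes with an explicit schema
(illustrative; assumes ``db = DB()`` and an existing target array
``foo``)::

    db.iquery("insert(input({sch}, '{fn}', 0, '{fmt}'), foo)",
              upload_data=numpy.arange(3, dtype=numpy.int64).tobytes(),
              upload_schema=Schema.fromstring('<x:int64>[i=0:2]'))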
>>> DB().iquery('build(<x:int64>[i=0:1; j=0:1], i + j)', fetch=True)
i j x
0 0 0 0.0
1 0 1 1.0
2 1 0 1.0
3 1 1 2.0
>>> DB().iquery("input({sch}, '{fn}', 0, '{fmt}')",
... fetch=True,
... upload_data=numpy.arange(3, 6))
i x
0 0 3
1 1 4
2 2 5
"""
# Set use_arrow using local/global
if use_arrow is None:
use_arrow = self.use_arrow
# Special case: -- - set_namespace - --
if query.startswith("set_namespace(") and query[-1] == ")":
param = query[len("set_namespace(") : -1]
# Unquote if quoted. Will be quoted when set in prefix.
if param[0] == "'" and param[-1] == "'":
param = param[1:-1]
self.namespace = param
return
if upload_data is not None:
if isinstance(upload_data, numpy.ndarray):
if upload_schema is None:
try:
upload_schema = Schema.fromdtype(upload_data.dtype)
except Exception as e:
warnings.warn(
"Mapping NumPy dtype to SciDB schema failed. "
+ "Try providing an explicit upload_schema"
)
raise e
# Convert upload data to bytes
if upload_schema.is_fixsize():
upload_data = upload_data.tobytes()
else:
upload_data = upload_schema.tobytes(upload_data)
# Check if placeholders are present
place_holders = set(
field_name for _1, field_name, _3, _4 in self._formatter.parse(query)
)
if "fn" not in place_holders:
warnings.warn(
"upload_data provided, but {fn} placeholder is missing",
stacklevel=2,
)
if "fmt" in place_holders and upload_schema is None:
warnings.warn(
"upload_data and {fmt} placeholder provided, "
+ "but upload_schema is None",
stacklevel=2,
)
# Check if upload data is bytes or file-like object
if not (
isinstance(upload_data, bytes)
or isinstance(upload_data, bytearray)
or hasattr(upload_data, "read")
):
warnings.warn(
"upload_data is not bytes or file-like object", stacklevel=2
)
fn = self._backend.prepare_upload(upload_data)
query = query.format(
sch=upload_schema,
fn=fn,
fmt=upload_schema.atts_fmt_scidb if upload_schema else None,
)
if fetch:
# Use provided schema or get schema from SciDB
if schema:
# Deep-copy schema since we might be mutating it
if isinstance(schema, Schema):
if not atts_only and not use_arrow:
schema = copy.deepcopy(schema)
else:
schema = Schema.fromstring(schema)
else:
# Execute 'show(...)' and download the schema text
self._backend.runquery(
query=DB._show_query.format(query.replace("'", "\\'")),
page_size=page_size,
save="tsv",
)
schema = Schema.fromstring(self._backend.read())
# Attributes and dimensions can collide. Run make_unique to
# remove any collisions.
#
# make_unique fixes any collision, but if we don't
# download the dimensions, we don't need to fix collisions
# between dimensions and attributes. So, we use
# make_unique only if there are collisions within the
# attribute names.
if (
not atts_only
or len(set((a.name for a in schema.atts))) < len(schema.atts)
) and schema.make_unique():
# Dimensions or attributes were renamed due to
# collisions. We need to cast. ({:h} is our
# custom format for Schema objects, see schema.py)
query = "cast({}, {:h})".format(query, schema)
# Unpack
if not atts_only:
# TODO If SDB-7905 is fixed use the following code
# query = f'flatten({query})'
query = f"unpack({query}, i)"
# update schema after unpacking/flattening
schema.make_dims_atts()
# Execute query and download content
self._backend.runquery(
query=query,
save="arrow" if use_arrow else schema.atts_fmt_scidb,
page_size=page_size,
result_size_limit=self.result_size_limit,
atts_only=1, # atts_only handled by unpack/flatten above
# backend always uses atts_only=1
**kwargs,
)
buf = self._backend.readbytes()
# Build result
if use_arrow:
# This is what gets returned if use_arrow_stream = True
data = pyarrow.RecordBatchStreamReader(pyarrow.BufferReader(buf))
if not use_arrow_stream:
data = data.read_pandas()
elif schema.is_fixsize():
data = numpy.frombuffer(buf, dtype=schema.atts_dtype)
if as_dataframe:
data = pandas.DataFrame.from_records(data)
if dataframe_promo:
schema.promote(data)
else:
# Parse binary buffer
data = schema.frombytes(buf, as_dataframe, dataframe_promo)
if as_dataframe:
data = pandas.DataFrame.from_records(data)
return data
else: # fetch=False
self._backend.runquery(query=query, **kwargs)
# Special case: -- - load_library - --
if query.startswith("load_library("):
self.load_ops()
def iquery_readlines(self, query, page_size=_DEFAULT_PAGE_SIZE, **kwargs):
"""Execute a query in SciDB and return its output as a list of
lines, with each line tab-split when the result has multiple
attributes.
>>> DB().iquery_readlines('build(<x:int64>[i=0:2], i * i)')
... # doctest: +ELLIPSIS
[...'0', ...'1', ...'4']
>>> DB().iquery_readlines(
... 'apply(build(<x:int64>[i=0:2], i), y, i + 10)')
... # doctest: +ELLIPSIS
[[...'0', ...'10'], [...'1', ...'11'], [...'2', ...'12']]
"""
self._backend.runquery(query=query, save="tsv", page_size=page_size, **kwargs)
return self._backend.readlines(split="\t")
def next_array_name(self):
"""Generate a unique array name. Keep track of these names using the
``_uid`` field and a counter
"""
# Thread-safe counter
with self._lock:
self._array_cnt += 1
return "py_{}_{}".format(self._uid, self._array_cnt)
def load_ops(self):
"""Get list of operators and macros."""
self._backend.runquery(query="project(list('operators'), name)", save="tsv")
operators = self._backend.readlines()
self._backend.runquery(query="project(list('macros'), name)", save="tsv")
macros = self._backend.readlines()
self.no_ops = False
self.operators = operators + macros
self._dir = self.operators + [
"arrays",
"gc",
"iquery",
"iquery_readlines",
"upload",
]
self._dir.sort()
class Arrays(object):
"""Access to arrays available in SciDB"""
def __init__(self, db):
self.db = db
def __repr__(self):
return "{}({!r})".format(type(self).__name__, self.db)
def __str__(self):
return """DB:
{}""".format(
self.db
)
def __getattr__(self, name):
"""db.arrays.foo"""
return Array(self.db, name)
def __getitem__(self, name):
"""db.arrays['foo']"""
return Array(self.db, name)
def __dir__(self):
"""Download the list of SciDB arrays. Use 'project(list(), name)' to
download only the array names
"""
return self.db.iquery_readlines("project(list(), name)")
class Array(object):
"""Access to individual array"""
def __init__(self, db, name, gc=False):
self.db = db
self.name = name
if gc:
weakref.finalize(self, self.db.iquery, "remove({})".format(self.name))
def __repr__(self):
return "{}({!r}, {!r})".format(type(self).__name__, self.db, self.name)
def __str__(self):
return self.name
def __getattr__(self, key):
return ArrayExp("{}.{}".format(self.name, key))
def __getitem__(self, key):
return self.fetch()[key]
def __dir__(self):
"""Download the schema of the SciDB array, using ``show()``"""
sh = Schema.fromstring(self.db.iquery_readlines("show({})".format(self))[0][0])
ls = [i.name for i in itertools.chain(sh.atts, sh.dims)]
ls.sort()
return ls
def __mod__(self, alias):
"""Overloads ``%`` operator to add support for aliasing"""
return Array(self.db, "{} as {}".format(self.name, alias))
def fetch(self, **kwargs):
return self.db.iquery("scan({})".format(self), fetch=True, **kwargs)
def head(self, n=5, **kwargs):
"""Similar to ``pandas.DataFrame.head``. Makes use of the ``limit``
operator, if available.
"""
if "limit" in self.db.operators:
return self.db.iquery("limit({}, {})".format(self, n), fetch=True, **kwargs)
else:
warnings.warn(
'"limit" operator not available. '
+ "Fetching the entire array. "
+ "See https://github.com/Paradigm4/limit."
)
return self.fetch(**kwargs)[:n]
def schema(self):
return Schema.fromstring(
self.db.iquery_readlines("show({})".format(self))[0][0]
)
class ArrayExp(object):
"""Access to individual attribute or dimension"""
def __init__(self, exp):
self.exp = exp
def __repr__(self):
return "{}({!r})".format(type(self).__name__, self.exp)
def __str__(self):
return "{}".format(self.exp)
def __add__(self, other):
return ArrayExp("{} + {}".format(self, other))
class Operator(object):
"""Store SciDB operator and arguments. Hungry operators (e.g., remove,
store, etc.) evaluate immediately. Lazy operators evaluate on data
fetch.
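A short sketch of the difference (illustrative; assumes a
connection ``db`` and an existing array ``foo``)::

    db.scan(db.arrays.foo)          # lazy: only builds the query
    db.scan(db.arrays.foo).fetch()  # evaluates and downloads
    db.remove(db.arrays.foo)        # eager: evaluates immediately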
"""
def __init__(self, db, name, upload_data=None, upload_schema=None, *args):
self.db = db
self.name = name.lower()
self.upload_data = upload_data
self.upload_schema = upload_schema
self.args = list(args)
self.is_lazy = self.name not in ops_eager
self._dir = self.db.operators + ["fetch"]
self._dir.sort()
def __repr__(self):
return "{}(db={!r}, name={!r}, args=[{}])".format(
type(self).__name__,
self.db,
self.name,
", ".join("{!r}".format(i) for i in self.args),
)
def __str__(self):
args_fmt = []
for pos, arg in enumerate(self.args):
# Format argument to string (possibly recursive)
arg_fmt = "{}".format(arg)
# Special case: quote string argument if not quoted
if (
pos < len(string_args)
and self.name in string_args[pos]
and len(arg)
and arg_fmt[0] != "'"
and arg_fmt[-1] != "'"
):
arg_fmt = "'{}'".format(arg_fmt)
# Special case: -- - show - --
if (
pos == 0
and self.name == "show"
and (isinstance(arg, Operator) or len(self.args) > 1)
):
arg_fmt = "'{}'".format(arg_fmt.replace("'", "\\'"))
# Add to arguments list
args_fmt.append(arg_fmt)
return "{}({})".format(self.name, ", ".join(args_fmt))
def __call__(self, *args, **kwargs):
"""Returns self for lazy expressions. Executes immediate expressions."""
# Propagate "upload_data" and "upload_schema" from previous
# operators
for arg in args:
if isinstance(arg, Operator) and (
arg.upload_data is not None or arg.upload_schema is not None
):
if self.upload_data is not None or (self.upload_schema is not None):
raise NotImplementedError(
'Queries with multiple "upload_data" or '
+ '"upload_schema" arguments are not supported'
)
self.upload_data = arg.upload_data
self.upload_schema = arg.upload_schema
self.args.extend(args)
# Special case: -- - create_array - --
if self.name == "create_array" and len(self.args) < 3:
# Set "temporary"
self.args.append(False)
# Special case: -- - input & load - --
elif self.name in ("input", "load"):
ln = len(self.args)
# Set upload data
if "upload_data" in kwargs.keys():
self.upload_data = kwargs["upload_data"]
# Set upload schema
if "upload_schema" in kwargs.keys():
# Pass through if provided as argument
self.upload_schema = kwargs["upload_schema"]
if self.upload_schema is None:
# If the upload_data is a NumPy array try to map the
# array dtype to upload schema
if self.upload_data is not None and isinstance(
self.upload_data, numpy.ndarray
):
try:
self.upload_schema = Schema.fromdtype(self.upload_data.dtype)
except Exception:
# Might fail if the dtype contains
# objects. The same type mapping is attempted
# later in iquery, but there the exception is
# propagated
pass
# If the operator is input, try to map first argument
# to upload schema
if self.upload_schema is None and self.name == "input" and ln >= 1:
try:
self.upload_schema = Schema.fromstring(args[0])
except Exception:
# Fails if the argument is an array name
pass
# Set required arguments if missing
# Check if "input_file" is present (2nd argument)
if ln < 2:
# Check if "existing_array|anonymous_schema"
# is present (1st argument)
if ln < 1:
self.args.append("{sch}") # anonymous_schema
self.args.append("'{fn}'") # input_file
# Set optional arguments if missing and necessary
# Check if "format" is present (4th argument)
if ln < 4 and self.upload_data is not None:
# Check if "instance_id" is present (3rd argument)
if ln < 3:
self.args.append(0) # instance_id
self.args.append("'{fmt}'") # format
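# For example (illustrative): db.input('<x:int64>[i]',
# upload_data=numpy.arange(3)) fills in the missing arguments,
# composing input(<x:int64>[i], '{fn}', 0, '{fmt}'); the
# placeholders are substituted later in iquery() at upload time.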
# Special case: -- - store - --
elif self.name == "store":
if len(self.args) < 2:
# Set "named_array"
self.args.append(self.db.next_array_name())
# Garbage collect (if not specified)
if "gc" not in kwargs.keys():
kwargs["gc"] = True
# If temp=True in kwargs, create a temporary array first
if "temp" in kwargs.keys() and kwargs["temp"] is True:
# Get the schema of the new array
try:
new_schema = Schema.fromstring(
self.db.iquery_readlines(
"show('{}', 'afl')".format(
str(self.args[0]).replace("'", "\\'")
)
)[0][0]
)
except httpx.HTTPError as e:
e.args = (
'"temp=True" not supported for complex queries\n' + e.args[0],
)
raise
# Set array name
new_schema.name = self.args[1]
# Create temporary array
self.db.iquery("create temp array {}".format(new_schema))
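# Example (illustrative; assumes an array "foo"):
#   db.scan(db.arrays.foo).store(temp=True)
# creates a temp array matching foo's schema, then stores into it.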
# Lazy or eager
if self.is_lazy: # Lazy
return self
else: # Hungry
# Execute query
self.db.iquery(
str(self),
upload_data=self.upload_data,
upload_schema=self.upload_schema,
)
# Handle output
# Special case: -- - load - --
if self.name == "load":
if isinstance(self.args[0], Array):
return self.args[0]
else:
return Array(self.db, self.args[0])
# Special case: -- - store - --
elif self.name == "store":
if isinstance(self.args[1], Array):
return self.args[1]
else:
return Array(self.db, self.args[1], kwargs.get("gc", False))
def __getitem__(self, key):
return self.fetch()[key]
def __getattr__(self, name):
if name in self.db.operators:
return Operator(self.db, name, self.upload_data, self.upload_schema, self)
else:
raise AttributeError(
"{.__name__!r} object has no attribute {!r}".format(type(self), name)
)
def __dir__(self):
return self._dir
def __mod__(self, alias):
"""Overloads ``%`` operator to add support for aliasing"""
return Array(self.db, "{} as {}".format(self, alias))
def fetch(self, **kwargs):
if self.is_lazy:
return self.db.iquery(
str(self),
fetch=True,
upload_data=self.upload_data,
upload_schema=self.upload_schema,
**kwargs,
)
def schema(self):
if self.is_lazy:
return Schema.fromstring(
self.db.iquery_readlines("show('{}', 'afl')".format(self))[0][0]
)
connect = DB
iquery = DB.iquery
if __name__ == "__main__":
# logging.basicConfig(level=logging.DEBUG)
os.environ["SCIDBPY_VERIFY_HTTPS"] = "False"
import doctest
doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)