# Source code for scidbpy.schema

# -*- coding: utf-8 -*-
# Copyright (c) 2017-2025, Paradigm4 Inc. All Rights Reserved.

"""Attribute, Dimension, and Schema
================================

Classes for accessing SciDB data and schemas.

"""

import itertools
import re
import struct
import warnings

import dateutil
import numpy
import pandas

# Map from SciDB type name to the NumPy dtype used to hold its values.
# Integer and bool names coincide with the NumPy scalar names; the rest
# are listed explicitly.  Variable-size types ("string", "binary") are
# stored as Python objects.
type_map_numpy = {
    t.__name__: numpy.dtype(t)
    for t in (
        bool,
        numpy.int8,
        numpy.int16,
        numpy.int32,
        numpy.int64,
        numpy.uint8,
        numpy.uint16,
        numpy.uint32,
        numpy.uint64,
    )
}
type_map_numpy.update(
    {
        "char": numpy.dtype("S1"),
        "double": numpy.dtype(numpy.float64),
        "float": numpy.dtype(numpy.float32),
        "string": numpy.dtype(object),
        "binary": numpy.dtype(object),
        "datetime": numpy.dtype("datetime64[s]"),
        "datetimetz": numpy.dtype(
            [("time", "datetime64[s]"), ("tz", "timedelta64[s]")]
        ),
    }
)

# Inverse map: NumPy dtype back to SciDB type name.  The ambiguous
# object dtype (used by both "string" and "binary") is excluded;
# generic string/datetime dtypes are then mapped explicitly.
type_map_inv_numpy = {
    dtype: name
    for name, dtype in type_map_numpy.items()
    if dtype != numpy.dtype(object)
}
type_map_inv_numpy[numpy.dtype(numpy.str_)] = "string"
type_map_inv_numpy[numpy.dtype(numpy.bytes_)] = "string"
type_map_inv_numpy[numpy.dtype(numpy.datetime64)] = "datetime"
type_map_inv_numpy[numpy.dtype(numpy.timedelta64)] = "datetimetz"

# Map from SciDB type name to struct format string (little-endian for
# multi-byte types).
type_map_struct = {
    "bool": "?",
    "char": "c",
    "int8": "b",
    "int16": "<h",
    "int32": "<i",
    "int64": "<q",
    "float": "<f",
    "double": "<d",
    "datetime": "<q",
    "datetimetz": "<qq",
}

# Add uint types: same format with the value code(s) upper-cased
for key in list(type_map_struct.keys()):
    if key.startswith("int"):
        type_map_struct["u" + key] = type_map_struct[key].upper()

# Replace each format with a (not-null, nullable) pair.  The nullable
# variant inserts a "B" (missing-code byte) after the byte-order mark,
# if any.
#
# Bug fix: the nullable format must keep *all* value codes after the
# byte-order mark ("<qq" -> "<Bqq", not "<Bq" as `val[0] + "B" +
# val[1]` produced); otherwise struct.pack fails for nullable
# "datetimetz" values, which carry two fields plus the missing byte.
# Building a new dict also avoids mutating the dict while iterating
# over its items.
type_map_struct = {
    key: (val, (val[0] + "B" + val[1:]) if len(val) > 1 else ("B" + val))
    for key, val in type_map_struct.items()
}

# Type promotion map for Pandas DataFrame: nullable SciDB attributes
# are widened to a dtype that can represent missing values (NaN/NaT).
# http://pandas.pydata.org/pandas-docs/stable/gotchas.html#na-type-promotions
type_map_promo = {
    "bool": numpy.dtype(object),
    "char": numpy.dtype(object),
    "int8": numpy.dtype(numpy.float16),
    "int16": numpy.dtype(numpy.float32),
    "int32": numpy.dtype(numpy.float64),
    "int64": numpy.dtype(numpy.float64),
    "uint8": numpy.dtype(numpy.float16),
    "uint16": numpy.dtype(numpy.float32),
    "uint32": numpy.dtype(numpy.float64),
    "uint64": numpy.dtype(numpy.float64),
    "datetime": numpy.dtype("datetime64[ns]"),
}

# Fallback names used by the fromdtype constructors when a dtype field
# has no name of its own.
one_att_name = "x"
one_dim_name = "i"


class Attribute(object):
    """Represent SciDB array attribute

    Construct an attribute using Attribute constructor:

    >>> Attribute('foo', 'int64', not_null=True)
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo', type_name='int64', not_null=True,
              default=None, compression=None)

    >>> Attribute('foo', 'int64', default=100, compression='zlib')
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo', type_name='int64', not_null=False,
              default=100, compression='zlib')

    Construct an attribute from a string:

    >>> Attribute.fromstring('foo:int64')
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo', type_name='int64', not_null=False,
              default=None, compression=None)

    >>> Attribute.fromstring(
    ...     "taz : string NOT null DEFAULT '' compression 'bzlib'")
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='taz', type_name='string', not_null=True,
              default="''", compression='bzlib')
    """

    # Parse "name:type [[NOT] NULL] [DEFAULT val] [COMPRESSION 'c']"
    # (case-insensitive); the DEFAULT value is a single
    # non-whitespace token.
    _regex = re.compile(
        """
        \\s* (?P<name> \\w+ ) \\s* : \\s* (?P<type_name> \\w+ ) \\s*
        (?: (?P<not_null> NOT )? \\s+ NULL )? \\s*
        (?: DEFAULT \\s+ (?P<default> \\S+ ) )? \\s*
        (?: COMPRESSION \\s+ '(?P<compression> \\w+ )' )?
        \\s* $""",
        re.VERBOSE | re.IGNORECASE,
    )

    # length dtype for variable-size SciDB types
    _length_dtype = numpy.dtype(numpy.uint32)
    _length_fmt = "<I"

    def __init__(self, name, type_name, not_null=False, default=None, compression=None):
        """Store the attribute properties and derive the formats.

        :param str name: attribute name
        :param str type_name: SciDB type name, e.g. ``'int64'``
        :param bool not_null: ``True`` if the attribute cannot be null
        :param default: default value expression (kept verbatim)
        :param compression: compression codec name or ``None``
        """
        self.__name = name
        self.type_name = type_name
        self.not_null = bool(not_null)
        self.default = default
        self.compression = compression

        # SciDB binary-format spec, e.g. "int64" or "int64 null"
        self.fmt_scidb = "{}{}".format(self.type_name, "" if self.not_null else " null")
        # (not-null, nullable) struct format pair; None for types
        # without a fixed-size struct mapping (string, binary)
        self.fmt_struct = type_map_struct.get(self.type_name, None)

        self._set_dtype()

    def __iter__(self):
        # Iterate the attribute fields in constructor order; used by
        # __eq__ and __repr__.
        return (
            i
            for i in (
                self.name,
                self.type_name,
                self.not_null,
                self.default,
                self.compression,
            )
        )

    def __eq__(self, other):
        return tuple(self) == tuple(other)

    def __repr__(self):
        return (
            "{}("
            + "name={!r}, "
            + "type_name={!r}, "
            + "not_null={!r}, "
            + "default={!r}, "
            + "compression={!r})"
        ).format(type(self).__name__, *self)

    def __str__(self):
        # Render in SciDB schema syntax, e.g. "foo:int64 NOT NULL"
        return "{}:{}{}{}{}".format(
            self.name,
            self.type_name,
            " NOT NULL" if self.not_null else "",
            " DEFAULT {}".format(self.default) if self.default else "",
            " COMPRESSION '{}'".format(self.compression) if self.compression else "",
        )

    @property
    def name(self):
        return self.__name

    @name.setter
    def name(self, value):
        # Renaming changes the structured dtype field name, so the
        # dtype must be rebuilt.
        self.__name = value
        self._set_dtype()

    def _set_dtype(self):
        """Build the NumPy dtype for one cell of this attribute.

        Nullable attributes use a structured dtype with a ``null``
        missing-code byte followed by the ``val`` field.
        """
        self.dtype_val = type_map_numpy.get(self.type_name, object)

        # >>> numpy.dtype([(u"a", int)])
        # TypeError: data type not understood
        # https://github.com/numpy/numpy/issues/2407
        # cannot use `self.name` directly, use `str(...)`
        if self.not_null:
            self.dtype = numpy.dtype([(str(self.name), self.dtype_val)])
        else:
            self.dtype = numpy.dtype(
                [(str(self.name), [("null", numpy.uint8), ("val", self.dtype_val)])]
            )

    def is_fixsize(self):
        # Fixed-size if the value is not stored as a Python object
        # (i.e. not string/binary).
        return self.dtype_val != object

    def itemsize(self, buf=None, offset=0):
        """Return the size in bytes of one cell at ``offset`` in ``buf``.

        Fixed-size types ignore ``buf``; variable-size types read the
        little-endian uint32 length prefix from the buffer.
        """
        if self.dtype_val != object:
            return self.dtype.itemsize

        null_size = 0 if self.not_null else 1
        value_size = numpy.frombuffer(buf, numpy.uint32, 1, offset + null_size)[0]
        return null_size + Attribute._length_dtype.itemsize + value_size

    def frombytes(self, buf, offset=0, size=None, promo=False):
        """Decode one cell from SciDB binary format.

        :param bytes buf: buffer holding the cell
        :param int offset: cell start within ``buf``
        :param size: total cell size in bytes (as from :meth:`itemsize`);
            required for slicing variable-size values
        :param bool promo: for nullable attributes, return the bare
            value (or ``None`` if missing) instead of a
            ``(missing, value)`` pair

        A missing-code byte of 255 marks a present (non-null) value;
        any other code marks a missing value.
        """
        null_size = 0 if self.not_null else 1
        if self.dtype_val == object:
            if self.type_name == "string":
                # Skip the length prefix and drop the trailing NUL
                val = buf[
                    offset + null_size + Attribute._length_dtype.itemsize : offset
                    + size
                    - 1
                ].decode("utf-8")
            else:
                # Binary: raw bytes after the length prefix
                val = buf[
                    offset + null_size + Attribute._length_dtype.itemsize : offset
                    + size
                ]
        else:
            val = struct.unpack(
                self.fmt_struct[0], buf[offset + null_size : offset + size]
            )
            # Single-field types unwrap the 1-tuple; datetimetz keeps
            # its (time, tz) tuple
            if len(val) == 1:
                val = val[0]

        if self.not_null:
            return val
        else:
            missing = struct.unpack("B", buf[offset : offset + null_size])[0]
            if promo:
                return val if missing == 255 else None
            else:
                return (missing, val)

    def tobytes(self, val):
        """Encode one cell value to SciDB binary format.

        Strings gain a uint32 length prefix and a trailing NUL;
        binary values gain only the length prefix.  Nullable
        fixed-size values are prefixed with the missing-code byte
        (255 = present), taken from the NumPy structured value if one
        is given.

        :raises NotImplementedError: for object-typed values other
            than string or binary
        """
        if self.dtype_val == object:
            if self.type_name == "string":
                val_enc = val.encode("utf-8")
                buf = b"".join(
                    [
                        struct.pack(Attribute._length_fmt, len(val_enc) + 1),
                        val_enc,
                        b"\x00",
                    ]
                )
            elif self.type_name == "binary":
                buf = b"".join([struct.pack(Attribute._length_fmt, len(val)), val])
            else:
                raise NotImplementedError("Convert <{}> to bytes".format(self))
        else:
            if self.not_null:
                buf = struct.pack(self.fmt_struct[0], val)
            else:
                if isinstance(val, numpy.void):
                    # NumPy structured array: (null, val) pair already
                    # carries the missing code
                    buf = struct.pack(self.fmt_struct[1], *val)
                else:
                    # Plain value: mark as present (255)
                    buf = struct.pack(self.fmt_struct[1], 255, val)
        return buf

    @classmethod
    def fromstring(cls, string):
        """Build an Attribute by parsing a SciDB schema fragment.

        :raises Exception: if the string does not match the attribute
            grammar
        """
        try:
            return cls(**Attribute._regex.match(string).groupdict())
        except AttributeError:
            raise Exception("Failed to parse attribute: {}".format(string))

    @classmethod
    def fromdtype(cls, dtype_descr):
        """Build an Attribute from one entry of ``numpy.dtype.descr``.

        Handles plain fields, nullable ``(null, val)`` structured
        fields, and the datetimetz structured value.

        :raises Exception: if the dtype has no SciDB type mapping
        """
        if isinstance(dtype_descr[1], str):
            # e.g. ('name', 'int64')
            dtype_val = dtype_descr[1]
            not_null = True
        else:
            # e.g. ('name', [('null': 'int8'), ('val': 'int64')]
            #      ('name', [('time', 'datetime64'), ('tz', 'timedelta64')])
            #      ('name', [('null': 'int8'),
            #                ('val' : [('time', 'datetime64'),
            #                          ('tz', 'timedelta64')])])
            if dtype_descr[1][0][0] == "null":
                not_null = False
                dtype_val = dtype_descr[1][1][1]
            else:
                not_null = True
                dtype_val = dtype_descr[1]

        dtype_val = numpy.dtype(dtype_val)
        if dtype_val in type_map_inv_numpy.keys():
            type_name = type_map_inv_numpy[dtype_val]
        else:
            # if dtype_val not found in map, try the dtype_val.type
            # (without the length)
            ty = numpy.dtype(dtype_val.type)
            # e.g. '<U3' --type--> '<U' --map--> numpy.str_
            if ty in type_map_inv_numpy.keys():
                type_name = type_map_inv_numpy[ty]
            else:
                raise Exception(
                    "No SciDB type mapping for NumPy type {}".format(dtype_val)
                )

        return cls(
            name=dtype_descr[0] if dtype_descr[0] else one_att_name,
            type_name=type_name,
            not_null=not_null,
        )
class Dimension(object):
    """Represent SciDB array dimension

    Construct a dimension using the Dimension constructor:

    >>> Dimension('foo')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo', low_value=None, high_value=None,
              chunk_overlap=None, chunk_length=None)

    >>> Dimension('foo', -100, '10', '?', '1000')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo', low_value=-100, high_value=10,
              chunk_overlap='?', chunk_length=1000)

    Construct a dimension from a string:

    >>> Dimension.fromstring('foo')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo', low_value=None, high_value=None,
              chunk_overlap=None, chunk_length=None)

    >>> Dimension.fromstring('foo=-100:*:?:10')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo', low_value=-100, high_value='*',
              chunk_overlap='?', chunk_length=10)
    """

    # Parse "name[=low:high[:overlap[:length]]]"; each field is any
    # run of non-colon, non-space characters, so markers like "*" and
    # "?" pass through unchanged.
    _regex = re.compile(
        """
        \\s* (?P<name> \\w+ ) \\s*
        (?: = \\s* (?P<low_value> [^:\\s]+ ) \\s*
            : \\s* (?P<high_value> [^:\\s]+ ) \\s*
            (?: : \\s* (?P<chunk_overlap> [^:\\s]+ ) \\s*
                (?: : \\s* (?P<chunk_length> [^:\\s]+ ) )? )? )?
        \\s* $""",
        re.VERBOSE,
    )

    @staticmethod
    def _asint(value):
        """Return ``int(value)`` if convertible, else ``value`` as-is.

        Keeps ``None`` and non-numeric markers such as ``'*'`` and
        ``'?'`` unchanged.
        """
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    def __init__(
        self,
        name,
        low_value=None,
        high_value=None,
        chunk_overlap=None,
        chunk_length=None,
    ):
        """Store the dimension properties, coercing numeric strings.

        :param str name: dimension name
        :param low_value: lower bound, int or marker string
        :param high_value: upper bound, int or marker (e.g. ``'*'``)
        :param chunk_overlap: overlap, int or marker (e.g. ``'?'``)
        :param chunk_length: chunk length, int or marker
        """
        self.name = name
        # Deduplicated coercion (was four copy-pasted try/except
        # blocks); behavior is identical.
        self.low_value = Dimension._asint(low_value)
        self.high_value = Dimension._asint(high_value)
        self.chunk_overlap = Dimension._asint(chunk_overlap)
        self.chunk_length = Dimension._asint(chunk_length)

    def __iter__(self):
        # Iterate the dimension fields in constructor order; used by
        # __eq__ and __repr__.
        return (
            i
            for i in (
                self.name,
                self.low_value,
                self.high_value,
                self.chunk_overlap,
                self.chunk_length,
            )
        )

    def __eq__(self, other):
        return tuple(self) == tuple(other)

    def __repr__(self):
        return (
            "{}("
            + "name={!r}, "
            + "low_value={!r}, "
            + "high_value={!r}, "
            + "chunk_overlap={!r}, "
            + "chunk_length={!r})"
        ).format(type(self).__name__, *self)

    def __str__(self):
        # Render in SciDB schema syntax; trailing unset fields are
        # omitted.
        out = self.name
        if self.low_value is not None:
            out += "={}:{}".format(self.low_value, self.high_value)
            if self.chunk_overlap is not None:
                out += ":{}".format(self.chunk_overlap)
                if self.chunk_length is not None:
                    out += ":{}".format(self.chunk_length)
        return out

    @classmethod
    def fromstring(cls, string):
        """Build a Dimension by parsing a SciDB schema fragment.

        :raises Exception: if the string does not match the dimension
            grammar
        """
        try:
            return cls(**Dimension._regex.match(string).groupdict())
        except AttributeError:
            raise Exception("Failed to parse dimension: {}".format(string))
class Schema(object):
    """Represent SciDB array schema

    Construct a schema using Schema, Attribute, and Dimension
    constructors:

    >>> Schema('foo', (Attribute('x', 'int64'),), (Dimension('i', 0, 10),))
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo',
           atts=(Attribute(name='x', type_name='int64', not_null=False,
                           default=None, compression=None),),
           dims=(Dimension(name='i', low_value=0, high_value=10,
                           chunk_overlap=None, chunk_length=None),))

    Construct a schema using Schema constructor and fromstring methods
    of Attribute and Dimension:

    >>> Schema('foo',
    ...        (Attribute.fromstring('x:int64'),),
    ...        (Dimension.fromstring('i=0:10'),))
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo',
           atts=(Attribute(name='x', type_name='int64', not_null=False,
                           default=None, compression=None),),
           dims=(Dimension(name='i', low_value=0, high_value=10,
                           chunk_overlap=None, chunk_length=None),))

    Construct a schema from a string:

    >>> Schema.fromstring(
    ...     'foo@1<x:int64 not null, y:double>[i=0:*; j=-100:0:0:10]')
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo@1',
           atts=(Attribute(name='x', type_name='int64', not_null=True,
                           default=None, compression=None),
                 Attribute(name='y', type_name='double', not_null=False,
                           default=None, compression=None)),
           dims=(Dimension(name='i', low_value=0, high_value='*',
                           chunk_overlap=None, chunk_length=None),
                 Dimension(name='j', low_value=-100, high_value=0,
                           chunk_overlap=0, chunk_length=10)))

    Print a schema constructed from a string:

    >>> print(Schema.fromstring('<x:int64,y:float> [i=0:2:0:1000000; j=0:*]'))
    ... # doctest: +NORMALIZE_WHITESPACE
    <x:int64,y:float> [i=0:2:0:1000000; j=0:*]

    Format Schema object to only print the schema part without the
    array name:

    >>> '{:h}'.format(Schema.fromstring('foo<x:int64>[i]'))
    '<x:int64> [i]'
    """

    # Optional array name (may include "@version"); a leading
    # "not empty" qualifier is skipped.
    _regex_name = re.compile(
        "\\s* (?: not \\s+ empty \\s+ )? (?P<name> [\\w@]+ )?", re.VERBOSE
    )

    # "<att, att, ...>" comma-separated attribute list
    _regex_atts = re.compile(
        "\\s* < ( [^,>]+ \\s* (?: , \\s* [^,>]+ \\s* )* ) >", re.VERBOSE
    )

    # "[dim; dim; ...]" semicolon-separated dimension list
    _regex_dims = re.compile(
        "\\s* \\[ ( [^;\\]]+ \\s* (?: ; \\s* [^;\\]]+ \\s* )* ) \\] \\s*", re.VERBOSE
    )

    def __init__(self, name=None, atts=(), dims=()):
        """Store schema name, attributes, and dimensions.

        :param name: array name or ``None``
        :param atts: iterable of :class:`Attribute`
        :param dims: iterable of :class:`Dimension`
        """
        self.name = name
        self.atts = tuple(atts)
        self.dims = tuple(dims)

        # Set lazy (computed on first property access)
        self.__atts_dtype = None
        self.__atts_fmt_scidb = None

    def __iter__(self):
        # Flatten to (name, att..., dim...); used by __eq__.
        return (i for i in (self.name,) + self.atts + self.dims)

    def __eq__(self, other):
        return tuple(self) == tuple(other)

    def __repr__(self):
        return "{}(name={!r}, atts={!r}, dims={!r})".format(
            type(self).__name__, self.name, self.atts, self.dims
        )

    def __str__(self):
        return self._render()

    def __format__(self, fmt_spec=""):
        # "h" (headless) in the format spec omits the array name.
        return self._render(no_name="h" in fmt_spec)

    def _render(self, no_name=False):
        # Render in SciDB schema syntax, e.g. "foo<x:int64> [i]"
        return "{}<{}> [{}]".format(
            self.name if not no_name and self.name else "",
            ",".join(str(a) for a in self.atts),
            "; ".join(str(d) for d in self.dims),
        )

    def _promo_warning(self):
        # Warn about nullable attributes: their types get promoted
        # (see type_map_promo), which may lose precision.
        cnt = sum(not a.not_null for a in self.atts)
        if cnt:
            warnings.warn(
                (
                    "{} type(s) promoted for null support."
                    + " Precision loss may occur"
                ).format(cnt),
                stacklevel=2,
            )

    @property
    def atts_dtype(self):
        # Lazily-built structured dtype covering all attributes.
        if self.__atts_dtype is None:
            self.__atts_dtype = numpy.dtype(
                list(itertools.chain.from_iterable(a.dtype.descr for a in self.atts))
            )
        return self.__atts_dtype

    @property
    def atts_fmt_scidb(self):
        # Lazily-built SciDB binary-format spec, e.g. "(int64, double null)".
        if self.__atts_fmt_scidb is None:
            self.__atts_fmt_scidb = "({})".format(
                ", ".join(a.fmt_scidb for a in self.atts)
            )
        return self.__atts_fmt_scidb

    def pprint(self):
        """Print the schema followed by a per-attribute/dimension table."""
        print(self)

        info = numpy.empty(
            (len(self.atts) + len(self.dims),),
            dtype=[
                ("name", object),
                ("class", object),
                ("type", object),
                ("nullable", object),
                ("start", object),
                ("end", object),
                ("overlap", object),
                ("chunk", object),
            ],
        )
        pos = 0
        for a in self.atts:
            info.put(
                (pos,), (a.name, "attr", a.type_name, not a.not_null, "", "", "", "")
            )
            pos += 1
        for d in self.dims:
            # Dimensions are always int64 and never nullable
            info.put(
                (pos,),
                (
                    d.name,
                    "dim",
                    "int64",
                    "",
                    d.low_value,
                    d.high_value,
                    d.chunk_overlap,
                    d.chunk_length,
                ),
            )
            pos += 1
        print(pandas.DataFrame.from_records(info))

    def is_fixsize(self):
        # True only if every attribute has a fixed-size binary layout.
        return all(a.is_fixsize() for a in self.atts)

    def make_unique(self):
        """Make dimension and attribute names unique within the schema.
        Return ``True`` if any dimension or attribute was renamed.

        >>> s = Schema(None, (Attribute('i', 'bool'),), (Dimension('i'),))
        >>> print(s)
        <i:bool> [i]

        >>> s.make_unique()
        True

        >>> print(s)
        <i:bool> [i_1]

        >>> s = Schema.fromstring('<i:bool, i:int64>[i;i_1;i]')
        >>> s.make_unique()
        True

        >>> print(s)
        <i:bool,i_2:int64> [i_3; i_1; i_4]
        """
        all_before = set(
            itertools.chain((a.name for a in self.atts), (d.name for d in self.dims))
        )
        # Check if overall duplicates are present
        if len(all_before) < len(self.atts) + len(self.dims):
            all_after = set()

            # Process attributes
            for a in self.atts:
                # Start renaming after the first copy. First copy
                # will not be in all_after. From second copy
                # on-wards, a copy will be in all_after.
                if a.name in all_after:
                    new_name_tmpl = a.name + "_{}"
                    count = 1
                    new_name = new_name_tmpl.format(count)
                    # Candidate must clash with neither original nor
                    # already-assigned names
                    while new_name in all_before or new_name in all_after:
                        count += 1
                        new_name = new_name_tmpl.format(count)

                    a.name = new_name
                all_after.add(a.name)

            # Process dimensions
            for d in self.dims:
                if d.name in all_after:
                    new_name_tmpl = d.name + "_{}"
                    count = 1
                    new_name = new_name_tmpl.format(count)
                    while new_name in all_before or new_name in all_after:
                        count += 1
                        new_name = new_name_tmpl.format(count)

                    d.name = new_name
                all_after.add(d.name)

            # Reset dtype (cached field names are now stale)
            self.__atts_dtype = None
            return True
        else:
            return False

    def make_dims_atts(self):
        """Make attributes from dimensions and pre-append them to the
        attributes list.

        >>> s = Schema(None, (Attribute('x', 'bool'),), (Dimension('i'),))
        >>> print(s)
        <x:bool> [i]

        >>> s.make_dims_atts()
        >>> print(s)
        <i:int64 NOT NULL,x:bool> [i]

        >>> s = Schema.fromstring('<x:bool>[i;j]')
        >>> s.make_dims_atts()
        >>> print(s)
        <i:int64 NOT NULL,j:int64 NOT NULL,x:bool> [i; j]
        """
        self.atts = tuple(
            itertools.chain(
                (Attribute(d.name, "int64", not_null=True) for d in self.dims),
                self.atts,
            )
        )
        # Reset cached dtype and format (attribute list changed)
        self.__atts_dtype = None
        self.__atts_fmt_scidb = None

    def get_promo_atts_dtype(self):
        """Return the attributes dtype with nullable types promoted.

        Not-null attributes keep their dtype; nullable ones are mapped
        through ``type_map_promo``.  Emits the promotion warning.
        """
        self._promo_warning()
        return numpy.dtype(
            [
                (
                    a.dtype.descr[0]
                    if a.not_null
                    else (
                        a.dtype.names[0],
                        type_map_promo.get(
                            a.type_name, type_map_numpy.get(a.type_name, object)
                        ),
                    )
                )
                for a in self.atts
            ]
        )

    def promote(self, data):
        """Promote nullable attributes in the DataFrame to types which
        support some type of null values as per Pandas `promotion
        scheme
        <http://pandas.pydata.org/pandas-docs/stable/gotchas.html
        #na-type-promotions-for-numpy-types>`__
        """
        self._promo_warning()
        for a in self.atts:
            if not a.not_null:
                # Each cell is a (missing, value) pair; 255 marks a
                # present value
                if a.type_name == "datetimetz":
                    # Special case to promote SciDB datetimetz to Pandas
                    data[a.name] = pandas.Series(
                        data=[
                            (
                                pandas.Timestamp(
                                    attr[1][0],
                                    # Convert UTC offset to timezone
                                    tz=dateutil.tz.tzoffset(None, offset=attr[1][1]),
                                )
                                if attr[0] == 255
                                else numpy.nan
                            )
                            for attr in data[a.name]
                        ],
                        dtype="datetime64[ns, UTC]",
                    )
                else:
                    # All other types
                    data[a.name] = pandas.Series(
                        data=[
                            attr[1] if attr[0] == 255 else numpy.nan
                            for attr in data[a.name]
                        ],
                        dtype=type_map_promo.get(
                            a.type_name, type_map_numpy.get(a.type_name, object)
                        ),
                    )

    def frombytes(self, buf, as_dataframe=False, dataframe_promo=True):
        """Decode a SciDB binary buffer into a NumPy record array.

        :param bytes buf: concatenated cells in SciDB binary format
        :param bool as_dataframe: data is destined for a DataFrame
        :param bool dataframe_promo: with ``as_dataframe``, use the
            promoted dtype and unwrap nullable values
        :return: NumPy record array with one field per attribute
        """
        # Scan content and build (offset, size) metadata
        off = 0
        buf_meta = []
        while off < len(buf):
            meta = []
            for att in self.atts:
                sz = att.itemsize(buf, off)
                meta.append((off, sz))
                off += sz
            buf_meta.append(meta)

        # Create NumPy record array
        if as_dataframe and dataframe_promo:
            data = numpy.empty((len(buf_meta),), dtype=self.get_promo_atts_dtype())
        else:
            data = numpy.empty((len(buf_meta),), dtype=self.atts_dtype)

        # Extract values using (offset, size) metadata
        # Populate NumPy record array
        pos = 0
        for meta in buf_meta:
            data.put(
                (pos,),
                tuple(
                    att.frombytes(buf, off, sz, promo=as_dataframe and dataframe_promo)
                    for (att, (off, sz)) in zip(self.atts, meta)
                ),
            )
            pos += 1

        return data

    def tobytes(self, data):
        """Encode a NumPy array into SciDB binary format.

        Handles structured arrays (one or many attributes) and plain
        single-field arrays.

        :return: concatenated cells as ``bytes``
        """
        buf_lst = []
        if len(data.dtype) > 0:
            # NumPy structured array
            if len(self.atts_dtype) == 1:
                # One attribute
                atr = self.atts[0]
                for cell in data:
                    buf_lst.append(atr.tobytes(cell[0]))
            else:
                # Multiple attributes
                for cell in data:
                    for atr, val in zip(self.atts, cell):
                        buf_lst.append(atr.tobytes(val))
        else:
            # NumPy single-field array
            atr = self.atts[0]
            for val in data:
                buf_lst.append(atr.tobytes(val))
        return b"".join(buf_lst)

    @classmethod
    def fromstring(cls, string):
        """Build a Schema by parsing a full SciDB schema string.

        The three regexes are applied in sequence: optional name, then
        ``<atts>``, then ``[dims]``.
        """
        name_match = Schema._regex_name.match(string)
        atts_match = Schema._regex_atts.match(string, name_match.end(0))
        dims_match = Schema._regex_dims.match(string, atts_match.end(0))

        name = name_match.groupdict()["name"]
        return cls(
            name.strip() if name else None,
            (Attribute.fromstring(s) for s in atts_match.group(1).split(",")),
            (Dimension.fromstring(s) for s in dims_match.group(1).split(";")),
        )

    @classmethod
    def fromdtype(cls, dtype):
        """Build a nameless Schema from a NumPy dtype, with a single
        default dimension."""
        return cls(
            None,
            (Attribute.fromdtype(dt) for dt in dtype.descr),
            (Dimension(one_dim_name),),
        )
if __name__ == "__main__":
    # Run this module's doctests when executed directly; stop at the
    # first failing example to keep the report short.
    import doctest

    doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)