"""Attribute, Dimension, and Schema
================================
Classes for accessing SciDB data and schemas.
"""
import itertools
import re
import struct
import warnings
import dateutil
import numpy
import pandas
import six
# SciDB type name -> NumPy dtype used to hold values of that type.
# Variable-size types ('string', 'binary') are held as Python objects.
type_map_numpy = {
    scidb_type: numpy.dtype(spec)
    for (scidb_type, spec) in
    # Types whose SciDB name equals the Python/NumPy type name
    [(t.__name__, t) for t in (
        bool,
        numpy.int8,
        numpy.int16,
        numpy.int32,
        numpy.int64,
        numpy.uint8,
        numpy.uint16,
        numpy.uint32,
        numpy.uint64,
    )] + [
        ('char', 'S1'),
        ('double', numpy.float64),
        ('float', numpy.float32),
        ('string', object),
        ('binary', object),
        ('datetime', 'datetime64[s]'),
        ('datetimetz', [('time', 'datetime64[s]'),
                        ('tz', 'timedelta64[s]')]),
    ]
}

# NumPy dtype -> SciDB type name. The object dtype is left out because it
# is shared by 'string' and 'binary' and so cannot be inverted.
type_map_inv_numpy = {
    dtype: scidb_type
    for (scidb_type, dtype) in type_map_numpy.items()
    if dtype != numpy.dtype(object)
}

# Extra (length-less / unit-less) NumPy types accepted on input.
# `numpy.bytes_` is used instead of `numpy.string_`: the latter was only
# an alias and has been removed in NumPy 2.0.
type_map_inv_numpy.update({
    numpy.dtype(numpy.str_): 'string',
    numpy.dtype(numpy.bytes_): 'string',
    numpy.dtype(numpy.datetime64): 'datetime',
    numpy.dtype(numpy.timedelta64): 'datetimetz',
})
# SciDB type name -> `struct` format string. After the two loops below
# every value is a pair: (not-null format, nullable format).
type_map_struct = {
    'bool': '?',
    'char': 'c',
    'int8': 'b',
    'int16': '<h',
    'int32': '<i',
    'int64': '<q',
    'float': '<f',
    'double': '<d',
    'datetime': '<q',
    'datetimetz': '<qq',
}

# Add uint types: the unsigned struct code is the upper-case signed code
for key in list(type_map_struct.keys()):
    if key.startswith('int'):
        type_map_struct['u' + key] = type_map_struct[key].upper()

# Add null-able type: an unsigned byte ('B') is inserted in front of the
# value codes, after the byte-order prefix when there is one. All value
# codes are kept (`val[1:]`), so multi-field formats such as '<qq'
# become '<Bqq' rather than being truncated to '<Bq'.
for (key, val) in list(type_map_struct.items()):
    if len(val) > 1:
        val_null = val[0] + 'B' + val[1:]
    else:
        val_null = 'B' + val
    type_map_struct[key] = (val, val_null)
# SciDB type name -> NumPy dtype a nullable attribute is promoted to so
# that missing values can be represented in a Pandas DataFrame.
# http://pandas.pydata.org/pandas-docs/stable/gotchas.html#na-type-promotions
type_map_promo = {
    scidb_type: numpy.dtype(promoted)
    for (scidb_type, promoted) in (
        ('bool', object),
        ('char', object),
        ('int8', numpy.float16),
        ('int16', numpy.float32),
        ('int32', numpy.float64),
        ('int64', numpy.float64),
        ('uint8', numpy.float16),
        ('uint16', numpy.float32),
        ('uint32', numpy.float64),
        ('uint64', numpy.float64),
        ('datetime', 'datetime64[ns]'),
    )
}

# Fallback attribute name (for unnamed dtype fields) and the dimension
# name used when a schema is derived from a NumPy dtype.
one_att_name = 'x'
one_dim_name = 'i'
class Attribute(object):
    """Represent SciDB array attribute

    Construct an attribute using Attribute constructor:

    >>> Attribute('foo', 'int64', not_null=True)
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo',
              type_name='int64',
              not_null=True,
              default=None,
              compression=None)

    >>> Attribute('foo', 'int64', default=100, compression='zlib')
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo',
              type_name='int64',
              not_null=False,
              default=100,
              compression='zlib')

    Construct an attribute from a string:

    >>> Attribute.fromstring('foo:int64')
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='foo',
              type_name='int64',
              not_null=False,
              default=None,
              compression=None)

    >>> Attribute.fromstring(
    ...     "taz : string NOT null DEFAULT '' compression 'bzlib'")
    ... # doctest: +NORMALIZE_WHITESPACE
    Attribute(name='taz',
              type_name='string',
              not_null=True,
              default="''",
              compression='bzlib')

    Besides the constructor arguments, an instance exposes:

    * ``fmt_scidb``: type name followed by ``' null'`` when nullable
    * ``fmt_struct``: ``type_map_struct`` entry for the type, or ``None``
    * ``dtype_val``: NumPy dtype (or ``object``) of one value
    * ``dtype``: one-field structured dtype named after the attribute;
      nullable attributes nest a ``('null', 'val')`` pair in it
    """

    _regex = re.compile(r'''
        \s*
        (?P<name>      \w+ ) \s* : \s*
        (?P<type_name> \w+ ) \s*
        (?:                 (?P<not_null>     NOT )? \s+ NULL )? \s*
        (?: DEFAULT     \s+ (?P<default>      \S+ )           )? \s*
        (?: COMPRESSION \s+ '(?P<compression> \w+ )'          )? \s*
        $''', re.VERBOSE | re.IGNORECASE)

    # length dtype for variable-size SciDB types
    _length_dtype = numpy.dtype(numpy.uint32)
    _length_fmt = '<I'

    def __init__(self,
                 name,
                 type_name,
                 not_null=False,
                 default=None,
                 compression=None):
        # Assign the backing field directly: the `name` setter calls
        # `_set_dtype`, which needs `type_name` and `not_null` first.
        self.__name = name
        self.type_name = type_name
        self.not_null = bool(not_null)
        self.default = default
        self.compression = compression

        self.fmt_scidb = '{}{}'.format(self.type_name,
                                       '' if self.not_null else ' null')
        self.fmt_struct = type_map_struct.get(self.type_name, None)
        self._set_dtype()

    def __iter__(self):
        """Iterate over the fields in constructor-argument order."""
        return iter((self.name,
                     self.type_name,
                     self.not_null,
                     self.default,
                     self.compression))

    def __eq__(self, other):
        """Compare field by field with any iterable (e.g. a tuple)."""
        try:
            other_fields = tuple(other)
        except TypeError:
            # Not iterable (e.g. None): let Python fall back instead of
            # raising from a plain equality test.
            return NotImplemented
        return tuple(self) == other_fields

    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        return ('{}('
                'name={!r}, '
                'type_name={!r}, '
                'not_null={!r}, '
                'default={!r}, '
                'compression={!r})').format(type(self).__name__, *self)

    def __str__(self):
        """Render using the same syntax accepted by `fromstring`."""
        # Test against None/'' rather than truthiness so that falsy but
        # meaningful defaults such as 0 are not silently dropped.
        has_default = self.default is not None and self.default != ''
        return '{}:{}{}{}{}'.format(
            self.name,
            self.type_name,
            ' NOT NULL' if self.not_null else '',
            ' DEFAULT {}'.format(self.default) if has_default else '',
            " COMPRESSION '{}'".format(self.compression)
            if self.compression else '')

    @property
    def name(self):
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value
        # The attribute name is the field name of `self.dtype`
        self._set_dtype()

    def _set_dtype(self):
        """Compute `dtype_val` and `dtype` from the current fields."""
        self.dtype_val = type_map_numpy.get(self.type_name, object)
        # >>> numpy.dtype([(u"a", int)])
        # TypeError: data type not understood
        # https://github.com/numpy/numpy/issues/2407
        # cannot use `self.name` directly, use `str(...)`
        if self.not_null:
            self.dtype = numpy.dtype([(str(self.name), self.dtype_val)])
        else:
            self.dtype = numpy.dtype([(str(self.name),
                                       [('null', numpy.uint8),
                                        ('val', self.dtype_val)])])

    def is_fixsize(self):
        """Return True unless values are held as Python objects."""
        return self.dtype_val != object

    def itemsize(self, buf=None, offset=0):
        """Return the number of bytes one value occupies.

        Fixed-size types need no arguments. For variable-size types the
        length prefix is read from `buf`, where the value starts at
        `offset`. The result is a plain Python `int`.
        """
        if self.is_fixsize():
            return self.dtype.itemsize

        null_size = 0 if self.not_null else 1
        # Read the prefix with the same little-endian format that
        # `tobytes` writes it with, independent of the host byte order.
        value_size = struct.unpack_from(
            Attribute._length_fmt, buf, offset + null_size)[0]
        return null_size + Attribute._length_dtype.itemsize + value_size

    def frombytes(self, buf, offset=0, size=None, promo=False):
        """Decode one value of this attribute from `buf`.

        :param buf: bytes-like buffer holding the encoded value
        :param offset: position in `buf` where the value starts
        :param size: encoded size in bytes; computed with `itemsize`
            when omitted
        :param promo: for nullable attributes, return the bare value
            (``None`` when missing) instead of a ``(null, value)`` pair
        """
        if size is None:
            size = self.itemsize(buf, offset)

        null_size = 0 if self.not_null else 1
        start = offset + null_size
        end = offset + size

        if self.dtype_val == object:
            # Skip the length prefix
            start += Attribute._length_dtype.itemsize
            if self.type_name == 'string':
                # Drop the trailing NUL terminator
                val = buf[start:end - 1].decode('utf-8')
            else:
                val = buf[start:end]
        else:
            val = struct.unpack(self.fmt_struct[0], buf[start:end])
            if len(val) == 1:
                val = val[0]

        if self.not_null:
            return val

        missing = struct.unpack('B', buf[offset:offset + null_size])[0]
        if promo:
            # A null byte of 255 marks a value that is present
            return val if missing == 255 else None
        return (missing, val)

    def tobytes(self, val):
        """Encode `val` as bytes according to the type of the attribute.

        Raise `NotImplementedError` for variable-size types other than
        'string' and 'binary'.
        """
        if self.dtype_val == object:
            # NOTE(review): no null byte is written here even when the
            # attribute is nullable -- confirm that callers only encode
            # not-null 'string'/'binary' values.
            if self.type_name == 'string':
                val_enc = val.encode('utf-8')
                # The length prefix counts the NUL terminator
                return b''.join(
                    [struct.pack(Attribute._length_fmt, len(val_enc) + 1),
                     val_enc,
                     b'\x00'])
            if self.type_name == 'binary':
                return b''.join(
                    [struct.pack(Attribute._length_fmt, len(val)), val])
            raise NotImplementedError('Convert <{}> to bytes'.format(self))

        if self.not_null:
            return struct.pack(self.fmt_struct[0], val)
        if isinstance(val, numpy.void):
            # NumPy structured array: record already carries the null byte
            return struct.pack(self.fmt_struct[1], *val)
        # Bare value: mark it as present (255)
        return struct.pack(self.fmt_struct[1], 255, val)

    @classmethod
    def fromstring(cls, string):
        """Parse ``name:type [[NOT] NULL] [DEFAULT d] [COMPRESSION 'c']``.

        Raise `ValueError` when `string` does not match that syntax.
        """
        match = Attribute._regex.match(string)
        if match is None:
            raise ValueError('Failed to parse attribute: {}'.format(string))
        # Constructed outside of any `try` so that errors raised by the
        # constructor are not misreported as parse failures.
        return cls(**match.groupdict())

    @classmethod
    def fromdtype(cls, dtype_descr):
        """Build a not-null or nullable attribute from one entry of a
        NumPy ``dtype.descr`` list.

        Raise `Exception` when the NumPy type has no SciDB equivalent.
        """
        if isinstance(dtype_descr[1], str):
            # e.g. ('name', 'int64')
            dtype_val = dtype_descr[1]
            not_null = True
        else:
            # e.g. ('name', [('null', 'int8'), ('val', 'int64')])
            #      ('name', [('time', 'datetime64'), ('tz', 'timedelta64')])
            #      ('name', [('null', 'int8'),
            #                ('val', [('time', 'datetime64'),
            #                         ('tz', 'timedelta64')])])
            if dtype_descr[1][0][0] == 'null':
                not_null = False
                dtype_val = dtype_descr[1][1][1]
            else:
                not_null = True
                dtype_val = dtype_descr[1]

        dtype_val = numpy.dtype(dtype_val)
        type_name = type_map_inv_numpy.get(dtype_val)
        if type_name is None:
            # if dtype_val not found in map, try the dtype_val.type
            # (without the length)
            # e.g. '<U3' --type--> '<U' --map--> numpy.str_
            type_name = type_map_inv_numpy.get(numpy.dtype(dtype_val.type))
        if type_name is None:
            raise Exception(
                'No SciDB type mapping for NumPy type {}'.format(dtype_val))

        return cls(name=dtype_descr[0] if dtype_descr[0] else one_att_name,
                   type_name=type_name,
                   not_null=not_null)
class Dimension(object):
    """Represent SciDB array dimension

    Construct a dimension using the Dimension constructor:

    >>> Dimension('foo')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo',
              low_value=None,
              high_value=None,
              chunk_overlap=None,
              chunk_length=None)

    >>> Dimension('foo', -100, '10', '?', '1000')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo',
              low_value=-100,
              high_value=10,
              chunk_overlap='?',
              chunk_length=1000)

    Construct a dimension from a string:

    >>> Dimension.fromstring('foo')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo',
              low_value=None,
              high_value=None,
              chunk_overlap=None,
              chunk_length=None)

    >>> Dimension.fromstring('foo=-100:*:?:10')
    ... # doctest: +NORMALIZE_WHITESPACE
    Dimension(name='foo',
              low_value=-100,
              high_value='*',
              chunk_overlap='?',
              chunk_length=10)
    """

    _regex = re.compile(r'''
        \s*
        (?P<name> \w+ ) \s*
        (?: = \s* (?P<low_value> [^:\s]+ ) \s*
          : \s*
          (?P<high_value> [^:\s]+ ) \s*
          (?: : \s* (?P<chunk_overlap> [^:\s]+ ) \s*
            (?: : \s* (?P<chunk_length> [^:\s]+ ) )?
          )?
        )?
        \s* $''', re.VERBOSE)

    def __init__(self,
                 name,
                 low_value=None,
                 high_value=None,
                 chunk_overlap=None,
                 chunk_length=None):
        self.name = name
        # Bounds and chunk settings are stored as `int` when they can be
        # converted; anything else (None, '*', '?') is kept untouched.
        self.low_value = self._int_or_asis(low_value)
        self.high_value = self._int_or_asis(high_value)
        self.chunk_overlap = self._int_or_asis(chunk_overlap)
        self.chunk_length = self._int_or_asis(chunk_length)

    @staticmethod
    def _int_or_asis(value):
        """Return `int(value)`, or `value` itself if it is not a number."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    def __iter__(self):
        """Iterate over the fields in constructor-argument order."""
        return iter((self.name,
                     self.low_value,
                     self.high_value,
                     self.chunk_overlap,
                     self.chunk_length))

    def __eq__(self, other):
        """Compare field by field with any iterable (e.g. a tuple)."""
        try:
            other_fields = tuple(other)
        except TypeError:
            # Not iterable (e.g. None): let Python fall back instead of
            # raising from a plain equality test.
            return NotImplemented
        return tuple(self) == other_fields

    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        return ('{}('
                'name={!r}, '
                'low_value={!r}, '
                'high_value={!r}, '
                'chunk_overlap={!r}, '
                'chunk_length={!r})').format(type(self).__name__, *self)

    def __str__(self):
        """Render using the same syntax accepted by `fromstring`.

        Each optional part is only emitted when the part before it is
        present, mirroring the nesting of the parsing expression.
        """
        out = self.name
        if self.low_value is not None:
            out += '={}:{}'.format(self.low_value, self.high_value)
            if self.chunk_overlap is not None:
                out += ':{}'.format(self.chunk_overlap)
                if self.chunk_length is not None:
                    out += ':{}'.format(self.chunk_length)
        return out

    @classmethod
    def fromstring(cls, string):
        """Parse ``name[=low:high[:overlap[:length]]]``.

        Raise `ValueError` when `string` does not match that syntax.
        """
        match = Dimension._regex.match(string)
        if match is None:
            raise ValueError('Failed to parse dimension: {}'.format(string))
        # Constructed outside of any `try` so that errors raised by the
        # constructor are not misreported as parse failures.
        return cls(**match.groupdict())
class Schema(object):
    """Represent SciDB array schema

    Construct a schema using Schema, Attribute, and Dimension
    constructors:

    >>> Schema('foo', (Attribute('x', 'int64'),), (Dimension('i', 0, 10),))
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo',
           atts=(Attribute(name='x',
                           type_name='int64',
                           not_null=False,
                           default=None,
                           compression=None),),
           dims=(Dimension(name='i',
                           low_value=0,
                           high_value=10,
                           chunk_overlap=None,
                           chunk_length=None),))

    Construct a schema using Schema constructor and fromstring methods
    of Attribute and Dimension:

    >>> Schema('foo',
    ...        (Attribute.fromstring('x:int64'),),
    ...        (Dimension.fromstring('i=0:10'),))
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo',
           atts=(Attribute(name='x',
                           type_name='int64',
                           not_null=False,
                           default=None,
                           compression=None),),
           dims=(Dimension(name='i',
                           low_value=0,
                           high_value=10,
                           chunk_overlap=None,
                           chunk_length=None),))

    Construct a schema from a string:

    >>> Schema.fromstring(
    ...     'foo@1<x:int64 not null, y:double>[i=0:*; j=-100:0:0:10]')
    ... # doctest: +NORMALIZE_WHITESPACE
    Schema(name='foo@1',
           atts=(Attribute(name='x',
                           type_name='int64',
                           not_null=True,
                           default=None,
                           compression=None),
                 Attribute(name='y',
                           type_name='double',
                           not_null=False,
                           default=None,
                           compression=None)),
           dims=(Dimension(name='i',
                           low_value=0,
                           high_value='*',
                           chunk_overlap=None,
                           chunk_length=None),
                 Dimension(name='j',
                           low_value=-100,
                           high_value=0,
                           chunk_overlap=0,
                           chunk_length=10)))

    Print a schema constructed from a string:

    >>> print(Schema.fromstring('<x:int64,y:float> [i=0:2:0:1000000; j=0:*]'))
    ... # doctest: +NORMALIZE_WHITESPACE
    <x:int64,y:float> [i=0:2:0:1000000; j=0:*]

    Format Schema object to only print the schema part without the
    array name:

    >>> '{:h}'.format(Schema.fromstring('foo<x:int64>[i]'))
    '<x:int64> [i]'
    """

    _regex_name = re.compile(
        r'\s* (?: not \s+ empty \s+ )? (?P<name> [\w@]+ )?', re.VERBOSE)

    _regex_atts = re.compile(
        r'\s* < ( [^,>]+ \s* (?: , \s* [^,>]+ \s* )* ) >', re.VERBOSE)

    _regex_dims = re.compile(
        r'\s* \[ ( [^;\]]+ \s* (?: ; \s* [^;\]]+ \s* )* ) \] \s*',
        re.VERBOSE)

    def __init__(self, name=None, atts=(), dims=()):
        """`atts` and `dims` may be any iterables; they are stored as
        tuples."""
        self.name = name
        self.atts = tuple(atts)
        self.dims = tuple(dims)

        # Set lazy
        self.__atts_dtype = None
        self.__atts_fmt_scidb = None

    def __iter__(self):
        """Iterate over the name, then the attributes, then the
        dimensions."""
        return iter((self.name, ) + self.atts + self.dims)

    def __eq__(self, other):
        """Compare element by element with any iterable."""
        try:
            other_items = tuple(other)
        except TypeError:
            # Not iterable (e.g. None): let Python fall back instead of
            # raising from a plain equality test.
            return NotImplemented
        return tuple(self) == other_items

    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        return '{}(name={!r}, atts={!r}, dims={!r})'.format(
            type(self).__name__, self.name, self.atts, self.dims)

    def __str__(self):
        return self._render()

    def __format__(self, fmt_spec=''):
        """An ``h`` in the format spec omits the array name."""
        return self._render(no_name='h' in fmt_spec)

    def _render(self, no_name=False):
        """Return ``name<atts> [dims]``; the name is left out when it is
        unset or when `no_name` is true."""
        return '{}<{}> [{}]'.format(
            self.name if not no_name and self.name else '',
            ','.join(str(a) for a in self.atts),
            '; '.join(str(d) for d in self.dims))

    def _promo_warning(self):
        """Warn if any attribute is nullable, i.e. will be promoted."""
        cnt = sum(not a.not_null for a in self.atts)
        if cnt:
            warnings.warn(
                ('{} type(s) promoted for null support.' +
                 ' Precision loss may occur').format(cnt),
                stacklevel=2)

    @property
    def atts_dtype(self):
        """Structured NumPy dtype with one field per attribute (cached)."""
        if self.__atts_dtype is None:
            self.__atts_dtype = numpy.dtype(
                list(itertools.chain.from_iterable(
                    a.dtype.descr for a in self.atts)))
        return self.__atts_dtype

    @property
    def atts_fmt_scidb(self):
        """Parenthesized, comma-separated `fmt_scidb` of the attributes
        (cached)."""
        if self.__atts_fmt_scidb is None:
            self.__atts_fmt_scidb = '({})'.format(
                ', '.join(a.fmt_scidb for a in self.atts))
        return self.__atts_fmt_scidb

    def pprint(self):
        """Print the schema followed by a table with one row per
        attribute and per dimension."""
        print(self)

        rows = [(a.name,
                 'attr',
                 a.type_name,
                 not a.not_null,
                 '',
                 '',
                 '',
                 '')
                for a in self.atts]
        rows.extend((d.name,
                     'dim',
                     'int64',
                     '',
                     d.low_value,
                     d.high_value,
                     d.chunk_overlap,
                     d.chunk_length)
                    for d in self.dims)

        info = numpy.empty(
            (len(rows),),
            dtype=[('name', object),
                   ('class', object),
                   ('type', object),
                   ('nullable', object),
                   ('start', object),
                   ('end', object),
                   ('overlap', object),
                   ('chunk', object)])
        for (pos, row) in enumerate(rows):
            info.put((pos,), row)
        print(pandas.DataFrame.from_records(info))

    def is_fixsize(self):
        """Return True when every attribute has a fixed size."""
        return all(a.is_fixsize() for a in self.atts)

    def make_unique(self):
        """Make dimension and attribute names unique within the schema. Return
        ``True`` if any dimension or attribute was renamed.

        >>> s = Schema(None, (Attribute('i', 'bool'),), (Dimension('i'),))
        >>> print(s)
        <i:bool> [i]
        >>> s.make_unique()
        True
        >>> print(s)
        <i:bool> [i_1]

        >>> s = Schema.fromstring('<i:bool, i:int64>[i;i_1;i]')
        >>> s.make_unique()
        True
        >>> print(s)
        <i:bool,i_2:int64> [i_3; i_1; i_4]
        """
        # Attributes are processed before dimensions, so on a clash the
        # attribute keeps its name.
        items = list(itertools.chain(self.atts, self.dims))
        all_before = set(item.name for item in items)

        # Check if overall duplicates are present
        if len(all_before) >= len(items):
            return False

        all_after = set()
        for item in items:
            # Start renaming after the first copy. First copy will not
            # be in all_after. From second copy on-wards, a copy will
            # be in all_after.
            if item.name in all_after:
                new_name_tmpl = item.name + '_{}'
                count = 1
                new_name = new_name_tmpl.format(count)
                # Avoid both original names and names already handed out
                while new_name in all_before or new_name in all_after:
                    count += 1
                    new_name = new_name_tmpl.format(count)
                item.name = new_name
            all_after.add(item.name)

        # Reset dtype: field names are derived from attribute names
        self.__atts_dtype = None
        return True

    def make_dims_atts(self):
        """Make attributes from dimensions and prepend them to the
        attributes list.

        >>> s = Schema(None, (Attribute('x', 'bool'),), (Dimension('i'),))
        >>> print(s)
        <x:bool> [i]
        >>> s.make_dims_atts()
        >>> print(s)
        <i:int64 NOT NULL,x:bool> [i]

        >>> s = Schema.fromstring('<x:bool>[i;j]')
        >>> s.make_dims_atts()
        >>> print(s)
        <i:int64 NOT NULL,j:int64 NOT NULL,x:bool> [i; j]
        """
        self.atts = tuple(itertools.chain(
            (Attribute(d.name, 'int64', not_null=True) for d in self.dims),
            self.atts))

        # Reset
        self.__atts_dtype = None
        self.__atts_fmt_scidb = None

    def get_promo_atts_dtype(self):
        """Return the structured dtype used when nullable attributes are
        promoted: not-null fields keep their dtype, nullable fields get
        the `type_map_promo` dtype (falling back to the regular one).

        Emits the promotion warning as a side effect.
        """
        self._promo_warning()
        # NOTE(review): 'datetime' is promoted to datetime64[ns] while
        # Attribute.frombytes yields the raw integer unpacked from the
        # buffer -- confirm the integer ends up in the intended unit.
        fields = []
        for a in self.atts:
            if a.not_null:
                fields.append(a.dtype.descr[0])
            else:
                fields.append(
                    (a.dtype.names[0],
                     type_map_promo.get(
                         a.type_name,
                         type_map_numpy.get(a.type_name, object))))
        return numpy.dtype(fields)

    def frombytes(self, buf, as_dataframe=False, dataframe_promo=True):
        """Decode `buf` into a NumPy record array, one record per cell.

        :param buf: bytes with the attribute values of each cell stored
            back to back
        :param as_dataframe: together with `dataframe_promo`, selects
            the promoted dtype (nullable values become plain values or
            ``None``) instead of ``atts_dtype``
        :param dataframe_promo: see `as_dataframe`
        """
        promo = as_dataframe and dataframe_promo
        buf_len = len(buf)
        if buf_len > 0 and not self.atts:
            # Without attributes the scan below would never advance
            raise ValueError(
                'Cannot decode non-empty buffer with no attributes')

        # Scan content and build (offset, size) metadata
        off = 0
        buf_meta = []
        while off < buf_len:
            meta = []
            for att in self.atts:
                sz = att.itemsize(buf, off)
                meta.append((off, sz))
                off += sz
            buf_meta.append(meta)

        # Create NumPy record array
        dtype = self.get_promo_atts_dtype() if promo else self.atts_dtype
        data = numpy.empty((len(buf_meta),), dtype=dtype)

        # Extract values using (offset, size) metadata
        # Populate NumPy record array
        for (pos, meta) in enumerate(buf_meta):
            data.put((pos,),
                     tuple(att.frombytes(buf, att_off, att_sz, promo=promo)
                           for (att, (att_off, att_sz))
                           in zip(self.atts, meta)))
        return data

    def tobytes(self, data):
        """Encode the NumPy array `data` as bytes, cell after cell."""
        if len(data.dtype) > 0:
            # NumPy structured array: pair each field of a record with
            # the attribute at the same position
            return b''.join(
                atr.tobytes(val)
                for cell in data
                for (atr, val) in zip(self.atts, cell))

        # NumPy single-field array
        atr = self.atts[0]
        return b''.join(atr.tobytes(val) for val in data)

    @classmethod
    def fromstring(cls, string):
        """Parse ``[name]<attributes>[dimensions]``.

        Raise `ValueError` when the attribute or dimension list cannot
        be located in `string`.
        """
        # The name expression is fully optional, so it always matches
        name_match = Schema._regex_name.match(string)
        atts_match = Schema._regex_atts.match(string, name_match.end(0))
        dims_match = (Schema._regex_dims.match(string, atts_match.end(0))
                      if atts_match is not None else None)
        if atts_match is None or dims_match is None:
            raise ValueError('Failed to parse schema: {}'.format(string))

        name = name_match.groupdict()['name']
        return cls(
            name.strip() if name else None,
            (Attribute.fromstring(s)
             for s in atts_match.group(1).split(',')),
            (Dimension.fromstring(s)
             for s in dims_match.group(1).split(';')))

    @classmethod
    def fromdtype(cls, dtype):
        """Build an unnamed schema with one attribute per field of the
        NumPy `dtype` and a single dimension named `one_dim_name`."""
        return cls(
            None,
            (Attribute.fromdtype(dt) for dt in dtype.descr),
            (Dimension(one_dim_name),))
if __name__ == "__main__":
    # Run the doctests embedded in the docstrings of this module; only
    # the first failing example of each docstring is reported.
    import doctest
    doctest.testmod(optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)