"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import abc
import re
from typing import Any, List, Optional, Sequence, Union
import typepy
from pathvalidate import ascii_symbols, unprintable_ascii_chars
from pathvalidate.error import ErrorReason, ValidationError
from ._column import Column
from ._func import validate_table_name
from ._validator import validate_sqlite_attr_name
from .error import SqlSyntaxError
WhereQuery = Union[str, "Where", "And", "Or"]
class QueryItemInterface(metaclass=abc.ABCMeta):
@abc.abstractmethod
def to_query(self) -> str: # pragma: no cover
pass
class QueryItem(QueryItemInterface):
def __init__(self, value: str) -> None:
try:
self._value = value.strip()
except AttributeError:
raise TypeError("name must be a string")
def __repr__(self) -> str:
return self.to_query()
def __format__(self, format_spec: str) -> str:
return self.to_query()
[docs]
class Table(QueryItem):
"""
:param str name: Table name.
:return: String that suitable for table name of a SQLite query.
:Examples:
>>> from simplesqlite.query import Table
>>> Table("length")
'length'
>>> Table("length(cm)")
'[length(cm)]'
>>> Table("string length")
"'string length'"
"""
__RE_NEED_BRACKET = re.compile("[{:s}]|^[0-9]+".format(re.escape("%()-+/.,")))
__RE_NEED_QUOTE = re.compile(r"[\s]+")
[docs]
def to_query(self) -> str:
name = self._value
if self.__RE_NEED_BRACKET.search(name):
return f"[{name:s}]"
if self.__RE_NEED_QUOTE.search(name):
return f"'{name:s}'"
return name
[docs]
class Attr(QueryItem):
"""
:param str name: Attribute name.
:param str operation:
Used as a SQLite function if the value is not empty.
:return: String that suitable for attribute name of a SQLite query.
:rtype: str
:Examples:
>>> from simplesqlite.query import Attr
>>> Attr("key")
'key'
>>> Attr("a+b")
'[a+b]'
>>> Attr("key", operation="SUM")
'SUM(key)'
"""
__RE_NEED_QUOTE = re.compile("[{:s}]".format(re.escape("[]_")))
__RE_NEED_BRACKET = re.compile(
r"[{:s}0-9\s]".format(re.escape("%(){}-+/.;:`'\"\0\\*?<>|!#&=~^@"))
)
__RE_SANITIZE = re.compile("[{:s}\n\r]".format(re.escape("'\",")))
[docs]
@classmethod
def sanitize(cls, name: str) -> str:
try:
return cls.__RE_SANITIZE.sub("_", name)
except TypeError:
return str(name)
def __init__(self, name: str, operation: str = "") -> None:
super().__init__(name)
self.__operation = operation
[docs]
def to_query(self) -> str:
name = self.sanitize(self._value)
need_quote = self.__RE_NEED_QUOTE.search(name) is not None
try:
validate_sqlite_attr_name(name)
except ValidationError as e:
if e.reason == ErrorReason.RESERVED_NAME and e.reusable_name is False:
need_quote = True
elif e.reason in (
ErrorReason.RESERVED_NAME,
ErrorReason.NULL_NAME,
ErrorReason.INVALID_CHARACTER,
):
pass
else:
raise
if need_quote:
sql_name = f'"{name:s}"'
elif self.__RE_NEED_BRACKET.search(name):
sql_name = f"[{name:s}]"
elif name == "join":
sql_name = f"[{name:s}]"
else:
sql_name = name
if typepy.is_not_null_string(self.__operation):
sql_name = f"{self.__operation:s}({sql_name:s})"
return sql_name
[docs]
class AttrList(list, QueryItemInterface):
"""
:param list/tuple names: Attribute names.
:param str operation:
Used as a SQLite function if the value is not empty.
:Examples:
>>> from simplesqlite.query import AttrList
>>> AttrList(["key", "a+b"]))
['key', '[a+b]']
>>> AttrList(["key", "a+b"], operation="AVG")
['AVG(key)', 'AVG([a+b])']
.. seealso::
:py:class:`Attr`
"""
[docs]
@classmethod
def sanitize(self, names: Sequence[str]) -> List[str]:
return [Attr.sanitize(name) for name in names]
def __init__(self, names: Sequence[str], operation: str = "") -> None:
self.__operation = operation
try:
super().__init__([Attr(name, operation) for name in names])
except AttributeError:
raise TypeError("name must be a string")
def __repr__(self) -> str:
return self.to_query()
def __format__(self, format_spec: str) -> str:
return self.to_query()
[docs]
def to_query(self) -> str:
return ",".join([str(attr) for attr in self])
[docs]
def append(self, item: Union[str, Attr]) -> None:
if not isinstance(item, (str, Attr)):
raise TypeError(f"item should be a str/Attr instance: actual={type(item)}")
if isinstance(item, str):
item = Attr(item, operation=self.__operation)
super().append(item)
class Distinct(QueryItem):
@property
def key(self) -> Union[Attr, AttrList]:
return self.__key
def __init__(self, key: Union[str, Attr, AttrList]) -> None:
if not isinstance(key, (str, Attr, AttrList)):
raise TypeError(f"key should be a string/Attr/AttrList instance: actual={type(key)}")
if isinstance(key, str):
self.__key: Union[Attr, AttrList] = Attr(key)
else:
self.__key = key
def to_query(self) -> str:
return f"DISTINCT {self.key}"
[docs]
class Value(QueryItem):
"""
:param str value: Value associated with a key.
:return:
String that suitable for a value of a key.
Return ``"NULL"`` if the value is |None|.
:rtype: str
:Examples:
>>> from simplesqlite.query import Value
>>> Value(1.2)
'1.2'
>>> Value("value")
"'value'"
>>> Value(None)
'NULL'
"""
def __init__(self, value: Any) -> None:
if isinstance(value, Value):
self._value = value._value
else:
self._value = value
[docs]
def to_query(self) -> str:
value: Any = self._value
if value is None:
return "NULL"
if typepy.Integer(value).is_type() or typepy.RealNumber(value).is_type():
return str(value)
try:
if value.find("'") != -1:
return f'"{value}"'
except (TypeError, AttributeError):
pass
if value == "CURRENT_TIMESTAMP":
return value
return f"'{value}'"
[docs]
class Where(QueryItem):
"""
``WHERE`` query clause.
:param Union[str, Column] key: Attribute name of the key.
:param Any value: Value of the right hand side associated with the key.
:param str cmp_operator: Comparison operator of WHERE query.
:raises simplesqlite.SqlSyntaxError:
If **a)** ``cmp_operator`` is invalid operator. Valid operators are as follows:
``"="``, ``"=="``, ``"!="``, ``"<>"``, ``">"``, ``">="``, ``"<"``, ``"<="``.
**b)** the ``value`` is |None| and the ``cmp_operator`` is not ``"="``/``"!="``.
:Examples:
>>> from simplesqlite.query import Where
>>> Where("key", "hoge")
"key = 'hoge'"
>>> Where("value", 1, cmp_operator=">")
'value > 1'
"""
__VALID_CMP_OPERATORS = ("=", "==", "!=", "<>", ">", ">=", "<", "<=")
@property
def key(self) -> str:
return self._value
@property
def value(self) -> Any:
return self.__rhs
def __init__(self, key: Union[str, Column], value: Any, cmp_operator: str = "=") -> None:
if isinstance(key, Column):
norm_key = key.get_column_name()
else:
norm_key = key
super().__init__(norm_key)
self.__rhs = value
self.__cmp_operator = cmp_operator
if not self.__cmp_operator:
raise SqlSyntaxError("cmp_operator required")
if self.__cmp_operator not in self.__VALID_CMP_OPERATORS:
raise SqlSyntaxError(f"operator not supported: {self.__cmp_operator}")
[docs]
def to_query(self) -> str:
if self.value is None:
if self.__cmp_operator == "=":
return f"{Attr(self.key)} IS NULL"
elif self.__cmp_operator == "!=":
return f"{Attr(self.key)} IS NOT NULL"
raise SqlSyntaxError(
f"Invalid operator ({self.__cmp_operator:s}) with None right-hand side"
)
return f"{Attr(self.key)} {self.__cmp_operator:s} {Value(self.value)}"
[docs]
class Or(list, QueryItemInterface):
"""
``OR`` query clause.
Args:
where_list (list of |arg_where_type|):
Query items that concatenating with ``OR``.
"""
def __init__(self, where_list: Sequence[WhereQuery]) -> None:
for where in where_list:
if not isinstance(where, (str, Where, And, Or)):
raise TypeError(
"where_list item must either string or Where/And/Or class instance: actual={}".format(
type(where)
)
)
super().__init__(where_list)
def __repr__(self) -> str:
return self.to_query()
def __format__(self, format_spec: str) -> str:
return self.to_query()
[docs]
def to_query(self) -> str:
item_list = []
for where in self:
if isinstance(where, And):
item_list.append(f"({where})")
else:
item_list.append(f"{where}")
return " OR ".join(item_list)
[docs]
class And(list, QueryItemInterface):
"""
``AND`` query clause.
Args:
where_list (list of |arg_where_type|):
Query items that concatenating with ``AND``.
"""
def __init__(self, where_list: Sequence[WhereQuery]) -> None:
for where in where_list:
if not isinstance(where, (str, Where, And, Or)):
raise TypeError(
"where_list item must either string or Where/And/Or class instance: actual={}".format(
type(where)
)
)
super().__init__(where_list)
def __repr__(self) -> str:
return self.to_query()
def __format__(self, format_spec: str) -> str:
return self.to_query()
[docs]
def to_query(self) -> str:
item_list = []
for where in self:
if isinstance(where, Or):
item_list.append(f"({where})")
else:
item_list.append(f"{where}")
return " AND ".join(item_list)
[docs]
class Select(QueryItem):
"""
``SELECT`` query clause.
:param select: Attribute for SELECT query.
:param str table: Table name of executing the query.
:param str where:
Add a WHERE clause to execute query, if the value is not |None|.
:param extra extra:
Add additional clause to execute query, if the value is not |None|.
:raises ValueError: ``select`` is empty string.
:raises simplesqlite.NameValidationError:
|raises_validate_table_name|
:Examples:
>>> from simplesqlite.query import Select, Where
>>> Select(select="value", table="example")
'SELECT value FROM example'
>>> Select(select="value", table="example", where=Where("key", 1))
'SELECT value FROM example WHERE key = 1'
>>> Select(select="value", table="example", where=Where("key", 1), extra="ORDER BY value")
'SELECT value FROM example WHERE key = 1 ORDER BY value'
"""
def __init__(
self,
select: Union[str, AttrList],
table: str,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> None:
self.__select = select
self.__table = table
self.__where = where
self.__extra = extra
validate_table_name(self.__table)
if self.__where and not isinstance(where, (str, Where, And, Or)):
raise TypeError(
f"where should be a str/Where/And/Or instance: actual={type(self.__where)}"
)
if not self.__select:
raise ValueError("SELECT query required")
[docs]
def to_query(self) -> str:
query_list = [f"SELECT {self.__select}", f"FROM {Table(self.__table)}"]
if self.__where:
query_list.append(f"WHERE {self.__where}")
if self.__extra:
query_list.append(self.__extra)
return " ".join(query_list)
class Insert(QueryItem):
"""
INSERT query for multiple records.
:param str table: Table name of executing the query.
:param AttrList attrs: Attributes that inserting to..
:raises simplesqlite.NameValidationError:
|raises_validate_table_name|
"""
def __init__(self, table: str, attrs: AttrList, values: Sequence[Any]) -> None:
validate_table_name(table)
if typepy.is_empty_sequence(attrs):
raise ValueError("empty attributes")
self.__table = table
self.__attrs = attrs
self.__values = [Value(value) for value in values]
def to_query(self) -> str:
return "INSERT INTO {:s}({:s}) VALUES ({:s})".format(
Table(self.__table),
",".join([attr.to_query() for attr in self.__attrs]),
",".join([value.to_query() for value in self.__values]),
)
class InsertMany(QueryItem):
"""
INSERT query for multiple records.
:param str table: Table name of executing the query.
:param AttrList attrs: Attributes that inserting to..
:raises simplesqlite.NameValidationError:
|raises_validate_table_name|
"""
def __init__(self, table: str, attrs: AttrList) -> None:
validate_table_name(table)
if not isinstance(attrs, AttrList):
raise TypeError(f"attr must be a AttrList class instance: actual={type(attrs)}")
if typepy.is_empty_sequence(attrs):
raise ValueError("empty attributes")
self.__table = table
self.__attrs = attrs
def to_query(self) -> str:
return "INSERT INTO {:s}({:s}) VALUES ({:s})".format(
Table(self.__table),
",".join([attr.to_query() for attr in self.__attrs]),
",".join(["?" for _ in self.__attrs]),
)
class Set(QueryItem):
"""
SET query clause.
"""
def __init__(self, key: Union[str, Column], value: Any) -> None:
if isinstance(key, Column):
norm_key = key.get_column_name()
else:
norm_key = key
self.__lhs = Attr(norm_key)
self.__rhs = Value(value)
def to_query(self) -> str:
return f"{self.__lhs} = {self.__rhs}"
def make_index_name(table_name: str, attr_name: str) -> str:
import hashlib
re_invalid_chars = re.compile(
"[{:s}]".format(re.escape("".join(ascii_symbols + unprintable_ascii_chars))), re.UNICODE
)
index_hash = hashlib.md5((table_name + attr_name).encode("utf8")).hexdigest()[:4]
return "{:s}_{:s}_index_{}".format(
re_invalid_chars.sub("", table_name), re_invalid_chars.sub("", attr_name), index_hash
)