"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import logging
import os
import re
import sqlite3
from collections import OrderedDict, defaultdict
from sqlite3 import Connection, Cursor
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, cast
import pathvalidate
import typepy
from dataproperty.typing import TypeHint
from mbstrdecoder import MultiByteStrDecoder
from sqliteschema import SQLITE_SYSTEM_TABLES, SQLiteSchemaExtractor
from tabledata import TableData
from typepy import extract_typepy_from_dtype
from ._common import extract_table_metadata
from ._func import copy_table, validate_table_name
from ._logger import logger
from ._sanitizer import SQLiteTableDataSanitizer
from .converter import RecordConvertor
from .error import (
AttributeNotFoundError,
DatabaseError,
NameValidationError,
NullDatabaseConnectionError,
OperationalError,
TableNotFoundError,
)
from .query import (
Attr,
AttrList,
Insert,
InsertMany,
QueryItem,
Select,
Set,
Table,
Value,
WhereQuery,
make_index_name,
)
from .sqlquery import SqlQuery
if TYPE_CHECKING:
import pandas
MEMORY_DB_NAME = ":memory:"
[docs]
class SimpleSQLite:
"""
Wrapper class for |sqlite3| module.
:param str database_src:
SQLite database source. Acceptable types are:
(1) File path to a database to be connected.
(2) sqlite3.Connection instance.
(3) SimpleSQLite instance
:param str mode: Open mode.
:param bool delayed_connection:
Delaying connection to a database until access to the database the first time,
if the value is |True|.
:param int max_workers:
Maximum number of workers to generate a table.
In default, the same as the total number of CPUs.
:param bool profile:
Recording SQL query execution time profile, if the value is |True|.
:param Any **connect_kwargs:
Keyword arguments passing to
`sqlite3.connect <https://docs.python.org/3/library/sqlite3.html#sqlite3.connect>`__.
.. seealso::
:py:meth:`.connect`
:py:meth:`.get_profile`
"""
dup_col_handler = "error"
global_debug_query = False
@property
def database_path(self) -> Optional[str]:
"""
:return: File path of the connected database.
:rtype: str
:Examples:
>>> from simplesqlite import SimpleSQLite
>>> con = SimpleSQLite("sample.sqlite", "w")
>>> con.database_path
'/tmp/sample.sqlite'
>>> con.close()
>>> print(con.database_path)
None
"""
if self.__delayed_connection_path:
return self.__delayed_connection_path
return self.__database_path
@property
def connection(self) -> Optional[Connection]:
"""
:return: |Connection| instance of the connected database.
:rtype: sqlite3.Connection
"""
self.__delayed_connect()
return self.__connection
@property
def schema_extractor(self) -> SQLiteSchemaExtractor:
return SQLiteSchemaExtractor(self, max_workers=self.__max_workers)
@property
def total_changes(self) -> int:
"""
.. seealso::
:py:attr:`sqlite3.Connection.total_changes`
"""
self.check_connection()
return self.connection.total_changes # type: ignore
@property
def mode(self) -> Optional[str]:
"""
:return: Connection mode: ``"r"``/``"w"``/``"a"``.
:rtype: str
.. seealso:: :py:meth:`.connect`
"""
return self.__mode
def __initialize_connection(self) -> None:
self.__database_path: Optional[str] = None
self.__connection: Optional[Connection] = None
self.__mode: Optional[str] = None
self.__delayed_connection_path: Optional[str] = None
self.__dict_query_count: Dict[str, int] = defaultdict(int)
self.__dict_query_totalexectime: Dict[str, float] = defaultdict(float)
def __init__(
self,
database_src: Union[Connection, "SimpleSQLite", str],
mode: str = "a",
delayed_connection: bool = True,
max_workers: Optional[int] = None,
profile: bool = False,
**connect_kwargs: Any,
) -> None:
self.debug_query = False
self.__initialize_connection()
self.__mode = mode
self.__max_workers = max_workers
self.__is_profile = profile
self.__connect_kwargs = connect_kwargs
if database_src is None:
raise TypeError("database_src must be not None")
if isinstance(database_src, SimpleSQLite):
self.__connection = database_src.connection
self.__database_path = database_src.database_path
self.debug_query = database_src.debug_query
return
if isinstance(database_src, sqlite3.Connection):
self.__connection = database_src
return
if delayed_connection:
self.__delayed_connection_path = database_src
return
self.connect(database_src, mode)
def __del__(self) -> None:
self.close()
def __enter__(self): # type: ignore
return self
def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore
self.close()
[docs]
def is_connected(self) -> bool:
"""
:return: |True| if the connection to a database is valid.
:rtype: bool
:Examples:
>>> from simplesqlite import SimpleSQLite
>>> con = SimpleSQLite("sample.sqlite", "w")
>>> con.is_connected()
True
>>> con.close()
>>> con.is_connected()
False
"""
try:
self.check_connection()
except NullDatabaseConnectionError:
return False
return True
[docs]
def check_connection(self) -> None:
"""
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:Sample Code:
.. code:: python
import simplesqlite
con = simplesqlite.SimpleSQLite("sample.sqlite", "w")
print("---- connected to a database ----")
con.check_connection()
print("---- disconnected from a database ----")
con.close()
try:
con.check_connection()
except simplesqlite.NullDatabaseConnectionError as e:
print(e)
:Output:
.. code-block:: none
---- connected to a database ----
---- disconnected from a database ----
null database connection
"""
if self.connection is None:
if not self.__delayed_connect():
raise NullDatabaseConnectionError("null database connection")
[docs]
def connect(self, database_path: str, mode: str = "a") -> None:
"""
Connect to a SQLite database.
Parameters to open a connection to an SQLite database are passed via
``connect_kwargs`` argument of the constructor.
:param str database_path:
Path to the SQLite database file to be connected.
:param str mode:
``"r"``: Open for read only.
``"w"``: Open for read/write.
Delete existing tables when connecting.
``"a"``: Open for read/write. Append to the existing tables.
:raises ValueError:
If ``database_path`` is invalid or |attr_mode| is invalid.
:raises simplesqlite.DatabaseError:
If the file is encrypted or is not a database.
:raises simplesqlite.OperationalError:
If unable to open the database file.
"""
self.close()
logger.debug(f"connect to a SQLite database: path='{database_path}', mode={mode}")
if mode == "r":
self.__verify_db_file_existence(database_path)
elif mode in ["w", "a"]:
self.__validate_db_path(database_path)
else:
raise ValueError("unknown connection mode: " + mode)
if database_path == MEMORY_DB_NAME:
self.__database_path = database_path
else:
self.__database_path = os.path.realpath(database_path)
try:
self.__connection = sqlite3.connect(database_path, **self.__connect_kwargs)
except sqlite3.OperationalError as e:
raise OperationalError(e)
self.__mode = mode
try:
# validate connection after connect
self.fetch_table_names()
except sqlite3.DatabaseError as e:
raise DatabaseError(e)
if mode != "w":
return
for table in self.fetch_table_names():
self.drop_table(table)
[docs]
def execute_query(
self, query: Union[str, QueryItem], caller: Optional[Tuple] = None
) -> Optional[Cursor]:
"""
Send arbitrary SQLite query to the database.
:param query: Query to executed.
:param tuple caller:
Caller information.
Expects the return value of :py:meth:`logging.Logger.findCaller`.
:return: The result of the query execution.
:rtype: sqlite3.Cursor
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.OperationalError: |raises_operational_error|
.. warning::
This method can execute an arbitrary query.
i.e. No access permissions check by |attr_mode|.
"""
import time
self.check_connection()
if typepy.is_null_string(query):
return None
if self.debug_query or self.global_debug_query:
logger.debug(query)
if self.__is_profile:
exec_start_time = time.time()
assert self.connection # to avoid type check error
try:
result = self.connection.execute(str(query))
except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
if caller is None:
caller = logging.getLogger().findCaller()
file_path, line_no, func_name = caller[:3]
raise OperationalError(
message="\n".join(
[
"failed to execute query at {:s}({:d}) {:s}".format(
file_path, line_no, func_name
),
f" - query: {MultiByteStrDecoder(query).unicode_str}",
f" - msg: {e}",
f" - db: {self.database_path}",
]
)
)
if self.__is_profile:
self.__dict_query_count[str(query)] += 1
elapse_time = time.time() - exec_start_time
self.__dict_query_totalexectime[str(query)] += elapse_time
return result
[docs]
def set_row_factory(self, row_factory: Optional[Callable]) -> None:
"""
Set row_factory to the database connection.
"""
self.check_connection()
self.__connection.row_factory = row_factory # type: ignore
[docs]
def select(
self,
select: Union[str, AttrList],
table_name: str,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> Optional[Cursor]:
"""
Send a SELECT query to the database.
:param select: Attribute for the ``SELECT`` query.
:param str table_name: |arg_select_table_name|
:param where: |arg_select_where|
:type where: |arg_where_type|
:param str extra: |arg_select_extra|
:return: Result of the query execution.
:rtype: sqlite3.Cursor
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
"""
self.verify_table_existence(table_name)
return self.execute_query(
str(Select(select, table_name, where, extra)),
logging.getLogger().findCaller(),
)
[docs]
def select_as_dataframe(
self,
table_name: str,
columns: Optional[Sequence[str]] = None,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> "pandas.DataFrame":
"""
Get data in the database and return fetched data as a
:py:class:`pandas.Dataframe` instance.
:param str table_name: |arg_select_table_name|
:param columns: |arg_select_as_xx_columns|
:param where: |arg_select_where|
:param extra: |arg_select_extra|
:return: Table data as a :py:class:`pandas.Dataframe` instance.
:rtype: pandas.DataFrame
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
:ref:`example-select-as-dataframe`
.. note::
``pandas`` package required to execute this method.
"""
import pandas
if columns is None:
columns = self.fetch_attr_names(table_name)
result = self.select(
select=AttrList(columns), table_name=table_name, where=where, extra=extra
)
if result is None:
return pandas.DataFrame()
return pandas.DataFrame(result.fetchall(), columns=columns)
[docs]
def select_as_tabledata(
self,
table_name: str,
columns: Optional[Sequence[str]] = None,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
type_hints: Optional[Dict[str, TypeHint]] = None,
) -> TableData:
"""
Get data in the database and return fetched data as a
:py:class:`tabledata.TableData` instance.
:param str table_name: |arg_select_table_name|
:param columns: |arg_select_as_xx_columns|
:param where: |arg_select_where|
:type where: |arg_where_type|
:param str extra: |arg_select_extra|
:return: Table data as a :py:class:`tabledata.TableData` instance.
:rtype: tabledata.TableData
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
.. note::
``pandas`` package required to execute this method.
"""
if columns is None:
columns = self.fetch_attr_names(table_name)
result = self.select(
select=AttrList(columns), table_name=table_name, where=where, extra=extra
)
if result is None:
return TableData(None, [], [])
if type_hints is None:
type_hints = self.fetch_data_types(table_name)
return TableData(
table_name,
columns,
result.fetchall(),
type_hints=[type_hints.get(col) for col in columns],
max_workers=self.__max_workers,
)
[docs]
def select_as_dict(
self,
table_name: str,
columns: Optional[Sequence[str]] = None,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> "Optional[List[OrderedDict[str, Any]]]":
"""
Get data in the database and return fetched data as a
|OrderedDict| list.
:param str table_name: |arg_select_table_name|
:param list columns: |arg_select_as_xx_columns|
:param where: |arg_select_where|
:type where: |arg_where_type|
:param str extra: |arg_select_extra|
:return: Table data as |OrderedDict| instances.
:rtype: |list| of |OrderedDict|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
:ref:`example-select-as-dict`
"""
return self.select_as_tabledata(table_name, columns, where, extra).as_dict().get(table_name)
[docs]
def select_as_memdb(
self,
table_name: str,
columns: Optional[Sequence[str]] = None,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> "SimpleSQLite":
"""
Get data in the database and return fetched data as a
in-memory |SimpleSQLite| instance.
:param str table_name: |arg_select_table_name|
:param columns: |arg_select_as_xx_columns|
:param where: |arg_select_where|
:type where: |arg_where_type|
:param str extra: |arg_select_extra|
:return:
Table data as a |SimpleSQLite| instance that connected to in
memory database.
:rtype: |SimpleSQLite|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
"""
table_schema = self.schema_extractor.fetch_table_schema(table_name)
memdb = connect_memdb(max_workers=self.__max_workers)
memdb.create_table_from_tabledata(
self.select_as_tabledata(table_name, columns, where, extra),
primary_key=table_schema.primary_key,
index_attrs=table_schema.index_list,
)
return memdb
[docs]
def insert(
self, table_name: str, record: Any, attr_names: Optional[Sequence[str]] = None
) -> bool:
"""
Send an INSERT query to the database.
:param str table_name: Table name of executing the query.
:param record: Record to be inserted.
:type record: |dict|/|namedtuple|/|list|/|tuple|
:raises IOError: |raises_write_permission|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
:ref:`example-insert-records`
"""
self.validate_access_permission(["w", "a"])
self.verify_table_existence(table_name, allow_view=False)
if attr_names is None:
attr_names = self.fetch_attr_names(table_name)
values = RecordConvertor.to_record(attr_names, record)
query = Insert(table_name, AttrList(attr_names), values).to_query()
if self.execute_query(query, logging.getLogger().findCaller()) is None:
return False
return True
[docs]
def insert_many(
self,
table_name: str,
records: Sequence[Union[Dict, Sequence]],
attr_names: Optional[Sequence[str]] = None,
) -> int:
"""
Send an INSERT query with multiple records to the database.
:param str table: Table name of executing the query.
:param records: Records to be inserted.
:type records: list of |dict|/|namedtuple|/|list|/|tuple|
:return: Number of inserted records.
:rtype: int
:raises IOError: |raises_write_permission|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
:ref:`example-insert-records`
"""
self.validate_access_permission(["w", "a"])
self.verify_table_existence(table_name, allow_view=False)
if attr_names:
logger.debug(
"insert {number} records into {table}({attrs})".format(
number=len(records) if records else 0, table=table_name, attrs=attr_names
)
)
else:
logger.debug(
"insert {number} records into {table}".format(
number=len(records) if records else 0, table=table_name
)
)
if typepy.is_empty_sequence(records):
return 0
if attr_names is None:
attr_names = self.fetch_attr_names(table_name)
records = RecordConvertor.to_records(attr_names, records)
query = InsertMany(table_name, AttrList(attr_names)).to_query()
if self.debug_query or self.global_debug_query:
logging_count = 8
num_records = len(records)
logs = [query] + [
f" record {i:4d}: {record}" for i, record in enumerate(records[:logging_count])
]
if num_records - logging_count > 0:
logs.append(f" and other {num_records - logging_count} records will be inserted")
logger.debug("\n".join(logs))
assert self.connection # to avoid type check error
try:
self.connection.executemany(query, records)
except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
caller = logging.getLogger().findCaller()
file_path, line_no, func_name = caller[:3]
raise OperationalError(
f"{file_path:s}({line_no:d}) {func_name:s}: failed to execute query:\n"
+ f" query={query}\n"
+ f" msg='{e}'\n"
+ f" db={self.database_path}\n"
+ f" records={records[:2]}\n"
)
return len(records)
[docs]
def update(
self,
table_name: str,
set_query: Union[str, Sequence[Set]],
where: Optional[WhereQuery] = None,
) -> Optional[Cursor]:
"""Execute an UPDATE query.
Args:
table_name (|str|):
Table name of executing the query.
set_query (Union[str, Sequence[Set]]):
``SET`` clause for the update query.
where (|arg_where_type| , optional):
``WHERE`` clause for the update query.
Defaults to |None|.
Raises:
IOError:
|raises_write_permission|
simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
simplesqlite.OperationalError:
|raises_operational_error|
"""
self.validate_access_permission(["w", "a"])
self.verify_table_existence(table_name, allow_view=False)
query = SqlQuery.make_update(table_name, set_query, where)
return self.execute_query(query, logging.getLogger().findCaller())
[docs]
def delete(self, table_name: str, where: Optional[WhereQuery] = None) -> Optional[Cursor]:
"""
Send a DELETE query to the database.
:param str table_name: Table name of executing the query.
:param where: |arg_select_where|
:type where: |arg_where_type|
"""
self.validate_access_permission(["w", "a"])
self.verify_table_existence(table_name, allow_view=False)
query = f"DELETE FROM {table_name:s}"
if where:
query += f" WHERE {where:s}"
return self.execute_query(query, logging.getLogger().findCaller())
[docs]
def fetch_value(
self,
select: str,
table_name: str,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> Optional[int]:
"""
Fetch a value from the table. Return |None| if no value matches
the conditions, or the table not found in the database.
:param str select: Attribute for SELECT query
:param str table_name: Table name of executing the query.
:param where: |arg_select_where|
:type where: |arg_where_type|
:return: Result of execution of the query.
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.OperationalError: |raises_operational_error|
"""
try:
self.verify_table_existence(table_name)
except DatabaseError as e:
logger.debug(e)
return None
result = self.execute_query(
Select(select, table_name, where, extra), logging.getLogger().findCaller()
)
if result is None:
return None
fetch = result.fetchone()
if fetch is None:
return None
return fetch[0]
def fetch_values(
self,
select: Union[str, AttrList],
table_name: str,
where: Optional[WhereQuery] = None,
extra: Optional[str] = None,
) -> List:
result = self.select(select=select, table_name=table_name, where=where, extra=extra)
if result is None:
return []
return [record[0] for record in result.fetchall()]
[docs]
def fetch_table_names(
self, include_system_table: bool = False, include_view: bool = True
) -> List[str]:
"""
:return: List of table names in the database.
:rtype: list
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Sample Code:
.. code:: python
from simplesqlite import SimpleSQLite
con = SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
"hoge",
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
print(con.fetch_table_names())
:Output:
.. code-block:: python
['hoge']
"""
self.check_connection()
return self.schema_extractor.fetch_table_names(
include_system_table=include_system_table, include_view=include_view
)
[docs]
def fetch_view_names(self) -> List[str]:
"""
:return: List of table names in the database.
:rtype: list
"""
self.check_connection()
return self.schema_extractor.fetch_view_names()
[docs]
def fetch_attr_names(self, table_name: str) -> List[str]:
"""
:return: List of attribute names in the table.
:rtype: list
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
.. code:: python
import simplesqlite
table_name = "sample_table"
con = simplesqlite.SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
table_name,
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
print(con.fetch_attr_names(table_name))
try:
print(con.fetch_attr_names("not_existing"))
except simplesqlite.TableNotFoundError as e:
print(e)
:Output:
.. parsed-literal::
['attr_a', 'attr_b']
'not_existing' table not found in /tmp/sample.sqlite
"""
self.verify_table_existence(table_name)
return self.schema_extractor.fetch_table_schema(table_name).get_attr_names()
[docs]
def fetch_attr_type(self, table_name: str) -> Dict[str, str]:
"""
:return:
Dictionary of attribute names and attribute types in the table.
:rtype: dict
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.OperationalError: |raises_operational_error|
"""
self.verify_table_existence(table_name, allow_view=False)
result = self.execute_query(
"SELECT sql FROM sqlite_master WHERE type='table' and name={:s}".format(
Value(table_name)
)
)
assert result # to avoid type check error
query = result.fetchone()[0]
match = re.search("[(].*[)]", query)
assert match # to avoid type check error
def get_entry(items: List[str]) -> List[str]:
key = " ".join(items[:-1])
value = items[-1]
return [key, value]
return dict([get_entry(item.split(" ")) for item in match.group().strip("()").split(", ")])
[docs]
def fetch_num_records(
self, table_name: str, where: Optional[WhereQuery] = None
) -> Optional[int]:
"""
Fetch the number of records in a table.
:param str table_name: Table name to get number of records.
:param where: |arg_select_where|
:type where: |arg_where_type|
:return:
Number of records in the table.
|None| if no value matches the conditions,
or the table not found in the database.
:rtype: int
"""
return self.fetch_value(select="COUNT(*)", table_name=table_name, where=where)
def fetch_data_types(self, table_name: str) -> Dict[str, TypeHint]:
_, _, type_hints = extract_table_metadata(self, table_name)
return type_hints
[docs]
def get_profile(self, profile_count: int = 50) -> List[Any]:
"""
Get profile of query execution time.
:param int profile_count:
Number of profiles to retrieve,
counted from the top query in descending order by
the cumulative execution time.
:return: Profile information for each query.
:rtype: list of |namedtuple|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.OperationalError: |raises_operational_error|
:Example:
:ref:`example-get-profile`
"""
from collections import namedtuple
profile_table_name = "sql_profile"
value_matrix = [
[query, execute_time, self.__dict_query_count.get(query, 0)]
for query, execute_time in self.__dict_query_totalexectime.items()
]
attr_names = ("sql_query", "cumulative_time", "count")
con_tmp = connect_memdb(max_workers=self.__max_workers)
try:
con_tmp.create_table_from_data_matrix(
profile_table_name, attr_names, data_matrix=value_matrix
)
except ValueError:
return []
try:
result = con_tmp.select(
select="{:s},SUM({:s}),SUM({:s})".format(*attr_names),
table_name=profile_table_name,
extra="GROUP BY {:s} ORDER BY {:s} DESC LIMIT {:d}".format(
attr_names[0], attr_names[1], profile_count
),
)
except sqlite3.OperationalError:
return []
if result is None:
return []
SqliteProfile = namedtuple("SqliteProfile", " ".join(attr_names)) # type: ignore
return [SqliteProfile(*profile) for profile in result.fetchall()]
[docs]
def fetch_sqlite_master(self) -> List[Dict]:
"""
Get sqlite_master table information as a list of dictionaries.
:return: sqlite_master table information.
:rtype: list
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:Sample Code:
.. code:: python
import json
from simplesqlite import SimpleSQLite
con = SimpleSQLite("sample.sqlite", "w")
data_matrix = [
[1, 1.1, "aaa", 1, 1],
[2, 2.2, "bbb", 2.2, 2.2],
[3, 3.3, "ccc", 3, "ccc"],
]
con.create_table_from_data_matrix(
"sample_table",
["a", "b", "c", "d", "e"],
data_matrix,
index_attrs=["a"])
print(json.dumps(con.fetch_sqlite_master(), indent=4))
:Output:
.. code-block:: json
[
{
"tbl_name": "sample_table",
"sql": "CREATE TABLE 'sample_table' ('a' INTEGER, 'b' REAL, 'c' TEXT, 'd' REAL, 'e' TEXT)",
"type": "table",
"name": "sample_table",
"rootpage": 2
},
{
"tbl_name": "sample_table",
"sql": "CREATE INDEX sample_table_a_index ON sample_table('a')",
"type": "index",
"name": "sample_table_a_index",
"rootpage": 3
}
]
"""
self.check_connection()
return self.schema_extractor.fetch_sqlite_master()
[docs]
def has_table(self, table_name: str, include_view: bool = True) -> bool:
"""
:param str table_name: Table name to be tested.
:return: |True| if the database has the table.
:rtype: bool
:Sample Code:
.. code:: python
from simplesqlite import SimpleSQLite
con = SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
"hoge",
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
print(con.has_table("hoge"))
print(con.has_table("not_existing"))
:Output:
.. code-block:: python
True
False
"""
try:
validate_table_name(table_name)
except NameValidationError:
return False
return table_name in self.fetch_table_names(include_view=include_view)
[docs]
def has_view(self, view_name: str) -> bool:
"""
:param str table_name: Table name to be tested.
:return: |True| if the database has the table.
:rtype: bool
"""
try:
validate_table_name(view_name)
except NameValidationError:
return False
return view_name in self.fetch_view_names()
[docs]
def has_attr(self, table_name: str, attr_name: Optional[str]) -> bool:
"""
:param str table_name: Table name that the attribute exists.
:param str attr_name: Attribute name to be tested.
:return: |True| if the table has the attribute.
:rtype: bool
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:Sample Code:
.. code:: python
import simplesqlite
table_name = "sample_table"
con = simplesqlite.SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
table_name,
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
print(con.has_attr(table_name, "attr_a"))
print(con.has_attr(table_name, "not_existing"))
try:
print(con.has_attr("not_existing_table", "attr_a"))
except simplesqlite.DatabaseError as e:
print(e)
:Output:
.. parsed-literal::
True
False
'not_existing' table not found in /tmp/sample.sqlite
"""
self.verify_table_existence(table_name, allow_view=False)
if typepy.is_null_string(attr_name):
return False
return attr_name in self.fetch_attr_names(table_name)
[docs]
def has_attrs(self, table_name: str, attr_names: Sequence[str]) -> bool:
"""
:param str table_name: Table name that attributes exists.
:param attr_names: Attribute names to tested.
:return: |True| if the table has all of the attribute.
:rtype: bool
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:Sample Code:
.. code:: python
import simplesqlite
table_name = "sample_table"
con = simplesqlite.SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
table_name,
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
print(con.has_attrs(table_name, ["attr_a"]))
print(con.has_attrs(table_name, ["attr_a", "attr_b"]))
print(con.has_attrs(table_name, ["attr_a", "attr_b", "not_existing"]))
try:
print(con.has_attrs("not_existing_table", ["attr_a"]))
except simplesqlite.DatabaseError as e:
print(e)
:Output:
.. parsed-literal::
True
True
False
'not_existing' table not found in /tmp/sample.sqlite
"""
if typepy.is_empty_sequence(attr_names):
return False
not_exist_fields = [
attr_name for attr_name in attr_names if not self.has_attr(table_name, attr_name)
]
if not_exist_fields:
return False
return True
[docs]
def verify_table_existence(self, table_name: str, allow_view: bool = True) -> None:
"""
:param str table_name: Table name to be tested.
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:raises simplesqlite.NameValidationError:
|raises_validate_table_name|
:Sample Code:
.. code:: python
import simplesqlite
table_name = "sample_table"
con = simplesqlite.SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
table_name,
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
con.verify_table_existence(table_name)
try:
con.verify_table_existence("not_existing")
except simplesqlite.DatabaseError as e:
print(e)
:Output:
.. parsed-literal::
'not_existing' table not found in /tmp/sample.sqlite
"""
validate_table_name(table_name)
if self.has_table(table_name, include_view=allow_view):
return
raise TableNotFoundError(
f"'{table_name}' table not found in '{self.database_path}' database"
)
[docs]
def verify_attr_existence(self, table_name: str, attr_name: str) -> None:
"""
:param str table_name: Table name that the attribute exists.
:param str attr_name: Attribute name to tested.
:raises simplesqlite.AttributeNotFoundError:
If attribute not found in the table
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
:Sample Code:
.. code:: python
from simplesqlite import (
SimpleSQLite,
DatabaseError,
AttributeNotFoundError
)
table_name = "sample_table"
con = SimpleSQLite("sample.sqlite", "w")
con.create_table_from_data_matrix(
table_name,
["attr_a", "attr_b"],
[[1, "a"], [2, "b"]])
con.verify_attr_existence(table_name, "attr_a")
try:
con.verify_attr_existence(table_name, "not_existing")
except AttributeNotFoundError as e:
print(e)
try:
con.verify_attr_existence("not_existing", "attr_a")
except DatabaseError as e:
print(e)
:Output:
.. parsed-literal::
'not_existing' attribute not found in 'sample_table' table
'not_existing' table not found in /tmp/sample.sqlite
"""
self.verify_table_existence(table_name, allow_view=False)
if self.has_attr(table_name, attr_name):
return
raise AttributeNotFoundError(f"'{attr_name}' attribute not found in '{table_name}' table")
[docs]
def validate_access_permission(self, valid_permissions: Sequence[str]) -> None:
"""
:param valid_permissions:
List of permissions that access is allowed.
:type valid_permissions: |list|/|tuple|
:raises ValueError: If the |attr_mode| is invalid.
:raises IOError:
If the |attr_mode| not in the ``valid_permissions``.
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
"""
self.check_connection()
if typepy.is_null_string(self.mode):
raise ValueError("mode is not set")
if self.mode not in valid_permissions:
raise OSError(
"invalid access: expected-mode='{}', current-mode='{}'".format(
"' or '".join(valid_permissions), self.mode
)
)
[docs]
def drop_table(self, table_name: str) -> None:
"""
:param str table_name: Table name to drop.
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises IOError: |raises_write_permission|
"""
self.validate_access_permission(["w", "a"])
if table_name in SQLITE_SYSTEM_TABLES:
# warning message
return
if self.has_table(table_name, include_view=False):
query = f"DROP TABLE IF EXISTS '{table_name:s}'"
self.execute_query(query, logging.getLogger().findCaller())
elif self.has_view(table_name):
self.execute_query(f"DROP VIEW IF EXISTS {table_name}")
self.commit()
[docs]
def create_table(self, table_name: str, attr_descriptions: Sequence[str]) -> bool:
"""
:param str table_name: Table name to create.
:param list attr_descriptions: List of table description.
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises IOError: |raises_write_permission|
"""
self.validate_access_permission(["w", "a"])
table_name = table_name.strip()
if self.has_table(table_name):
return True
query = "CREATE TABLE IF NOT EXISTS '{:s}' ({:s})".format(
table_name, ", ".join(attr_descriptions)
)
logger.debug(query)
if self.execute_query(query, logging.getLogger().findCaller()) is None:
return False
return True
[docs]
def create_index(self, table_name: str, attr_name: str) -> None:
"""
:param str table_name:
Table name that contains the attribute to be indexed.
:param str attr_name: Attribute name to create index.
:raises IOError: |raises_write_permission|
:raises simplesqlite.NullDatabaseConnectionError:
|raises_check_connection|
:raises simplesqlite.TableNotFoundError:
|raises_verify_table_existence|
"""
self.verify_table_existence(table_name, allow_view=False)
self.validate_access_permission(["w", "a"])
query_format = "CREATE INDEX IF NOT EXISTS {index:s} ON {table}({attr})"
query = query_format.format(
index=make_index_name(table_name, attr_name),
table=Table(table_name),
attr=Attr(attr_name),
)
logger.debug(query)
self.execute_query(query, logging.getLogger().findCaller())
[docs]
def create_index_list(self, table_name: str, attr_names: Sequence[str]) -> None:
"""
:param str table_name: Table name that exists attribute.
:param list attr_names:
List of attribute names to create indices.
Ignore attributes that are not existing in the table.
.. seealso:: :py:meth:`.create_index`
"""
self.validate_access_permission(["w", "a"])
if typepy.is_empty_sequence(attr_names):
return
table_attr_set = set(self.fetch_attr_names(table_name))
index_attr_set = set(AttrList.sanitize(attr_names))
for attribute in list(table_attr_set.intersection(index_attr_set)):
self.create_index(table_name, attribute)
[docs]
def create_table_from_data_matrix(
self,
table_name: str,
attr_names: Sequence[str],
data_matrix: Any,
primary_key: Optional[str] = None,
add_primary_key_column: bool = False,
index_attrs: Optional[Sequence[str]] = None,
type_hints: Optional[Sequence[TypeHint]] = None,
) -> None:
"""
Create a table if not exists. Moreover, insert data into the created
table.
:param str table_name: Table name to create.
:param list attr_names: Attribute names of the table.
:param data_matrix: Data to be inserted into the table.
:type data_matrix: List of |dict|/|namedtuple|/|list|/|tuple|
:param str primary_key: |primary_key|
:param tuple index_attrs: |index_attrs|
:raises simplesqlite.NameValidationError:
|raises_validate_table_name|
:raises simplesqlite.NameValidationError:
|raises_validate_attr_name|
:raises ValueError: If the ``data_matrix`` is empty.
:Example:
:ref:`example-create-table-from-data-matrix`
.. seealso::
:py:meth:`.create_table`
:py:meth:`.insert_many`
:py:meth:`.create_index_list`
"""
self.__create_table_from_tabledata(
TableData(
table_name,
headers=attr_names,
rows=data_matrix,
type_hints=type_hints,
max_workers=self.__max_workers,
),
primary_key,
add_primary_key_column,
index_attrs,
)
[docs]
def create_table_from_tabledata(
self,
table_data: TableData,
primary_key: Optional[str] = None,
add_primary_key_column: bool = False,
index_attrs: Optional[Sequence[str]] = None,
) -> None:
"""
Create a table from :py:class:`tabledata.TableData`.
:param tabledata.TableData table_data: Table data to create.
:param str primary_key: |primary_key|
:param tuple index_attrs: |index_attrs|
.. seealso::
:py:meth:`.create_table_from_data_matrix`
"""
self.__create_table_from_tabledata(
table_data, primary_key, add_primary_key_column, index_attrs
)
[docs]
def create_table_from_csv(
self,
csv_source: str,
table_name: str = "",
attr_names: Sequence[str] = (),
delimiter: str = ",",
quotechar: str = '"',
encoding: str = "utf-8",
primary_key: Optional[str] = None,
add_primary_key_column: bool = False,
index_attrs: Optional[Sequence[str]] = None,
) -> None:
"""
Create a table from a CSV file/text.
:param str csv_source: Path to the CSV file or CSV text.
:param str table_name:
Table name to create.
Using CSV file basename as the table name if the value is empty.
:param list attr_names:
Attribute names of the table.
Use the first line of the CSV file as attributes if ``attr_names`` is empty.
:param str delimiter:
A one-character string used to separate fields.
:param str quotechar:
A one-character string used to quote fields containing special
characters, such as the ``delimiter`` or ``quotechar``,
or which contain new-line characters.
:param str encoding: CSV file encoding.
:param str primary_key: |primary_key|
:param tuple index_attrs: |index_attrs|
:raises ValueError: If the CSV data is invalid.
:Dependency Packages:
- `pytablereader <https://github.com/thombashi/pytablereader>`__
:Example:
:ref:`example-create-table-from-csv`
.. seealso::
:py:meth:`.create_table_from_data_matrix`
:py:func:`csv.reader`
:py:meth:`.pytablereader.CsvTableFileLoader.load`
:py:meth:`.pytablereader.CsvTableTextLoader.load`
"""
import pytablereader as ptr
loader = ptr.CsvTableFileLoader(csv_source)
if typepy.is_not_null_string(table_name):
loader.table_name = table_name
loader.headers = attr_names
loader.delimiter = delimiter
loader.quotechar = quotechar
loader.encoding = encoding
try:
for table_data in loader.load():
self.__create_table_from_tabledata(
table_data, primary_key, add_primary_key_column, index_attrs
)
return
except (ptr.InvalidFilePathError, OSError):
pass
loader = ptr.CsvTableTextLoader(csv_source)
if typepy.is_not_null_string(table_name):
loader.table_name = table_name
loader.headers = attr_names
loader.delimiter = delimiter
loader.quotechar = quotechar
loader.encoding = encoding
for table_data in loader.load():
self.__create_table_from_tabledata(
table_data, primary_key, add_primary_key_column, index_attrs
)
[docs]
def create_table_from_json(
self,
json_source: str,
table_name: str = "",
primary_key: Optional[str] = None,
add_primary_key_column: bool = False,
index_attrs: Optional[Sequence[str]] = None,
) -> None:
"""
Create a table from a JSON file/text.
:param str json_source: Path to the JSON file or JSON text.
:param str table_name: Table name to create.
:param str primary_key: |primary_key|
:param tuple index_attrs: |index_attrs|
:Dependency Packages:
- `pytablereader <https://github.com/thombashi/pytablereader>`__
:Examples:
:ref:`example-create-table-from-json`
.. seealso::
:py:meth:`.pytablereader.JsonTableFileLoader.load`
:py:meth:`.pytablereader.JsonTableTextLoader.load`
"""
import pytablereader as ptr
loader = ptr.JsonTableFileLoader(json_source)
if typepy.is_not_null_string(table_name):
loader.table_name = table_name
try:
for table_data in loader.load():
self.__create_table_from_tabledata(
table_data, primary_key, add_primary_key_column, index_attrs
)
return
except (ptr.InvalidFilePathError, OSError):
pass
loader = ptr.JsonTableTextLoader(json_source)
if typepy.is_not_null_string(table_name):
loader.table_name = table_name
for table_data in loader.load():
self.__create_table_from_tabledata(
table_data, primary_key, add_primary_key_column, index_attrs
)
[docs]
def create_table_from_dataframe(
self,
dataframe: "pandas.DataFrame",
table_name: str = "",
primary_key: Optional[str] = None,
add_primary_key_column: bool = False,
index_attrs: Optional[Sequence[str]] = None,
) -> None:
"""
Create a table from a pandas.DataFrame instance.
:param pandas.DataFrame dataframe: DataFrame instance to convert.
:param str table_name: Table name to create.
:param str primary_key: |primary_key|
:param tuple index_attrs: |index_attrs|
:Examples:
:ref:`example-create-table-from-df`
"""
self.__create_table_from_tabledata(
TableData.from_dataframe(
dataframe=dataframe,
table_name=table_name,
type_hints=[extract_typepy_from_dtype(dtype) for dtype in dataframe.dtypes],
),
primary_key,
add_primary_key_column,
index_attrs,
)
def dump(self, db_path: str, mode: str = "a") -> None:
with SimpleSQLite(db_path, mode=mode, max_workers=self.__max_workers) as dst_con:
for table_name in self.fetch_table_names(include_view=False):
copy_table(self, dst_con, src_table_name=table_name, dst_table_name=table_name)
[docs]
def rollback(self) -> None:
"""
.. seealso:: :py:meth:`sqlite3.Connection.rollback`
"""
try:
self.check_connection()
except NullDatabaseConnectionError:
return
logger.debug(f"rollback: path='{self.database_path}'")
assert self.connection # to avoid type check error
self.connection.rollback()
[docs]
def commit(self) -> None:
"""
.. seealso:: :py:meth:`sqlite3.Connection.commit`
"""
try:
self.check_connection()
except NullDatabaseConnectionError:
return
logger.debug(f"commit: path='{self.database_path}'")
assert self.connection # to avoid type check error
try:
self.connection.commit()
except sqlite3.ProgrammingError:
pass
[docs]
def close(self) -> None:
"""
Commit and close the connection.
.. seealso:: :py:meth:`sqlite3.Connection.close`
"""
if self.__delayed_connection_path and self.__connection is None:
self.__initialize_connection()
return
try:
self.check_connection()
except (SystemError, NullDatabaseConnectionError):
return
logger.debug(f"close connection to a SQLite database: path='{self.database_path}'")
self.commit()
assert self.connection # to avoid type check error
self.connection.close()
self.__initialize_connection()
@staticmethod
def __validate_db_path(database_path: str) -> None:
if typepy.is_null_string(database_path):
raise ValueError("null path")
if database_path == MEMORY_DB_NAME:
return
try:
pathvalidate.validate_filename(os.path.basename(database_path))
except AttributeError:
raise TypeError(f"database path must be a string: actual={type(database_path)}")
def __verify_db_file_existence(self, database_path: str) -> None:
"""
:raises SimpleSQLite.OperationalError: If unable to open database file.
"""
self.__validate_db_path(database_path)
if not os.path.isfile(os.path.realpath(database_path)):
raise OSError("file not found: " + database_path)
try:
connection = sqlite3.connect(database_path)
except sqlite3.OperationalError as e:
raise OperationalError(e)
connection.close()
def __delayed_connect(self) -> bool:
if self.__delayed_connection_path is None:
return False
# save and clear delayed_connection_path to avoid infinite recursion before
# calling the connect method
connection_path = self.__delayed_connection_path
self.__delayed_connection_path = None
self.connect(connection_path, cast(str, self.__mode))
return True
def __extract_attr_descs_from_tabledata(
self, table_data: TableData, primary_key: Optional[str], add_primary_key_column: bool
) -> List[str]:
if primary_key and not add_primary_key_column and primary_key not in table_data.headers:
raise ValueError("primary key must be one of the values of attributes")
attr_description_list: List[str] = []
if add_primary_key_column:
if not primary_key:
primary_key = "id"
if primary_key in table_data.headers:
raise ValueError(
"a primary key field that will be added should not conflict "
"with existing fields."
)
attr_description_list.append(f"{primary_key} INTEGER PRIMARY KEY AUTOINCREMENT")
for col, value_type in sorted(self.__extract_col_type_from_tabledata(table_data).items()):
attr_name = table_data.headers[col]
attr_description = f"{Attr(attr_name)} {value_type:s}"
if attr_name == primary_key:
attr_description += " PRIMARY KEY"
attr_description_list.append(attr_description)
return attr_description_list
@staticmethod
def __extract_col_type_from_tabledata(table_data: TableData) -> Dict:
"""
Extract data type name for each column as SQLite names.
:param tabledata.TableData table_data:
:return: { column_number : column_data_type }
:rtype: dictionary
"""
typename_table = {
typepy.Typecode.INTEGER: "INTEGER",
typepy.Typecode.REAL_NUMBER: "REAL",
typepy.Typecode.STRING: "TEXT",
}
return {
col_idx: typename_table.get(col_dp.typecode, "TEXT")
for col_idx, col_dp in enumerate(table_data.column_dp_list)
}
def __create_table_from_tabledata(
self,
table_data: TableData,
primary_key: Optional[str],
add_primary_key_column: bool,
index_attrs: Optional[Sequence[str]],
) -> None:
self.validate_access_permission(["w", "a"])
debug_msg_list = ["__create_table_from_tabledata:", f" tbldata={table_data}"]
if primary_key:
debug_msg_list.append(f" primary_key={primary_key}")
if add_primary_key_column:
debug_msg_list.append(f" add_primary_key_column={add_primary_key_column}")
if index_attrs:
debug_msg_list.append(f" index_attrs={index_attrs}")
logger.debug("\n".join(debug_msg_list))
if table_data.is_empty():
raise ValueError(f"input table_data is empty: {table_data}")
table_data = SQLiteTableDataSanitizer(
table_data, dup_col_handler=self.dup_col_handler, max_workers=self.__max_workers
).normalize()
table_name = table_data.table_name
assert table_name
self.create_table(
table_name,
self.__extract_attr_descs_from_tabledata(
table_data, primary_key, add_primary_key_column
),
)
if add_primary_key_column:
self.insert_many(table_name, [[None] + row for row in table_data.value_matrix])
else:
self.insert_many(table_name, table_data.value_matrix)
if typepy.is_not_empty_sequence(index_attrs):
self.create_index_list(table_name, AttrList.sanitize(index_attrs)) # type: ignore
self.commit()
[docs]
def connect_memdb(max_workers: Optional[int] = None) -> SimpleSQLite:
"""
:return: Instance of an in memory database.
:rtype: SimpleSQLite
:Example:
:ref:`example-connect-sqlite-db-mem`
"""
return SimpleSQLite(MEMORY_DB_NAME, "w", max_workers=max_workers)