Skip to content

plpipes.database.driver#

Module for handling database drivers in the plpipes framework.

This module defines the Driver class that acts as an abstraction layer for database interactions, allowing for the use of various backends for database operations. It provides an interface for executing SQL commands, managing transactions, and interacting with tables and views within a database.

Classes:

Name Description
Driver

The base class for database drivers providing methods for executing SQL commands, handling transactions, and managing database backend interactions.

Driver #

Bases: Plugin

Driver class for handling database interactions.

Attributes:

Name Type Description
_default_backend_name str

The default backend name to use.

_backend_subkeys list

List of backend subkeys associated with this driver.

Methods:

Name Description
config

Returns the configuration for the driver.

driver_name

Returns the name of the driver.

begin

Context manager for starting a transaction.

_execute

Executes an SQL command in the transaction.

_execute_script

Executes an SQL script in the transaction.

_list_tables

Lists tables in the database.

_read_table

Reads a table from the database.

_drop_table

Drops a table from the database.

_create_table

Creates a new table.

_create_view

Creates a new view in the database.

_copy_table

Copies data between tables.

_query_chunked

Executes a chunked query.

_query_group

Executes a grouped query.

load_backend

Loads a specific backend into the driver.

Source code in src\plpipes\database\driver\__init__.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
class Driver(plpipes.plugin.Plugin):
    """
    Driver class for handling database interactions.

    Query-style operations are delegated to a backend object looked up by
    name; table/transaction primitives are declared here as stubs.

    Attributes:
        _default_backend_name (str): The default backend name to use.
        _backend_subkeys (list): List of backend subkeys associated with this driver.

    Methods:
        config(): Returns the configuration for the driver.
        driver_name(): Returns the name of the driver.
        begin(): Context manager for starting a transaction.
        _execute(txn, sql, parameters=None): Executes an SQL command in the transaction.
        _execute_script(txn, sql): Executes an SQL script in the transaction.
        _list_tables(txn): Lists tables in the database.
        _read_table(txn, table_name, backend, kws): Reads a table from the database.
        _drop_table(txn, table_name, only_if_exists): Drops a table from the database.
        _create_table(txn, table_name, sql_or_df, parameters, if_exists, kws): Creates a new table.
        _create_view(txn, view_name, sql, parameters, if_exists, kws): Creates a new view in the database.
        _copy_table(txn, from_table_name, to_table_name, if_exists, kws): Copies data between tables.
        _query_chunked(txn, sql, parameters, backend, kws): Executes a chunked query.
        _query_group(txn, sql, parameters, by, backend, kws): Executes a grouped query.
        load_backend(name): Loads a specific backend into the driver.
    """

    _default_backend_name = "pandas"
    _backend_subkeys = []

    @classmethod
    def _init_plugin(klass, key):
        """
        Initializes the plugin with backend registry and configuration.

        Args:
            klass: The class reference.
            key: The key of the plugin instance.
        """
        super()._init_plugin(key)
        # Fresh registry per class; backends are cached here by name.
        klass._backend_registry = {}
        # Builds a new list (own key first) instead of mutating the inherited one.
        klass._backend_subkeys = [key, *klass._backend_subkeys]
        # NOTE(review): presumably gives each driver class its own dispatcher
        # so handlers registered for one class do not leak into others --
        # confirm against the dispatcher implementation.
        klass._create_table = klass._create_table.copy()

    @classmethod
    def _backend_lookup(klass, name):
        """
        Looks up and returns the specified backend by name.

        On a registry miss the backend class is resolved through
        ``_backend_class_registry``, instantiated, cached and asked to
        register its handlers.

        Args:
            klass: The class reference.
            name: The name of the backend to look up.

        Returns:
            backend: The backend instance associated with the specified name.
        """
        try:
            return klass._backend_registry[name]
        except KeyError:
            backend_class = _backend_class_registry.lookup(name, subkeys=klass._backend_subkeys)
            backend = backend_class()
            klass._backend_registry[name] = backend
            backend.register_handlers({'create_table': klass._create_table.td})
            logging.debug(f"backend {backend._plugin_name} for {klass._plugin_name} loaded")
            return backend

    def __init__(self, name, drv_cfg):
        """
        Initializes the Driver instance with a name and configuration.

        Args:
            name: The name of the database driver.
            drv_cfg: The configuration settings for the driver instance.
        """
        self._name = name
        self._cfg = drv_cfg
        self._last_key = 0
        self._default_backend = self._backend_lookup(self._cfg.get("backend", self._default_backend_name))
        # Load any additionally configured backends; the results are not
        # stored on the instance.
        for backend_name in self._cfg.get('extra_backends', []):
            self._backend_lookup(backend_name)

    def config(self):
        """
        Returns the configuration settings for the driver.

        Returns:
            Tree: A tree representation of the configuration.
        """
        return self._cfg.to_tree()

    def _backend(self, name):
        """
        Retrieves the requested backend or the default backend if none is specified.

        Args:
            name: The name of the backend to retrieve.

        Returns:
            backend: The specified backend instance or default backend if name is None.
        """
        if name is None:
            return self._default_backend
        logging.debug(f"looking up backend {name}")
        return self._backend_lookup(name)

    def driver_name(self):
        """
        Returns the name of the database driver.

        Returns:
            str: The name of the driver.
        """
        return self._plugin_name

    @optional_abstract
    @contextmanager
    def begin(self):
        """
        Context manager for beginning a database transaction.

        Yields:
            Transaction: A transaction object for conducting operations within a context.
        """
        ...

    @optional_abstract
    def _execute(self, txn, sql, parameters=None):
        """
        Executes an SQL command within a transaction.

        Args:
            txn: The transaction instance to execute the command within.
            sql: The SQL command to execute.
            parameters: Optional parameters for the SQL command.
        """
        ...

    @optional_abstract
    def _execute_script(self, txn, sql):
        """
        Executes an SQL script within a transaction.

        Args:
            txn: The transaction instance to execute the script within.
            sql: The SQL script to execute.
        """
        ...

    @optional_abstract
    def _list_tables(self, txn):
        """
        Lists all tables within the database for the given transaction.

        Args:
            txn: The transaction instance for querying the database.
        """
        ...

    def _next_key(self):
        """
        Generates the next unique key for database operations.

        The key is a per-instance counter that starts at 1.

        Returns:
            int: A unique key for the current operation.
        """
        self._last_key += 1
        return self._last_key

    def _query(self, txn, sql, parameters, backend, kws):
        """
        Executes a database query and returns the result.

        Args:
            txn: The transaction instance to use for the query.
            sql: The SQL query string to execute.
            parameters: Optional parameters for the SQL query.
            backend: The backend to use for executing the query.
            kws: Additional keyword arguments for the backend.

        Returns:
            Result: The result of the query execution.
        """
        # Only the first 40 characters of the parameters are logged.
        logging.debug(f"database query code: {repr(sql)}, parameters: {str(parameters)[0:40]}")
        return self._backend(backend).query(txn, sql, parameters, kws)

    def _query_first(self, txn, sql, parameters, backend, kws):
        """
        Executes a query and returns the first result.

        Args:
            txn: The transaction instance.
            sql: The SQL query string to execute.
            parameters: Optional parameters for the SQL query.
            backend: The backend to use for executing the query.
            kws: Additional keyword arguments.

        Returns:
            Result: The first result of the query execution.
        """
        logging.debug(f"database query code: {repr(sql)}, parameters: {str(parameters)[0:40]}")
        return self._backend(backend).query_first(txn, sql, parameters, kws)

    def _query_first_value(self, txn, sql, parameters, backend, kws):
        """
        Executes a query and returns the first value from the result.

        Args:
            txn: The transaction instance.
            sql: The SQL query string to execute.
            parameters: Optional parameters for the SQL query.
            backend: The backend to use for executing the query.
            kws: Additional keyword arguments.

        Returns:
            Any: The first value from the result of the query execution.
        """
        logging.debug(f"database query code: {repr(sql)}, parameters: {str(parameters)[0:40]}")
        return self._backend(backend).query_first_value(txn, sql, parameters, kws)

    @optional_abstract
    def _read_table(self, txn, table_name, backend, kws):
        """
        Reads a table from the database.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to read.
            backend: The backend to use for reading the table.
            kws: Additional keyword arguments.

        Returns:
            DataFrame: The data read from the table.
        """
        ...

    @optional_abstract
    def _drop_table(self, txn, table_name, only_if_exists):
        """
        Drops a specified table from the database.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to be dropped.
            only_if_exists: Boolean to specify if the table should only be dropped if it exists.
        """
        ...

    @dispatcher({str: '_create_table_from_str',
                 list: '_create_table_from_records',
                 types.GeneratorType: '_create_table_from_iterator',},
                ix=2)
    def _create_table(self, txn, table_name, sql_or_df, parameters, if_exists, kws):
        """
        Creates a table in the database from various input types.

        The dispatch table above maps ``str``, ``list`` and generator inputs
        to the corresponding ``_create_table_from_*`` method.

        Args:
            txn: The transaction instance to create the table within.
            table_name: The name of the table to create.
            sql_or_df: The SQL command or DataFrame to define the table.
            parameters: Optional parameters for creating the table.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        ...

    @optional_abstract
    def _create_table_from_str(self, txn, table_name, sql, parameters, if_exists, kws):
        """
        Creates a table from a SQL string command.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to create.
            sql: SQL string defining the table schema.
            parameters: Optional parameters for the SQL command.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        ...

    @optional_abstract
    def _create_table_from_clause(self, txn, table_name, clause, parameters, if_exists, kws):
        """
        Creates a table from a SQL clause.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to create.
            clause: SQL clause for creating the table.
            parameters: Optional parameters for the SQL command.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        ...

    def _create_table_from_records(self, txn, table_name, records, parameters, if_exists, kws):
        """
        Creates a table from a list of records.

        The optional ``"backend"`` entry is removed from ``kws`` (the dict
        passed in is mutated) and selects the backend that does the work.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to create.
            records: Iterable containing records for the table.
            parameters: Optional parameters for creating the table.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        backend = self._backend(kws.pop("backend", None))
        backend.create_table_from_records(txn, table_name, records, parameters, if_exists, kws)

    def _create_table_from_iterator(self, txn, table_name, iterator, parameters, if_exists, kws):
        """
        Creates a table from an iterator yielding records.

        The first chunk is written with the caller's ``if_exists`` policy;
        every following chunk is written with ``'append'``. An iterator that
        yields nothing results in no call at all.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to create.
            iterator: Iterator yielding records for the table.
            parameters: Optional parameters for creating the table.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        for chunk in iterator:
            # Hand every chunk its own shallow copy of kws: handlers such as
            # _create_table_from_records pop entries (e.g. "backend") from
            # the dict they receive, which would otherwise strip those
            # options for every chunk after the first.
            self._create_table(txn, table_name, chunk, parameters, if_exists, dict(kws))
            if_exists = 'append'

    @optional_abstract
    def _create_view(self, txn, view_name, sql, parameters, if_exists, kws):
        """
        Creates a view in the database.

        Args:
            txn: The transaction instance.
            view_name: The name of the view to create.
            sql: SQL string defining the view.
            parameters: Optional parameters for creating the view.
            if_exists: Specifies how to handle the view if it already exists.
            kws: Additional keyword arguments.
        """
        ...

    @optional_abstract
    def _copy_table(self, txn, from_table_name, to_table_name, if_exists, kws):
        """
        Copies the contents of one table to another.

        Args:
            txn: The transaction instance.
            from_table_name: The name of the table to copy from.
            to_table_name: The name of the table to copy to.
            if_exists: Specifies how to handle the table if it already exists.
            kws: Additional keyword arguments.
        """
        ...

    @optional_abstract
    def _read_table_chunked(self, txn, table_name, backend, kws):
        """
        Reads a table in chunks.

        Args:
            txn: The transaction instance.
            table_name: The name of the table to read.
            backend: The backend to use for reading the table.
            kws: Additional keyword arguments.
        """
        ...

    def _query_chunked(self, txn, sql, parameters, backend, kws):
        """
        Executes a chunked query and returns results.

        Args:
            txn: The transaction instance.
            sql: The SQL query string to execute.
            parameters: Optional parameters for the SQL query.
            backend: The backend to use for executing the query.
            kws: Additional keyword arguments.

        Returns:
            Chunked results of the query execution.
        """
        return self._backend(backend).query_chunked(txn, sql, parameters, kws)

    def _query_group(self, txn, sql, parameters, by, backend, kws):
        """
        Executes a grouped query and returns results.

        Args:
            txn: The transaction instance.
            sql: The SQL query string to execute.
            parameters: Optional parameters for the SQL query.
            by: Column(s) to group the results by.
            backend: The backend to use for executing the query.
            kws: Additional keyword arguments.

        Returns:
            Grouped results of the query execution.
        """
        return self._backend(backend).query_group(txn, sql, parameters, by, kws)

    def load_backend(self, name):
        """
        Loads a specific backend into the driver.

        Args:
            name: The name of the backend to load.
        """
        self._backend_lookup(name)

__init__(name, drv_cfg) #

Initializes the Driver instance with a name and configuration.

Parameters:

Name Type Description Default
name

The name of the database driver.

required
drv_cfg

The configuration settings for the driver instance.

required
Source code in src\plpipes\database\driver\__init__.py
def __init__(self, name, drv_cfg):
    """
    Set up a driver instance from its name and configuration.

    Stores the name and configuration, resets the key counter to zero,
    resolves the default backend (the configured "backend" entry, falling
    back to the class default name) and then looks up every backend listed
    under "extra_backends".

    Args:
        name: The name of the database driver.
        drv_cfg: The configuration settings for the driver instance.
    """
    self._name, self._cfg, self._last_key = name, drv_cfg, 0

    default_name = self._cfg.get("backend", self._default_backend_name)
    self._default_backend = self._backend_lookup(default_name)

    # Extra backends are looked up for their side effect only.
    extra_names = self._cfg.get('extra_backends', [])
    for extra_name in extra_names:
        self._backend_lookup(extra_name)

begin() #

Context manager for beginning a database transaction.

Yields:

Name Type Description
Transaction

A transaction object for conducting operations within a context.

Source code in src\plpipes\database\driver\__init__.py
@optional_abstract
@contextmanager
def begin(self):
    """
    Context manager for beginning a database transaction.

    The body here is an empty stub (``...``); the real behavior is
    presumably supplied by concrete driver classes -- confirm against the
    ``optional_abstract`` decorator.

    Yields:
        Transaction: A transaction object for conducting operations within a context.
    """
    ...

config() #

Returns the configuration settings for the driver.

Returns:

Name Type Description
Tree

A tree representation of the configuration.

Source code in src\plpipes\database\driver\__init__.py
def config(self):
    """
    Return the driver configuration converted with its ``to_tree()`` method.

    Returns:
        Tree: A tree representation of the configuration.
    """
    driver_cfg = self._cfg
    return driver_cfg.to_tree()

driver_name() #

Returns the name of the database driver.

Returns:

Name Type Description
str

The name of the driver.

Source code in src\plpipes\database\driver\__init__.py
def driver_name(self):
    """
    Return the plugin name this driver is registered under.

    Returns:
        str: The name of the driver.
    """
    plugin_name = self._plugin_name
    return plugin_name

load_backend(name) #

Loads a specific backend into the driver.

Parameters:

Name Type Description Default
name

The name of the backend to load.

required
Source code in src\plpipes\database\driver\__init__.py
def load_backend(self, name):
    """
    Make sure the backend called *name* is loaded for this driver.

    Delegates to the backend lookup and discards its result.

    Args:
        name: The name of the backend to load.
    """
    lookup = self._backend_lookup
    lookup(name)