The API reference is generated automatically from the Python package under pysrc/riffq. To refresh the documentation, run:
make docs
riffq
Riffq Python bindings and high-level server interface.
This package exposes a lightweight Python API around the Rust server core
(Server) and provides utilities for building custom query backends.
Exports:
Server: Low-level Rust server class (from the compiled extension).BaseConnection: Abstract base to implement your own connection logic.RiffqServer: Convenience wrapper that wires callbacks toServerand manages connection instances.
Modules:
| Name | Description |
|---|---|
connection |
Connection primitives and the high-level server wrapper. |
helpers |
Helper utilities for building Arrow results. |
testing |
Test helpers for riffq-backed servers. Stop a server subprocess without leaking |
Modules
riffq.connection
riffq.connection
Connection primitives and the high-level server wrapper.
This module defines two main concepts:
BaseConnection: an abstract per client connection that you subclass to implement authentication, query execution and lifecycle hooks.RiffqServer: a small orchestrator that owns the RustServer, createsBaseConnectioninstances on demand, and forwards events to them.
Callbacks
The Rust layer invokes Python callbacks with a callback function argument
that must be called to deliver results back to the server. Query callbacks
accept an Arrow C Stream capsule for result sets, or an error.
Classes:
| Name | Description |
|---|---|
BaseConnection |
Abstract per client connection. |
RiffqServer |
High level server that manages connections and forwards events. |
BaseConnection
Abstract per client connection.
Subclass this to implement authentication and query handling. One instance is created per remote client and reused for its lifecycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
conn_id
|
int
|
Unique identifier assigned by the server. |
required |
executor
|
ThreadPoolExecutor
|
Thread pool used for offloading blocking work. |
required |
Methods:
| Name | Description |
|---|---|
arrow_batch |
Create a |
handle_auth |
Authenticate a client. |
handle_connect |
Handle successful TCP connection establishment. |
handle_disconnect |
Handle client disconnect cleanup. |
handle_query |
Execute a SQL statement and return results. |
send_reader |
Send a PyArrow reader back to the server. |
arrow_batch(values, names)
Create a RecordBatchReader from arrays and names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
values
|
Sequence[Array]
|
Sequence of |
required |
names
|
Sequence[str]
|
Column names aligned with |
required |
Returns:
| Type | Description |
|---|---|
RecordBatchReader
|
pyarrow.RecordBatchReader: Reader yielding a single batch. |
handle_auth(user, password, host, database=None, callback=lambda *a, **k: None)
abstractmethod
Authenticate a client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
user
|
str
|
Username supplied by the client. |
required |
password
|
str
|
Password supplied by the client. |
required |
host
|
str
|
Requested host/server name. |
required |
database
|
Optional[str]
|
Optional initial database name. |
None
|
callback
|
Callable[..., None]
|
Invoke with |
lambda *a, **k: None
|
handle_connect(ip, port, server_name=None, callback=lambda *a, **k: None)
Handle successful TCP connection establishment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ip
|
str
|
Remote peer IP address. |
required |
port
|
int
|
Remote peer port. |
required |
callback
|
Callable[..., None]
|
Invoke with |
lambda *a, **k: None
|
handle_disconnect(ip, port, callback=lambda *a, **k: None)
Handle client disconnect cleanup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ip
|
str
|
Remote peer IP address. |
required |
port
|
int
|
Remote peer port. |
required |
callback
|
Callable[..., None]
|
Invoke with |
lambda *a, **k: None
|
handle_query(sql, callback=lambda *a, **k: None, **kwargs)
abstractmethod
Execute a SQL statement and return results.
Implementations should produce a pyarrow.RecordBatchReader and pass
its Arrow C Stream capsule to callback. To indicate an error, raise an
exception or pass an error to the callback if supported.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sql
|
str
|
The SQL text to execute. |
required |
callback
|
Callable[..., None]
|
Callable to receive an Arrow C Stream capsule. |
lambda *a, **k: None
|
**kwargs
|
Any
|
Transport or driver specific flags (e.g., timeouts). |
{}
|
send_reader(reader, callback)
Send a PyArrow reader back to the server.
Converts a pyarrow.RecordBatchReader (or compatible) into an Arrow C
Stream capsule and passes it to the provided callback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
reader
|
Any
|
A |
required |
callback
|
Callable[..., None]
|
Callable receiving a single C Stream capsule object. |
required |
RiffqServer
High level server that manages connections and forwards events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
listen_addr
|
str
|
Address string (e.g., "127.0.0.1:5432") to bind. |
required |
connection_cls
|
Type[BaseConnection]
|
Subclass of |
BaseConnection
|
Methods:
| Name | Description |
|---|---|
get_connection |
Get or create the |
handle_auth |
Forward an authentication request to the connection instance. |
handle_connect |
Forward a connect notification to the connection instance. |
handle_disconnect |
Forward a disconnect notification and release the connection. |
handle_query |
Forward a query to the connection instance. |
handle_shutdown |
Run cleanup once when the server shuts down. |
register_database |
Register a logical database for catalog emulation. |
register_schema |
Register a schema under a database for catalog emulation. |
register_table |
Register a table and its columns for catalog emulation. |
set_lazy_catalog |
Install a lazy (callback-driven) catalog source for catalog emulation. |
set_tls |
Enable TLS with certificate and key files. |
start |
Start the server event loop. |
get_connection(connection_id)
Get or create the BaseConnection for a given connection_id.
handle_auth(connection_id, user, password, host, database=None, callback=lambda *a, **k: None)
Forward an authentication request to the connection instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection_id
|
int
|
Server assigned identifier for this client. |
required |
user
|
str
|
Username supplied by the client. |
required |
password
|
str
|
Password supplied by the client. |
required |
host
|
str
|
Requested host/server name. |
required |
database
|
Optional[str]
|
Optional initial database name. |
None
|
callback
|
Callable[..., None]
|
Function to receive auth result. |
lambda *a, **k: None
|
handle_connect(connection_id, ip, port, server_name=None, callback=lambda *a, **k: None)
Forward a connect notification to the connection instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection_id
|
int
|
Server assigned identifier for this client. |
required |
ip
|
str
|
Remote peer IP address. |
required |
port
|
int
|
Remote peer port. |
required |
callback
|
Callable[..., None]
|
Function to acknowledge handling. |
lambda *a, **k: None
|
handle_disconnect(connection_id, ip, port, callback=lambda *a, **k: None)
Forward a disconnect notification and release the connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection_id
|
int
|
Server assigned identifier for this client. |
required |
ip
|
str
|
Remote peer IP address. |
required |
port
|
int
|
Remote peer port. |
required |
callback
|
Callable[..., None]
|
Function to acknowledge handling. |
lambda *a, **k: None
|
handle_query(sql, callback, connection_id=None, **kwargs)
Forward a query to the connection instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sql
|
str
|
SQL string to execute. |
required |
callback
|
Callable[..., None]
|
Function to receive an Arrow C Stream capsule. |
required |
connection_id
|
Optional[int]
|
Server assigned identifier for this client. |
None
|
**kwargs
|
Any
|
Driver specific flags forwarded as is. |
{}
|
handle_shutdown(callback)
Run cleanup once when the server shuts down.
The callback is invoked after the server stops accepting connections on
SIGINT or SIGTERM, before start() returns. Use it to flush or
checkpoint state (e.g. close a DuckDB connection) for a clean shutdown.
register_database(database_name)
Register a logical database for catalog emulation.
When start(catalog_emulation=True) is used, the server responds to
client metadata queries (pg_catalog) using entries registered via these
helpers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
database_name
|
str
|
Name of the database to expose via |
required |
register_schema(database_name, schema_name)
Register a schema under a database for catalog emulation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
database_name
|
str
|
Existing database registered via |
required |
schema_name
|
str
|
Schema name to add under the database. |
required |
register_table(database_name, schema_name, table_name, columns)
Register a table and its columns for catalog emulation.
The columns argument describes each column as a single key dict mapping
the column name to a small descriptor: { "name": { "type": <str>, "nullable": <bool> } }.
Supported type strings are aligned with riffq.helpers.to_arrow mapping
and include: int, float, bool, str/string, date, datetime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
database_name
|
str
|
Target database name. |
required |
schema_name
|
str
|
Target schema name. |
required |
table_name
|
str
|
Table name to register. |
required |
columns
|
List[Dict[str, Dict[str, Any]]]
|
Column descriptors as described above. |
required |
set_lazy_catalog(source)
Install a lazy (callback-driven) catalog source for catalog emulation.
Instead of eagerly pre-registering every database/schema/table with
register_database/register_schema/register_table, supply one
source object and the server pulls catalog metadata from it on every
pg_catalog / information_schema scan — so the catalog always
reflects the source's live state.
source must expose four methods, each of which receives a callback
and invokes it with a list of row dicts:
databases(callback)->[{"oid": int, "name": str, "datdba"?: int}]schemas(database, callback)->[{"oid": int, "name": str, "owner_oid"?: int}]relations(database, schema, callback)->[{"oid": int, "reltype_oid": int, "name": str, "kind"?: "table"|"view"|"materialized_view", "owner_oid"?: int, "has_index"?: bool, "has_rules"?: bool, "has_triggers"?: bool, "row_security"?: bool}]— the optional flags populatepg_tables(tableownerviaowner_oid,hasindexes, etc.); omitowner_oidfor backends without ownership (it stays blank)columns(database, schema, relation, callback)->[{"name": str, "type_oid": int, "nullable": bool}]
OIDs are supplied by the source and written through verbatim; they must be stable across calls (so catalog joins resolve) and unique among the source's objects. A duplicate object (same name in the same scope) is an error; a source object whose name collides with a built-in replaces it.
When a lazy source is set, the eager register_* registrations are
ignored. Requires start(catalog_emulation=True).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
Any
|
The lazy catalog source object described above. |
required |
set_tls(crt, key)
Enable TLS with certificate and key files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
crt
|
str
|
Path to PEM encoded server certificate. |
required |
key
|
str
|
Path to PEM encoded private key. |
required |
Raises:
| Type | Description |
|---|---|
OSError
|
If |
start(tls=False, catalog_emulation=False, server_version=None)
Start the server event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tls
|
bool
|
Turn ssl/tls on or off. When tls is true, remember you need to set server.set_tls(cert_path, key_path) |
False
|
catalog_emulation
|
bool
|
Turn pg_catalog & information_schema query handling by riffq. |
False
|
server_version
|
Optional[str]
|
Server version string eg: "17.6 (Homebrew)" If you omit this, we use hardcoded string in src/lib.rs |
None
|
Returns: None. Starts the underlying Rust server loop.
riffq.helpers
riffq.helpers
Helper utilities for building Arrow results.
Currently provides to_arrow for constructing an Arrow C Stream from a simple
schema description and row data. This is handy for small, programmatic results
without depending on a database engine.
Functions:
| Name | Description |
|---|---|
to_arrow |
Build an Arrow C Stream from schema and rows for regular python values |
to_arrow(schema_desc, rows)
Build an Arrow C Stream from schema and rows for regular python values
The schema is a list of dicts like { "name": str, "type": str } where
type is one of: int, float, bool, str/string, date,
datetime. Rows are sequences whose positional items match the schema
order.
Example usage:
callback(to_arrow([{"name": "val", "type": "int"}], [ [1], [2] ]))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema_desc
|
list[dict]
|
Column descriptors in display order. |
required |
rows
|
list
|
Iterable of row sequences aligned to |
required |
Returns:
| Type | Description |
|---|---|
PyCapsule
|
A PyCapsule containing an Arrow C Stream suitable for returning to the |
PyCapsule
|
server callback. |