How to test application code with MockPool
Test query logic without connecting to a real warehouse. MockPool
accepts fixture data and returns it through the same cursor interface as a real
connection pool, so your application code works identically in tests and production.
Set up MockPool
Create a MockPool, load fixture data keyed by view name, and
register it as the default pool:
from semolina import MockPool, register

pool = MockPool()
pool.load(
    "sales",
    [
        {"revenue": 1000, "country": "US"},
        {"revenue": 2000, "country": "CA"},
    ],
)
register("default", pool, dialect="mock")
The view_name passed to load() must match the
view= parameter on your SemanticView subclass.
Write a pytest test
Query your model the same way your application code does. MockPool returns
fixture data through SemolinaCursor, so assertions work on real Row objects:
import pytest
from semolina import (
    SemanticView,
    Metric,
    Dimension,
    MockPool,
    register,
    unregister,
)


class Sales(SemanticView, view="sales"):
    revenue = Metric()
    country = Dimension()


@pytest.fixture(autouse=True)
def mock_pool():
    pool = MockPool()
    pool.load(
        "sales",
        [
            {"revenue": 1000, "country": "US"},
            {"revenue": 2000, "country": "CA"},
        ],
    )
    register("default", pool, dialect="mock")
    yield
    unregister("default")


def test_revenue_query():
    cursor = (
        Sales.query()
        .metrics(Sales.revenue)
        .dimensions(Sales.country)
        .execute()
    )
    rows = cursor.fetchall_rows()
    assert len(rows) == 2
    assert rows[0].country == "US"
    assert rows[0].revenue == 1000
Use a named pool for isolation
Register the mock under a specific name and select it with .using() to avoid
conflicting with other pools in your test suite:
register("test", pool, dialect="mock")

cursor = (
    Sales.query()
    .metrics(Sales.revenue)
    .using("test")
    .execute()
)
Verify filter SQL
MockPool does not evaluate predicates, so it returns all fixture data regardless
of .where() filters. Use the result to confirm that your query runs, and use
.to_sql() (see Inspect generated SQL) to verify that the filter appears in the
generated SQL:
def test_filtered_query():
    cursor = (
        Sales.query()
        .metrics(Sales.revenue)
        .dimensions(Sales.country)
        .where(Sales.country == "US")
        .execute()
    )
    rows = cursor.fetchall_rows()
    # MockPool returns all fixture rows
    assert len(rows) == 2
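Because MockPool does not evaluate predicates, a test that needs results that look filtered can load fixture rows that have already been narrowed. The sketch below is plain Python and does not touch the semolina API; `rows_matching` is a hypothetical helper name, and matching on equality only is an assumption made for brevity.

```python
def rows_matching(rows, **equals):
    """Return the fixture rows whose columns equal the given values.

    Hypothetical helper (not part of semolina): it mimics a simple
    equality filter in plain Python so a fixture can be narrowed
    before it is loaded into a mock pool.
    """
    return [
        row
        for row in rows
        if all(row.get(column) == value for column, value in equals.items())
    ]


ALL_SALES = [
    {"revenue": 1000, "country": "US"},
    {"revenue": 2000, "country": "CA"},
]

# The subset of the fixture that satisfies country == "US".
us_sales = rows_matching(ALL_SALES, country="US")
```

Passing `us_sales` to `pool.load("sales", us_sales)` in place of the full list would make the mock return only the US row. This says nothing about whether the generated SQL is right, so it complements the `.to_sql()` check and does not replace it.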
Inspect generated SQL
Use .to_sql() to verify the SQL your query produces without executing it:
def test_sql_generation():
    sql = (
        Sales.query()
        .metrics(Sales.revenue)
        .dimensions(Sales.country)
        .where(Sales.country == "US")
        .to_sql()
    )
    assert 'AGG("revenue")' in sql
    assert '"country"' in sql
Tip
.to_sql() uses Snowflake-style syntax (AGG(), double-quoted identifiers)
regardless of which pool is registered. Use it for structural assertions, not
dialect-specific SQL validation.
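Structural assertions tend to be less brittle when they ignore line breaks and indentation in the generated string. The sketch below assumes only that `.to_sql()` returns a `str`; `normalize_sql` is a hypothetical helper, and the sample string is a hand-written stand-in, not actual `.to_sql()` output.

```python
import re


def normalize_sql(sql: str) -> str:
    """Collapse every run of whitespace to one space and trim the ends.

    Hypothetical helper (not part of semolina). It also rewrites
    whitespace inside string literals, so it suits structural checks only.
    """
    return re.sub(r"\s+", " ", sql).strip()


# Hand-written stand-in for a generated query; real output may differ.
sample = """
SELECT AGG("revenue"),
       "country"
FROM   "sales"
"""

normalized = normalize_sql(sample)
```

A test could then assert on fragments of `normalize_sql(sql)` without depending on how the generator wraps its lines.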
Clean up between tests
Always call unregister() in teardown to prevent pool registration
from leaking between tests. The autouse fixture pattern shown above handles this
automatically.
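Outside a pytest fixture, the same guarantee can be obtained from a context manager that unregisters in a `finally` block. The sketch below takes the register and unregister callables as arguments so that it runs on its own; `temporary_registration` is a hypothetical helper, and the dict-backed `fake_register` and `fake_unregister` are stand-ins for semolina's functions, not their implementation.

```python
from contextlib import contextmanager


@contextmanager
def temporary_registration(register_fn, unregister_fn, name, pool, **options):
    """Register `pool` under `name`, then always unregister on exit.

    Hypothetical helper (not part of semolina). The callables are passed
    in so the pattern can be shown without the library installed.
    """
    register_fn(name, pool, **options)
    try:
        yield pool
    finally:
        # Runs even if the body raises, so the name never leaks.
        unregister_fn(name)


# Dict-backed stand-ins for register() and unregister().
registry = {}


def fake_register(name, pool, **options):
    registry[name] = (pool, options)


def fake_unregister(name):
    registry.pop(name, None)


with temporary_registration(
    fake_register, fake_unregister, "test", object(), dialect="mock"
):
    registered_inside = "test" in registry

registered_after = "test" in registry
```

With the library, the call would presumably be `temporary_registration(register, unregister, "test", pool, dialect="mock")`, following the signatures used in the examples on this page.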
See also
How to choose and configure a backend – register real connection pools for Snowflake and Databricks
How to build queries – the full query API
Your first query – getting started with MockPool