Skip to content

Commit

Permalink
fix(duckdb): support memtables with all null columns in pyarrow output
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Dec 31, 2024
1 parent 56a66de commit 748a73b
Show file tree
Hide file tree
Showing 20 changed files with 163 additions and 37 deletions.
6 changes: 6 additions & 0 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ def _normalize_external_tables(self, external_tables=None) -> ExternalData | Non
n += 1
if not (schema := obj.schema):
raise TypeError(f"Schema is empty for external table {name}")
if null_fields := schema.null_fields:
raise com.IbisTypeError(
"ClickHouse doesn't support NULL-typed fields. "
"Consider assigning a type through casting or on construction. "
f"Got null typed fields: {null_fields}"
)

structure = [
f"{name} {type_mapper.to_string(typ.copy(nullable=not typ.is_nested()))}"
Expand Down
25 changes: 16 additions & 9 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase, UrlFromPath
from ibis.backends.duckdb.converter import DuckDBPandasData
from ibis.backends.duckdb.converter import DuckDBPandasData, DuckDBPyArrowData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, AlterTable, C, RenameTable
from ibis.common.dispatch import lazy_singledispatch
Expand Down Expand Up @@ -148,8 +148,6 @@ def create_table(

if obj is None and schema is None:
raise ValueError("Either `obj` or `schema` must be specified")
if schema is not None:
schema = ibis.schema(schema)

quoted = self.compiler.quoted
dialect = self.dialect
Expand All @@ -172,16 +170,25 @@ def create_table(
else:
query = None

if schema is None:
schema = table.schema()
else:
schema = ibis.schema(schema)

if null_fields := schema.null_fields:
raise exc.IbisTypeError(
"DuckDB does not support creating tables with NULL typed columns. "
"Ensure that every column has non-NULL type. "
f"NULL columns: {null_fields}"
)

if overwrite:
temp_name = util.gen_name("duckdb_table")
else:
temp_name = name

initial_table = sg.table(temp_name, catalog=catalog, db=database, quoted=quoted)
target = sge.Schema(
this=initial_table,
expressions=(schema or table.schema()).to_sqlglot(dialect),
)
target = sge.Schema(this=initial_table, expressions=schema.to_sqlglot(dialect))

create_stmt = sge.Create(
kind="TABLE",
Expand Down Expand Up @@ -252,7 +259,7 @@ def table(self, name: str, database: str | None = None) -> ir.Table:

table_schema = self.get_schema(name, catalog=catalog, database=database)
# load geospatial only if geo columns
if any(typ.is_geospatial() for typ in table_schema.types):
if table_schema.geospatial:
self.load_extension("spatial")
return ops.DatabaseTable(
name,
Expand Down Expand Up @@ -1302,7 +1309,7 @@ def to_pyarrow(
**_: Any,
) -> pa.Table:
table = self._to_duckdb_relation(expr, params=params, limit=limit).arrow()
return expr.__pyarrow_result__(table)
return expr.__pyarrow_result__(table, data_mapper=DuckDBPyArrowData)

def execute(
self,
Expand Down
22 changes: 22 additions & 0 deletions ibis/backends/duckdb/converter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pyarrow as pa

from ibis.formats.pandas import PandasData
from ibis.formats.pyarrow import PyArrowData

if TYPE_CHECKING:
import ibis.expr.datatypes as dt


class DuckDBPandasData(PandasData):
    """DuckDB-specific subclass of ``PandasData``."""

    @staticmethod
    def convert_Array(s, dtype, pandas_type):
        """Return `s` with NaN entries replaced by ``None``.

        `dtype` and `pandas_type` are accepted for signature compatibility
        but are not consulted here.
        """
        # NOTE(review): `s` is presumably a pandas Series -- confirm with caller.
        nan = float("nan")
        return s.replace(nan, None)


class DuckDBPyArrowData(PyArrowData):
    """DuckDB-specific subclass of ``PyArrowData`` that special-cases null dtypes."""

    @classmethod
    def convert_scalar(cls, scalar: pa.Scalar, dtype: dt.DataType) -> pa.Scalar:
        """Convert `scalar` to `dtype`.

        A null `dtype` short-circuits to ``pa.scalar(None)``; every other
        dtype is delegated to the parent implementation.
        """
        if not dtype.is_null():
            return super().convert_scalar(scalar, dtype)
        return pa.scalar(None)

    @classmethod
    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
        """Convert `column` to `dtype`.

        A null `dtype` yields ``pa.nulls`` with the same length as `column`;
        every other dtype is delegated to the parent implementation.
        """
        if not dtype.is_null():
            return super().convert_column(column, dtype)
        n_rows = len(column)
        return pa.nulls(n_rows)
23 changes: 23 additions & 0 deletions ibis/backends/duckdb/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pytest import param

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis.conftest import LINUX, SANDBOXED, not_windows
from ibis.util import gen_name
Expand Down Expand Up @@ -442,3 +443,25 @@ def test_pyarrow_batches_chunk_size(con): # 10443
batches = con.to_pyarrow_batches(t, chunk_size=-1)
with pytest.raises(TypeError):
next(batches)


@pytest.mark.parametrize(
    "kwargs",
    [
        param({"obj": ibis.memtable({"a": [None]})}, id="obj"),
        param(
            {
                "obj": ibis.memtable({"a": [None]}),
                "schema": ibis.schema({"a": "null"}),
            },
            id="obj-schema",
        ),
        param({"schema": ibis.schema({"a": "null"})}, id="schema"),
    ],
)
def test_create_table_with_nulls(con, kwargs):
    """Creating a table with a NULL-typed column must raise ``IbisTypeError``.

    Covered for each way of supplying the column: via `obj`, via `schema`,
    and via both together.
    """
    # Sanity-check the premise: an all-None column is inferred as null-typed.
    inferred = ibis.memtable({"a": [None]}).schema()
    expected = ibis.schema({"a": "null"})

    assert inferred == expected
    assert inferred.null_fields == ("a",)

    table_name = gen_name("duckdb_all_nulls")

    with pytest.raises(com.IbisTypeError, match="NULL typed columns"):
        con.create_table(table_name, **kwargs)
2 changes: 1 addition & 1 deletion ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Exasol cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
17 changes: 13 additions & 4 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,22 @@ def compile(
expr, params=params, pretty=pretty
) # Discard `limit` and other kwargs.

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if null_columns := op.schema.null_fields:
raise exc.IbisTypeError(

Check warning on line 380 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L380

Added line #L380 was not covered by tests
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)
self.create_view(op.name, op.data.to_frame(), schema=op.schema, temp=True)

Check warning on line 384 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L384

Added line #L384 was not covered by tests

def _finalize_memtable(self, name: str) -> None:
self.drop_view(name, temp=True, force=True)

Check warning on line 387 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L387

Added line #L387 was not covered by tests

def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
"""Execute an expression."""
self._verify_in_memory_tables_are_unique(expr)
self._register_udfs(expr)
self._run_pre_execute_hooks(expr)

Check warning on line 391 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L391

Added line #L391 was not covered by tests

table_expr = expr.as_table()
sql = self.compile(table_expr, **kwargs)
sql = self.compile(expr.as_table(), **kwargs)

Check warning on line 393 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L393

Added line #L393 was not covered by tests
df = self._table_env.sql_query(sql).to_pandas()

return expr.__pandas_result__(df)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ def explain(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Impala cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"MS SQL cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"MySQL cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
5 changes: 5 additions & 0 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ def drop_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := schema.null_fields:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)

name = op.name
quoted = self.compiler.quoted
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
from psycopg2.extras import execute_batch

schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
18 changes: 10 additions & 8 deletions ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,9 @@ def test_array_intersect(con, data):
@pytest.mark.notimpl(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_unnest_struct(con):
data = {"value": [[{"a": 1}, {"a": 2}], [{"a": 3}, {"a": 4}]]}
t = ibis.memtable(data, schema=ibis.schema({"value": "!array<!struct<a: !int>>"}))
Expand All @@ -1063,8 +1066,8 @@ def test_unnest_struct(con):
@pytest.mark.notimpl(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notimpl(
["flink"], reason="flink unnests a and b as separate columns", raises=Py4JJavaError
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_unnest_struct_with_multiple_fields(con):
data = {
Expand Down Expand Up @@ -1161,9 +1164,7 @@ def test_zip_null(con, fn):
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="does not seem to support field selection on unnest",
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_array_of_struct_unnest(con):
jobs = ibis.memtable(
Expand Down Expand Up @@ -1683,15 +1684,16 @@ def test_table_unnest_column_expr(backend):
assert set(result.values) == set(expected.replace({np.nan: None}).values)


@pytest.mark.notimpl(
["datafusion", "polars", "flink"], raises=com.OperationNotDefinedError
)
@pytest.mark.notimpl(["datafusion", "polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["trino"], raises=TrinoUserError)
@pytest.mark.notimpl(["postgres"], raises=PsycoPg2SyntaxError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPg2ProgrammingError)
@pytest.mark.notyet(
["risingwave"], raises=PsycoPg2InternalError, reason="not supported in risingwave"
)
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_table_unnest_array_of_struct_of_array(con):
t = ibis.memtable(
{
Expand Down
6 changes: 2 additions & 4 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1730,9 +1730,7 @@ def test_insert_into_table_missing_columns(con, temp_table):

@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables")
@pytest.mark.notyet(
["clickhouse", "flink"],
raises=AssertionError,
reason="memtables are assembled every time",
["clickhouse"], raises=AssertionError, reason="memtables are assembled every time"
)
@pytest.mark.notyet(
["bigquery"], raises=AssertionError, reason="test is flaky", strict=False
Expand Down Expand Up @@ -1778,7 +1776,7 @@ def test_same_name_memtable_is_overwritten(con):


@pytest.mark.notimpl(
["clickhouse", "flink"],
["clickhouse"],
raises=AssertionError,
reason="backend doesn't use _register_in_memory_table",
)
Expand Down
43 changes: 43 additions & 0 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pytest import param

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis import util
from ibis.backends.tests.errors import (
Expand All @@ -16,6 +17,7 @@
ExaQueryError,
MySQLOperationalError,
OracleDatabaseError,
Py4JJavaError,
PyDeltaTableError,
PyDruidProgrammingError,
PyODBCProgrammingError,
Expand All @@ -28,6 +30,7 @@

pd = pytest.importorskip("pandas")
pa = pytest.importorskip("pyarrow")
pat = pytest.importorskip("pyarrow.types")

limit = [param(42, id="limit")]

Expand Down Expand Up @@ -618,4 +621,44 @@ def test_scalar_to_memory(limit, awards_players, output_format, converter):

expr = awards_players.filter(awards_players.awardID == "DEADBEEF").yearID.min()
res = method(expr)

assert converter(res) is None


# Reusable `notyet` marker: for the backends listed here, the decorated test is
# expected to raise `IbisTypeError` because they are unable to handle null
# types as input.
mark_notyet_nulls = pytest.mark.notyet(
[
"clickhouse",
"exasol",
"flink",
"impala",
"mssql",
"mysql",
"oracle",
"postgres",
"risingwave",
"trino",
],
raises=com.IbisTypeError,
reason="unable to handle null types as input",
)


@mark_notyet_nulls
def test_all_null_table(con):
    """Exporting an all-null memtable to pyarrow keeps column ``a`` null-typed."""
    all_null = ibis.memtable({"a": [None]})
    arrow_table = con.to_pyarrow(all_null)
    column_type = arrow_table["a"].type
    assert pat.is_null(column_type)


@mark_notyet_nulls
def test_all_null_column(con):
    """Exporting an all-null column expression to pyarrow yields a null type."""
    column_expr = ibis.memtable({"a": [None]}).a
    arrow_column = con.to_pyarrow(column_expr)
    assert pat.is_null(arrow_column.type)


@pytest.mark.notyet(["flink"], raises=Py4JJavaError)
def test_all_null_scalar(con):
    """Exporting a ``None`` literal to pyarrow yields a null-typed result."""
    null_literal = ibis.literal(None)
    arrow_scalar = con.to_pyarrow(null_literal)
    assert pat.is_null(arrow_scalar.type)
1 change: 1 addition & 0 deletions ibis/backends/tests/test_numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ def test_scalar_round_is_integer(con):
],
)
@pytest.mark.notyet(["exasol"], raises=ExaQueryError)
@pytest.mark.notimpl(["flink"], raises=NotImplementedError)
def test_memtable_decimal(con, numbers):
schema = ibis.schema(dict(numbers=dt.Decimal(38, 9)))
t = ibis.memtable({"numbers": numbers}, schema=schema)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/trino/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Trino cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
Loading

0 comments on commit 748a73b

Please sign in to comment.