Data Source V2

Overview

Data Source V2 (DSV2) is Spark’s extensible API for integrating external data systems. It is a set of Java interfaces in the org.apache.spark.sql.connector package that allow connectors to plug into Spark’s query planning and execution. Connectors opt in to features and optimizations — such as filter pushdown, columnar reads, or catalog support — by implementing specific mix-in interfaces, so a minimal connector can add capabilities incrementally. Notable users include:

Compared to the earlier Data Source V1 API, DSV2 offers:

| Feature | Description |
| --- | --- |
| Java API | The connector interfaces are pure Java (org.apache.spark.sql.connector), removing the Scala dependency that DSV1 required. A Python Data Source API (pyspark.sql.datasource) is also available as a wrapper for lightweight connectors written entirely in Python. |
| Catalog integration | Connectors can expose namespaces, tables, views, and functions natively through Spark SQL. |
| Operator pushdown | Connectors can accept pushed-down filters, required columns, aggregates, limits, offsets, and more. |
| Report partitioning and ordering | Connectors can report the physical layout of data so that Spark can avoid unnecessary shuffles and sorts. |
| Requested distribution and ordering | Write connectors can request that Spark repartition and sort incoming data before writing, enabling optimized data layouts such as clustering or Z-ordering. |
| Columnar reads | Connectors can return data in columnar batches for vectorized processing. |
| Row-level DML | Connectors can natively support DELETE, UPDATE, and MERGE INTO operations through dedicated interfaces. |
| Streaming support | A unified Table abstraction supports batch, micro-batch, and continuous processing through the same interfaces. |

Entry Points

There are two ways to plug a data source into Spark:

| Entry Point | Use Case |
| --- | --- |
| TableProvider | The simpler entry point, for sources that identify a table by options (e.g. a file path or Kafka topic) rather than through a catalog. Can implement SupportsCatalogOptions to also participate in DDL via the session catalog. |
| CatalogPlugin | Typically used by external data sources that manage their own catalog of namespaces, tables, and optionally views and functions (e.g. Iceberg, Delta Lake). Registered via spark.sql.catalog.<name>=com.example.MyCatalog. |

TableProvider

TableProvider returns a Table given a set of options. Implementations must have a public no-arg constructor.

The key methods are:

| Method | Description |
| --- | --- |
| inferSchema(options) | Infer the table's schema from the given options |
| inferPartitioning(options) | Optionally infer the table's partitioning |
| getTable(schema, partitioning, properties) | Return a Table for the resolved schema and partitioning |

A TableProvider can also implement SupportsCatalogOptions to participate in DDL such as CREATE TABLE by extracting a catalog identifier from the user-supplied options. This bridges option-based sources to the session catalog, which is how built-in file data sources (Parquet, ORC, etc.) support table creation.

CatalogPlugin

CatalogPlugin is a marker interface for catalog implementations. After instantiation, Spark calls initialize(name, options) with the catalog name and all configuration properties whose keys start with the prefix spark.sql.catalog.<name>. (including the trailing dot).
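As a rough illustration of the prefix convention, the sketch below collects the entries of a configuration map that belong to one catalog and strips the prefix from their keys. CatalogOptionsSketch and optionsFor are hypothetical helpers written for this example, not Spark APIs, and Spark's own extraction logic may differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark: it only illustrates the key prefix convention.
class CatalogOptionsSketch {

    // Collects every entry whose key starts with "spark.sql.catalog.<name>."
    // and returns it with that prefix removed from the key.
    static Map<String, String> optionsFor(String name, Map<String, String> conf) {
        String prefix = "spark.sql.catalog." + name + ".";
        Map<String, String> options = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                options.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return options;
    }
}
```

The registration key itself (spark.sql.catalog.<name>, without the trailing dot) does not match the prefix, so in this sketch the implementation class name is not passed along as an option.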

A catalog adds capabilities by mixing in additional catalog interfaces:

| Interface | Capability |
| --- | --- |
| TableCatalog | List, load, create, alter, and drop tables |
| StagingTableCatalog | Atomic create-table-as-select / replace-table-as-select |
| SupportsNamespaces | Create, alter, drop, and list namespaces |
| ViewCatalog | List, load, create, alter, and drop views (work in progress; not yet integrated into query resolution) |
| FunctionCatalog | List and load functions |
| ProcedureCatalog | Load and list stored procedures |

CatalogExtension is a special variant that wraps Spark’s built-in session catalog and can be used to add custom behavior while delegating to the default implementation.

Catalog Interfaces

The interfaces below are mix-ins that a CatalogPlugin implements to expose specific categories of metadata operations through Spark SQL.

TableCatalog

TableCatalog extends CatalogPlugin and provides methods for Table lifecycle management:

| Method | Description |
| --- | --- |
| listTables(namespace) | List tables in a namespace |
| loadTable(ident) | Load a table by identifier |
| createTable(ident, columns, partitions, properties) | Create a new table |
| alterTable(ident, changes...) | Alter a table's schema, properties, or constraints |
| dropTable(ident) | Drop a table |

StagingTableCatalog

StagingTableCatalog extends TableCatalog and enables atomic CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT. It returns a StagedTable whose changes only become visible when commitStagedChanges() is called. If the write fails, the catalog remains unchanged.
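The staging idea can be sketched with simplified stand-ins. SketchStagingCatalog and SketchStagedTable below are hypothetical classes that use plain strings for identifiers and rows; they are not the real Spark interfaces. Only the method names commitStagedChanges() and abortStagedChanges() mirror Spark's StagedTable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a catalog; the map holds what readers can see.
class SketchStagingCatalog {
    final Map<String, List<String>> tables = new HashMap<>();

    // Begins a staged create; nothing is added to `tables` yet.
    SketchStagedTable stageCreate(String ident) {
        return new SketchStagedTable(this, ident);
    }
}

// Hypothetical stand-in for StagedTable: buffers rows until commit.
class SketchStagedTable {
    private final SketchStagingCatalog catalog;
    private final String ident;
    private final List<String> pendingRows = new ArrayList<>();

    SketchStagedTable(SketchStagingCatalog catalog, String ident) {
        this.catalog = catalog;
        this.ident = ident;
    }

    void append(String row) {
        pendingRows.add(row);
    }

    // Publishes the table and its rows in a single step.
    void commitStagedChanges() {
        catalog.tables.put(ident, new ArrayList<>(pendingRows));
    }

    // Discards the staged rows; the catalog is never touched.
    void abortStagedChanges() {
        pendingRows.clear();
    }
}
```

Because the catalog map is updated only inside commitStagedChanges(), a failed write that ends in abortStagedChanges() leaves no partially created table behind.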

SupportsNamespaces

SupportsNamespaces adds namespace (database/schema) management:

| Method | Description |
| --- | --- |
| listNamespaces() | List child namespaces |
| createNamespace(namespace, metadata) | Create a namespace |
| alterNamespace(namespace, changes...) | Alter namespace properties |
| dropNamespace(namespace, cascade) | Drop a namespace |

ViewCatalog

Note: ViewCatalog is a work in progress. The interface is defined but is not yet integrated into Spark’s query resolution or planning.

ViewCatalog extends CatalogPlugin and provides methods for view lifecycle management:

| Method | Description |
| --- | --- |
| listViews(namespace) | List views in a namespace |
| loadView(ident) | Load a view by identifier |
| createView(viewInfo) | Create a new view |
| replaceView(viewInfo, orCreate) | Replace (or create) a view |
| alterView(ident, changes...) | Alter a view's properties or schema |
| dropView(ident) | Drop a view |
| renameView(oldIdent, newIdent) | Rename a view |

FunctionCatalog

FunctionCatalog adds user-defined function management:

| Method | Description |
| --- | --- |
| listFunctions(namespace) | List functions in a namespace |
| loadFunction(ident) | Load an UnboundFunction by identifier |

ProcedureCatalog

ProcedureCatalog adds stored-procedure support:

| Method | Description |
| --- | --- |
| listProcedures(namespace) | List procedures in a namespace |
| loadProcedure(ident) | Load an UnboundProcedure by identifier |

Procedures are invoked via CALL catalog.procedure(args).

Table

Table is the central abstraction representing a logical dataset — for example, a directory of Parquet files, a Kafka topic, or a table managed by an external metastore. A Table gains read and write abilities through mix-in interfaces.

A Table provides:

| Method | Description |
| --- | --- |
| name() | A human-readable identifier for the table |
| columns() | The table's columns (replaces the deprecated schema() method) |
| partitioning() | Physical partitioning expressed as Transform arrays |
| properties() | A string map of table properties |
| capabilities() | A set of TableCapability values declaring what the table supports |

Read and Write Mix-ins

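A Table gains read and write abilities by implementing mix-in interfaces: SupportsRead, whose newScanBuilder(options) starts the read path, and SupportsWrite, whose newWriteBuilder(info) starts the write path.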

Additional mix-ins enable further capabilities:

| Mix-in | Capability |
| --- | --- |
| SupportsDelete / SupportsDeleteV2 | Filter-based row delete |
| TruncatableTable | TRUNCATE TABLE |
| SupportsPartitionManagement | Partition DDL (ADD/DROP/RENAME PARTITION) |
| SupportsMetadataColumns | Expose hidden metadata columns (e.g. file name, row position) |

Read Path

The read path follows a builder pattern that separates logical planning from physical execution:

SupportsRead.newScanBuilder(options)
  └─▸ ScanBuilder          (logical: pushdown negotiation)
        └─▸ Scan            (logical: read schema, description)
              └─▸ Batch     (physical: partitions + reader factory)
                    ├─ InputPartition[]
                    └─ PartitionReaderFactory
                         └─▸ PartitionReader   (per-task I/O)

ScanBuilder

ScanBuilder is the starting point for configuring a read. Spark calls build() to obtain the final Scan.

Before calling build(), Spark negotiates operator pushdown with the scan builder by checking for mix-in interfaces. The pushdown order is:

  1. Sample (SupportsPushDownSample)
  2. Filter (SupportsPushDownFilters / SupportsPushDownV2Filters)
  3. Aggregate (SupportsPushDownAggregates)
  4. Limit / Top-N (SupportsPushDownLimit / SupportsPushDownTopN)
  5. Offset (SupportsPushDownOffset)
  6. Column pruning (SupportsPushDownRequiredColumns)

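Each mix-in interface has a method that Spark calls with the relevant operators. The implementation reports which operators it accepted, and Spark applies the rest itself. The return convention varies by mix-in: for example, SupportsPushDownFilters.pushFilters returns the filters that still need to be evaluated after the scan, while SupportsPushDownLimit.pushLimit returns a boolean.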
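The negotiation for filters can be sketched as follows. In Spark's SupportsPushDownFilters, pushFilters returns the filters that must still be evaluated after the scan and pushedFilters reports what the source accepted; the sketch mirrors that shape with hypothetical stand-in types (SimpleFilter, SketchScanBuilder) rather than the real Spark classes. The rule that only "=" and ">" can be pushed is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a filter; real connectors receive Spark's filter objects.
final class SimpleFilter {
    final String column;
    final String op;      // e.g. "=", ">", "LIKE"
    final Object value;

    SimpleFilter(String column, String op, Object value) {
        this.column = column;
        this.op = op;
        this.value = value;
    }
}

// Hypothetical scan builder mirroring the shape of SupportsPushDownFilters.
class SketchScanBuilder {
    private final List<SimpleFilter> pushed = new ArrayList<>();

    // Keeps the filters this source can evaluate and returns the rest,
    // which the engine must still apply after the scan.
    List<SimpleFilter> pushFilters(List<SimpleFilter> filters) {
        List<SimpleFilter> residual = new ArrayList<>();
        for (SimpleFilter f : filters) {
            if (f.op.equals("=") || f.op.equals(">")) {
                pushed.add(f);
            } else {
                residual.add(f);
            }
        }
        return residual;
    }

    // Reports the filters that were accepted by pushFilters.
    List<SimpleFilter> pushedFilters() {
        return pushed;
    }
}
```

A source that can only partially guarantee a filter (for example, pruning files by min/max statistics) could both record it as pushed and return it as residual, so that the engine re-checks every row.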

Scan

Scan is a logical representation of a configured data source read. It provides:

| Method | Description |
| --- | --- |
| readSchema() | The actual schema after any column pruning or pushdown |
| toBatch() | Returns a Batch for batch execution |
| toMicroBatchStream(checkpointLocation) | Returns a MicroBatchStream for streaming |
| toContinuousStream(checkpointLocation) | Returns a ContinuousStream for continuous processing |

Implementations must override the method corresponding to the TableCapability declared by their Table.
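One way to picture this contract is an interface whose execution-mode methods are unsupported by default, with each connector overriding only the modes its table declares. The types below (SketchCapability, SketchScan, BatchOnlyScan, BatchOnlyTable) are hypothetical stand-ins that return plain strings instead of Batch or MicroBatchStream objects. The choice of UnsupportedOperationException for the defaults is an assumption of this sketch.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in for a subset of TableCapability values.
enum SketchCapability { BATCH_READ, MICRO_BATCH_READ, CONTINUOUS_READ }

// Stand-in for Scan: every execution mode is unsupported unless overridden.
interface SketchScan {
    default String toBatch() {
        throw new UnsupportedOperationException("batch read not supported");
    }

    default String toMicroBatchStream(String checkpointLocation) {
        throw new UnsupportedOperationException("micro-batch read not supported");
    }
}

// A scan for a batch-only source overrides just toBatch().
class BatchOnlyScan implements SketchScan {
    @Override
    public String toBatch() {
        return "batch-plan";
    }
}

// The table declares BATCH_READ, matching the single method its scan overrides.
class BatchOnlyTable {
    Set<SketchCapability> capabilities() {
        return EnumSet.of(SketchCapability.BATCH_READ);
    }

    SketchScan newScan() {
        return new BatchOnlyScan();
    }
}
```

Keeping the declared capabilities and the overridden methods in sync is the connector's responsibility; a mismatch would only surface when the engine calls a method the scan does not implement.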

Batch

Batch is the physical representation of a batch read. It has two methods:

| Method | Description |
| --- | --- |
| planInputPartitions() | Returns an array of InputPartition objects; each partition maps to one Spark task |
| createReaderFactory() | Returns a PartitionReaderFactory that is serialized and sent to executors |

InputPartition and PartitionReader

InputPartition is a serializable handle representing a data split. It may optionally declare preferredLocations() for data locality.

PartitionReaderFactory is serialized to executors and creates a PartitionReader for each input partition. The reader iterates over rows (or ColumnarBatch instances for columnar sources) and is closed after consumption.
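The per-task iteration can be sketched with a small stand-in. Spark's PartitionReader exposes next(), get(), and close(); the interface SketchPartitionReader below copies only that shape, and ListPartitionReader and ReaderLoopSketch are hypothetical classes that read from an in-memory list instead of real storage.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in with the same method shape as PartitionReader: next(), get(), close().
interface SketchPartitionReader<T> extends Closeable {
    boolean next() throws IOException;
    T get();
}

// Hypothetical reader over one in-memory split.
class ListPartitionReader implements SketchPartitionReader<String> {
    private final Iterator<String> rows;
    private String current;
    boolean closed = false;

    ListPartitionReader(List<String> split) {
        this.rows = split.iterator();
    }

    // Advances to the next row; returns false when the split is exhausted.
    @Override
    public boolean next() {
        if (!rows.hasNext()) {
            return false;
        }
        current = rows.next();
        return true;
    }

    // Returns the row that the last successful next() moved to.
    @Override
    public String get() {
        return current;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ReaderLoopSketch {
    // Consumes a reader the way a task would: advance, fetch, then always close.
    static List<String> drain(SketchPartitionReader<String> reader) {
        List<String> out = new ArrayList<>();
        try (SketchPartitionReader<String> r = reader) {
            while (r.next()) {
                out.add(r.get());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

The try-with-resources block closes the reader even if next() throws, which matches the expectation that a reader is closed after consumption.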

Scan Mix-ins

Scan implementations can opt in to additional optimizations by implementing mix-in interfaces:

| Mix-in | Capability |
| --- | --- |
| SupportsReportPartitioning | Reports how the output is partitioned, allowing Spark to avoid unnecessary shuffles (e.g. Storage Partition Join) |
| SupportsReportOrdering | Reports the sort order of the output, allowing Spark to skip redundant sorts |
| SupportsReportStatistics | Reports table and column-level statistics for the cost-based optimizer |
| SupportsRuntimeV2Filtering | Accepts additional filter values at execution time, used for dynamic partition pruning and to narrow row-level DML rewrites |

Write Path

The write path mirrors the read path with a builder pattern that separates logical configuration from physical execution:

SupportsWrite.newWriteBuilder(info)
  └─▸ WriteBuilder           (logical: mode selection)
        └─▸ Write             (logical: description, metrics)
              └─▸ BatchWrite  (physical: commit protocol)
                    ├─ DataWriterFactory
                    │    └─▸ DataWriter      (per-task I/O)
                    ├─ commit(messages[])
                    └─ abort(messages[])

WriteBuilder

WriteBuilder is the starting point for configuring a write. Calling build() returns a logical Write object.

Write modes are configured by mixing in additional interfaces on the builder:

| Mix-in | Mode |
| --- | --- |
| SupportsTruncate | Truncate the table before writing |
| SupportsOverwriteV2 | Overwrite data matching a filter expression |
| SupportsDynamicOverwrite | Dynamically overwrite partitions |

Write

Write is a logical representation of a configured write. Similar to Scan, it bridges to the physical layer:

| Method | Description |
| --- | --- |
| toBatch() | Returns a BatchWrite for batch execution |
| toStreaming() | Returns a StreamingWrite for streaming execution |

BatchWrite

BatchWrite defines the two-phase commit protocol for batch writes:

  1. createBatchWriterFactory(info) — creates a DataWriterFactory that is serialized and sent to executors.
  2. On each executor, the factory creates a DataWriter per partition. If all rows are written successfully, Spark calls commit() on the writer, which returns a WriterCommitMessage; otherwise Spark calls abort() on the writer.
  3. After all tasks complete, the driver calls either commit(messages[]) (if all tasks succeeded) or abort(messages[]) (if any task failed).

Data written by individual tasks should not be visible to readers until the driver-level commit succeeds.
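The protocol above can be sketched with in-memory stand-ins. SketchCommitMessage, SketchDataWriter, and SketchBatchWrite are hypothetical classes that loosely mirror WriterCommitMessage, DataWriter, and BatchWrite; they use lists of strings instead of real storage and are not the Spark interfaces themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for WriterCommitMessage: carries what one task staged.
final class SketchCommitMessage {
    final List<String> stagedRows;

    SketchCommitMessage(List<String> stagedRows) {
        this.stagedRows = stagedRows;
    }
}

// Stand-in for DataWriter: buffers rows privately until the task-level commit.
class SketchDataWriter {
    private final List<String> buffer = new ArrayList<>();

    void write(String row) {
        buffer.add(row);
    }

    // Task-level commit: hands the staged rows to the driver, publishes nothing.
    SketchCommitMessage commit() {
        return new SketchCommitMessage(new ArrayList<>(buffer));
    }

    // Task-level abort: drops whatever this task buffered.
    void abort() {
        buffer.clear();
    }
}

// Stand-in for BatchWrite: the driver-side half of the protocol.
class SketchBatchWrite {
    final List<String> visibleRows = new ArrayList<>();  // what readers can see

    // Job-level commit: publishes the output of every task at once.
    void commit(List<SketchCommitMessage> messages) {
        for (SketchCommitMessage m : messages) {
            visibleRows.addAll(m.stagedRows);
        }
    }

    // Job-level abort: nothing was published, so visibleRows stays unchanged.
    void abort(List<SketchCommitMessage> messages) {
        // A real connector would delete the staged files referenced by the messages.
    }
}
```

In a real connector the commit messages would typically reference staged files or transaction handles rather than carry the rows themselves, since the messages travel from executors back to the driver.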

Distribution and Ordering Requirements

A Write implementation can also implement RequiresDistributionAndOrdering to tell Spark how input data must be distributed and sorted before writing. Spark will insert shuffle and sort nodes as needed to satisfy these requirements.

Row-Level DML

DSV2 provides interfaces for connectors to support DELETE, UPDATE, and MERGE INTO statements.

Filter-Based Delete

The simplest form of DML is a filter-based delete, where entire groups of rows matching a predicate are removed without rewriting individual records.

SupportsDeleteV2 (and the older SupportsDelete) is a Table mix-in for this use case. Its key methods are:

| Method | Description |
| --- | --- |
| canDeleteWhere(predicates) | Returns whether the data source can efficiently delete rows matching the given predicates. If this returns false, Spark falls back to row-level rewriting (see below) |
| deleteWhere(predicates) | Deletes all rows matching the predicates |

This approach is well-suited for data sources that can drop entire partitions or files without rewriting data.
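A minimal sketch of this idea, assuming a table partitioned by a single column and predicates limited to equality on one column: SketchPartitionedTable is a hypothetical stand-in, and its canDeleteWhere and deleteWhere take a column name and value rather than Spark's predicate arrays.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical table that stores rows grouped by the value of one partition column.
class SketchPartitionedTable {
    static final String PARTITION_COLUMN = "day";

    // partition value -> rows stored in that partition
    final Map<String, List<String>> partitions = new HashMap<>();

    // A delete is cheap only when the predicate targets the partition column,
    // because then whole partitions can be dropped without rewriting rows.
    boolean canDeleteWhere(String column) {
        return PARTITION_COLUMN.equals(column);
    }

    // Drops the matching partition as a metadata-only operation.
    void deleteWhere(String column, String value) {
        if (!canDeleteWhere(column)) {
            throw new IllegalArgumentException("cannot delete by column: " + column);
        }
        partitions.remove(value);
    }
}
```

When canDeleteWhere returns false, for instance for a predicate on a non-partition column, the engine would need a row-level rewrite instead of this metadata-only path.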

Row-Level Operations

For more complex DML — such as UPDATE, MERGE INTO, or DELETE operations that cannot be handled by a simple filter — connectors implement SupportsRowLevelOperations. This interface returns a RowLevelOperation, which coordinates a read-and-rewrite cycle:

SupportsRowLevelOperations.newRowLevelOperationBuilder(info)
  └─▸ RowLevelOperation
        ├─ newScanBuilder(options)   → reads affected rows
        └─ newWriteBuilder(info)     → writes rewritten data

Data sources fall into two categories:

Expressions

The org.apache.spark.sql.connector.expressions package provides a neutral expression representation used across the DSV2 API:

These expression types are independent of Spark’s internal Catalyst expressions, allowing connectors to avoid a dependency on Spark internals.

Streaming

The same Table and Scan abstractions support streaming queries. A Table that declares MICRO_BATCH_READ or CONTINUOUS_READ capabilities provides streaming reads through:

| Method | Description |
| --- | --- |
| Scan.toMicroBatchStream(checkpointLocation) | Returns a MicroBatchStream that reads data in micro-batches, tracking progress through offsets |
| Scan.toContinuousStream(checkpointLocation) | Returns a ContinuousStream for low-latency continuous processing |

A Table that declares STREAMING_WRITE supports streaming writes through Write.toStreaming(), which returns a StreamingWrite.

Further Reading