Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.catalog.FormatTableCatalog

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._
Expand All @@ -39,30 +39,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq

// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq

val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys

val (tableOptions, writeOptions) = options.partition {
case (key, _) => allTableOptionKeys.contains(key)
}
val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions

val isPartitionedFormatTable = {
catalog match {
case catalog: FormatTableCatalog =>
catalog.isFormatTable(newProps.get("provider").orNull) && parts.nonEmpty
case _ => false
}
}

if (isPartitionedFormatTable) {
throw new UnsupportedOperationException(
"Using CTAS with partitioned format table is not supported yet.")
}
val (newProps, writeOptions) = propertiesWithOptions(props, options)
failIfPartitionedFormatTable(catalog, newProps, parts)

CreateTableAsSelectExec(
catalog,
Expand All @@ -75,6 +53,66 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
ifNotExists
) :: Nil
}
case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
catalog match {
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val (newProps, writeOptions) = propertiesWithOptions(props, options)

ReplaceTableAsSelectExec(
catalog,
ident,
parts,
query,
planLater(query),
newProps,
new CaseInsensitiveStringMap(writeOptions.asJava),
orCreate,
invalidateCache
) :: Nil
}
case _ => Nil
}

/**
 * Option keys that belong in table properties rather than write options.
 *
 * Contains all Paimon core option keys plus the Iceberg compatibility option
 * keys, so options supplied through the DataFrame writer are persisted as
 * table properties. Backed by a Set so the per-key membership check done while
 * partitioning user options is O(1) instead of a linear Seq scan.
 */
private lazy val tableOptionKeys: Set[String] = {
  val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key())

  // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
  val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key())

  (coreOptionKeys ++ icebergOptionKeys).toSet
}

/**
 * Splits user-supplied options into table options and write options.
 *
 * Entries whose key is a recognized table option key (Paimon core or Iceberg
 * compatibility option) are merged into the table properties; every other
 * entry is kept aside as a write option.
 *
 * @param props   original table properties from the logical plan
 * @param options options passed by the user (e.g. via the DataFrame writer)
 * @return (properties with default ownership and table options merged in,
 *         remaining write options)
 */
private def propertiesWithOptions(
    props: Map[String, String],
    options: Map[String, String]): (Map[String, String], Map[String, String]) = {
  val isTableOption = (entry: (String, String)) => tableOptionKeys.contains(entry._1)
  val tableOptions = options.filter(isTableOption)
  val writeOptions = options.filterNot(isTableOption)
  val mergedProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
  (mergedProps, writeOptions)
}

/**
 * Rejects creating a partitioned format table via CTAS, which is not
 * supported yet.
 *
 * @param catalog    target table catalog; only [[FormatTableCatalog]]s can
 *                   host format tables, other catalogs always pass the check
 * @param properties resolved table properties; the "provider" entry decides
 *                   whether the target is a format table
 * @param parts      partitioning transforms of the target table
 * @throws UnsupportedOperationException if the target is a partitioned format table
 */
private def failIfPartitionedFormatTable(
    catalog: TableCatalog,
    properties: Map[String, String],
    parts: Seq[_]): Unit = {
  catalog match {
    case formatCatalog: FormatTableCatalog
        if formatCatalog.isFormatTable(properties.get("provider").orNull) && parts.nonEmpty =>
      throw new UnsupportedOperationException(
        "Using CTAS with partitioned format table is not supported yet.")
    case _ => // not a format-table catalog, or not partitioned: nothing to reject
  }
}

/**
 * Uncaches any cached query that reads the given table, so a replaced table
 * does not keep serving stale cached data. Passed as the cache-invalidation
 * callback to [[ReplaceTableAsSelectExec]].
 */
private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
  val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
  // Positional booleans: presumably (cascade = true, blocking = false) —
  // TODO confirm against CacheManager.uncacheQuery's signature for this Spark version.
  spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._
Expand All @@ -51,30 +50,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq

// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq

val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys

val (tableOptions, writeOptions) = options.partition {
case (key, _) => allTableOptionKeys.contains(key)
}
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)

val isPartitionedFormatTable = {
catalog match {
case catalog: FormatTableCatalog =>
catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
case _ => false
}
}

if (isPartitionedFormatTable) {
throw new UnsupportedOperationException(
"Using CTAS with partitioned format table is not supported yet.")
}
val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
failIfPartitionedFormatTable(catalog, newTableSpec, parts)

CreateTableAsSelectExec(
catalog.asTableCatalog,
Expand All @@ -87,6 +64,64 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
ifNotExists
) :: Nil
}
case ReplaceTableAsSelect(
ResolvedDBObjectName(catalog: SparkCatalog, ident),
parts,
query,
tableSpec: TableSpec,
options,
orCreate) =>
catalog match {
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)

ReplaceTableAsSelectExec(
catalog.asTableCatalog,
ident.asIdentifier,
parts,
query,
planLater(query),
qualifyLocInTableSpec(newTableSpec),
new CaseInsensitiveStringMap(writeOptions.asJava),
orCreate,
invalidateCache
) :: Nil
}
case _ => Nil
}

/**
 * Option keys that belong in table properties rather than write options.
 *
 * Contains all Paimon core option keys plus the Iceberg compatibility option
 * keys, so options supplied through the DataFrame writer are persisted as
 * table properties. Backed by a Set so the per-key membership check done while
 * partitioning user options is O(1) instead of a linear Seq scan.
 */
private lazy val tableOptionKeys: Set[String] = {
  val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key())

  // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
  val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key())

  (coreOptionKeys ++ icebergOptionKeys).toSet
}

/**
 * Splits user-supplied options into table options and write options.
 *
 * Entries whose key is a recognized table option key (Paimon core or Iceberg
 * compatibility option) are folded into the table spec's properties; every
 * other entry is kept aside as a write option.
 *
 * @param tableSpec original table spec from the logical plan
 * @param options   options passed by the user (e.g. via the DataFrame writer)
 * @return (table spec with table options merged into its properties,
 *         remaining write options)
 */
private def tableSpecWithOptions(
    tableSpec: TableSpec,
    options: Map[String, String]): (TableSpec, Map[String, String]) = {
  val isTableOption = (entry: (String, String)) => tableOptionKeys.contains(entry._1)
  val tableOptions = options.filter(isTableOption)
  val writeOptions = options.filterNot(isTableOption)
  val enrichedSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
  (enrichedSpec, writeOptions)
}

/**
 * Rejects creating a partitioned format table via CTAS, which is not
 * supported yet.
 *
 * @param catalog   target Paimon catalog; its provider check decides whether
 *                  the target is a format table
 * @param tableSpec resolved table spec whose provider is inspected
 * @param parts     partitioning transforms of the target table
 * @throws UnsupportedOperationException if the target is a partitioned format table
 */
private def failIfPartitionedFormatTable(
    catalog: SparkCatalog,
    tableSpec: TableSpec,
    parts: Seq[_]): Unit = {
  val isFormatTable = catalog.isFormatTable(tableSpec.provider.orNull)
  if (isFormatTable && parts.nonEmpty) {
    throw new UnsupportedOperationException(
      "Using CTAS with partitioned format table is not supported yet.")
  }
}

/**
 * Uncaches any cached query that reads the given table, so a replaced table
 * does not keep serving stale cached data. Passed as the cache-invalidation
 * callback to [[ReplaceTableAsSelectExec]].
 */
private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
  val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
  // Positional booleans: presumably (cascade = true, blocking = false) —
  // TODO confirm against CacheManager.uncacheQuery's signature for this Spark version.
  spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._
Expand All @@ -53,30 +52,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq

// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq

val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys

val (tableOptions, writeOptions) = options.partition {
case (key, _) => allTableOptionKeys.contains(key)
}
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)

val isPartitionedFormatTable = {
catalog match {
case catalog: FormatTableCatalog =>
catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
case _ => false
}
}

if (isPartitionedFormatTable) {
throw new UnsupportedOperationException(
"Using CTAS with partitioned format table is not supported yet.")
}
val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
failIfPartitionedFormatTable(catalog, newTableSpec, parts)

CreateTableAsSelectExec(
catalog.asTableCatalog,
Expand All @@ -89,6 +66,66 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
ifNotExists
) :: Nil
}
case ReplaceTableAsSelect(
ResolvedIdentifier(catalog: SparkCatalog, ident),
parts,
query,
tableSpec: TableSpec,
options,
orCreate,
analyzedQuery) =>
assert(analyzedQuery.isDefined)
catalog match {
case _: StagingTableCatalog =>
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
case _ =>
val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)

ReplaceTableAsSelectExec(
catalog.asTableCatalog,
ident,
parts,
analyzedQuery.get,
planLater(query),
qualifyLocInTableSpec(newTableSpec),
new CaseInsensitiveStringMap(writeOptions.asJava),
orCreate,
invalidateCache
) :: Nil
}
case _ => Nil
}

/**
 * Option keys that belong in table properties rather than write options.
 *
 * Contains all Paimon core option keys plus the Iceberg compatibility option
 * keys, so options supplied through the DataFrame writer are persisted as
 * table properties. Backed by a Set so the per-key membership check done while
 * partitioning user options is O(1) instead of a linear Seq scan.
 */
private lazy val tableOptionKeys: Set[String] = {
  val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key())

  // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
  val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key())

  (coreOptionKeys ++ icebergOptionKeys).toSet
}

/**
 * Splits user-supplied options into table options and write options.
 *
 * Entries whose key is a recognized table option key (Paimon core or Iceberg
 * compatibility option) are folded into the table spec's properties; every
 * other entry is kept aside as a write option.
 *
 * @param tableSpec original table spec from the logical plan
 * @param options   options passed by the user (e.g. via the DataFrame writer)
 * @return (table spec with table options merged into its properties,
 *         remaining write options)
 */
private def tableSpecWithOptions(
    tableSpec: TableSpec,
    options: Map[String, String]): (TableSpec, Map[String, String]) = {
  val isTableOption = (entry: (String, String)) => tableOptionKeys.contains(entry._1)
  val tableOptions = options.filter(isTableOption)
  val writeOptions = options.filterNot(isTableOption)
  val enrichedSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
  (enrichedSpec, writeOptions)
}

/**
 * Rejects creating a partitioned format table via CTAS, which is not
 * supported yet.
 *
 * @param catalog   target Paimon catalog; its provider check decides whether
 *                  the target is a format table
 * @param tableSpec resolved table spec whose provider is inspected
 * @param parts     partitioning transforms of the target table
 * @throws UnsupportedOperationException if the target is a partitioned format table
 */
private def failIfPartitionedFormatTable(
    catalog: SparkCatalog,
    tableSpec: TableSpec,
    parts: Seq[_]): Unit = {
  val isFormatTable = catalog.isFormatTable(tableSpec.provider.orNull)
  if (isFormatTable && parts.nonEmpty) {
    throw new UnsupportedOperationException(
      "Using CTAS with partitioned format table is not supported yet.")
  }
}

/**
 * Uncaches any cached query that reads the given table, so a replaced table
 * does not keep serving stale cached data. Passed as the cache-invalidation
 * callback to [[ReplaceTableAsSelectExec]].
 */
private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
  val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
  // Positional booleans: presumably (cascade = true, blocking = false) —
  // TODO confirm against CacheManager.uncacheQuery's signature for this Spark version.
  spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
}
}
Loading