From 1d35940e6221d8eaa68d55d1bd9992181313174b Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 18 May 2026 23:02:40 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1800 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334 Antalya 26.3: Change the interface for Iceberg inserts with the catalog # Conflicts: # src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h # src/TableFunctions/TableFunctionObjectStorage.cpp # tests/integration/test_database_iceberg/test.py --- .../DataLakes/DataLakeConfiguration.h | 374 ++++++++++++++++++ .../TableFunctionObjectStorage.cpp | 10 + .../integration/test_database_iceberg/test.py | 4 + 3 files changed, 388 insertions(+) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 5e8e40ccfd13..3f31cd00648a 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -443,7 +443,381 @@ using StorageHDFSPaimonConfiguration = DataLakeConfiguration; +<<<<<<< HEAD using StorageLocalPaimonConfiguration = DataLakeConfiguration; +======= +using StorageLocalPaimonConfiguration = DataLakeConfiguration; + +/// Class detects storage type by `storage_type` parameter if exists +/// and uses appropriate implementation - S3, Azure, HDFS or Local +class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, public std::enable_shared_from_this +{ + friend class StorageObjectStorageConfiguration; + +public: + StorageIcebergConfiguration() {} + + explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} + + void initialize( + ASTs & engine_args, + ContextPtr local_context, + bool with_table_structure, + const StorageID * table_id = nullptr) override + { + createDynamicConfiguration(engine_args, local_context); + getImpl().initialize(engine_args, local_context, with_table_structure, table_id); + } + + ObjectStorageType getType() const override { return getImpl().getType(); } + + std::string getTypeName() const override { return getImpl().getTypeName(); } + std::string getEngineName() const override { return getImpl().getEngineName(); } + std::string getNamespaceType() const override { return getImpl().getNamespaceType(); } + + Path getRawPath() const override { return getImpl().getRawPath(); } + void setRawPath(const Path & path) override { getImpl().setRawPath(path); } + const String & getRawURI() const override { return getImpl().getRawURI(); } + const Path & getPathForRead() const override { return getImpl().getPathForRead(); } + Path getPathForWrite(const std::string & partition_id) const override { return getImpl().getPathForWrite(partition_id); } + + void setPathForRead(const Path & path) override { getImpl().setPathForRead(path); } + + const Paths & getPaths() const override { return getImpl().getPaths(); } + void setPaths(const Paths & paths) override { getImpl().setPaths(paths); } + + String getDataSourceDescription() const override { return getImpl().getDataSourceDescription(); } + String getNamespace() const override { return getImpl().getNamespace(); } + + StorageObjectStorageQuerySettings getQuerySettings(const ContextPtr & context) const override + { return getImpl().getQuerySettings(context); } + + void addStructureAndFormatToArgsIfNeeded( + ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override + { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); } + + bool isNamespaceWithGlobs() const override { return getImpl().isNamespaceWithGlobs(); } + + bool isArchive() const override { return getImpl().isArchive(); } + bool isPathInArchiveWithGlobs() const override { return getImpl().isPathInArchiveWithGlobs(); } + std::string getPathInArchive() const override { return getImpl().getPathInArchive(); } + + void check(ContextPtr context) override { getImpl().check(context); } + void validateNamespace(const String & name) const override { getImpl().validateNamespace(name); } + + ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly, CredentialsConfigurationCallback refresh_credentials_callback) override + { return getImpl().createObjectStorage(context, is_readonly, refresh_credentials_callback); } + bool isStaticConfiguration() const override { return getImpl().isStaticConfiguration(); } + + bool isDataLakeConfiguration() const override { return getImpl().isDataLakeConfiguration(); } + + bool supportsTotalRows(ContextPtr context, ObjectStorageType storage_type) const override { return getImpl().supportsTotalRows(context, storage_type); } + std::optional totalRows(ContextPtr context) override { return getImpl().totalRows(context); } + bool supportsTotalBytes(ContextPtr context, ObjectStorageType storage_type) const override { return getImpl().supportsTotalBytes(context, storage_type); } + std::optional totalBytes(ContextPtr context) override { return getImpl().totalBytes(context); } + bool isDataSortedBySortingKey(StorageMetadataPtr storage_metadata, ContextPtr context) const override + { return getImpl().isDataSortedBySortingKey(storage_metadata, context); } + + IDataLakeMetadata * getExternalMetadata() override { return getImpl().getExternalMetadata(); } + + std::shared_ptr getInitialSchemaByPath(ContextPtr context, ObjectInfoPtr object_info) const override + { return getImpl().getInitialSchemaByPath(context, object_info); } + + std::shared_ptr getSchemaTransformer(ContextPtr context, ObjectInfoPtr object_info) const override + { return getImpl().getSchemaTransformer(context, object_info); } + + void modifyFormatSettings(FormatSettings & settings_, const Context & context) const override + { getImpl().modifyFormatSettings(settings_, context); } + + void addDeleteTransformers( + ObjectInfoPtr object_info, + QueryPipelineBuilder & builder, + const std::optional & format_settings, + FormatParserSharedResourcesPtr parser_shared_resources, + ContextPtr local_context) const override + { getImpl().addDeleteTransformers(object_info, builder, format_settings, parser_shared_resources, local_context); } + + ReadFromFormatInfo prepareReadingFromFormat( + ObjectStoragePtr object_storage, + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + bool supports_tuple_elements, + ContextPtr local_context, + const PrepareReadingFromFormatHiveParams & hive_parameters) override + { + return getImpl().prepareReadingFromFormat( + object_storage, + requested_columns, + storage_snapshot, + supports_subset_of_columns, + supports_tuple_elements, + local_context, + hive_parameters); + } + + void setSchemaHash(const String & hash) override { getImpl().setSchemaHash(hash); } + + void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context) override + { getImpl().initPartitionStrategy(partition_by, columns, context); } + + std::optional getTableStateSnapshot(ContextPtr local_context) const override { return getImpl().getTableStateSnapshot(local_context); } + std::unique_ptr buildStorageMetadataFromState(const DataLakeTableStateSnapshot & state, ContextPtr local_context) const override + { return getImpl().buildStorageMetadataFromState(state, local_context); } + bool shouldReloadSchemaForConsistency(ContextPtr local_context) const override { return getImpl().shouldReloadSchemaForConsistency(local_context); } + std::optional tryGetTableStructureFromMetadata(ContextPtr local_context) const override + { return getImpl().tryGetTableStructureFromMetadata(local_context); } + + bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); } + bool supportsParallelInsert() const override { return getImpl().supportsParallelInsert(); } + bool supportsWrites() const override { return getImpl().supportsWrites(); } + + bool supportsPartialPathPrefix() const override { return getImpl().supportsPartialPathPrefix(); } + + ObjectIterator iterate( + const ActionsDAG * filter_dag, + IDataLakeMetadata::FileProgressCallback callback, + size_t list_batch_size, + StorageMetadataPtr storage_metadata, + ContextPtr context) override + { + return getImpl().iterate(filter_dag, callback, list_batch_size, storage_metadata, context); + } + + void update( + ObjectStoragePtr object_storage_ptr, + ContextPtr context) override + { + getImpl().update(object_storage_ptr, context); + } + void lazyInitializeIfNeeded(ObjectStoragePtr object_storage, ContextPtr local_context) override + { return getImpl().lazyInitializeIfNeeded(object_storage, local_context); } + + void create( + ObjectStoragePtr object_storage, + ContextPtr local_context, + const std::optional & columns, + ASTPtr partition_by, + ASTPtr order_by, + bool if_not_exists, + std::shared_ptr catalog, + const StorageID & table_id_) override + { + getImpl().create(object_storage, local_context, columns, partition_by, order_by, if_not_exists, catalog, table_id_); + } + + SinkToStoragePtr write( + SharedHeader sample_block, + const StorageID & table_id, + ObjectStoragePtr object_storage, + const std::optional & format_settings, + ContextPtr context, + std::shared_ptr catalog) override + { + return getImpl().write(sample_block, table_id, object_storage, format_settings, context, catalog); + } + + bool supportsDelete() const override { return getImpl().supportsDelete(); } + void mutate(const MutationCommands & commands, + ContextPtr context, + const StorageID & storage_id, + StorageMetadataPtr metadata_snapshot, + std::shared_ptr catalog, + const std::optional & format_settings) override + { + getImpl().mutate(commands, context, storage_id, metadata_snapshot, catalog, format_settings); + } + void checkMutationIsPossible(const MutationCommands & commands) override { getImpl().checkMutationIsPossible(commands); } + + void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); } + + void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); } + + const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); } + + ASTPtr createArgsWithAccessData() const override + { + return getImpl().createArgsWithAccessData(); + } + + void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override + { getImpl().fromNamedCollection(collection, context); } + void fromAST(ASTs & args, ContextPtr context, bool with_structure) override + { getImpl().fromAST(args, context, with_structure); } + void fromDisk(const String & disk_name, ASTs & args, ContextPtr context, bool with_structure) override + { getImpl().fromDisk(disk_name, args, context, with_structure); } + + /// Find storage_type argument and remove it from args if exists. + /// Return storage type. + ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg, bool cluster_name_first) const override + { + static const auto * const storage_type_name = "storage_type"; + + { + auto args_copy = args; + if (cluster_name_first) + { + // Remove cluster name from args to avoid confusing cluster name and named collection name + args_copy.erase(args_copy.begin()); + } + + if (auto named_collection = tryGetNamedCollectionWithOverrides(args_copy, context)) + { + if (named_collection->has(storage_type_name)) + { + return objectStorageTypeFromString(named_collection->get(storage_type_name)); + } + } + } + + auto type_it = args.end(); + + /// S3 by default for backward compatibility + /// Iceberg without storage_type == IcebergS3 + ObjectStorageType type = ObjectStorageType::S3; + + for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it) + { + const auto * type_ast_function = (*arg_it)->as(); + + if (type_ast_function && type_ast_function->name == "equals" + && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2) + { + auto * name = type_ast_function->arguments->children[0]->as(); + + if (name && name->name() == storage_type_name) + { + if (type_it != args.end()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake can have only one key-value argument: storage_type='type'."); + } + + auto * value = type_ast_function->arguments->children[1]->as(); + + if (!value) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake parameter 'storage_type' has wrong type, string literal expected."); + } + + if (value->value.getType() != Field::Types::String) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake parameter 'storage_type' has wrong value type, string expected."); + } + + type = objectStorageTypeFromString(value->value.safeGet()); + + type_it = arg_it; + } + } + } + + if (type_it != args.end()) + { + if (type_arg) + *type_arg = *type_it; + args.erase(type_it); + } + + return type; + } + + const String & getFormat() const override { return getImpl().getFormat(); } + const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); } + const String & getStructure() const override { return getImpl().getStructure(); } + + PartitionStrategyFactory::StrategyType getPartitionStrategyType() const override { return getImpl().getPartitionStrategyType(); } + bool getPartitionColumnsInDataFile() const override { return getImpl().getPartitionColumnsInDataFile(); } + std::shared_ptr getPartitionStrategy() const override { return getImpl().getPartitionStrategy(); } + + void setFormat(const String & format_) override { getImpl().setFormat(format_); } + void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); } + void setStructure(const String & structure_) override { getImpl().setStructure(structure_); } + + void setPartitionStrategyType(PartitionStrategyFactory::StrategyType partition_strategy_type_) override + { getImpl().setPartitionStrategyType(partition_strategy_type_); } + void setPartitionColumnsInDataFile(bool partition_columns_in_data_file_) override + { getImpl().setPartitionColumnsInDataFile(partition_columns_in_data_file_); } + void setPartitionStrategy(const std::shared_ptr & partition_strategy_) override + { getImpl().setPartitionStrategy(partition_strategy_); } + + void assertInitialized() const override { getImpl().assertInitialized(); } + + ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr obj) const override { return getImpl().getColumnMapperForObject(obj); } + + ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override + { return getImpl().getColumnMapperForCurrentSchema(storage_metadata_snapshot, context); } + + std::shared_ptr getCatalog(ContextPtr context, const StorageID & table_id) const override + { return getImpl().getCatalog(context, table_id); } + + bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override + { return getImpl().optimize(metadata_snapshot, context, format_settings); } + + bool supportsPrewhere() const override { return getImpl().supportsPrewhere(); } + + void drop(ContextPtr context) override { getImpl().drop(context); } + +protected: + void createDynamicConfiguration(ASTs & args, ContextPtr context) + { + ObjectStorageType type = extractDynamicStorageType(args, context, nullptr, false); + createDynamicStorage(type); + } + +private: + inline StorageObjectStorageConfiguration & getImpl() const + { + if (!impl) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized"); + + return *impl; + } + + void createDynamicStorage(ObjectStorageType type) + { + if (impl) + { + if (impl->getType() == type) + return; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage"); + } + + switch (type) + { +# if USE_AWS_S3 + case ObjectStorageType::S3: + impl = std::make_unique(settings); + break; +# endif +# if USE_AZURE_BLOB_STORAGE + case ObjectStorageType::Azure: + impl = std::make_unique(settings); + break; +# endif +# if USE_HDFS + case ObjectStorageType::HDFS: + impl = std::make_unique(settings); + break; +# endif + case ObjectStorageType::Local: + impl = std::make_unique(settings); + break; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsuported DataLake storage {}", type); + } + } + + StorageObjectStorageConfigurationPtr impl; + DataLakeStorageSettingsPtr settings; +}; +>>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) #endif #if USE_PARQUET diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index c555834b7207..fd612b962ebe 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -264,7 +264,17 @@ StoragePtr TableFunctionObjectStorage:: ConstraintsDescription{}, partition_by, context, +<<<<<<< HEAD /* is_table_function */true); +======= + /* comment */ String{}, + /* format_settings */ std::nullopt, /// No format_settings + /* mode */ LoadingStrictnessLevel::CREATE, + /* catalog */ nullptr, + /* if_not_exists */ false, + /* is_datalake_query*/ false, + /* is_table_function */ true); +>>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) storage->startup(); return storage; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 5a08265699ef..185a2b02e4d2 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -136,7 +136,11 @@ def create_clickhouse_iceberg_database( node.query( f""" DROP DATABASE IF EXISTS {name}; +<<<<<<< HEAD CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}') +======= +CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}') +>>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} """, settings={ From 27116c7c1f3350172af0b6fed426ef43dad652a9 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Sun, 7 Jun 2026 21:45:15 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1800 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kept `antalya-26.4` (ours) side for all three conflicted files: - `DataLakeConfiguration.h`: "theirs" side added `/* is_cluster_supported */ false` to `StorageLocalPaimonConfiguration` and a large `StorageIcebergConfiguration` class — neither is in source PR #1800's diff; they came from other changes bundled in the merge commit. - `TableFunctionObjectStorage.cpp`: "theirs" added extra `StorageObjectStorageCluster` constructor args (`comment`, `format_settings`, `mode`, `catalog`, etc.) that don't exist in `antalya-26.4`'s `StorageObjectStorageCluster` signature. - `test.py`: "theirs" used `{engine}` f-string variable which does not exist as a parameter in `antalya-26.4`'s `create_clickhouse_iceberg_database`; `antalya-26.4` uses `DataLakeCatalog` directly. The source PR's actual changes (new `getCatalog` signature taking `StorageID`, new include directives, `create_clickhouse_iceberg_table` refactor, settings handling changes) all applied cleanly without conflicts. --- .../DataLakes/DataLakeConfiguration.h | 374 ------------------ .../TableFunctionObjectStorage.cpp | 10 - .../integration/test_database_iceberg/test.py | 4 - 3 files changed, 388 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 3f31cd00648a..5e8e40ccfd13 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -443,381 +443,7 @@ using StorageHDFSPaimonConfiguration = DataLakeConfiguration; -<<<<<<< HEAD using StorageLocalPaimonConfiguration = DataLakeConfiguration; -======= -using StorageLocalPaimonConfiguration = DataLakeConfiguration; - -/// Class detects storage type by `storage_type` parameter if exists -/// and uses appropriate implementation - S3, Azure, HDFS or Local -class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, public std::enable_shared_from_this -{ - friend class StorageObjectStorageConfiguration; - -public: - StorageIcebergConfiguration() {} - - explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} - - void initialize( - ASTs & engine_args, - ContextPtr local_context, - bool with_table_structure, - const StorageID * table_id = nullptr) override - { - createDynamicConfiguration(engine_args, local_context); - getImpl().initialize(engine_args, local_context, with_table_structure, table_id); - } - - ObjectStorageType getType() const override { return getImpl().getType(); } - - std::string getTypeName() const override { return getImpl().getTypeName(); } - std::string getEngineName() const override { return getImpl().getEngineName(); } - std::string getNamespaceType() const override { return getImpl().getNamespaceType(); } - - Path getRawPath() const override { return getImpl().getRawPath(); } - void setRawPath(const Path & path) override { getImpl().setRawPath(path); } - const String & getRawURI() const override { return getImpl().getRawURI(); } - const Path & getPathForRead() const override { return getImpl().getPathForRead(); } - Path getPathForWrite(const std::string & partition_id) const override { return getImpl().getPathForWrite(partition_id); } - - void setPathForRead(const Path & path) override { getImpl().setPathForRead(path); } - - const Paths & getPaths() const override { return getImpl().getPaths(); } - void setPaths(const Paths & paths) override { getImpl().setPaths(paths); } - - String getDataSourceDescription() const override { return getImpl().getDataSourceDescription(); } - String getNamespace() const override { return getImpl().getNamespace(); } - - StorageObjectStorageQuerySettings getQuerySettings(const ContextPtr & context) const override - { return getImpl().getQuerySettings(context); } - - void addStructureAndFormatToArgsIfNeeded( - ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override - { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); } - - bool isNamespaceWithGlobs() const override { return getImpl().isNamespaceWithGlobs(); } - - bool isArchive() const override { return getImpl().isArchive(); } - bool isPathInArchiveWithGlobs() const override { return getImpl().isPathInArchiveWithGlobs(); } - std::string getPathInArchive() const override { return getImpl().getPathInArchive(); } - - void check(ContextPtr context) override { getImpl().check(context); } - void validateNamespace(const String & name) const override { getImpl().validateNamespace(name); } - - ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly, CredentialsConfigurationCallback refresh_credentials_callback) override - { return getImpl().createObjectStorage(context, is_readonly, refresh_credentials_callback); } - bool isStaticConfiguration() const override { return getImpl().isStaticConfiguration(); } - - bool isDataLakeConfiguration() const override { return getImpl().isDataLakeConfiguration(); } - - bool supportsTotalRows(ContextPtr context, ObjectStorageType storage_type) const override { return getImpl().supportsTotalRows(context, storage_type); } - std::optional totalRows(ContextPtr context) override { return getImpl().totalRows(context); } - bool supportsTotalBytes(ContextPtr context, ObjectStorageType storage_type) const override { return getImpl().supportsTotalBytes(context, storage_type); } - std::optional totalBytes(ContextPtr context) override { return getImpl().totalBytes(context); } - bool isDataSortedBySortingKey(StorageMetadataPtr storage_metadata, ContextPtr context) const override - { return getImpl().isDataSortedBySortingKey(storage_metadata, context); } - - IDataLakeMetadata * getExternalMetadata() override { return getImpl().getExternalMetadata(); } - - std::shared_ptr getInitialSchemaByPath(ContextPtr context, ObjectInfoPtr object_info) const override - { return getImpl().getInitialSchemaByPath(context, object_info); } - - std::shared_ptr getSchemaTransformer(ContextPtr context, ObjectInfoPtr object_info) const override - { return getImpl().getSchemaTransformer(context, object_info); } - - void modifyFormatSettings(FormatSettings & settings_, const Context & context) const override - { getImpl().modifyFormatSettings(settings_, context); } - - void addDeleteTransformers( - ObjectInfoPtr object_info, - QueryPipelineBuilder & builder, - const std::optional & format_settings, - FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr local_context) const override - { getImpl().addDeleteTransformers(object_info, builder, format_settings, parser_shared_resources, local_context); } - - ReadFromFormatInfo prepareReadingFromFormat( - ObjectStoragePtr object_storage, - const Strings & requested_columns, - const StorageSnapshotPtr & storage_snapshot, - bool supports_subset_of_columns, - bool supports_tuple_elements, - ContextPtr local_context, - const PrepareReadingFromFormatHiveParams & hive_parameters) override - { - return getImpl().prepareReadingFromFormat( - object_storage, - requested_columns, - storage_snapshot, - supports_subset_of_columns, - supports_tuple_elements, - local_context, - hive_parameters); - } - - void setSchemaHash(const String & hash) override { getImpl().setSchemaHash(hash); } - - void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context) override - { getImpl().initPartitionStrategy(partition_by, columns, context); } - - std::optional getTableStateSnapshot(ContextPtr local_context) const override { return getImpl().getTableStateSnapshot(local_context); } - std::unique_ptr buildStorageMetadataFromState(const DataLakeTableStateSnapshot & state, ContextPtr local_context) const override - { return getImpl().buildStorageMetadataFromState(state, local_context); } - bool shouldReloadSchemaForConsistency(ContextPtr local_context) const override { return getImpl().shouldReloadSchemaForConsistency(local_context); } - std::optional tryGetTableStructureFromMetadata(ContextPtr local_context) const override - { return getImpl().tryGetTableStructureFromMetadata(local_context); } - - bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); } - bool supportsParallelInsert() const override { return getImpl().supportsParallelInsert(); } - bool supportsWrites() const override { return getImpl().supportsWrites(); } - - bool supportsPartialPathPrefix() const override { return getImpl().supportsPartialPathPrefix(); } - - ObjectIterator iterate( - const ActionsDAG * filter_dag, - IDataLakeMetadata::FileProgressCallback callback, - size_t list_batch_size, - StorageMetadataPtr storage_metadata, - ContextPtr context) override - { - return getImpl().iterate(filter_dag, callback, list_batch_size, storage_metadata, context); - } - - void update( - ObjectStoragePtr object_storage_ptr, - ContextPtr context) override - { - getImpl().update(object_storage_ptr, context); - } - void lazyInitializeIfNeeded(ObjectStoragePtr object_storage, ContextPtr local_context) override - { return getImpl().lazyInitializeIfNeeded(object_storage, local_context); } - - void create( - ObjectStoragePtr object_storage, - ContextPtr local_context, - const std::optional & columns, - ASTPtr partition_by, - ASTPtr order_by, - bool if_not_exists, - std::shared_ptr catalog, - const StorageID & table_id_) override - { - getImpl().create(object_storage, local_context, columns, partition_by, order_by, if_not_exists, catalog, table_id_); - } - - SinkToStoragePtr write( - SharedHeader sample_block, - const StorageID & table_id, - ObjectStoragePtr object_storage, - const std::optional & format_settings, - ContextPtr context, - std::shared_ptr catalog) override - { - return getImpl().write(sample_block, table_id, object_storage, format_settings, context, catalog); - } - - bool supportsDelete() const override { return getImpl().supportsDelete(); } - void mutate(const MutationCommands & commands, - ContextPtr context, - const StorageID & storage_id, - StorageMetadataPtr metadata_snapshot, - std::shared_ptr catalog, - const std::optional & format_settings) override - { - getImpl().mutate(commands, context, storage_id, metadata_snapshot, catalog, format_settings); - } - void checkMutationIsPossible(const MutationCommands & commands) override { getImpl().checkMutationIsPossible(commands); } - - void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); } - - void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); } - - const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); } - - ASTPtr createArgsWithAccessData() const override - { - return getImpl().createArgsWithAccessData(); - } - - void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override - { getImpl().fromNamedCollection(collection, context); } - void fromAST(ASTs & args, ContextPtr context, bool with_structure) override - { getImpl().fromAST(args, context, with_structure); } - void fromDisk(const String & disk_name, ASTs & args, ContextPtr context, bool with_structure) override - { getImpl().fromDisk(disk_name, args, context, with_structure); } - - /// Find storage_type argument and remove it from args if exists. - /// Return storage type. - ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg, bool cluster_name_first) const override - { - static const auto * const storage_type_name = "storage_type"; - - { - auto args_copy = args; - if (cluster_name_first) - { - // Remove cluster name from args to avoid confusing cluster name and named collection name - args_copy.erase(args_copy.begin()); - } - - if (auto named_collection = tryGetNamedCollectionWithOverrides(args_copy, context)) - { - if (named_collection->has(storage_type_name)) - { - return objectStorageTypeFromString(named_collection->get(storage_type_name)); - } - } - } - - auto type_it = args.end(); - - /// S3 by default for backward compatibility - /// Iceberg without storage_type == IcebergS3 - ObjectStorageType type = ObjectStorageType::S3; - - for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it) - { - const auto * type_ast_function = (*arg_it)->as(); - - if (type_ast_function && type_ast_function->name == "equals" - && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2) - { - auto * name = type_ast_function->arguments->children[0]->as(); - - if (name && name->name() == storage_type_name) - { - if (type_it != args.end()) - { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "DataLake can have only one key-value argument: storage_type='type'."); - } - - auto * value = type_ast_function->arguments->children[1]->as(); - - if (!value) - { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "DataLake parameter 'storage_type' has wrong type, string literal expected."); - } - - if (value->value.getType() != Field::Types::String) - { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "DataLake parameter 'storage_type' has wrong value type, string expected."); - } - - type = objectStorageTypeFromString(value->value.safeGet()); - - type_it = arg_it; - } - } - } - - if (type_it != args.end()) - { - if (type_arg) - *type_arg = *type_it; - args.erase(type_it); - } - - return type; - } - - const String & getFormat() const override { return getImpl().getFormat(); } - const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); } - const String & getStructure() const override { return getImpl().getStructure(); } - - PartitionStrategyFactory::StrategyType getPartitionStrategyType() const override { return getImpl().getPartitionStrategyType(); } - bool getPartitionColumnsInDataFile() const override { return getImpl().getPartitionColumnsInDataFile(); } - std::shared_ptr getPartitionStrategy() const override { return getImpl().getPartitionStrategy(); } - - void setFormat(const String & format_) override { getImpl().setFormat(format_); } - void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); } - void setStructure(const String & structure_) override { getImpl().setStructure(structure_); } - - void setPartitionStrategyType(PartitionStrategyFactory::StrategyType partition_strategy_type_) override - { getImpl().setPartitionStrategyType(partition_strategy_type_); } - void setPartitionColumnsInDataFile(bool partition_columns_in_data_file_) override - { getImpl().setPartitionColumnsInDataFile(partition_columns_in_data_file_); } - void setPartitionStrategy(const std::shared_ptr & partition_strategy_) override - { getImpl().setPartitionStrategy(partition_strategy_); } - - void assertInitialized() const override { getImpl().assertInitialized(); } - - ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr obj) const override { return getImpl().getColumnMapperForObject(obj); } - - ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override - { return getImpl().getColumnMapperForCurrentSchema(storage_metadata_snapshot, context); } - - std::shared_ptr getCatalog(ContextPtr context, const StorageID & table_id) const override - { return getImpl().getCatalog(context, table_id); } - - bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override - { return getImpl().optimize(metadata_snapshot, context, format_settings); } - - bool supportsPrewhere() const override { return getImpl().supportsPrewhere(); } - - void drop(ContextPtr context) override { getImpl().drop(context); } - -protected: - void createDynamicConfiguration(ASTs & args, ContextPtr context) - { - ObjectStorageType type = extractDynamicStorageType(args, context, nullptr, false); - createDynamicStorage(type); - } - -private: - inline StorageObjectStorageConfiguration & getImpl() const - { - if (!impl) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized"); - - return *impl; - } - - void createDynamicStorage(ObjectStorageType type) - { - if (impl) - { - if (impl->getType() == type) - return; - - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage"); - } - - switch (type) - { -# if USE_AWS_S3 - case ObjectStorageType::S3: - impl = std::make_unique(settings); - break; -# endif -# if USE_AZURE_BLOB_STORAGE - case ObjectStorageType::Azure: - impl = std::make_unique(settings); - break; -# endif -# if USE_HDFS - case ObjectStorageType::HDFS: - impl = std::make_unique(settings); - break; -# endif - case ObjectStorageType::Local: - impl = std::make_unique(settings); - break; - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsuported DataLake storage {}", type); - } - } - - StorageObjectStorageConfigurationPtr impl; - DataLakeStorageSettingsPtr settings; -}; ->>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) #endif #if USE_PARQUET diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index fd612b962ebe..c555834b7207 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -264,17 +264,7 @@ StoragePtr TableFunctionObjectStorage:: ConstraintsDescription{}, partition_by, context, -<<<<<<< HEAD /* is_table_function */true); -======= - /* comment */ String{}, - /* format_settings */ std::nullopt, /// No format_settings - /* mode */ LoadingStrictnessLevel::CREATE, - /* catalog */ nullptr, - /* if_not_exists */ false, - /* is_datalake_query*/ false, - /* is_table_function */ true); ->>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) storage->startup(); return storage; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 185a2b02e4d2..5a08265699ef 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -136,11 +136,7 @@ def create_clickhouse_iceberg_database( node.query( f""" DROP DATABASE IF EXISTS {name}; -<<<<<<< HEAD CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}') -======= -CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}') ->>>>>>> cf7ae691c5b (Merge pull request #1800 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-100334) SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} """, settings={