From 0c8f2177c33c917786dbe5ebf958edb7043376e0 Mon Sep 17 00:00:00 2001 From: chethan Date: Mon, 1 Jun 2026 20:29:02 -0700 Subject: [PATCH 1/7] Restore ClickHouse compatibility APIs --- cpp/src/arrow/adapters/orc/adapter.cc | 8 +++++++- cpp/src/arrow/adapters/orc/adapter.h | 4 +++- cpp/src/arrow/ipc/reader.cc | 25 ++++++++++++++++++++++++- cpp/src/arrow/ipc/reader.h | 2 +- 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485c..434e4cc586b6 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -222,7 +222,9 @@ class ORCFileReader::Impl { return Init(); } - + liborc::Reader* GetRawORCReader() { + return reader_.get(); +} Status Init() { int64_t nstripes = reader_->getNumberOfStripes(); stripes_.resize(static_cast(nstripes)); @@ -569,6 +571,10 @@ Result> ORCFileReader::Open( return result; } +liborc::Reader* ORCFileReader::GetRawORCReader() { + return impl_->GetRawORCReader(); +} + Result> ORCFileReader::ReadMetadata() { return impl_->ReadMetadata(); } diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355f..dc4059702c0c 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -22,6 +22,7 @@ #include #include "arrow/adapters/orc/options.h" +#include "arrow/adapters/orc/util.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" @@ -60,7 +61,8 @@ class ARROW_EXPORT ORCFileReader { /// \return the returned reader object static Result> Open( const std::shared_ptr& file, MemoryPool* pool); - + /// \brief Get ORC reader from inside. + liborc::Reader* GetRawORCReader(); /// \brief Return the schema read from the ORC file /// /// \return the returned Schema object diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 046eacb6ced2..50f6361b6507 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1423,9 +1423,32 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } return total; } + Result RecordBatchCountRows(int i) override { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + + ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i)); + ARROW_ASSIGN_OR_RAISE(auto outer_message, + ReadMessageFromBlock(block)); + + auto metadata = outer_message->metadata(); + + const flatbuf::Message* message = nullptr; + RETURN_NOT_OK( + internal::VerifyMessage(metadata->data(), metadata->size(), &message)); + + auto batch = message->header_as_RecordBatch(); + + if (batch == nullptr) { + return Status::IOError( + "Header-type of flatbuffer-encoded Message is not RecordBatch."); + } + + return batch->length(); +} Status Open(const std::shared_ptr& file, int64_t footer_offset, - const IpcReadOptions& options) { + const IpcReadOptions& options) { owned_file_ = file; metadata_cache_ = std::make_shared( file, file->io_context(), options.pre_buffer_cache_options); diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 888f59a62777..8b9ba971e699 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -189,7 +189,7 @@ class ARROW_EXPORT RecordBatchFileReader /// \param[in] i the index of the record batch to return /// \return the read batch virtual Result> ReadRecordBatch(int i) = 0; - + virtual Result RecordBatchCountRows(int i) = 0; /// \brief Read a particular record batch along with its custom metadata from the file. /// Does not copy memory if the input source supports zero-copy. /// From 46b7f473e02978cc98bbf9ff5012d49bb85aee8a Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 9 Aug 2022 13:36:58 +0200 Subject: [PATCH 2/7] Merge pull request #14 from ClickHouse/fix-deadlock Fix deadlock with msan (cherry picked from commit b41ff4452944d50a44ad9c6e4621b50f44e9742e) --- cpp/src/arrow/util/mutex.cc | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc index 353090b6dda9..b76eafa9810b 100644 --- a/cpp/src/arrow/util/mutex.cc +++ b/cpp/src/arrow/util/mutex.cc @@ -71,18 +71,41 @@ struct AfterForkState { // The leak (only in child processes) is a small price to pay for robustness. Mutex* mutex = nullptr; + enum State { + INITIALIZED, + IN_PROCESS, + NOT_INITIALIZED, + }; + + std::atomic_int state = INITIALIZED; + private: AfterForkState() { pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork); } - static void AfterFork() { instance.mutex = new Mutex; } + static void AfterFork() { instance.state.store(NOT_INITIALIZED); } + }; AfterForkState AfterForkState::instance; } // namespace -Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; } +Mutex* GlobalForkSafeMutex() { + if (AfterForkState::instance.state.load() == AfterForkState::State::INITIALIZED) { + return AfterForkState::instance.mutex; + } + + int expected = AfterForkState::State::NOT_INITIALIZED; + if (AfterForkState::instance.state.compare_exchange_strong(expected, AfterForkState::State::IN_PROCESS)) { + AfterForkState::instance.mutex = new Mutex; + AfterForkState::instance.state.store(AfterForkState::State::INITIALIZED); + } else { + while (AfterForkState::instance.state.load() != AfterForkState::State::INITIALIZED); + } + + return AfterForkState::instance.mutex; +} #endif // _WIN32 } // namespace util From d063c788c9a78dc9f9adfbc53dd6579f3f8bcc27 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Fri, 20 Jan 2023 17:22:41 +0100 Subject: [PATCH 3/7] Merge pull request #16 from ClickHouse/remove-abort-in-logging Don't abort in ~CerrLog (cherry picked from commit d03245f801f798c63ee9a7d2b8914a9e5c5cd666) --- cpp/src/arrow/util/logging.cc | 15 ++++++++------- cpp/src/arrow/util/logging.h | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/util/logging.cc b/cpp/src/arrow/util/logging.cc index 993c5306ca4a..09248bbfbcfb 100644 --- a/cpp/src/arrow/util/logging.cc +++ b/cpp/src/arrow/util/logging.cc @@ -24,6 +24,7 @@ #endif #include #include +#include #ifdef ARROW_USE_GLOG @@ -65,26 +66,25 @@ class CerrLog { public: explicit CerrLog(ArrowLogLevel severity) : severity_(severity), has_logged_(false) {} - virtual ~CerrLog() { + virtual ~CerrLog() noexcept(false) { if (has_logged_) { - std::cerr << std::endl; + stream << std::endl; } if (severity_ == ArrowLogLevel::ARROW_FATAL) { - PrintBackTrace(); - std::abort(); + throw std::runtime_error(stream.str()); } } std::ostream& Stream() { has_logged_ = true; - return std::cerr; + return stream; } template CerrLog& operator<<(const T& t) { if (severity_ != ArrowLogLevel::ARROW_DEBUG) { has_logged_ = true; - std::cerr << t; + stream << t; } return *this; } @@ -92,6 +92,7 @@ class CerrLog { protected: const ArrowLogLevel severity_; bool has_logged_; + std::stringstream stream; void PrintBackTrace() { #ifdef ARROW_WITH_BACKTRACE @@ -250,7 +251,7 @@ std::ostream& ArrowLog::Stream() { bool ArrowLog::IsEnabled() const { return is_enabled_; } -ArrowLog::~ArrowLog() { +ArrowLog::~ArrowLog() noexcept(false) { if (logging_provider_ != nullptr) { delete reinterpret_cast(logging_provider_); logging_provider_ = nullptr; diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 460888f6d75e..a1d14f677ff9 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -149,7 +149,7 @@ enum class ArrowLogLevel : int { // This is also a null log which does not output anything. class ARROW_EXPORT ArrowLogBase { public: - virtual ~ArrowLogBase() {} + virtual ~ArrowLogBase() noexcept(false) {} virtual bool IsEnabled() const { return false; } @@ -168,7 +168,7 @@ class ARROW_EXPORT ArrowLogBase { class ARROW_EXPORT ArrowLog : public ArrowLogBase { public: ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity); - ~ArrowLog() override; + ~ArrowLog() noexcept(false) override; /// Return whether or not current logging instance is enabled. /// From 39bea3fa745d8bf29197befc553679dbe97c5dde Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:18:07 +0200 Subject: [PATCH 4/7] Merge pull request #15 from ClickHouse/try-fix-data-race Fix 'undefined symbol: pthread_atfork' on PowerPC64 (cherry picked from commit 450a5638704386356f8e520080468fc9bc8bcaf8) --- cpp/src/arrow/util/mutex.cc | 6 +++--- cpp/src/arrow/util/mutex.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc index b76eafa9810b..f5005daaa870 100644 --- a/cpp/src/arrow/util/mutex.cc +++ b/cpp/src/arrow/util/mutex.cc @@ -19,7 +19,7 @@ #include -#ifndef _WIN32 +#if !defined( _WIN32) && !defined(__ppc64__) # include # include #endif @@ -59,7 +59,7 @@ Mutex::Guard Mutex::Lock() { Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {} -#ifndef _WIN32 +#if !defined( _WIN32) && !defined(__ppc64__) namespace { struct AfterForkState { @@ -106,7 +106,7 @@ Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; } -#endif // _WIN32 +#endif // _WIN32 and __ppc64__ } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h index ac63cf70cd9a..c32aa8a1ef76 100644 --- a/cpp/src/arrow/util/mutex.h +++ b/cpp/src/arrow/util/mutex.h @@ -60,7 +60,7 @@ class ARROW_EXPORT Mutex { std::unique_ptr impl_; }; -#ifndef _WIN32 +#if !defined(_WIN32) && !defined(__ppc64__) /// Return a pointer to a process-wide, process-specific Mutex that can be used /// at any point in a child process. NULL is returned when called in the parent. /// From fd6dcb3c98e4caf2650f57831abe53635b1e3ac2 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Fri, 3 Nov 2023 14:15:38 +0100 Subject: [PATCH 5/7] Merge pull request #47 from ClickHouse/fix-uninit-value-msan Fix possible use-of-uninitizliaed-value (cherry picked from commit ba5c67934e8274d649befcffab56731632dc5253) --- cpp/src/parquet/decoder.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc index 3ce2323d29a1..8abf9cf1305c 100644 --- a/cpp/src/parquet/decoder.cc +++ b/cpp/src/parquet/decoder.cc @@ -1664,7 +1664,7 @@ class DeltaBitPackDecoder : public TypedDecoderImpl { T min_delta_; uint32_t mini_block_idx_; std::shared_ptr delta_bit_widths_; - int delta_bit_width_; + int delta_bit_width_ = 0; T last_value_; }; From 1662b4611b09c38c8d58f5b29123ad11ff033240 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Wed, 29 Mar 2023 14:38:21 +0200 Subject: [PATCH 6/7] Merge pull request #17 from bigo-sg/allow_map_key_optional Allow Parquet map key to be optional (cherry picked from commit 0d6d07f67625d893412b90c7cf1499bf671b6d2e) --- cpp/src/parquet/arrow/schema.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 266215a8104e..41c80b4b3b33 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -640,10 +640,23 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels, return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ", key_value.field_count()); } + + /* + * If Parquet file was written by Flink, key type of map column is allowed to be optional, like this: + * optional group event_info (MAP) { + * repeated group key_value { + * optional binary key (UTF8); + * optional binary value (UTF8); + * } + * } + * + * Refer to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/types/#constructured-data-types const Node& key_node = *key_value.field(0); if (!key_node.is_required()) { return Status::Invalid("Map keys must be annotated as required."); } + */ + // Arrow doesn't support 1 column maps (i.e. Sets). The options are to either // make the values column nullable, or process the map as a list. We choose the // latter as it is simpler. From 1dc77b6132fd41ce4a88f700e1c5bf8d080d7015 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Wed, 11 Dec 2024 16:11:52 +0000 Subject: [PATCH 7/7] Fix clang-tidy --- cpp/src/arrow/type_fwd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index be26c40dc1f4..5d27fe468de8 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -60,7 +60,7 @@ using BufferVector = std::vector>; class DataType; class Field; -class FieldRef; +class FieldRef; /// NOLINT(bugprone-forward-declaration-namespace) class KeyValueMetadata; enum class Endianness; class Schema;