Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ class ORCFileReader::Impl {

return Init();
}

/// \brief Return the liborc::Reader pointer obtained from reader_.get().
///
/// NOTE(review): reader_ looks like a smart pointer, so the returned pointer is
/// presumably non-owning and only valid while this Impl is alive -- confirm.
liborc::Reader* GetRawORCReader() {
return reader_.get();
}
Status Init() {
int64_t nstripes = reader_->getNumberOfStripes();
stripes_.resize(static_cast<size_t>(nstripes));
Expand Down Expand Up @@ -569,6 +571,10 @@ Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
return result;
}

// Forwards to Impl::GetRawORCReader() and returns its result unchanged.
// NOTE(review): the pointer is presumably owned by the implementation object
// and must not be deleted by the caller -- confirm.
liborc::Reader* ORCFileReader::GetRawORCReader() {
return impl_->GetRawORCReader();
}

// Forwards to Impl::ReadMetadata() and returns its result (key/value metadata
// or an error Status wrapped in Result) unchanged.
Result<std::shared_ptr<const KeyValueMetadata>> ORCFileReader::ReadMetadata() {
return impl_->ReadMetadata();
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/adapters/orc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "arrow/adapters/orc/options.h"
#include "arrow/adapters/orc/util.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
Expand Down Expand Up @@ -60,7 +61,8 @@ class ARROW_EXPORT ORCFileReader {
/// \return the returned reader object
static Result<std::unique_ptr<ORCFileReader>> Open(
const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool);

/// \brief Return a raw pointer to the underlying liborc::Reader used by this reader.
liborc::Reader* GetRawORCReader();
/// \brief Return the schema read from the ORC file
///
/// \return the returned Schema object
Expand Down
25 changes: 24 additions & 1 deletion cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1423,9 +1423,32 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
return total;
}
/// \brief Return the length recorded in the RecordBatch header of batch `i`.
///
/// Looks up the file block for batch `i`, reads its message, verifies the
/// flatbuffer metadata and returns the `length` field of the RecordBatch header.
///
/// \param[in] i index of the record batch; the range [0, num_record_batches())
///   is enforced with DCHECK only
/// \return the header's length, or IOError when the message header is not a
///   RecordBatch; errors from the block/message helpers are propagated
Result<int64_t> RecordBatchCountRows(int i) override {
  DCHECK_GE(i, 0);
  DCHECK_LT(i, num_record_batches());

  ARROW_ASSIGN_OR_RAISE(auto batch_block, GetRecordBatchBlock(i));
  ARROW_ASSIGN_OR_RAISE(auto block_message, ReadMessageFromBlock(batch_block));

  // Verify the flatbuffer before touching any of its fields.
  auto header_buffer = block_message->metadata();
  const flatbuf::Message* fb_message = nullptr;
  RETURN_NOT_OK(internal::VerifyMessage(header_buffer->data(), header_buffer->size(),
                                        &fb_message));

  // header_as_RecordBatch() yields nullptr when the header has another type.
  const auto* batch_header = fb_message->header_as_RecordBatch();
  if (batch_header != nullptr) {
    return batch_header->length();
  }

  return Status::IOError(
      "Header-type of flatbuffer-encoded Message is not RecordBatch.");
}

Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
const IpcReadOptions& options) {
const IpcReadOptions& options) {
owned_file_ = file;
metadata_cache_ = std::make_shared<io::internal::ReadRangeCache>(
file, file->io_context(), options.pre_buffer_cache_options);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class ARROW_EXPORT RecordBatchFileReader
/// \param[in] i the index of the record batch to return
/// \return the read batch
virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0;

/// \brief Return the number of rows of a particular record batch in the file.
///
/// \param[in] i the index of the record batch
/// \return the number of rows of the batch
virtual Result<int64_t> RecordBatchCountRows(int i) = 0;
/// \brief Read a particular record batch along with its custom metadata from the file.
/// Does not copy memory if the input source supports zero-copy.
///
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ using BufferVector = std::vector<std::shared_ptr<Buffer>>;

class DataType;
class Field;
class FieldRef;
class FieldRef; /// NOLINT(bugprone-forward-declaration-namespace)
class KeyValueMetadata;
enum class Endianness;
class Schema;
Expand Down
15 changes: 8 additions & 7 deletions cpp/src/arrow/util/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#endif
#include <cstdlib>
#include <iostream>
#include <sstream>

#ifdef ARROW_USE_GLOG

Expand Down Expand Up @@ -65,33 +66,33 @@ class CerrLog {
public:
explicit CerrLog(ArrowLogLevel severity) : severity_(severity), has_logged_(false) {}

virtual ~CerrLog() {
virtual ~CerrLog() noexcept(false) {
if (has_logged_) {
std::cerr << std::endl;
stream << std::endl;
}
if (severity_ == ArrowLogLevel::ARROW_FATAL) {
PrintBackTrace();
std::abort();
throw std::runtime_error(stream.str());
}
}

std::ostream& Stream() {
has_logged_ = true;
return std::cerr;
return stream;
}

template <class T>
CerrLog& operator<<(const T& t) {
if (severity_ != ArrowLogLevel::ARROW_DEBUG) {
has_logged_ = true;
std::cerr << t;
stream << t;
}
return *this;
}

protected:
const ArrowLogLevel severity_;
bool has_logged_;
std::stringstream stream;

void PrintBackTrace() {
#ifdef ARROW_WITH_BACKTRACE
Expand Down Expand Up @@ -250,7 +251,7 @@ std::ostream& ArrowLog::Stream() {

bool ArrowLog::IsEnabled() const { return is_enabled_; }

ArrowLog::~ArrowLog() {
ArrowLog::~ArrowLog() noexcept(false) {
if (logging_provider_ != nullptr) {
delete reinterpret_cast<LoggingProvider*>(logging_provider_);
logging_provider_ = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ enum class ArrowLogLevel : int {
// This is also a null log which does not output anything.
class ARROW_EXPORT ArrowLogBase {
public:
virtual ~ArrowLogBase() {}
virtual ~ArrowLogBase() noexcept(false) {}

virtual bool IsEnabled() const { return false; }

Expand All @@ -168,7 +168,7 @@ class ARROW_EXPORT ArrowLogBase {
class ARROW_EXPORT ArrowLog : public ArrowLogBase {
public:
ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity);
~ArrowLog() override;
~ArrowLog() noexcept(false) override;

/// Return whether or not current logging instance is enabled.
///
Expand Down
33 changes: 28 additions & 5 deletions cpp/src/arrow/util/mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <mutex>

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
# include <pthread.h>
# include <atomic>
#endif
Expand Down Expand Up @@ -59,7 +59,7 @@ Mutex::Guard Mutex::Lock() {

Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
namespace {

struct AfterForkState {
Expand All @@ -71,19 +71,42 @@ struct AfterForkState {
// The leak (only in child processes) is a small price to pay for robustness.
Mutex* mutex = nullptr;

enum State {
INITIALIZED,
IN_PROCESS,
NOT_INITIALIZED,
};

std::atomic_int state = INITIALIZED;

private:
AfterForkState() {
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
}

static void AfterFork() { instance.mutex = new Mutex; }
static void AfterFork() { instance.state.store(NOT_INITIALIZED); }

};

AfterForkState AfterForkState::instance;
} // namespace

Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
#endif // _WIN32
// Return the Mutex pointer stored in AfterForkState::instance, allocating the
// Mutex first if the state is NOT_INITIALIZED.
//
// While the state still has its initial INITIALIZED value the stored pointer is
// whatever was last assigned (nullptr by default), so nullptr can be returned.
// The Mutex allocated here is never deleted by this function.
Mutex* GlobalForkSafeMutex() {
// Fast path: nothing to create, hand back the stored pointer as-is.
if (AfterForkState::instance.state.load() == AfterForkState::State::INITIALIZED) {
return AfterForkState::instance.mutex;
}

// Only the caller that moves the state NOT_INITIALIZED -> IN_PROCESS allocates
// the Mutex; it publishes INITIALIZED once the pointer has been stored.
int expected = AfterForkState::State::NOT_INITIALIZED;
if (AfterForkState::instance.state.compare_exchange_strong(expected, AfterForkState::State::IN_PROCESS)) {
AfterForkState::instance.mutex = new Mutex;
AfterForkState::instance.state.store(AfterForkState::State::INITIALIZED);
} else {
// Another caller is (or was) creating the Mutex: busy-wait, without yielding,
// until the state reads INITIALIZED.
while (AfterForkState::instance.state.load() != AfterForkState::State::INITIALIZED);
}

return AfterForkState::instance.mutex;
}
#endif // _WIN32 and __ppc64__

} // namespace util
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};

#ifndef _WIN32
#if !defined(_WIN32) && !defined(__ppc64__)
/// Return a pointer to a process-wide, process-specific Mutex that can be used
/// at any point in a child process. NULL is returned when called in the parent.
///
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,23 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ",
key_value.field_count());
}

/*
 * Parquet files written by Flink may declare the key of a map column as
 * optional, for example:
 *   optional group event_info (MAP) {
 *     repeated group key_value {
 *       optional binary key (UTF8);
 *       optional binary value (UTF8);
 *     }
 *   }
 *
 * See: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/types/#constructured-data-types
 *
 * The required-key check below is therefore intentionally disabled:
 *
 * const Node& key_node = *key_value.field(0);
 * if (!key_node.is_required()) {
 *   return Status::Invalid("Map keys must be annotated as required.");
 * }
 */

// Arrow doesn't support 1 column maps (i.e. Sets). The options are to either
// make the values column nullable, or process the map as a list. We choose the
// latter as it is simpler.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ class DeltaBitPackDecoder : public TypedDecoderImpl<DType> {
T min_delta_;
uint32_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
int delta_bit_width_;
int delta_bit_width_ = 0;

T last_value_;
};
Expand Down
Loading