Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .agents/repo-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ Apache Fory is a multi-language serialization framework with multiple wire forma
- `foryc schema.fdl --lang <langs> --output <dir>`
- Never edit generated code manually. Update the source schema or IDL and regenerate.
- Protocol changes must update `docs/specification/**` and the relevant cross-language tests.
- Remote struct `TypeDef` or `TypeMeta` schema limits are resource controls on cold metadata
cache-miss parse/publish paths only. They must not change wire format, registration, dynamic type
loading, unknown-type behavior, deserialization policy, schema-evolution semantics, or metadata
cache-hit/generated-reader hot paths. Count a remote schema version only after the schema-specific
read state has been successfully built and the owning metadata cache can publish it; failed or
incompatible metadata must not consume the limit.
- Remote struct metadata body and field-count limits are also cold-path resource controls.
`maxTypeMetaBytes` limits one received TypeDef or TypeMeta body excluding the 8-byte header and
extended-size varint; `maxTypeFields` limits one received struct metadata body's field count
(Java native TypeDef counts total fields across class layers). Check these before body
copy/decompression and before field-list allocation, and never add cache-hit or generated-reader
hot-path work for them.

## Runtime Map

Expand Down
13 changes: 13 additions & 0 deletions cpp/fory/serialization/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ struct Config {
/// When enabled, avoids duplicating shared objects and handles cycles.
bool track_ref = true;

/// Maximum accepted field count in one received struct TypeMeta.
uint32_t max_type_fields = 512;

/// Maximum accepted body size in one received TypeMeta.
uint32_t max_type_meta_bytes = 4096;

/// Maximum accepted remote struct schema versions for one logical type.
uint32_t max_schema_versions_per_type = 10;

/// Maximum accepted average remote struct schema versions across logical
/// types. The global limit is this value multiplied by the number of accepted
/// logical types, and it is never lower than 8192 schemas.
uint32_t max_average_schema_versions_per_type = 3;

/// Default constructor with sensible defaults
Config() = default;
};
Expand Down
118 changes: 98 additions & 20 deletions cpp/fory/serialization/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "fory/serialization/type_resolver.h"
#include "fory/thirdparty/MurmurHash3.h"
#include "fory/type/type.h"
#include <algorithm>
#include <cstring>

namespace fory {
namespace serialization {
Expand Down Expand Up @@ -477,8 +479,71 @@ ReadContext::read_enum_type_info(uint32_t base_type_id) {
return Unexpected(Error::type_mismatch(type_id, base_type_id));
}

// Maximum number of parsed type defs to cache (avoid OOM from malicious input)
static constexpr size_t k_max_parsed_num_type_defs = 8192;
static constexpr size_t k_min_remote_struct_schema_limit = 8192;

/// Checks whether one more remote struct schema version may be accepted for
/// the logical type described by `type_meta`.
///
/// Only STRUCT, COMPATIBLE_STRUCT, NAMED_STRUCT and NAMED_COMPATIBLE_STRUCT
/// metadata is limited; every other type id yields an empty key. The counters
/// are only read here; the returned key is handed to
/// record_remote_struct_schema() once the schema has been published.
///
/// Returns the logical-type key on success (empty when the type is not
/// limited), or Error::invalid_data when either the per-type limit
/// (max_schema_versions_per_type) or the global limit derived from
/// max_average_schema_versions_per_type has been reached.
Result<std::string, Error>
ReadContext::check_remote_struct_schema_limit(const TypeMeta &type_meta) {
  switch (static_cast<TypeId>(type_meta.type_id)) {
  case TypeId::STRUCT:
  case TypeId::COMPATIBLE_STRUCT:
  case TypeId::NAMED_STRUCT:
  case TypeId::NAMED_COMPATIBLE_STRUCT:
    break;
  default:
    return std::string();
  }

  // The one-character prefix keeps name-registered and id-registered types in
  // separate key spaces; the '\0' separates namespace from type name.
  std::string key;
  if (type_meta.register_by_name) {
    key.reserve(type_meta.namespace_str.size() + type_meta.type_name.size() +
                2);
    key.push_back('n');
    key.append(type_meta.namespace_str);
    key.push_back('\0');
    key.append(type_meta.type_name);
  } else {
    key = "i" + std::to_string(type_meta.user_type_id);
  }

  auto *entry = remote_schema_versions_by_type_.find(key);
  const uint32_t versions_for_type = entry == nullptr ? 0 : entry->second;
  if (FORY_PREDICT_FALSE(versions_for_type >=
                         config_->max_schema_versions_per_type)) {
    return Unexpected(Error::invalid_data(
        "Remote struct schema versions for one type exceeded "
        "max_schema_versions_per_type=" +
        std::to_string(config_->max_schema_versions_per_type)));
  }

  // A type that has no entry yet counts as accepted for the purpose of the
  // global budget. The product is computed in 64 bits so that it cannot wrap
  // on targets where size_t is 32 bits wide.
  const uint64_t accepted_type_count =
      static_cast<uint64_t>(remote_schema_versions_by_type_.size()) +
      (entry == nullptr ? 1u : 0u);
  const uint64_t global_limit = std::max<uint64_t>(
      k_min_remote_struct_schema_limit,
      accepted_type_count *
          static_cast<uint64_t>(config_->max_average_schema_versions_per_type));
  if (FORY_PREDICT_FALSE(total_accepted_schema_versions_ >= global_limit)) {
    return Unexpected(Error::invalid_data(
        "Remote struct schema versions exceeded global limit from "
        "max_average_schema_versions_per_type=" +
        std::to_string(config_->max_average_schema_versions_per_type)));
  }

  return key;
}

/// Counts one accepted remote struct schema against its logical type and
/// against the global total.
///
/// `type_key` is the key returned by check_remote_struct_schema_limit(); an
/// empty key is ignored and leaves both counters untouched.
void ReadContext::record_remote_struct_schema(const std::string &type_key) {
  if (!type_key.empty()) {
    auto *version_count = remote_schema_versions_by_type_.find(type_key);
    if (version_count != nullptr) {
      ++version_count->second;
    } else {
      // First accepted schema version for this logical type.
      remote_schema_versions_by_type_[type_key] = 1;
    }
    ++total_accepted_schema_versions_;
  }
}

Result<const TypeInfo *, Error> ReadContext::read_type_meta() {
Error error;
Expand Down Expand Up @@ -531,8 +596,12 @@ Result<const TypeInfo *, Error> ReadContext::read_type_meta() {
}

// Not in cache - parse the TypeMeta
FORY_TRY(parsed_meta,
TypeMeta::from_bytes_with_header(*buffer_, meta_header));
const uint32_t type_def_start =
buffer_->reader_index() - static_cast<uint32_t>(sizeof(int64_t));
FORY_TRY(parsed_meta, TypeMeta::from_bytes_with_header(
*buffer_, meta_header, config_->max_type_fields,
config_->max_type_meta_bytes));
const uint32_t type_def_end = buffer_->reader_index();

// Find local TypeInfo to get field_id mapping (optional for schema evolution)
const TypeInfo *local_type_info = nullptr;
Expand All @@ -557,6 +626,24 @@ Result<const TypeInfo *, Error> ReadContext::read_type_meta() {
}
}

if (local_type_info) {
const auto &local_type_def = local_type_info->type_def;
const size_t remote_type_def_size =
static_cast<size_t>(type_def_end - type_def_start);
if (local_type_def.size() == remote_type_def_size &&
std::memcmp(local_type_def.data(), buffer_->data() + type_def_start,
remote_type_def_size) == 0) {
parsed_type_infos_[meta_header] = local_type_info;
has_last_meta_header_ = true;
last_meta_header_ = meta_header;
last_meta_type_info_ = local_type_info;
reading_type_infos_.push_back(local_type_info);
return local_type_info;
}
}

FORY_TRY(remote_schema_key, check_remote_struct_schema_limit(*parsed_meta));

// Create TypeInfo with field_ids assigned
auto type_info = std::make_unique<TypeInfo>();
if (local_type_info) {
Expand All @@ -583,21 +670,13 @@ Result<const TypeInfo *, Error> ReadContext::read_type_meta() {
type_info->type_meta = std::move(parsed_meta);
}

// get raw pointer before moving into storage
const TypeInfo *raw_ptr = type_info.get();

// Store in primary storage
if (parsed_type_infos_.size() < k_max_parsed_num_type_defs) {
cached_type_infos_.push_back(std::move(type_info));
raw_ptr = cached_type_infos_.back().get();
parsed_type_infos_[meta_header] = raw_ptr;
has_last_meta_header_ = true;
last_meta_header_ = meta_header;
last_meta_type_info_ = raw_ptr;
} else {
owned_reading_type_infos_.push_back(std::move(type_info));
raw_ptr = owned_reading_type_infos_.back().get();
}
cached_type_infos_.push_back(std::move(type_info));
const TypeInfo *raw_ptr = cached_type_infos_.back().get();
parsed_type_infos_[meta_header] = raw_ptr;
has_last_meta_header_ = true;
last_meta_header_ = meta_header;
last_meta_type_info_ = raw_ptr;
record_remote_struct_schema(remote_schema_key);

reading_type_infos_.push_back(raw_ptr);
return raw_ptr;
Expand Down Expand Up @@ -677,7 +756,6 @@ void ReadContext::reset() {
ref_reader_.reset();
}
reading_type_infos_.clear();
owned_reading_type_infos_.clear();
current_dyn_depth_ = 0;
if (meta_string_table_active_) {
meta_string_table_.reset();
Expand Down
12 changes: 9 additions & 3 deletions cpp/fory/serialization/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "fory/util/result.h"

#include <cassert>
#include <string>
#include <typeindex>

namespace fory {
Expand All @@ -39,6 +40,7 @@ namespace serialization {
// Forward declarations
class TypeResolver;
class ReadContext;
class TypeMeta;

/// RAII helper to automatically decrease dynamic depth when leaving scope.
/// Used for tracking nested polymorphic type deserialization depth.
Expand Down Expand Up @@ -656,6 +658,10 @@ class ReadContext {
inline const Config &config() const { return *config_; }

private:
FORY_NOINLINE Result<std::string, Error>
check_remote_struct_schema_limit(const TypeMeta &type_meta);
void record_remote_struct_schema(const std::string &type_key);

// Error state - accumulated during deserialization, checked at the end
Error error_;

Expand All @@ -666,14 +672,14 @@ class ReadContext {
uint32_t current_dyn_depth_;

// Meta sharing state (for compatible mode)
// Per-message storage for TypeInfo objects not cached across messages.
std::vector<std::unique_ptr<TypeInfo>> owned_reading_type_infos_;
// Persistent cache storage for TypeInfo objects keyed by meta header.
std::vector<std::unique_ptr<TypeInfo>> cached_type_infos_;
// Index-based access (pointers to owned_reading_type_infos_ or type_resolver)
// Index-based access (pointers to cached_type_infos_ or type_resolver)
std::vector<const TypeInfo *> reading_type_infos_;
// Cache by meta_header (pointers to cached_type_infos_ or type_resolver)
fory::flat_hash_map<int64_t, const TypeInfo *> parsed_type_infos_;
fory::flat_hash_map<std::string, uint32_t> remote_schema_versions_by_type_;
size_t total_accepted_schema_versions_ = 0;
// Fast path for repeated type meta headers.
int64_t last_meta_header_ = 0;
const TypeInfo *last_meta_type_info_ = nullptr;
Expand Down
31 changes: 31 additions & 0 deletions cpp/fory/serialization/fory.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@ class ForyBuilder {
return *this;
}

/// Set maximum accepted field count in one received struct TypeMeta.
///
/// The value is stored in Config::max_type_fields. It must be greater than
/// zero; FORY_CHECK fails otherwise. Returns this builder.
ForyBuilder &max_type_fields(uint32_t max_fields) {
FORY_CHECK(max_fields > 0) << "max_type_fields must be positive";
config_.max_type_fields = max_fields;
return *this;
}

/// Set maximum accepted body size in one received TypeMeta.
///
/// The value is stored in Config::max_type_meta_bytes. It must be greater
/// than zero; FORY_CHECK fails otherwise. Returns this builder.
ForyBuilder &max_type_meta_bytes(uint32_t max_bytes) {
FORY_CHECK(max_bytes > 0) << "max_type_meta_bytes must be positive";
config_.max_type_meta_bytes = max_bytes;
return *this;
}

/// Set maximum accepted remote struct schema versions for one logical type.
///
/// The value is stored in Config::max_schema_versions_per_type. It must be
/// greater than zero; FORY_CHECK fails otherwise. Returns this builder.
ForyBuilder &max_schema_versions_per_type(uint32_t max_versions) {
FORY_CHECK(max_versions > 0)
<< "max_schema_versions_per_type must be positive";
config_.max_schema_versions_per_type = max_versions;
return *this;
}

/// Set maximum accepted average remote struct schema versions across logical
/// types. The effective global limit never drops below 8192 schemas.
///
/// The value is stored in Config::max_average_schema_versions_per_type. It
/// must be greater than zero; FORY_CHECK fails otherwise. Returns this
/// builder.
ForyBuilder &max_average_schema_versions_per_type(uint32_t max_versions) {
FORY_CHECK(max_versions > 0)
<< "max_average_schema_versions_per_type must be positive";
config_.max_average_schema_versions_per_type = max_versions;
return *this;
}

/// Provide a custom type resolver instance.
ForyBuilder &type_resolver(std::shared_ptr<TypeResolver> resolver) {
type_resolver_ = std::move(resolver);
Expand Down
103 changes: 103 additions & 0 deletions cpp/fory/serialization/serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,63 @@ TEST(SerializationTest, RegistrationByNameFailureDoesNotLeakTypeInfo) {
EXPECT_EQ(dotted_type_name.error().code(), ErrorCode::Invalid);
}

static std::vector<uint8_t> make_remote_type_meta(const std::string &type_name,
const std::string &field) {
std::vector<FieldInfo> fields;
fields.emplace_back(
field, FieldType(static_cast<uint32_t>(TypeId::VARINT32), false));
TypeMeta meta =
TypeMeta::from_fields(static_cast<uint32_t>(TypeId::NAMED_STRUCT),
"example", type_name, true, 0, std::move(fields));
auto bytes = meta.to_bytes();
EXPECT_TRUE(bytes.ok()) << "TypeMeta serialization failed: "
<< bytes.error().to_string();
return bytes.value();
}

// Feeds one serialized TypeMeta to `ctx` through a fresh buffer and returns
// whatever ReadContext::read_type_meta() produced for it.
static Result<const TypeInfo *, Error>
append_and_read_type_meta(ReadContext &ctx, const std::vector<uint8_t> &bytes) {
  Buffer wire;
  // A varuint 0 precedes the TypeMeta bytes; presumably the marker that
  // read_type_meta() consumes before the header -- TODO confirm.
  wire.write_var_uint32(0);
  wire.write_bytes(bytes.data(), static_cast<uint32_t>(bytes.size()));

  ctx.reset();
  ctx.attach(wire);
  auto outcome = ctx.read_type_meta();
  // Detach before the local buffer goes out of scope.
  ctx.detach();
  return outcome;
}

// With max_schema_versions_per_type = 1, a second schema for the same type
// name must be rejected by the per-type limit.
TEST(SerializationTest, RemoteSchemaLimitRejectsExtraVersions) {
  Config config;
  config.compatible = true;
  config.max_schema_versions_per_type = 1;
  ReadContext ctx(config, std::make_unique<TypeResolver>());

  auto first = append_and_read_type_meta(
      ctx, make_remote_type_meta("Unknown", "first_value"));
  ASSERT_TRUE(first.ok()) << first.error().to_string();

  // Same type name, different field name.
  auto second = append_and_read_type_meta(
      ctx, make_remote_type_meta("Unknown", "second_value"));
  ASSERT_FALSE(second.ok());
  EXPECT_EQ(second.error().code(), ErrorCode::InvalidData);
  // InvalidData alone could come from an unrelated parse failure; make sure
  // the rejection names the limit that was hit.
  EXPECT_NE(second.error().to_string().find("max_schema_versions_per_type"),
            std::string::npos);
}

// With max_schema_versions_per_type = 1, two differently named types must
// each be accepted: the per-type count is tracked separately per type name.
TEST(SerializationTest, RemoteSchemaLimitKeepsUnknownTypesSeparate) {
  Config config;
  config.max_schema_versions_per_type = 1;
  config.compatible = true;
  ReadContext ctx(config, std::make_unique<TypeResolver>());

  const std::vector<uint8_t> meta_a = make_remote_type_meta("UnknownA", "value");
  auto first = append_and_read_type_meta(ctx, meta_a);
  ASSERT_TRUE(first.ok()) << first.error().to_string();

  const std::vector<uint8_t> meta_b = make_remote_type_meta("UnknownB", "value");
  auto second = append_and_read_type_meta(ctx, meta_b);
  EXPECT_TRUE(second.ok()) << second.error().to_string();
}

TEST(SerializationTest, TypeMetaRejectsOverConsumedDeclaredSize) {
TypeMeta meta =
TypeMeta::from_fields(static_cast<uint32_t>(TypeId::STRUCT), "", "S",
Expand Down Expand Up @@ -1021,6 +1078,52 @@ TEST(SerializationTest, TypeMetaHeaderUses52BitMetadataHash) {
parsed.value()->get_hash());
}

// A TypeMeta carrying two fields must be rejected when parsed with a field
// limit of one.
TEST(SerializationTest, TypeMetaRejectsMaxTypeFields) {
  const uint32_t varint32_id = static_cast<uint32_t>(TypeId::VARINT32);
  std::vector<FieldInfo> two_fields;
  two_fields.emplace_back("first", FieldType(varint32_id, false));
  two_fields.emplace_back("second", FieldType(varint32_id, false));
  TypeMeta meta =
      TypeMeta::from_fields(static_cast<uint32_t>(TypeId::STRUCT), "", "S",
                            false, 1, std::move(two_fields));

  auto encoded = meta.to_bytes();
  ASSERT_TRUE(encoded.ok())
      << "TypeMeta serialization failed: " << encoded.error().to_string();
  std::vector<uint8_t> payload = encoded.value();
  Buffer buffer(payload);

  // Consume the leading 8-byte header so the reader sits at the body.
  int64_t header = 0;
  Error read_error;
  buffer.read_bytes(&header, sizeof(header), read_error);
  ASSERT_TRUE(read_error.ok()) << read_error.to_string();

  // Limits: at most 1 field, at most 4096 body bytes.
  auto parsed = TypeMeta::from_bytes_with_header(buffer, header, 1, 4096);
  ASSERT_FALSE(parsed.ok());
  const std::string message = parsed.error().to_string();
  EXPECT_NE(message.find("max_type_fields"), std::string::npos);
}

// A TypeMeta must be rejected when parsed with a body-size limit of one byte.
TEST(SerializationTest, TypeMetaRejectsMaxTypeMetaBytes) {
  std::vector<FieldInfo> single_field;
  single_field.emplace_back(
      "value", FieldType(static_cast<uint32_t>(TypeId::VARINT32), false));
  TypeMeta meta =
      TypeMeta::from_fields(static_cast<uint32_t>(TypeId::STRUCT), "", "S",
                            false, 1, std::move(single_field));

  auto encoded = meta.to_bytes();
  ASSERT_TRUE(encoded.ok())
      << "TypeMeta serialization failed: " << encoded.error().to_string();
  std::vector<uint8_t> payload = encoded.value();
  Buffer buffer(payload);

  // Consume the leading 8-byte header so the reader sits at the body.
  int64_t header = 0;
  Error read_error;
  buffer.read_bytes(&header, sizeof(header), read_error);
  ASSERT_TRUE(read_error.ok()) << read_error.to_string();

  // Limits: at most 512 fields, at most 1 body byte.
  auto parsed = TypeMeta::from_bytes_with_header(buffer, header, 512, 1);
  ASSERT_FALSE(parsed.ok());
  const std::string message = parsed.error().to_string();
  EXPECT_NE(message.find("max_type_meta_bytes"), std::string::npos);
}

TEST(SerializationTest, TypeMetaRejectsBodyOnlyHeaderHash) {
TypeMeta meta =
TypeMeta::from_fields(static_cast<uint32_t>(TypeId::STRUCT), "", "S",
Expand Down
Loading
Loading