diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 1bf1f10db..deff35899 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -261,6 +261,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_io.cc arrow/s3/arrow_s3_file_io.cc arrow/arrow_register.cc + arrow/literal_util.cc arrow/metadata_column_util.cc avro/avro_data_util.cc avro/avro_direct_decoder.cc diff --git a/src/iceberg/arrow/literal_util.cc b/src/iceberg/arrow/literal_util.cc new file mode 100644 index 000000000..7abf1bd48 --- /dev/null +++ b/src/iceberg/arrow/literal_util.cc @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" // IWYU pragma: keep +#include "iceberg/util/macros.h" + +namespace iceberg::arrow { + +namespace { + +Result> ToArrowType(const PrimitiveType& type) { + switch (type.type_id()) { + case TypeId::kBoolean: + return ::arrow::boolean(); + case TypeId::kInt: + return ::arrow::int32(); + case TypeId::kLong: + return ::arrow::int64(); + case TypeId::kFloat: + return ::arrow::float32(); + case TypeId::kDouble: + return ::arrow::float64(); + case TypeId::kDecimal: { + const auto& decimal_type = internal::checked_cast(type); + return ::arrow::decimal128(decimal_type.precision(), decimal_type.scale()); + } + case TypeId::kDate: + return ::arrow::date32(); + case TypeId::kTime: + return ::arrow::time64(::arrow::TimeUnit::MICRO); + case TypeId::kTimestamp: + return ::arrow::timestamp(::arrow::TimeUnit::MICRO); + case TypeId::kTimestampTz: + return ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"); + case TypeId::kTimestampNs: + return ::arrow::timestamp(::arrow::TimeUnit::NANO); + case TypeId::kTimestampTzNs: + return ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"); + case TypeId::kString: + return ::arrow::utf8(); + case TypeId::kBinary: + return ::arrow::binary(); + case TypeId::kFixed: { + const auto& fixed_type = internal::checked_cast(type); + return ::arrow::fixed_size_binary(static_cast(fixed_type.length())); + } + case TypeId::kUuid: + return ::arrow::fixed_size_binary(16); + default: + return NotSupported("Cannot convert {} to an Arrow type", type); + } +} + +Result> ToArrowBuffer( + const std::vector& bytes) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(std::unique_ptr<::arrow::Buffer> buffer, + ::arrow::AllocateBuffer(bytes.size())); + std::memcpy(buffer->mutable_data(), bytes.data(), bytes.size()); + return std::shared_ptr<::arrow::Buffer>(std::move(buffer)); +} + +} // namespace + +Result> ToArrowScalar(const Literal& literal) { + if (literal.type() == nullptr) { + return InvalidArgument("Cannot convert a literal without type to an Arrow scalar"); + } + + if (literal.IsAboveMax() || literal.IsBelowMin()) { + return NotSupported("Cannot convert {} to an Arrow scalar", literal); + } + + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> arrow_type, + ToArrowType(*literal.type())); + if (literal.IsNull()) { + return ::arrow::MakeNullScalar(std::move(arrow_type)); + } + + const Literal::Value& value = literal.value(); + switch (literal.type()->type_id()) { + case TypeId::kBoolean: + return std::make_shared<::arrow::BooleanScalar>(std::get(value)); + case TypeId::kInt: + return std::make_shared<::arrow::Int32Scalar>(std::get(value)); + case TypeId::kLong: + return std::make_shared<::arrow::Int64Scalar>(std::get(value)); + case TypeId::kFloat: + return std::make_shared<::arrow::FloatScalar>(std::get(value)); + case TypeId::kDouble: + return std::make_shared<::arrow::DoubleScalar>(std::get(value)); + case TypeId::kDecimal: { + const auto& decimal = std::get(value); + ::arrow::Decimal128 arrow_decimal( + static_cast(decimal.value() >> 64), + static_cast(decimal.value() & ~uint64_t{0})); + return std::make_shared<::arrow::Decimal128Scalar>(arrow_decimal, + std::move(arrow_type)); + } + case TypeId::kDate: + return std::make_shared<::arrow::Date32Scalar>(std::get(value)); + case TypeId::kTime: + return std::make_shared<::arrow::Time64Scalar>(std::get(value), + std::move(arrow_type)); + case TypeId::kTimestamp: + case TypeId::kTimestampTz: + case TypeId::kTimestampNs: + case TypeId::kTimestampTzNs: + return std::make_shared<::arrow::TimestampScalar>(std::get(value), + std::move(arrow_type)); + case TypeId::kString: + return std::make_shared<::arrow::StringScalar>(std::get(value)); + case TypeId::kBinary: { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Buffer> buffer, + ToArrowBuffer(std::get>(value))); + return std::make_shared<::arrow::BinaryScalar>(std::move(buffer)); + } + case TypeId::kFixed: { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Buffer> buffer, + ToArrowBuffer(std::get>(value))); + return std::make_shared<::arrow::FixedSizeBinaryScalar>(std::move(buffer), + std::move(arrow_type)); + } + case TypeId::kUuid: { + const Uuid& uuid = std::get(value); + ICEBERG_ASSIGN_OR_RAISE( + std::shared_ptr<::arrow::Buffer> buffer, + ToArrowBuffer(std::vector(uuid.bytes().begin(), uuid.bytes().end()))); + return std::make_shared<::arrow::FixedSizeBinaryScalar>(std::move(buffer), + std::move(arrow_type)); + } + default: + return NotSupported("Cannot convert {} literal to an Arrow scalar", + *literal.type()); + } +} + +Result> MakeDefaultArray( + const Literal& literal, const std::shared_ptr<::arrow::DataType>& type, + int64_t num_rows, ::arrow::MemoryPool* pool) { + // An extension type (e.g. `arrow.uuid` for an Iceberg UUID) is backed by a storage + // type, and compute::Cast has no kernel that casts a storage array into an extension + // type. Materialize the array as the storage type and wrap it in the extension type. + if (type->id() == ::arrow::Type::EXTENSION) { + const auto& extension_type = + internal::checked_cast(*type); + ICEBERG_ASSIGN_OR_RAISE( + std::shared_ptr<::arrow::Array> storage, + MakeDefaultArray(literal, extension_type.storage_type(), num_rows, pool)); + return ::arrow::ExtensionType::WrapArray(type, storage); + } + + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar, + ToArrowScalar(literal)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(std::shared_ptr<::arrow::Array> array, + ::arrow::MakeArrayFromScalar(*scalar, num_rows, pool)); + if (!array->type()->Equals(*type)) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(::arrow::Datum cast_result, + ::arrow::compute::Cast(array, type)); + return cast_result.make_array(); + } + return array; +} + +Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder) { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar, + ToArrowScalar(literal)); + if (!scalar->type->Equals(*builder->type())) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(scalar, scalar->CastTo(builder->type())); + } + ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendScalar(*scalar)); + return {}; +} + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/literal_util_internal.h b/src/iceberg/arrow/literal_util_internal.h new file mode 100644 index 000000000..26813b80e --- /dev/null +++ b/src/iceberg/arrow/literal_util_internal.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/result.h" + +namespace iceberg::arrow { + +/// \brief Convert a primitive literal to an Arrow scalar of its canonical Arrow type. +/// +/// A null literal converts to a null scalar of the corresponding Arrow type. +Result> ToArrowScalar(const Literal& literal); + +/// \brief Create an Arrow array of `num_rows` rows where every row holds the literal +/// value, e.g. to materialize a missing column with a default value. +/// +/// The array is cast to `type` when the literal's canonical Arrow type differs. +Result> MakeDefaultArray( + const Literal& literal, const std::shared_ptr<::arrow::DataType>& type, + int64_t num_rows, ::arrow::MemoryPool* pool); + +/// \brief Append the literal value once to `builder`, e.g. to materialize a missing +/// field with a default value while building rows. +Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder); + +} // namespace iceberg::arrow diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 0c7c6c2ca..b6473edcc 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -24,6 +24,7 @@ #include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/schema.h" @@ -119,6 +120,11 @@ Result> ProjectStructArray( ICEBERG_ASSIGN_OR_RAISE( projected_array, MakeNullArray(output_arrow_type, struct_array->length(), pool)); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_ASSIGN_OR_RAISE( + projected_array, + arrow::MakeDefaultArray(std::get(field_projection.from), + output_arrow_type, struct_array->length(), pool)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = projected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index a5629198f..2a1fc0f2f 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -341,6 +341,10 @@ Result ProjectStruct( child_projection, ProjectField(field, parquet_field, iter->second.local_index)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (field.initial_default() != nullptr) { + // Rows written before the field existed assume its `initial-default` value. + child_projection.kind = FieldProjection::Kind::kDefault; + child_projection.from = *field.initial_default(); } else if (field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fcbc22126..99fc824d2 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -253,7 +253,8 @@ if(ICEBERG_BUILD_BUNDLE) data_writer_test.cc delete_filter_test.cc delete_loader_test.cc - file_scan_task_reader_test.cc) + file_scan_task_reader_test.cc + literal_util_test.cc) endif() diff --git a/src/iceberg/test/literal_util_test.cc b/src/iceberg/test/literal_util_test.cc new file mode 100644 index 000000000..073732964 --- /dev/null +++ b/src/iceberg/test/literal_util_test.cc @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/literal_util_internal.h" +#include "iceberg/expression/literal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" +#include "iceberg/util/uuid.h" + +namespace iceberg::arrow { + +namespace { + +struct ToArrowScalarParam { + std::string test_name; + Literal literal; + std::shared_ptr<::arrow::Scalar> expected; +}; + +class ToArrowScalarTest : public ::testing::TestWithParam {}; + +TEST_P(ToArrowScalarTest, ConvertsTypeAndValue) { + const auto& param = GetParam(); + + ICEBERG_UNWRAP_OR_FAIL(auto scalar, ToArrowScalar(param.literal)); + ASSERT_TRUE(scalar->type->Equals(*param.expected->type)) + << "actual type: " << scalar->type->ToString(); + ASSERT_TRUE(scalar->Equals(*param.expected)) << "actual value: " << scalar->ToString(); +} + +std::shared_ptr<::arrow::Buffer> BufferOf(const std::vector& bytes) { + return ::arrow::Buffer::FromString(std::string(bytes.begin(), bytes.end())); +} + +const std::vector kUuidBytes = {0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, + 0xA4, 0x79, 0x3F, 0x34, 0x9C, 0xB7, 0x85, 0xE7}; + +INSTANTIATE_TEST_SUITE_P( + AllPrimitiveTypes, ToArrowScalarTest, + ::testing::Values( + ToArrowScalarParam{"Boolean", Literal::Boolean(true), + std::make_shared<::arrow::BooleanScalar>(true)}, + ToArrowScalarParam{"Int", Literal::Int(42), + std::make_shared<::arrow::Int32Scalar>(42)}, + ToArrowScalarParam{"Long", Literal::Long(42), + std::make_shared<::arrow::Int64Scalar>(42)}, + ToArrowScalarParam{"Float", Literal::Float(1.5F), + std::make_shared<::arrow::FloatScalar>(1.5F)}, + ToArrowScalarParam{"Double", Literal::Double(2.5), + std::make_shared<::arrow::DoubleScalar>(2.5)}, + ToArrowScalarParam{"String", Literal::String("iceberg"), + std::make_shared<::arrow::StringScalar>("iceberg")}, + ToArrowScalarParam{ + "Binary", Literal::Binary({0x01, 0x02}), + std::make_shared<::arrow::BinaryScalar>(BufferOf({0x01, 0x02}))}, + ToArrowScalarParam{"Fixed", Literal::Fixed({0xAB, 0xCD}), + std::make_shared<::arrow::FixedSizeBinaryScalar>( + BufferOf({0xAB, 0xCD}), ::arrow::fixed_size_binary(2))}, + ToArrowScalarParam{"Uuid", Literal::UUID(Uuid::FromBytes(kUuidBytes).value()), + std::make_shared<::arrow::FixedSizeBinaryScalar>( + BufferOf(kUuidBytes), ::arrow::fixed_size_binary(16))}, + ToArrowScalarParam{ + "Decimal", Literal::Decimal(12345, /*precision=*/9, /*scale=*/2), + std::make_shared<::arrow::Decimal128Scalar>(::arrow::Decimal128(12345), + ::arrow::decimal128(9, 2))}, + ToArrowScalarParam{ + "NegativeDecimal", Literal::Decimal(-12345, /*precision=*/9, /*scale=*/2), + std::make_shared<::arrow::Decimal128Scalar>(::arrow::Decimal128(-12345), + ::arrow::decimal128(9, 2))}, + ToArrowScalarParam{"Date", Literal::Date(19000), + std::make_shared<::arrow::Date32Scalar>(19000)}, + ToArrowScalarParam{"Time", Literal::Time(3600000000), + std::make_shared<::arrow::Time64Scalar>( + 3600000000, ::arrow::time64(::arrow::TimeUnit::MICRO))}, + ToArrowScalarParam{ + "Timestamp", Literal::Timestamp(1672531200000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000, ::arrow::timestamp(::arrow::TimeUnit::MICRO))}, + ToArrowScalarParam{"TimestampTz", Literal::TimestampTz(1672531200000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000, + ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"))}, + ToArrowScalarParam{"TimestampNs", Literal::TimestampNs(1672531200000000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000000, + ::arrow::timestamp(::arrow::TimeUnit::NANO))}, + ToArrowScalarParam{"TimestampTzNs", Literal::TimestampTzNs(1672531200000000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000000, + ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"))}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +TEST(LiteralUtilTest, NullLiteralBecomesNullScalar) { + ICEBERG_UNWRAP_OR_FAIL(auto scalar, ToArrowScalar(Literal::Null(iceberg::int32()))); + ASSERT_TRUE(scalar->type->Equals(*::arrow::int32())); + ASSERT_FALSE(scalar->is_valid); +} + +TEST(LiteralUtilTest, SentinelLiteralsAreRejected) { + // Casting to a narrower type may produce AboveMax/BelowMin sentinels; they have + // no value to materialize. + ICEBERG_UNWRAP_OR_FAIL( + auto above_max, + Literal::Long(std::numeric_limits::max()).CastTo(iceberg::int32())); + ASSERT_TRUE(above_max.IsAboveMax()); + ASSERT_THAT(ToArrowScalar(above_max), IsError(ErrorKind::kNotSupported)); +} + +TEST(LiteralUtilTest, MakeDefaultArrayFillsAllRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto array, MakeDefaultArray(Literal::Int(7), ::arrow::int32(), /*num_rows=*/3, + ::arrow::default_memory_pool())); + ASSERT_EQ(array->length(), 3); + ASSERT_EQ(array->null_count(), 0); + const auto& int_array = static_cast(*array); + for (int64_t i = 0; i < 3; i++) { + ASSERT_EQ(int_array.Value(i), 7); + } +} + +TEST(LiteralUtilTest, MakeDefaultArrayCastsToTargetType) { + // The target Arrow type prevails when it differs from the literal's natural type. + ICEBERG_UNWRAP_OR_FAIL( + auto array, MakeDefaultArray(Literal::Int(7), ::arrow::int64(), /*num_rows=*/2, + ::arrow::default_memory_pool())); + ASSERT_TRUE(array->type()->Equals(*::arrow::int64())); + const auto& long_array = static_cast(*array); + ASSERT_EQ(long_array.Value(0), 7); +} + +TEST(LiteralUtilTest, MakeDefaultArrayWrapsExtensionType) { + // An Iceberg UUID maps to the `arrow.uuid` extension type (storage is + // fixed_size_binary(16)). compute::Cast cannot target an extension type, so the + // default array must be built as the storage type and wrapped in the extension type. + auto uuid = Uuid::FromBytes(kUuidBytes).value(); + ICEBERG_UNWRAP_OR_FAIL( + auto array, MakeDefaultArray(Literal::UUID(uuid), ::arrow::extension::uuid(), + /*num_rows=*/3, ::arrow::default_memory_pool())); + ASSERT_TRUE(array->type()->Equals(*::arrow::extension::uuid())); + ASSERT_EQ(array->length(), 3); + ASSERT_EQ(array->null_count(), 0); + + const auto& extension_array = static_cast(*array); + const auto& storage = + static_cast(*extension_array.storage()); + for (int64_t i = 0; i < 3; i++) { + ASSERT_EQ(storage.GetView(i), + std::string_view(reinterpret_cast(kUuidBytes.data()), + kUuidBytes.size())); + } +} + +TEST(LiteralUtilTest, AppendDefaultToBuilderAppendsOneValue) { + ::arrow::Int64Builder builder; + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + + std::shared_ptr<::arrow::Array> array; + ASSERT_TRUE(builder.Finish(&array).ok()); + ASSERT_EQ(array->length(), 2); + const auto& long_array = static_cast(*array); + ASSERT_EQ(long_array.Value(0), 42); + ASSERT_EQ(long_array.Value(1), 42); +} + +} // namespace + +} // namespace iceberg::arrow diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index 606ad8ca5..e300d69b4 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -28,6 +28,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" @@ -409,6 +410,68 @@ TEST(ProjectRecordBatchTest, StructWithMissingOptionalField) { input_json, expected_json)); } +TEST(ProjectRecordBatchTest, StructWithMissingDefaultFields) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "id", int32()), + // Missing required field with an initial-default: filled with the default. + SchemaField(2, "score", int64(), /*optional=*/false, /*doc=*/{}, + std::make_shared(Literal::Long(100))), + // Missing optional field with an initial-default: also filled, not null. + SchemaField(3, "grade", string(), /*optional=*/true, /*doc=*/{}, + std::make_shared(Literal::String("A"))), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "id", int32()), + }); + + const std::string input_json = R"([ + {"id": 1}, + {"id": 2} + ])"; + const std::string expected_json = R"([ + {"id": 1, "score": 100, "grade": "A"}, + {"id": 2, "score": 100, "grade": "A"} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + +TEST(ProjectRecordBatchTest, NestedStructWithMissingDefaultField) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired( + 2, "person", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", string()), + // Missing in source; filled with its initial-default. + SchemaField(4, "age", int32(), /*optional=*/false, /*doc=*/{}, + std::make_shared(Literal::Int(18))), + })), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "person", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", string()), + })), + }); + + const std::string input_json = R"([ + {"id": 100, "person": {"name": "Employee0"}}, + {"id": 101, "person": {"name": "Employee1"}} + ])"; + const std::string expected_json = R"([ + {"id": 100, "person": {"name": "Employee0", "age": 18}}, + {"id": 101, "person": {"name": "Employee1", "age": 18}} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + TEST(ProjectRecordBatchTest, NestedStructWithMissingOptionalFields) { Schema projected_schema({ SchemaField::MakeRequired(1, "id", int32()), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index ee1cbc931..c0bef5349 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -38,6 +38,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/expression/literal.h" #include "iceberg/file_reader.h" #include "iceberg/file_writer.h" #include "iceberg/metadata_columns.h" @@ -262,6 +263,35 @@ TEST_F(ParquetReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(ParquetReaderTest, ReadMissingFieldsWithDefaults) { + // The file contains only fields 1 and 2; the projected schema adds fields 3 and 4 + // with initial-defaults, which are filled for all rows written before the columns + // existed. + CreateSimpleParquetFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField(3, "score", int64(), /*optional=*/false, /*doc=*/{}, + std::make_shared(Literal::Long(100))), + SchemaField(4, "status", string(), /*optional=*/true, /*doc=*/{}, + std::make_shared(Literal::String("active"))), + }); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()) + << "Failed to create reader: " << reader_result.error().message; + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([[1, "Foo", 100, "active"], + [2, "Bar", 100, "active"], + [3, "Baz", 100, "active"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + TEST_F(ParquetReaderTest, RoundTripWithGenericFileIO) { auto file_io = std::make_shared(); auto path = CreateNewTempFilePathWithSuffix(".parquet");