diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ea76c641a..b8f39c62e 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -18,8 +18,9 @@ set(ICEBERG_INCLUDES "$" "$") set(ICEBERG_SOURCES - arrow_c_data_util.cc arrow_c_data_guard_internal.cc + arrow_c_data_util.cc + arrow_row_builder.cc catalog/memory/in_memory_catalog.cc catalog/session_catalog.cc catalog/session_context.cc diff --git a/src/iceberg/arrow_c_data_guard_internal.h b/src/iceberg/arrow_c_data_guard_internal.h index 04603b0fe..f624f74c5 100644 --- a/src/iceberg/arrow_c_data_guard_internal.h +++ b/src/iceberg/arrow_c_data_guard_internal.h @@ -31,6 +31,12 @@ class ICEBERG_EXPORT ArrowArrayGuard { explicit ArrowArrayGuard(ArrowArray* array) : array_(array) {} ~ArrowArrayGuard(); + /// \brief Release the guard without calling ArrowArrayRelease. + /// + /// Call this when ownership of the underlying ArrowArray has been + /// transferred elsewhere and the guard should not release it. + void Release() { array_ = nullptr; } + private: ArrowArray* array_; }; diff --git a/src/iceberg/arrow_row_builder.cc b/src/iceberg/arrow_row_builder.cc new file mode 100644 index 000000000..b53c881bb --- /dev/null +++ b/src/iceberg/arrow_row_builder.cc @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_row_builder_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" + +namespace iceberg { + +Result ArrowRowBuilder::Make(const Schema& schema) { + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &arrow_schema)); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); + return Make(&arrow_schema); +} + +Result ArrowRowBuilder::Make(const ArrowSchema* schema) { + ArrowRowBuilder builder; + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayInitFromSchema(&builder.array_, schema, &error), error); + // Guard the array in case StartAppending fails. + internal::ArrowArrayGuard guard(&builder.array_); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&builder.array_)); + // Ownership stays with the builder — disarm the guard. + guard.Release(); + return builder; +} + +ArrowRowBuilder::ArrowRowBuilder(ArrowRowBuilder&& other) noexcept + : array_(other.array_) { + other.array_.release = nullptr; +} + +ArrowRowBuilder& ArrowRowBuilder::operator=(ArrowRowBuilder&& other) noexcept { + if (this != &other) { + if (array_.release != nullptr) { + ArrowArrayRelease(&array_); + } + array_ = other.array_; + other.array_.release = nullptr; + } + return *this; +} + +ArrowRowBuilder::~ArrowRowBuilder() { + if (array_.release != nullptr) { + ArrowArrayRelease(&array_); + } +} + +int64_t ArrowRowBuilder::num_columns() const { return array_.n_children; } + +ArrowArray* ArrowRowBuilder::column(int64_t index) { + if (index < 0 || index >= array_.n_children) { + return nullptr; + } + return array_.children[index]; +} + +Status ArrowRowBuilder::FinishRow() { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array_)); + return {}; +} + +Result ArrowRowBuilder::Finish() && { + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayFinishBuildingDefault(&array_, &error), error); + ArrowArray result = array_; + array_.release = nullptr; + return result; +} + +Status AppendNull(ArrowArray* array) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); + return {}; +} + +Status AppendBoolean(ArrowArray* array, bool value) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value ? 1 : 0)); + return {}; +} + +Status AppendInt(ArrowArray* array, int64_t value) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value)); + return {}; +} + +Status AppendString(ArrowArray* array, std::string_view value) { + ArrowStringView view(value.data(), static_cast(value.size())); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(array, view)); + return {}; +} + +Status AppendStringMap(ArrowArray* array, + const std::unordered_map& entries) { + // A nanoarrow map array is a list of struct. children[0] is the + // entries struct, whose children[0]/children[1] are the key/value builders. + ArrowArray* struct_array = array->children[0]; + ArrowArray* key_array = struct_array->children[0]; + ArrowArray* value_array = struct_array->children[1]; + + for (const auto& [key, value] : entries) { + ICEBERG_RETURN_UNEXPECTED(AppendString(key_array, key)); + ICEBERG_RETURN_UNEXPECTED(AppendString(value_array, value)); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(struct_array)); + } + + // Finish the (possibly empty) map element on the outer list. + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array)); + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/arrow_row_builder_internal.h b/src/iceberg/arrow_row_builder_internal.h new file mode 100644 index 000000000..f3fd2b499 --- /dev/null +++ b/src/iceberg/arrow_row_builder_internal.h @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/arrow_row_builder_internal.h +/// Internal Arrow row-building utilities shared by metadata tables. +/// +/// Metadata tables (snapshots, history, manifests, ...) materialize in-memory +/// structures into Arrow batches that conform to the table's Iceberg schema. +/// `ArrowRowBuilder` wraps a nanoarrow `ArrowArray` initialized from such a +/// schema and exposes per-column access plus typed append helpers so each +/// metadata table can emit rows without re-implementing the nanoarrow +/// boilerplate. + +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Movable RAII builder that materializes rows into an Arrow struct array. +/// +/// Handles the nanoarrow lifecycle: InitFromSchema → StartAppending → +/// ... append values ... → FinishBuilding → Release. +/// +/// Two constructors: +/// - `Make(schema)` accepts an Iceberg Schema (typical for metadata tables). +/// - `Make(arrow_schema)` accepts a raw ArrowSchema (for lower-level callers +/// like position_delete_writer or manifest_adapter). +/// +/// Typical usage: +/// \code +/// ICEBERG_ASSIGN_OR_RAISE(auto builder, ArrowRowBuilder::Make(schema)); +/// for (const auto& row : rows) { +/// ICEBERG_RETURN_UNEXPECTED(AppendInt(builder.column(0), row.id)); +/// ICEBERG_RETURN_UNEXPECTED(AppendString(builder.column(1), row.name)); +/// ICEBERG_RETURN_UNEXPECTED(builder.FinishRow()); +/// } +/// ICEBERG_ASSIGN_OR_RAISE(auto array, std::move(builder).Finish()); +/// \endcode +class ICEBERG_EXPORT ArrowRowBuilder { + public: + /// \brief Create a row builder from an Iceberg schema. + static Result Make(const Schema& schema); + + /// \brief Create a row builder from an ArrowSchema. + /// + /// The schema must outlive this call (the caller guards it). On failure the + /// partially-initialized array is released automatically. + static Result Make(const ArrowSchema* schema); + + ArrowRowBuilder(ArrowRowBuilder&& other) noexcept; + ArrowRowBuilder& operator=(ArrowRowBuilder&& other) noexcept; + + ArrowRowBuilder(const ArrowRowBuilder&) = delete; + ArrowRowBuilder& operator=(const ArrowRowBuilder&) = delete; + + ~ArrowRowBuilder(); + + /// \brief The number of top-level columns in the batch. + int64_t num_columns() const; + + /// \brief Access the nanoarrow child builder for a top-level column. + /// + /// \param index Zero-based column index. Returns nullptr if out of range. + ArrowArray* column(int64_t index); + + /// \brief Finish the current row, advancing the struct length by one. + /// + /// Call after appending exactly one value (or null) to every column. + Status FinishRow(); + + /// \brief Finish building and transfer ownership of the resulting array. + /// + /// The builder must not be used after this call. + Result Finish() &&; + + private: + ArrowRowBuilder() = default; + ArrowArray array_{}; +}; + +/// \brief Append a null to a nanoarrow array builder. +ICEBERG_EXPORT Status AppendNull(ArrowArray* array); + +/// \brief Append a boolean value to a nanoarrow array builder. +ICEBERG_EXPORT Status AppendBoolean(ArrowArray* array, bool value); + +/// \brief Append an integer value to a nanoarrow array builder. +/// +/// Works for int32/int64/timestamp columns, which nanoarrow stores as int64. +ICEBERG_EXPORT Status AppendInt(ArrowArray* array, int64_t value); + +/// \brief Append a string value to a nanoarrow array builder. +ICEBERG_EXPORT Status AppendString(ArrowArray* array, std::string_view value); + +/// \brief Append a map value to a nanoarrow map array builder. +/// +/// Appends one (possibly empty) map element. The iteration order of the +/// resulting entries is unspecified. +ICEBERG_EXPORT Status AppendStringMap( + ArrowArray* array, const std::unordered_map& entries); + +} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 7bd2e052c..faeee00c1 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -45,6 +45,7 @@ iceberg_include_dir = include_directories('..') iceberg_sources = files( 'arrow_c_data_guard_internal.cc', 'arrow_c_data_util.cc', + 'arrow_row_builder.cc', 'catalog/memory/in_memory_catalog.cc', 'catalog/session_catalog.cc', 'catalog/session_context.cc', diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index bf00c91ac..add20985b 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -89,7 +89,6 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES location_provider_test.cc - metadata_table_test.cc metrics_config_test.cc metrics_reporter_test.cc metrics_test.cc @@ -185,6 +184,8 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(catalog_test USE_BUNDLE SOURCES in_memory_catalog_test.cc) + add_iceberg_test(metadata_table_test USE_BUNDLE SOURCES metadata_table_test.cc) + add_iceberg_test(eval_expr_test USE_BUNDLE SOURCES @@ -244,6 +245,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES arrow_c_data_util_test.cc + arrow_row_builder_test.cc data_writer_test.cc delete_filter_test.cc delete_loader_test.cc diff --git a/src/iceberg/test/arrow_row_builder_test.cc b/src/iceberg/test/arrow_row_builder_test.cc new file mode 100644 index 000000000..45fb3b787 --- /dev/null +++ b/src/iceberg/test/arrow_row_builder_test.cc @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// \file arrow_row_builder_test.cc +/// Unit tests for ArrowRowBuilder and its typed append helpers. + +#include +#include + +#include +#include +#include +#include + +#include "iceberg/arrow_row_builder_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" + +namespace iceberg { +namespace { + +/// \brief A schema exercising every append helper: int32, string, int64, +/// boolean, and map. +std::shared_ptr MakeTestSchema() { + return std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "count", int64()), + SchemaField::MakeOptional(4, "active", boolean()), + SchemaField::MakeOptional( + 5, "props", + std::make_shared(SchemaField::MakeRequired(6, "key", string()), + SchemaField::MakeRequired(7, "value", string())))}); +} + +/// \brief Finish a builder and import the result into an Arrow RecordBatch. +std::shared_ptr<::arrow::RecordBatch> FinishAndImport(ArrowRowBuilder builder, + const Schema& schema) { + auto array_result = std::move(builder).Finish(); + EXPECT_THAT(array_result, IsOk()); + + ArrowSchema c_schema; + EXPECT_THAT(ToArrowSchema(schema, &c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&c_schema).ValueOrDie(); + + // ImportRecordBatch takes ownership of the array and releases it. + return ::arrow::ImportRecordBatch(&array_result.value(), arrow_schema).ValueOrDie(); +} + +} // namespace + +TEST(ArrowRowBuilderTest, BuildsRowsWithTypedValues) { + auto schema = MakeTestSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, ArrowRowBuilder::Make(*schema)); + + ASSERT_EQ(builder.num_columns(), 5); + + // Row 0 + ASSERT_THAT(AppendInt(builder.column(0), 1), IsOk()); + ASSERT_THAT(AppendString(builder.column(1), "alice"), IsOk()); + ASSERT_THAT(AppendInt(builder.column(2), 100), IsOk()); + ASSERT_THAT(AppendBoolean(builder.column(3), true), IsOk()); + ASSERT_THAT(AppendStringMap(builder.column(4), {{"k", "v"}}), IsOk()); + ASSERT_THAT(builder.FinishRow(), IsOk()); + + // Row 1 + ASSERT_THAT(AppendInt(builder.column(0), 2), IsOk()); + ASSERT_THAT(AppendString(builder.column(1), "bob"), IsOk()); + ASSERT_THAT(AppendInt(builder.column(2), 200), IsOk()); + ASSERT_THAT(AppendBoolean(builder.column(3), false), IsOk()); + ASSERT_THAT(AppendStringMap(builder.column(4), {}), IsOk()); + ASSERT_THAT(builder.FinishRow(), IsOk()); + + auto batch = FinishAndImport(std::move(builder), *schema); + ASSERT_EQ(batch->num_rows(), 2); + ASSERT_EQ(batch->num_columns(), 5); + + auto id = std::static_pointer_cast<::arrow::Int32Array>(batch->column(0)); + EXPECT_EQ(id->Value(0), 1); + EXPECT_EQ(id->Value(1), 2); + + auto name = std::static_pointer_cast<::arrow::StringArray>(batch->column(1)); + EXPECT_EQ(name->GetString(0), "alice"); + EXPECT_EQ(name->GetString(1), "bob"); + + auto count = std::static_pointer_cast<::arrow::Int64Array>(batch->column(2)); + EXPECT_EQ(count->Value(0), 100); + EXPECT_EQ(count->Value(1), 200); + + auto active = std::static_pointer_cast<::arrow::BooleanArray>(batch->column(3)); + EXPECT_TRUE(active->Value(0)); + EXPECT_FALSE(active->Value(1)); + + // props: one entry in row 0, empty (but non-null) map in row 1. + auto props = std::static_pointer_cast<::arrow::MapArray>(batch->column(4)); + EXPECT_FALSE(props->IsNull(0)); + EXPECT_FALSE(props->IsNull(1)); + EXPECT_EQ(props->value_length(0), 1); + EXPECT_EQ(props->value_length(1), 0); +} + +TEST(ArrowRowBuilderTest, AppendsNullForOptionalColumns) { + auto schema = MakeTestSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, ArrowRowBuilder::Make(*schema)); + + ASSERT_THAT(AppendInt(builder.column(0), 42), IsOk()); + ASSERT_THAT(AppendNull(builder.column(1)), IsOk()); + ASSERT_THAT(AppendNull(builder.column(2)), IsOk()); + ASSERT_THAT(AppendNull(builder.column(3)), IsOk()); + ASSERT_THAT(AppendStringMap(builder.column(4), {}), IsOk()); + ASSERT_THAT(builder.FinishRow(), IsOk()); + + auto batch = FinishAndImport(std::move(builder), *schema); + ASSERT_EQ(batch->num_rows(), 1); + + auto id = std::static_pointer_cast<::arrow::Int32Array>(batch->column(0)); + EXPECT_FALSE(id->IsNull(0)); + EXPECT_EQ(id->Value(0), 42); + + EXPECT_TRUE(batch->column(1)->IsNull(0)); + EXPECT_TRUE(batch->column(2)->IsNull(0)); + EXPECT_TRUE(batch->column(3)->IsNull(0)); +} + +TEST(ArrowRowBuilderTest, AppendsMultiEntryStringMap) { + auto schema = MakeTestSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, ArrowRowBuilder::Make(*schema)); + + ASSERT_THAT(AppendInt(builder.column(0), 1), IsOk()); + ASSERT_THAT(AppendNull(builder.column(1)), IsOk()); + ASSERT_THAT(AppendNull(builder.column(2)), IsOk()); + ASSERT_THAT(AppendNull(builder.column(3)), IsOk()); + ASSERT_THAT(AppendStringMap(builder.column(4), {{"a", "1"}, {"b", "2"}, {"c", "3"}}), + IsOk()); + ASSERT_THAT(builder.FinishRow(), IsOk()); + + auto batch = FinishAndImport(std::move(builder), *schema); + auto props = std::static_pointer_cast<::arrow::MapArray>(batch->column(4)); + EXPECT_EQ(props->value_length(0), 3); +} + +TEST(ArrowRowBuilderTest, EmptyBuilderProducesZeroRowBatch) { + auto schema = MakeTestSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, ArrowRowBuilder::Make(*schema)); + + auto batch = FinishAndImport(std::move(builder), *schema); + EXPECT_EQ(batch->num_rows(), 0); + EXPECT_EQ(batch->num_columns(), 5); +} + +TEST(ArrowRowBuilderTest, ColumnIndexOutOfRangeReturnsNull) { + auto schema = MakeTestSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, ArrowRowBuilder::Make(*schema)); + + EXPECT_EQ(builder.num_columns(), 5); + EXPECT_NE(builder.column(0), nullptr); + EXPECT_NE(builder.column(4), nullptr); + EXPECT_EQ(builder.column(-1), nullptr); + EXPECT_EQ(builder.column(5), nullptr); +} + +} // namespace iceberg