Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
set(ICEBERG_SOURCES
arrow_c_data_util.cc
arrow_c_data_guard_internal.cc
arrow_c_data_util.cc
arrow_row_builder.cc
catalog/memory/in_memory_catalog.cc
catalog/session_catalog.cc
catalog/session_context.cc
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/arrow_c_data_guard_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class ICEBERG_EXPORT ArrowArrayGuard {
/// \brief Start guarding `array`; only the pointer is stored, nothing is copied.
explicit ArrowArrayGuard(ArrowArray* array) : array_(array) {}
/// \brief Defined out of line.
///
/// NOTE(review): presumably releases the guarded array unless Release() was
/// called first — confirm against the definition in the .cc file.
~ArrowArrayGuard();

/// \brief Disarm the guard so that it no longer tracks the ArrowArray.
///
/// Intended for the case where ownership of the underlying ArrowArray has
/// been handed to another owner: the guard forgets its pointer and therefore
/// must not call ArrowArrayRelease on it afterwards.
void Release() {
  array_ = nullptr;
}

private:
ArrowArray* array_;
};
Expand Down
137 changes: 137 additions & 0 deletions src/iceberg/arrow_row_builder.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <utility>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/arrow_row_builder_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"

namespace iceberg {

// Creates a row builder for an Iceberg schema.
//
// The Iceberg schema is first converted into a temporary ArrowSchema, which is
// then passed to the ArrowSchema overload of Make. Any conversion failure is
// returned to the caller unchanged.
Result<ArrowRowBuilder> ArrowRowBuilder::Make(const Schema& schema) {
  ArrowSchema converted;
  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &converted));

  // The temporary schema is only needed while the builder is being set up, so
  // its lifetime is tied to this scope through a guard.
  internal::ArrowSchemaGuard converted_guard(&converted);

  Result<ArrowRowBuilder> made = Make(&converted);
  return made;
}

// Creates a row builder from a raw ArrowSchema.
//
// Steps: initialize the builder's root array from `schema`, then switch it
// into appending mode. Either step may fail, in which case the error is
// returned and the builder is discarded.
Result<ArrowRowBuilder> ArrowRowBuilder::Make(const ArrowSchema* schema) {
  ArrowRowBuilder row_builder;
  ArrowArray* const root = &row_builder.array_;

  ArrowError init_error;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
      ArrowArrayInitFromSchema(root, schema, &init_error), init_error);

  // From here on the root array is initialized; guard it so that an early
  // return from the StartAppending step cleans it up.
  internal::ArrowArrayGuard cleanup(root);
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(root));

  // Success: the builder itself owns the array, so the guard must stand down.
  cleanup.Release();
  return row_builder;
}

ArrowRowBuilder::ArrowRowBuilder(ArrowRowBuilder&& other) noexcept
: array_(other.array_) {
other.array_.release = nullptr;
}

// Move assignment: releases any array currently held, then takes over the
// array owned by `other`. Self-assignment is a no-op.
//
// As in the move constructor, the source array is reset to an all-zero
// ArrowArray rather than only clearing `release`, so that the moved-from
// builder does not keep `n_children`/`children` pointing into an array it no
// longer owns.
ArrowRowBuilder& ArrowRowBuilder::operator=(ArrowRowBuilder&& other) noexcept {
  if (this != &other) {
    // A null `release` marks an array that is already released or moved-from.
    if (array_.release != nullptr) {
      ArrowArrayRelease(&array_);
    }
    array_ = std::exchange(other.array_, {});
  }
  return *this;
}

// Releases the held array, if any. A null `release` callback means there is
// nothing left to release (for example after a move or after Finish()).
ArrowRowBuilder::~ArrowRowBuilder() {
  if (array_.release == nullptr) {
    return;
  }
  ArrowArrayRelease(&array_);
}

// Number of top-level columns, i.e. the child count of the root array.
int64_t ArrowRowBuilder::num_columns() const {
  const int64_t child_count = array_.n_children;
  return child_count;
}

// Returns the child array for the top-level column at `index`, or nullptr when
// `index` is negative or not smaller than the number of children.
ArrowArray* ArrowRowBuilder::column(int64_t index) {
  const bool in_range = index >= 0 && index < array_.n_children;
  return in_range ? array_.children[index] : nullptr;
}

// Closes the current row by finishing one element on the root array.
// A nanoarrow failure is converted into an error Status by the macro.
Status ArrowRowBuilder::FinishRow() {
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array_));
  return Status{};
}

// Finalizes the array and hands ownership of it to the caller.
//
// If finalization fails, the error is returned and the builder keeps owning
// the array (it is released by the destructor). On success the builder's own
// copy is reset to an all-zero ArrowArray: merely clearing `release` would
// leave `n_children`/`children` pointing into the array that the caller now
// owns and may release at any time.
Result<ArrowArray> ArrowRowBuilder::Finish() && {
  ArrowError error;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
      ArrowArrayFinishBuildingDefault(&array_, &error), error);
  ArrowArray result = std::exchange(array_, {});
  return result;
}

// Appends exactly one null to `array`.
Status AppendNull(ArrowArray* array) {
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
  return Status{};
}

// Appends a boolean to `array`. The value goes through nanoarrow's integer
// append entry point, encoded as 1 for true and 0 for false.
Status AppendBoolean(ArrowArray* array, bool value) {
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value ? 1 : 0));
  return Status{};
}

// Appends a signed integer to `array` via nanoarrow's integer append call.
Status AppendInt(ArrowArray* array, int64_t value) {
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value));
  return Status{};
}

// Appends a string to `array`. The bytes are described to nanoarrow through an
// ArrowStringView (pointer + length) built from `value`.
Status AppendString(ArrowArray* array, std::string_view value) {
  const ArrowStringView view{value.data(), static_cast<int64_t>(value.size())};
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(array, view));
  return Status{};
}

// Appends one map<string, string> element to a nanoarrow map builder.
//
// Layout used here: the map array is a list whose single child (children[0])
// is the entries struct; that struct's children[0] and children[1] are the key
// and value builders. Each entry is written as key, value, then one finished
// struct element; finally one element is finished on the outer list, which
// also covers the empty-map case.
Status AppendStringMap(ArrowArray* array,
                       const std::unordered_map<std::string, std::string>& entries) {
  ArrowArray* const struct_array = array->children[0];
  ArrowArray* const key_array = struct_array->children[0];
  ArrowArray* const value_array = struct_array->children[1];

  for (const auto& entry : entries) {
    const std::string& key = entry.first;
    const std::string& value = entry.second;
    ICEBERG_RETURN_UNEXPECTED(AppendString(key_array, key));
    ICEBERG_RETURN_UNEXPECTED(AppendString(value_array, value));
    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(struct_array));
  }

  // Close the map element itself on the outer list.
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
  return Status{};
}

} // namespace iceberg
126 changes: 126 additions & 0 deletions src/iceberg/arrow_row_builder_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/arrow_row_builder_internal.h
/// Internal Arrow row-building utilities shared by metadata tables.
///
/// Metadata tables (snapshots, history, manifests, ...) materialize in-memory
/// structures into Arrow batches that conform to the table's Iceberg schema.
/// `ArrowRowBuilder` wraps a nanoarrow `ArrowArray` initialized from such a
/// schema and exposes per-column access plus typed append helpers so each
/// metadata table can emit rows without re-implementing the nanoarrow
/// boilerplate.

#include <cstdint>
#include <string_view>
#include <unordered_map>

#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Movable, non-copyable builder that materializes rows into an Arrow
/// struct array.
///
/// Handles the nanoarrow lifecycle: InitFromSchema → StartAppending →
/// ... append values ... → FinishBuilding → Release. The destructor releases
/// the array if the builder still holds one.
///
/// Two factory functions (the default constructor is private):
/// - `Make(schema)` accepts an Iceberg Schema (typical for metadata tables).
/// - `Make(arrow_schema)` accepts a raw ArrowSchema (for lower-level callers
/// like position_delete_writer or manifest_adapter).
///
/// Typical usage:
/// \code
/// ICEBERG_ASSIGN_OR_RAISE(auto builder, ArrowRowBuilder::Make(schema));
/// for (const auto& row : rows) {
/// ICEBERG_RETURN_UNEXPECTED(AppendInt(builder.column(0), row.id));
/// ICEBERG_RETURN_UNEXPECTED(AppendString(builder.column(1), row.name));
/// ICEBERG_RETURN_UNEXPECTED(builder.FinishRow());
/// }
/// ICEBERG_ASSIGN_OR_RAISE(auto array, std::move(builder).Finish());
/// \endcode
class ICEBERG_EXPORT ArrowRowBuilder {
public:
/// \brief Create a row builder from an Iceberg schema.
///
/// The schema is converted to a temporary ArrowSchema and forwarded to the
/// ArrowSchema overload; conversion errors are returned to the caller.
static Result<ArrowRowBuilder> Make(const Schema& schema);

/// \brief Create a row builder from an ArrowSchema.
///
/// The schema must outlive this call (the caller guards it). On failure the
/// partially-initialized array is released automatically.
static Result<ArrowRowBuilder> Make(const ArrowSchema* schema);

/// \brief Take over the array held by `other`; `other` no longer owns it.
ArrowRowBuilder(ArrowRowBuilder&& other) noexcept;
/// \brief Release any array currently held, then take over the array held by
/// `other`. Self-assignment is a no-op.
ArrowRowBuilder& operator=(ArrowRowBuilder&& other) noexcept;

// Copying is disabled: the builder is the single owner of its array.
ArrowRowBuilder(const ArrowRowBuilder&) = delete;
ArrowRowBuilder& operator=(const ArrowRowBuilder&) = delete;

/// \brief Release the underlying array if this builder still holds one.
~ArrowRowBuilder();

/// \brief The number of top-level columns in the batch.
int64_t num_columns() const;

/// \brief Access the nanoarrow child builder for a top-level column.
///
/// \param index Zero-based column index. Returns nullptr if out of range.
/// Callers must check for nullptr before passing the result to the append
/// helpers, which do not check it themselves.
ArrowArray* column(int64_t index);

/// \brief Finish the current row, advancing the struct length by one.
///
/// Call after appending exactly one value (or null) to every column.
Status FinishRow();

/// \brief Finish building and transfer ownership of the resulting array.
///
/// Rvalue-qualified: invoke as `std::move(builder).Finish()`. On success the
/// caller becomes responsible for releasing the returned array. The builder
/// must not be used after this call.
Result<ArrowArray> Finish() &&;

private:
// Only the Make() factories create builders.
ArrowRowBuilder() = default;
// Root array being built; a null `release` callback means nothing is held.
ArrowArray array_{};
};

/// \brief Append a single null to a nanoarrow array builder.
///
/// \param array Builder to append to; not checked for nullptr.
ICEBERG_EXPORT Status AppendNull(ArrowArray* array);

/// \brief Append a boolean value to a nanoarrow array builder.
///
/// The value is passed to nanoarrow's integer append call as 1 (true) or
/// 0 (false).
ICEBERG_EXPORT Status AppendBoolean(ArrowArray* array, bool value);

/// \brief Append an integer value to a nanoarrow array builder.
///
/// The value is handed to nanoarrow's ArrowArrayAppendInt as an int64_t.
/// NOTE(review): intended for int32/int64/timestamp columns; how nanoarrow
/// treats values outside the column's storage range is not visible here —
/// confirm against the nanoarrow documentation.
ICEBERG_EXPORT Status AppendInt(ArrowArray* array, int64_t value);

/// \brief Append a string value to a nanoarrow array builder.
///
/// \param value Bytes to append; only needs to stay valid during the call.
ICEBERG_EXPORT Status AppendString(ArrowArray* array, std::string_view value);

/// \brief Append a map<string, string> value to a nanoarrow map array builder.
///
/// Appends one (possibly empty) map element. The iteration order of the
/// resulting entries is unspecified.
///
/// \param array Map builder whose first child is the entries struct with a
/// key builder and a value builder as its two children. The layout is not
/// validated by this function.
/// \param entries Key/value pairs that make up the single map element.
ICEBERG_EXPORT Status AppendStringMap(
ArrowArray* array, const std::unordered_map<std::string, std::string>& entries);

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ iceberg_include_dir = include_directories('..')
iceberg_sources = files(
'arrow_c_data_guard_internal.cc',
'arrow_c_data_util.cc',
'arrow_row_builder.cc',
'catalog/memory/in_memory_catalog.cc',
'catalog/session_catalog.cc',
'catalog/session_context.cc',
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ add_iceberg_test(schema_test
add_iceberg_test(table_test
SOURCES
location_provider_test.cc
metadata_table_test.cc
metrics_config_test.cc
metrics_reporter_test.cc
metrics_test.cc
Expand Down Expand Up @@ -185,6 +184,8 @@ if(ICEBERG_BUILD_BUNDLE)

add_iceberg_test(catalog_test USE_BUNDLE SOURCES in_memory_catalog_test.cc)

add_iceberg_test(metadata_table_test USE_BUNDLE SOURCES metadata_table_test.cc)

add_iceberg_test(eval_expr_test
USE_BUNDLE
SOURCES
Expand Down Expand Up @@ -244,6 +245,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
arrow_c_data_util_test.cc
arrow_row_builder_test.cc
data_writer_test.cc
delete_filter_test.cc
delete_loader_test.cc
Expand Down
Loading
Loading