@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.filter2.columnindex;
package org.apache.parquet.filter2.columnindex;

import java.util.ArrayList;
import java.util.Collections;
@@ -33,7 +33,8 @@
* filtering. It is used to iterate over the matching row indexes to be read from a row-group, retrieve the count of the
* matching rows, or check whether a row index range overlaps.
*
* @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long)
* @see org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter#calculateRowRanges(Filter,
* org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore, Set, long)
*/
public class RowRanges {
// Make it public because some upper-layer applications need to access it
@@ -316,4 +317,81 @@ public List<Range> getRanges() {
public String toString() {
return ranges.toString();
}

/**

Member

This needs a clearer API commitment. The motivation is external readers/Spark, but the type lives under internal and the PR says there are no user-facing changes. If this is meant to be supported externally, the package/API contract should say so; otherwise downstream users may depend on an unsupported API.

Contributor

Yeah, at the time, it was yet another attempt to reduce the amount of publicly available API in parquet-java. It quickly turned out that 3rd parties want to use it. For the next major release it is a good candidate to be moved to a more public package. Maybe deprecate it here and also create it at another location?

Contributor Author

@wgtmac, @gszadovszky, which package would be the right place for RowRanges and the new builder? Shall I move/deprecate them in this PR or in a separate one?

Contributor

A proper target package would be the same as now, just without the internal part. I am fine with moving/deprecating in this PR. Theoretically, we can simply move it without deprecation since it is currently "internal", so we give no guarantees of backward compatibility. WDYT, @wgtmac?

Member

Agreed.

Contributor Author

Ok, moved in 8d8e2dd.

One thing to flag: the relocation changes a public signature. ParquetFileReader.readFilteredRowGroup(int, RowRanges) now takes the new public RowRanges instead of the internal one. japicmp reports this as METHOD_REMOVED (and likewise for the inherited form on CompressionConverter$TransParquetFileReader), so I added excludes for:

  • org.apache.parquet.internal.filter2.columnindex.RowRanges (+ $Range)
  • org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter (return-type changes)
  • org.apache.parquet.hadoop.ParquetFileReader#readFilteredRowGroup(int, …internal…RowRanges) (+ the TransParquetFileReader inherited form)

Contributor

Oh, I hadn't noticed that, sorry. Then we probably need to go back to the deprecate/move pattern. 😞
Let's move the class to the new place, keep a class at the old place that extends the new one as required, and deprecate it. Also deprecate the public API that references the old place and create new methods for the new one. Mark 2.0 as the removal version. WDYT?
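
A minimal sketch of the deprecate/move pattern described above, under simplifying assumptions. `PublicRowRanges`, `LegacyRowRanges`, and `SketchReader` are hypothetical stand-ins, not the parquet-java classes, and the real `RowRanges` constructors and `ParquetFileReader` signatures may not allow exactly this shape.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the type at its new, public location. */
class PublicRowRanges {
  private final List<long[]> ranges; // each entry is {from, to}, both inclusive

  PublicRowRanges(List<long[]> ranges) {
    this.ranges = new ArrayList<>(ranges);
  }

  long rowCount() {
    long count = 0;
    for (long[] r : ranges) {
      count += r[1] - r[0] + 1;
    }
    return count;
  }
}

/** Hypothetical stand-in for the type kept at the old location; it only extends the new one. */
@Deprecated
class LegacyRowRanges extends PublicRowRanges {
  LegacyRowRanges(List<long[]> ranges) {
    super(ranges);
  }
}

/** Hypothetical stand-in for a reader whose public method used to take the old type. */
class SketchReader {
  /** New overload that takes the relocated type. Returns the row count for illustration only. */
  long readFilteredRowGroup(int blockIndex, PublicRowRanges rowRanges) {
    return rowRanges.rowCount();
  }

  /** Old overload kept for existing callers; the cast forces delegation to the new overload. */
  @Deprecated
  long readFilteredRowGroup(int blockIndex, LegacyRowRanges rowRanges) {
    return readFilteredRowGroup(blockIndex, (PublicRowRanges) rowRanges);
  }
}
```

Because the old signature still exists, callers compiled against it keep linking, while new code can pass the relocated type directly.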

Contributor Author

I looked at the released (1.17.0) surface: the only genuinely public, cross-package method exposing RowRanges is ParquetFileReader.readFilteredRowGroup(int, RowRanges). ParquetFileReader.getRowRanges(int), which returns RowRanges, was only made public on master (1.18.0).
Everything else is under org.apache.parquet.internal.*.

Is it ok if we preserve compatibility for just readFilteredRowGroup(int, RowRanges) and treat the internal.* relocations as internal changes with no compat guarantee?

* @return a new {@link Builder} for constructing a {@link RowRanges} from a sequence of
* selected row indices.
*/
public static Builder builder() {
return new Builder();
}

/**
* Constructs a {@link RowRanges} by appending selected row indices in strictly increasing
* order. Consecutive indices are coalesced into a single {@link Range}; gaps close the
* current run and start a new one.
*
* <p>Usage:
* <pre>{@code
* RowRanges.Builder builder = RowRanges.builder();
* for (long row : selectedRowsInOrder) {
* builder.addSelectedRow(row);
* }
* RowRanges ranges = builder.build();
* }</pre>
*/
public static final class Builder {
private final List<Range> ranges = new ArrayList<>();
private long runStart = -1; // -1 = no active run
private long runEnd = -1; // valid iff runStart >= 0

private Builder() {}

/**
* Marks {@code rowIndex} as selected. The value is a 0-based row index within the current row
* group. Must be called in strictly increasing order; calling with a value less than or equal
* to the previous call's value throws {@link IllegalArgumentException}.
*
* @param rowIndex the 0-based row index to mark selected (must be {@code >} the last value
* passed and non-negative)
* @return this builder for chaining
*/
public Builder addSelectedRow(long rowIndex) {
if (rowIndex < 0) {
throw new IllegalArgumentException("addSelectedRow requires a non-negative row index; got " + rowIndex);
}
if (runStart < 0) {
runStart = rowIndex;
runEnd = rowIndex;
} else if (rowIndex == runEnd + 1) {
runEnd = rowIndex;
} else if (rowIndex > runEnd + 1) {
ranges.add(new Range(runStart, runEnd));
runStart = rowIndex;
runEnd = rowIndex;
} else {
throw new IllegalArgumentException("addSelectedRow requires strictly increasing row indices; got "
+ rowIndex + " after " + runEnd);
}
return this;
}

/**
* Returns a snapshot of the rows selected so far. The returned {@link RowRanges} is independent
* of this builder, so the builder may continue to be used afterwards without affecting it.
*
* @return the constructed {@link RowRanges}, or {@link RowRanges#EMPTY} when no rows were
* selected.
*/
public RowRanges build() {

Member

build() should return a snapshot or become terminal. As written, the returned RowRanges shares the builder’s mutable ranges list, so later calls on the same builder can mutate a previously built result and even break the sorted-ranges invariant.

Contributor Author

Fixed in 17cce69.

List<Range> snapshot = new ArrayList<>(ranges);
if (runStart >= 0) {
snapshot.add(new Range(runStart, runEnd));
}
if (snapshot.isEmpty()) {
return RowRanges.EMPTY;
}
return new RowRanges(snapshot);
}
}
}
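
The coalescing rule used by `addSelectedRow` can be illustrated in isolation. The sketch below is a stand-in and not the parquet-java class: `RunCoalescer`, its `long[]{from, to}` pairs, and the `fromMask` helper are hypothetical names chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that mirrors the run-coalescing logic of the builder. */
final class RunCoalescer {
  private final List<long[]> ranges = new ArrayList<>(); // closed runs, each {from, to} inclusive
  private long runStart = -1; // -1 = no active run
  private long runEnd = -1; // valid only while runStart >= 0

  RunCoalescer add(long rowIndex) {
    if (rowIndex < 0) {
      throw new IllegalArgumentException("negative row index: " + rowIndex);
    }
    if (runStart < 0) {
      // First row: open the first run.
      runStart = rowIndex;
      runEnd = rowIndex;
    } else if (rowIndex == runEnd + 1) {
      // Adjacent row: extend the active run.
      runEnd = rowIndex;
    } else if (rowIndex > runEnd + 1) {
      // Gap: close the active run and open a new one.
      ranges.add(new long[] {runStart, runEnd});
      runStart = rowIndex;
      runEnd = rowIndex;
    } else {
      throw new IllegalArgumentException("not strictly increasing: " + rowIndex + " after " + runEnd);
    }
    return this;
  }

  /** Returns a copy of the closed runs plus the active run, if any. */
  List<long[]> build() {
    List<long[]> snapshot = new ArrayList<>(ranges);
    if (runStart >= 0) {
      snapshot.add(new long[] {runStart, runEnd});
    }
    return snapshot;
  }

  /** Converts a per-row selection mask into coalesced ranges. */
  static List<long[]> fromMask(boolean[] selected) {
    RunCoalescer coalescer = new RunCoalescer();
    for (int i = 0; i < selected.length; i++) {
      if (selected[i]) {
        coalescer.add(i);
      }
    }
    return coalescer.build();
  }
}
```

For example, a mask that selects rows 1, 2, 5, 6, and 7 yields two ranges, `[1, 2]` and `[5, 7]`, which matches the expectation in `testBuilderMultipleRanges`.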
@@ -21,6 +21,7 @@
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.function.Function;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.filter2.columnindex;
package org.apache.parquet.filter2.columnindex;

import static org.apache.parquet.internal.filter2.columnindex.RowRanges.intersection;
import static org.apache.parquet.internal.filter2.columnindex.RowRanges.union;
import static org.apache.parquet.filter2.columnindex.RowRanges.intersection;
import static org.apache.parquet.filter2.columnindex.RowRanges.union;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -152,4 +152,138 @@ public void testIntersection() {
assertAllRowsEqual(intersection(empty, ranges2).iterator());
assertAllRowsEqual(intersection(empty, empty).iterator());
}

@Test
public void testBuilderBasic() {
// Select rows 2, 3, 4, 5 (one contiguous run)
RowRanges ranges = RowRanges.builder()
.addSelectedRow(2)
.addSelectedRow(3)
.addSelectedRow(4)
.addSelectedRow(5)
.build();
assertAllRowsEqual(ranges.iterator(), 2, 3, 4, 5);
assertEquals(4, ranges.rowCount());
}

@Test
public void testBuilderMultipleRanges() {
// Two runs: 1-2 and 5-7
RowRanges ranges = RowRanges.builder()
.addSelectedRow(1)
.addSelectedRow(2)
.addSelectedRow(5)
.addSelectedRow(6)
.addSelectedRow(7)
.build();
assertAllRowsEqual(ranges.iterator(), 1, 2, 5, 6, 7);
assertEquals(5, ranges.rowCount());
assertTrue(ranges.isOverlapping(1, 2));
assertTrue(ranges.isOverlapping(5, 7));
assertFalse(ranges.isOverlapping(3, 4));
}

@Test
public void testBuilderEmpty() {
// No rows selected
RowRanges ranges = RowRanges.builder().build();
assertEquals(RowRanges.EMPTY, ranges);
assertEquals(0, ranges.rowCount());
assertAllRowsEqual(ranges.iterator());
}

@Test
public void testBuilderAllSelected() {
// Five contiguous rows starting at 0
RowRanges.Builder builder = RowRanges.builder();
for (long i = 0; i < 5; i++) {
builder.addSelectedRow(i);
}
RowRanges ranges = builder.build();
assertAllRowsEqual(ranges.iterator(), 0, 1, 2, 3, 4);
assertEquals(5, ranges.rowCount());
}

@Test
public void testBuilderSingleRow() {
RowRanges ranges = RowRanges.builder().addSelectedRow(3).build();
assertAllRowsEqual(ranges.iterator(), 3);
assertEquals(1, ranges.rowCount());
assertTrue(ranges.isOverlapping(3, 3));
assertFalse(ranges.isOverlapping(0, 2));
assertFalse(ranges.isOverlapping(4, 10));
}

@Test
public void testBuilderAlternating() {
// Every other row selected: 0, 2, 4, 6, 8 — five singleton runs.
RowRanges.Builder builder = RowRanges.builder();
for (long i = 0; i < 10; i += 2) {
builder.addSelectedRow(i);
}
RowRanges ranges = builder.build();
assertAllRowsEqual(ranges.iterator(), 0, 2, 4, 6, 8);
assertEquals(5, ranges.rowCount());
}

@Test
public void testBuilderFirstAndLast() {
RowRanges ranges =
RowRanges.builder().addSelectedRow(0).addSelectedRow(99).build();
assertAllRowsEqual(ranges.iterator(), 0, 99);
assertEquals(2, ranges.rowCount());
}

@Test
public void testBuilderRejectsOutOfOrder() {
RowRanges.Builder builder = RowRanges.builder().addSelectedRow(5).addSelectedRow(7);
try {
builder.addSelectedRow(6);
org.junit.Assert.fail("expected IllegalArgumentException for out-of-order index");
} catch (IllegalArgumentException expected) {
// expected
}
}

@Test
public void testBuilderRejectsDuplicate() {
RowRanges.Builder builder = RowRanges.builder().addSelectedRow(3);
try {
builder.addSelectedRow(3);
org.junit.Assert.fail("expected IllegalArgumentException for duplicate index");
} catch (IllegalArgumentException expected) {
// expected
}
}

@Test
public void testBuilderRejectsNegativeRow() {
RowRanges.Builder builder = RowRanges.builder();
try {
builder.addSelectedRow(-1);
org.junit.Assert.fail("expected IllegalArgumentException for negative index");
} catch (IllegalArgumentException expected) {
// expected
}
}

@Test
public void testBuilderBuildReturnsSnapshot() {
// build() must return a snapshot: continuing to use the builder afterwards must not
// mutate a previously built result.
RowRanges.Builder builder = RowRanges.builder().addSelectedRow(0).addSelectedRow(1);
RowRanges first = builder.build();
assertAllRowsEqual(first.iterator(), 0, 1);
assertEquals(2, first.rowCount());

builder.addSelectedRow(5);
RowRanges second = builder.build();

// The first result is unchanged.
assertAllRowsEqual(first.iterator(), 0, 1);
assertEquals(2, first.rowCount());
// The second result reflects the additional row.
assertAllRowsEqual(second.iterator(), 0, 1, 5);
assertEquals(3, second.rowCount());
}
}
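
To see why `testBuilderBuildReturnsSnapshot` matters, the sketch below contrasts a `build()` that hands out the builder's own list with one that copies it. `SharedListBuilder` and `SnapshotBuilder` are hypothetical stand-ins and not parquet-java code; they track plain row indices rather than ranges to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder with the aliasing problem raised in review. */
final class SharedListBuilder {
  private final List<Long> rows = new ArrayList<>();

  SharedListBuilder add(long row) {
    rows.add(row);
    return this;
  }

  /** Problematic: the caller receives the builder's own mutable list. */
  List<Long> build() {
    return rows;
  }
}

/** Hypothetical builder that returns an independent copy. */
final class SnapshotBuilder {
  private final List<Long> rows = new ArrayList<>();

  SnapshotBuilder add(long row) {
    rows.add(row);
    return this;
  }

  /** Safe: later calls to add() cannot change a previously built result. */
  List<Long> build() {
    return new ArrayList<>(rows);
  }
}
```

With the shared list, a result built after adding rows 0 and 1 grows to three elements once row 5 is added to the builder; with the snapshot, it stays at two.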
@@ -61,6 +61,7 @@
import java.util.Set;
import java.util.stream.LongStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
@@ -43,9 +43,9 @@
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.util.AutoCloseables;
import org.slf4j.Logger;
@@ -28,9 +28,9 @@
import java.util.Formatter;
import java.util.List;
import java.util.Optional;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;

/**
* Internal utility class to help at column index based filtering.
@@ -85,6 +85,7 @@
import org.apache.parquet.crypto.InternalFileDecryptor;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.format.BlockCipher;
@@ -111,7 +112,6 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetDecodingException;
@@ -31,10 +31,10 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.columnindex.RowRanges;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Before;
12 changes: 12 additions & 0 deletions pom.xml
@@ -611,6 +611,18 @@
<exclude>org.apache.parquet.avro.AvroReadSupport#AVRO_REQUESTED_PROJECTION</exclude>
<exclude>org.apache.parquet.avro.AvroReadSupport#AVRO_DATA_SUPPLIER</exclude>
<exclude>org.apache.parquet.hadoop.ParquetFileReader#PARQUET_READ_PARALLELISM</exclude>
<!-- GH-3596: RowRanges moved out of the internal package to
org.apache.parquet.filter2.columnindex. The internal package carries no
backward-compatibility guarantee, so the relocation and the resulting
ColumnIndexFilter return-type changes are intentional. -->
<exclude>org.apache.parquet.internal.filter2.columnindex.RowRanges</exclude>
<exclude>org.apache.parquet.internal.filter2.columnindex.RowRanges$Range</exclude>
<exclude>org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter</exclude>
<!-- GH-3596: ParquetFileReader#readFilteredRowGroup previously took the internal
RowRanges; it now takes the relocated public type. The signature change is the
point of the relocation (external readers can construct RowRanges via its Builder). -->
<exclude>org.apache.parquet.hadoop.ParquetFileReader#readFilteredRowGroup(int,org.apache.parquet.internal.filter2.columnindex.RowRanges)</exclude>
<exclude>org.apache.parquet.hadoop.util.CompressionConverter$TransParquetFileReader#readFilteredRowGroup(int,org.apache.parquet.internal.filter2.columnindex.RowRanges)</exclude>
</excludes>
</parameter>
</configuration>