fix: override CachedInputStream::Advance to avoid real I/O for skipped pages#322
Open
zhf999 wants to merge 7 commits into
…edRowGroupReader::ReadFilteredRowGroup to avoid performance drop on wide tables
…n arrow::CachedInputStream
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR optimizes Parquet page-level filtering by reusing row-group page index metadata across column reads, and adjusts Arrow/Parquet stream behavior to avoid unnecessary I/O when skipping pages.
Changes:
- Switch `ReadFilteredColumn` to accept a `RowGroupPageIndexReader` directly (instead of `PageIndexReader`) and reuse it across columns in a row group.
- Cache `RowGroupPageIndexReader` once per row group to avoid repeated metadata reads.
- Patch Arrow to allow overriding `InputStream::Advance()` and implement a no-I/O `Advance()` for skipped pages.
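The effect of reusing the row-group page index reader can be illustrated with a minimal sketch. Everything here is hypothetical: `SketchPageIndexReader`, `SketchRowGroupIndex`, and the two driver functions are stand-ins rather than the Parquet or paimon-cpp API, and the sketch assumes that each row-group lookup costs one metadata read.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a row-group page index (not the Parquet API).
struct SketchRowGroupIndex {
  int row_group;
};

// Hypothetical stand-in for a file-level page index reader.
// Assumption: every RowGroup() call costs one metadata read.
class SketchPageIndexReader {
 public:
  std::shared_ptr<SketchRowGroupIndex> RowGroup(int row_group) {
    ++metadata_reads_;
    return std::make_shared<SketchRowGroupIndex>(SketchRowGroupIndex{row_group});
  }
  int metadata_reads() const { return metadata_reads_; }

 private:
  int metadata_reads_ = 0;
};

// Per-column read that receives the row-group index from its caller.
inline void ReadColumnSketch(const SketchRowGroupIndex& index, int column) {
  (void)index;
  (void)column;
}

// Pattern being replaced: every column looks up the row-group index itself.
inline void ReadColumnsPerColumnLookup(SketchPageIndexReader& reader,
                                       int row_group, int num_columns) {
  for (int c = 0; c < num_columns; ++c) {
    auto index = reader.RowGroup(row_group);
    ReadColumnSketch(*index, c);
  }
}

// Pattern described in the review summary: look up once, share across columns.
inline void ReadColumnsSharedLookup(SketchPageIndexReader& reader,
                                    int row_group, int num_columns) {
  auto index = reader.RowGroup(row_group);
  for (int c = 0; c < num_columns; ++c) {
    ReadColumnSketch(*index, c);
  }
}
```

Under that assumption, the number of lookups for one row group drops from one per column to one in total, which matters most on wide tables.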
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/paimon/format/parquet/page_filtered_row_group_reader.h | Updates the ReadFilteredColumn API to take a row-group page index reader. |
| src/paimon/format/parquet/page_filtered_row_group_reader.cpp | Reuses RowGroupPageIndexReader across column reads and simplifies offset-index lookup. |
| cmake_modules/arrow.diff | Patches Arrow/Parquet to make Advance() virtual and overrides it to skip without triggering reads. |
Comments suppressed due to low confidence (1)
cmake_modules/arrow.diff:1
- The `Advance()` implementation can move `position_` backwards or past bounds when `nbytes < 0` or when `position_ > length_` (making `length_ - position_` negative). This can corrupt stream state. Consider validating `nbytes >= 0`, clamping using a non-negative remaining byte count (e.g., `remaining = std::max<int64_t>(0, length_ - position_)`), and returning an error status for invalid inputs or inconsistent internal state.
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
Comment on lines +428 to +432
   /// \brief Advance or skip stream indicated number of bytes
   /// \param[in] nbytes the number to move forward
   /// \return Status
-  Status Advance(int64_t nbytes);
+  virtual Status Advance(int64_t nbytes);
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (2)
cmake_modules/arrow.diff:1
- Making `InputStream::Advance()` virtual changes the class vtable and is an ABI-breaking change for libarrow consumers. If this repo packages/ships Arrow as a shared library (or links against a system-provided Arrow), this can cause runtime crashes or ODR/ABI mismatches unless all dependents are rebuilt against the same headers and binaries. Prefer an approach that avoids changing the public ABI (e.g., adjust the Parquet skipping logic to use existing APIs such as `Seek()` where available, or move the optimization into Parquet-specific code that doesn’t require changing `arrow::io::InputStream`’s virtual surface). If the ABI break is acceptable here, it should be paired with an explicit guarantee that Arrow is always built from this patched source and not mixed with system Arrow.
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
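The alternative suggested in this comment, skipping through an existing seek capability instead of a new virtual method, might look roughly like the sketch below. All types are hypothetical stand-ins (`SketchInputStream`, `SketchSeekable`, `SketchSeekableStream`, `SkipBytes`), not Arrow classes, and the sketch assumes the stream object also implements a seek interface that can be discovered at runtime.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stream interface whose virtual surface is left unchanged.
class SketchInputStream {
 public:
  virtual ~SketchInputStream() = default;
  virtual int64_t Tell() const = 0;
  virtual int64_t Read(int64_t nbytes) = 0;
};

// Hypothetical, separate seek capability.
class SketchSeekable {
 public:
  virtual ~SketchSeekable() = default;
  virtual bool Seek(int64_t position) = 0;
};

// Reader-side helper: seek past the payload when the stream supports it,
// otherwise fall back to read-and-discard.
inline bool SkipBytes(SketchInputStream* stream, int64_t nbytes) {
  if (nbytes < 0) return false;
  if (auto* seekable = dynamic_cast<SketchSeekable*>(stream)) {
    return seekable->Seek(stream->Tell() + nbytes);
  }
  stream->Read(nbytes);
  return true;
}

// Example stream that counts the bytes it was asked to read.
class SketchSeekableStream : public SketchInputStream, public SketchSeekable {
 public:
  explicit SketchSeekableStream(int64_t length) : length_(length) {}
  int64_t Tell() const override { return position_; }
  int64_t Read(int64_t nbytes) override {
    const int64_t n = std::max<int64_t>(0, std::min(nbytes, length_ - position_));
    bytes_read_ += n;
    position_ += n;
    return n;
  }
  bool Seek(int64_t position) override {
    if (position < 0 || position > length_) return false;
    position_ = position;
    return true;
  }
  int64_t bytes_read() const { return bytes_read_; }

 private:
  int64_t length_;
  int64_t position_ = 0;
  int64_t bytes_read_ = 0;
};
```

The trade-off is a runtime type check on every skip, in exchange for leaving the base class layout alone. Whether the real stream types involved expose a suitable seek interface is not established by this thread.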
cmake_modules/arrow.diff:1
- `std::min(nbytes, length_ - position_)` can move `position_` backwards if `nbytes` is negative, and it can also behave incorrectly if `position_ > length_` (since `length_ - position_` is negative). This can corrupt subsequent reads/offset calculations. Add an explicit `nbytes >= 0` check (returning an error for negative advances), and compute the new position using a saturating/monotonic update (e.g., clamp to `[0, length_]` without relying on `length_ - position_` being non-negative).
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
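A bounds-checked position update along the lines of this comment could be sketched as follows. `CheckedAdvance` and `AdvanceResult` are hypothetical names introduced for illustration; the actual patch would presumably return an `arrow::Status` rather than an enum.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical result type standing in for an error status.
enum class AdvanceResult { kOk, kNegativeCount, kInconsistentState };

// Moves *position forward by at most nbytes without passing length.
// Rejects negative counts and positions outside [0, length], and leaves
// *position untouched on any error.
inline AdvanceResult CheckedAdvance(int64_t nbytes, int64_t length,
                                    int64_t* position) {
  if (nbytes < 0) return AdvanceResult::kNegativeCount;
  if (*position < 0 || *position > length) {
    return AdvanceResult::kInconsistentState;
  }
  const int64_t remaining = length - *position;  // non-negative after the check
  *position += std::min(nbytes, remaining);      // never exceeds length
  return AdvanceResult::kOk;
}
```

Because the step is clamped to `remaining`, the addition cannot overflow and the position only ever moves forward.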
Purpose
Linked issue: close #137
This PR depends on #316 and should be merged after that PR is merged.
When page-level filtering skips non-matching pages via `data_page_filter`, `SerializedPageReader::NextPage()` calls `stream_->Advance(compressed_len)` to skip the page payload. However, `InputStream::Advance()` is non-virtual, and its default implementation calls `Read()` and then discards the result. For `CachedInputStream` (introduced in #232), this triggers `source_->ReadAt()` on a cache miss, which reads the entire compressed payload of each skipped page from remote storage and defeats the purpose of page-level I/O skipping.
This PR fixes the issue by:
- Changing `arrow::io::InputStream::Advance()` to be `virtual`, allowing subclasses to override it.
- Overriding `Advance()` in `CachedInputStream` to simply move the stream position without any I/O, since the skipped data will never be consumed.
- Keeping the original fallback-to-source behavior of `Read()` on cache miss, ensuring correctness if a matched page unexpectedly misses the cache.
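The difference between the two `Advance()` behaviors can be shown with a small self-contained sketch. The classes below (`CountingSource`, `StreamBase`, `ReadThroughStream`, `SkippingStream`) are hypothetical simplifications, not the Arrow or paimon-cpp types: they return `bool` instead of `arrow::Status`, and the cache is modeled as always missing so that every `Read()` reaches the source.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for remote storage; counts bytes actually fetched.
struct CountingSource {
  int64_t bytes_fetched = 0;
  void ReadAt(int64_t /*offset*/, int64_t nbytes) { bytes_fetched += nbytes; }
};

// Simplified base stream: the default Advance() reads and discards.
class StreamBase {
 public:
  virtual ~StreamBase() = default;
  virtual int64_t Read(int64_t nbytes) = 0;
  virtual bool Advance(int64_t nbytes) {
    Read(nbytes);  // real I/O; the result is thrown away
    return true;
  }
};

// Stream that inherits the default Advance(). Assumption: every Read()
// is a cache miss and goes to the source.
class ReadThroughStream : public StreamBase {
 public:
  ReadThroughStream(CountingSource* source, int64_t length)
      : source_(source), length_(length) {}
  int64_t Read(int64_t nbytes) override {
    const int64_t n = std::min(nbytes, length_ - position_);
    if (n <= 0) return 0;
    source_->ReadAt(position_, n);
    position_ += n;
    return n;
  }
  int64_t position() const { return position_; }

 protected:
  CountingSource* source_;
  int64_t length_;
  int64_t position_ = 0;
};

// Stream that overrides Advance() to move the position without any I/O.
class SkippingStream : public ReadThroughStream {
 public:
  using ReadThroughStream::ReadThroughStream;
  bool Advance(int64_t nbytes) override {
    if (nbytes < 0) return false;
    position_ += std::min(nbytes, length_ - position_);
    return true;
  }
};
```

Calling `Advance(400)` through a `StreamBase` reference fetches 400 bytes from the source with `ReadThroughStream` and none with `SkippingStream`, while both end at position 400. A later `Read()` on `SkippingStream` still goes to the source, which mirrors the fallback described in the last bullet.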
Tests
API and Format
Added `virtual` to the `arrow::io::InputStream::Advance()` declaration in the Arrow patch (`arrow.diff`). No public paimon-cpp API changes.
Documentation
No.
Generative AI tooling
Generated-by: Aone Copilot (Claude 4.7 Opus)