feat(fs): support uint64 file IO sizes #326
Open
zjw1111 wants to merge 2 commits into
Pull request overview
Copilot reviewed 75 out of 75 changed files in this pull request and generated 35 comments.
Comments suppressed due to low confidence (8)
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117
- Tell()/GetSize() return int64_t to Arrow; converting uint64_t positions/sizes with a plain cast can overflow and yield negative values when the underlying stream exceeds INT64_MAX.
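A minimal sketch of the kind of bounds check this comment asks for, assuming a hypothetical helper named `CheckedToInt64` (it is not part of the PR or of Arrow). It rejects values above INT64_MAX instead of letting the cast wrap to a negative number; a real adapter would presumably map the failure to an `arrow::Status` rather than an empty optional.

```cpp
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper (an assumption for illustration, not code from the PR):
// converts an unsigned position or size to int64_t and reports failure when
// the value cannot be represented.
std::optional<int64_t> CheckedToInt64(uint64_t value) {
    constexpr uint64_t kMax =
        static_cast<uint64_t>(std::numeric_limits<int64_t>::max());
    if (value > kMax) {
        return std::nullopt;  // the caller should return an error, not cast
    }
    return static_cast<int64_t>(value);
}
```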
src/paimon/common/io/data_input_stream.cpp:110
- DataInputStream::AssertBoundary uses pos + need_length > length on uint64_t values; this addition can overflow and incorrectly pass the boundary check for very large need_length values. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
    // of I/O calls.
    PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
    if (pos + need_length > length) {
        return Status::Invalid(
            fmt::format("DataInputStream assert boundary failed: need length {}, current position "
                        "{}, exceed length {}",
                        need_length, pos, length));
    }
    return Status::OK();
}
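The subtraction-based check suggested above could look roughly like the following. `FitsWithin` is a hypothetical free function used only for illustration; the PR itself would keep the check inside AssertBoundary and return a Status.

```cpp
#include <cstdint>

// Hypothetical helper (an assumption, not code from the PR): returns true when
// need_length bytes starting at pos stay within a stream of the given length.
// Only subtraction is used, and only after pos <= length has been established,
// so no intermediate value can wrap around.
bool FitsWithin(uint64_t pos, uint64_t need_length, uint64_t length) {
    if (pos > length) {
        return false;
    }
    return need_length <= length - pos;
}
```

With pos = 8, length = 10, and need_length = UINT64_MAX, the original `pos + need_length` wraps around to 7 and the comparison against 10 passes; the subtraction form compares UINT64_MAX against 2 and rejects the request.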
src/paimon/common/data/serializer/binary_row_serializer.cpp:52
- BinaryRowSerializer::Deserialize should validate read_length before casting to uint64_t for allocation; otherwise a negative length becomes a huge allocation attempt.
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
    BinaryRow row(num_fields_);
    PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
    std::shared_ptr<Bytes> bytes =
        Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
    PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
    row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
    return row;
}
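To illustrate why the validation matters, here is a small sketch with a hypothetical `ValidatedLength` helper (an assumption, not part of the PR). It rejects a negative length prefix before it is widened to an unsigned allocation size; in the real code the failure would presumably be a Status::Invalid.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical helper (an assumption for illustration): converts a signed
// length prefix read from the stream into an allocation size, refusing
// negative values instead of casting them.
std::optional<uint64_t> ValidatedLength(int32_t read_length) {
    if (read_length < 0) {
        return std::nullopt;  // corrupt or malicious input
    }
    return static_cast<uint64_t>(read_length);
}
```

Without the check, a read_length of -1 converts to 18446744073709551615 (UINT64_MAX), which is the allocation size the comment warns about.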
Comment on lines 45 to 52
arrow::Result<int64_t> ArrowInputStreamAdapter::Read(int64_t nbytes, void* out) {
    ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes"));
    Result<int32_t> read_bytes =
        input_stream_->Read(static_cast<char*>(out), static_cast<uint32_t>(nbytes));
    Result<uint64_t> read_bytes =
        input_stream_->Read(static_cast<char*>(out), static_cast<uint64_t>(nbytes));
    if (!read_bytes.ok()) {
        return ToArrowStatus(read_bytes.status());
    }
    return read_bytes.value();
    return static_cast<int64_t>(read_bytes.value());
}
Comment on lines 64 to 72
arrow::Result<int64_t> ArrowInputStreamAdapter::ReadAt(int64_t position, int64_t nbytes,
                                                       void* out) {
    ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint64_t>(position, "position"));
    ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes"));
    Result<int32_t> read_bytes = input_stream_->Read(
        static_cast<char*>(out), static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position));
    Result<uint64_t> read_bytes = input_stream_->Read(
        static_cast<char*>(out), static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position));
    if (!read_bytes.ok()) {
        return ToArrowStatus(read_bytes.status());
    }
    return read_bytes.value();
    return static_cast<int64_t>(read_bytes.value());
}
Comment on lines 85 to 105
@@ -123,7 +93,7 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync
    }
    std::shared_ptr<arrow::Buffer> buffer = std::move(buffer_result).ValueUnsafe();
    input_stream_->ReadAsync(reinterpret_cast<char*>(buffer->mutable_data()),
                             static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position),
                             static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position),
                             [fut, buffer](Status callback_status) mutable {
                                 if (callback_status.ok()) {
                                     fut.MarkFinished(std::move(buffer));
@@ -135,11 +105,11 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync
    }
Comment on lines 50 to 60
arrow::Status ArrowOutputStreamAdapter::Write(const void* data, int64_t nbytes) {
    if (!InRange<uint32_t>(nbytes)) {
        return arrow::Status::Invalid(
            fmt::format("nbytes value {} is out of bound of uint32_t", nbytes));
    }
    Result<int32_t> len =
        out_->Write(static_cast<const char*>(data), static_cast<uint32_t>(nbytes));
    Result<uint64_t> len =
        out_->Write(static_cast<const char*>(data), static_cast<uint64_t>(nbytes));
    if (!len.ok()) {
        return ToArrowStatus(len.status());
    }
    if (len.value() != static_cast<uint64_t>(nbytes)) {
        return arrow::Status::IOError(
            fmt::format("expect write len {} mismatch actual write len {}", nbytes, len.value()));
    }
    return arrow::Status::OK();
Comment on lines 38 to 44
arrow::Result<int64_t> ArrowOutputStreamAdapter::Tell() const {
    paimon::Result<int64_t> pos = out_->GetPos();
    paimon::Result<uint64_t> pos = out_->GetPos();
    if (!pos.ok()) {
        return ToArrowStatus(pos.status());
    }
    return pos.value();
    return static_cast<int64_t>(pos.value());
}
Comment on lines +187 to +190
    PAIMON_ASSIGN_OR_RAISE(uint64_t pos, out->GetPos());
    PAIMON_RETURN_NOT_OK(out->Close());
    guard.Release();
    return std::make_pair(PathUtil::GetName(file_path), pos);
    return std::make_pair(PathUtil::GetName(file_path), static_cast<int64_t>(pos));
Comment on lines +87 to 92
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size) {
    PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(inner_position_ + size)));
    PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len, wrapped_->Read(buffer, size));
    inner_position_ += actual_read_len;
    return actual_read_len;
}
Comment on lines +94 to 107
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size, uint64_t offset) {
    PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(offset + size)));
    return wrapped_->Read(buffer, size, static_cast<uint64_t>(offset_) + offset);
}

void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void OffsetInputStream::ReadAsync(char* buffer, uint64_t size, uint64_t offset,
                                  std::function<void(Status)>&& callback) {
    auto status = AssertBoundary(offset);
    auto status = AssertBoundary(static_cast<int64_t>(offset + size));
    if (!status.ok()) {
        callback(status);
        return;
    }
    status = AssertBoundary(offset + size);
    if (!status.ok()) {
        callback(status);
        return;
    }
    wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback));
    wrapped_->ReadAsync(buffer, size, static_cast<uint64_t>(offset_) + offset, std::move(callback));
}
Comment on lines 184 to 191
Result<uint64_t> AvroFileBatchReader::GetNumberOfRows() const {
    if (!total_rows_) {
        PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos());
        ScopeGuard stream_guard([this, current_pos]() -> void {
        PAIMON_ASSIGN_OR_RAISE(uint64_t current_pos, input_stream_->GetPos());
        auto seek_pos = static_cast<int64_t>(current_pos);
        ScopeGuard stream_guard([this, seek_pos]() -> void {
            // reset input stream position to original position
            Status status = input_stream_->Seek(current_pos, SeekOrigin::FS_SEEK_SET);
            Status status = input_stream_->Seek(seek_pos, SeekOrigin::FS_SEEK_SET);
            (void)status;
lxy-9602 reviewed Jun 2, 2026
BufferedInputStream::BufferedInputStream(const std::shared_ptr<InputStream>& in,
                                         int32_t buffer_size, MemoryPool* pool)
    : buffer_size_(buffer_size), in_(in) {
Collaborator
Consider changing buffer_size to uint64_t.
    return wrapped_->Read(buffer, size);
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size) {
    PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(inner_position_ + size)));
    PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len, wrapped_->Read(buffer, size));
Collaborator
Why does AssertBoundary need int64_t? Would uint64_t also be acceptable?
    std::shared_ptr<InputStream> wrapped_;
    const int64_t length_;
    const int64_t offset_;
    int64_t inner_position_ = 0;
Collaborator
Why are offset_ and length_ still int64_t rather than uint64_t?
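For context on the two questions above, the sketch below shows how a sub-range check might look if the window were stored in unsigned fields. It is only an illustration under that assumption: `Window` and `TranslateToWrapped` are hypothetical names, not types or functions from the PR, and the PR's OffsetInputStream currently keeps signed fields.

```cpp
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical stand-in for the offset_/length_ pair, stored as unsigned values.
struct Window {
    uint64_t offset;  // start of the window in the wrapped stream
    uint64_t length;  // number of bytes visible through the window
};

// Maps a read of `size` bytes at window-relative `pos` to an absolute offset
// in the wrapped stream. Returns std::nullopt if the read leaves the window or
// if the absolute offset would not fit in uint64_t.
std::optional<uint64_t> TranslateToWrapped(const Window& w, uint64_t pos, uint64_t size) {
    if (pos > w.length || size > w.length - pos) {
        return std::nullopt;  // read would cross the end of the window
    }
    if (w.offset > std::numeric_limits<uint64_t>::max() - pos) {
        return std::nullopt;  // offset + pos would wrap around
    }
    return w.offset + pos;
}
```

With unsigned fields, no cast to int64_t is needed before the boundary check, and the check itself never adds two untrusted values.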
Purpose
Linked issue: N/A
This PR updates file system and stream IO size contracts from 32-bit sizes to uint64_t, including Read, ReadAsync, Write, GetPos, and related local/Jindo implementations.
It also simplifies call sites that previously looped or added overflow checks only to bridge 32-bit IO boundaries, while keeping semantic validation such as subrange boundary checks, negative file/result lengths, short read/write checks, and 64-bit to 32-bit narrowing checks where the target format still requires a 32-bit value.
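As an illustration of the narrowing checks mentioned above, a generic range test might be sketched as follows. `FitsIn` is a hypothetical name chosen for this example; the PR's own helpers (such as the InRange template visible in the diff) may have different signatures.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical helper (an assumption, not the PR's InRange): reports whether an
// unsigned 64-bit value can be stored in the integral type Target without
// changing its value. Target's maximum is non-negative, so widening it to
// uint64_t for the comparison is lossless.
template <typename Target>
bool FitsIn(uint64_t value) {
    return value <= static_cast<uint64_t>(std::numeric_limits<Target>::max());
}
```

A call site that must write a 32-bit length field could test `FitsIn<int32_t>(size)` and return an error before casting.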
Tests
API and Format
This changes public and internal C++ IO API signatures in include/paimon/fs/file_system.h and related stream interfaces to use uint64_t for sizes and positions.
No storage format or protocol change is intended.
Documentation
No.
Generative AI tooling
Generated-by: OpenAI Codex