Skip to content

feat(fs): support uint64 file IO sizes#326

Open
zjw1111 wants to merge 2 commits into
mainfrom
codex/fs-size-uint64
Open

feat(fs): support uint64 file IO sizes#326
zjw1111 wants to merge 2 commits into
mainfrom
codex/fs-size-uint64

Conversation

@zjw1111
Copy link
Copy Markdown
Collaborator

@zjw1111 zjw1111 commented Jun 1, 2026

Purpose

Linked issue: N/A

This PR updates file system and stream IO size contracts from 32-bit sizes to uint64_t, including Read, ReadAsync, Write, GetPos, and related local/Jindo implementations.

It also simplifies call sites that previously looped or added overflow checks only to bridge 32-bit IO boundaries, while keeping semantic validation such as subrange boundary checks, negative file/result lengths, short read/write checks, and 64-bit to 32-bit narrowing checks where the target format still requires a 32-bit value.

Tests

API and Format

This changes public and internal C++ IO API signatures in include/paimon/fs/file_system.h and related stream interfaces to use uint64_t for sizes and positions.

No storage format or protocol change is intended.

Documentation

No.

Generative AI tooling

Generated-by: OpenAI Codex

@zjw1111 zjw1111 marked this pull request as ready for review June 1, 2026 09:23
Copilot AI review requested due to automatic review settings June 1, 2026 09:23
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 75 out of 75 changed files in this pull request and generated 35 comments.

Comments suppressed due to low confidence (8)

src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117

  • Tell()/GetSize() return int64_t to Arrow; converting uint64_t positions/sizes with a plain cast can overflow and yield negative values when the underlying stream exceeds INT64_MAX.
    src/paimon/common/io/data_input_stream.cpp:110
  • DataInputStream::AssertBoundary uses pos + need_length > length on uint64_t values; this addition can overflow and incorrectly pass the boundary check for very large need_length values. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
    // of I/O calls.
    PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
    if (pos + need_length > length) {
        return Status::Invalid(
            fmt::format("DataInputStream assert boundary failed: need length {}, current position "
                        "{}, exceed length {}",
                        need_length, pos, length));
    }
    return Status::OK();

src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117

  • ArrowInputStreamAdapter::Tell()/GetSize() return int64_t to Arrow; casting uint64_t positions/sizes without a bounds check can overflow and return negative values when the stream exceeds INT64_MAX.
    src/paimon/common/data/serializer/binary_row_serializer.cpp:52
  • BinaryRowSerializer::Deserialize should validate read_length before casting to uint64_t for allocation; otherwise a negative length becomes a huge allocation attempt.
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
    BinaryRow row(num_fields_);
    PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
    std::shared_ptr<Bytes> bytes =
        Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
    PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
    row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
    return row;
}

src/paimon/common/io/data_input_stream.cpp:110

  • DataInputStream::AssertBoundary uses pos + need_length > length with uint64_t values; pos + need_length can overflow and incorrectly pass the boundary check. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
    // of I/O calls.
    PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
    if (pos + need_length > length) {
        return Status::Invalid(
            fmt::format("DataInputStream assert boundary failed: need length {}, current position "
                        "{}, exceed length {}",
                        need_length, pos, length));
    }
    return Status::OK();

src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117

  • ArrowInputStreamAdapter::Tell()/GetSize() return int64_t to Arrow; casting uint64_t positions/sizes without a bounds check can overflow and return negative values when the stream exceeds INT64_MAX.
    src/paimon/common/data/serializer/binary_row_serializer.cpp:52
  • BinaryRowSerializer::Deserialize should validate read_length before casting to uint64_t for allocation; otherwise a negative length becomes a huge allocation attempt.
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
    BinaryRow row(num_fields_);
    PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
    std::shared_ptr<Bytes> bytes =
        Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
    PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
    row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
    return row;
}

src/paimon/common/io/data_input_stream.cpp:110

  • DataInputStream::AssertBoundary uses pos + need_length > length with uint64_t values; pos + need_length can overflow and incorrectly pass the boundary check. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
    // of I/O calls.
    PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
    if (pos + need_length > length) {
        return Status::Invalid(
            fmt::format("DataInputStream assert boundary failed: need length {}, current position "
                        "{}, exceed length {}",
                        need_length, pos, length));
    }
    return Status::OK();

Comment on lines 45 to 52
arrow::Result<int64_t> ArrowInputStreamAdapter::Read(int64_t nbytes, void* out) {
ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes"));
Result<int32_t> read_bytes =
input_stream_->Read(static_cast<char*>(out), static_cast<uint32_t>(nbytes));
Result<uint64_t> read_bytes =
input_stream_->Read(static_cast<char*>(out), static_cast<uint64_t>(nbytes));
if (!read_bytes.ok()) {
return ToArrowStatus(read_bytes.status());
}
return read_bytes.value();
return static_cast<int64_t>(read_bytes.value());
}
Comment on lines 64 to 72
arrow::Result<int64_t> ArrowInputStreamAdapter::ReadAt(int64_t position, int64_t nbytes,
void* out) {
ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint64_t>(position, "position"));
ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes"));
Result<int32_t> read_bytes = input_stream_->Read(
static_cast<char*>(out), static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position));
Result<uint64_t> read_bytes = input_stream_->Read(
static_cast<char*>(out), static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position));
if (!read_bytes.ok()) {
return ToArrowStatus(read_bytes.status());
}
return read_bytes.value();
return static_cast<int64_t>(read_bytes.value());
}
Comment on lines 85 to 105
@@ -123,7 +93,7 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync
}
std::shared_ptr<arrow::Buffer> buffer = std::move(buffer_result).ValueUnsafe();
input_stream_->ReadAsync(reinterpret_cast<char*>(buffer->mutable_data()),
static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position),
static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position),
[fut, buffer](Status callback_status) mutable {
if (callback_status.ok()) {
fut.MarkFinished(std::move(buffer));
@@ -135,11 +105,11 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync
}
Comment on lines 50 to 60
arrow::Status ArrowOutputStreamAdapter::Write(const void* data, int64_t nbytes) {
if (!InRange<uint32_t>(nbytes)) {
return arrow::Status::Invalid(
fmt::format("nbytes value {} is out of bound of uint32_t", nbytes));
}
Result<int32_t> len =
out_->Write(static_cast<const char*>(data), static_cast<uint32_t>(nbytes));
Result<uint64_t> len =
out_->Write(static_cast<const char*>(data), static_cast<uint64_t>(nbytes));
if (!len.ok()) {
return ToArrowStatus(len.status());
}
if (len.value() != static_cast<uint64_t>(nbytes)) {
return arrow::Status::IOError(
fmt::format("expect write len {} mismatch actual write len {}", nbytes, len.value()));
}
return arrow::Status::OK();
Comment on lines 38 to 44
arrow::Result<int64_t> ArrowOutputStreamAdapter::Tell() const {
paimon::Result<int64_t> pos = out_->GetPos();
paimon::Result<uint64_t> pos = out_->GetPos();
if (!pos.ok()) {
return ToArrowStatus(pos.status());
}
return pos.value();
return static_cast<int64_t>(pos.value());
}
Comment on lines +187 to +190
PAIMON_ASSIGN_OR_RAISE(uint64_t pos, out->GetPos());
PAIMON_RETURN_NOT_OK(out->Close());
guard.Release();
return std::make_pair(PathUtil::GetName(file_path), pos);
return std::make_pair(PathUtil::GetName(file_path), static_cast<int64_t>(pos));
Comment on lines +87 to 92
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size) {
PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(inner_position_ + size)));
PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len, wrapped_->Read(buffer, size));
inner_position_ += actual_read_len;
return actual_read_len;
}
Comment on lines +94 to 107
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size, uint64_t offset) {
PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(offset + size)));
return wrapped_->Read(buffer, size, static_cast<uint64_t>(offset_) + offset);
}

void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void OffsetInputStream::ReadAsync(char* buffer, uint64_t size, uint64_t offset,
std::function<void(Status)>&& callback) {
auto status = AssertBoundary(offset);
auto status = AssertBoundary(static_cast<int64_t>(offset + size));
if (!status.ok()) {
callback(status);
return;
}
status = AssertBoundary(offset + size);
if (!status.ok()) {
callback(status);
return;
}
wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback));
wrapped_->ReadAsync(buffer, size, static_cast<uint64_t>(offset_) + offset, std::move(callback));
}
Comment on lines 184 to 191
Result<uint64_t> AvroFileBatchReader::GetNumberOfRows() const {
if (!total_rows_) {
PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos());
ScopeGuard stream_guard([this, current_pos]() -> void {
PAIMON_ASSIGN_OR_RAISE(uint64_t current_pos, input_stream_->GetPos());
auto seek_pos = static_cast<int64_t>(current_pos);
ScopeGuard stream_guard([this, seek_pos]() -> void {
// reset input stream position to original position
Status status = input_stream_->Seek(current_pos, SeekOrigin::FS_SEEK_SET);
Status status = input_stream_->Seek(seek_pos, SeekOrigin::FS_SEEK_SET);
(void)status;
Comment on lines +187 to +190
PAIMON_ASSIGN_OR_RAISE(uint64_t pos, out->GetPos());
PAIMON_RETURN_NOT_OK(out->Close());
guard.Release();
return std::make_pair(PathUtil::GetName(file_path), pos);
return std::make_pair(PathUtil::GetName(file_path), static_cast<int64_t>(pos));
Comment thread src/paimon/common/fs/file_system_test.cpp

BufferedInputStream::BufferedInputStream(const std::shared_ptr<InputStream>& in,
int32_t buffer_size, MemoryPool* pool)
: buffer_size_(buffer_size), in_(in) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider change buffer_size to uint64_t

return wrapped_->Read(buffer, size);
Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size) {
PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(inner_position_ + size)));
PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len, wrapped_->Read(buffer, size));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why AssertBoundary need int64? Is uint64_t also acceptable?

std::shared_ptr<InputStream> wrapped_;
const int64_t length_;
const int64_t offset_;
int64_t inner_position_ = 0;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why offset_ and length_ can be int64 rather than uint64?

Comment thread src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
Comment thread src/paimon/common/utils/math.h
Comment thread src/paimon/fs/jindo/jindo_file_system.cpp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants