Skip to content

Commit 5d3fce6

Browse files
committed
Reinit IO buffer locks after fork to prevent deadlocks
BufferedReader/Writer/TextIOWrapper use PyThreadMutex internally. If a parent thread held one of these locks during fork(), the child would deadlock on any IO operation. Add reinit_after_fork() to RawThreadMutex and call it on sys.stdin/ stdout/stderr in the child process fork handler, analogous to CPython's _PyIO_Reinit().
1 parent e9001ec commit 5d3fce6

4 files changed

Lines changed: 88 additions & 0 deletions

File tree

crates/common/src/lock.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,18 @@ pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
9696
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
9797
}
9898
}
99+
100+
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.
101+
///
102+
/// `PyThreadMutex` is used by buffered IO objects (`BufferedReader`,
103+
/// `BufferedWriter`, `TextIOWrapper`). If a dead parent thread held one of
104+
/// these locks during `fork()`, the child would deadlock on any IO operation.
105+
///
106+
/// # Safety
107+
///
108+
/// Must only be called from the single-threaded child process immediately
109+
/// after `fork()`, before any other thread is created.
110+
#[cfg(unix)]
111+
pub unsafe fn reinit_thread_mutex_after_fork<T: ?Sized>(mutex: &PyThreadMutex<T>) {
112+
unsafe { mutex.raw().reinit_after_fork() }
113+
}

crates/common/src/lock/thread_mutex.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,23 @@ impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
7272
}
7373
}
7474

75+
impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
76+
/// Reset this mutex to its initial (unlocked, unowned) state after `fork()`.
77+
///
78+
/// # Safety
79+
///
80+
/// Must only be called from the single-threaded child process immediately
81+
/// after `fork()`, before any other thread is created.
82+
#[cfg(unix)]
83+
pub unsafe fn reinit_after_fork(&self) {
84+
self.owner.store(0, Ordering::Relaxed);
85+
unsafe {
86+
let mutex_ptr = &self.mutex as *const R as *mut u8;
87+
core::ptr::write_bytes(mutex_ptr, 0, core::mem::size_of::<R>());
88+
}
89+
}
90+
}
91+
7592
unsafe impl<R: RawMutex + Send, G: GetThreadId + Send> Send for RawThreadMutex<R, G> {}
7693
unsafe impl<R: RawMutex + Sync, G: GetThreadId + Sync> Sync for RawThreadMutex<R, G> {}
7794

@@ -103,6 +120,11 @@ impl<R: RawMutex, G: GetThreadId, T> From<T> for ThreadMutex<R, G, T> {
103120
}
104121
}
105122
impl<R: RawMutex, G: GetThreadId, T: ?Sized> ThreadMutex<R, G, T> {
123+
/// Access the underlying raw thread mutex.
124+
pub fn raw(&self) -> &RawThreadMutex<R, G> {
125+
&self.raw
126+
}
127+
106128
pub fn lock(&self) -> Option<ThreadMutexGuard<'_, R, G, T>> {
107129
if self.raw.lock() {
108130
Some(ThreadMutexGuard {

crates/vm/src/stdlib/io.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
* I/O core tools.
33
*/
44
pub(crate) use _io::module_def;
5+
#[cfg(all(unix, feature = "threading"))]
6+
pub(crate) use _io::reinit_std_streams_after_fork;
57

68
cfg_if::cfg_if! {
79
if #[cfg(any(not(target_arch = "wasm32"), target_os = "wasi"))] {
@@ -4985,6 +4987,49 @@ mod _io {
49854987
}
49864988
}
49874989

4990+
/// Reinit per-object IO buffer locks on std streams after `fork()`.
4991+
#[cfg(all(unix, feature = "threading"))]
4992+
pub fn reinit_std_streams_after_fork(vm: &VirtualMachine) {
4993+
for name in ["stdin", "stdout", "stderr"] {
4994+
let Ok(stream) = vm.sys_module.get_attr(name, vm) else {
4995+
continue;
4996+
};
4997+
reinit_io_locks(&stream);
4998+
}
4999+
}
5000+
5001+
#[cfg(all(unix, feature = "threading"))]
5002+
fn reinit_io_locks(obj: &PyObject) {
5003+
use crate::common::lock::reinit_thread_mutex_after_fork;
5004+
5005+
if let Some(tio) = obj.downcast_ref::<TextIOWrapper>() {
5006+
unsafe { reinit_thread_mutex_after_fork(&tio.data) };
5007+
if let Some(guard) = tio.data.lock() {
5008+
if let Some(ref data) = *guard {
5009+
reinit_io_locks(&data.buffer);
5010+
}
5011+
}
5012+
return;
5013+
}
5014+
if let Some(br) = obj.downcast_ref::<BufferedReader>() {
5015+
unsafe { reinit_thread_mutex_after_fork(&br.data) };
5016+
return;
5017+
}
5018+
if let Some(bw) = obj.downcast_ref::<BufferedWriter>() {
5019+
unsafe { reinit_thread_mutex_after_fork(&bw.data) };
5020+
return;
5021+
}
5022+
if let Some(brw) = obj.downcast_ref::<BufferedRandom>() {
5023+
unsafe { reinit_thread_mutex_after_fork(&brw.data) };
5024+
return;
5025+
}
5026+
if let Some(brwp) = obj.downcast_ref::<BufferedRWPair>() {
5027+
unsafe { reinit_thread_mutex_after_fork(&brwp.read.data) };
5028+
unsafe { reinit_thread_mutex_after_fork(&brwp.write.data) };
5029+
return;
5030+
}
5031+
}
5032+
49885033
pub fn io_open(
49895034
file: PyObjectRef,
49905035
mode: Option<&str>,

crates/vm/src/stdlib/posix.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,12 @@ pub mod module {
719719
#[cfg(feature = "threading")]
720720
reinit_locks_after_fork(vm);
721721

722+
// Reinit per-object IO buffer locks on std streams.
723+
// BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be
724+
// held by dead parent threads, causing deadlocks on any IO in the child.
725+
#[cfg(feature = "threading")]
726+
crate::stdlib::io::reinit_std_streams_after_fork(vm);
727+
722728
// Phase 2: Reset low-level atomic state (no locks needed).
723729
crate::signal::clear_after_fork();
724730
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();

0 commit comments

Comments
 (0)