Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6a5fc24
Initial commits
YvesDup Sep 11, 2025
7a98c3a
📜🤖 Added by blurb_it.
blurb-it[bot] Sep 12, 2025
297a460
Fix nits
YvesDup Sep 12, 2025
603264c
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Sep 12, 2025
9b226c0
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Sep 18, 2025
4ff001b
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Oct 29, 2025
bab07dd
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Nov 11, 2025
243068f
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Nov 25, 2025
a2d25a4
Merge branch 'python:main' into queue-shutdown-multiprocessing
YvesDup Feb 1, 2026
148ffaf
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Feb 12, 2026
bc57173
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Feb 17, 2026
247f7a9
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Apr 5, 2026
293c2e3
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Apr 21, 2026
9b7d141
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup May 29, 2026
93fe64b
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Jun 4, 2026
5981aaf
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Jun 17, 2026
7c6f69b
Add ``Queue.ShutDown`` documentation.
YvesDup Jun 19, 2026
640798f
Remove inappropiate assertion
YvesDup Jun 21, 2026
537fd85
Merge branch 'main' into queue-shutdown-multiprocessing
YvesDup Jun 21, 2026
3d47640
Refactor two tests
YvesDup Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -932,8 +932,7 @@ For an example of the usage of queues for interprocess communication see
standard library's :mod:`queue` module are raised to signal timeouts.

:class:`Queue` implements all the methods of :class:`queue.Queue` except for
:meth:`~queue.Queue.task_done`, :meth:`~queue.Queue.join`, and
:meth:`~queue.Queue.shutdown`.
:meth:`~queue.Queue.task_done`, :meth:`~queue.Queue.join`.

.. method:: qsize()

Expand Down Expand Up @@ -970,6 +969,10 @@ For an example of the usage of queues for interprocess communication see
If the queue is closed, :exc:`ValueError` is raised instead of
:exc:`AssertionError`.

Can raise the :exc:`queue.ShutDown` when queue shuts down.

.. versionadded:: 3.16

.. method:: put_nowait(obj)

Equivalent to ``put(obj, False)``.
Expand All @@ -988,10 +991,25 @@ For an example of the usage of queues for interprocess communication see
If the queue is closed, :exc:`ValueError` is raised instead of
:exc:`OSError`.

Can raise the :exc:`queue.ShutDown` when queue shuts down and is empty.

.. versionadded:: 3.16

.. method:: get_nowait()

Equivalent to ``get(False)``.

.. method:: shutdown([immediate])

Shuts down the queue. If optional args *immediate* is ``False``
(the default), all remaining items could be retrieve until queue is empty.
If optional args *immediate* is ``True``, all remaining items from the
queue are erased. When queue is empty, get an item raises
a :exc:`queue.ShutDown` exception. Adding a new item into the queue
also raises this same exception.

.. versionadded:: 3.16

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down
224 changes: 192 additions & 32 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import types
import weakref
import errno
from contextlib import contextmanager

from queue import Empty, Full
from queue import Empty, Full, ShutDown

from . import connection
from . import context
Expand All @@ -45,21 +46,37 @@ def __init__(self, maxsize=0, *, ctx):
else:
self._wlock = ctx.Lock()
self._sem = ctx.BoundedSemaphore(maxsize)

self._lock_shutdown = ctx.Lock()
# Cannot use a ctx.Value because 'ctypes' library is
# not always available on all Linux platforms.
# Use of Semaphores instead of an array from `heap.BufferWrapper'
# is here more efficient and explicit.
self._sem_flag_shutdown = ctx.Semaphore(0)
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
Comment thread
YvesDup marked this conversation as resolved.
self._sem_pending_getters = ctx.Semaphore(0)
self._sem_pending_putters = ctx.Semaphore(0)

# For use by concurrent.futures
self._ignore_epipe = False
self._reset()

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters) = state
self._reset()

def _after_fork(self):
Expand All @@ -81,43 +98,101 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_shutdown(self):
return not self._sem_flag_shutdown.locked()

def _set_shutdown(self, immediate=False):
self._sem_flag_shutdown.release()
if immediate:
self._sem_flag_shutdown_immediate.release()

@contextmanager
def _handle_pending_processes(self, sem):
# Count pending getter or putter processes in a dedicated
# semaphore. These 2 semaphores are only used when queue
# shuts down to release one by one all pending processes.
sem.release()
try:
# Wraps potentialy blocking calls:
# sem._sem.acquire() in put method,
# _recv_bytes()/_poll(*args) in get method.
yield
finally:
sem.acquire()

def _release_pending_putters(self):
with self._lock_shutdown:
if not self._sem_pending_putters.locked():
self._sem.release()

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full

if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def _release_pending_getters(self):
with self._lock_shutdown:
if not self._sem_pending_getters.locked():
self._put_sentinel()

def get(self, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):

if (empty := self.empty()) and self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_getters):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
Comment thread
gpshead marked this conversation as resolved.
finally:
if self._is_shutdown() and empty:
self._release_pending_getters()
raise ShutDown

item = _ForkingPickler.loads(res)
if self._is_shutdown() \
and isinstance(item, _ShutdownSentinel):
# A pending getter process is just unblocked,
# Unblock a next one if exists.
self._release_pending_getters()
raise ShutDown

return item

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Expand All @@ -135,6 +210,57 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()

def _put_sentinel(self):
# When put a sentinel into an empty queue,
# dont forget to call to _sem.acquire in order to
# maintain a correct count of acquire/release
# calls for BoudedSempaphore.
self._sem.acquire()

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(_sentinel_shutdown)
self._notempty.notify()

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")

with self._lock_shutdown:
if self._is_shutdown():
raise RuntimeError(f"Queue {self!r} already shut down")

is_pending_getters = not self._sem_pending_getters.locked()
is_pending_putters = not self._sem_pending_putters.locked()
str_shutdown = f"shutdown -> immediate:{immediate}"
str_shutdown += f"/PGetters:{is_pending_getters}" \
f"/PPutters:{is_pending_putters}" \
f"/Empty:{self.empty()}" \
f"/Full:{self.full()}"
debug(str_shutdown)
self._set_shutdown(immediate)

# Shut down is immediatly and there is no pending getter,
# we purge the queue (pipe). If there are datas into the buffer
# the 'feeder' thread should erase all of them.
if immediate and not is_pending_getters:
self._clear()

# Starting release one pending getter process.
# Put a first shutdown sentinel data into the pipe.
if self.empty() and is_pending_getters:
self._put_sentinel()

# Starting release one pending putter processes.
if is_pending_putters:
self._sem.release()

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -180,7 +306,7 @@ def _start_thread(self):
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
self._sem, self._sem_flag_shutdown_immediate),
name='QueueFeederThread',
daemon=True,
)
Expand Down Expand Up @@ -228,7 +354,8 @@ def _finalize_close(buffer, notempty):

@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close, ignore_epipe, onerror, queue_sem):
writer_close, ignore_epipe, onerror, queue_sem,
flag_shutdown_immediate):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
Expand All @@ -240,7 +367,7 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
wrelease = writelock.release
else:
wacquire = None

is_shutdown_immediate = lambda: not flag_shutdown_immediate.locked()
while 1:
try:
nacquire()
Expand All @@ -258,6 +385,14 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close()
return

# When queue shuts down immediatly, do not insert
# regular data in pipe, only shutdown sentinel.
if is_shutdown_immediate() \
and not isinstance(obj, _ShutdownSentinel):
debug("Queue shuts down immediatly, " \
"don't feed regular data to pipe")
continue

# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
Expand Down Expand Up @@ -301,6 +436,12 @@ def _on_queue_feeder_error(e, obj):
__class_getitem__ = classmethod(types.GenericAlias)


# Sentinel item used to release pending getter processes
# when queue shuts down.
class _ShutdownSentinel: pass
_sentinel_shutdown = _ShutdownSentinel()


_sentinel = object()

#
Expand Down Expand Up @@ -328,8 +469,16 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full
if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty, self._cond:
if self._thread is None:
Expand All @@ -350,6 +499,17 @@ def join(self):
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

def _clear(self):
super()._clear()

# Data could be in the buffer, not in the pipe.
# Call acquire method of '_unfinished_tasks' Semaphore
# until counter is zero.
with self._cond:
while not self._unfinished_tasks.locked():
self._unfinished_tasks.acquire(block=False)
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
#
Expand Down
Loading
Loading