Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 202 additions & 33 deletions Doc/library/concurrent.interpreters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,25 @@ This module defines the following functions:
Initialize a new (idle) Python interpreter
and return a :class:`Interpreter` object for it.

.. function:: create_queue()
.. function:: create_queue(maxsize=0, *, unbounditems=UNBOUND)

Initialize a new cross-interpreter queue and return a :class:`Queue`
object for it.

*maxsize* sets the upper bound on the number of items that can be placed
in the queue. If *maxsize* is less than or equal to zero, the queue
size is infinite.

*unbounditems* sets the default behavior when getting an item from the
queue whose original interpreter has been destroyed.
See :meth:`Queue.put` for supported values.

.. function:: is_shareable(obj)

Return ``True`` if the object can be sent to another interpreter
without using :mod:`pickle`, and ``False`` otherwise.
See :ref:`interp-object-sharing`.


Interpreter objects
^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -261,27 +275,71 @@ Interpreter objects

Finalize and destroy the interpreter.

.. method:: prepare_main(ns=None, **kwargs)
.. method:: prepare_main(ns=None, /, **kwargs)

Bind the given objects into the interpreter's :mod:`!__main__`
module namespace. This is the primary way to pass data to code
running in another interpreter.

*ns* is an optional :class:`dict` mapping names to values.
Any additional keyword arguments are also bound as names.

The values must be shareable between interpreters. Some objects
are actually shared, some are copied efficiently, and most are
copied via :mod:`pickle`. See :ref:`interp-object-sharing`.

Bind objects in the interpreter's :mod:`!__main__` module.
For example::

Some objects are actually shared and some are copied efficiently,
but most are copied via :mod:`pickle`. See :ref:`interp-object-sharing`.
interp = interpreters.create()
interp.prepare_main(name='world')
interp.exec('print(f"Hello, {name}!")')

.. method:: exec(code, /, dedent=True)
This is equivalent to setting variables in the interpreter's
:mod:`!__main__` module before calling :meth:`exec` or :meth:`call`.
The names are available as global variables in the executed code.

.. method:: exec(code, /)

Run the given source code in the interpreter (in the current thread).

*code* is a :class:`str` of Python source code. It is executed as
though it were the body of a script, using the interpreter's
:mod:`!__main__` module as the globals namespace.

There is no return value. To get a result back, use :meth:`call`
instead, or communicate through a :class:`Queue`.

If the code raises an unhandled exception, an :exc:`ExecutionFailed`
exception is raised in the calling interpreter. The actual
exception object is not preserved because objects cannot be shared
between interpreters directly.

This blocks the current thread until the code finishes.

.. method:: call(callable, /, *args, **kwargs)

Return the result of calling running the given function in the
interpreter (in the current thread).
Call *callable* in the interpreter (in the current thread) and
return the result.

Nearly all callables, args, kwargs, and return values are supported.
All "shareable" objects are supported, as are "stateless" functions
(meaning non-closures that do not use any globals). For other
objects, this method falls back to :mod:`pickle`.

If the callable raises an exception, an :exc:`ExecutionFailed`
exception is raised in the calling interpreter.

.. _interp-call-in-thread:

.. method:: call_in_thread(callable, /, *args, **kwargs)

Run the given function in the interpreter (in a new thread).
Start a new :class:`~threading.Thread` that calls *callable* in
the interpreter and return the thread object.

This is a convenience wrapper that combines
:mod:`threading` with :meth:`call`. The thread is started
immediately. Call :meth:`~threading.Thread.join` on the returned
thread to wait for it to finish.

Exceptions
^^^^^^^^^^
Expand Down Expand Up @@ -318,19 +376,86 @@ Communicating Between Interpreters

.. class:: Queue(id)

A wrapper around a low-level, cross-interpreter queue, which
implements the :class:`queue.Queue` interface. The underlying queue
can only be created through :func:`create_queue`.
A cross-interpreter queue that can be used to pass data safely
between interpreters. It provides the same interface as
:class:`queue.Queue`. The underlying queue can only be created
through :func:`create_queue`.

When an object is placed in the queue, it is prepared for use in
another interpreter. Some objects are actually shared and some are
copied efficiently, but most are copied via :mod:`pickle`.
See :ref:`interp-object-sharing`.

Some objects are actually shared and some are copied efficiently,
but most are copied via :mod:`pickle`. See :ref:`interp-object-sharing`.
:class:`Queue` objects themselves are shareable between interpreters
(they reference the same underlying queue), making them suitable for
use with :meth:`Interpreter.prepare_main`.

.. attribute:: id

(read-only)

The queue's ID.

.. attribute:: maxsize

(read-only)

The maximum number of items allowed in the queue. A value of zero
means the queue size is infinite.

.. method:: empty()

Return ``True`` if the queue is empty, ``False`` otherwise.

.. method:: full()

Return ``True`` if the queue is full, ``False`` otherwise.

.. method:: qsize()

Return the number of items in the queue.

.. method:: put(obj, block=True, timeout=None, *, unbounditems=None)

Put *obj* into the queue. If *block* is true (the default),
block if necessary until a free slot is available. If *timeout*
is a positive number, block at most *timeout* seconds and raise
:exc:`QueueFullError` if no free slot is available within that time.

If *block* is false, put *obj* in the queue if a free slot is
immediately available, otherwise raise :exc:`QueueFullError`.

*unbounditems* controls what happens when the item is retrieved
via :meth:`get` after the interpreter that called :meth:`put` has
been destroyed. If ``None`` (the default), the queue's default
(set via :func:`create_queue`) is used. Supported values:

* ``UNBOUND`` -- :meth:`get` returns the ``UNBOUND``
sentinel in place of the original object.
* ``UNBOUND_ERROR`` -- :meth:`get` raises
``ItemInterpreterDestroyed``.
* ``UNBOUND_REMOVE`` -- the item is silently removed from
the queue when the original interpreter is destroyed.

.. method:: put_nowait(obj, *, unbounditems=None)

Equivalent to ``put(obj, block=False)``.

.. method:: get(block=True, timeout=None)

Remove and return an item from the queue. If *block* is true
(the default), block if necessary until an item is available.
If *timeout* is a positive number, block at most *timeout* seconds
and raise :exc:`QueueEmptyError` if no item is available within
that time.

If *block* is false, return an item if one is immediately
available, otherwise raise :exc:`QueueEmptyError`.

.. method:: get_nowait()

Equivalent to ``get(block=False)``.


.. exception:: QueueEmptyError

Expand All @@ -354,31 +479,75 @@ Creating an interpreter and running code in it::

interp = interpreters.create()

# Run in the current OS thread.
# Run source code directly.
interp.exec('print("Hello from a subinterpreter!")')

interp.exec('print("spam!")')
# Call a function and get the result.
def add(x, y):
return x + y

interp.exec("""if True:
print('spam!')
""")
result = interp.call(add, 3, 4)
print(result) # 7

from textwrap import dedent
interp.exec(dedent("""
print('spam!')
"""))
# Run a function in a new thread.
def worker():
print('Running in a thread!')

def run(arg):
return arg
t = interp.call_in_thread(worker)
t.join()

res = interp.call(run, 'spam!')
print(res)
Passing data with :meth:`~Interpreter.prepare_main`::

def run():
print('spam!')
interp = interpreters.create()

interp.call(run)
# Bind variables into the interpreter's __main__ namespace.
interp.prepare_main(greeting='Hello', name='world')
interp.exec('print(f"{greeting}, {name}!")')

# Run in new OS thread.
# Can also use a dict.
config = {'host': 'localhost', 'port': 8080}
interp.prepare_main(config)
interp.exec('print(f"Connecting to {host}:{port}")')

t = interp.call_in_thread(run)
t.join()
Using queues to communicate between interpreters::

interp = interpreters.create()

# Create a queue and share it with the subinterpreter.
queue = interpreters.create_queue()
interp.prepare_main(queue=queue)

# The subinterpreter puts results into the queue.
interp.exec("""
import math
queue.put(math.factorial(10))
""")

# The main interpreter reads from the same queue.
result = queue.get()
print(result) # 3628800

Running CPU-bound work in parallel using threads and interpreters::

import time
from concurrent import interpreters

def compute(n):
total = sum(range(n))
return total

interp1 = interpreters.create()
interp2 = interpreters.create()

# Each interpreter runs in its own thread and does not share
# the GIL, enabling true parallel execution.
t1 = interp1.call_in_thread(compute, 50_000_000)
t2 = interp2.call_in_thread(compute, 50_000_000)
t1.join()
t2.join()

.. tip::

For many use cases, :class:`~concurrent.futures.InterpreterPoolExecutor`
provides a higher-level interface that combines threads with
interpreters automatically.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Improve :mod:`concurrent.interpreters` documentation with clearer
descriptions of :class:`~concurrent.interpreters.Queue` methods,
:meth:`~concurrent.interpreters.Interpreter.prepare_main` usage,
:meth:`~concurrent.interpreters.Interpreter.exec` parameters,
and expanded usage examples. Also fix the ``exec`` signature which
incorrectly listed a ``dedent`` parameter.
Loading