Context managers in Python

Almost a decade ago, I gave a brief introduction to context managers (item 2 here) and thought that I would start using them more actively. But I only remembered them recently, prompted by how much setup and cleanup code I had to write for each test during my parallelism experiments (code that turned out to be both repetitive and ugly).

The PEP 343 specification describes the essence of context managers as allowing "standard uses of try/finally statements to be factored out" into separate blocks. I had always felt that finally belonged to exception handling, but it is not so much about error handling as about cleanup. Of course you need to clean up if an exception is thrown, but you also need to clean up when you simply leave a scope. I think we have relied too heavily on function calls as the main unit of work, which distracted us from scope as a general concept. (This topic becomes especially interesting when compared with lifetimes in Rust.)
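A rough sketch of that separation (my own example, not from the PEP): the with statement factors out the setup/cleanup that a manual try/finally keeps inline with the work:

```python
from contextlib import contextmanager

events = []

# Manual version: setup and cleanup live inline with the work.
def manual():
    events.append("setup")
    try:
        events.append("work")
    finally:
        events.append("cleanup")

# Context-manager version: setup and cleanup are factored out,
# and the with block contains only the work.
@contextmanager
def managed():
    events.append("setup")
    try:
        yield
    finally:
        events.append("cleanup")

manual()
with managed():
    events.append("work")

print(events)  # the same setup/work/cleanup sequence, twice
```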

Currently, I believe the easiest and cleanest way to write a context manager is to use the @contextmanager decorator from contextlib, writing the function as a generator. A generator is a function containing a yield, so what should it yield? Earlier I gave an example in which the yielded value was the directory passed to the function. But that doesn't actually make sense, because (A) I'm never going to use it and (B) I already know that information. So I think it would be better to do this:

@contextmanager
def visitDir(d):
    old = os.getcwd()
    os.chdir(d)
    try:
        yield  # No 'd'
    finally:
        os.chdir(old)  # Restored even if the body raises

Thus, we yield nothing. The yield passes control back to the body of the with statement so it can do its work; when that scope closes, control returns to the context manager, which executes os.chdir(old).
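Used like this (a small standalone sketch; the temporary directory is just for illustration, and the try/finally guarantees the original directory is restored even if the body raises):

```python
import os
import tempfile
from contextlib import contextmanager

# Repeating the definition so this sketch runs on its own:
@contextmanager
def visitDir(d):
    old = os.getcwd()
    os.chdir(d)
    try:
        yield  # nothing yielded
    finally:
        os.chdir(old)

here = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    with visitDir(tmp):
        assert os.path.samefile(os.getcwd(), tmp)  # inside: we moved
assert os.getcwd() == here  # outside: back where we started
```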

But what if we do want to yield something? This is where the as keyword (added along with the with statement) comes in handy. If you have a context manager cm and you create a context like this:

with cm() as x:
    ...

then whatever the context manager yields is bound to x and is available throughout the rest of the scope.
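A tiny sketch of that binding (the names and the value are mine):

```python
from contextlib import contextmanager

@contextmanager
def cm():
    yield 42  # whatever is yielded here becomes 'x' below

with cm() as x:
    print(x)  # 42
```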

Any object can serve as such a result. In my concurrency examples I needed to pass information into and out of the scope, so in scenario_tester.py I created a new type called Scenario:

from contextlib import contextmanager
from dataclasses import dataclass, field
import time
import os
from pprint import pformat

@dataclass
class Scenario:
    multiplier: int
    tasks: int
    args1: range = field(init=False)
    args2: list[int] = field(init=False)
    results: list[float] = field(default_factory=list)

    def __post_init__(self):
        self.args1 = range(self.tasks)
        self.args2 = [self.multiplier] * self.tasks

@contextmanager
def scenario():
    multiplier = 1  # Increase for longer computations
    logical_processors = os.cpu_count()
    print(f"{logical_processors = }")
    tasks = (logical_processors - 0) * 1  # Try different numbers
    print(f"{tasks = }")
    start = time.monotonic()
    scenario = Scenario(multiplier, tasks)
    try:
        yield scenario
    finally:
        elapsed = time.monotonic() - start
        print(
            f"""{pformat(list(scenario.results))}
              Elapsed time: {elapsed:.2f}s"""
        )

The Scenario creates and provides args1 and args2 and collects the results from the test. I'm using a dataclass here because I now consider it the default choice; I say more about that in my presentations from the 2022 PyCon conference. __post_init__() creates args1 and args2, which are deliberately excluded (via init=False) from the constructor that the dataclass generates.
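For example (my numbers, repeating the class so the sketch runs standalone), constructing a Scenario fills in the derived fields:

```python
from dataclasses import dataclass, field

@dataclass
class Scenario:
    multiplier: int
    tasks: int
    args1: range = field(init=False)       # derived, not a constructor arg
    args2: list[int] = field(init=False)   # derived, not a constructor arg
    results: list[float] = field(default_factory=list)

    def __post_init__(self):
        self.args1 = range(self.tasks)
        self.args2 = [self.multiplier] * self.tasks

s = Scenario(multiplier=2, tasks=3)
print(s.args1)    # range(0, 3)
print(s.args2)    # [2, 2, 2]
print(s.results)  # []
```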

The context manager's scenario() function sets everything up, then creates a Scenario object and yields it for use inside the context. When the context's scope ends, the Scenario object remains available, so scenario.results can be retrieved and displayed. Note that the start time is not stored in the Scenario because it isn't needed inside the context, yet it can still be used in the finally block because it is in the scope of the context-manager function.
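That behavior is easy to demonstrate with a stripped-down stand-in (names are mine): the yielded object outlives the with block, and the cleanup code can still see it:

```python
from contextlib import contextmanager

@contextmanager
def holder():
    obj = {"results": []}  # stand-in for the Scenario object
    try:
        yield obj
    finally:
        print(obj["results"])  # cleanup still sees the filled-in object

with holder() as h:
    h["results"].append(1.5)

print(h["results"])  # h remains bound after the with block: [1.5]
```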

The context manager lets me get rid of all the extra code I wrote for each of the tests. For example, here is with_processes.py:

from concurrent.futures import ProcessPoolExecutor
from scenario_tester import scenario
from cpu_intensive import cpu_intensive

if __name__ == "__main__":
    with scenario() as scenario:
        with ProcessPoolExecutor() as executor:
            scenario.results = executor.map(
                cpu_intensive, scenario.args1, scenario.args2
            )

with_threads.py looks almost identical:

from concurrent.futures import ThreadPoolExecutor
from scenario_tester import scenario
from cpu_intensive import cpu_intensive

if __name__ == "__main__":
    with scenario() as scenario:
        with ThreadPoolExecutor() as executor:
            scenario.results = executor.map(
                cpu_intensive, scenario.args1, scenario.args2
            )

Other examples are also similar.

Having identified the duplicated code, we can go a step further and pass the executor type to the function as a parameter, as is done in function_tester.py:

from concurrent.futures import ProcessPoolExecutor
from scenario_tester import scenario
from cpu_intensive import cpu_intensive

def test_cpu_intensive(ExecutorClass):
    with scenario() as s:
        with ExecutorClass() as executor:
            s.results = executor.map(cpu_intensive, s.args1, s.args2)

if __name__ == "__main__":
    test_cpu_intensive(ProcessPoolExecutor)

I didn't do that, because this code wouldn't work with no_concurrency.py — but I'm not sure that's a good argument. Now I'm going to continue these experiments with parallelism, and maybe something will change.
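One way around that objection (my sketch, not from the article): a trivial shim class with the same context-manager and map() interface as the concurrent.futures executors, so the parametrized tester could cover the sequential case too:

```python
# A hypothetical SequentialExecutor (my name): it mimics the executor
# protocol that test_cpu_intensive() relies on, but runs everything
# in the current thread via the plain built-in map().
class SequentialExecutor:
    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False  # don't suppress exceptions

    def map(self, fn, *iterables):
        return map(fn, *iterables)  # no concurrency at all

def cpu_intensive(n, multiplier):  # stand-in for the real workload
    return float(n * multiplier)

with SequentialExecutor() as executor:
    results = list(executor.map(cpu_intensive, range(4), [2] * 4))
print(results)  # [0.0, 2.0, 4.0, 6.0]
```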
