Python contextvars and multithreading

Why can’t they be friends?

Koby Bass
Jul 8, 2021

What is it?

contextvars

Sometimes in our code we find that every function relies on a single parameter, like the current user_id or access_token. In single-threaded code, this can be solved with a global variable or the Singleton pattern.

Most applications need concurrency, whether to serve multiple users at the same time or just to make things faster, and a shared global breaks down once several threads write to it. contextvars solves this by giving each thread its own consistent value for the variable:

    import contextvars
    from threading import Thread

    current_user = contextvars.ContextVar("Name of current user")

    def handle_request(user):
        current_user.set(user)
        print_hello()

    def print_hello():
        print(f"Hello, {current_user.get()}")

    Thread(target=handle_request, args=("koby", )).start()
    Thread(target=handle_request, args=("world", )).start()

Running the script, we see context vars are passed between functions.

Notice we no longer need to pass the user down to print_hello. In a real scenario, there may be a call chain of 10+ functions, and only the last function needs the user.
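To see the per-thread isolation more directly, here is a minimal sketch that records what each thread observes instead of printing it. The `seen` dictionary and the `default="nobody"` argument are assumptions added for illustration and are not part of the example above.

```python
import contextvars
from threading import Thread

# Hypothetical variable, mirroring the example above but with a default value.
current_user = contextvars.ContextVar("current_user", default="nobody")

seen = {}  # user name -> value observed inside that user's thread


def handle_request(user):
    current_user.set(user)  # only changes this thread's context
    seen[user] = current_user.get()


threads = [Thread(target=handle_request, args=(name,)) for name in ("koby", "world")]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Each thread saw the value it set; the main thread never called set().
main_value = current_user.get()
print(seen, main_value)
```

Each thread reads back the value it set itself, and the main thread still gets the default, since nothing set in a worker leaks back out.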

ThreadPoolExecutor

A common way to run your code concurrently is ThreadPoolExecutor. For example, say we want to greet our user and, at the same time, offer sympathies for our terrible service:

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def say_hello():
        time.sleep(1)
        return "Hello, user"

    def offer_sympathies():
        time.sleep(1)
        return "Very sorry, user"

    # Named executor rather than exec, to avoid shadowing the built-in.
    with ThreadPoolExecutor() as executor:
        tasks = (say_hello, offer_sympathies)
        futures = (executor.submit(task) for task in tasks)
        for future in as_completed(futures):
            print(future.result())

Using ThreadPoolExecutor we saved our user a second and made the user very confused.

ThreadPoolExecutor and contextvars don’t play nice

Unfortunately, at the time of writing, the executor does not propagate context variables out of the box. So if we want to pass a user down a long chain of functions using contextvars, using an executor will break the chain:

    import contextvars
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    current_user = contextvars.ContextVar("Name of current user")

    def say_hello():
        time.sleep(1)
        return f"Hello, {current_user.get()}"

    def offer_sympathies():
        time.sleep(1)
        return f"Very sorry, {current_user.get()}"

    current_user.set("koby")

    with ThreadPoolExecutor() as exec:
        tasks = (say_hello, offer_sympathies)
        futures = (exec.submit(task) for task in tasks)
        for future in as_completed(futures):
            print(future.result())

Rather than greeting the user, we raise a LookupError.

Terrible. Our current_user variable is lost inside the thread! Luckily, we can fix this relatively easily, using two executor parameters. From the Python docs:

initializer is an optional callable that is called at the start of each worker thread; initargs is a tuple of arguments passed to the initializer.

Using this, all we have to do is provide the original context to the initializer function, and set all the context variables in the new thread. This can be done using a simple function:

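    def set_context(context):
        # Iterating a Context directly yields only the variables, so use items().
        for var, value in context.items():
            var.set(value)

    # Usage: hand the function a snapshot of the current context.
    set_context(contextvars.copy_context())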

Using this function, our code will work as expected:

    import contextvars
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    current_user = contextvars.ContextVar("ID of current user")

    def say_hello():
        time.sleep(1)
        return f"Hello, {current_user.get()}"

    def offer_sympathies():
        time.sleep(1)
        return f"Very sorry, {current_user.get()}"

    current_user.set("koby")

    def set_context(context):
        for var, value in context.items():
            var.set(value)

    # Each worker thread runs set_context once, with a snapshot of our context.
    with ThreadPoolExecutor(
        initializer=set_context,
        initargs=(contextvars.copy_context(), ),
    ) as executor:
        tasks = (say_hello, offer_sympathies)
        futures = (executor.submit(task) for task in tasks)
        for future in as_completed(futures):
            print(future.result())

Current user is properly passed to child threads.
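One detail of the initializer approach is worth checking for yourself: the values are copied from a snapshot taken when the executor is created, not when a task is submitted. The sketch below is only an illustration under that reading; the `who` function and the single-worker pool are assumptions chosen to keep the result easy to follow.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

current_user = contextvars.ContextVar("current_user")


def set_context(context):
    # Copy every variable from the snapshot into the worker thread's context.
    for var, value in context.items():
        var.set(value)


def who():
    return current_user.get()


current_user.set("koby")
with ThreadPoolExecutor(
    max_workers=1,
    initializer=set_context,
    initargs=(contextvars.copy_context(),),  # snapshot is taken here
) as executor:
    first = executor.submit(who).result()
    current_user.set("world")  # changed after the snapshot was taken
    second = executor.submit(who).result()

print(first, second)
```

Both tasks report the value from the snapshot, while the main thread itself now sees the new one. Keep this in mind if the parent thread changes a variable while the executor is alive.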

Other Options

Instead of explicitly calling copy_context and passing the arguments every time, you could make a subclass to help you:

    class ContextExecutor(ThreadPoolExecutor):
        def __init__(self):
            self.context = contextvars.copy_context()
            super().__init__(initializer=self._set_child_context)

        def _set_child_context(self):
            for var, value in self.context.items():
                var.set(value)

Or, if you prefer a functional approach, a wrapper function:

    import contextlib

    # Relies on the set_context function defined earlier.
    @contextlib.contextmanager
    def ContextExecutor():
        parent_context = contextvars.copy_context()
        with ThreadPoolExecutor(
            initializer=set_context,
            initargs=(parent_context, )
        ) as executor:
            yield executor

All of these methods assume the context is not changed inside the with statement. Changing it there can cause race conditions and should be avoided.
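If you do need to change the context while the executor is running, one alternative (a different technique from the initializer approach above) is to snapshot the context per task and run the task inside that snapshot with Context.run. This is a minimal sketch; `submit_with_context` is a hypothetical helper name, not something the standard library provides.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

current_user = contextvars.ContextVar("current_user")


def say_hello():
    return f"Hello, {current_user.get()}"


def submit_with_context(executor, fn, *args, **kwargs):
    # Hypothetical helper: snapshot the caller's context at submit time and
    # run fn inside that snapshot. A fresh copy per task avoids entering the
    # same Context object from two threads at once.
    context = contextvars.copy_context()
    return executor.submit(context.run, fn, *args, **kwargs)


with ThreadPoolExecutor(max_workers=2) as executor:
    current_user.set("koby")
    first = submit_with_context(executor, say_hello)
    current_user.set("world")
    second = submit_with_context(executor, say_hello)
    results = [first.result(), second.result()]

print(results)
```

Each task sees the value that was current when it was submitted, at the cost of one context copy per task.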

Wrapping Up

If you use contextvars, using an executor will break the context between threads. You can either move to asyncio, which propagates context out of the box, or make a simple wrapper to pass context variables.
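For comparison, here is a small sketch of the asyncio behavior mentioned above: a task gets a copy of the current context when it is created, so no extra plumbing is needed. The function names are assumptions made up for this illustration.

```python
import asyncio
import contextvars

current_user = contextvars.ContextVar("current_user")


async def say_hello():
    await asyncio.sleep(0)  # yield to the event loop, like real I/O would
    return f"Hello, {current_user.get()}"


async def main():
    current_user.set("koby")
    # create_task copies the current context when the task is created.
    task = asyncio.create_task(say_hello())
    current_user.set("world")  # set after the copy, so the task does not see it
    return await task


result = asyncio.run(main())
print(result)
```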

Even if you don’t use contextvars, there’s a good chance a library you’re using is passing arguments around behind your back, for your convenience, using contextvars. Executors will break the library’s behavior and you won’t be sure why.

For example, the Elastic APM Python client uses contextvars to pass transaction information between functions, and breaks when using executors.

Hope this helped. If you have a better solution or an open-source library, feel free to share. Researching this was harder than it should have been, so hopefully this saved you some time.
