Prevent unexpected states by applying correct initialized of local objects as they remain available when a thread’s resources are re-used in a thread-pool.
[!NOTE] Prerequisite to understand this page: Intro to multiprocessing and multithreading
The TheadPoolExecutor
provides an interface to a thread pool and resources which it reserved from the Operating System (OS). A thread in a thread-pool also known as “workers” can share an instance of a local object without overwriting each other’s values inside of it [Python docs 2023].
Thread-local data is data whose values are thread specific. To manage thread-local data, just create an instance of local (or a subclass) and store attributes on it:
mydata = threading.local()
mydata.x = 1
The instance’s values will be different for separate threads.
The noncompliant01.py
code is simulating the spawning of processes with the different access rights.
The Session(object)
class simulates access control via User(Enum)thread
defining a User
object to be either GUEST = 1
(default) or ADMIN = 2
. The set_user()
method changes access level and set_user_as_guest()
is to reset access.
In SessionPool()
we spawn tasks and access control in the following order:
1) ADMIN
2) GUEST
3) GUEST
Configuring self.num_of_threads = 2
in SessionPool
reduces the sample size to make it easier to spot the unwanted behaviour in the output.
""" Non-compliant Code Example """
from time import sleep
from enum import Enum
from threading import local, current_thread
from concurrent.futures import ThreadPoolExecutor, wait
class User(Enum):
GUEST = 1
ADMIN = 2
class Session(object):
def __init__(self):
self.user = local()
self.set_user_as_guest()
def set_user_as_guest(self):
self.user.value = User.GUEST
def set_user(self, user):
self.user.value = user
def work_thread(self):
""" Perform a task for the user in its own thread """
thread = current_thread()
print(f"{thread.name}: Working concurrently as {self.user.value}")
sleep(1) # To allow for worker threads to be reused
class SessionPool(object):
def __init__(self):
self.num_of_threads = 2
self.session = Session()
self.executor = ThreadPoolExecutor(initializer=self.initializer,
max_workers=self.num_of_threads
)
def initializer(self):
thread = current_thread()
print(f"+++ {thread.name} initializer +++")
self.session.set_user_as_guest()
def work_as_admin(self):
self.session.set_user(User.ADMIN)
self.session.work_thread()
def work_as_guest(self):
"""Uses the default user (GUEST) to perform a task"""
self.session.work_thread()
def execute_task(self, task):
return self.executor.submit(task)
#####################
# exploiting above code example
#####################
sp = SessionPool()
futures = [
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
]
# To prevent the main thread from stopping before worker threads finish
wait(futures)
for future in futures:
future.result()
Due to the re-use of worker threads and failure to reset state we can end up with random access control on each worker. The initializer method is not called when a worker thread is reused, it is only called on the worker thread’s creation!
Example noncompliant01.py
output:
+++ ThreadPoolExecutor-0_0 initializer +++
+++ ThreadPoolExecutor-0_1 initializer +++
ThreadPoolExecutor-0_1: Working concurrently as User.ADMIN
ThreadPoolExecutor-0_0: Working concurrently as User.GUEST
ThreadPoolExecutor-0_1: Working concurrently as User.ADMIN
The two worker threads have been initialized only once using the initializer method. ThreadPoolExecutor-0_1
has completed the work_as_admin()
task and has been reused to complete one of the work_as_guest()
. Because the local values have been changed by the first task, the changed values persisted to the second task.
Table listing a possible execution order:
Task | Workder Thread | Executed Method | User |
---|---|---|---|
1 | 0 | work_as_admin() |
ADMIN |
2 | 1 | work_as_admin() |
GUEST |
3 | 0 | work_as_admin() |
ADMIN |
[!NOTE] The initializer parameter has been introduced in Python 3.7 [Python_docs_2_2023].
In noncompliant02.py
the self.num_of_threads
is increased by one in an attempt to mitigate the issue:
""" Non-compliant Code Example """
from time import sleep
from enum import Enum
from threading import local, current_thread
from concurrent.futures import ThreadPoolExecutor, wait
class User(Enum):
GUEST = 1
ADMIN = 2
class Session(object):
def __init__(self):
self.user = local()
self.set_user_as_guest()
def set_user_as_guest(self):
self.user.value = User.GUEST
def set_user(self, user):
self.user.value = user
def work_thread(self):
""" Perform a task for the user in its own thread """
thread = current_thread()
print(f"{thread.name}: Working concurrently as {self.user.value}")
sleep(1) # To allow for worker threads to be reused
class SessionPool(object):
def __init__(self):
self.num_of_threads = 3
self.session = Session()
self.executor = ThreadPoolExecutor(initializer=self.initializer,
max_workers=self.num_of_threads
)
def initializer(self):
thread = current_thread()
print(f"+++ {thread.name} initializer +++")
self.session.set_user_as_guest()
def work_as_admin(self):
self.session.set_user(User.ADMIN)
self.session.work_thread()
def work_as_guest(self):
"""Uses the default user (GUEST) to perform a task"""
self.session.work_thread()
def execute_task(self, task):
return self.executor.submit(task)
#####################
# exploiting above code example
#####################
sp = SessionPool()
futures = [
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
]
# To prevent the main thread from stopping before worker threads finish
wait(futures)
for future in futures:
future.result()
The increased thread pool size circumvents the problem for this specific example. However, the problem has not been resolved since expanding the number of submitted tasks will cause it to reoccur. For example, if we change the fututes
list as follows:
futures = [
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
]
The worker threads will execute the tasks as the incorrect user again, working as ADMIN in four of the tasks instead of the two we want to use the ADMIN role in:
+++ ThreadPoolExecutor-0_0 initializer +++
ThreadPoolExecutor-0_0: Working concurrently as User.ADMIN
+++ ThreadPoolExecutor-0_1 initializer +++
ThreadPoolExecutor-0_1: Working concurrently as User.GUEST
+++ ThreadPoolExecutor-0_2 initializer +++
ThreadPoolExecutor-0_2: Working concurrently as User.ADMIN
ThreadPoolExecutor-0_0: Working concurrently as User.ADMIN
ThreadPoolExecutor-0_1: Working concurrently as User.GUEST
ThreadPoolExecutor-0_2: Working concurrently as User.ADMIN
The finally
block in SessionPoolwork_as_admin()
restores the initial state of the user value from the Session class by calling set_user_as_guest()
in the compliant01.py
code.
""" Compliant Code Example """
from time import sleep
from enum import Enum
from threading import local, current_thread
from concurrent.futures import ThreadPoolExecutor, wait
class User(Enum):
GUEST = 1
ADMIN = 2
class Session(object):
def __init__(self):
self.user = local()
self.set_user_as_guest()
def set_user_as_guest(self):
self.user.value = User.GUEST
def set_user(self, user):
self.user.value = user
def work_thread(self):
""" Perform a task for the user in its own thread """
thread = current_thread()
print(f"{thread.name}: Working concurrently as {self.user.value}")
sleep(1) # To allow for worker threads to be reused
class SessionPool(object):
def __init__(self):
self.num_of_threads = 2
self.session = Session()
self.executor = ThreadPoolExecutor(initializer=self.initializer,
max_workers=self.num_of_threads
)
def initializer(self):
thread = current_thread()
print(f"+++ {thread.name} initializer +++")
self.session.set_user_as_guest()
def work_as_admin(self):
try:
self.session.set_user(User.ADMIN)
self.session.work_thread()
finally:
self.session.set_user_as_guest()
def work_as_guest(self):
"""Uses the default user (GUEST) to perform a task"""
self.session.work_thread()
def execute_task(self, task):
return self.executor.submit(task)
#####################
# exploiting above code example
#####################
sp = SessionPool()
futures = [
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
]
# To prevent the main thread from stopping before worker threads finish
wait(futures)
for future in futures:
future.result()
Now, once the worker thread finishes work_as_admin(), it restores the initial value so that it will be used in the next task executed by the same worker thread:
+++ ThreadPoolExecutor-0_0 initializer +++
ThreadPoolExecutor-0_0: Performing thread specific tasks as User.ADMIN
+++ ThreadPoolExecutor-0_1 initializer +++
ThreadPoolExecutor-0_1: Performing thread specific tasks as User.GUEST
ThreadPoolExecutor-0_1: Performing thread specific tasks as User.GUEST
In the compliant02.py
example, the user variable is reinitialized at the beginning of the work_as_guest()
. Like the try-finally
clause, reinitializing the local values before the execution of a proper task will ensure the worker thread contains the desired values.
# SPDX-FileCopyrightText: OpenSSF project contributors
# SPDX-License-Identifier: MIT
""" Compliant Code Example """
from time import sleep
from enum import Enum
from threading import local, current_thread
from concurrent.futures import ThreadPoolExecutor, wait
class User(Enum):
GUEST = 1
ADMIN = 2
class Session(object):
def __init__(self):
self.user = local()
self.set_user_as_guest()
def set_user_as_guest(self):
self.user.value = User.GUEST
def set_user(self, user):
self.user.value = user
def work_thread(self):
""" Perform a task for the user in its own thread """
thread = current_thread()
print(f"{thread.name}: Working concurrently as {self.user.value}")
sleep(1) # To allow for worker threads to be reused
class SessionPool(object):
def __init__(self):
self.num_of_threads = 2
self.session = Session()
self.executor = ThreadPoolExecutor(initializer=self.initializer,
max_workers=self.num_of_threads
)
def initializer(self):
thread = current_thread()
print(f"+++ {thread.name} initializer +++")
self.session.set_user_as_guest()
def work_as_admin(self):
self.session.set_user(User.ADMIN)
self.session.work_thread()
def work_as_guest(self):
"""Uses the default user (GUEST) to perform a task"""
self.session.set_user_as_guest()
self.session.work_thread()
def execute_task(self, task):
return self.executor.submit(task)
#####################
# exploiting above code example
#####################
sp = SessionPool()
futures = [
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_admin), # Thread 1, works as ADMIN
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 2, should work as GUEST
sp.execute_task(sp.work_as_guest), # Thread 3, should work as GUEST
]
# To prevent the main thread from stopping before worker threads finish
wait(futures)
for future in futures:
future.result()
Python docs 2023 | Thread-Local Data, available from https://docs.python.org/3/library/threading.html#thread-local-data [accessed 8 August 2024] |
Python_docs_2_2023 | ThreadPoolExecutor, available from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor [accessed 8 August 2024] |