Here's a minimal reproducible example of my FastAPI app. I'm seeing some strange behavior and I'm not sure I understand the reason for it.
I'm using ApacheBench (ab) to send multiple requests as follows:
ab -n 1000 -c 50 -H 'accept: application/json' -H 'x-data-origin: source' 'http://localhost:8001/test/async'
FastAPI app
import time
import asyncio
import enum
from typing import Any
from fastapi import FastAPI, Path, Body
from starlette.concurrency import run_in_threadpool

app = FastAPI()
loop = asyncio.get_running_loop()


def sync_func() -> None:
    time.sleep(3)
    print("sync func")


async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync async with fastapi thread")


async def sync_async_func() -> None:
    await loop.run_in_executor(None, time.sleep, 3)


async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")


@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")


@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")


@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")


@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")
Here are the ApacheBench results:
async (with asyncio.sleep):
- Concurrency Level: 50
- Time taken for tests: 63.528 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 15.74 [#/sec] (mean)
- Time per request: 3176.407 [ms] (mean)
- Time per request: 63.528 [ms] (mean, across all concurrent requests)
- Transfer rate: 1.97 [Kbytes/sec] received

sync (with time.sleep):
- Concurrency Level: 50
- Time taken for tests: 78.615 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 12.72 [#/sec] (mean)
- Time per request: 3930.751 [ms] (mean)
- Time per request: 78.615 [ms] (mean, across all concurrent requests)
- Transfer rate: 1.59 [Kbytes/sec] received

sync_async (time.sleep with run_in_executor):
- Concurrency Level: 50
- Time taken for tests: 256.201 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 3.90 [#/sec] (mean)
- Time per request: 12810.038 [ms] (mean)
- Time per request: 256.201 [ms] (mean, across all concurrent requests)
- Transfer rate: 0.49 [Kbytes/sec] received

sync_async_fastapi (time.sleep with run_in_threadpool):
- Concurrency Level: 50
- Time taken for tests: 78.877 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 12.68 [#/sec] (mean)
- Time per request: 3943.841 [ms] (mean)
- Time per request: 78.877 [ms] (mean, across all concurrent requests)
- Transfer rate: 1.58 [Kbytes/sec] received
In conclusion, I'm seeing a surprising disparity in the results, especially when using run_in_executor, where the average time per request is significantly higher (about 12.8 seconds). I don't understand this outcome.
--- EDIT --- After AKX's answer.
Here is the code working as expected:
import time
import asyncio
from typing import Any
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
from anyio import to_thread

app = FastAPI()
to_thread.current_default_thread_limiter().total_tokens = 200
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=100)


def sync_func() -> None:
    time.sleep(3)
    print("sync func")


async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync async with fastapi thread")


async def sync_async_func() -> None:
    await loop.run_in_executor(executor, time.sleep, 3)


async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")


@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")


@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")


@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")


@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")
Using run_in_threadpool()

FastAPI is fully compatible with (and based on) Starlette, and hence, with FastAPI you get all of Starlette's features, such as the run_in_threadpool() method. Starlette's run_in_threadpool(), which uses anyio.to_thread.run_sync() behind the scenes, "will run the sync blocking function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked"—see this answer and AnyIO's Working with threads documentation for more details. Calling run_in_threadpool()—which internally calls anyio.to_thread.run_sync(), and subsequently, AsyncIOBackend.run_sync_in_worker_thread()—will return a coroutine that will then be awaited to get the eventual result of the sync function (e.g., result = await run_in_threadpool(...)), and hence, FastAPI will still work asynchronously (instead of calling that synchronous function directly, which would block the event loop that runs in the main thread, and hence, the main thread would get blocked as well). As can be seen in Starlette's source code (link is given above), the run_in_threadpool() function simply looks like this (supporting both sequence and keyword arguments):
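A simplified sketch of that function (paraphrased; see Starlette's source for the exact, version-specific signature):

```python
import functools
import typing

import anyio.to_thread

T = typing.TypeVar("T")


async def run_in_threadpool(
    func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
    if kwargs:
        # run_sync() only accepts positional arguments, so bind kwargs here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)
```

As described in AnyIO's documentation, the default worker thread limiter holds 40 tokens, meaning that calls to to_thread.run_sync() without an explicit limiter argument will spawn at most 40 threads; that limit can be adjusted along these lines (adapted from the docs, with 60 as an example value):

```python
from anyio import to_thread


async def adjust_thread_limit() -> None:
    # Raise AnyIO's default maximum number of worker threads (default: 40)
    to_thread.current_default_thread_limiter().total_tokens = 60
```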
Since FastAPI uses Starlette's concurrency module to run blocking functions in an external threadpool (the same threadpool is also used by FastAPI to run endpoints defined with normal def instead of async def, as described in this answer), the default value of the thread limiter, as shown above, is applied here as well, i.e., 40 threads maximum—see the relevant AsyncIOBackend.current_default_thread_limiter() method that returns the CapacityLimiter with the default number of threads. Hence, sending 50 requests simultaneously, as in your case, would lead to threadpool starvation, meaning that there wouldn't be enough threads available in the threadpool to handle all incoming requests concurrently.

As described earlier, one can adjust that value, thus increasing the number of threads, which might lead to an improvement in performance results—always depending on the number of requests to def endpoints (or async def endpoints that make calls to run_in_threadpool() inside) your API is expected to serve concurrently. For instance, if you expect the API to serve no more than 50 requests at a time to such endpoints, then set the maximum number of threads to 50.

Note: If your FastAPI application also uses synchronous/blocking background tasks and/or StreamingResponse's generators (i.e., functions defined with normal def instead of async def) and/or UploadFile's async methods, such as read/write/etc. (all these methods call the corresponding synchronous def file methods underneath, using run_in_threadpool()), you could then increase the number of threads as required, as FastAPI actually runs all those functions in the same external threadpool as well—it is all explained in this answer in detail.

Note that using the approach below, which was described here, would have the same effect on adjusting the number of worker threads:
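Presumably (the linked answer is not reproduced here), that alternative relies on AnyIO's internal RunVar, along these lines; the "_default_thread_limiter" name is an implementation detail, so treat this sketch as an assumption:

```python
from anyio import CapacityLimiter
from anyio.lowlevel import RunVar


async def set_threadpool_size() -> None:
    # Replace AnyIO's default thread limiter via its internal RunVar
    # (same effect as setting total_tokens on the current default limiter)
    RunVar("_default_thread_limiter").set(CapacityLimiter(60))
```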
But, it would be best to follow the approach provided by AnyIO's official documentation (as shown earlier). It is also a good idea to have this done when the application starts up, using a lifespan event handler, as demonstrated here.

In the working example below, since the /sync endpoint is defined with normal def instead of async def, FastAPI will run it in a separate thread from the external threadpool and await it, thus ensuring the event loop (and hence, the main thread and the entire server) does not get blocked due to the blocking operations (either blocking IO-bound or CPU-bound) that will be performed inside that endpoint.

Working Example 1
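A sketch of what that example could look like; the /sync and /get_available_threads endpoints and the 60-thread limit follow the description in this answer, while the rest of the wiring is an assumption:

```python
import time
from contextlib import asynccontextmanager

from anyio import to_thread
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Raise AnyIO's default thread limiter (40 by default) to 60 worker threads
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/sync")
def test_sync() -> str:
    # Blocking call; FastAPI runs this def endpoint in the external threadpool
    time.sleep(3)
    return "ok"


@app.get("/get_available_threads")
async def get_available_threads() -> float:
    # Threads currently available in AnyIO's default thread limiter
    return to_thread.current_default_thread_limiter().available_tokens
```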
Using ApacheBench, you could test the example above as follows, which will send 1000 requests in total, with 50 being sent simultaneously at a time (-n: number of requests, -c: number of concurrent requests):
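For instance (assuming port 8000 and the /sync endpoint from the sketch above):

ab -n 1000 -c 50 'http://localhost:8000/sync'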
While running a performance test on the example above, if you call the /get_available_threads endpoint from your browser, e.g., http://localhost:8000/get_available_threads, you would see that the number of available threads is always 10 or above (since only 50 threads are used at a time in this test, but the thread limiter was set to 60), meaning that setting the maximum number of threads on AnyIO's thread limiter to a number that is well above your needs, like 200 as shown in some other answer and in your recent example, wouldn't bring about any improvement in performance; on the contrary, you would end up with a number of threads "sitting" there without being used. As explained earlier, the maximum number of threads should depend on (1) the number of requests your API is expected to serve concurrently, (2) any additional blocking tasks/functions that FastAPI itself would run in the threadpool under the hood, as well as (3) the server machine's available resources.

The example below is the same as the one above, but instead of letting FastAPI itself handle the blocking operation(s) inside the def endpoint (by running the def endpoint in the external threadpool and awaiting it), the endpoint is now defined with async def (meaning that FastAPI will run it directly in the event loop), and inside the endpoint, run_in_threadpool() (which returns an awaitable) is used to run the blocking operation (i.e., time.sleep() in the example). Performing a benchmark test on the example below would yield similar results to the previous example.

Working Example 2
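A sketch along the same lines as Example 1, with the endpoint defined as async def and run_in_threadpool() used inside; the /sync_async path and the rest of the wiring are assumptions:

```python
import time
from contextlib import asynccontextmanager

from anyio import to_thread
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Raise AnyIO's default thread limiter (40 by default) to 60 worker threads
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/sync_async")
async def test_sync_async() -> str:
    # Run the blocking call in the external threadpool and await it
    await run_in_threadpool(time.sleep, 3)
    return "ok"


@app.get("/get_available_threads")
async def get_available_threads() -> float:
    return to_thread.current_default_thread_limiter().available_tokens
```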
Using ApacheBench, you could test the example above as follows:
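Again assuming port 8000 and the endpoint path from the sketch above:

ab -n 1000 -c 50 'http://localhost:8000/sync_async'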
Using loop.run_in_executor() with ThreadPoolExecutor

When using asyncio's loop.run_in_executor()—after obtaining the running event loop using asyncio.get_running_loop()—one could pass None to the executor argument, which would lead to the default executor being used; that is, a ThreadPoolExecutor. Note that when calling loop.run_in_executor() and passing None to the executor argument, this does not create a new instance of a ThreadPoolExecutor every time you do that; instead, a ThreadPoolExecutor is only initialised once, the first time you do that, and for subsequent calls to loop.run_in_executor() with None passed to the executor argument, Python reuses that very same instance of ThreadPoolExecutor (hence, the default executor). This can be seen in the source code of loop.run_in_executor(). That means the number of threads that can be created, when calling await loop.run_in_executor(None, ...), is limited to the default number of thread workers in the ThreadPoolExecutor class.

As described in the documentation of ThreadPoolExecutor—and as shown in its implementation here—by default, the max_workers argument is set to None, in which case the number of worker threads is determined by the following equation: min(32, os.cpu_count() + 4). The os.cpu_count() function returns the number of logical CPUs in the current system. As explained in this article, physical cores refers to the number of CPU cores provided in the hardware (e.g., the chips), while logical cores is the number of CPU cores after hyperthreading is taken into account. If, for instance, your machine has 4 physical cores, each with hyperthreading (most modern CPUs have this), then Python will see 8 CPUs and will allocate 12 threads (8 CPUs + 4) to the pool by default (Python limits the number of threads to 32 to "avoid consuming surprisingly large resources on multi-core machines"; however, one could always adjust the max_workers argument on their own when using a custom ThreadPoolExecutor, instead of using the default one). You could check the default number of worker threads on your system as follows:
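For instance (the second check relies on ThreadPoolExecutor's private _max_workers attribute, so treat it as a quick sanity check rather than a public API):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Default number of worker threads, per the formula mentioned above
print(min(32, os.cpu_count() + 4))

# Or inspect a default-constructed executor directly (private attribute)
print(ThreadPoolExecutor()._max_workers)
```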
Now, as shown in your original example, you are not using a custom ThreadPoolExecutor, but instead using the default ThreadPoolExecutor every time a request arrives, by calling await loop.run_in_executor(None, time.sleep, 3) (inside the sync_async_func() function, which is triggered by the /test/sync_async endpoint). Assuming your machine has 4 physical cores with hyperthreading enabled (as explained in the example earlier), the default number of worker threads for the default ThreadPoolExecutor would be 12. That means, based on your original example and the /test/sync_async endpoint that triggers the await loop.run_in_executor(None, time.sleep, 3) call, your application could only handle 12 concurrent requests at a time. That is the main reason for the difference observed in the performance results when compared to using run_in_threadpool(), which comes with 40 allocated threads by default. Even though, in both cases, threadpool starvation was caused when sending 50 requests simultaneously, the endpoint (in your example) that uses run_in_threadpool() performed better only because the default number of threads created was greater than the one used by the default ThreadPoolExecutor (in your other endpoint).

One way to solve this is to create a new instance of ThreadPoolExecutor (on your own, instead of using the default executor) every time a request arrives and have it terminated once the task is completed (using the with statement), as shown below:
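A sketch of that pattern; the endpoint path and the three-second sleep follow the original example, while the rest is an assumption:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI

app = FastAPI()


@app.get("/test/sync_async")
async def test_sync_async() -> str:
    loop = asyncio.get_running_loop()
    # Create a fresh executor for this request; the with block shuts it down
    # once the submitted task has completed
    with ThreadPoolExecutor() as pool:
        await loop.run_in_executor(pool, time.sleep, 3)
    return "ok"
```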
Although the above should work just fine, it would be best to instantiate a ThreadPoolExecutor once at application startup, adjust the number of worker threads as needed, and reuse the executor when required. Having said that, depending on the blocking task and/or the external libraries you might be using for that task, if you ever encounter a memory leak—i.e., memory that is no longer needed but is not released—after tasks are completed when reusing a ThreadPoolExecutor, you might find creating a new instance of ThreadPoolExecutor each time, as shown above, more suitable. Note, however, that if this was a ProcessPoolExecutor instead, creating and destroying many processes over and over could become computationally expensive. Creating and destroying too many threads could consume huge amounts of memory as well.

Below is a complete working example, demonstrating how to create a reusable custom ThreadPoolExecutor. Calling the /get_active_threads endpoint from your browser, e.g., http://localhost:8000/get_active_threads, while running a performance test with ApacheBench (using 50 concurrent requests, as described in your question and as shown below), you would see that the number of active threads never goes above 51 (50 concurrent threads + 1, which is the main thread), despite setting the max_workers argument to 60 in the example below. This is simply because, in this performance test, the application is never required to serve more than 50 requests at the same time. Also, ThreadPoolExecutor won't spawn new threads if idle threads are available (thus saving resources)—see the relevant implementation part. Hence, again, initialising the ThreadPoolExecutor with max_workers=100, as shown in your recent update, would be unnecessary if you never expect your FastAPI application to serve more than 50 requests at a time (to endpoints where that ThreadPoolExecutor is used).

Working Example
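A sketch consistent with the description above (max_workers=60 and the /get_active_threads endpoint come from this answer's text; the /test/sync_async path and the lifespan wiring are assumptions):

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the executor once at startup and shut it down at shutdown
    app.state.executor = ThreadPoolExecutor(max_workers=60)
    yield
    app.state.executor.shutdown()


app = FastAPI(lifespan=lifespan)


@app.get("/test/sync_async")
async def test_sync_async(request: Request) -> str:
    loop = asyncio.get_running_loop()
    # Reuse the executor created at startup
    await loop.run_in_executor(request.app.state.executor, time.sleep, 3)
    return "ok"


@app.get("/get_active_threads")
async def get_active_threads() -> int:
    # Total number of threads currently alive in the process
    return threading.active_count()
```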
Using ApacheBench, you could test the example above as follows:
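Assuming port 8000 and the endpoint path from the sketch above:

ab -n 1000 -c 50 'http://localhost:8000/test/sync_async'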
Final Notes
In general, you should always aim to use asynchronous code (i.e., using async/await) wherever possible, as async code, also known as coroutines, runs directly in the event loop—the event loop runs in the main thread and executes all tasks in that thread. That means there is only one thread that can take a lock on the interpreter; thus, the additional overhead of context switching (i.e., the CPU jumping from one thread of execution to another) is avoided. When dealing with sync blocking IO-bound tasks, though, you could either (1) define your endpoint with def and let FastAPI handle it behind the scenes, as described earlier and in this answer, or (2) define your endpoint with async def and use run_in_threadpool() on your own to run that blocking task in a separate thread and await it, or (3) define your endpoint with async def and use asyncio's loop.run_in_executor() with a custom (preferably reusable) ThreadPoolExecutor, adjusting the number of worker threads as required. When you need to perform blocking CPU-bound tasks, running such tasks in a separate thread from an external threadpool and awaiting them would successfully prevent the event loop from getting blocked, but it wouldn't provide the performance improvement you would expect from running code in parallel. Thus, for CPU-bound tasks, one may choose to use a ProcessPoolExecutor instead (Note: when using processes in general, you need to explicitly protect the entry point with if __name__ == '__main__')—an example of using a ProcessPoolExecutor can be found in this answer.

To run tasks in the background, without waiting for them to complete before proceeding with the rest of the code in an endpoint, you could use FastAPI's BackgroundTasks, as shown here and here. If the background task function is defined with async def, FastAPI will run it directly in the event loop, whereas if it is defined with normal def, FastAPI will use run_in_threadpool() and await the returned coroutine (same concept as API endpoints). Another option, when you need to run an async def function in the background but don't necessarily need it triggered after returning a FastAPI response (which is the case with BackgroundTasks), is to use asyncio.create_task(), as shown in this answer and this answer. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process, you may benefit from using other, bigger tools such as Celery.

Finally, regarding the optimal/maximum number of worker threads, I would suggest reading this article (have a look at this article as well for more details on ThreadPoolExecutor in general).