E2EHIRING Logo
search

Search blogs by title

Jobs
Jobs
internships
Internships
Company
Assessment
mentorship
Mentorship
more
Moredropdown
Login
HomeSepratorIconBlogsSepratorIconCelery with fastapi - for better performanceSepratorIcon

Celery with fastapi - for better performance

Han SoloAnkita Gupta
calendar29 Nov 2022
poster

Problem statement

We came across memory leak and low performance issues due to multi threading. Increasing worker thread count for parallel processing led to GIT issue and we knew there has to other ways to easy asynchronous parallel processing. Now we know that multithreading by queuing is not a recommended practice and alternatives are celery, asyncio etc. This blog is about celery with fastapi.

Objectives

  1. Set up Celery with FastAPI
  2. Execute Celery tasks in the Python shell
  3. Monitor a Celery app with Flower

Setting up Redis

Set up and run Redis directly from your operating system or from a Docker container. Alternate to reddis is rabbitmq.

With Docker

Start by installing Docker if you haven't already done so. Then, open your terminal and run the following command:

$ docker run -p 6379:6379 --name some-redis -d redis

This downloads the official Redis Docker image from Docker Hub and runs it on port 6379 in the background.

To test if Redis is up and running, run:

$ docker exec -it some-redis redis-cli ping

You should see:

PONG

Without Docker

Either download Redis from source or via a package manager (like APT, YUM, Homebrew, or Chocolatey) and then start the Redis server via:

$ redis-server

To test if Redis is up and running, run:

$ redis-cli ping

You should see:

PONG

Next, we'll look at how to set up Celery in a FastAPI project.

Setting up Celery

Celery with FastAPI Diagram

Create a FastAPI project

Create a new project directory:

$ mkdir fastapi-celery-project && cd fastapi-celery-project

Then, create and activate a new Python virtual environment:

$ python3.10 -m venv venv
$ source venv/bin/activate
(venv)$

Create a requirements.txt file:

fastapi==0.79.0
uvicorn[standard]==0.18.2
Run fastapi['all'] if there's any error while installing fastapi.

Install:

(venv)$ pip install -r requirements.txt

Create a new file called main.py:

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

Run the app:

(venv)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [2061] using WatchFiles
INFO:     Started server process [2063]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Visit http://localhost:8000 in your browser. You should see {"message":"Hello World"}.

Press Ctrl+C to terminate the development server.

Project structure thus far:

├── main.py
└── requirements.txt

Add Celery

Next, let's install and configure Celery. Proceed with this celery version only if OS is mac or linux, for windows download version below 4, as windows has stopped supporting celery 4 and above. 

Update requirements.txt, adding redis-py and Celery:

celery==5.2.7
redis==4.3.4

Install:

(venv)$ pip install -r requirements.txt

Update main.py:

from celery import Celery
from fastapi import FastAPI

app = FastAPI()


celery = Celery(
    __name__,
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0"
)


@app.get("/")
async def root():
    return {"message": "Hello World"}


@celery.task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

Notes:

  1. After creating a FastAPI instance, we created a new instance of Celery.
  2. The broker and backend tells Celery to use the Redis service we just launched. Rather than hard-coding these values, you can define them in a separate config file or pull them from environment variables.
  3. We defined a Celery task called divide, which simulates a long-running task.

Sending a Task to Celery

With the config done, let's try sending a task to Celery to see how it works. 

In a new terminal window, navigate to your project directory, activate the virtual environment, and then run:

(venv)$ celery -A main.celery worker --loglevel=info

You should see something similar to:

[config]
.> app:         main:0x10ad0d5f8
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . main.divide

Back in the first terminal window, run:

(venv)$ python

Let's send some tasks to the Celery worker:

>>> from main import app, divide
>>> task = divide.delay(1, 2)

What's happening?

  1. We used the delay method to send a new message to the message broker. The worker process then picked up and executed the task from the queue.
  2. After releasing from the Enter key, the code finished executing while the divide task ran in the background.

Turn to the Celery worker terminal. You should see something similar to:

[2022-08-20 15:14:02,089: INFO/MainProcess] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] received
[2022-08-20 15:14:07,109: INFO/ForkPoolWorker-8] Task main.divide[0dd7c3a2-27c0-4bef-bc42-31ca4f1007bd] succeeded in 5.017738905007718s: 0.5

The worker process received the task at 15:14:02. It took about five seconds for the task to start and finish.

Add another task or two. As you do this, picture the workflow in your head:

  1. The Celery client (the producer) adds a new task to the queue via the message broker.
  2. The Celery worker (the consumer) grabs the tasks from the queue, again, via the message broker.
  3. Once processed, results are stored in the result backend.

Add another new task:

>>> task = divide.delay(1, 2)

>>> type(task)
<class 'celery.result.AsyncResult'>

After we called the delay method, we get an AsyncResult instance, which can be used to check the task state along with the return value or exception details.

Add a new task then print task.state and task.result:

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
SUCCESS 0.5

>>> print(task.state, task.result)
SUCCESS 0.5

What happens if there's an error?

>>> task = divide.delay(1, 0)

# wait a few seconds before checking the state and result

>>> task.state
'FAILURE'

>>> task.result
ZeroDivisionError('division by zero')

Monitoring Celery with Flower

Flower is a real-time web application monitoring and administration tool for Celery.

Add the dependency to the requirements.txt file:

flower==1.2.0

Open a third terminal window, navigate to the project directory. Activate your virtual environment and then install Flower:

(venv)$ pip install -r requirements.txt

Once installed, spin up the server:

(venv)$ celery -A main.celery flower --port=5555

Navigate to http://localhost:5555 in your browser of choice to view the dashboard. Click "Tasks" in the nav bar at the top to view the finished tasks.

In the first terminal window, run a few more tasks, making sure you have at least one that will fail:

>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 0)
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 3)

owser of choice to view the dashboard. Click "Tasks" in the nav bar at the top to view the finished tasks.

In the first terminal window, run a few more tasks, making sure you have at least one that will fail:

>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 0)
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 3)

Recent Posts

e2eHiring’s GenZ mock assessments to enhance campus placements: Register now!

e2eHiring’s GenZ mock assessments to enhance campus placements: Register now!

How is Technology impacting the HR practices in India in the post Covid Era?

How is Technology impacting the HR practices in India in the post Covid Era?

Collection of 500+ ML projects

Collection of 500+ ML projects

How to Retain Your Tech Employees amid Economic Recession?

How to Retain Your Tech Employees amid Economic Recession?

User enumeration - vulnerability and mitigation

User enumeration - vulnerability and mitigation

copycopycopycopy

Han Solo

Recent Posts

e2eHiring’s GenZ mock assessments to enhance campus placements: Register now!

e2eHiring’s GenZ mock assessments to enhance campus placements: Register now!

How is Technology impacting the HR practices in India in the post Covid Era?

How is Technology impacting the HR practices in India in the post Covid Era?

Collection of 500+ ML projects

Collection of 500+ ML projects

How to Retain Your Tech Employees amid Economic Recession?

How to Retain Your Tech Employees amid Economic Recession?

User enumeration - vulnerability and mitigation

User enumeration - vulnerability and mitigation