Async Programming in Python

In this article, we discuss asynchronous programming, a form of concurrent programming in Python, and how to use Python's async features.

Most of us are familiar with synchronous programs, which execute one step at a time. Traditional Python programs run sequentially: each line of code executes one after the other. For a simple program this is fine, but for tasks that involve waiting, such as network requests or file I/O operations, it becomes very inefficient.

Async programming, short for asynchronous programming, allows us to write concurrent code that can handle multiple tasks at once. We can achieve this with asyncio, a built-in Python library. At the heart of async programming in Python are two keywords: async and await:

  1. async: We put this in front of the def keyword to define an async function (a coroutine function). Such a function can contain the await keyword and runs asynchronously, allowing other tasks to proceed while it waits on I/O-bound work.
  2. await: We use this keyword inside an async function to wait for the result of an asynchronous operation. When await is reached, the event loop can switch to another task while the awaited operation completes.
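
For example, here is a minimal sketch of these two keywords in action (the function names are illustrative, not part of any API):

```python
import asyncio


async def fetch_greeting(name: str) -> str:
    # Simulate an I/O-bound wait; control returns to the event loop here.
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"


async def main() -> str:
    # await suspends main() until fetch_greeting() finishes.
    return await fetch_greeting("World")


result = asyncio.run(main())
print(result)  # Hello, World!
```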

asyncio is built on a few core constructs: event loops, coroutines, and futures:

  1. The event loop is responsible for scheduling and controlling the execution of the different tasks.
  2. Coroutines are functions similar to generators: on await, they yield control back to the event loop.
  3. Futures represent the eventual result of a task.
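
The interplay of these constructs can be sketched as follows: asyncio.gather wraps each coroutine in a Task (a Future subclass), and the event loop interleaves them, so two overlapping waits take roughly the time of one (the helper names below are my own):

```python
import asyncio
import time


async def wait_and_return(value: int) -> int:
    # Yield control to the event loop for 0.2 seconds.
    await asyncio.sleep(0.2)
    return value


async def main() -> list[int]:
    # gather() schedules both coroutines as Tasks and awaits their Futures.
    return await asyncio.gather(wait_and_return(1), wait_and_return(2))


start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
# Both waits overlap, so the total is roughly 0.2 s rather than 0.4 s.
print(results)
```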

Below, I write a script that queries the World Bank’s API endpoints to gather information about countries, such as capitals, population, land area, military expenditure, and military personnel. The first version is a traditional synchronous program; the second uses the async approach.

# utils.py

import time
import numpy as np


def function_timer(func):
    """
    Decorator to run a function multiple times and get the mean result of
    the executions.
    :param func: Function to decorate.
    :return: Decorator function.
    """
    def wrapper(*args, n=1, **kwargs):
        results = []
        for _ in range(n):
            start = time.time()
            result = func(*args, **kwargs)
            results.append(time.time() - start)
        mean_time = np.mean(results)
        std_dev = np.std(results)
        print(f'Mean time for {n} runs: '
              f'{round(mean_time, 2)} ± {round(std_dev, 2)} seconds.')
        return result
    return wrapper
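
To see how the decorator's keyword-only n parameter is used, here is a small standalone sketch; it uses a simplified, standard-library-only variant of the same timer (statistics instead of numpy) so it runs on its own:

```python
import statistics
import time


def function_timer(func):
    """Run func n times, print mean ± std dev, return the last result."""
    def wrapper(*args, n=1, **kwargs):
        durations = []
        for _ in range(n):
            start = time.time()
            result = func(*args, **kwargs)
            durations.append(time.time() - start)
        mean_time = statistics.mean(durations)
        std_dev = statistics.pstdev(durations)
        print(f'Mean time for {n} runs: '
              f'{round(mean_time, 2)} ± {round(std_dev, 2)} seconds.')
        return result
    return wrapper


@function_timer
def slow_double(x: int) -> int:
    time.sleep(0.05)
    return x * 2


# n is consumed by the wrapper, not passed to slow_double.
value = slow_double(21, n=3)
print(value)  # 42
```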

Sync Approach:

import pandas as pd
import requests

from utils import function_timer

class WorldBankSyncDataFetcher:
    """
    This class fetches data from the World Bank API synchronously.
    """
    def __init__(self, urls_dataframes: list) -> None:
        """
        Initialize the WorldBankDataFetcher class.
        :param urls_dataframes: A list of dictionaries containing the URLs and
        column renaming for each DataFrame
        """
        self.urls_dataframes = urls_dataframes
        self.data_frames = []

    @staticmethod
    def fetch_data(url: str, rename_columns: dict) -> pd.DataFrame:
        """
        Fetch data from the World Bank API synchronously.
        :param url: The URL to fetch data from
        :param rename_columns: A dictionary containing the column renaming
        :return: A pandas DataFrame containing the fetched data
        """
        data = []
        page = 1
        is_not_finished = True

        while is_not_finished:
            params = {
                "format": "json",
                "page": page
            }
            res = requests.get(url, params=params)

            if res.status_code == 200:
                result = res.json()
                is_not_finished = result[0]['pages'] > result[0]['page']
                data += result[1]
                page += 1
            else:
                raise Exception(f"Request error: {res.status_code}")

        df = pd.DataFrame(data)
        df.rename(columns=rename_columns, inplace=True)
        return df

    @function_timer
    def fetch_all_data(self) -> list:
        """
        Fetch all data from the World Bank API synchronously.
        :return: A list of pandas DataFrames containing the fetched data
        """
        self.data_frames = []  # reset so repeated timed runs do not accumulate
        for data in self.urls_dataframes:
            df = self.fetch_data(data['url'], data['rename_columns'])
            self.data_frames.append(df)
        return self.data_frames

# Define URLs for the World Bank API
BASE_URL = "https://api.worldbank.org/v2/country"
LA_URL = BASE_URL + "/all/indicators/AG.LND.TOTL.K2?date=2021"
POP_URL = BASE_URL + "/all/indicators/SP.POP.TOTL?date=2022"
MIL_EXP_URL = BASE_URL + "/all/indicators/MS.MIL.XPND.CD?date=2021"
MIL_PER_URL = BASE_URL + "/all/indicators/MS.MIL.TOTL.P1?date=2020"

# Define URLs and column renaming for each DataFrame
urls_dataframes = [
    {
        "url": BASE_URL,
        "rename_columns": {'capitalCity': 'capital_city'}
    },
    {
        "url": LA_URL,
        "rename_columns": {'countryiso3code': 'id', 'value': 'land_area'}
    },
    {
        "url": POP_URL,
        "rename_columns": {'countryiso3code': 'id', 'value': 'population'}
    },
    {
        "url": MIL_EXP_URL,
        "rename_columns": {'countryiso3code': 'id',
                           'value': 'military_expenditure'}
    },
    {
        "url": MIL_PER_URL,
        "rename_columns": {'countryiso3code': 'id',
                           'value': 'military_personnel'}
    }
]

# Create an instance of sync fetcher
print("Sync fetcher is running...")
fetcher_sync = WorldBankSyncDataFetcher(urls_dataframes)
fetcher_sync.fetch_all_data(n=10)

# Output:
# Sync fetcher is running...
# Mean time for 10 runs: 9.06 ± 0.21 seconds.

Async Approach:

import asyncio

import pandas as pd
from aiohttp import ClientSession

from utils import function_timer


class WorldBankAsyncDataFetcher:
    """
    This class fetches data from the World Bank API asynchronously.
    """
    def __init__(self, urls_dataframes: list[dict]) -> None:
        """
        Initialize the WorldBankAsyncDataFetcher class.
        :param urls_dataframes: A list of dictionaries containing the URLs and
        column renaming for each DataFrame
        """
        self.urls_dataframes = urls_dataframes
        self.data_frames = []

    @staticmethod
    async def fetch_data(session: ClientSession, url: str) -> list[dict]:
        """
        Fetch data from the World Bank API asynchronously.
        :param session: A client session from aiohttp
        :param url: The URL to fetch data from
        :return: A list of dictionaries containing the fetched data
        """
        data = []
        page = 1
        is_not_finished = True

        while is_not_finished:
            params = {
                "format": "json",
                "page": page
            }
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    result = await response.json()
                    data += result[1]
                    is_not_finished = result[0]['pages'] > result[0]['page']
                    page += 1
                else:
                    raise Exception(f"Request error: {response.status}")

        return data

    async def fetch_all(self) -> None:
        """
        Fetch data for all URLs concurrently within a single client session.
        :return: None
        """
        async with ClientSession() as session:
            tasks = [self.fetch_data(session, data['url'])
                     for data in self.urls_dataframes]
            results = await asyncio.gather(*tasks)
            self.data_frames = [pd.DataFrame(result) for result in results]

    def process_data_frames(self) -> None:
        """
        Process the data frames by renaming columns as specified in the data
        dictionary.
        :return: None
        """
        for idx, data in enumerate(self.urls_dataframes):
            df = self.data_frames[idx]
            # Rename columns as specified in the data dictionary
            df.rename(columns=data['rename_columns'], inplace=True)

    @function_timer
    def fetch_all_data(self) -> list:
        """
        Fetch all data from the World Bank API asynchronously.
        :return: A list of pandas DataFrames containing the fetched data
        """
        asyncio.run(self.fetch_all())
        self.process_data_frames()
        return self.data_frames

# Define URLs for the World Bank API
BASE_URL = "https://api.worldbank.org/v2/country"
LA_URL = BASE_URL + "/all/indicators/AG.LND.TOTL.K2?date=2021"
POP_URL = BASE_URL + "/all/indicators/SP.POP.TOTL?date=2022"
MIL_EXP_URL = BASE_URL + "/all/indicators/MS.MIL.XPND.CD?date=2021"
MIL_PER_URL = BASE_URL + "/all/indicators/MS.MIL.TOTL.P1?date=2020"

# Define URLs and column renaming for each DataFrame
urls_dataframes = [
    {
        "url": BASE_URL,
        "rename_columns": {'capitalCity': 'capital_city'}
    },
    {
        "url": LA_URL,
        "rename_columns": {'countryiso3code': 'id', 'value': 'land_area'}
    },
    {
        "url": POP_URL,
        "rename_columns": {'countryiso3code': 'id', 'value': 'population'}
    },
    {
        "url": MIL_EXP_URL,
        "rename_columns": {'countryiso3code': 'id',
                           'value': 'military_expenditure'}
    },
    {
        "url": MIL_PER_URL,
        "rename_columns": {'countryiso3code': 'id',
                           'value': 'military_personnel'}
    }
]

# Create an instance of async fetcher
print("Async fetcher is running...")
fetcher_async = WorldBankAsyncDataFetcher(urls_dataframes)
fetcher_async.fetch_all_data(n=10)

# Output:
# Async fetcher is running...
# Mean time for 10 runs: 1.43 ± 0.11 seconds.

In conclusion, as the code above shows, we ran each approach 10 times to obtain more reliable results and observed the following mean times:

  • Async: 1.43 ± 0.11 seconds.
  • Sync : 9.06 ± 0.21 seconds.

It is clear that the async approach is about six times faster than the traditional one in our request-heavy case, and this makes a very big difference in large-scale projects.
