Chapter 23: Asynchronous API Calls

From Sequential to Concurrent: 10x Performance for Multi-API Applications

1. The Async Vision

Sequential's slow. Concurrent's fast.

Through 22 chapters, every API call you've made has been synchronous. Call requests.get(), wait for the response, move to the next line. This pull-and-wait pattern works perfectly for one or two requests. But modern applications aggregate data from dozens of endpoints. When you fetch from NewsAPI, The Guardian, Reddit, Hacker News, GitHub, and five other sources, synchronous code becomes painfully slow.

The problem isn't your code or your connection. The problem is idle time. Network requests spend 90% of their lifecycle waiting for servers to respond. Your CPU sits there doing nothing while packets travel across the internet. Synchronous code makes you wait for each request sequentially. Ten requests at 500ms each: 5 seconds wasted.

Asynchronous programming eliminates the wait. Fire off all ten requests simultaneously. While one waits for a response, Python switches to another. All requests complete in ~500ms—the time of the slowest one, not the sum of all ten. The performance difference is dramatic: scale that to a hundred requests, and 50 seconds of sequential waiting collapses to under a second with async.

This chapter teaches you to write async API code using Python's asyncio and httpx. You'll build an Async News Aggregator that fetches from multiple sources concurrently, demonstrating the 10x performance improvements that separate professional applications from tutorial code.

What You'll Build: Async News Aggregator

The Async News Aggregator fetches headlines from multiple news APIs concurrently and aggregates them into a unified feed. Four core features demonstrate production async patterns:

1. Concurrent Multi-Source Fetching

Fetch from NewsAPI, The Guardian, and Hacker News simultaneously using asyncio.gather(). Instead of waiting 1.5 seconds for three sequential requests, all three complete in ~500ms. The system demonstrates how async transforms I/O-bound applications: more sources don't mean proportionally longer wait times.

2. Graceful Degradation on Failures

Handle partial failures intelligently. When The Guardian API times out, still show headlines from NewsAPI and Hacker News. Each source gets error handling that catches exceptions, logs failures, and returns empty results rather than crashing the entire aggregation. Users see what succeeded, not error pages.

3. Rate Limiting with Concurrency Control

Respect API quotas while maximizing throughput. Use semaphores to limit concurrent requests (e.g., max 5 at a time) and track request timestamps to enforce rate limits (e.g., 100 requests per hour). The system demonstrates that async doesn't mean unlimited parallelism—production code balances speed with API provider requirements.

4. Performance Measurement and Comparison

Measure actual speedups with timing instrumentation. Compare the synchronous baseline (1.5s for 3 sources) against the async implementation (~0.5s for the same sources). Log metrics showing request durations, concurrency levels, and throughput. This proves async's value with data, not assumptions.
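The rate-limiting idea in feature 3 can be sketched with asyncio.Semaphore. This is a minimal illustration with a simulated fetch; fake_fetch and the source names are hypothetical stand-ins for real API calls:

```python
import asyncio
import time

MAX_CONCURRENT = 5  # cap on simultaneous requests

async def fake_fetch(sem: asyncio.Semaphore, source: str) -> str:
    # Hypothetical stand-in for a real HTTP call: the semaphore makes
    # the task wait here whenever 5 fetches are already in flight.
    async with sem:
        await asyncio.sleep(0.2)  # simulated network latency
        return f"headlines from {source}"

async def main() -> list:
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    sources = [f"source-{i}" for i in range(12)]
    start = time.time()
    results = await asyncio.gather(*(fake_fetch(sem, s) for s in sources))
    # 12 tasks at 0.2s each, at most 5 at a time: roughly three "waves"
    print(f"{len(results)} sources in {time.time() - start:.2f}s")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```

Twelve simulated sources finish in roughly three waves of five, five, and two, rather than twelve sequential waits—concurrency capped, but still far faster than sequential.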

Why This Matters for Your Portfolio

Async skills separate junior from senior backend developers. Junior devs make sequential API calls and wonder why dashboards load slowly. Senior devs use async to aggregate data from 20 sources in under 2 seconds. This project demonstrates you understand concurrency, I/O multiplexing, and performance optimization.

More importantly, async is everywhere in modern Python. FastAPI (the fastest-growing web framework) is async-first. Background job processors use async for throughput. Real-time applications require async I/O. Stripe's payment processor, Twilio's messaging APIs, and cloud platforms all benefit from async client code. Master async once, apply it everywhere.

Learning Objectives

By the end of this chapter, you'll be able to:

  • Explain when async provides value versus when it adds unnecessary complexity.
  • Convert synchronous API code to async using httpx.
  • Execute multiple API calls concurrently with asyncio.gather().
  • Handle errors and timeouts in async code gracefully.
  • Implement rate limiting that respects API quotas while maximizing concurrency.
  • Test async code with pytest-asyncio.
  • Measure and compare sync versus async performance with timing instrumentation.

2. Chapter Roadmap

This chapter builds your async skills through seven progressive stages: immediate proof-of-concept, performance analysis, conversion techniques, concurrency patterns, error handling, testing, and complete project implementation. Here's the journey:

1. Your First Async Function in 5 Minutes (Section 3 • Quick Win)

Write a minimal async function, run three API calls concurrently, and see 3x speedup immediately. This quick win proves async works before diving into theory.

Key concepts: async/await, asyncio.gather(), instant speedup
2. Why Async Matters (Section 4 • Performance Analysis)

Understand the blocking problem, measure sync vs async performance, and learn when async provides value versus unnecessary complexity.

Key concepts: performance math, decision framework, I/O vs CPU
3. Converting Synchronous Code to Async (Section 5 • Conversion Techniques)

Transform sync functions to async step-by-step. Replace requests with httpx, add async/await keywords, and understand the event loop.

Key concepts: httpx.AsyncClient, async def, the event loop
4. Concurrent Request Patterns (Section 6 • Concurrency Control)

Use gather(), semaphores for limiting concurrency, and rate limiting patterns. Balance speed with API quotas and server capacity.

Key concepts: asyncio.gather(), semaphores, rate limiting
5. Error Handling in Async Code (Section 7 • Fault Tolerance)

Handle timeouts, network errors, and partial failures gracefully. Implement try/except within async functions and use return_exceptions for resilience.

Key concepts: timeout handling, graceful degradation, exception patterns
6. Testing Async Code (Section 8 • Automated Testing)

Write tests for async functions using pytest-asyncio. Mock async HTTP calls, verify concurrent execution, and test error scenarios.

Key concepts: pytest-asyncio, async mocking, test patterns
7. Async News Aggregator Project (Section 9 • Complete Implementation)

Build the complete async news aggregator with multi-source fetching, rate limiting, error handling, and performance measurement.

Key concepts: production async, multi-source fetching, performance metrics

Key strategy: You'll build incrementally. Section 3 proves async works with a 5-minute example. Section 4 explains why and when async matters. Section 5 teaches conversion techniques. Section 6 adds concurrency patterns. Section 7 hardens error handling. Section 8 adds testing. Section 9 builds the complete project. Section 10 synthesizes everything. Each stage builds on the previous, teaching you how async applications evolve from proof-of-concept to production-ready systems.

3. Your First Async Function in 5 Minutes

Before theory, proof. You'll write a minimal async function, run three API calls concurrently, and see 3x speedup in your terminal. Five minutes from now, you'll understand why async matters.

The Minimal Async Example

First, install httpx (async HTTP client):

Shell
pip install httpx

Create first_async.py:

Your First Async Function
Python
import asyncio
import time
import httpx

async def fetch_url(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10)
        return len(response.text)

async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://jsonplaceholder.typicode.com/posts",
    ]
    
    start = time.time()
    
    # Run all three requests concurrently
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    
    elapsed = time.time() - start
    
    print(f"Fetched {len(results)} URLs concurrently")
    print(f"Response sizes: {results}")
    print(f"Total time: {elapsed:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Run it:

Shell
python first_async.py

You'll see output similar to:

Output
Fetched 3 URLs concurrently
Response sizes: [4523, 3891, 15024]
Total time: 1.23 seconds

Three requests completed in ~1.2 seconds. The middle URL (httpbin.org/delay/1) deliberately delays 1 second. If these ran sequentially, total time would be ~2+ seconds. Async cut runtime nearly in half.

What Just Happened

Let's break down the async pattern:

  • async def: Declares fetch_url() and main() as async functions.
  • await: Pauses execution until client.get() completes. While waiting, Python switches to other tasks.
  • asyncio.gather(): Runs all three fetch_url() calls concurrently and collects results.
  • asyncio.run(): Starts the event loop and runs main().

The Event Loop Magic

When fetch_url() hits await client.get(), it doesn't block. Instead, it yields control to asyncio's event loop. The loop switches to the next fetch_url() call and starts that request. While both wait for network responses, the loop can handle other work or simply idle efficiently.

Once a response arrives, the loop resumes that particular fetch_url() execution right after the await. All three functions progress independently. Total time equals the slowest request, not the sum of all requests.

Compare to Synchronous Code

Here's the same task with synchronous requests:

Synchronous Version (Slower)
Python
import time
import requests

def fetch_url_sync(url):
    response = requests.get(url, timeout=10)
    return len(response.text)

urls = [
    "https://api.github.com",
    "https://httpbin.org/delay/1",
    "https://jsonplaceholder.typicode.com/posts",
]

start = time.time()

# Each request blocks until complete
results = [fetch_url_sync(url) for url in urls]

elapsed = time.time() - start

print(f"Fetched {len(results)} URLs sequentially")
print(f"Response sizes: {results}")
print(f"Total time: {elapsed:.2f} seconds")  # ~2.5 seconds

Synchronous version: ~2.5 seconds. Async version: ~1.2 seconds. Speedup: 2x with just three URLs. Add 10 URLs? Sync takes 10+ seconds. Async still completes in ~1-2 seconds.

When to Use Async

Use async when you have multiple I/O-bound operations that can run independently. Good candidates: fetching from many APIs, querying multiple databases, processing files concurrently, web scraping dozens of pages.

Skip async for: single API calls, CPU-heavy work (image processing, machine learning), simple scripts where performance is already acceptable. Async adds complexity—only use it when the performance gain justifies the cost.

What's Missing

This minimal example proves async works, but it's not production-ready:

  • No error handling (one failed request crashes everything)
  • No rate limiting (might overwhelm APIs or hit quotas)
  • No timeout handling per request (slow APIs block others)
  • No concurrency limiting (unlimited parallel requests can cause issues)

The next sections fix all four problems. You'll add error handling, implement rate limiting, control concurrency with semaphores, and build production-ready async patterns. But first, let's understand why async is so much faster.
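As a preview of the error-handling fix, here is a minimal sketch of graceful degradation using simulated sources. flaky_fetch and the source names are hypothetical; real code would wrap httpx calls the same way:

```python
import asyncio

async def flaky_fetch(source: str) -> str:
    # Hypothetical stand-in for an API call; "guardian" simulates a timeout.
    await asyncio.sleep(0.1)
    if source == "guardian":
        raise TimeoutError(f"{source} timed out")
    return f"headlines from {source}"

async def fetch_safe(source: str):
    try:
        return await flaky_fetch(source)
    except Exception:
        return None  # in real code, log the failure; keep the aggregation alive

async def aggregate() -> list:
    sources = ["newsapi", "guardian", "hackernews"]
    return await asyncio.gather(*(fetch_safe(s) for s in sources))

print(asyncio.run(aggregate()))
# ['headlines from newsapi', None, 'headlines from hackernews']
```

One failed source becomes a None in the results instead of an exception that kills all three fetches.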

4. Why Async Matters: The Performance Problem

So far in this book, all your API calls have been synchronous. When you call requests.get(), your program stops and waits for the response before moving to the next line. This is simple to understand and works well when you make one or two API calls. But when you need to fetch data from dozens of endpoints, synchronous code becomes painfully slow.

The problem is not your code or your internet connection. The problem is that network requests spend most of their time waiting. When you send an HTTP request, the actual data transfer might take 50 milliseconds, but the round trip to the server and back can take 500 milliseconds or more. During that wait, your CPU sits idle doing nothing useful.

Asynchronous programming fixes this by letting your code do other work while waiting for network responses. Instead of making one request, waiting, then making the next request, you can fire off dozens of requests at once and handle the responses as they arrive. The performance difference is dramatic: what takes 50 seconds synchronously can take less than a second with async code.

In this chapter, you will learn how to convert synchronous API code to asynchronous code using Python's asyncio library and the httpx HTTP client. You will build an Async News Aggregator that fetches from multiple news APIs concurrently, demonstrating real-world performance improvements that separate professional applications from tutorial code.

Real Numbers: Sync vs Async

To see the problem clearly, imagine you are building a news aggregator that fetches headlines from three different APIs: NewsAPI, The Guardian, and Hacker News. Each API takes about 500 milliseconds to respond. Here is what happens with synchronous code:

Synchronous News Fetching (Slow)
Python
import time
import requests

def fetch_newsapi():
    response = requests.get("https://newsapi.org/v2/top-headlines?country=us&apiKey=...")
    return response.json()

def fetch_guardian():
    response = requests.get("https://content.guardianapis.com/search?api-key=...")
    return response.json()

def fetch_hackernews():
    response = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json")
    return response.json()

start = time.time()

# Each call blocks until it completes
newsapi_data = fetch_newsapi()      # Wait ~500ms
guardian_data = fetch_guardian()    # Wait ~500ms
hackernews_data = fetch_hackernews()  # Wait ~500ms

elapsed = time.time() - start
print(f"Fetched all sources in {elapsed:.2f} seconds")  # ~1.5 seconds

This code takes about 1.5 seconds because it waits for each request to complete before starting the next one. The total time is the sum of all individual request times. If you add ten more APIs, you are waiting ten times longer.

Now imagine the same task with async code:

Asynchronous News Fetching (Fast)
Python
import asyncio
import time
import httpx

async def fetch_newsapi():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://newsapi.org/v2/top-headlines?country=us&apiKey=...")
        return response.json()

async def fetch_guardian():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://content.guardianapis.com/search?api-key=...")
        return response.json()

async def fetch_hackernews():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://hacker-news.firebaseio.com/v0/topstories.json")
        return response.json()

async def main():
    start = time.time()
    
    # All calls start at the same time and run concurrently
    results = await asyncio.gather(
        fetch_newsapi(),
        fetch_guardian(),
        fetch_hackernews(),
    )
    
    elapsed = time.time() - start
    print(f"Fetched all sources in {elapsed:.2f} seconds")  # ~0.5 seconds
    return results

if __name__ == "__main__":
    asyncio.run(main())

This async version takes about 0.5 seconds—the time of the slowest request, not the sum of all requests. All three API calls happen at the same time. Python switches between them while they are waiting for network responses, which is where almost all the time is spent.

[Figure] Synchronous execution runs three 500ms tasks (A, B, C) one after another for 1.5s total; asynchronous execution starts all three at once and completes within ~0.5s.

The performance improvement grows with the number of requests. If you fetch from 20 APIs, synchronous code might take 10 seconds while async code still finishes in under a second. For applications that aggregate data from many sources, async is not optional—it is the difference between a sluggish user experience and a fast, responsive one.

💡 Performance Formula

You can predict async speedup with simple math:

  • Sync time: n × average_request_time
  • Async time: max(request_times)

For 10 requests at ~500ms each:

  • Sync: 10 × 0.5s = 5 seconds
  • Async: max(0.5s) = 0.5 seconds
  • Speedup: ~10x
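The formula translates directly into code. predicted_times is an illustrative helper, not part of any library:

```python
def predicted_times(request_times):
    """Predict sync vs async duration for independent I/O-bound requests."""
    sync_time = sum(request_times)   # requests run one after another
    async_time = max(request_times)  # requests overlap; the slowest dominates
    return sync_time, async_time

sync_t, async_t = predicted_times([0.5] * 10)
print(f"sync ~{sync_t:.1f}s, async ~{async_t:.1f}s, speedup ~{sync_t / async_t:.0f}x")
# sync ~5.0s, async ~0.5s, speedup ~10x
```

Note the model assumes the requests are independent and I/O-bound; overhead and rate limits make real speedups somewhat smaller.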

When Async Provides Value vs Added Complexity

Async is powerful, but it is not always the right choice. Adding async to your code makes it more complex: you have to deal with event loops, manage async context managers, and reason about concurrency. If you are only making one or two API calls, the performance gain is negligible and the added complexity is not worth it.

Here is a quick checklist to help you decide:

Good Candidates for Async
  • Many simultaneous HTTP calls to external APIs (10+ endpoints)
  • High-latency I/O operations like web requests and database queries
  • Background workers that fan out to many services concurrently
  • Web applications serving many users with I/O-bound request handlers

Bad Candidates for Async
  • CPU-heavy data processing (use multiprocessing or C extensions instead)
  • Simple scripts that make one or two quick API calls
  • Code where performance is already acceptable and complexity would hurt maintainability
  • Teams unfamiliar with async patterns where sync code is easier to understand

Here are good rules of thumb for when async makes sense:

1. Many Independent Requests

If you need to fetch from ten or more endpoints and the requests do not depend on each other, async provides massive speed improvements. Examples include aggregating news from multiple sources, checking status across many services, or bulk fetching user data.

2. High Latency, Low CPU Work

Async shines when your code spends most of its time waiting for network or disk I/O and little time doing CPU-intensive work. If you are processing large images or running machine learning models, async does not help much because the bottleneck is computation, not waiting.

3. Background Tasks in Web Apps

If you are building a web application that makes API calls while serving user requests, async lets you handle more users concurrently. Frameworks like FastAPI use async by default because web servers spend most of their time waiting for database queries and external API calls.

4. When to Avoid Async

Skip async if you are making only one or two API calls, if your code is simple and performance is already fine, or if your team is unfamiliar with async patterns. Synchronous code is easier to write, test, and debug. Only add async when you have measured a real performance problem and confirmed that async will solve it.

In this chapter you will learn how to convert synchronous code to async, but you will also learn how to measure whether async is worth the trade-off. Professional developers do not use async everywhere—they use it strategically where it provides clear value.

Understanding asyncio and the Event Loop

Python's asyncio library manages asynchronous code using something called an event loop. The event loop is a scheduler that runs multiple tasks concurrently by switching between them whenever they are waiting for something.

Think of it like a waiter at a busy restaurant. Instead of taking one table's order, walking to the kitchen, waiting for the food, bringing it back, and only then moving to the next table, a good waiter takes orders from multiple tables, submits them all to the kitchen at once, and delivers food as it becomes ready. The waiter switches between tables efficiently instead of blocking on any single one.

[Figure] The asyncio event loop sits at the center as a circular scheduler for Tasks A, B, and C: it runs a task until the task hits await, receives the yielded control, handles I/O and timers, then resumes whichever task is ready next.

In async Python, the event loop is the waiter and your tasks are the tables. When you use await, you are telling Python, "This will take a while, so work on something else and come back when this is ready." The event loop switches to another task, and returns to your code when the awaited operation completes.

You do not usually interact with the event loop directly. You write async def functions, use await for I/O operations, and call asyncio.run() to start the event loop. The library handles the rest.

The key insight is this: async is cooperative multitasking. Your code must explicitly yield control with await for the event loop to switch tasks. If you forget to use await on a slow operation, you will block the entire event loop and lose all the concurrency benefits.
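You can see the difference cooperative yielding makes with a small experiment, a sketch comparing asyncio.sleep() against the blocking time.sleep() (the function names here are illustrative):

```python
import asyncio
import time

async def polite_wait() -> None:
    await asyncio.sleep(0.2)  # yields: other tasks run during the wait

async def rude_wait() -> None:
    time.sleep(0.2)  # blocks the loop: no other task can run meanwhile

async def timed(make_task, n: int = 5) -> float:
    # Run n copies of a coroutine "concurrently" and report elapsed time.
    start = time.time()
    await asyncio.gather(*(make_task() for _ in range(n)))
    return time.time() - start

async def main() -> None:
    print(f"asyncio.sleep x5: {await timed(polite_wait):.2f}s")  # ~0.2s
    print(f"time.sleep   x5: {await timed(rude_wait):.2f}s")     # ~1.0s

asyncio.run(main())
```

Five polite waits overlap and finish together; five rude waits serialize because each one freezes the event loop for its full duration.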

Async/Await Syntax Fundamentals

Python's async syntax is built around two keywords: async and await. Here is what they mean:

  • async def: Defines a coroutine function. When you call an async function, it returns a coroutine object that represents work to be done, but it does not run yet.
  • await: Pauses the coroutine and waits for another coroutine to complete. While waiting, the event loop can run other tasks. You can only use await inside an async def function.

Here is a minimal example that shows the basic pattern:

Async/Await Basics
Python
import asyncio

async def fetch_data():
    print("Starting fetch...")
    await asyncio.sleep(1)  # Simulate a slow network call
    print("Fetch complete!")
    return {"data": "example"}

async def main():
    result = await fetch_data()
    print("Got result:", result)

if __name__ == "__main__":
    asyncio.run(main())

In this code, fetch_data() is a coroutine. When you call it, you get a coroutine object. The await keyword actually runs the coroutine and waits for it to finish. asyncio.run(main()) starts the event loop and runs the main() coroutine.

The rules are simple but strict:

  • You cannot use await in a regular function. Only in async def functions.
  • If you call an async function without await, you get a coroutine object but nothing runs.
  • The top level entry point (usually main()) is started with asyncio.run().

🚨 Most Common Async Bug: Missing await

This is the error that catches everyone:

Python
# ❌ WRONG: This looks like it works but does NOTHING
result = fetch_data()  # Returns coroutine object, doesn't run!

# ✅ CORRECT: Actually executes the function
result = await fetch_data()  # Runs and returns the data

If you forget await, Python returns a coroutine object instead of executing the function. Your code appears to run without errors, but the async operation never happens. This is a silent bug that can be very hard to debug.

In the next section, you will learn how to convert real synchronous API code into async code using these patterns.

5. Converting Synchronous Code to Async

From requests to httpx: The Async HTTP Client

The requests library that you have been using throughout this book is synchronous. It blocks until each HTTP request completes. To use async, you need an async HTTP client. The most popular option is httpx, which has an API that is nearly identical to requests but supports both sync and async.

Install httpx with:

Shell
pip install httpx

Here is a side by side comparison of synchronous requests and async httpx:

Synchronous with requests
Python
import requests

def fetch_weather(city):
    response = requests.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
    )
    response.raise_for_status()
    return response.json()

weather = fetch_weather("London")
print(weather)
Asynchronous with httpx
Python
import asyncio
import httpx

async def fetch_weather(city):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
        )
        response.raise_for_status()
        return response.json()

async def main():
    weather = await fetch_weather("London")
    print(weather)

if __name__ == "__main__":
    asyncio.run(main())

The changes are small but important:

  • Change def to async def.
  • Use httpx.AsyncClient() instead of requests.
  • Add await before the client.get() call.
  • Use async with to manage the client as an async context manager.
  • Wrap the code in a main() coroutine and run it with asyncio.run().

💡 Why async with?

Regular with blocks cannot await during cleanup. When closing an HTTP connection, you need to wait for pending data to flush—that requires await. The async with statement lets Python run async code in both __aenter__ and __aexit__ methods.

Without async with, you would have to manually manage connection lifecycle with explicit await client.aclose() calls. The context manager handles this automatically, even if an exception occurs.

This async version does not look much different, and if you only make one request, it will not be faster. The power comes when you make many concurrent requests, which you will learn in the next section.

Converting Functions: def → async def

The first step in converting synchronous code to async is changing function definitions. Any function that calls an async operation must itself be async. This propagates upward through your call stack: if A calls B and B is async, then A must also be async.

Here is a typical synchronous data pipeline:

Synchronous Pipeline
Python
import json
import requests

def fetch_api_data(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

def process_data(data):
    # Some CPU work: filtering, transforming
    return [item for item in data if item["active"]]

def save_to_file(data, filename):
    with open(filename, "w") as f:
        json.dump(data, f)

def run_pipeline(url, filename):
    raw_data = fetch_api_data(url)
    processed = process_data(raw_data)
    save_to_file(processed, filename)
    return len(processed)

To convert this to async, you make the I/O functions async and add await where they are called:

Async Pipeline
Python
import asyncio
import httpx
import aiofiles  # async file I/O (third-party: pip install aiofiles)
import json

async def fetch_api_data(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

def process_data(data):
    # CPU work stays synchronous (no I/O to await)
    return [item for item in data if item["active"]]

async def save_to_file(data, filename):
    async with aiofiles.open(filename, "w") as f:
        await f.write(json.dumps(data))

async def run_pipeline(url, filename):
    raw_data = await fetch_api_data(url)
    processed = process_data(raw_data)
    await save_to_file(processed, filename)
    return len(processed)

Notice that process_data() remains a regular function because it does not do any I/O. Only functions that perform async operations need to be async. Pure computation stays synchronous.

Managing Async Context Managers and Sessions

When you use httpx.AsyncClient(), you should manage it with an async context manager (async with). This ensures that connections are properly opened and closed, even if an error occurs.

If you are making many requests, it is more efficient to reuse a single client session rather than creating a new one for each request:

Reusing an Async HTTP Client
Python
import asyncio
import httpx

async def fetch_url(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    urls = [
        "https://api.example.com/endpoint1",
        "https://api.example.com/endpoint2",
        "https://api.example.com/endpoint3",
    ]
    
    # Create one client and reuse it for all requests
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_url(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

By creating the client once and passing it to each function, you avoid the overhead of creating and tearing down connections repeatedly. This pattern is especially important when making hundreds or thousands of requests.

Common Async Pitfalls and How to Avoid Them

Async code is powerful but introduces new failure modes. Here are the most common mistakes and how to avoid them:

1. Forgetting to Use await

If you call an async function without await, you get a coroutine object but the function does not run. This is a silent bug that can be hard to debug. Always use await when calling async functions.

2. Blocking the Event Loop

If you call a slow synchronous function (like time.sleep() or a CPU-heavy computation) inside an async function without yielding control, you block the entire event loop. Use asyncio.sleep() instead of time.sleep(), and run CPU-heavy work in a thread or process pool.
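For the thread-pool escape hatch, asyncio.to_thread() (Python 3.9+) moves a blocking call off the event loop. A minimal sketch, where blocking_work is a hypothetical stand-in:

```python
import asyncio
import time

def blocking_work() -> str:
    time.sleep(0.3)  # stand-in for legacy synchronous or blocking code
    return "done"

async def main() -> list:
    start = time.time()
    # Each call runs in a worker thread, so the two waits overlap
    # instead of freezing the event loop one after the other.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_work),
        asyncio.to_thread(blocking_work),
    )
    print(f"two blocking calls in {time.time() - start:.2f}s")  # ~0.3s, not 0.6s
    return results

asyncio.run(main())
```

For truly CPU-bound work, threads still contend on the GIL; a ProcessPoolExecutor via loop.run_in_executor() is the heavier-duty option.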

3. Not Handling Exceptions in gather()

By default, asyncio.gather() raises the first exception a task produces, and you lose access to the other results (the remaining tasks keep running in the background). Use return_exceptions=True if you want to collect both successful results and exceptions without aborting.
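Here is a minimal demonstration of return_exceptions=True; ok and boom are hypothetical stand-ins for API calls:

```python
import asyncio

async def ok(n: int) -> int:
    await asyncio.sleep(0.05)
    return n

async def boom() -> int:
    await asyncio.sleep(0.05)
    raise ValueError("source failed")

async def main() -> list:
    # Failures come back as exception objects in the results list,
    # in order, instead of aborting the whole gather().
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
for r in results:
    print(f"failed: {r}" if isinstance(r, Exception) else f"got: {r}")
```

Check each result with isinstance(r, Exception) to separate successes from failures.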

4. Mixing Sync and Async Code Incorrectly

You cannot call an async function from a synchronous function without starting an event loop. If you need to mix sync and async, use asyncio.run() to call async code from sync contexts, or redesign your code to be async all the way through.
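The bridge from sync into async looks like this (sync_wrapper and fetch_count are hypothetical names for illustration):

```python
import asyncio

async def fetch_count() -> int:
    await asyncio.sleep(0.05)  # stand-in for an async API call
    return 42

def sync_wrapper() -> int:
    # asyncio.run() starts an event loop, runs the coroutine to
    # completion, closes the loop, and returns the result.
    return asyncio.run(fetch_count())

print(sync_wrapper())  # ordinary synchronous code gets the value back
```

One caution: asyncio.run() raises a RuntimeError if an event loop is already running, so call it only at true synchronous entry points, never from inside a coroutine.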

The best way to avoid these pitfalls is to keep your async code simple and well tested. Start with a small async function, verify it works, then build outward. Use linters like pylint or type checkers like mypy to catch missing await keywords.

Checkpoint Quiz

Before you move on to concurrent request patterns, check that you understand the basics of converting sync code to async:

Q: What is the key difference between requests and httpx for async code?

A: requests is synchronous and blocks until each HTTP request completes. httpx supports both synchronous and asynchronous modes. When used with httpx.AsyncClient() and await, it allows the event loop to switch to other tasks while waiting for network responses, enabling concurrent requests.

Q: Why do you need to use async with when creating an httpx.AsyncClient()?

A: The async with statement ensures that the HTTP client properly opens and closes connections, even if an error occurs during execution. It manages the client's lifecycle, including connection pooling and cleanup, which is important for avoiding resource leaks in async code.

Q: What happens if you call an async function without await?

A: If you call an async function without await, you get a coroutine object but the function does not actually execute. This is a common silent bug. The code appears to run without errors, but the async operation never happens. Always use await when calling async functions.

6. Concurrent Request Patterns

Gathering Results: asyncio.gather() for Parallel Execution

The real power of async comes from running multiple operations concurrently. The most common pattern is asyncio.gather(), which takes multiple coroutines and runs them all at the same time, then returns their results in the same order you passed them in.

Concurrent API Calls with gather()
Python
import asyncio
import httpx

async def fetch_weather(client, city):
    response = await client.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
    )
    response.raise_for_status()
    return response.json()

async def main():
    cities = ["London", "Paris", "Tokyo", "New York", "Sydney"]
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        # Create a list of coroutines
        tasks = [fetch_weather(client, city) for city in cities]
        
        # The * unpacks the list: gather(*[a, b, c]) becomes gather(a, b, c)
        # Run all tasks concurrently and wait for all to complete
        results = await asyncio.gather(*tasks)
    
    # results is a list in the same order as cities
    for city, weather in zip(cities, results):
        temp = weather["main"]["temp"]
        print(f"{city}: {temp}K")

if __name__ == "__main__":
    asyncio.run(main())

This code fetches weather for five cities concurrently. All five requests start at roughly the same time, and gather() waits until all have completed. The total time is about as long as the slowest request, not the sum of all requests.

The *tasks syntax unpacks the list so that each coroutine is passed as a separate argument to gather(). You could also write it as asyncio.gather(tasks[0], tasks[1], ...), but unpacking is cleaner when you have a dynamic list.

Task Groups for Better Error Handling

Python 3.11 introduced task groups, which provide better control over concurrent tasks and cleaner error handling. Task groups automatically cancel remaining tasks if one fails, which is often what you want.

Using Task Groups (Python 3.11+)
Python
import asyncio
import httpx

async def fetch_weather(client, city):
    response = await client.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
    )
    response.raise_for_status()
    return response.json()

async def main():
    cities = ["London", "Paris", "Tokyo"]
    results = []
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        async with asyncio.TaskGroup() as group:
            tasks = [
                group.create_task(fetch_weather(client, city))
                for city in cities
            ]
        
        # If we reach here, all tasks completed successfully
        results = [task.result() for task in tasks]
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

If you are using Python 3.10 or earlier, stick with asyncio.gather(). Task groups are a nicer API but not essential.

Limiting Concurrency with Semaphores

Running hundreds of concurrent requests is fast, but it can overwhelm the server or trigger rate limits. A semaphore lets you limit how many tasks run at the same time. Think of it as a bouncer at a club: only N people can be inside at once, and new people must wait until someone leaves.

Limiting Concurrent Requests
Python
import asyncio
import httpx

async def fetch_with_semaphore(client, url, semaphore):
    async with semaphore:
        # Only this many requests run concurrently
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    
    # Allow at most 10 concurrent requests
    semaphore = asyncio.Semaphore(10)
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_with_semaphore(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

Even though you create 100 tasks, only 10 run at any given moment. As each request finishes, the semaphore releases a slot and the next waiting task starts. This keeps your code fast while respecting server limits.

Timeout and Cancellation Handling

In production, some requests will be slow or hang indefinitely. You should always set timeouts to prevent your application from waiting forever. Both httpx and asyncio support timeouts.

Setting Timeouts in Async Code
Python
import asyncio
import httpx

async def fetch_with_timeout(url):
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        print(f"Request to {url} timed out")
        return None
    except httpx.HTTPStatusError as exc:
        print(f"HTTP error {exc.response.status_code} for {url}")
        return None

async def main():
    urls = ["https://api.example.com/slow", "https://api.example.com/fast"]
    
    # Set a timeout for the entire gather operation
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*[fetch_with_timeout(url) for url in urls]),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        print("Overall operation timed out")
        results = []
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

This code has two layers of timeout protection: individual requests time out after 5 seconds, and the entire gather() operation times out after 10 seconds. This prevents one slow endpoint from blocking your entire application.

Rate Limiting in Async Applications

Many APIs have rate limits: for example, "100 requests per minute" or "10 requests per second." If you send too many requests too quickly, the API will reject them. A semaphore limits concurrency but does not spread requests over time. For rate limiting, you need to add delays.

Simple Rate Limiting Pattern
Python
import asyncio
import httpx
import time

class RateLimiter:
    def __init__(self, max_calls, period):
        """Allow max_calls requests per period (in seconds)."""
        self.max_calls = max_calls
        self.period = period
        self.calls = []
    
    async def acquire(self):
        now = time.monotonic()
        # Remove calls outside the current period
        self.calls = [call_time for call_time in self.calls if now - call_time < self.period]
        
        if len(self.calls) >= self.max_calls:
            # Wait until the oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            await asyncio.sleep(sleep_time)
            self.calls.pop(0)
        
        # Record the time this call actually proceeds, not the stale pre-sleep `now`
        self.calls.append(time.monotonic())

async def fetch_with_rate_limit(client, url, rate_limiter):
    await rate_limiter.acquire()
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(50)]
    
    # Allow 10 requests per second
    rate_limiter = RateLimiter(max_calls=10, period=1.0)
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_with_rate_limit(client, url, rate_limiter) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

This rate limiter spreads requests over time so that you never exceed the allowed rate. It is a simple implementation that works well for most cases. In real applications, you will combine this pattern with the provider's documented limits (for example, "60 requests per minute") and enforce those rules in your async client rather than hoping retries will save you. For production use, consider libraries like aiolimiter that handle edge cases more gracefully.

Progress Tracking for Multiple Concurrent Operations

When you run many concurrent tasks, it is useful to track progress. You can use asyncio.as_completed() to process results as they finish, rather than waiting for all tasks to complete.

Tracking Progress with as_completed()
Python
import asyncio
import httpx

async def fetch_data(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_data(client, url) for url in urls]
        
        # Process results as they complete
        results = []
        for i, coro in enumerate(asyncio.as_completed(tasks), start=1):
            result = await coro
            results.append(result)
            print(f"Completed {i}/{len(tasks)} requests")
    
    return results

if __name__ == "__main__":
    asyncio.run(main())

This pattern is useful for long-running batch operations where you want to show progress to the user or log intermediate results. Each task completes independently, and you handle them in the order they finish rather than the order you started them.

Checkpoint Quiz

Before moving to error handling, check your understanding of concurrent request patterns:

What is the difference between a semaphore and a rate limiter?

A semaphore limits how many tasks run at the same time (concurrency), but does not control when they start. A rate limiter spreads requests over time to stay under API quotas (for example, "10 requests per second"). You often use both together: a semaphore to avoid overwhelming your own system, and a rate limiter to respect the API provider's limits.

Why might you use asyncio.as_completed() instead of asyncio.gather()?

asyncio.gather() waits for all tasks to complete and returns results in the order you passed them in. asyncio.as_completed() yields results as soon as each task finishes, in completion order. Use as_completed() when you want to process results immediately or show progress, rather than waiting for everything to finish.

How do you handle a situation where some requests succeed and others fail?

Use asyncio.gather(*tasks, return_exceptions=True). This collects both successful results and exceptions in the returned list. You can then iterate through the results and handle exceptions individually instead of having one failure abort the entire operation.
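As the quiz answers note, a semaphore and a rate limiter are complementary, and you often use both. The sketch below combines them under stated simplifications: asyncio.sleep() stands in for real HTTP calls, the names fetch_all and fetch_one are illustrative, and the per_second spacing is a simplified start-rate limit rather than a full sliding window:

```python
import asyncio

async def fetch_all(n_requests, max_concurrent=5, per_second=20):
    # The semaphore caps how many requests are in flight at once;
    # the lock plus sleep spaces out request *starts* to respect the rate
    semaphore = asyncio.Semaphore(max_concurrent)
    start_lock = asyncio.Lock()
    interval = 1.0 / per_second

    async def fetch_one(i):
        async with start_lock:
            await asyncio.sleep(interval)   # at most one new start per interval
        async with semaphore:
            await asyncio.sleep(0.05)       # stand-in for the real HTTP call
            return i

    return await asyncio.gather(*[fetch_one(i) for i in range(n_requests)])

results = asyncio.run(fetch_all(10))
print(len(results))  # 10
```

The semaphore protects your own process from too much in-flight work, while the start spacing keeps you under the provider's quota even when all tasks are ready to go at once.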

7. Error Handling in Async Code

Try-Except Patterns in Async Functions

Error handling in async code works the same as in synchronous code: you use try-except blocks. The difference is that you must handle errors at the right level, so that one failed task does not cost you the results of all the others.

Basic Async Error Handling
Python
import asyncio
import httpx

async def fetch_with_error_handling(client, url):
    try:
        response = await client.get(url, timeout=5.0)
        response.raise_for_status()
        return {"success": True, "data": response.json()}
    except httpx.TimeoutException:
        return {"success": False, "error": "timeout"}
    except httpx.HTTPStatusError as exc:
        return {"success": False, "error": f"HTTP {exc.response.status_code}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}

async def main():
    urls = [
        "https://api.example.com/working",
        "https://api.example.com/slow",
        "https://api.example.com/broken",
    ]
    
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_error_handling(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    # All tasks complete, some may have failed
    for url, result in zip(urls, results):
        if result["success"]:
            print(f"{url}: Success")
        else:
            print(f"{url}: Failed ({result['error']})")

if __name__ == "__main__":
    asyncio.run(main())

By handling errors inside each async function, you prevent one failure from aborting the entire batch. Every task completes with either a success or error result, and you can inspect them all after gather() finishes.

Handling Partial Failures: Some Succeed, Some Fail

In production, you will often fetch from dozens of endpoints where some succeed and others fail. Your code should handle this gracefully: collect the successful results, log the failures, and continue working with whatever data you have.

Graceful Degradation with Partial Results
Python
import asyncio
import httpx

async def fetch_news_source(client, source_name, url):
    try:
        response = await client.get(url, timeout=5.0)
        response.raise_for_status()
        data = response.json()
        return {
            "source": source_name,
            "success": True,
            "articles": data.get("articles", []),
        }
    except Exception as exc:
        print(f"Failed to fetch {source_name}: {exc}")
        return {
            "source": source_name,
            "success": False,
            "articles": [],
        }

async def aggregate_news():
    sources = {
        "NewsAPI": "https://newsapi.org/v2/top-headlines?country=us&apiKey=...",
        "Guardian": "https://content.guardianapis.com/search?api-key=...",
        "HackerNews": "https://hacker-news.firebaseio.com/v0/topstories.json",
    }
    
    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_news_source(client, name, url)
            for name, url in sources.items()
        ]
        results = await asyncio.gather(*tasks)
    
    # Collect all successful articles
    all_articles = []
    failed_sources = []
    
    for result in results:
        if result["success"]:
            all_articles.extend(result["articles"])
        else:
            failed_sources.append(result["source"])
    
    print(f"Fetched {len(all_articles)} articles from {len(results) - len(failed_sources)} sources")
    if failed_sources:
        print(f"Failed sources: {', '.join(failed_sources)}")
    
    return all_articles

if __name__ == "__main__":
    asyncio.run(aggregate_news())

This pattern is called graceful degradation. If one news source is down, you still show articles from the other sources. The user gets a slightly degraded experience rather than a complete failure.

asyncio.gather(return_exceptions=True) Pattern

By default, asyncio.gather() raises the first exception as soon as any task fails; the other tasks keep running, but their results are discarded. If you want to collect both results and exceptions, use return_exceptions=True.

Collecting Both Results and Exceptions
Python
import asyncio
import httpx

async def fetch_data(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    urls = [
        "https://api.example.com/good",
        "https://api.example.com/broken",
        "https://api.example.com/also-good",
    ]
    
    async with httpx.AsyncClient(timeout=5.0) as client:
        tasks = [fetch_data(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # results contains both successful data and Exception objects
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url}: Failed with {type(result).__name__}: {result}")
        else:
            print(f"{url}: Success, got {len(result)} items")

if __name__ == "__main__":
    asyncio.run(main())

This is useful when you want to inspect each failure individually. You can log detailed error information, retry specific failures, or decide how to handle each case based on the exception type.

💡 When to Use Each Pattern

Handle errors inside functions when you want custom error responses:

Python
async def fetch_with_fallback(url):
    try:
        return await fetch(url)
    except Exception:
        return {"error": "fallback_data"}

Use return_exceptions=True when you want to inspect all failures:

Python
results = await gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
    if isinstance(result, HTTPError):
        log.error(f"Task {i} failed: {result}")

Choose based on whether you need structured responses (first pattern) or want to handle exceptions centrally (second pattern).

Retry Logic with Exponential Backoff (Async Version)

You learned retry patterns in Chapter 9. The async version is nearly identical, but uses asyncio.sleep() instead of time.sleep() so that you do not block the event loop.

Async Retry with Exponential Backoff
Python
import asyncio
import httpx
import random

async def fetch_with_retry(client, url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = await client.get(url, timeout=5.0)
            response.raise_for_status()
            return response.json()
        except (httpx.TimeoutException, httpx.HTTPStatusError) as exc:
            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt
            
            # Exponential backoff with jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1}/{max_retries} after {wait_time:.2f}s")
            await asyncio.sleep(wait_time)

async def main():
    async with httpx.AsyncClient() as client:
        data = await fetch_with_retry(client, "https://api.example.com/unreliable")
        print("Success:", data)

if __name__ == "__main__":
    asyncio.run(main())

The pattern is the same as synchronous retry logic, but using await asyncio.sleep() instead of blocking sleep keeps the event loop responsive. You can run many retry operations concurrently without blocking each other.

Checkpoint Quiz

Before moving to testing async code, check your understanding of error handling patterns:

Why should you handle errors inside each async function rather than at the gather() level?

If you let exceptions propagate to the gather() call without return_exceptions=True, the first failure makes gather() raise immediately and the results of the other tasks are lost. By handling errors inside each function, you let every task complete and collect partial results. This is critical for graceful degradation in production applications.

What is the difference between time.sleep() and asyncio.sleep()?

time.sleep() blocks the entire Python process, including the event loop. asyncio.sleep() yields control to the event loop, allowing other tasks to run while waiting. Never use time.sleep() in async code—it defeats the purpose of async by blocking everything.

How do you check if a result from gather(return_exceptions=True) is an exception?

Use isinstance(result, Exception) to check if a result is an exception. Successful results will be the actual data returned by the function, while failed tasks will be Exception objects. You can then handle each type appropriately.
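The asyncio.sleep() point above is easy to measure. In this timing sketch, three concurrent 0.2-second waits overlap and finish in roughly 0.2 seconds total, where three time.sleep(0.2) calls would run back to back for 0.6 seconds:

```python
import asyncio
import time

async def waiter():
    await asyncio.sleep(0.2)   # yields control to the event loop while "waiting"

async def run_three():
    start = time.perf_counter()
    # The three 0.2s sleeps overlap instead of running sequentially
    await asyncio.gather(waiter(), waiter(), waiter())
    return time.perf_counter() - start

elapsed = asyncio.run(run_three())
print(f"{elapsed:.2f}s")   # ~0.2s, not 0.6s
```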

8. Testing Async Code

pytest-asyncio for Async Test Functions

Testing async code requires some special setup. The pytest-asyncio plugin lets you write async test functions that pytest will run properly.

Install pytest-asyncio:

Shell
pip install pytest-asyncio
Basic Async Test
Python
import pytest
import httpx

async def fetch_weather(city):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=test"
        )
        response.raise_for_status()
        return response.json()

@pytest.mark.asyncio
async def test_fetch_weather():
    # This test makes a real API call (and will fail unless 'test'
    # is replaced with a valid API key)
    result = await fetch_weather("London")
    assert "main" in result
    assert "temp" in result["main"]

The @pytest.mark.asyncio decorator tells pytest that this is an async test function. Pytest will run it in an event loop and wait for it to complete.

Sync vs Async Tests: Key Differences
  • Sync test: Call the function directly with no special markers.
  • Async test: Mark with @pytest.mark.asyncio and use await when calling the coroutine.
  • Common errors: If pytest skips the test with a warning that "async def functions are not natively supported", you forgot the @pytest.mark.asyncio marker or have not installed pytest-asyncio. A RuntimeError: no running event loop usually means sync code is calling an async-only API outside the event loop.

Mocking Async API Calls with pytest-mock

For unit tests, you should mock API calls rather than making real network requests. The pytest-mock plugin works with async functions. Python 3.8+ includes AsyncMock in the standard library unittest.mock, making async mocking straightforward.

Mocking an Async HTTP Call
Python
import pytest
from unittest.mock import MagicMock
import httpx

async def fetch_weather(city):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.openweathermap.org/data/2.5/weather?q={city}"
        )
        response.raise_for_status()
        return response.json()

@pytest.mark.asyncio
async def test_fetch_weather_mocked(mocker):
    # httpx's response.json() and raise_for_status() are synchronous methods,
    # so the fake response is a plain MagicMock, not an AsyncMock
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "main": {"temp": 280.15},
        "weather": [{"description": "clear sky"}],
    }
    mock_response.raise_for_status.return_value = None
    
    # patch() detects that AsyncClient.get is an async method and replaces it
    # with an AsyncMock automatically, so `await client.get(...)` still works
    mocker.patch("httpx.AsyncClient.get", return_value=mock_response)
    
    result = await fetch_weather("London")
    
    assert result["main"]["temp"] == 280.15
    assert result["weather"][0]["description"] == "clear sky"

By mocking the HTTP call, your test runs instantly and does not depend on external services. This makes your test suite fast and reliable.

💡 AsyncMock Availability

AsyncMock is available in the standard library unittest.mock starting from Python 3.8. If you are using Python 3.8+, you can import it directly without additional dependencies. pytest-mock handles the wrapping automatically, but you can also use the standard library directly.

Testing Concurrent Operations and Race Conditions

When testing async code that runs multiple tasks concurrently, you should verify that all tasks complete successfully and that there are no race conditions.

Testing Concurrent Fetches
Python
import pytest
import asyncio
from unittest.mock import AsyncMock

async def fetch_multiple(urls):
    async def fetch_one(url):
        await asyncio.sleep(0.1)  # Simulate network delay
        return {"url": url, "data": "response"}
    
    results = await asyncio.gather(*[fetch_one(url) for url in urls])
    return results

@pytest.mark.asyncio
async def test_concurrent_fetches():
    urls = [f"https://api.example.com/{i}" for i in range(5)]
    
    results = await fetch_multiple(urls)
    
    # Verify all requests completed
    assert len(results) == 5
    
    # Verify results are in correct order
    for i, result in enumerate(results):
        assert result["url"] == urls[i]

This test verifies that concurrent operations complete successfully and return results in the expected order. For more complex tests, you might verify that a semaphore correctly limits concurrency or that rate limiting spreads requests over time.
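For instance, a semaphore's limit can be verified by tracking peak concurrency across dummy tasks. This is a sketch with an illustrative helper, run_limited, rather than a real test suite:

```python
import asyncio

async def run_limited(n_tasks, limit):
    """Run n_tasks dummy tasks under a semaphore, recording peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def task():
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)   # give other tasks a chance to interleave
            in_flight -= 1

    await asyncio.gather(*[task() for _ in range(n_tasks)])
    return peak

peak = asyncio.run(run_limited(n_tasks=20, limit=5))
assert peak <= 5
print(f"peak concurrency: {peak}")
```

Wrapped in an async function marked with @pytest.mark.asyncio, the same assertion becomes a regular unit test.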

Measuring Async Performance Improvements

To prove that async provides value, you should measure actual performance improvements. Compare the time taken for sync versus async implementations.

Benchmarking Sync vs Async
Python
import time
import asyncio
import pytest

def sync_fetch(url):
    time.sleep(0.5)  # Simulate network delay
    return f"result from {url}"

async def async_fetch(url):
    await asyncio.sleep(0.5)  # Simulate network delay
    return f"result from {url}"

def test_sync_performance():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    
    start = time.time()
    results = [sync_fetch(url) for url in urls]
    elapsed = time.time() - start
    
    assert len(results) == 10
    print(f"Sync: {elapsed:.2f} seconds")  # ~5 seconds

@pytest.mark.asyncio
async def test_async_performance():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    
    start = time.time()
    results = await asyncio.gather(*[async_fetch(url) for url in urls])
    elapsed = time.time() - start
    
    assert len(results) == 10
    print(f"Async: {elapsed:.2f} seconds")  # ~0.5 seconds

Run these tests with pytest -s to see the printed times. The async version should be roughly 10x faster because all requests run concurrently instead of sequentially.

9. Project: Async News Aggregator Upgrade

Converting the News Aggregator to Async

Back in Chapter 11 you built a synchronous News Aggregator that fetched headlines from NewsAPI, The Guardian, and Hacker News. That version worked well but made one request at a time and took several seconds to complete. In this section, you will upgrade that same project to async and see how much faster it becomes when you hit multiple news APIs concurrently.

The async version will:

  • Fetch from NewsAPI, The Guardian, and Hacker News concurrently.
  • Handle partial failures gracefully (if one source is down, show the others).
  • Reuse this chapter's rate-limiting patterns when a source's API quota requires it.
  • Store results in an SQLite database using async database operations.

By the end of this project, you will have a production-ready async news aggregator that demonstrates measurable performance improvements over the synchronous version from Chapter 11.

Fetching from Multiple Sources Concurrently

Start by building async functions for each news source. Each function should handle its own errors and return a consistent data structure.

Async News Fetchers
Python
import asyncio
import httpx
import os
from datetime import datetime

NEWSAPI_KEY = os.environ.get("NEWSAPI_KEY", "")
GUARDIAN_KEY = os.environ.get("GUARDIAN_KEY", "")

async def fetch_newsapi(client):
    try:
        response = await client.get(
            "https://newsapi.org/v2/top-headlines",
            params={"country": "us", "apiKey": NEWSAPI_KEY},
            timeout=5.0,
        )
        response.raise_for_status()
        data = response.json()
        
        articles = []
        for item in data.get("articles", [])[:10]:
            articles.append({
                "source": "NewsAPI",
                "title": item.get("title", "No title"),
                "url": item.get("url"),
                "published_at": item.get("publishedAt"),
            })
        return {"success": True, "source": "NewsAPI", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch NewsAPI: {exc}")
        return {"success": False, "source": "NewsAPI", "articles": []}

async def fetch_guardian(client):
    try:
        response = await client.get(
            "https://content.guardianapis.com/search",
            params={"api-key": GUARDIAN_KEY, "page-size": 10},
            timeout=5.0,
        )
        response.raise_for_status()
        data = response.json()
        
        articles = []
        for item in data.get("response", {}).get("results", []):
            articles.append({
                "source": "Guardian",
                "title": item.get("webTitle", "No title"),
                "url": item.get("webUrl"),
                "published_at": item.get("webPublicationDate"),
            })
        return {"success": True, "source": "Guardian", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch Guardian: {exc}")
        return {"success": False, "source": "Guardian", "articles": []}

async def fetch_hackernews(client):
    try:
        # First get top story IDs
        response = await client.get(
            "https://hacker-news.firebaseio.com/v0/topstories.json",
            timeout=5.0,
        )
        response.raise_for_status()
        story_ids = response.json()[:10]
        
        # Fetch each story concurrently
        async def fetch_story(story_id):
            resp = await client.get(
                f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
            )
            return resp.json()
        
        stories = await asyncio.gather(*[fetch_story(sid) for sid in story_ids])
        
        articles = []
        for story in stories:
            if story and story.get("url"):
                articles.append({
                    "source": "HackerNews",
                    "title": story.get("title", "No title"),
                    "url": story.get("url"),
                    "published_at": datetime.fromtimestamp(story.get("time", 0)).isoformat(),
                })
        
        return {"success": True, "source": "HackerNews", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch Hacker News: {exc}")
        return {"success": False, "source": "HackerNews", "articles": []}

Each fetcher handles errors internally and returns a consistent structure with success, source, and articles fields. This makes it easy to aggregate results from multiple sources.

Aggregating Results from All Sources

Now combine all fetchers into a single aggregation function that runs them concurrently and collects results:

Main Aggregation Function
Python
import asyncio
import httpx
import time

async def aggregate_news():
    start = time.time()
    
    async with httpx.AsyncClient() as client:
        # Fetch all sources concurrently
        results = await asyncio.gather(
            fetch_newsapi(client),
            fetch_guardian(client),
            fetch_hackernews(client),
        )
    
    elapsed = time.time() - start
    
    # Collect all articles from successful sources
    all_articles = []
    successful_sources = []
    failed_sources = []
    
    for result in results:
        if result["success"]:
            all_articles.extend(result["articles"])
            successful_sources.append(result["source"])
        else:
            failed_sources.append(result["source"])
    
    print(f"Fetched {len(all_articles)} articles from {len(successful_sources)} sources in {elapsed:.2f}s")
    if failed_sources:
        print(f"Failed sources: {', '.join(failed_sources)}")
    
    return all_articles

if __name__ == "__main__":
    articles = asyncio.run(aggregate_news())
    for article in articles[:5]:
        print(f"{article['source']}: {article['title']}")

This function typically completes in under 1 second, compared to 3-5 seconds for the synchronous version. The performance gain comes from fetching all sources simultaneously.

Async Database Writes with aiosqlite

To complete the async upgrade, use aiosqlite for non-blocking database operations:

Shell
pip install aiosqlite
Saving Articles to Database (Async)
Python
import aiosqlite
from datetime import datetime

async def init_database():
    async with aiosqlite.connect("news.db") as db:
        await db.execute(
            """
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL,
                title TEXT NOT NULL,
                url TEXT UNIQUE,
                published_at TEXT,
                fetched_at TEXT NOT NULL
            )
            """
        )
        await db.commit()

async def save_articles(articles):
    async with aiosqlite.connect("news.db") as db:
        fetched_at = datetime.utcnow().isoformat()
        
        for article in articles:
            try:
                await db.execute(
                    """
                    INSERT OR IGNORE INTO articles
                        (source, title, url, published_at, fetched_at)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        article["source"],
                        article["title"],
                        article["url"],
                        article["published_at"],
                        fetched_at,
                    ),
                )
            except Exception as exc:
                print(f"Failed to save article: {exc}")
        
        await db.commit()
        print(f"Saved {len(articles)} articles to database")

async def main():
    await init_database()
    articles = await aggregate_news()
    await save_articles(articles)

if __name__ == "__main__":
    asyncio.run(main())

The aiosqlite library provides an async interface to SQLite. Database writes now happen without blocking the event loop, which is important if you are fetching and saving articles repeatedly in a web application.

⚠️ SQLite Concurrency Limitation

SQLite writes are sequential even with async—only one write at a time. The async interface prevents blocking the event loop during writes, but SQLite itself still processes writes serially.

For high-concurrency applications with heavy write loads, consider using PostgreSQL with asyncpg or MySQL with aiomysql. SQLite works great for read-heavy workloads or moderate write rates, which makes it perfect for this news aggregator use case.
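If you would rather avoid the extra dependency, a similar non-blocking effect can be approximated with the standard library by pushing sqlite3 work onto a worker thread via asyncio.to_thread (Python 3.9+). This is a sketch under that assumption, not the aiosqlite approach used above; the save_articles_threaded name and the three-column schema are illustrative:

```python
import asyncio
import sqlite3

def _save_batch(db_path, rows):
    # Runs in a worker thread, so the event loop keeps serving other tasks.
    db = sqlite3.connect(db_path)
    try:
        # One executemany holds the write lock for a single short batch
        # instead of many tiny transactions.
        db.executemany(
            "INSERT OR IGNORE INTO articles (source, title, url) VALUES (?, ?, ?)",
            rows,
        )
        db.commit()
    finally:
        db.close()

async def save_articles_threaded(db_path, rows):
    # asyncio.to_thread offloads the blocking sqlite3 calls to a thread.
    await asyncio.to_thread(_save_batch, db_path, rows)
```

Call `await save_articles_threaded("news.db", rows)` wherever you would have awaited the aiosqlite version; the event loop stays responsive either way.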

Measuring Performance: Sync vs Async

To prove the value of async, run both versions and compare the execution times:

Performance Comparison Script
Python
import time
import asyncio
import requests
import httpx

def sync_aggregator():
    start = time.time()
    
    # Fetch synchronously
    results = []
    for url in [
        "https://newsapi.org/v2/top-headlines?country=us&apiKey=...",
        "https://content.guardianapis.com/search?api-key=...",
        "https://hacker-news.firebaseio.com/v0/topstories.json",
    ]:
        try:
            response = requests.get(url, timeout=5.0)
            results.append(response.json())
        except Exception:
            pass
    
    elapsed = time.time() - start
    print(f"Synchronous: {elapsed:.2f} seconds")
    return results

async def async_aggregator():
    start = time.time()
    
    async with httpx.AsyncClient(timeout=5.0) as client:
        tasks = [
            client.get("https://newsapi.org/v2/top-headlines?country=us&apiKey=..."),
            client.get("https://content.guardianapis.com/search?api-key=..."),
            client.get("https://hacker-news.firebaseio.com/v0/topstories.json"),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    elapsed = time.time() - start
    print(f"Asynchronous: {elapsed:.2f} seconds")
    return results

if __name__ == "__main__":
    print("Testing synchronous version...")
    sync_aggregator()
    
    print("\nTesting async version...")
    asyncio.run(async_aggregator())
Output
Testing synchronous version...
Synchronous: 2.34 seconds

Testing async version...
Asynchronous: 0.51 seconds

The async version is typically 4-5x faster than the synchronous version, and the gap widens as you add sources: ten sources might take 8 seconds synchronously but still finish in under a second with async.
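The "slowest request, not the sum" arithmetic is easy to verify without any network at all by standing in asyncio.sleep for the I/O wait — a self-contained sketch:

```python
import asyncio
import time

async def fake_fetch(delay):
    # Stands in for a network request: all the time is spent waiting.
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # Three "requests" of 0.1s, 0.2s, and 0.3s run concurrently.
    results = await asyncio.gather(fake_fetch(0.1), fake_fetch(0.2), fake_fetch(0.3))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"Fetched {len(results)} results in {elapsed:.2f}s")  # ~0.3s, not 0.6s
```

Total time tracks the slowest coroutine (0.3s); run the same three delays sequentially and you get the sum (0.6s) instead.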

10. Chapter Summary and Review

In this chapter you learned how to dramatically improve the performance of multi-API applications using Python's async programming capabilities. You saw how synchronous code wastes time waiting for network responses, and how async code can run dozens of requests concurrently in the time it takes to run one.

You converted synchronous code to async by changing def to async def, using await for I/O operations, and replacing requests with httpx. You learned how to use asyncio.gather() to run multiple tasks concurrently, how to limit concurrency with semaphores, and how to implement rate limiting to respect API quotas.

You saw how to handle errors gracefully in async code, ensuring that one failure does not abort an entire batch of requests. You used return_exceptions=True to collect both successful results and exceptions, and you implemented retry logic with exponential backoff that works without blocking the event loop.

You learned how to test async code with pytest-asyncio, mock async HTTP calls, and measure performance improvements. Finally, you converted your news aggregator from Chapter 11 to async, demonstrating real-world performance gains: what took 2-3 seconds synchronously now completes in under a second.

The important lesson is not just that async is faster. It is that async is the right tool when you need to fetch from many independent sources quickly. You learned when to use async (many I/O-bound operations) and when to avoid it (simple scripts or CPU-heavy tasks). Professional developers use async strategically, not everywhere.

In Part V, you will see how these async patterns, webhooks from Chapter 22, and file handling from Chapter 21 fit into larger production systems. You will learn how to scale them safely with proper databases, build your own APIs, and deploy everything to production with confidence.

Key Skills Mastered

1.

Understanding When Async Provides Value

You can explain the performance difference between sync and async code, and you know when async is worth the added complexity. You understand that async is for I/O-bound work, not CPU-bound work, and you can measure whether async actually improves performance in your specific use case.

2.

Converting Sync Code to Async

You can convert synchronous API code to async by changing function definitions, replacing requests with httpx, and adding await keywords. You know how to manage async context managers and reuse HTTP clients across multiple requests for efficiency.

3.

Concurrent Request Patterns

You can use asyncio.gather() to run multiple async operations concurrently. You know how to limit concurrency with semaphores, implement rate limiting to respect API quotas, and track progress with as_completed(). You can run dozens of requests at once while staying within provider limits.

4.

Error Handling in Async Code

You can handle errors gracefully in async functions, ensuring that partial failures do not abort entire batches. You know when to use return_exceptions=True, how to implement async retry logic with exponential backoff, and how to design systems that degrade gracefully when some sources fail.

5.

Testing Async Code

You can write async test functions with pytest-asyncio, mock async API calls, and measure performance improvements. You understand how to test concurrent operations and verify that your async code produces correct results in the right order.

6.

Production Async Applications

You built a real-world async news aggregator that fetches from multiple APIs concurrently, handles partial failures, implements rate limiting, and stores results in a database. You demonstrated measurable performance improvements: 4-5x faster than the synchronous version, with the difference growing as you add more sources. You know how to structure async code so it can be deployed behind ASGI servers like uvicorn or gunicorn with async workers. Deployment details live in Chapter 20, but your async code is ready for production traffic.

Quick Reference: Sync → Async Conversion

Async Conversion Cheat Sheet
Synchronous → Asynchronous
def function():  →  async def function():
import requests  →  import httpx
requests.get(url)  →  await client.get(url)
time.sleep(n)  →  await asyncio.sleep(n)
with open(...) as f:  →  async with aiofiles.open(...) as f:
[func(x) for x in items]  →  await asyncio.gather(*[func(x) for x in items])
function()  →  asyncio.run(function())

Chapter Review Quiz

Use these questions to check your understanding. If you can answer them confidently, you have mastered async at the level needed for this book:

Why is async code faster when fetching from multiple APIs?

Async code runs multiple requests concurrently instead of sequentially. While one request waits for a network response, the event loop switches to other requests. Total time is roughly equal to the slowest request, not the sum of all requests. Synchronous code blocks on each request, so total time is the sum of all individual request times.

When should you avoid using async in your code?

Avoid async if you are only making one or two API calls, if your code is already fast enough, or if your team is unfamiliar with async patterns. Async adds complexity and is only worth it when you have many I/O-bound operations. For CPU-heavy tasks, async provides no benefit because the bottleneck is computation, not waiting.

What happens if you use time.sleep() instead of asyncio.sleep() in async code?

time.sleep() blocks the entire event loop, defeating the purpose of async. All other tasks stop and wait. asyncio.sleep() yields control to the event loop, allowing other tasks to run while waiting. Always use asyncio.sleep() in async functions.
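You can see the blocking effect directly by timing two concurrent tasks under each style of sleep — a small self-contained sketch:

```python
import asyncio
import time

async def blocking_task():
    time.sleep(0.2)           # blocks the whole event loop

async def yielding_task():
    await asyncio.sleep(0.2)  # yields to the event loop while waiting

async def timed(factory):
    # Run two copies of the task "concurrently" and measure wall time.
    start = time.perf_counter()
    await asyncio.gather(factory(), factory())
    return time.perf_counter() - start

blocking = asyncio.run(timed(blocking_task))
concurrent = asyncio.run(timed(yielding_task))
print(f"time.sleep: {blocking:.2f}s, asyncio.sleep: {concurrent:.2f}s")
# time.sleep runs the two tasks back-to-back (~0.4s);
# asyncio.sleep overlaps them (~0.2s)
```

With time.sleep() the "concurrent" tasks serialize; with asyncio.sleep() they genuinely overlap.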

How do you handle a situation where you need to limit the rate of API calls?

Use a rate limiter that tracks when requests were made and delays new requests if you would exceed the limit. A simple implementation stores timestamps of recent calls and sleeps until the oldest call expires if you have reached the maximum. Libraries like aiolimiter provide production-ready rate limiting for async code.
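Here is a minimal sliding-window limiter of the kind described — a sketch, not aiolimiter's API; the AsyncRateLimiter name and the window parameters are illustrative, and a shared asyncio.Lock would be needed to make it fully safe under heavy task concurrency:

```python
import asyncio
import time

class AsyncRateLimiter:
    """Allow at most `max_calls` per `period` seconds (sliding window)."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.timestamps = []

    async def acquire(self):
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        self.timestamps = [t for t in self.timestamps if now - t < self.period]
        if len(self.timestamps) >= self.max_calls:
            # Sleep until the oldest call leaves the window.
            await asyncio.sleep(self.period - (now - self.timestamps[0]))
        self.timestamps.append(time.monotonic())

async def demo():
    limiter = AsyncRateLimiter(max_calls=2, period=0.2)
    start = time.monotonic()
    for _ in range(4):
        await limiter.acquire()  # call this before each client.get(...)
    return time.monotonic() - start

elapsed = asyncio.run(demo())
print(f"4 calls at 2 per 0.2s took {elapsed:.2f}s")  # the 3rd call has to wait
```

The first two calls pass immediately; the third sleeps until the window frees up, so four calls take at least one full period.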

Why should you handle errors inside async functions rather than only at the gather level?

If you handle errors only at the gather() level, one exception aborts all remaining tasks (unless you use return_exceptions=True). Handling errors inside each function lets all tasks complete and return either success or failure. This enables graceful degradation: show what succeeded even if some requests failed.
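The difference is easy to demonstrate: with return_exceptions=True, a failing task shows up as an exception object in the results instead of aborting the batch. A minimal sketch, with the source names and the simulated failure purely illustrative:

```python
import asyncio

async def fetch(source):
    # Simulate one source failing while the others succeed.
    if source == "guardian":
        raise ConnectionError(f"{source} is down")
    await asyncio.sleep(0)
    return f"{source}: 10 articles"

async def main():
    results = await asyncio.gather(
        fetch("newsapi"), fetch("guardian"), fetch("hackernews"),
        return_exceptions=True,
    )
    # Separate successes from failures for graceful degradation.
    ok = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return ok, failed

ok, failed = asyncio.run(main())
print(f"{len(ok)} sources succeeded, {len(failed)} failed")
```

Without return_exceptions=True, the ConnectionError would propagate out of gather() and you would get none of the successful results.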

Looking Forward

You've mastered async programming for API calls, transforming slow sequential operations into blazing-fast concurrent ones. In the next chapter, Chapter 24: From SQLite to PostgreSQL, you'll learn when and how to graduate from SQLite to a production-scale database. You'll set up PostgreSQL, migrate your existing schemas, and discover powerful features like JSONB columns and full-text search that SQLite can't offer.

Chapter 24 builds directly on the database skills from Chapter 15 and the application patterns from your projects. You'll understand the architectural limits that make SQLite unsuitable for concurrent web applications, plan a systematic migration strategy, and execute it without losing data. By the end, your applications will be backed by a database engine that handles production workloads.

The async skills you learned here pair naturally with PostgreSQL's connection pooling and concurrent query support. Fast async data fetching combined with a production database gives your applications the performance foundation they need at scale.