Streaming data with Flask and Fetch + the Streams API


This post is somewhat related to my previous post on point in polygon queries in MongoDB.

It's related in the sense that in the same project, we needed to be able to fetch a large number of points from the API and display them in the browser.

500,000 data points

lots of points

We needed to be able to handle datasets with at least 500,000 points.

  • Sometimes all of these points needed to be returned and displayed in the browser.
  • Other times, only the points inside a polygon needed to be returned - a subset of the data, but potentially still a lot of points.

The rough first draft solution involved making paginated, sequential requests to the API endpoint. We had to paginate the data because it was too large to fit into a single response.

from 2-3 minutes

This sometimes took up to 2 or 3 minutes to load all of the points, when there were 100,000 or more.

So then we tried a few things to speed it up, including parallelizing the requests, but even then each request was taking about 20 seconds.

down to 5 seconds

We were ultimately able to load all of the data in around 5 seconds by refactoring the endpoint to implement streaming, along with some other related optimizations described in this post.

HTTP Streaming

When you think about streaming, you might first think of websockets, but we didn't have to use websockets here.

Instead, we just had to make a few changes to the existing REST endpoint to make use of HTTP streaming.

No external dependencies

We also didn't need any additional dependencies on the front end or back end for this.

One API call


One nice thing is that streaming the data involves only a single HTTP transaction.

Previously one of our optimizations was to parallelize the API requests, along with implementing pagination, and we were sometimes making 10-20 requests, depending on how much data there was.

This HTTP header is what allows for streaming in a single HTTP transaction:

Transfer-Encoding: chunked

Using streaming, we only need to make one API request, which also turned out to be a huge benefit due to some other issues we were having that I won't go into in this post.
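If you want to see the chunked transfer in action outside of the browser, a small Python client using the requests library works well. This is just a minimal sketch, assuming the Flask endpoint defined later in this post is running locally on the default port:

import json
import requests

# stream=True tells requests not to buffer the whole response body
with requests.get('http://localhost:5000/get_points', stream=True) as resp:
    print(resp.headers.get('Transfer-Encoding'))  # usually 'chunked' for a streamed response
    for line in resp.iter_lines():  # iterates over the newline-delimited stream
        if line:
            point = json.loads(line)
            print(point['id'])  # process each point as it arrives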

Demo: Populating a grid of points

We're going to visualize populating a 50 x 50 grid of points, from data stored in MongoDB - similar to my previous post.

This visualization was made with D3.js.

The streaming happens pretty quickly, but I slowed it down a bit here in order to visualize the process.

Each point is added to the grid as it is received from the API.

Demo points data

The points are generated like this:

width = 50
height = 50
points = []
count = 0
for x in range(width):
    for y in range(height):
        points.append({'pt': [x,y], 'id': count})
        count += 1

Where each point looks like this:

{'pt': [x,y], 'id': count}

We've got the x,y coordinates for each point, along with an id that isn't strictly necessary for this demo, but serves as a unique data key for each item.
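For completeness, the generated points can be loaded into the coords collection that the endpoint queries with something like this - a minimal sketch using pymongo, where the database name is just an assumption for this demo:

from pymongo import MongoClient

client = MongoClient()         # assumes a local MongoDB instance
db = client['points_demo']     # hypothetical database name for this sketch
db.coords.insert_many(points)  # points is the list generated above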

Flask streaming endpoint

Flask supports streaming right out of the box, and it's pretty straightforward to implement.

This endpoint queries MongoDB for points data and returns each object as it is processed, instead of one big chunk of data.

@app.route('/get_points')
def get_points():
    def process_points():
        # query all points, excluding the MongoDB _id field
        points_cursor = db.coords.find({}, {'_id': 0})
        for point in points_cursor:
            data = json.dumps(point)
            # adding a newline delimiter
            data = ''.join([data,'\n'])
            yield data
    return process_points(), {"Content-Type": "application/x-ndjson"}

There is a generator, process_points, within the endpoint, and it is returned as the body of the response.

Note the yield statement, which is what makes it a generator.

Within that, I've queried MongoDB for all of the points. This returns a cursor, which also behaves like a generator - it fetches documents lazily rather than all at once.

We iterate through the cursor and process each result one by one.

A generator is useful in this case, because it doesn't load all of the data into memory - only each item as it is processed.

I've basically created newline-delimited JSON (see the application/x-ndjson content type) with these two lines:

data = json.dumps(point)
data = ''.join([data,'\n'])

And then each object is returned to the browser in a stream.
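Assuming the documents come back from the cursor in insertion order, the start of the stream looks something like this:

{"pt": [0, 0], "id": 0}
{"pt": [0, 1], "id": 1}
{"pt": [0, 2], "id": 2}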

Fetch + Streams API

Now it's time to process the stream on the front end.

We can send a Fetch request like usual, and then process the chunked response using the Streams API.

async function fetchData(url){
    let result = '';
    const decoder = new TextDecoder();
    const response = await fetch(url);

    for await (const chunk of response.body){
        // decode the bytes and append them to any leftover partial line from the previous chunk
        result += decoder.decode(chunk, {stream:true});
        const lines = result.split('\n');
        // the last element may be an incomplete JSON object - save it for the next chunk
        result = lines.pop() || '';

        for(const line of lines){
            // adding points one by one to D3 here
            update([JSON.parse(line)]);
        }
    }
}

Note: the update function is part of the D3.js visualization - it adds each point to the grid. You can find the code from my previous post here on GitHub.

The body of the response object provides an instance of ReadableStream, which yields the raw bytes that we then decode into strings.

So you can see that the code iterates through the chunks of response.body.

Chunks can be unpredictable

Processing this JSON data is not entirely straightforward, mainly because you can't be certain that the chunks will arrive exactly as they were sent from the API. A chunk might contain several JSON objects, or cut one off in the middle - each object won't necessarily be transferred neatly in its own chunk.

For example, maybe this data item:

{"pt": [0, 11], "id": 11}

is broken up and comes through in two pieces like this:

{"pt": [0, 11], "id

and

": 11}

So that is why we concatenate each new chunk with the result variable and then split it all up by the newline character \n to get each JSON object.

We've used the newline delimiter in the API in this case to make sure that each JSON object can be split up appropriately when we process it on the front end.

You could use another delimiter as well; it just needs to be something that lets you cleanly break up the stream when you process each object.
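For example, here's a sketch of the same two lines from the endpoint using the ASCII record separator character instead of a newline - the front end would then need to split on that same character:

data = json.dumps(point)
data = ''.join([data, '\x1e'])  # '\x1e' is the ASCII record separator character
yield data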


Saving space and/or time

Streaming has a couple of main benefits.

1. Saving space

This was the main benefit in our case. For various reasons that we were not completely able to pinpoint, there was a lot of network latency and other processing costs with the original endpoint implementation.

Even when the API calls were parallelized, each request was still taking about 20 seconds.

Part of the refactoring to use streaming was to avoid loading all of the data returned by the database query into memory at once.

Previously we had been loading it all into memory before returning it in the response. There were no memory errors or anything overt, but it seemed to be a contributing factor.
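For comparison, each response in the old version was built by materializing the query results in memory first, roughly following this pattern (a sketch, not the exact original code):

@app.route('/get_points')
def get_points_in_one_response():
    # pull every document into a list in memory, then serialize it all at once
    points = list(db.coords.find({}, {'_id': 0}))
    return jsonify(points)  # jsonify is imported from flask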

If you have a lot of data to process, it can be a good idea to use a generator to iterate through the data and process one item at a time in memory.


2. Saving time

If you don't need all of your data to load before rendering it, then you can process or visualize it in chunks as it comes in.

In the real project I've been referring to, we did need to wait for all of the data to arrive before visualizing it, but in this demo you can see each point being processed and added to the grid as soon as it is received.

Thanks for reading!

Have you used streaming in any of your projects?
