How to Use Broadway in Your Elixir Application
In today’s post, we will be covering the Elixir library Broadway. Broadway is a library maintained by the kind folks over at Plataformatec that allows us to create highly concurrent data processing pipelines with relative ease. After an overview of how Broadway works and when to use it, we’ll dive into a sample project where we leverage Broadway to fetch temperature data from https://openweathermap.org/ in order to find the coldest city on earth. Our Broadway pipeline will loop over a list of cities around the world served up by a GenStage producer that we write ourselves. Let’s jump right in!
What is Broadway and when would you use it?
Broadway allows us to create highly concurrent data processing pipelines largely thanks to the fact that it is built on top of GenStage. GenStage is another Elixir library that is used to construct an event/message exchange between processes. Specifically, GenStage provides the behaviours necessary to create coordinated producer and consumer processes and ensure that the event pipeline is never flooded. Consumers subscribe to upstream producers and demand messages when they are free to do work. With this model, we can scale the number of consumer processes to fit the problem at hand and achieve the performance characteristics we desire.
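To make the demand-driven model concrete, below is a minimal GenStage sketch along the lines of the example in the GenStage documentation. The Counter and Printer module names are purely illustrative and are not part of the project we build later: the producer emits integers only when a consumer asks for them, and the consumer caps how much it asks for via max_demand.
defmodule Counter do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  # Only produce events when a consumer demands them
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Printer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok)

  # Subscribe to the Counter producer and never ask for more than 10 events at a time
  def init(:ok), do: {:consumer, :ok, subscribe_to: [{Counter, max_demand: 10}]}

  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end
Starting Counter.start_link(0) and then Printer.start_link(:ok) in an iex session prints a steady stream of integers, with the consumer pulling batches of at most 10 at a time.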
The primary problem with using GenStage directly in a production-grade application is that the onus is on the developer to create the proper supervision tree and ensure that failures are handled correctly. This is exactly where Broadway comes into play. Broadway provides the abstractions on top of GenStage that you would want in a production context, such as:
- Rate limiting
- Batching
- Ordering and partitioning
- Automatic restarts
- Graceful shutdowns
- Automatic message acknowledgement
In addition to the aforementioned features, Broadway also has several supporting libraries which allow you to use a message queue service as a Broadway producer (some maintained by Plataformatec and some community maintained). As of this writing, there are official Broadway Producers available for Amazon SQS, Google Pub/Sub and RabbitMQ (https://github.com/plataformatec/broadway#official-broadway-producers). Support for Kafka is currently underway, but a release has not yet been cut for the project (https://github.com/plataformatec/broadway_kafka). The benefit of using these Broadway Producers is that it abstracts away the trouble that comes along with managing a persistent and valid connection to the data source and provides a convenient way to use these data sources as the entry point into your data processing pipeline.
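To give a rough idea of what that looks like, here is a hedged sketch of a pipeline backed by the broadway_sqs producer. The module name, queue URL, and AWS details are placeholders, and the exact producer options vary between broadway_sqs versions, so treat this as illustrative rather than copy-paste ready:
defmodule MyApp.SQSPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # BroadwaySQS.Producer ships with the broadway_sqs package and manages
        # the SQS connection, polling, and message acknowledgement for us
        module: {BroadwaySQS.Producer, queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/my-queue"}
      ],
      processors: [
        default: [concurrency: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Messages from an official producer already arrive as %Broadway.Message{}
    # structs, so no transformer is needed here
    IO.inspect(message.data, label: "received from SQS")
    message
  end
end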
All this is well and good, but when should you reach for a tool like Broadway? Broadway is a useful tool when the task at hand is embarrassingly parallel (https://en.wikipedia.org/wiki/Embarrassingly_parallel) and spawning more processes yields positive results. It is also useful when you have an ETL (extract, transform and load) pipeline and want to break the problem up into discrete components and scale them independently. One example would be processing user image uploads. Increasing the number of workers allows you to process more image uploads, and you can decouple the image processing from the HTTP request/response cycle by leveraging a queuing system like RabbitMQ, SQS, etc.
How does Broadway work internally?
As previously mentioned, Broadway leverages GenStage in order to orchestrate the event pipelines. If you look at the Broadway.Producer module (https://github.com/plataformatec/broadway/blob/master/lib/broadway/producer.ex), you’ll notice the line use GenStage up at the top. If you read further down into the module, you’ll find implementations for the handle_subscribe/4, handle_demand/2, handle_cancel/3, and handle_call/3 callbacks (to name a few) that are defined in the GenStage behaviour. The magic of Broadway lies in how it automatically creates a supervision tree that ensures your pipeline is fault tolerant and reliable. Below is the supervision tree from the world temperature application that we will be writing shortly:
Let’s break the supervision tree down step by step. At the top of our tree we have our project supervisor, WorldTemp.Supervisor, which is defined in our application.ex file. From there we’ll turn our focus to WorldTemp.TempProcessor, as that is the Broadway module we will be creating (WorldTemp.CityProducer and WorldTemp.TempTracker are supporting GenStage and GenServer modules, respectively). Going down the supervision tree, we get to WorldTemp.TempProcessor.Broadway.Supervisor. This supervisor is responsible for monitoring all the various components of Broadway and restarting them if any errors occur. The processes that WorldTemp.TempProcessor.Broadway.Supervisor supervises are listed below along with their purpose:
- WorldTemp.TempProcessor.Broadway.ProducerSupervisor: This supervisor is responsible for monitoring the data producer processes. This supervision tree has a strategy of :one_for_one, as it only needs to restart the particular data producer that is experiencing issues; all other producers can keep running if they are healthy.
- WorldTemp.TempProcessor.Broadway.ProcessSupervisor: This supervisor is responsible for monitoring the worker processes that consume data from the producers. This supervision tree has a strategy of :one_for_all. The reason it is not also :one_for_one is that the processing callbacks we write should be stateless, and any errors that occur can be handled without crashing the process. If an error does crash a process, it is likely that some of Broadway's internal bookkeeping has gone awry and all the consumers need to be restarted.
- WorldTemp.TempProcessor.Broadway.Terminator: This process is responsible for the proper stoppage of your Broadway pipeline. It notifies all the consumer processes that they should not resubscribe to producers once they terminate, and it also notifies all the producers to flush their current events and ignore any subsequent data requests.
- WorldTemp.TempProcessor.RateLimiter: This process is optionally started if your pipeline requires a rate limiter. It is effectively a token bucket (https://en.wikipedia.org/wiki/Token_bucket) rate limiter and throttles how much work your processors can perform within a configurable time interval.
It is also important to note that WorldTemp.TempProcessor.Broadway.Supervisor has a supervision strategy of :rest_for_one. The reason for this is that if the producer supervision tree crashes, the parent supervisor can restart all the subsequent supervision trees and restore the pipeline to a fresh working state.
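If you would like to poke around this tree yourself once the sample project below is up and running, you can do so from an iex session. The calls below are standard Elixir/Erlang tooling; the exact children you see come from Broadway's internals and may differ slightly between versions:
# List the direct children of the top-level application supervisor
iex(1) ▶ Supervisor.which_children(WorldTemp.Supervisor)

# Or, if :observer/:wx is available, browse the full tree graphically
# under the world_temp application
iex(2) ▶ :observer.start()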
Hands on with Broadway
As previously mentioned, we will be creating a very simple Broadway-based application which reads weather data from an API and keeps a running record of the coldest city on earth. We will also be using a new Broadway feature (rate limiting) to ensure that we don’t go over our free plan limit on https://openweathermap.org.
With that being said, let’s jump right in! Start off by creating a new project using mix new world_temp --sup. You’ll want to use the --sup flag for convenience so that the root application supervision tree is set up for you. With your new project created, change into the project directory and open up the mix.exs file. Update the dependencies to look like the following (we’ll lock to a specific git SHA since a release with rate limiting has not yet been cut):
defp deps do
[
{:httpoison, "~> 1.6"},
{:jason, "~> 1.1"},
{:broadway, github: "plataformatec/broadway", ref: "08497708e10867935f2e92351e4cde9e4a57135e"}
]
end
To fetch your dependencies, run mix deps.get in your project directory. From there, we’ll want to create the file lib/city_producer.ex, which will act as the data producer for our Broadway pipeline. For the purposes of this sample project, this data producer will be a GenStage-based module, but you can use the other Broadway producers if that better serves your needs (Kafka, RabbitMQ, SQS, etc). In the lib/city_producer.ex file, add the following:
defmodule WorldTemp.CityProducer do
use GenStage
require Logger
def start_link(_args) do
GenStage.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_args) do
{:producer, city_list()}
end
# If the demand is greater than the number of cities left in the GenStage
# process's state, re-add the city list and handle the demand again
def handle_demand(demand, state) when demand > length(state) do
handle_demand(demand, state ++ city_list())
end
# Enough data is available in the GenStage's state, serve that to
# the consumer
def handle_demand(demand, state) do
{to_dispatch, remaining} = Enum.split(state, demand)
{:noreply, to_dispatch, remaining}
end
# List of cities for which to get weather data
defp city_list do
[
{"Abu Dhabi", "United Arab Emirates"},
{"Abuja", "Nigeria"},
{"Accra", "Ghana"},
{"Adamstown", "Pitcairn Islands"},
{"Addis Ababa", "Ethiopia"},
{"Algiers", "Algeria"},
{"Alofi", "Niue"},
...
# For full contents of this function go to Github project
# https://github.com/akoutmos/world_temp
]
end
end
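As a quick (entirely optional) sanity check, you can drive this producer by hand from an iex -S mix session using GenStage.stream/1, which subscribes to the producer and exposes the dispatched events as an Elixir stream. This assumes the producer has not already been started under your application supervisor:
iex(1) ▶ {:ok, _pid} = WorldTemp.CityProducer.start_link([])
iex(2) ▶ GenStage.stream([{WorldTemp.CityProducer, max_demand: 10}]) |> Enum.take(3)
[
  {"Abu Dhabi", "United Arab Emirates"},
  {"Abuja", "Nigeria"},
  {"Accra", "Ghana"}
]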
Next we’ll want to create a module that fetches weather data from the OpenWeatherMap API. You’ll want to go to https://openweathermap.org/ and create a free account to get an API key. Go ahead and create lib/temp_fetcher.ex with the following contents:
defmodule WorldTemp.TempFetcher do
require Logger
@api_key "YOU_API_KEY_GOES_HERE"
def fetch_data(city, country) do
city
|> generate_url(country)
|> HTTPoison.get()
|> handle_response()
end
defp handle_response({:ok, %HTTPoison.Response{status_code: 200, body: body}}) do
body
|> Jason.decode!()
|> get_in(["main", "temp"])
end
defp handle_response(resp) do
Logger.warn("Failed to fetch temperature data: #{inspect(resp)}")
:error
end
defp generate_url(city, country) do
"http://api.openweathermap.org/data/2.5/weather?q=#{city},#{country}&appid=#{@api_key}"
end
end
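With your real API key in place, you can try the module out directly from iex -S mix (the temperature below is purely illustrative; the OpenWeatherMap API returns values in Kelvin by default):
iex(1) ▶ WorldTemp.TempFetcher.fetch_data("Reykjavik", "Iceland")
268.15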
With that in place, we are going to need to create our Broadway consumer module to leverage this HTTP API wrapper. Create lib/temp_processor.ex with the following contents and we’ll go over exactly what it does:
defmodule WorldTemp.TempProcessor do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {WorldTemp.CityProducer, []},
transformer: {__MODULE__, :transform, []},
rate_limiting: [
allowed_messages: 60,
interval: 60_000
]
],
processors: [
default: [concurrency: 5]
]
)
end
@impl true
def handle_message(:default, message, _context) do
message
|> Message.update_data(fn {city, country} ->
city_data = {city, country, WorldTemp.TempFetcher.fetch_data(city, country)}
WorldTemp.TempTracker.update_coldest_city(city_data)
# Return the enriched tuple so the message data reflects the fetched temperature
city_data
end)
end
def transform(event, _opts) do
%Message{
data: event,
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
end
def ack(:ack_id, _successful, _failed) do
:ok
end
end
The start_link/1 function defines the various options required by Broadway to orchestrate the data pipeline. You’ll notice that the GenStage producer we previously created is referenced in the producer keyword list, and we also define a rate limiter to ensure that we do not go over our free tier usage on OpenWeatherMap. The transformer entry invokes the transform/2 function at the bottom of the module, which is the boilerplate required to wrap our incoming events in %Broadway.Message{} structs. An important thing to note here is that we specify 5 concurrent processors (if you recall from the supervision tree image earlier, there were 5 consumer processes). By changing that one value, you can dial in how much concurrency/throughput you would like from your Broadway pipeline. The meat of our logic is in handle_message/3, where we take our message (a city+country tuple), make our API call, and then update our rolling record of the coldest city.
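Although this project doesn't need it, this is also where Broadway's batching support (mentioned earlier) would be wired in. Purely as a hedged sketch: you could add a batchers section to the start_link/1 options and implement the handle_batch/4 callback, for example to write temperatures to a database in groups rather than one row at a time:
# Hypothetical additions to WorldTemp.TempProcessor if batching were needed.
# In start_link/1, alongside the :processors option, you would add:
#
#   batchers: [
#     default: [concurrency: 2, batch_size: 10, batch_timeout: 2_000]
#   ]
#
# Messages then flow to the :default batcher and arrive here in groups:
@impl true
def handle_batch(:default, messages, _batch_info, _context) do
  # Persist the whole batch in one operation instead of one message at a time
  IO.inspect(length(messages), label: "batch size")
  messages
end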
Let’s create that WorldTemp.TempTracker module now in lib/temp_tracker.ex with the following contents:
defmodule WorldTemp.TempTracker do
use Agent
def start_link(_) do
Agent.start_link(fn -> nil end, name: __MODULE__)
end
def get_coldest_city do
Agent.get(__MODULE__, fn {city, country, temp} ->
"The coldest city on earth is currently #{city}, #{country} with a temperature of #{
kelvin_to_c(temp)
}°C"
end)
end
def update_coldest_city(:error), do: nil
def update_coldest_city({_, _, new_temp} = new_data) do
Agent.update(__MODULE__, fn
{_, _, orig_temp} = orig_data ->
if new_temp < orig_temp, do: new_data, else: orig_data
nil ->
new_data
end)
end
defp kelvin_to_c(kelvin), do: kelvin - 273.15
end
This module is a relatively straightforward Agent-based module that allows us to retrieve the current coldest city, or update it if the provided value is lower than the currently recorded one.
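Here is what interacting with the tracker looks like in isolation (the pid shown is illustrative): 265.15 K works out to -8.0°C, and since nothing colder has been recorded yet, the Agent keeps it:
iex(1) ▶ WorldTemp.TempTracker.start_link([])
{:ok, #PID<0.204.0>}
iex(2) ▶ WorldTemp.TempTracker.update_coldest_city({"Oslo", "Norway", 265.15})
:ok
iex(3) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Oslo, Norway with a temperature of -8.0°C"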
Lastly, we need to update our lib/world_temp/application.ex file and add a couple of items to our supervision tree. If you named your modules the same as I did, your application.ex file should look like this:
defmodule WorldTemp.Application do
@moduledoc false
use Application
alias WorldTemp.{CityProducer, TempProcessor, TempTracker}
def start(_type, _args) do
children = [
TempTracker,
CityProducer,
TempProcessor
]
opts = [strategy: :one_for_one, name: WorldTemp.Supervisor]
Supervisor.start_link(children, opts)
end
end
With all that in place, we can run iex -S mix from the command line and interact with our application:
iex(1) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Bern, Switzerland with a temperature of 1.8100000000000023°C"
iex(2) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(3) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(4) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Helsinki, Finland with a temperature of -1.8899999999999864°C"
iex(5) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Roseau, Dominica with a temperature of -18.99999999999997°C"
By calling WorldTemp.TempTracker.get_coldest_city() periodically, we can see that as the Broadway processors work through the city+country list, the city with the coldest temperature changes. It may take a few minutes to run through the whole list given that we are only processing 60 cities a minute and our list contains 216 entries.
Summary
As we can see from the sample application that we have written, with relative ease we were able to create a data processing pipeline using Broadway. With all of the abstractions given to us via Broadway, we can rest easy knowing that our pipeline will operate as intended and will recover from any issues if they arise. In addition, we have all the levers necessary to adjust the performance characteristics of our pipeline via some configuration. Thanks for sticking with me to the end and if you would like to learn more about Broadway, I suggest going through the following resources:
- https://github.com/plataformatec/broadway
- https://hexdocs.pm/broadway/Broadway.html
- https://www.youtube.com/watch?v=IzFmNQGzApQ