How to Use Broadway in Your Elixir Application

Posted by Alex Koutmos on Tuesday, January 7, 2020

In today's post, we will be covering the Elixir library Broadway. Broadway is a library which is maintained by the kind folks over at Plataformatec and allows us to create highly concurrent data processing pipelines with relative ease. After an overview of how Broadway works and when to use it, we'll dive into a sample project where we leverage Broadway to fetch temperature data from https://openweathermap.org/ in order to find the coldest city on earth. Our Broadway pipeline will loop over a list of all the cities in the world from a GenStage producer that we write. Let's jump right in!

What is Broadway and when would you use it?

Broadway allows us to create highly concurrent data processing pipelines largely because it is built on top of GenStage. GenStage is another Elixir library that is used to construct an event/message exchange between processes. Specifically, GenStage provides the behaviours necessary to create coordinated producer and consumer processes and to ensure that the event pipeline is never flooded. Consumers subscribe to upstream producers and then send demand for messages when they are free to do work. With this model, we can scale the number of consumer processes as appropriate for the problem at hand to achieve the performance characteristics we desire.
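
To make the demand-driven model a little more concrete, here is a minimal sketch using plain processes. It is not GenStage itself and not how GenStage is implemented; the `DemandSketch` module and its `:ask`/`:events` message shapes are hypothetical names made up for illustration. The point is only that the producer never pushes work: it hands out events when a consumer asks for them.

```elixir
defmodule DemandSketch do
  # Producer: holds a list of events and only hands them out when asked.
  def start_producer(events) do
    spawn(fn -> producer_loop(events) end)
  end

  defp producer_loop(events) do
    receive do
      {:ask, consumer, demand} ->
        {batch, rest} = Enum.split(events, demand)
        send(consumer, {:events, batch})
        producer_loop(rest)
    end
  end

  # Consumer: asks for `demand` events, handles them, then asks again.
  # It stops once the producer has nothing left to give.
  def consume(producer, demand, handler, acc \\ []) do
    send(producer, {:ask, self(), demand})

    receive do
      {:events, []} ->
        acc

      {:events, batch} ->
        consume(producer, demand, handler, acc ++ Enum.map(batch, handler))
    end
  end
end

producer = DemandSketch.start_producer(Enum.to_list(1..5))
results = DemandSketch.consume(producer, 2, fn n -> n * 10 end)
IO.inspect(results)
```

Because the consumer only asks again after it has finished its current batch, a slow handler automatically slows down how quickly events leave the producer.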

The primary problem with using GenStage directly for a production grade application is that the onus is on the developer to create the proper supervision tree to ensure that failures are handled properly. This is exactly where Broadway comes into play. Broadway provides the necessary abstractions on top of GenStage that you would leverage in a production context such as:

  • Rate limiting
  • Batching
  • Ordering and partitioning
  • Automatic restarts
  • Graceful shutdowns
  • Automatic message acknowledgement

In addition to the aforementioned features, Broadway also has several supporting libraries which allow you to use a message queue service as a Broadway producer (some maintained by Plataformatec and some community maintained). As of this writing, there are official Broadway Producers available for Amazon SQS, Google Pub/Sub and RabbitMQ (https://github.com/plataformatec/broadway#official-broadway-producers). Support for Kafka is currently underway, but a release has not yet been cut for the project (https://github.com/plataformatec/broadway_kafka). The benefit of using these Broadway Producers is that they abstract away the trouble of managing a persistent and valid connection to the data source, and they provide a convenient way to use these data sources as the entry point into your data processing pipeline.

All this is well and good, but when should you reach for a tool like Broadway? Broadway is a useful tool when the task at hand is embarrassingly parallel (https://en.wikipedia.org/wiki/Embarrassingly_parallel) and spawning more processes yields positive results. It is also useful when you have an ETL (extract, transform and load) pipeline and want to break up the problem into discrete components and scale them independently. One example would be processing user image uploads. Increasing the number of workers will allow you to process more image uploads, and you can decouple the image processing from the HTTP request/response cycle by leveraging a queuing system like RabbitMQ, SQS, etc.

How does Broadway work internally?

As previously mentioned, Broadway leverages GenStage in order to orchestrate the event pipelines. If you look at the Broadway.Producer module (https://github.com/plataformatec/broadway/blob/master/lib/broadway/producer.ex) you'll notice the line use GenStage up at the top. If you read further down into the module, you'll find implementations for the handle_subscribe/4, handle_demand/2, handle_cancel/3, and handle_call/3 callbacks (to name a few) that are defined in the GenStage behaviour. The magic of Broadway comes into play in how it automatically creates a supervision tree that ensures that your pipeline is fault tolerant and reliable. Below is the supervision tree from the world temperature application that we will be writing shortly:

Let's break the supervision tree down step by step. At the top of our tree we have our project supervisor, WorldTemp.Supervisor, which is defined in our application.ex file. From there we'll turn our focus to WorldTemp.TempProcessor as that is our Broadway module that we will be creating (WorldTemp.CityProducer and WorldTemp.TempTracker are supporting GenStage and GenServer modules, respectively). Going down the supervision tree, we get to WorldTemp.TempProcessor.Broadway.Supervisor. This supervisor is responsible for monitoring all the various components of Broadway and restarting them if any errors occur. The processes that WorldTemp.TempProcessor.Broadway.Supervisor supervises are listed below along with their purpose:

  • WorldTemp.TempProcessor.Broadway.ProducerSupervisor: This supervisor is responsible for monitoring the data producer processes. This supervision tree has a strategy of :one_for_one as it only needs to restart the particular data producer that is experiencing issues. All other producers can keep running if everything is okay.
  • WorldTemp.TempProcessor.Broadway.ProcessSupervisor: This supervisor is responsible for monitoring the worker processes that consume data from the producers. This supervision tree has a strategy of :one_for_all. The reason it is not also :one_for_one is that the processing callback functions that we write should be stateless, and any errors that occur in them can be handled without crashing the process. If an error does crash the process, it is likely that some internal bookkeeping related to Broadway has gone awry and all the consumers need to be restarted.
  • WorldTemp.TempProcessor.Broadway.Terminator: This process is responsible for the proper stoppage of your Broadway pipeline. It will notify all the consumer processes that they should not resubscribe to producers once they terminate and it also notifies all the producers to flush all of their current events and ignore any subsequent data requests.
  • WorldTemp.TempProcessor.RateLimiter: This process is optionally started if your pipeline requires a rate limiter. It is effectively a token bucket (https://en.wikipedia.org/wiki/Token_bucket) rate limiter and throttles how much work your processors can perform within a configurable time interval.
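
To illustrate the rate limiting idea from the last bullet, here is a small sketch of an interval-based token bucket written as a pure data structure. This is not Broadway's implementation; the `TokenBucketSketch` module and its function names are hypothetical, and the current time is passed in explicitly (in milliseconds) so the behaviour is easy to follow.

```elixir
defmodule TokenBucketSketch do
  # `capacity` tokens may be spent per interval. The bucket is topped up
  # to `capacity` once a full interval has elapsed since the last refill.
  defstruct [:capacity, :interval_ms, :tokens, :window_start]

  def new(capacity, interval_ms, now_ms) do
    %__MODULE__{
      capacity: capacity,
      interval_ms: interval_ms,
      tokens: capacity,
      window_start: now_ms
    }
  end

  # Returns {:ok, bucket} when a token was taken, or
  # {:wait, ms_until_refill, bucket} when the bucket is empty.
  def take(bucket, now_ms) do
    bucket = maybe_refill(bucket, now_ms)

    if bucket.tokens > 0 do
      {:ok, %{bucket | tokens: bucket.tokens - 1}}
    else
      {:wait, bucket.window_start + bucket.interval_ms - now_ms, bucket}
    end
  end

  defp maybe_refill(%{window_start: start, interval_ms: interval} = bucket, now_ms)
       when now_ms - start >= interval do
    %{bucket | tokens: bucket.capacity, window_start: now_ms}
  end

  defp maybe_refill(bucket, _now_ms), do: bucket
end

# Two messages allowed per second
bucket = TokenBucketSketch.new(2, 1_000, 0)
{:ok, bucket} = TokenBucketSketch.take(bucket, 10)
{:ok, bucket} = TokenBucketSketch.take(bucket, 20)

# The third request inside the same interval has to wait
{:wait, wait_ms, bucket} = TokenBucketSketch.take(bucket, 30)

# Once the interval has passed, tokens are available again
{:ok, _bucket} = TokenBucketSketch.take(bucket, 1_000)
```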

It is also important to note that the WorldTemp.TempProcessor.Broadway.Supervisor has a supervision strategy of :rest_for_one. The reason is that if the producer supervision tree crashes, the parent supervisor can restart all the subsequent supervision trees and restore the pipeline to a fresh working state.
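
If the :rest_for_one strategy is new to you, the following sketch shows its effect using three throwaway Agent processes in place of Broadway's real children. The child ids (:producers, :processors, :terminator) are only labels chosen for this illustration. When the middle child dies, the supervisor restarts it and every child started after it, while the child started before it is left alone.

```elixir
children = [
  Supervisor.child_spec({Agent, fn -> :producers end}, id: :producers),
  Supervisor.child_spec({Agent, fn -> :processors end}, id: :processors),
  Supervisor.child_spec({Agent, fn -> :terminator end}, id: :terminator)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)

# Helper that returns a map of child id => current pid
pids = fn ->
  sup
  |> Supervisor.which_children()
  |> Map.new(fn {id, pid, _type, _modules} -> {id, pid} end)
end

before = pids.()

# Watch the last child: the supervisor stops it as part of handling
# the crash of an earlier sibling.
ref = Process.monitor(before.terminator)

# Simulate a crash of the middle child
Process.exit(before.processors, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

after_restart = pids.()

IO.inspect(before.producers == after_restart.producers, label: "producers untouched")
IO.inspect(before.processors != after_restart.processors, label: "processors restarted")
IO.inspect(before.terminator != after_restart.terminator, label: "terminator restarted")
```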

Hands on with Broadway

As previously mentioned, we will be creating a very simple Broadway based application which will read weather data from an API and then keep a running record of the coldest city on earth. We will also be using a new Broadway feature (rate limiting) to ensure that we don't go over our free plan limit on https://openweathermap.org.

With that being said, let's jump right in! Start off by creating a new project using mix new world_temp --sup. You'll want to use the --sup flag for convenience to set up the root application supervision tree. With your new project created, change into the project directory and open up the mix.exs file. Update the dependencies to look like the following (we'll lock to a specific git SHA as a release with rate limiting has not yet been cut):

defp deps do
  [
    {:httpoison, "~> 1.6"},
    {:jason, "~> 1.1"},
    {:broadway, github: "plataformatec/broadway", ref: "08497708e10867935f2e92351e4cde9e4a57135e"}
  ]
end

To fetch your dependencies, run mix deps.get in your project directory. From there, we'll want to create the file lib/city_producer.ex which will act as our data producer for our Broadway pipeline. For the purposes of this sample project, this data producer will be a GenStage based module, but you can use the other Broadway producers if that better serves your needs (Kafka, RabbitMQ, SQS, etc). In the lib/city_producer.ex file, add the following:

defmodule WorldTemp.CityProducer do
  use GenStage

  require Logger

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    {:producer, city_list()}
  end

  # If the demand is greater than the number of cities left in the
  # GenStage process state, append the city list again and retry
  def handle_demand(demand, state) when demand > length(state) do
    handle_demand(demand, state ++ city_list())
  end

  # Enough data is available in the GenStage's state, serve that to
  # the consumer
  def handle_demand(demand, state) do
    {to_dispatch, remaining} = Enum.split(state, demand)

    {:noreply, to_dispatch, remaining}
  end

  # List of cities for which to get weather data
  defp city_list do
    [
      {"Abu Dhabi", "United Arab Emirates"},
      {"Abuja", "Nigeria"},
      {"Accra", "Ghana"},
      {"Adamstown", "Pitcairn Islands"},
      {"Addis Ababa", "Ethiopia"},
      {"Algiers", "Algeria"},
      {"Alofi", "Niue"},
      ...
      # For full contents of this function go to Github project
      # https://github.com/akoutmos/world_temp
    ]
  end
end
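
To see what the two handle_demand/2 clauses do with concrete values, here is the same top-up-then-split logic pulled out into a pure function. The `CycleSketch` module is a hypothetical stand-in used only for illustration, with short atom lists in place of the city list.

```elixir
defmodule CycleSketch do
  # Mirrors the first handle_demand/2 clause: keep appending `source`
  # to the buffer until it can satisfy `demand`.
  # Note: an empty `source` would recurse forever, just like an empty city list.
  def take(demand, buffer, source) when demand > length(buffer) do
    take(demand, buffer ++ source, source)
  end

  # Mirrors the second clause: split off what gets dispatched and keep the rest.
  def take(demand, buffer, _source), do: Enum.split(buffer, demand)
end

# Two items left in the buffer, but four are demanded
{to_dispatch, remaining} = CycleSketch.take(4, [:a, :b], [:a, :b, :c])
IO.inspect({to_dispatch, remaining})
```

The dispatched events wrap around to the start of the list, which is why the pipeline keeps cycling through the cities indefinitely.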

Next we'll want to create a module that fetches weather data from the OpenWeatherMap API. You'll want to go to https://openweathermap.org/ and create a free account to get an API key. Go ahead and create lib/temp_fetcher.ex with the following contents:

defmodule WorldTemp.TempFetcher do
  require Logger

  @api_key "YOUR_API_KEY_GOES_HERE"

  def fetch_data(city, country) do
    city
    |> generate_url(country)
    |> HTTPoison.get()
    |> handle_response()
  end

  defp handle_response({:ok, %HTTPoison.Response{status_code: 200, body: body}}) do
    body
    |> Jason.decode!()
    |> get_in(["main", "temp"])
  end

  defp handle_response(resp) do
    Logger.warn("Failed to fetch temperature data: #{inspect(resp)}")

    :error
  end

  defp generate_url(city, country) do
    "http://api.openweathermap.org/data/2.5/weather?q=#{city},#{country}&appid=#{@api_key}"
  end
end
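
One thing to keep in mind is that several entries in the city list contain spaces (for example "Addis Ababa"). Whether the unencoded URL works depends on how the HTTP client and the API treat it, so as an optional variation, here is a sketch that builds the query string with URI.encode_query/1 from the standard library. The `UrlSketch` module is hypothetical, and the API key is passed as an argument purely to keep the example self-contained.

```elixir
defmodule UrlSketch do
  @base_url "http://api.openweathermap.org/data/2.5/weather"

  # `api_key` is an argument here only for illustration; the module
  # in this post reads it from a module attribute instead.
  def generate_url(city, country, api_key) do
    query = URI.encode_query(q: "#{city},#{country}", appid: api_key)

    "#{@base_url}?#{query}"
  end
end

url = UrlSketch.generate_url("Addis Ababa", "Ethiopia", "KEY")
IO.puts(url)

# Decoding the query string gives back the original values
IO.inspect(URI.decode_query(URI.parse(url).query))
```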

With that in place, we are going to need to create our Broadway consumer module to leverage this HTTP API wrapper. Create lib/temp_processor.ex with the following contents and we'll go over what exactly it does:

defmodule WorldTemp.TempProcessor do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {WorldTemp.CityProducer, []},
        transformer: {__MODULE__, :transform, []},
        rate_limiting: [
          allowed_messages: 60,
          interval: 60_000
        ]
      ],
      processors: [
        default: [concurrency: 5]
      ]
    )
  end

  @impl true
  def handle_message(:default, message, _context) do
    message
    |> Message.update_data(fn {city, country} ->
      city_data = {city, country, WorldTemp.TempFetcher.fetch_data(city, country)}
      WorldTemp.TempTracker.update_coldest_city(city_data)
    end)
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, _successful, _failed) do
    :ok
  end
end

The start_link/1 function defines the various options required by Broadway to orchestrate the data pipeline. You'll notice that the GenStage producer that we previously created is referenced in the producer keyword list, and that we also define a rate limiter to ensure that we do not go over our free tier usage on OpenWeatherMap. The transformer entry invokes our transform/2 function at the bottom of the module, which is required boilerplate to convert our incoming events into %Broadway.Message{} structs. An important thing to note here is that we specify 5 concurrent processors (if you recall from the supervision tree image earlier, there were 5 consumer processes). By changing that one value, you can determine how much concurrency/throughput you would like from your Broadway pipeline. The meat of our logic is in handle_message/3 where we retrieve our message (a city+country tuple), make our API call, and then update our rolling record of the coldest city.

Let's create that WorldTemp.TempTracker module now in lib/temp_tracker.ex with the following contents:

defmodule WorldTemp.TempTracker do
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> nil end, name: __MODULE__)
  end

  def get_coldest_city do
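    Agent.get(__MODULE__, fn
      # No city has been recorded yet (the Agent starts out with nil)
      nil ->
        "No temperature data has been recorded yet"

      {city, country, temp} ->
        "The coldest city on earth is currently #{city}, #{country} with a temperature of #{
          kelvin_to_c(temp)
        }°C"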
  end

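  # A failed fetch arrives as {city, country, :error}; skip it so that
  # the :error atom is never stored as a temperature
  def update_coldest_city({_, _, :error}), do: nil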

  def update_coldest_city({_, _, new_temp} = new_data) do
    Agent.update(__MODULE__, fn
      {_, _, orig_temp} = orig_data ->
        if new_temp < orig_temp, do: new_data, else: orig_data

      nil ->
        new_data
    end)
  end

  defp kelvin_to_c(kelvin), do: kelvin - 273.15
end

This module is a relatively straightforward Agent based module that allows us to retrieve the current coldest city, or update it if the provided value is lower than the currently set value.
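
Since kelvin_to_c/1 subtracts one float from another, the result can carry floating point noise (for example 274.96 - 273.15 evaluates to 1.8100000000000023). If you would prefer tidier output, one optional tweak is to round the result with Float.round/2. The `CelsiusSketch` module below is a hypothetical stand-alone version of that helper.

```elixir
defmodule CelsiusSketch do
  # Same conversion as kelvin_to_c/1, rounded to two decimal places
  def kelvin_to_c(kelvin), do: Float.round(kelvin - 273.15, 2)
end

IO.inspect(CelsiusSketch.kelvin_to_c(274.96))
```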

Lastly, we need to update our lib/world_temp/application.ex file and add a couple of items to our supervision tree. If you named your modules the same as I did, your application.ex file should look like this:

defmodule WorldTemp.Application do
  @moduledoc false

  use Application

  alias WorldTemp.{CityProducer, TempProcessor, TempTracker}

  def start(_type, _args) do
    children = [
      TempTracker,
      CityProducer,
      TempProcessor
    ]

    opts = [strategy: :one_for_one, name: WorldTemp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

With all that in place, we can run iex -S mix from the command line, and should be able to interact with our application:

iex(1) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Bern, Switzerland with a temperature of 1.8100000000000023°C"
iex(2) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(3) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(4) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Helsinki, Finland with a temperature of -1.8899999999999864°C"
iex(5) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Roseau, Dominica with a temperature of -18.99999999999997°C"

By calling WorldTemp.TempTracker.get_coldest_city() periodically we can see that as the Broadway processors work through the city+country list, the city with the coldest temperature changes. It may take a few minutes to run through the whole list given we are only processing 60 cities a minute and our list has a length of 216 elements.
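
As a rough back-of-the-envelope check on "a few minutes", we can work out how long one pass over the list takes. This sketch assumes the rate limiter releases up to 60 messages at the start of each 60 second interval and ignores the time spent on the HTTP requests themselves.

```elixir
cities = 216
allowed_messages = 60
interval_ms = 60_000

# Number of intervals needed to release every city once
intervals = ceil(cities / allowed_messages)

# Full intervals that must elapse before the final batch is released
waits = div(cities - 1, allowed_messages)
min_pass_ms = waits * interval_ms

IO.puts("#{intervals} intervals, at least #{div(min_pass_ms, 60_000)} minutes per pass")
```

Under those assumptions, the last 36 cities are released once the third minute has elapsed, after which the producer wraps around and starts over.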

Summary

As we can see from the sample application that we have written, with relative ease we were able to create a data processing pipeline using Broadway. With all of the abstractions given to us via Broadway, we can rest easy knowing that our pipeline will operate as intended and will recover from any issues if they arise. In addition, we have all the levers necessary to adjust the performance characteristics of our pipeline via some configuration. Thanks for sticking with me to the end and if you would like to learn more about Broadway, I suggest going through the following resources:

