Using Broadway in an Elixir application

In today’s post, we will be covering the Elixir library named Broadway. This library is maintained by the kind folks at Plataformatec and allows us to create highly concurrent data processing pipelines with relative ease.

After an overview of how Broadway works and when to use it, we’ll dive into a sample project where we’ll leverage Broadway to fetch temperature data from https://openweathermap.org/ in order to find the coldest city on earth. Our Broadway pipeline will loop over a list of all the cities in the world from a GenStage producer that we write.

Let’s jump right in!

What Is Broadway and When Should You Use It in Your Elixir App?

Broadway allows us to create highly concurrent data processing pipelines largely due to how it is built on top of GenStage. GenStage is another Elixir library that is used to construct an event/message exchange between processes. Specifically, GenStage provides the features necessary to create coordinated producer and consumer processes and ensure that the event pipeline is never flooded. Consumers subscribe to upstream producers and demand messages when they are free to do work. With this model, we can scale the number of consumer processes as is needed for the task at hand to achieve the performance we desire.

Issues with Using GenStage in Production

A major problem with using GenStage directly for a production-grade application is that the onus is on the developer to create the proper supervision tree to ensure that failures are handled properly. This is exactly where Broadway comes into play. Broadway provides the necessary abstractions on top of GenStage that you would leverage in a production context such as:

  • Rate limiting
  • Batching
  • Ordering and partitioning
  • Automatic restarts
  • Graceful shutdowns
  • Automatic message acknowledgment

Additional Supporting Libraries

In addition to the aforementioned features, Broadway also has several supporting libraries that allow you to use a message queue service as a Broadway producer (some are maintained by Plataformatec and some are community maintained). At the time of writing this post, there are official Broadway producers available for Amazon SQS, Google Pub/Sub and RabbitMQ. Support for Kafka is currently underway, but a release has not yet been cut for the project. The benefit of using these Broadway producers is that they abstract away the problems that come along with managing a persistent and valid connection to the data source, and they provide a convenient way of using these data sources as the entry point into your data processing pipeline.

All this is well and good, but when should you reach for a tool like Broadway?

Broadway is a useful tool when the task at hand is embarrassingly parallel and spawning more processes yields positive results. It is also useful when you have an ETL (extract, transform and load) pipeline and want to break up the problem into discrete components and scale them independently. One example would be processing user image uploads. Increasing the number of workers will allow you to process more image uploads, and you can disconnect the image upload from the HTTP request/response cycle by leveraging a queuing system like RabbitMQ, SQS, etc.

How Does Broadway Work Internally?

As previously mentioned, Broadway leverages GenStage in order to orchestrate the event pipelines. If you look at the Broadway.Producer module, you’ll notice the line use GenStage at the top. If you read further down into the module, you’ll find implementations of the handle_subscribe/4, handle_demand/2, handle_cancel/3, and handle_call/3 callbacks (to name a few) that are defined in the GenStage behavior. The magic of Broadway comes into play in how it automatically creates a supervision tree that ensures that your pipeline is fault-tolerant and reliable. Below is the supervision tree from the world temperature application that we will be writing shortly:

Let’s break the supervision tree down step by step. At the top of our tree, we have our project supervisor, WorldTemp.Supervisor, which is defined in our application.ex file. From there, we turn our focus to WorldTemp.TempProcessor as that is the Broadway module that we’ll be creating (WorldTemp.CityProducer and WorldTemp.TempTracker are supporting processes). Further down the supervision tree, we get to WorldTemp.TempProcessor.Broadway.Supervisor. This supervisor is responsible for monitoring all the various components of Broadway and restarting them if any errors occur. The processes that WorldTemp.TempProcessor.Broadway.Supervisor supervises are listed below along with their purpose:

  • WorldTemp.TempProcessor.Broadway.ProducerSupervisor: This supervisor is responsible for monitoring the data producer processes. This supervision tree has a strategy of :one_for_one as it only needs to restart the particular data producer that is experiencing issues. All other producers can keep running if everything is okay.
  • WorldTemp.TempProcessor.Broadway.ProcessorSupervisor: This supervisor is responsible for monitoring the worker processes that consume data from the producers. This supervision tree has a strategy of :one_for_all. The reason that it’s not :one_for_one is that the processing callback functions that we write should be stateless, and any errors that occur in them can be handled without crashing the process. If an error does crash the process, it is likely that some internal bookkeeping related to Broadway has gone awry and all the consumers need to be restarted.
  • WorldTemp.TempProcessor.Broadway.Terminator: This process is responsible for the proper stoppage of your Broadway pipeline. It will notify all the consumer processes that they should not resubscribe to producers once they terminate, and it also notifies all the producers to flush all of their current events and ignore any subsequent data requests.
  • WorldTemp.TempProcessor.RateLimiter: This process is optionally started if your pipeline requires a rate limiter. It is effectively a token bucket rate limiter and throttles how much work your processors can perform within a configurable time interval.
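
To make the rate limiter idea concrete, here is a minimal sketch of a token bucket that is refilled once per interval. It is not Broadway's implementation; the TokenBucketSketch module and its new/1, take/2 and reset/1 functions are hypothetical names used only for illustration.

```elixir
defmodule TokenBucketSketch do
  @moduledoc "Hypothetical, simplified token bucket; not Broadway's RateLimiter."

  # `allowed` is the budget per interval; `tokens` is what is left of it.
  defstruct [:allowed, :tokens]

  def new(allowed) when is_integer(allowed) and allowed > 0 do
    %__MODULE__{allowed: allowed, tokens: allowed}
  end

  # Try to take `count` tokens. Returns how many messages may proceed now
  # together with the updated bucket.
  def take(%__MODULE__{tokens: tokens} = bucket, count) when count >= 0 do
    granted = min(tokens, count)
    {granted, %{bucket | tokens: tokens - granted}}
  end

  # Called once per interval (for example from a timer) to refill the bucket.
  def reset(%__MODULE__{allowed: allowed} = bucket), do: %{bucket | tokens: allowed}
end

bucket = TokenBucketSketch.new(60)
{first, bucket} = TokenBucketSketch.take(bucket, 50)
{second, bucket} = TokenBucketSketch.take(bucket, 50)
{third, _bucket} = bucket |> TokenBucketSketch.reset() |> TokenBucketSketch.take(50)

IO.inspect({first, second, third})
# → {50, 10, 50}
```

The second request is only partially granted because the budget for the interval is nearly spent; after the reset, the full budget is available again.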

It is also important to note that WorldTemp.TempProcessor.Broadway.Supervisor has a supervision strategy of :rest_for_one. The reason for this is that if the producer supervision tree crashes, the parent supervisor restarts all the supervision trees that were started after it and restores the pipeline to a fresh, working state.
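
The effect of :rest_for_one can be observed with nothing but the standard library. The sketch below is an illustration under assumptions: the three Agent children named :producers, :processors and :terminator are hypothetical stand-ins for Broadway's real supervision trees, and RestForOneSketch is a helper written for this example.

```elixir
defmodule RestForOneSketch do
  # Poll until `name` is registered to a pid other than `old_pid`.
  def await_restart(name, old_pid, attempts \\ 100)
  def await_restart(_name, _old_pid, 0), do: :timeout

  def await_restart(name, old_pid, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        await_restart(name, old_pid, attempts - 1)
    end
  end
end

names = [:producers, :processors, :terminator]

# Placeholder children, started in the order listed above.
children =
  for name <- names do
    %{id: name, start: {Agent, :start_link, [fn -> name end, [name: name]]}}
  end

{:ok, _sup} = Supervisor.start_link(children, strategy: :rest_for_one)
before = Map.new(names, &{&1, Process.whereis(&1)})

# Crash the middle child and wait until it is really gone.
ref = Process.monitor(before.processors)
Process.exit(before.processors, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

new_processors = RestForOneSketch.await_restart(:processors, before.processors)
new_terminator = RestForOneSketch.await_restart(:terminator, before.terminator)
producers_untouched? = Process.whereis(:producers) == before.producers

IO.inspect({is_pid(new_processors), is_pid(new_terminator), producers_untouched?})
```

The crashed child and the child started after it come back with new pids, while the child started before the crash keeps running untouched.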

Hands-on with Broadway

As previously mentioned, we will be creating a very simple Broadway-based application that reads weather data from an API and keeps a running record of the coldest city on earth. We will also be using a new feature of Broadway (rate limiting) to ensure that we don’t go over our free plan limit on https://openweathermap.org.

With that being said, let’s jump right in. Start by creating a new project using mix new world_temp --sup. The --sup flag conveniently sets up the root application supervision tree. With your new project created, change into the project directory and open up the mix.exs file. Update the dependencies to look like the following (we’ll pin Broadway to a specific commit on GitHub, as there was a pending issue with the rate limiter that I fixed):

defp deps do
  [
    {:httpoison, "~> 1.6"},
    {:jason, "~> 1.1"},
    {:broadway, github: "plataformatec/broadway", ref: "08497708e10867935f2e92351e4cde9e4a57135e"}
  ]
end

To fetch your dependencies, run mix deps.get in your project directory. From there, we’ll want to create the file lib/city_producer.ex, which will act as the data producer for our Broadway pipeline. For the purposes of this sample project, this data producer will be a GenStage-based module, but you can use the other Broadway producers if that better serves your needs (Kafka, RabbitMQ, SQS, etc.). In the lib/city_producer.ex file, add the following:

defmodule WorldTemp.CityProducer do
  use GenStage

  require Logger

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    {:producer, city_list()}
  end

  # If the demand is greater than the number of cities held in the
  # GenStage process state, re-add the city list and try again
  def handle_demand(demand, state) when demand > length(state) do
    handle_demand(demand, state ++ city_list())
  end

  # Enough data is available in the GenStage's state, serve that to
  # the consumer
  def handle_demand(demand, state) do
    {to_dispatch, remaining} = Enum.split(state, demand)

    {:noreply, to_dispatch, remaining}
  end

  # List of cities for which to get weather data
  defp city_list do
    [
      {"Abu Dhabi", "United Arab Emirates"},
      {"Abuja", "Nigeria"},
      {"Accra", "Ghana"},
      {"Adamstown", "Pitcairn Islands"},
      {"Addis Ababa", "Ethiopia"},
      {"Algiers", "Algeria"},
      {"Alofi", "Niue"},
      ...
      # For full contents of this function go to Github project
      # https://github.com/akoutmos/world_temp
    ]
  end
end
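
To see how the two handle_demand/2 clauses interact, the same logic can be exercised outside of GenStage. This is a sketch for illustration only: DemandSketch is a hypothetical module, and its three-city list is a made-up stand-in for the full city_list/0.

```elixir
defmodule DemandSketch do
  # Hypothetical three-city stand-in for the full city list.
  def city_list, do: [{"Accra", "Ghana"}, {"Alofi", "Niue"}, {"Algiers", "Algeria"}]

  # Not enough buffered cities: append another copy of the list and retry.
  def handle_demand(demand, state) when demand > length(state) do
    handle_demand(demand, state ++ city_list())
  end

  # Enough buffered cities: emit `demand` of them and keep the rest as state.
  def handle_demand(demand, state) do
    {to_dispatch, remaining} = Enum.split(state, demand)
    {:noreply, to_dispatch, remaining}
  end
end

# First request: two cities are served, one stays buffered.
{:noreply, events, state} = DemandSketch.handle_demand(2, DemandSketch.city_list())
IO.inspect({events, state})

# Second request: only one city is buffered, so the list is appended again.
{:noreply, more_events, more_state} = DemandSketch.handle_demand(2, state)
IO.inspect({more_events, more_state})
```

Because the list is re-added whenever demand exceeds what is buffered, the producer never runs dry and the pipeline keeps cycling through the cities.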

Next, we’ll want to create a module that fetches weather data from the OpenWeatherMap API. You’ll want to go to https://openweathermap.org/ and create a free account to get an API key. Go ahead and create lib/temp_fetcher.ex with the following content:

defmodule WorldTemp.TempFetcher do
  require Logger

  @api_key "YOUR_API_KEY_GOES_HERE"

  def fetch_data(city, country) do
    city
    |> generate_url(country)
    |> HTTPoison.get()
    |> handle_response()
  end

  defp handle_response({:ok, %HTTPoison.Response{status_code: 200, body: body}}) do
    body
    |> Jason.decode!()
    |> get_in(["main", "temp"])
  end

  defp handle_response(resp) do
    Logger.warn("Failed to fetch temperature data: #{inspect(resp)}")

    :error
  end

  defp generate_url(city, country) do
    # URI.encode/1 escapes the spaces in multi-word names such as "Abu Dhabi"
    URI.encode(
      "http://api.openweathermap.org/data/2.5/weather?q=#{city},#{country}&appid=#{@api_key}"
    )
  end
end
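
The two pure steps of this module, building the request URL and pulling the temperature out of the decoded body, can be tried without making any HTTP calls. The sketch below is an assumption-laden illustration: WeatherUrlSketch and extract_temp/1 are hypothetical names, the response maps are made up, and it assumes the API accepts form-encoded query strings as produced by URI.encode_query/1.

```elixir
defmodule WeatherUrlSketch do
  @base "http://api.openweathermap.org/data/2.5/weather"

  # Build the request URL with an encoded query string, so that spaces
  # and commas in city or country names cannot break the URL.
  def generate_url(city, country, api_key) do
    @base <> "?" <> URI.encode_query(q: "#{city},#{country}", appid: api_key)
  end

  # Pull the temperature (in Kelvin) out of an already-decoded response body.
  # Returns nil when the expected keys are missing.
  def extract_temp(decoded_body), do: get_in(decoded_body, ["main", "temp"])
end

url = WeatherUrlSketch.generate_url("Abu Dhabi", "United Arab Emirates", "KEY")
IO.puts(url)

# Made-up bodies, shaped like a successful and a failed response
temp = WeatherUrlSketch.extract_temp(%{"main" => %{"temp" => 301.15}})
missing = WeatherUrlSketch.extract_temp(%{"cod" => "404"})
IO.inspect({temp, missing})
# → {301.15, nil}
```

Note that get_in/2 returns nil rather than raising when a key is absent, so a caller that needs a number should check for nil as well as for :error.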

With that in place, we need to create our Broadway consumer module to leverage this HTTP API wrapper. Create lib/temp_processor.ex with the following content. We’ll go over what it does shortly:

defmodule WorldTemp.TempProcessor do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {WorldTemp.CityProducer, []},
        transformer: {__MODULE__, :transform, []},
        rate_limiting: [
          allowed_messages: 60,
          interval: 60_000
        ]
      ],
      processors: [
        default: [concurrency: 5]
      ]
    )
  end

  @impl true
  def handle_message(:default, message, _context) do
    message
    |> Message.update_data(fn {city, country} ->
      city_data = {city, country, WorldTemp.TempFetcher.fetch_data(city, country)}
      WorldTemp.TempTracker.update_coldest_city(city_data)
    end)
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, _successful, _failed) do
    :ok
  end
end

The start_link/1 function defines the various options required by Broadway to orchestrate the data pipeline. You’ll notice that the GenStage producer that we previously created is referenced in the producer keyword list section and we also defined a rate limiter to ensure that we don’t go over our free tier usage on OpenWeatherMap. The transformer entry invokes the transform/2 function at the bottom of the module which is required boilerplate to format our incoming messages to a %Broadway.Message{} struct. An important thing to note here is that we specify 5 concurrent processors (if you recall from the supervision tree image earlier, there were 5 consumer processes). By changing that one value, you can determine how much concurrency/throughput you would like from your Broadway pipeline. The meat of our logic is in handle_message/3 where we retrieve our message (a city+country tuple), make an API call, and then update our rolling record of the coldest city.

Now let’s create that WorldTemp.TempTracker module in lib/temp_tracker.ex with the following content:

defmodule WorldTemp.TempTracker do
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> nil end, name: __MODULE__)
  end

  def get_coldest_city do
    Agent.get(__MODULE__, fn
      # No reading has been recorded yet
      nil ->
        "No temperature data has been collected yet"

      {city, country, temp} ->
        "The coldest city on earth is currently #{city}, #{country} with a temperature of #{kelvin_to_c(temp)}°C"
    end)
  end

  # A failed fetch arrives as {city, country, :error}; leave the record untouched
  def update_coldest_city({_city, _country, :error}), do: nil

  def update_coldest_city({_, _, new_temp} = new_data) do
    Agent.update(__MODULE__, fn
      {_, _, orig_temp} = orig_data ->
        if new_temp < orig_temp, do: new_data, else: orig_data

      nil ->
        new_data
    end)
  end

  defp kelvin_to_c(kelvin), do: kelvin - 273.15
end

This module is a relatively straightforward Agent-based module that allows us to retrieve the current coldest city, or update it if the provided value is lower than the currently set value.
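
The update rule itself is a small pure function, which makes it easy to check in isolation. The following sketch is illustrative only: ColdestSketch is a hypothetical module, the readings are sample values chosen for the example, and rounding the Celsius value for display is an optional addition rather than part of the module above.

```elixir
defmodule ColdestSketch do
  # Keep whichever reading has the lower temperature; nil means "no record yet".
  def coldest(nil, new_data), do: new_data

  def coldest({_, _, orig_temp} = orig_data, {_, _, new_temp} = new_data) do
    if new_temp < orig_temp, do: new_data, else: orig_data
  end

  # Convert Kelvin to Celsius, rounded to two decimals for display.
  def kelvin_to_c(kelvin), do: Float.round(kelvin - 273.15, 2)
end

# Sample readings in Kelvin (illustrative values)
readings = [
  {"Bern", "Switzerland", 274.96},
  {"Helsinki", "Finland", 271.26},
  {"Accra", "Ghana", 301.15}
]

# Fold the readings into a single record, starting from "no record yet".
{city, _country, temp} = Enum.reduce(readings, nil, &ColdestSketch.coldest(&2, &1))

IO.inspect({city, ColdestSketch.kelvin_to_c(temp)})
# → {"Helsinki", -1.89}
```

Rounding avoids floating point noise such as long runs of trailing digits in the printed temperature.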

Lastly, we need to update our lib/world_temp/application.ex file and add a couple of items to our supervision tree. If you named your modules the same as I did, your application.ex file should look like this:

defmodule WorldTemp.Application do
  @moduledoc false

  use Application

  alias WorldTemp.{CityProducer, TempProcessor, TempTracker}

  def start(_type, _args) do
    children = [
      TempTracker,
      CityProducer,
      TempProcessor
    ]

    opts = [strategy: :one_for_one, name: WorldTemp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

With all that in place, we can run iex -S mix from the command line and interact with our application:

iex(1) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Bern, Switzerland with a temperature of 1.8100000000000023°C"
iex(2) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(3) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Copenhagen, Denmark with a temperature of -0.37000000000000455°C"
iex(4) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Helsinki, Finland with a temperature of -1.8899999999999864°C"
iex(5) ▶ WorldTemp.TempTracker.get_coldest_city()
"The coldest city on earth is currently Roseau, Dominica with a temperature of -18.99999999999997°C"

By calling WorldTemp.TempTracker.get_coldest_city() periodically, we can see that as the Broadway processors work through the city+country list, the city with the coldest temperature changes. It may take a few minutes to run through the whole list given that we are only processing 60 cities a minute and our list has a length of 216 elements.
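
As a rough back-of-the-envelope check of that estimate, the arithmetic can be written out as follows. It is a simplification that assumes the full budget is available when the pipeline starts and ignores HTTP latency.

```elixir
cities = 216
allowed_messages = 60
interval_ms = 60_000

# Number of rate-limit intervals needed to admit every city once (rounded up).
intervals = div(cities + allowed_messages - 1, allowed_messages)

# The first batch is admitted immediately, so only the later batches wait.
min_wait_ms = (intervals - 1) * interval_ms

IO.inspect({intervals, min_wait_ms})
# → {4, 180000}
```

In other words, a full pass over the list needs four intervals, so the last cities are admitted roughly three minutes after startup.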

Summary

As we can see from the sample application that we’ve written, with relative ease, we were able to create a data processing pipeline for our Elixir application by using Broadway. With all of the abstractions given to us via Broadway, we can rest easy knowing that our pipeline will operate as intended and will recover from any issues if they arise. In addition, we have all the levers necessary to adjust the performance characteristics of our pipeline via some configuration.

Thanks for sticking with me to the end and if you would like to learn more about Broadway, I suggest going through the following resources:

Guest author Alex Koutmos is a Senior Software Engineer who writes backends in Elixir, frontends in VueJS and deploys his apps using Kubernetes. When he is not programming or blogging he is wrenching on his 1976 Datsun 280z.
