Kafka's Elixir

Posted on Sat 27 April 2024 in english

This is my first Elixir project that actually made it to a release. I've been following Elixir for a while now and consider myself part of the wider community, so it was time to build something productive beyond solving Elixir koans and the Elixir track on Exercism.

If you run Kafka like we do at work, you want to check various metrics to see which topics are lagging: in other words, how many messages have not yet been processed by the consumers, so that you can draw conclusions about the performance of your own code and act accordingly.

There is already a project that provides these metrics (this one), but you also learn by ~~copying~~ imitating :) Some time has passed since I started working on the project, and the original has been archived in the meantime, so my little clone now has a role to fill. Maybe it will help one or two people with their metrics.

For our purposes at work, only two of the metrics provided by kafka-lag-exporter were of interest:

  • kafka_consumergroup_group_lag: The lag of the consumer group in relation to individual partitions
  • kafka_consumergroup_group_topic_sum_lag: The aggregated lag of the consumer group across all partitions
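
Scraped from the exporter, the two gauges then look roughly like this (the label set is taken from the metric definitions further down; the values are made up):

  kafka_consumergroup_group_lag{cluster_name="redpanda",group="some-group",topic="owlshop-customers",partition="0"} 42
  kafka_consumergroup_group_topic_sum_lag{cluster_name="redpanda",group="some-group",topic="owlshop-customers"} 128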

With this intro, we can now move on to the implementation.

Low-level Kafka operations

The library brod provides the appropriate low-level functionality to access topics and consumer groups. Since the library is written in Erlang and I can't read Erlang very well, I first had to understand how it works. I came across a repo on GitHub that pointed me in the right direction.

  # Lag per partition: latest offset minus the consumer group's committed offset
  def lag(topic, consumer_group, client) do
    offsets = resolve_offsets(topic, :latest, client)
    committed_offsets = fetch_committed_offsets(topic, consumer_group, client)

    for {{part, current}, {_part2, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end

  # Total lag: sum of the per-partition lags
  def lag_total(topic, consumer_group, client) do
    for {_part, recs} <- lag(topic, consumer_group, client), reduce: 0 do
      acc -> acc + recs
    end
  end
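
resolve_offsets/3 and fetch_committed_offsets/3 are thin wrappers around brod. As a sketch, resolve_offsets/3 could be built from :brod.get_partitions_count/2 and :brod.resolve_offset/4; the helper shape is my assumption, and the endpoints lookup mirrors the brod config shown later in this post:

  defp resolve_offsets(topic, time, client) do
    # :brod.resolve_offset/4 wants the broker endpoints, not the client atom,
    # so read them from the brod client config
    endpoints = Application.get_env(:brod, :clients)[client][:endpoints]
    {:ok, partition_count} = :brod.get_partitions_count(client, topic)

    for partition <- 0..(partition_count - 1) do
      {:ok, offset} = :brod.resolve_offset(endpoints, topic, partition, time)
      {partition, offset}
    end
  end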

Binary parsing

member_assignments, i.e. the information about which topics a consumer group is subscribed to, is stored in a binary format that looks something like this when logged in iex:

  <<0, 0, 0, 0, 0, 1, 0, 17, 111, 119, 108, 115, 104, 111, 112, 45, 99, 117, 115, 116, 111,
    109, 101, 114, 115, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0,
    4, 0>>

With some ASCII experience and the knowledge that Kafka topic names should be ASCII, you can already guess that the topic owlshop-customers starts at byte 8 (111 is o). With that, you can extract the topic names from the binary as follows:

  invalid_topic_characters = ~r/[^[:alnum:]\-\._]/

  member_assignment
  |> String.chunk(:printable)
  |> Enum.drop(1)
  |> Enum.take_every(2)
  |> Enum.map(fn topic_name -> Regex.replace(invalid_topic_characters, topic_name, "") end)

The code from kafkajs was helpful for me here, as I could always check whether I was on the right track.
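
As an aside: the layout is Kafka's ConsumerProtocolAssignment format (an int16 version, an int32 topic count, then per topic an int16-length-prefixed name plus an int32-prefixed list of int32 partition ids), so the topic names can also be pulled out with binary pattern matching instead of chunking on printable characters. A sketch, not the code the exporter actually uses:

  def topics(<<_version::16, topic_count::32, rest::binary>>) do
    parse_topics(topic_count, rest, [])
  end

  defp parse_topics(0, _rest, acc), do: Enum.reverse(acc)

  defp parse_topics(n, <<len::16, name::binary-size(len), parts::32, rest::binary>>, acc) do
    # each partition id is an int32, so skip parts * 4 bytes
    skip = parts * 4
    <<_partitions::binary-size(skip), tail::binary>> = rest
    parse_topics(n - 1, tail, [name | acc])
  end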

Get topic lag

That was the technically most difficult part. With it out of the way, you can query the current lag values of the consumer group:

  def lag(topic, consumer_group, client) do
    # sort both lists by partition so that Enum.zip pairs them up correctly
    offsets =
      resolve_offsets(topic, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    committed_offsets =
      fetch_committed_offsets(topic, consumer_group, client)
      |> Enum.sort_by(fn {key, _value} -> key end)

    for {{part, current}, {_, committed}} <- Enum.zip(offsets, committed_offsets) do
      {part, current - committed}
    end
  end
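
Called in iex, this yields one {partition, lag} tuple per partition, something like this (module name and values are invented):

  iex> KafkaexLagExporter.KafkaUtils.lag("owlshop-customers", "some-group", :kafka_client)
  [{0, 3}, {1, 0}, {2, 7}, {3, 1}, {4, 0}, {5, 2}]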

Typespecs

Since I come from TypeScript and therefore think in terms of types, and since types would be very helpful for future maintainability, I wanted type annotations. Elixir doesn't have them built in, but you can add typespecs as annotations and have them checked with Dialyzer, a static analysis tool.

The tool dialyxir allows you to run the type checker conveniently from the CLI.

I had already dealt with typespecs years ago but at that time I found the information very difficult to access. In the meantime, the official documentation is really easy to read and there is also a beginner-friendly article over at Elixir School.
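
For illustration, a spec for the lag/3 function above could look like this (the annotation is mine, not taken from the repo; with dialyxir installed, mix dialyzer will check call sites against it):

  # topic and consumer group are binaries, the client is the brod client atom
  @spec lag(String.t(), String.t(), atom()) :: [{non_neg_integer(), integer()}]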

Testing

No project without tests! Since I'm querying values from an external source via an API, that API must of course be mocked in order to write proper tests. If you search for Elixir mocking, you will quickly come across Mox. This library is described in detail by José Valim, the creator of Elixir, in his blog post Mocks and explicit contracts.
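
In short, Mox has you define a behaviour, generate a mock for it, and set expectations per test; all names below are hypothetical:

  # a behaviour for the code that talks to Kafka (hypothetical)
  defmodule OffsetsBehaviour do
    @callback resolve_offsets(String.t(), atom(), atom()) :: [{non_neg_integer(), integer()}]
  end

  # usually in test_helper.exs
  Mox.defmock(OffsetsMock, for: OffsetsBehaviour)

  defmodule LagMoxTest do
    use ExUnit.Case, async: true
    import Mox

    setup :verify_on_exit!

    test "computes lag from mocked offsets" do
      expect(OffsetsMock, :resolve_offsets, fn _topic, _time, _client -> [{0, 10}] end)

      assert OffsetsMock.resolve_offsets("topic", :latest, :client) == [{0, 10}]
    end
  end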

I quite liked the Mox API but unfortunately I didn't manage to get my modules mocked cleanly with it.

Given this dependency chain:

Module under test -> Module A -> Module B

You would think that you only have to mock module A to get the test to work, since module B is never reached once module A is mocked. Unfortunately I had to mock the entire chain downwards, which ruined the testing experience.

I followed the problem description here for a bit and finally ended up using Patch, which the OP was also advised to use.
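
Patch swaps out the function on the module itself, so no behaviour or injected dependency is needed. A minimal sketch with hypothetical modules standing in for the real chain:

  defmodule ModuleB do
    def offsets, do: raise("should never be called in the test")
  end

  defmodule ModuleA do
    def resolve_offsets(_topic, _time, _client), do: ModuleB.offsets()
  end

  defmodule LagPatchTest do
    use ExUnit.Case, async: false
    use Patch

    test "computes lag from patched offsets" do
      # patch/3 replaces resolve_offsets directly on ModuleA, so ModuleB is never hit
      patch(ModuleA, :resolve_offsets, [{0, 10}, {1, 20}])

      assert ModuleA.resolve_offsets("topic", :latest, :kafka_client) == [{0, 10}, {1, 20}]
    end
  end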

Metrics

One of the hosts of the podcast Beam Radio, Alex Koutmos, occasionally talks about new releases of his package prom_ex, so I knew immediately what I could use for exporting the metrics to Prometheus.

The documentation on manual metrics is well written, and so with a short setup

  def manual_metrics(_opts) do
    clients = Application.get_env(:brod, :clients)
    [endpoint | _] = clients[:kafka_client][:endpoints] || [{"redpanda", 29_092}]

    Manual.build(
      :application_versions_manual_metrics,
      {__MODULE__, :group_sum_lag, [endpoint, []]},
      [
        last_value(
          [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
          event_name: [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
          description: "Sum of group offset lag across topic partitions",
          measurement: :lag,
          tags: [:cluster_name, :group, :topic, :consumer_id, :member_host]
        )
      ]
    )
  end

the metrics could be served.
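
For completeness: the plugin defining manual_metrics/1 has to be registered in the application's PromEx module (the module names here are my assumptions):

  defmodule KafkaexLagExporter.PromEx do
    use PromEx, otp_app: :kafkaex_lag_exporter

    @impl true
    def plugins do
      # the plugin module containing manual_metrics/1 from above
      [KafkaexLagExporter.MetricsPlugin]
    end
  end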

To emit the metrics at an interval, you then call a function like this from your GenServer:

  def group_sum_lag({host, _port}, consumer_offsets) do
    Enum.each(consumer_offsets, fn %ConsumerOffset{} = consumer_offset ->
      lag = elem(consumer_offset.lag, 1)

      :telemetry.execute(
        [@kafka_event, :consumergroup, :group, :topic, :sum, :lag],
        %{lag: lag},
        %{
          cluster_name: host,
          group: consumer_offset.consumer_group,
          topic: consumer_offset.topic,
          consumer_id: consumer_offset.consumer_id,
          member_host: consumer_offset.member_host
        }
      )
    end)
  end
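
A sketch of such a GenServer, scheduling itself with Process.send_after (the interval, module names, and the helper for gathering the offsets are my assumptions):

  defmodule KafkaexLagExporter.Scheduler do
    use GenServer

    @interval :timer.seconds(10)

    def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

    @impl true
    def init(opts) do
      schedule()
      {:ok, opts}
    end

    @impl true
    def handle_info(:emit, opts) do
      # gather the current ConsumerOffset structs (hypothetical helper) and re-emit
      consumer_offsets = KafkaexLagExporter.ConsumerOffsets.fetch()
      KafkaexLagExporter.MetricsPlugin.group_sum_lag(opts[:endpoint], consumer_offsets)

      schedule()
      {:noreply, opts}
    end

    defp schedule, do: Process.send_after(self(), :emit, @interval)
  end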

Conclusion

I dived deeper and longer into Kafka than I would have thought. Testing also took longer than expected due to the problems with Mox described above.

Now I'm just happy to have a finished project and to be able to tackle the next one. You can find the complete code here: https://github.com/Lechindianer/kafkaex_lag_exporter