AWS MSK + Glue – Part One

Antique cannon covered in snow
(Photo by Author)

Over December 2022 I got it into my head that I wanted my teams to skill up on the combination of Kafka and a Schema Registry. There’s a ton of resources introducing the use of Kafka, and some on using a Schema Registry, but I was not satisfied that there were any consolidated quick-starts that experienced engineers could use as templates for building real production solutions.

To that end, I started by building some demonstrations of using what is effectively the canonical stack from Confluent. You can poke around with these if you’re interested, but that’s not the main thing that I want to write up:

  • PyKafka — Python + Avro + Kafka + Confluent Schema Registry
  • KafkaToy — Java + Avro + Kafka + Confluent Schema Registry
  • SpringKafka — SpringBoot + Avro + Kafka + Confluent Schema Registry

These will be a pretty good start for my teams, but I wanted to go a bit further. Managing Kafka clusters is a pain in the neck, and all the major cloud providers have some version of a managed Kafka service to make it easier. In the case of AWS, this is Managed Streaming for Apache Kafka (MSK), which comes in “provisioned” and “serverless” flavours.

The main difference between these is that the “provisioned” version requires you to define the Kafka cluster up front, and opens up various configuration options to customise how the cluster integrates with the rest of your infrastructure. The “serverless” flavour is simpler to set up, more opinionated and less configurable, but has the advantage of being arbitrarily scalable. It is a little hard to determine whether one solution is cheaper than the other — the up-front cost of the “serverless” version appears more expensive for development and experimentation, but conversely it will be more cost effective if the traffic is unpredictable or spiky. For what it is worth, the three-node configuration I describe below (which I spun up when needed and shut down overnight) cost me less than USD $10 a day.

This is all well and good, but a Kafka cluster really should be coupled with a Schema Registry for production use. It’s the best way to programmatically and automatically validate message formats and manage changes to them. While MSK gives us a low-maintenance, low-configuration cluster, it doesn’t give us a Schema Registry.

Because MSK is completely compatible with the Apache Kafka API for controlling the data plane (e.g. managing and using topics, monitoring topic behaviours, and so on), it’s easy to use existing frameworks and examples to build producers and consumers against the cluster. But what are we going to do about the Schema Registry? We’ve only got two choices — run and manage a Schema Registry service in EC2 or EKS, or use the managed service that AWS provides… which in late 2022 turns out to be AWS Glue.

Guess what? There’s no straightforward, ready-to-go example showing engineers how to use this combination: provisioned MSK (Kafka), AWS Glue, Terraform and Python/Java. Hence… I’m going to need to build this myself, which brings us here.

Once I began work on this more complex demonstration for my team, it became evident that there are some subtleties that can trip us up, so I decided to break the effort (and this write-up) into two parts:

  • Terraform MSK and demonstrate without a Schema Registry;
  • Add the Schema Registry using Glue.

Looking at what I’ve built so far and discussing it below, there’s an important caveat about using this directly for production purposes: there is no consideration of client authentication. This is one place where AWS MSK starts to vary from “raw” Apache Kafka, and there are several different ways to deal with authentication depending on your application deployment architecture. For further reading, start with the AWS documentation on MSK client authentication.
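
For a flavour of what that would involve on the Terraform side, enabling IAM authentication on a provisioned cluster is a matter of adding a client_authentication block to the aws_msk_cluster resource shown later. This is a sketch only; none of the demonstration code below is set up for it:

  # Sketch only - the demonstration cluster deliberately omits this block.
  client_authentication {
    sasl {
      iam = true
    }
  }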

If you want to follow along with the demonstration project as I build it, you can find it on GitHub as TheBellman/msk-glue-example — the version of the code I talk about below is tagged as “1.0-python-text”.

Let’s get on with it then. The Terraform project contains a bunch of support code to do things like set up logging destinations, create KMS keys and so forth, but I won’t talk about them much.

The first thing I did was use my (quite opinionated) VPC module, module-vpc, to create an isolated VPC to play in. This is an implementation of one of Amazon’s suggested architectures, and creates a fairly secure (or at least securable) setup with some ‘private’ subnets that are not directly accessible from the Internet, some ‘public’ subnets that are partly accessible (a good place to mount web services, APIs and so on), and some security groups and NACLs to lock it down. It’s useful to know that this boilerplate allows access from the ‘public’ subnets to the ‘private’ subnets on ports 80 and 443, allows access to the ‘public’ subnets on ports 80, 443 and 22 (for SSH from nominated clients), and lets both sets of subnets communicate out to the Internet on 80 and 443 in order to install and manage software on hosts.

Step one: create the VPC:

module "vpc" {
  source      = "github.com/TheBellman/module-vpc"
  tags        = var.tags
  vpc_cidr    = "172.21.0.0/16"
  vpc_name    = "msk-glue"
  ssh_inbound = ["188.211.160.179/32", "3.8.37.24/29"]
}

Since you will ask — the ‘/32’ CIDR block is my desktop, and ‘3.8.37.24/29’ is the CIDR block for AWS EC2 Instance Connect in the eu-west-2 region. Also since you will ask… no, these values are not hard-coded in the Terraform code, but are injected via variables.
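
In the real project the module call reads more like ssh_inbound = var.ssh_inbound, backed by a declaration along these lines (a sketch; the actual variable names and defaults in the repository may differ):

# Hypothetical declaration - the project may name or type this differently.
variable "ssh_inbound" {
  description = "CIDR blocks allowed to reach hosts in the public subnets over SSH"
  type        = list(string)
  default     = []
}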

Step two: create a Kafka cluster configuration to override some defaults. Note that these configurations can be changed later without tearing down the cluster, but they are versioned. Here, I reduce the friction for building test and exploratory clients:

resource "aws_msk_configuration" "dev" {
  kafka_versions = [local.kafka_version]
  name           = local.cluster_name
  server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
PROPERTIES
}

Step three: build the cluster.

The documentation for this warns that it could take 15 minutes to build the cluster. My experience was that it consistently took 30 minutes. If you are used to your Terraform projects running in a few minutes… be prepared to go have lunch while this builds.

resource "aws_msk_cluster" "dev" {
  cluster_name           = local.cluster_name
  kafka_version          = local.kafka_version
  number_of_broker_nodes = length(data.aws_availability_zones.available.zone_ids)
  broker_node_group_info {
    instance_type   = "kafka.t3.small"
    client_subnets  = tolist(module.vpc.private_subnet_id)
    security_groups = [aws_security_group.dev.id]
    storage_info {
      ebs_storage_info {
        volume_size = 100
      }
    }
  }
  configuration_info {
    arn      = aws_msk_configuration.dev.arn
    revision = 1
  }
  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.dev.arn
  }
  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.dev.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.logs.id
        prefix  = "logs/msk"
      }
    }
  }
}

Whoah. That’s a lot. Let’s break it down. First up, number_of_broker_nodes needs to be a multiple of the number of availability zones in your region. It also seems to be necessary that you have an available subnet in each of those availability zones! I have three subnets, and there are three availability zones in my region, so that’s easy.
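
That length(...) expression relies on a data source I haven’t shown. It’s the standard availability zone lookup, roughly this (the state filter is my assumption; the project may not restrict by state):

data "aws_availability_zones" "available" {
  # Only consider zones that are currently available in the region.
  state = "available"
}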

client_subnets is a terrible name, because it actually specifies the subnets that the Kafka cluster is built into! Figuring this out cost me half a day, as the documentation is… not good. I could rant endlessly about the cost of poor documentation, but will limit myself to saying: please choose your names wisely, and be explicit in the description of what a parameter is used for.

The other big gotcha is security_groups — again, the documentation is not great on this, but this parameter controls where requests to the Kafka cluster can come from. It edges into the uncomfortable dichotomy between security groups used to classify or identify EC2 instances, and security groups used as a stateful firewall. I’ll touch on the security group below.

The rest of the configuration is pretty straightforward. The Terraform resource has quite a lot of configuration available, but defaults to sensible things if not specified — for example, because we’ve not included a client_authentication block, the cluster defaults to unauthenticated access, while still requiring TLS for client communication.
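
To make the TLS part visible, the encryption_info block in the cluster above is equivalent to spelling out the transit settings that the provider applies when you leave them out (a sketch, not code from the project):

  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.dev.arn

    # Provider defaults when encryption_in_transit is omitted: clients must use
    # TLS to the brokers, and broker-to-broker traffic is also encrypted.
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }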

You can see the link to our cluster configuration at configuration_info — note the revision number there, which you will need to update if you change the configuration!
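
If you would rather not remember to bump that revision by hand, the configuration resource exports a latest_revision attribute you can reference instead; a small variation on the block above:

  configuration_info {
    arn      = aws_msk_configuration.dev.arn
    # Track the newest configuration revision automatically instead of hard-coding it.
    revision = aws_msk_configuration.dev.latest_revision
  }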

I encrypt the data at rest, knowing that it’s protected by TLS in flight, and log both to CloudWatch and into a bucket.
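
I said I wouldn’t dwell on the support code, but for orientation the resources referenced there amount to roughly the following (a sketch; the real project also sets tags, key policies and bucket lifecycle rules, and the names and settings here are illustrative):

# Illustrative settings - the project's actual names and arguments may differ.
resource "aws_kms_key" "dev" {
  description = "CMK used to encrypt MSK data at rest"
}

resource "aws_cloudwatch_log_group" "dev" {
  name              = "msk-glue-dev"
  retention_in_days = 7
}

resource "aws_s3_bucket" "logs" {
  bucket = "msk-glue-example-logs"
}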

Ok, back to security groups. As I said, we attach a security group to the cluster that specifies where requests can come from. The ports that we open up are from a range specified by Amazon, as different authentication and request routes use different ports.

resource "aws_security_group" "dev" {
  name        = "msk_dev"
  vpc_id      = module.vpc.vpc_id
  description = "Security group for MSK cluster"
  tags        = { Name = "msk_dev" }
}

resource "aws_security_group_rule" "dev_kafka_in" {
  security_group_id = aws_security_group.dev.id
  type              = "ingress"
  from_port         = 9092
  to_port           = 9098
  protocol          = "tcp"
  cidr_blocks       = tolist(module.vpc.public_subnet)
}

resource "aws_security_group_rule" "dev_zk_in" {
  security_group_id = aws_security_group.dev.id
  type              = "ingress"
  from_port         = 2181
  to_port           = 2182
  protocol          = "tcp"
  cidr_blocks       = tolist(module.vpc.public_subnet)
}

This winds up being pretty straightforward — I only want to allow requests from clients in my “public” subnets, where I will mount applications.

There was one other tweak that I needed, though. My VPC module created security groups that can be attached to “public” EC2 instances, but they need a small adjustment to allow my client instances to reach the Kafka cluster:

resource "aws_security_group_rule" "dev_kafka_out" {
  security_group_id = module.vpc.public_sg
  type              = "egress"
  from_port         = 9092
  to_port           = 9098
  protocol          = "tcp"
  cidr_blocks       = tolist(module.vpc.private_subnet)
}

resource "aws_security_group_rule" "dev_zk_out" {
  security_group_id = module.vpc.public_sg
  type              = "egress"
  from_port         = 2181
  to_port           = 2182
  protocol          = "tcp"
  cidr_blocks       = tolist(module.vpc.private_subnet)
}

And that’s pretty well that. The only other thing I will mention is that your client code does need to specify that it will use SSL. You can see this, for instance, in the simple producer code in the project:

import logging
import socket
import uuid
from pykafka.Config import Config
from pykafka.DataStream import DataStream
from confluent_kafka import Producer


class KafkaProducer:
    """
    This class uses the supplied configuration and a data stream to write to kafka.
    """
    def __init__(self, config: Config, datastream: DataStream):
        self.config = config
        self.datastream = datastream
        self.errors = 0
        self.success = 0
        self.producer = Producer(
            {
                'bootstrap.servers': config.bootstrap,
                'client.id': socket.gethostname(),
                'security.protocol': 'SSL'
            }
        )

    def execute(self):
        """
        When called, will use the configuration and data stream to write to Kafka
        """
        logging.info('Started')
        for _ in range(self.config.count):
            self.producer.produce(
                self.config.topic,
                key=str(uuid.uuid4()),
                value=next(self.datastream.data_stream()),
                callback=self.error_logger)
        # Block until the messages are sent. flush() serves the delivery callbacks
        # and returns the number of messages still queued when the timeout expires.
        remaining = self.producer.flush(10)
        if remaining > 0:
            logging.warning(f'{remaining} messages were still in the queue waiting to go')
        self.datastream.data_stream().close()
        logging.info(f'Stopped - {self.errors} errors, {self.success} sent')

    def error_logger(self, err, _):
        if err is not None:
            self.errors += 1
            logging.error(f'Failed to send message: {str(err)}')
        else:
            self.success += 1
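
One last practical note: the bootstrap string that ends up in config.bootstrap has to come from somewhere. The cluster resource exports it, so the simplest thing is to surface it as a Terraform output along these lines (a sketch; the project may plumb this through differently):

output "bootstrap_brokers_tls" {
  description = "TLS bootstrap broker string for the MSK cluster"
  value       = aws_msk_cluster.dev.bootstrap_brokers_tls
}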

That’s all there is, so far. The next step, which I will write up as a follow-up, is to wire up AWS Glue as the schema registry for Avro, but that’s a problem for next weekend!

To recap, there are a handful of subtleties to be aware of in the Terraform configuration of this provisioned MSK cluster:

  • client_subnets controls where the cluster will be built;
  • you need at least 3 subnets in different AZs to build the cluster into;
  • security_groups controls where requests to the cluster can come from.

Watch out for Part 2, and I hope this project and write-up may be of some assistance to you.
