, , ,

Intro to Apache Kafka for Data Engineers

Streams…. the Apache Kafka one….

Streams, streams, streams…. when will it ever end? It’s hard to keep up with all the messaging systems these days. GCP PubSub, AWS SQS, RabbitMQ, blah blah. Of course there is Kafka, hard to miss that name floating around in the interwebs. Since pretty much every system designed these days is a conglomerate of services… it’s probably a good idea to poke at things under the cover. Of course Apache Kafka is probably at the top of list of those open source streaming services. Today I’m going to attempt to install a Kafka cluster and push some messages around.

Apache Kafka – what’s all about.

Apache Kafka the “distributed streaming platform.” Pretty sure that would be impressive if there wasn’t like 10 other tools for the same thing. I will reserve my judgment for after I’ve tried to install the stinker on a cluster and tried the supported Python and maybe Scala APIs, and maybe tried to decipher some documentation.

I’m going to assume some knowledge about streaming. It’s really just a fault tolerant, scalable, reliable way to handle passing messages/data around.

What do you need to know about Kafka?

  • “Kafka is run as a cluster of one or more servers…..”
  • “Some of these servers form the storage layer, called the brokers.”
  • messages come from outside the “cluster” are called producers.
  • “The data can be partitioned into different “partitions” within different “topics“.”
  • These partitions can be stored across multiple brokers, giving the system resiliency.
  • Consumers, both inside and outside the system read messages from the partitions.
  • Apache Zookeeper is usually used on conjunction with Kafka for cluster metadata management.

Like most good Apache software projects there are number of APIs into the system to provide different functionality…

Installing a Kafka 3 node cluster.

Now it’s time to find out how well this project was setup. I’m going to be attempting to install and setup a Kafka cluster on my 3 node Linode cluster. First, I assumed the documentation would have some straight forward install instructions for a cluster, but that appears not be the case. The instructions mostly cover just installing on a single node. I found this pretty annoying. I mean if you product is a distributed cluster product….ummm…write a good install guide for clusters???? Seriously. All code available on GitHub.

I found some instructions elsewhere I followed.

  1. Download tarball onto machine and unpack. I did this on all three of my nodes.
wget https://apache.claz.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz

// lets add kafka location to path.
vim ~/.bashrc
// add the following line, then source ~/.bashrc
export PATH=$PATH:$HOME/.local/bin:$HOME/bin:$HOME/kafka_2.13-2.6.0/bin

Configure Zookeeper.

When it comes to configuring Kafka and Zookeeper you can see all the files located in <install-dir>/config

~/kafka_2.13-2.6.0/config$ ls
connect-console-sink.properties    connect-file-sink.properties    connect-mirror-maker.properties  log4j.properties     tools-log4j.properties
connect-console-source.properties  connect-file-source.properties  connect-standalone.properties    producer.properties  trogdor.conf
connect-distributed.properties     connect-log4j.properties        consumer.properties              server.properties    zookeeper.properties

I’m going to keep the defaut zookeeper.properties configs, (some people change default data directory etc.)

Next I’m going to start the Zookeeper service while passing in the properties file apparently. This seems a little strange.

cd ~/kafka_2.13-2.6.0/bin
zookeeper-server-start.sh /home/beach/kafka_2.13-2.6.0/config/zookeeper.properties

In another shell I ran the following command to make sure Zookeeper responds.

zookeeper-shell.sh 0.0.0.0:2181 ls /brokers/ids
//output 
Connecting to 0.0.0.0:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
Node does not exist: /brokers/ids

Configure Kafka properties.

This will have to be done on each Kafka broker node. Change directories back into the config…. cd ~/kafka_2.13-2.6.0/config

vim server.properties

Each broker node will need a unique id. ( So I will just leave this one as 0 and the two other nodes as 1 and 2

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

The next most important settings for a cluster ( not in a single node setup) are the listeners and advertised.listeners configs. Listeners can be set as show below, but the advertised ones need to be what a remote client can actually connect to. You would set this on each node, with the appropriate IP address.

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://173.255.199.161:9092

There are a number of other settings in here you can change and look through. There are directories for where your logs are stored, I pretty much left everything as default except the below. I figured with my small 1CPU and GB ram node machines I might want to cut the threads in half from 8 to 4.

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=4

Also, the other important setting is the Zookeeper connection IP address and port.

// this is the default
zookeeper.connect=localhost:2181
// I changed mine to be the node on which I installed and have Zookeeper running
zookeeper.connect=173.255.199.161:2181

Fire up the Kafka broker nodes.

Next I am actually going to start Kafka on all the nodes. Run the below command from the bin folder of each node.

kafka-server-start.sh /home/beach/kafka_2.13-2.6.0/config/server.properties

The first time I ran that…bunch of errors.

[2020-10-24 22:08:27,411] INFO Initiating client connection, connectString=localhost:21811 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@5119fb47 (org.apache.zookeeper.ZooKeeper)
[2020-10-24 22:08:27,421] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2020-10-24 22:08:27,430] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2020-10-24 22:08:27,437] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-24 22:08:27,443] INFO Opening socket connection to server localhost/127.0.0.1:21811. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-10-24 22:08:27,450] INFO Socket error occurred: localhost/127.0.0.1:21811: Connection refused (org.apache.zookeeper.ClientCnxn)
[2020-10-24 22:08:28,553] INFO Opening socket connection to server localhost/127.0.0.1:21811. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-10-24 22:08:28,554] INFO Socket error occurred: localhost/127.0.0.1:21811: Connection refused (org.apache.zookeeper.ClientCnxn)
[2020-10-24 22:08:29,660] INFO Opening socket connection to server localhost/127.0.0.1:21811. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

Opps…start Zookeeper first. Got the same error again, checked my config for server.properties… changed the following…

changed from localhost:2181 --> to 0.0.0.0:2181

This time success.

[2020-10-24 22:26:19,177] INFO [SocketServer brokerId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2020-10-24 22:26:19,239] INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-24 22:26:19,241] INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-24 22:26:19,244] INFO Kafka startTimeMs: 1603578379182 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-24 22:26:19,246] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Let’s try that Zookeeper command again and see if we can see all our Kafka brokers running.

zookeeper-shell.sh 0.0.0.0:2181 ls /brokers/ids
/// output
Connecting to 0.0.0.0:2181
KeeperErrorCode = ConnectionLoss for /brokers/ids

Stinker, well that didn’t work. I didn’t think I did anything wrong. I basically went and tried again. I shut everything down, then restarted Zookeeper, then went to each node and restarted Kafka. That seemed to do the trick. Now when I run the command I can see the unique id showing up for all three Kafka broker nodes. (0, 1, 2)

beach@localhost:~$ zookeeper-shell.sh 0.0.0.0:2181 ls /brokers/ids
Connecting to 0.0.0.0:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]

Try out the Python Kafka client.

So now that the Kafka cluster is up and running, let’s try us some of the Python client. You can tell a lot about a project based on how good or not good the Python client is. Here is the documentation for the Python client. Well the documentation is a little lack luster, back then nothing I suppose. Seems to be geared toward more Producer and Consumer interaction on an already setup and running Kafka topic.

pip install kafka-python

First I write a quick class wrapper around the Python client to connect, list my brokers in the cluster, and to create my first topic.

Create first topic.

import kafka


class PyKafka:
    def __init__(self, cluster_endpoints: list = []):
        self.endpoints = cluster_endpoints
        self.meta = None
        self.brokers = None
        self.topics = []
        self.meta_client = None
        self.client = None
        self.clientAdmin = None

    def pull_cluster_meta(self):
        self.meta = kafka.cluster.ClusterMetadata(bootstrap_servers=self.endpoints,
                                                  request_timeout_ms=10000)

    def pull_broker_meta(self):
        self.brokers = self.meta.brokers()

    def pull_topics(self):
        self.topics.extend(list(self.meta.topics()))

    def set_meta_client(self, name: str = "default_meta"):
        self.meta_client = kafka.KafkaClient(bootstrap_servers=self.endpoints,
                                             client_id=name)

    def set_client_admin(self, name: str = "pyKafkaClient"):
        self.clientAdmin = kafka.KafkaAdminClient(bootstrap_servers=self.endpoints,
                                                  client_id=name,
                                                  request_timeout_ms=10000)

    def create_topics(self, topics_list: list = []) -> object:
        kr = self.clientAdmin.create_topics(topics_list)
        return kr


if __name__ == '__main__':
    pK = PyKafka(cluster_endpoints=["198.58.124.54:9092", "173.255.199.161:9092", "50.116.17.69:9092"])
    pK.set_meta_client()
    pK.pull_cluster_meta()
    if pK.meta_client.bootstrap_connected():
        print("connected to meta client brokers")
        pK.pull_broker_meta()
        print(pK.brokers)
    pK.set_client_admin()
    topics_to_create = [kafka.admin.NewTopic(name="my_first_topic", num_partitions=1, replication_factor=2)]
    kafka_response = pK.create_topics(topics_list=topics_to_create)
    print(kafka_response)

It worked fairly well. There I can see the meta data return from my client shows all most nodes and their corresponding connection info.

connected to meta client brokers
{BrokerMetadata(nodeId='bootstrap-1', host='50.116.17.69', port=9092, rack=None), BrokerMetadata(nodeId='bootstrap-2', host='198.58.124.54', port=9092, rack=None), BrokerMetadata(nodeId='bootstrap-0', host='173.255.199.161', port=9092, rack=None)}

One thing that is quite annoying is that many times when I try to run my code, I get timeouts from the brokers. Not sure what is up with that. Sometimes my connections go through, sometimes not. I tried playing with the request_timeout_ms1 and such things, but to no avail.

Traceback (most recent call last):
  File "src/exploringKafkaWithPython.py", line 40, in <module>
    pK.set_meta_client()
  File "src/exploringKafkaWithPython.py", line 26, in set_meta_client
    client_id=name)
  File "/Users/danielbeach/code/kafkaPython/lib/python3.7/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/Users/danielbeach/code/kafkaPython/lib/python3.7/site-packages/kafka/client_async.py", line 927, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

My first topic my_first_topic seemed to go well.

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='my_first_topic', error_code=0, error_message=None)])

Also, running a command on my cluster showed the topic was created as well.

kafkacat -b localhost:9092 -L
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 0 at 173.255.199.161:9092
  broker 2 at 50.116.17.69:9092
  broker 1 at 198.58.124.54:9092
 1 topics:
  topic "my_first_topic" with 1 partitions:
    partition 0, leader 2, replicas: 2,0, isrs: 2,0

Send Messages to Kafka cluster via Producer.

Next, I added a few more methods to my Class to create a Producer and send some messages to the cluster. First, I noticed the messages have to bytes objects. I’m surprised at that part.



    def create_producer(self):
        self.producer = kafka.KafkaProducer(bootstrap_servers=self.endpoints,
                                            client_id="pythonProducer",
                                            acks=1,
                                            retries=3)

    def send_message(self, topic: str,  message: bytes) -> object:
        future_record_meta = self.producer.send(topic=topic, value=message)
        return future_record_meta

    def get_producer_metrics(self):
        print(self.producer.metrics())


if __name__ == '__main__':
    pK = PyKafka(cluster_endpoints=["198.58.124.54:9092", "173.255.199.161:9092", "50.116.17.69:9092"])
    pK.create_producer()
    messages = [b'{"first_message": 1}', b'{"second_message": 2}']
    for message in messages:
        rmf = pK.send_message(topic="my_first_topic",
                              message=message)
        record_metadata = rmf.get(timeout=60)
        print(record_metadata)

I ended up running that a few times. Did throw some error though? Maybe one record made it through?

    raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Batch for TopicPartition(topic='my_first_topic', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time

Get Messages from Kafka cluster via Consumer.

Let’s create a Consumer and pull the messages back off that topic. There appears to be some tricky settings around Consumers. You have to read up on partitions and offsets.

I tried to no avail to get the messages back of the topic. No luck. For one I’m not convinced the messages go published, even though I tried a few times and the error message above indicates at least 1 message made it?

    def create_consumer(self):
        self.consumer = kafka.KafkaConsumer(bootstrap_servers=self.endpoints,
                                            client_id="pythonConsumer",
                                            auto_offset_reset="earliest")


if __name__ == '__main__':
    pK = PyKafka(cluster_endpoints=["198.58.124.54:9092", "173.255.199.161:9092", "50.116.17.69:9092"])
    pK.create_consumer()
    pK.consumer.subscribe(["my_first_topic"])
    print(pK.consumer.subscription())
    records = pK.consumer.poll(max_records=10)
    for record in records:
        print(record)

I also tried this…

if __name__ == '__main__':
    pK = PyKafka(cluster_endpoints=["198.58.124.54:9092", "173.255.199.161:9092", "50.116.17.69:9092"])
    pK.create_consumer()
    pK.consumer.subscribe(["my_first_topic"])
    for r in pK.consumer:
        print(r)

This code just hangs. I’m assuming this all has to do with my lack of understanding fully how messages related to partitions and offsets etc. Even running a consumer from the console didn’t seem to produce any results.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_topic --from-beginning

I give up… for now.

Conclusion

I’m not sure what to think of Kafka so far. The install seemed to be not as straight forward as many of the other great Apache projects like Hadoop and Spark, but that’s probably not that much of a surprise? The documentation was definitely not up to snuff, and I’m not the only who thinks that based on other comments online. It lacks a real explanation of how to install a cluster, considering this is a distributed project that seems a little strange. You have to do a lot of googling to figure out which properties in what config files need to be set.

The Python client was ok. The documentation again was lacking, and others on StackOverflow were complaining about the ambiguity as well. It appears you have to be a master of Kafka before you could, without trouble, work through the client libraries provided.

Since I wasted a weekend getting this Kafka cluster up and running, I will be returning to finish the job and get some messages pushed and pulled off a topic via Python. Till next time.