Kafka Client for Laravel
This post is not a full introduction to what Kafka is or how it works. I focus on sharing Lara Kafka, a Kafka client that saves you time and effort when implementing a message broker for your Laravel project. However, I would like to give a short description of Kafka first :)
Apache Kafka is an event streaming platform. Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution (1):
- To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
- To store streams of events durably and reliably for as long as you want.
- To process streams of events as they occur or retrospectively.
Some common Kafka terms (2):
- Message: A stream of bytes. For simplicity, think of it as a string.
- Producer: A client application that publishes (writes) events to Kafka.
- Consumer: A client application that subscribes to (reads and processes) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for.
- Broker: A Kafka server that receives messages from producers, stores them, and serves them to the consumers that request them.
- Topic: A unique name through which the data is streamed.
- Partition: A virtual division that allows a topic to be processed in parallel by splitting its data across multiple brokers. A topic can have multiple partitions, with a minimum of one.
- Offset: A sequential number that uniquely identifies a message within a topic's partition.
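To make the relationship between topic, partition, and offset concrete, here is a toy in-memory model in plain PHP. It is only a sketch and not how Kafka is implemented: the `ToyTopic` class and its methods are hypothetical names, and hashing the key with `crc32()` is an illustrative choice rather than the partitioner a real client uses.

```php
<?php

/**
 * Toy in-memory model of a topic (NOT how Kafka is implemented).
 * Every name here is hypothetical and exists only for illustration.
 */
final class ToyTopic
{
    /** @var array<int, array<int, string>> partition number => ordered messages */
    private array $partitions = [];

    private int $partitionCount;

    public function __construct(int $partitionCount = 1)
    {
        if ($partitionCount < 1) {
            throw new InvalidArgumentException('A topic needs at least 1 partition.');
        }
        $this->partitionCount = $partitionCount;
        for ($p = 0; $p < $partitionCount; $p++) {
            $this->partitions[$p] = [];
        }
    }

    /** Appends a message and returns [partition, offset]. */
    public function publish(string $key, string $message): array
    {
        // Illustrative choice: hash the key so equal keys share a partition.
        // abs() guards against negative crc32() results on 32-bit builds.
        $partition = abs(crc32($key)) % $this->partitionCount;
        $this->partitions[$partition][] = $message;

        // The offset is the message's position inside its partition.
        $offset = count($this->partitions[$partition]) - 1;

        return [$partition, $offset];
    }

    /** Reads one message by partition and offset, or null if none exists. */
    public function read(int $partition, int $offset): ?string
    {
        return $this->partitions[$partition][$offset] ?? null;
    }
}

$topic = new ToyTopic(3);
[$partition, $offset] = $topic->publish('user-42', 'HELLO');
[$samePartition, $nextOffset] = $topic->publish('user-42', 'HELLO AGAIN');
```

Both messages share a key, so they land in the same partition and receive consecutive offsets. A consumer that remembers the last offset it read can therefore resume from that position in the partition.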
Before we look at Lara Kafka, we need to install some prerequisites: Apache Kafka itself (3) and the php-rdkafka extension (4).
Now let's look at Lara Kafka: https://github.com/cuongdinhngo/lara-kafka
1-Install `cuongdinhngo/lara-kafka` using Composer.

    composer require cuongdinhngo/lara-kafka

2-Add the following service provider in `config/app.php`:

    /** Package Service Providers... */
    LaraAssistant\LaraKafka\LaraKafkaServiceProvider::class,

This command copies the `Libraries/Kafka` folder, which contains the `KafkaConsumer` and `KafkaProducer` classes, into `app`.
3-Create a topic
You must start the ZooKeeper and Kafka services first; see the Kafka installation guide (3) for details.

    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

4-Create a message
You can create a simple API to generate a message for this demo:

    use App\Libraries\Kafka\KafkaProducer;
    use App\Libraries\Kafka\KafkaConsumer;
    . . .
    public function testKafka()
    {
        // 'H' gives a 24-hour clock, so the timestamp is unambiguous.
        dump(date('Y/m/d H:i:s', time()));

        // Create a producer for the topic and point it at the broker.
        $kafkaProducer = new KafkaProducer("TutorialTopic");
        $kafkaProducer->setBrokers("localhost:9092");
        $kafkaProducer->init();

        // Send the message; the payload must be a string.
        $kafkaProducer->addPayload("HELLO ... ".date('Y/m/d H:i:s', time()));

        dd(__METHOD__);
    }

Please remember that the message content must be a string.
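Because the payload has to be a string, structured data needs to be serialized before it is sent. One common option is JSON. The sketch below uses only PHP's built-in `json_encode` and `json_decode`; the helper names `encodePayload` and `decodePayload` are hypothetical and are not part of Lara Kafka.

```php
<?php

/** Hypothetical helper: turns structured data into a string payload. */
function encodePayload(array $data): string
{
    // JSON_THROW_ON_ERROR raises a JsonException on failure
    // instead of silently returning false.
    return json_encode($data, JSON_THROW_ON_ERROR);
}

/** Hypothetical helper: restores the array on the consumer side. */
function decodePayload(string $payload): array
{
    // The second argument (true) asks for associative arrays, not objects.
    return json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
}

$payload = encodePayload(['event' => 'user.registered', 'user_id' => 42]);
// $payload is now a plain string that could be passed to addPayload().
$restored = decodePayload($payload);
```

On the consumer side, the same string can be decoded back into an array before it is processed.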
5-Consume a message

    public function consumeKafka()
    {
        dump('start ...');
        dump(date('Y/m/d H:i:s', time()));

        $consumer = new KafkaConsumer();

        // Arguments: topics, consumer group, broker list, extra configuration.
        $consumer->init(
            ['TutorialTopic'],
            'myConsumerGroup',
            'localhost:9092',
            [
                'group.id' => 'myConsumerGroup',
                'auto.offset.reset' => 'earliest'
            ]
        );

        echo "Waiting for partition assignment... (may take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        $consumer->consume();
    }

You can modify the `Libraries/Kafka/KafkaConsumer` class to handle messages and errors:

    protected function handleMessage($message)
    {
        dump($message);
    }

    protected function handleErrors($message)
    {
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                // Reached the end of the partition; keep polling.
                dump("No more messages; will wait for more");
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                dump('Timed out: '.date('Y/m/d H:i:s', time()));
                break;
            default:
                // Any other error is unexpected, so surface it.
                throw new \Exception($message->errstr(), $message->err);
        }
    }

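As one possible refinement of `handleMessage`, the sketch below formats a consumed message into a single log line. It assumes that the object passed to `handleMessage` is an `RdKafka\Message`, whose public properties include `topic_name`, `partition`, `offset`, and `payload`. The `describeMessage` function is a hypothetical helper, and a stand-in object replaces the real message so the example runs without a broker.

```php
<?php

/**
 * Hypothetical formatter for a consumed message. It only reads public
 * properties that php-rdkafka exposes on RdKafka\Message.
 */
function describeMessage(object $message): string
{
    return sprintf(
        '%s[%d]@%d: %s',
        $message->topic_name,
        $message->partition,
        $message->offset,
        $message->payload
    );
}

// Stand-in object so the sketch runs without a broker or the extension.
$fake = (object) [
    'topic_name' => 'TutorialTopic',
    'partition'  => 0,
    'offset'     => 7,
    'payload'    => 'HELLO',
];

$line = describeMessage($fake);
```

Under that assumption, `handleMessage` could call `dump(describeMessage($message));` instead of dumping the whole object.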
References:
(1) https://kafka.apache.org/intro
(3) https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04
(4) https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.installation.html