Apache Kafka – Producer and Consumer. A simple example of a Node.js program
Greetings! Continuing my study of microservices, I decided to dig into how these "services" interact, and to write a simple example of two services communicating with each other.
Before reading this article, I strongly recommend first reading this article on Kafka (Kafka in 20 minutes. Mental model and how to work with it).
A complete implementation of the example can be found here…
So, here is the example:
The user service has a method for registering users; after a successful registration, a profile must be created for that user.
Of course, the example deliberately leaves many practical questions unanswered.
It is aimed exclusively at demonstrating communication between two services using Apache Kafka.
Contents
Implementation
And yes, in this example user-service acts as the producer, i.e. the sender of events, while profile-service acts as the consumer, i.e. the one that listens for incoming events.
Let’s create two almost identical services with the following files.
-
Let’s create a microservices directory, where we will place our services.
-
Let’s create the files for the profile service. They should be placed in a separate profile directory.
profile.service.js
import Fastify from 'fastify'

const fastify = Fastify({
  logger: true,
})

fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => {
  if (err) throw err
  console.log("Profile service: Start Up!")
})
Dockerfile
FROM node:22
WORKDIR /profile-microservice
COPY package.json .
COPY yarn.lock .
RUN yarn install
COPY . .
EXPOSE 3001
CMD ["node", "profile.service.js"]
package.json
{
  "name": "microservice-kafka-learn_profile",
  "version": "1.0.0",
  "license": "MIT",
  "type": "module",
  "dependencies": {
    "@fastify/kafka": "^3.0.0",
    "fastify": "^5.0.0"
  }
}
-
Following the same principle as for the profile service, let’s create the directory and file structure for the user service.
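For reference, the user service’s files differ only in the port and the entry point. Its Dockerfile might look like this (a sketch; the working-directory name user-microservice is an assumption, while port 3000 comes from the docker-compose file below):

```dockerfile
FROM node:22
WORKDIR /user-microservice
COPY package.json .
COPY yarn.lock .
RUN yarn install
COPY . .
# The user service listens on 3000 instead of 3001
EXPOSE 3000
CMD ["node", "user.service.js"]
```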
Next, at the project root, let’s create a docker-compose file in which we bring up the services we just created, and also add Apache Kafka itself.
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka_broker:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  user_microservice:
    build: "./microservices/user"
    ports:
      - "3000:3000"
    depends_on:
      - kafka_broker
  profile_microservice:
    build: "./microservices/profile"
    ports:
      - "3001:3001"
    depends_on:
      - kafka_broker
At this stage, running docker-compose should leave you with the user and profile services, as well as kafka and zookeeper, all up and working correctly.
Scenario: “User registered successfully”
As the next step, let’s add a Kafka connection to our user.service.js file.
import Fastify from 'fastify'
import kafka from '@fastify/kafka'

// ...

const fastify = Fastify({
  logger: true,
})

fastify.register(kafka, {
  producer: {
    'metadata.broker.list': 'kafka:9092',
    'fetch.wait.max.ms': 10,
    'fetch.error.backoff.ms': 50,
    'client.id': 'user-service',
    'dr_cb': true
  },
});
// ...
// ...
We have successfully connected to our Kafka; now let’s add a user "register" method.
fastify.post('/user/register', async (request, reply) => {
  // ..... user registration logic .....
  // Assume the user-creation logic here has completed successfully
  // ...
  /*
   * Publish an event so that a profile is created for the successfully registered user.
   * */
  fastify.kafka.push({
    topic: "user_register",
    payload: JSON.stringify({
      id: Date.now(),
      email: "[email protected]",
      username: "imbatman"
    }),
    key: 'user_register_key'
  })

  reply.send({
    message: "User successfully created!"
  })
})
What is going on here?
-
In this code, we assume that the user has been registered successfully, and we move on to sending a message to Kafka.
-
We set the topic name. Ideally, topic names should be moved into shared constants so they can be reused, for example, by other services, as we will see in the example below.
-
We form the payload that we want to pass on to the eventual recipient of our event.
-
We set the key of our message.
-
We are sending!
-
OK, your data has been sent to the broker!
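As noted in the steps above, topic names are best kept in a small shared module rather than as string literals scattered across services. A minimal sketch (the file name kafka.topics.js is an assumption):

```javascript
// kafka.topics.js — hypothetical shared module with topic names,
// imported by both user-service and profile-service
export const TOPICS = {
  USER_REGISTER: "user_register",
}
```

With this, the producer calls `fastify.kafka.push({ topic: TOPICS.USER_REGISTER, ... })` and the consumer subscribes with `subscribe([TOPICS.USER_REGISTER])`, so a typo in the topic name becomes impossible.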
Scenario: “Creating a user profile”
Let’s add a listener for this event to profile.service.js so that it can create the profile.
Just as in the user service, we need to connect the profile service to our Kafka.
import crypto from "node:crypto"
import kafka from "@fastify/kafka"

const groupId = crypto.randomBytes(20).toString('hex')

fastify.register(kafka, {
  consumer: {
    'metadata.broker.list': 'kafka:9092',
    'fetch.wait.max.ms': 10,
    'fetch.error.backoff.ms': 50,
    'topic.metadata.refresh.interval.ms': 1000,
    'group.id': groupId,
  },
})
Before connecting, we need to generate a group ID. It identifies a group of consumers: all consumers with the same group.id form one group and jointly consume messages from the subscribed topics, sharing the partitions among themselves. (If you’ve read the article I recommended above, you know what I’m talking about.)
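The partition-sharing idea can be illustrated with a toy function (this is not the real Kafka assignment protocol, just a round-robin sketch of the concept):

```javascript
// Toy illustration: how the partitions of a topic are split among
// consumers that share the same group.id
function assignPartitions(partitionCount, consumers) {
  // Start every consumer with an empty partition list
  const assignment = Object.fromEntries(consumers.map((c) => [c, []]))
  for (let p = 0; p < partitionCount; p++) {
    // Hand out partitions round-robin across the group members
    assignment[consumers[p % consumers.length]].push(p)
  }
  return assignment
}

// Two consumers in one group split 4 partitions between them
console.log(assignPartitions(4, ["consumer-a", "consumer-b"]))
// -> { 'consumer-a': [ 0, 2 ], 'consumer-b': [ 1, 3 ] }
```

Note that because our example generates a random group.id per process, each profile-service instance forms its own group and receives every message; instances that should share the load would need the same group.id.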
Next, we need to subscribe to our user registration event, i.e. "user_register".
Add the following code:
fastify.register(kafka, {
  consumer: {
    'metadata.broker.list': 'kafka:9092',
    'fetch.wait.max.ms': 10,
    'fetch.error.backoff.ms': 50,
    'topic.metadata.refresh.interval.ms': 1000,
    'group.id': groupId,
  },
}).after((err) => {
  if (err) throw err

  fastify.kafka
    .subscribe(["user_register"])
    .on("user_register", (msg, commit) => {
      const registeredUser = JSON.parse(msg.value.toString());
      console.log(registeredUser) // Here is our registered user
      commit()
    })

  fastify.kafka.consume()
})
Let’s figure out what is going on here.
-
To begin with, we subscribe to all the events this service needs:
.subscribe(["user_register"])
In this case that is the single event "user_register". Again, I will repeat that topic names should be stored outside the individual services so that all of them can access them; here it is done inline for the simplicity of the example.
-
Next, we set up an event handler
.on("user_register", (msg, commit) => {
-
In the body of this handler, we receive the payload of the event we sent from the user service, i.e.:
{ id: 1726957487909, email: "[email protected]", username: "imbatman" }
-
All that remains is to handle this event in our profile service and create a user profile based on the provided data.
-
Next, by calling the commit function passed as an argument, we confirm to Kafka that the message has been received and processed, signaling to the broker that it can be marked as "read" for the current consumer group, which prevents it from being delivered again.
fastify.kafka.consume()
in turn starts the process of consuming messages from the subscribed topics, in this case "user_register". After this method is called, Fastify will begin processing incoming messages and passing them to the corresponding event handlers, such as on("user_register", ...).
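The article leaves the actual profile creation out, but as a hypothetical sketch, the handler might turn the event payload into a profile like this (the shape of the profile object, the in-memory Map, and the function name are all assumptions):

```javascript
// Hypothetical profile-creation step for the "user_register" handler
const profiles = new Map() // stand-in for a real database

function createProfileFromEvent(rawMessage) {
  // rawMessage is the JSON string obtained from msg.value.toString()
  const user = JSON.parse(rawMessage)
  const profile = {
    userId: user.id,
    displayName: user.username,
    email: user.email,
    createdAt: new Date().toISOString(),
  }
  profiles.set(user.id, profile)
  return profile
}
```

Inside the handler, the call `console.log(registeredUser)` would then be replaced with `createProfileFromEvent(msg.value.toString())` before `commit()` is invoked, so the message is only acknowledged once the profile exists.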
Conclusion
I hope that with this simple example both I, as someone who has only just become acquainted with this technology, and you, as readers of this article, have managed to take the first simple steps and get a feel for how entities in the form of services (as in this example) interact with each other!