Apache Kafka – Producer and Consumer. A simple example of a Nodejs program

Apache Kafka – Producer and Consumer. A simple example of a Nodejs program

Greetings! Continuing the topic of studying microservices, I decided to deal with the interaction of these same “services” and write a simple example of the interaction of two services with each other.

Before reading this article, I strongly recommend reading this article on kafka (Kafka in 20 minutes. Mental model and how to work with it)

An example of implementation is possible find here…

And so the example is:

The user service has a method for registering these users, where after registration, it is necessary to create a profile for the user.

Of course, in the given example, there is no logic of “answers to questions” such as:

The example is aimed exclusively at demonstrating the communication between two services using Apache Kafka.

Realization

And yes, in this example user‑service acting as producer — that is, the sender of events, a profile-service — acts as consumer — that is, the one who listens to incoming events.

Let’s create 2 absolutely identical services with the following files

  1. Let’s create a directory microservices — where we will place our services

  2. Let’s create files for the profile service:
    They should be placed in a separate directory profile

    profile.service.js
    import Fastify from 'fastify'
    
    const fastify = Fastify({
        logger: true,
    })
    
    fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => {
        if (err) throw err
        console.log("Profile service: Start Up!")
    })
    Dockerfile
    FROM node:22
    
    WORKDIR /profile-microservice
    
    COPY package.json .
    COPY yarn.lock .
    
    RUN yarn install
    
    COPY . .
    
    EXPOSE 3001
    
    CMD ["node", "profile.service.js"]
    package.json
    {
      "name": "microservice-kafka-learn_profile",
      "version": "1.0.0",
      "license": "MIT",
      "type": "module",
      "dependencies": {
        "@fastify/kafka": "^3.0.0",
        "fastify": "^5.0.0"
      }
    }
  3. Then follow the same principle as for the service profilelet’s create a directory and file structure for the user service

Next, we will fundamentally create our project docker-compose in which we will raise our previously created services, and also immediately place ours there Apache Kafka

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka_broker:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  user_microservice:
    build: "./microservices/user"
    ports:
      - "3000:3000"
    depends_on:
      - kafka_broker

  profile_microservice:
    build: "./microservices/profile"
    ports:
      - "3001:3001"
    depends_on:
      - kafka_broker

At this stage, launch docker-compose should lead you to the fact that your services user, profileas well as kafka and zookeeper should work correctly.

Scenario: “User registered successfully”

Next step, let’s add to ours user.service.js Kafka connection file

// ...

const fastify = Fastify({
    logger: true,
})

fastify.register(kafka, {
    producer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'client.id': 'user-service',
        'dr_cb': true
    },
});

// ...

We have successfully connected to our kafkanow let’s add a user “register” method

fastify.post('/user/register', async (request, reply) => {
    // .....логика регистрации пользователя.....
    // Считаем что тут у нас логика создания пользователя, которая прошла успешно
    // ...

    /*
    * Отправьте событие для создания профиля для успешно зарегистрированного пользователя.
    * */    
    fastify.kafka.push({
        topic: "user_register",
        payload: JSON.stringify({
            id: Date.now(),
            email: "[email protected]",
            username: "imbatman"
        }),
        key: 'user_register_key'
    })

    reply.send({
        message: "User successfully created!"
    })
})

What is going on here?

  1. In this code, we believe that we have successfully registered the user and proceed to send a message to kafka

  2. We create a name topicAnd, it is desirable to transfer these topic names to constants, for their further use, for example, in other services, as we will see in the example below

  3. Forming payload that we want to pass on to the potential recipient of our event

  4. We form the key of our message

  5. We are sending!

  6. OK, your data has been sent to the broker!

Scenario: “Creating a user profile”

Let’s add a listener to our event to create a profile in the service profile.service.js

It’s the same for us as in the service useryou need to connect to ours Kafka in the service profile

import crypto from "node:crypto"

const groupId = crypto.randomBytes(20).toString('hex')

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
})

Before connecting, we need to generate a group ID, it serves to identify a group of consumers (consumer). All consumers with the same group.id form one group and jointly consume messages from topics, sharing partitions among themselves. (If you’ve read the article I recommended above, you know what I’m talking about).

Next we need to subscribe to our user registration event ie “user_register”

Add the following code:

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
}).after((err) => {
    if (err) throw err

    fastify.kafka
        .subscribe(["user_register"])
        .on("user_register", (msg, commit) => {
            const registeredUser = JSON.parse(msg.value.toString());

            console.log(registeredUser) // Тут наш зарегистрированный юзверь
            commit()
        })

    fastify.kafka.consume()
})

Let’s figure out what’s going on here?

  1. To begin with, we subscribe to all events necessary for this service in our service .subscribe(["user_register"]). In this case, this event is “user_register”

    Again, I will repeat that the keys of topics must be stored outside of all services, for their general access. In this case, it is done for the simplicity of the example.

  2. Next, we set up an event handler
    .on("user_register", (msg, commit) => {

  3. In the body of this event, we receive the result of the event we sent in the service userie:

    {
      id: "1726957487909",
      email: "[email protected]",
      username: "imbatman"
    }
  1. It remains for us to process in ours profile service given the event, and create a user profile based on the provided data

  2. Next, by calling the commit function presented in the argument, we confirm the receipt and processing of the message Kafkasignaling to the broker that this message can be marked as “read” in the current consumer group, preventing it from being received again

fastify.kafka.consume() in turn, activates the process of consuming messages from subscribed topics, in this case “user_register”. After calling this method, Fastify will start processing the incoming messages and pass them to the appropriate event handlers such as on("user_register", ...)

Conclusion

I hope that with such a simple example, I, first of all, as a person who has just become acquainted with this technology, and you, as readers of this article, managed to catch the first simple steps and understand the interaction of entities in the form of services (as in this example) among themselves!

Related posts