Reactive Event Streaming Architecture With Kafka, Redis Streams, Spring Boot, and HTTP Server-Sent Events (SSE)

A guide to streaming events from Kafka to Redis and on to clients via SSE, ensuring that only events relevant to an individual client ID are processed and sent.

By Greg Lawson · Aug. 17, 23

This article outlines a solution for streaming events from Kafka, forwarding them to Redis using its Stream API, and reading individual streams from Redis via its streaming API. The added complexity in this scenario is the need to stream events from an HTTP endpoint using Server-Sent Events (SSE) while ensuring that only events relevant to a specific client ID are processed and sent.

Problem Statement

Many companies have an existing Kafka infrastructure where events are being produced. Our goal is to set up a system that subscribes to Kafka messages but only processes events relevant to a specific client ID. These filtered events should be forwarded to Redis using its Stream API. Additionally, we need to establish an HTTP endpoint for Server-Sent Events (SSE) that allows the specified client to receive real-time event updates.

Solution Architecture Overview

The architecture consists of the following components:

  • Kafka: A distributed event streaming platform that allows you to publish and subscribe to streams of records (events).
  • Spring Boot: A framework for building Java applications. We'll use it to build the component that subscribes to Kafka messages, filters events based on the client ID, and forwards the relevant events to Redis Streams.
  • Redis: A high-performance, in-memory data store. We'll use its Streams feature to store and serve the event streams.
  • Docker: A containerization platform. We'll use Docker and Docker Compose to run containers for Kafka, Redis, and our Spring Boot application as a local proof of concept (POC).
  • HTTP Server-Sent Events (SSE) Endpoint: Provides real-time event updates to the client, filtering events based on the client ID.

Solution Architecture


Redis Streams 

Redis Streams is a feature in Redis that provides a way to handle real-time data streams with various use cases. Here are some scenarios where you might want to use Redis Streams:

  • Real-Time Event Processing: Redis Streams is excellent for processing and storing real-time events. You can use it for things like logging, monitoring, tracking user activities, or any use case that involves handling a continuous stream of events.
  • Task Queues: If you need a reliable and distributed task queue, Redis Streams can be a great choice. It allows you to push tasks into a stream and have multiple consumers process those tasks concurrently.
  • Activity Feeds: If you're building a social network or any application that requires activity feeds, Redis Streams can efficiently handle the feed data, ensuring fast access and scalability.
  • Message Brokering: Redis Streams can serve as a lightweight message broker for microservices or other distributed systems. It can handle message routing and ensure that messages are delivered to interested consumers.
  • Real-Time Analytics: When you need to analyze data in real-time, Redis Streams can be useful for storing the incoming data and then processing and aggregating it using Redis capabilities.
  • IoT Data Ingestion: If you're dealing with data from Internet of Things (IoT) devices, Redis Streams can handle the high-throughput and real-time nature of the data generated by these devices.
  • Logging and Audit Trails: Redis Streams can be used to store logs or audit trails in real-time, making it easy to analyze and troubleshoot issues.
  • Stream Processing: If you need to process a continuous stream of data in a specific order (for example, financial transactions or sensor readings), Redis Streams can help you manage the data in the order it was received.
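
To make these scenarios concrete, here is a minimal sketch of appending to (XADD) and reading from (XREAD) a stream with the reactive Spring Data Redis API that the rest of this article uses. The stream key `orders.stream` is purely illustrative and not part of the demo project.

Java

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedisStreamsPrimer {

	private final ReactiveStringRedisTemplate redisTemplate;

	public RedisStreamsPrimer(ReactiveStringRedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	// XADD: append a string payload to the stream "orders.stream" and return the generated entry ID
	public Mono<String> append(String payload) {
		ObjectRecord<String, String> record = StreamRecords.newRecord()
				.ofObject(payload)
				.withStreamKey("orders.stream");
		return redisTemplate.opsForStream().add(record)
				.map(recordId -> recordId.getValue());
	}

	// XREAD: read every entry currently in the stream, starting from the beginning
	public Flux<String> readAll() {
		return redisTemplate.opsForStream()
				.read(String.class, StreamOffset.fromStart("orders.stream"))
				.map(ObjectRecord::getValue);
	}
}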

Prerequisites

  • Docker and Docker Compose are installed.
  • Basic understanding of Spring Boot, Redis Streams, and Kafka.
  • Java 17 or higher
  • An HTTP client of your choice. I used httpie.

Of course, you can get the code from the GitHub repository (https://github.com/glawson6/spring-sse-demo).

Steps

1. Set Up Docker Compose for the Backend Infrastructure and the Spring Boot App

Create a `docker-compose.yml` file to define the services:

YAML
 
version: '3.8'
services:
   zookeeper:
      image: confluentinc/cp-zookeeper:latest
      environment:
         ZOOKEEPER_CLIENT_PORT: 2181
         ZOOKEEPER_TICK_TIME: 2000
      ports:
         - 22181:2181

      networks:
         node_net:
            ipv4_address: 172.28.1.81

   kafka:
      image: confluentinc/cp-kafka:latest
      depends_on:
         - zookeeper
      ports:
         - 29092:29092
         - 9092:9092
         - 9093:9093
      environment:
         KAFKA_BROKER_ID: 1
         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,EXTERNAL://172.28.1.93:9093
         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT
         KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

      networks:
         node_net:
            ipv4_address: 172.28.1.93

   cache:
      image: redis:6.2-alpine
      #image: redis:5.0.3-alpine
      restart: always
      ports:
         - '6379:6379'
      #command: redis-server --save 20 1 --loglevel warning --requirepass eYVX7EwVmmxKPCDmwMtyKVge8oLd2t81
      command: redis-server /usr/local/etc/redis/redis.conf --loglevel verbose --save 20 1
      volumes:
         - cache:/data
         - ./redis.conf:/usr/local/etc/redis/redis.conf
         - $PWD/redis-data:/var/lib/redis

         #environment:
         # - REDIS_REPLICATION_MODE=master

      networks:
         node_net:
            ipv4_address: 172.28.1.79

volumes:
   cache:
      driver: local

networks:
   node_net:
      ipam:
         driver: default
         config:
            - subnet: 172.28.0.0/16


Create the YAML file for the application, `sse-demo.yml`:

YAML
 
version: "3.8"
services:
  sse-demo:
    image: "sse/spring-sse-demo:latest"
    ports:
      - "8080:8080"
      #- "51000-52000:51000-52000"

    env_file:
      - local.env

    environment:
      - REDIS_REPLICATION_MODE=master
      - SPRING_PROFILES_ACTIVE=default
      - REDIS_HOST=172.28.1.79
      - REDIS_PORT=6379
      - KAFKA_BOOTSTRAP_SERVERS=172.28.1.93:9093

    networks:
      node_net:
        ipv4_address: 172.28.1.12

networks:
  node_net:
    external:
      name: docker_node_net


2. Create Spring Boot Application

Create a Spring Boot application with the required dependencies:

Shell
 
git clone https://github.com/glawson6/spring-sse-demo.git


In `pom.xml`, add the necessary dependencies for Kafka and Redis integration (this project pulls in reactor-kafka and the reactive Spring Data Redis support, among others).

3. Implement Kafka Consumer and Redis Stream Producer

Create a Kafka consumer that listens to Kafka events and sends them to Redis Streams. This component also consumes the Redis streams for the HTTP clients:

Java
 
package com.taptech.sse.event;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taptech.sse.utils.DurationSupplier;
import com.taptech.sse.utils.ObjectMapperFactory;
import com.taptech.sse.config.SSEProperties;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

public class DefaultEventReceiverService implements EventReceiverService {
	private static final Logger logger = LoggerFactory.getLogger(DefaultEventReceiverService.class);

	public final static String TEST_STREAM = "test.stream";
	public final static String TEST_STREAM_KEY = "test.stream.key";
	public final static String TEST_STREAM_VALUE = "test.stream.value";
	private static final String EMPTY_STR = "";
	public static final String CLIENT_STREAM_STARTED = "client.stream.started";
	public static final String HASH_PREFIX = "hash.";

	private static ObjectMapper objectMapper = ObjectMapperFactory.createObjectMapper(ObjectMapperFactory.Scope.SINGLETON);

	ReactiveStringRedisTemplate redisTemplate;
	KafkaReceiver<String, String> kafkaReceiver;
	SSEProperties sseProperties;
	StreamReadOptions streamReadOptions;

	public DefaultEventReceiverService(ReactiveStringRedisTemplate redisTemplate, KafkaReceiver<String, String> kafkaReceiver,
									   SSEProperties sseProperties) {
		this.redisTemplate = redisTemplate;
		this.kafkaReceiver = kafkaReceiver;
		this.sseProperties = sseProperties;
		this.streamReadOptions = StreamReadOptions.empty().autoAcknowledge()
				.block(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS));
	}

	static final Function<String,String> calculateHashKey = str -> new StringBuilder(HASH_PREFIX).append(str).toString();

	@PostConstruct
	public void init() {

		this.redisTemplate.opsForValue().append(TEST_STREAM_KEY, TEST_STREAM_VALUE).subscribe();

	}

	@EventListener(ApplicationStartedEvent.class)
	public Disposable startKafkaConsumer() {
		logger.info("############# Starting Kafka listener.....");
		return kafkaReceiver.receive()
				.doOnError(error -> logger.error("Error receiving event, will retry", error))
				.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(sseProperties.getTopicRetryDelaySeconds())))
				.doOnNext(record -> logger.info("Received event: key {}", record.key()))
				.filterWhen(record -> checkIfStreamBeingAccessed(record))
				.concatMap(this::handleEvent)
				.subscribe(record -> record.receiverOffset().acknowledge());
	}

	Mono<Boolean> checkIfStreamBeingAccessed(ReceiverRecord<String,String> record){
		return this.redisTemplate.opsForHash().hasKey(calculateHashKey.apply(record.key()), CLIENT_STREAM_STARTED)
				.doOnNext(val -> logger.info("key => {}'s stream is being accessed {}",record.key(),val));
	}

	public Mono<ReceiverRecord<String, String>> handleEvent(ReceiverRecord<String, String> record) {
		return Mono.just(record)
				.flatMap(this::produce)
				.doOnError(ex -> logger.warn("Error processing event: key {}", record.key(), ex))
				.onErrorResume(ex -> Mono.empty())
				.doOnNext(rec -> logger.debug("Successfully processed event: key {}", record.key()))
				.then(Mono.just(record));
	}

	public Mono<Tuple2<RecordId, ReceiverRecord<String, String>>> produce(ReceiverRecord<String, String> recRecord) {

		ObjectRecord<String, String> record = StreamRecords.newRecord()
				.ofObject(recRecord.value())
				.withStreamKey(recRecord.key());
		return this.redisTemplate.opsForStream().add(record)
				.map(recId -> Tuples.of(recId, recRecord));
	}

	Function<ObjectRecord<String, String>, NotificationEvent> convertToNotificationEvent() {
		return (record) -> {
			NotificationEvent event = null;
			try {
				event = objectMapper.readValue(record.getValue(), NotificationEvent.class);
			} catch (JsonProcessingException e) {
				e.printStackTrace();
				event = new NotificationEvent();
			}
			return event;
		};
	}

	private Mono<String> createGroup(String workspaceId){
		return redisTemplate.getConnectionFactory().getReactiveConnection().streamCommands()
				.xGroupCreate(ByteBuffer.wrap(workspaceId.getBytes()), workspaceId, ReadOffset.from("0-0"), true)
				.doOnError((error) -> {
					if (logger.isDebugEnabled()){
						logger.debug("Could not create group.",error);
					}
				})
				.map(okStr -> workspaceId)
				.onErrorResume((error) -> Mono.just(workspaceId));
	}

	private Flux<NotificationEvent> findClientNotificationEvents(Consumer consumer, StreamOffset<String> streamOffset, DurationSupplier booleanSupplier){
		return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset)
				.map(convertToNotificationEvent())
				.repeat(booleanSupplier);
	}

	public Flux<NotificationEvent> consume(final String clientId){
		return Flux.from(createGroup(clientId))
				.flatMap(id -> addIdToStream(clientId))
				.map(id -> Tuples.of(StreamOffset.create(clientId, ReadOffset.lastConsumed()),
						Consumer.from(clientId, clientId),
						new DurationSupplier(Duration.of(sseProperties.getClientHoldSeconds(), ChronoUnit.SECONDS), LocalDateTime.now())))
				.flatMap(tuple3 -> findClientNotificationEvents(tuple3.getT2(), tuple3.getT1(), tuple3.getT3()));

	}

	private Mono<String> addIdToStream(String id) {
		return this.redisTemplate.opsForHash().put(calculateHashKey.apply(id), CLIENT_STREAM_STARTED, Boolean.TRUE.toString()).map(val -> id);
	}

	public Flux<Boolean> deleteWorkspaceStream(String workspaceId){
		StreamOffset<String> streamOffset = StreamOffset.create(workspaceId, ReadOffset.lastConsumed());
		StreamReadOptions streamReadOptions = StreamReadOptions.empty().noack();
		Consumer consumer = Consumer.from(workspaceId, workspaceId);

		return this.redisTemplate.opsForStream().read(String.class, consumer, streamReadOptions, streamOffset)
				.flatMap(objRecord -> this.redisTemplate.opsForStream().delete(workspaceId,objRecord.getId()).map(val -> objRecord))
				.flatMap(objRecord -> this.redisTemplate.opsForHash().delete(workspaceId));
	}

	@Override
	public Flux<String> consumeString(String clientId) {
		return this.redisTemplate.opsForStream().read(String.class, StreamOffset.latest(clientId)).map(ObjectRecord::getValue);
	}

}
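
The service above covers both the Kafka-to-Redis path and the Redis read path, but the HTTP layer is not shown here. A minimal WebFlux controller sketch that exposes `consume(clientId)` as the SSE endpoint called in step 7 might look like the following; the class name and package are illustrative, and the actual controller lives in the repository.

Java

package com.taptech.sse.web;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import com.taptech.sse.event.DefaultEventReceiverService;
import com.taptech.sse.event.NotificationEvent;

@RestController
public class SSEController {

	private final DefaultEventReceiverService eventReceiverService;

	public SSEController(DefaultEventReceiverService eventReceiverService) {
		this.eventReceiverService = eventReceiverService;
	}

	// Streams only the events whose Redis stream key matches the requested clientId
	@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<NotificationEvent> stream(@RequestParam("clientId") String clientId) {
		return eventReceiverService.consume(clientId);
	}
}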


4. Configure Services in Spring Boot

Properties
 
cache.provider.name=redis
cache.host=${REDIS_HOST:localhost}
cache.port=${REDIS_PORT:6379}
cache.password=password


# Producer properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group_id
spring.kafka.boostrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9093}


# Common Kafka Properties
auto.create.topics.enable=true
sse.client-hold-seconds=${SSE_CLIENT_HOLD_SECONDS:120}

logging.level.root=INFO
logging.level.com.taptech.sse=DEBUG


5. Build Docker Image for Spring Boot App

Using the `kubernetes-maven-plugin` from jkube, create the image for your Spring Boot application:

Shell
 
./mvnw clean package -Dmaven.test.skip=true k8s:build


6. Start the Services and Run the Application

From the `src/test/resources/docker` directory:

Start the services:

Shell
 
./startServices.sh


Start the app:

Shell
 
./start-sse-demo.sh


7. Connect to a Stream Using One of the IDs in the client-ids.json File

Shell
 
http --stream GET http://localhost:8080/sse clientId==dd07bd51-1ab0-4e69-a0ff-f625fa9e7fc0


8. Generate Some Events

You can do an HTTP POST to http://localhost:8080/sse/generateNE:

Shell
 
http POST http://localhost:8080/sse/generateNE


After this, watch as your HTTP client receives an event for the clientId that it is subscribed to.
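
The `/sse/generateNE` endpoint is the demo's built-in event generator. If you instead want to feed the pipeline from your own Kafka producer, the essential detail is that records must be keyed by client ID, because the consumer uses `record.key()` both to filter events and as the Redis stream key. A minimal reactor-kafka producer sketch could look like this; the class name, topic, and configuration are assumptions, not taken from the demo.

Java

package com.taptech.sse.event;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class ClientEventProducer {

	private final KafkaSender<String, String> sender;

	public ClientEventProducer(String bootstrapServers) {
		Map<String, Object> props = Map.of(
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
				ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
				ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		this.sender = KafkaSender.create(SenderOptions.create(props));
	}

	// Publish a JSON payload keyed by clientId so the consumer can filter on record.key()
	// and forward the value to the Redis stream named after that clientId.
	// "notification.events" is an illustrative topic name, not the demo's actual topic.
	public Mono<Void> publish(String clientId, String jsonPayload) {
		ProducerRecord<String, String> producerRecord =
				new ProducerRecord<>("notification.events", clientId, jsonPayload);
		return sender.send(Mono.just(SenderRecord.create(producerRecord, clientId))).then();
	}
}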

Discussion

Why would you use both Kafka and Redis? Doesn't Kafka offer this on its own? Many companies have invested in Kafka as the backend message provider between their systems, but Kafka by itself does not handle message selection very easily.

Message selection is not a feature Kafka provides natively, for several reasons:

  • Data Size and Latency: Kafka is designed for high-throughput, low-latency message processing. Its architecture focuses on distributing messages to a large number of consumers quickly. Introducing message selection based on arbitrary conditions can slow down the overall processing and introduce latency, which goes against Kafka's primary design goals.
  • Idempotency: Kafka relies on the concept of idempotent producers and consumers. This means that if a consumer or producer retries a message due to a failure, it should not result in duplicate processing. Introducing selective message retrieval would complicate this idempotency guarantee, potentially leading to unintended duplicate processing.
  • Consumer Offset Tracking: Kafka maintains consumer offsets, allowing consumers to keep track of the last processed message. If message selection is introduced, offsets become less straightforward, as some messages might be skipped based on selection criteria.
  • Decoupled Architecture: Kafka is designed to decouple producers from consumers. Producers are unaware of consumer behavior, and consumers can independently decide what messages they want to consume. Message selection would break this decoupling, as producers would need to know which messages to produce based on specific consumer needs.
  • Consumer Flexibility: Kafka consumers can be highly flexible in terms of message processing. They can be designed to filter, transform, and aggregate messages based on their own criteria. Introducing message selection at the Kafka level would limit this flexibility and make the system less adaptable to changing consumer requirements.
  • Scaling and Parallelism: Kafka's scalability and parallelism benefits come from the ability to distribute messages across multiple partitions and allow multiple consumers to process messages in parallel. Selective message retrieval would complicate this parallelism, making it harder to distribute work efficiently.

While Kafka itself doesn't provide native message selection features, it's essential to design the consumers to handle message filtering and selection if needed. Consumers can be designed to filter and process messages based on specific criteria, ensuring that only relevant messages are processed within the consumer application. This approach allows Kafka to maintain its core design principles while still providing the flexibility needed for various message-processing scenarios.

Because Kafka alone could not solve this problem in an easy way, the messages are pushed to another persistent store that can easily select events based on known criteria. This requirement led to the decision to use Redis and to push messages directly into Redis.

A decision was made to limit the events pushed into Redis to those for which a client is actually expecting a message. If no client is listening for a given ID, the corresponding Kafka messages are filtered out.

Java
 
.filterWhen(record -> checkIfStreamBeingAccessed(record))


The client registers its ID so that the Kafka listener will push its events to the Redis stream.

Java
 
.flatMap(id -> addIdToStream(clientId))


Conclusion

By following the steps outlined in this document, we have successfully implemented an event streaming architecture that takes events from Kafka, filters them based on a specific client ID, and forwards the relevant events to Redis using its Stream API. The SSE endpoint allows clients to receive real-time event updates tailored to their respective client IDs. This solution provides an efficient and scalable way to handle event streaming for targeted clients.
