
Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder

This article explains how to implement request tracing in Spring Cloud Stream data pipelines with Kafka Binder.

By Hemanth Atluri · Nov. 21, 22 · Tutorial

What Is a Data Pipeline? 

A data pipeline is a process that transfers data from one or more sources to a destination via batch or stream processing. Along the way, raw data is filtered, cleaned, transformed, enriched, and fed into data lakes or data warehouses. In an enterprise, data is scattered across multiple systems in different formats; data pipelines help collect data from all those sources into a common format so that it is suitable for business intelligence and analytics.
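As a toy illustration of the filter/clean/transform stages described above (the records and rules here are hypothetical, not from the article):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical single pipeline stage: drop unusable raw records, clean
// whitespace, and normalize case before loading downstream.
public class PipelineStep {

    public static List<String> transform(List<String> raw) {
        return raw.stream()
                .filter(r -> r != null && !r.trim().isEmpty()) // filter out bad records
                .map(String::trim)                             // clean
                .map(String::toUpperCase)                      // transform
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(transform(Arrays.asList(" a ", "", null, "b")));
    }
}
```

A real pipeline would run many such stages, in batch or as a continuous stream, before the data lands in a lake or warehouse.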

What Is Request Tracing? 

In a distributed system, a request travels through multiple services before completion. Services can be hosted in different network zones, different VMs, different cloud providers, or any combination of these. Triaging an issue in such an environment is tedious and time-consuming; request tracing helps here. A unique ID is minted at the origin of the request and carried forward to every system the request travels through; with that unique ID, we can trace the journey of the request. 
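The mint-or-carry-forward idea can be sketched outside of any framework (the header name and helper below are illustrative only, not the article's code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration: mint a requestId at the origin if none is
// present; otherwise carry the existing one forward unchanged.
public class RequestIdPropagation {

    static final String REQUEST_ID = "requestId";

    // Returns a copy of the headers with a requestId guaranteed to be present.
    public static Map<String, String> ensureRequestId(Map<String, String> headers) {
        Map<String, String> out = new HashMap<>(headers);
        out.computeIfAbsent(REQUEST_ID, k -> UUID.randomUUID().toString());
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> origin = ensureRequestId(new HashMap<>());
        // A downstream service carries the same id forward rather than re-minting.
        Map<String, String> downstream = ensureRequestId(origin);
        System.out.println(origin.get(REQUEST_ID).equals(downstream.get(REQUEST_ID)));
    }
}
```

Every service logs this ID, so one grep across all services reconstructs the request's journey.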

What Is Spring Cloud Stream? 

As per the Spring documentation, Spring Cloud Stream is a framework for building highly scalable, event-driven microservices connected with a shared messaging system. Spring Cloud Stream is built on Spring Boot and Spring Integration and supports multiple messaging frameworks from various providers.
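For context, an application using the Kafka binder with the annotation-based Sink/Source bindings (as in the code below) is wired up through configuration; the topic and group names in this sketch are hypothetical, not from the article:

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:                     # Sink.INPUT channel
          destination: orders-in   # hypothetical Kafka topic
          group: order-processor   # hypothetical consumer group
        output:                    # Source.OUTPUT channel
          destination: orders-out  # hypothetical Kafka topic
      kafka:
        binder:
          brokers: localhost:9092
```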

Implementation Details

To implement request tracing in Spring Cloud Stream, I used the preSend and afterSendCompletion methods of the ChannelInterceptor interface. 

preSend is invoked before a message is sent to the Spring Cloud Stream channel. The overridden implementation checks whether a requestId exists in the Kafka headers and carries it forward to outbound Kafka messages; if no requestId exists in the headers, it mints and injects one. 

afterSendCompletion is invoked after a send completes. The overridden implementation clears the requestId from the ThreadLocal so that the next Kafka message gets its own requestId.

Below is the sample code:

Java
 
package com.example.demo.configuration;

import com.example.demo.requesttracing.RequestTracingContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

import java.util.UUID;

@Configuration
@Slf4j
public class CloudStreamInterceptor implements ChannelInterceptor {

    static final String TYPE = "type";
    static final String REQUEST_ID = "requestId";

    protected final RequestTracingContext requestTracingContext;

    public CloudStreamInterceptor(RequestTracingContext requestTracingContext) {
        this.requestTracingContext = requestTracingContext;
    }

    /**
     * This overridden method mints the requestId in the Kafka header before the message
     * is sent to the channel for processing.
     *
     * @param message        the message about to be sent
     * @param messageChannel the channel the message is being sent to
     * @return the message, with a requestId header guaranteed to be present
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
        try {
            if (messageChannel instanceof DirectWithAttributesChannel) {
                DirectWithAttributesChannel channel = (DirectWithAttributesChannel) messageChannel;
                String requestId = (String) message.getHeaders().get(REQUEST_ID);
                Object channelType = channel.getAttribute(TYPE);
                if (Sink.INPUT.equals(channelType)) {
                    return preSendInput(message, requestId);
                } else if (Source.OUTPUT.equals(channelType)) {
                    return preSendOutput(message, requestId);
                }
            }
        } catch (Exception ex) {
            log.error("Exception while minting requestId. Details: Header {}, Message {}", message.getHeaders(), message, ex);
        }
        return message;
    }

    /**
     * Mints a requestId for an inbound message when none is present and stores it
     * in the tracing context.
     *
     * @param message   the inbound message
     * @param requestId the requestId from the headers, possibly blank
     * @return the message, with the requestId header set
     */
    private Message<?> preSendInput(Message<?> message, String requestId) {
        if (StringUtils.isBlank(requestId)) {
            requestId = UUID.randomUUID().toString();
            message = MessageBuilder.fromMessage(message).setHeader(REQUEST_ID, requestId).build();
        }
        requestTracingContext.setRequestId(requestId);
        return message;
    }

    /**
     * Carries the requestId from the tracing context forward to an outbound
     * message that does not already have one.
     *
     * @param message   the outbound message
     * @param requestId the requestId from the headers, possibly blank
     * @return the message, with the requestId header set
     */
    private Message<?> preSendOutput(Message<?> message, String requestId) {
        if (StringUtils.isBlank(requestId) && StringUtils.isNotBlank(requestTracingContext.getRequestId())) {
            requestId = requestTracingContext.getRequestId();
            message = MessageBuilder.fromMessage(message).setHeader(REQUEST_ID, requestId).build();
        }
        requestTracingContext.setRequestId(requestId);
        return message;
    }

    /**
     * Clears the requestId from the ThreadLocal after send completion so that the
     * next message processed on this thread gets its own requestId.
     *
     * @param message        the message that was sent
     * @param messageChannel the channel the message was sent to
     * @param sent           whether the send was successful
     * @param ex             the exception raised during the send, if any
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean sent, @Nullable Exception ex) {
        try {
            if (messageChannel instanceof DirectWithAttributesChannel) {
                DirectWithAttributesChannel channel = (DirectWithAttributesChannel) messageChannel;
                Object channelType = channel.getAttribute(TYPE);
                if (Sink.INPUT.equals(channelType)) {
                    requestTracingContext.clear();
                }
            }
        } catch (Exception e) {
            log.error("Exception while clearing requestId. Details: Header {}, Message {}", message.getHeaders(), message, e);
        }
    }
}


The RequestTracingContext class helps in setting, retrieving, and clearing the requestId in a ThreadLocal. 
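The article does not show this class, so the following is a hedged sketch of what it could look like: a plain ThreadLocal holder (the exact shape and package are assumptions, not the author's code):

```java
// Hedged sketch: RequestTracingContext is not shown in the article, so this
// ThreadLocal-based implementation is an assumption, not the author's code.
public class RequestTracingContext {

    private final ThreadLocal<String> requestId = new ThreadLocal<>();

    public void setRequestId(String id) {
        requestId.set(id);
    }

    public String getRequestId() {
        return requestId.get();
    }

    // remove() rather than set(null), so pooled consumer threads do not
    // leak one message's requestId into the next message they process.
    public void clear() {
        requestId.remove();
    }

    public static void main(String[] args) {
        RequestTracingContext ctx = new RequestTracingContext();
        ctx.setRequestId("demo-id");
        System.out.println(ctx.getRequestId());
        ctx.clear();
    }
}
```

In a Spring application it would typically be registered as a bean (for example with @Component) so it can be constructor-injected into CloudStreamInterceptor.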

Conclusion

Request tracing is helpful when dealing with asynchronous processing and high-traffic applications. 


Opinions expressed by DZone contributors are their own.
