Your Data Pipeline Is a Mess—Here’s How to Stop Duplicate Data From Wasting Millions

DATE POSTED: January 28, 2025

Data engineers often face challenges with data arriving in inappropriate formats: junk characters, null or empty values, and, most importantly, duplicate records that affect all downstream applications, including reporting and data science models. Cleaning this up becomes a hefty daily task for engineers and support teams, draining their resources without adding productive value. Poorly designed frameworks make these data fixes even harder to mitigate later. Many organizations carry redundant data because of ineffective data pipeline architectures, costing them millions of dollars in storage, repeated reprocessing, and poor resource utilization.

Let’s come to the point: in your current role, have you ever faced a challenge handling duplicates in streaming or batch data pipelines? Most data engineers, data scientists, and data analysts would say "YES". There are numerous tools that can correct duplicate data in a data lake, but at what cost? Can you handle duplicates in the architecture design phase itself? Many questions are probably running through your mind.

Let’s discuss in detail the tools that can help you deduplicate streaming data, along with their pros, cons, setup, and maintenance. Then we will dive into best practices and standards for handling duplicates in streaming pipelines.


Let’s look at three primary approaches to deduplication in streaming data pipelines:

Deduplication using Pub/Sub Message Attributes

Streaming pipelines extract data from many kinds of sources: IoT devices, sensors, game stats, traffic cameras and speed-detector devices, and smart systems that stream vehicle usage data from autonomous vehicles. Most of these systems follow a pattern when streaming events, and each event normally carries a unique identifier, say, a retail store transaction ID for a sale transaction, along with its event timestamp. Some systems, however, don’t produce a unique identifier per event; a speed sensor, for example, has a sensor ID, but its individual stream events carry nothing unique beyond the event timestamp. In these cases, there is a high possibility of duplicate streaming events for the same sensor device.


Think of a use case where a device streaming vehicle-speed data on an interstate can easily produce large volumes of events per minute on a busy day. Another example: during holiday sale events, retail businesses must deal with billions of transactions per day. Handling such a volume of events in real time and deduping the data is crucial for accurate reporting, and for data science models to operate efficiently by removing outliers and duplicates.

In technical terms, Google Cloud provides Pub/Sub, an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages. It is heavily used in streaming analytics and data integration pipelines to load and distribute data. Common uses include ingesting user interaction events, server events, and real-time events; replicating data among databases; acting as an enterprise event bus to share business events across the organization; and streaming data from applications, including sensors and application events, in conjunction with other Google Cloud products through a data pipeline.

Pub/Sub offers a simple yet powerful method to handle duplicate data using message attributes. Every message in a Pub/Sub topic can include key-value pairs in its metadata. These attributes can be used to identify duplicate events and enable deduplication in the data pipeline without pushing the load onto the data processing services, which generally carry higher resource costs and can slow the pipeline down considerably.

For messages that include a unique field such as transaction_id, this value can be set as an attribute when publishing. When reading messages from Pub/Sub in Dataflow, you can then configure the pipeline to deduplicate on this attribute, as shown in the sketch below.
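Here is a minimal sketch of this pattern, assuming a Python publisher using the google-cloud-pubsub client and an Apache Beam pipeline run on the Dataflow runner; the project, topic, and subscription names are placeholders.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1

# Publisher side: attach the unique business key as a message attribute.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "transactions")  # placeholder names
publisher.publish(
    topic_path,
    data=b'{"product": "laptop", "price": 999.99}',
    transaction_id="txn-12345",  # attribute used for deduplication
)

# Pipeline side: id_label tells the Dataflow runner to deduplicate
# messages that share the same value for this attribute.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    messages = p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
        subscription="projects/my-project/subscriptions/transactions-sub",
        with_attributes=True,
        id_label="transaction_id",  # honored when running on Dataflow
    )
```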

This solution is effective when duplicates arrive from the source application or device and carry a unique identifier within the Pub/Sub topic. The limitation is that it only works for duplicate messages published within a 10-minute window of each other. Although simple to implement, it is constrained by that time-window limit in Pub/Sub. It is very useful in cases such as a speed camera or sensor device generating duplicate events within 10 minutes of each message; there, it works great.

Duplicates can also be generated by the publisher itself: if downstream consumption is delayed, or Pub/Sub never receives an acknowledgement for a delivered message, Pub/Sub retries delivery using the same messageId, creating duplicate events at the publisher. To tackle this, we can use the messageId of the payload as the identifier. Cloud Dataflow, a fully managed service for stream processing on Google Cloud Platform (GCP), provides exactly-once processing of every record. What does that mean for us? It identifies duplicate events based on message_id and eliminates them during pipeline processing. In rare cases, however, when the duplicate events are processed on different worker nodes within Dataflow, they can still slip through to the downstream systems, and you end up with duplicates in your data lake.

We will discuss how to handle such cases toward the end of this article. Let’s focus on the remaining options to deduplicate streaming data.


Deduplication using Apache Beam’s Deduplicate PTransform

Now that we know how Pub/Sub handles duplicate events, the next step is processing these messages with Cloud Dataflow, where a Pub/Sub subscriber reads streaming messages from the source application. Dataflow is a fully managed service that uses the open-source Apache Beam SDK to enable advanced streaming capabilities. Dataflow scales to 4,000 worker nodes per job and can process petabytes of data, with autoscaling for better resource utilization in both batch and streaming pipelines.

Apache Beam offers a built-in Deduplicate PTransform that provides a more configurable and robust method for removing duplicates. This method uses Beam’s stateful API to maintain state for each key observed and removes duplicates within a user-defined time window. This approach lets you define deduplication logic based on specific fields in your data or on the entire message content, and you can configure deduplication on event time or processing time.

Check out my sample data pipeline code on GitHub to try this functionality; a minimal sketch of the idea is shown below.
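The following is an illustration only (not the GitHub sample itself), assuming a Python Beam pipeline and messages that carry a transaction_id field; it keys records on that field and drops repeats seen within a 10-minute event-time window.

```python
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.deduplicate import DeduplicatePerKey
from apache_beam.utils.timestamp import Duration

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    deduped = (
        p
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/transactions-sub")
        | "Parse" >> beam.Map(json.loads)
        # Key each record on the field that defines uniqueness.
        | "KeyByTransactionId" >> beam.Map(lambda rec: (rec["transaction_id"], rec))
        # Stateful deduplication: keep the first record per key seen
        # within a 10-minute event-time window.
        | "Deduplicate" >> DeduplicatePerKey(event_time_duration=Duration(seconds=600))
        | "DropKey" >> beam.Values()
    )
```

DeduplicatePerKey also accepts a processing_time_duration if you prefer to bound deduplication by processing time instead of event time.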

One thing to note: batch pipelines always use exactly-once processing, while streaming pipelines use exactly-once processing by default but can be configured for at-least-once processing as well. The catch is the window Dataflow is currently processing: once it moves past that window, a duplicate message is no longer compared with what has already been processed, because Dataflow does not keep the record IDs in memory. Dataflow may discard such a message as late-arriving data, or the pipeline can have another leg that captures unprocessed messages and writes them either to a table in BigQuery (a fully managed, cloud-native data warehouse on GCP) or to Cloud Storage (a managed service for storing unstructured data) as a file, for further reprocessing and troubleshooting.


This solution provides a flexible option for implementing complex deduplication logic, making it suitable for scenarios where the deduplication window is larger or more complex than what Pub/Sub offers. The trade-off is higher resource usage for maintaining the state that determines record uniqueness.


Deduplication in the Sink

So far, we have seen how a publisher like Pub/Sub and a processing service like Cloud Dataflow handle duplicates in real time. These solutions are not 100% effective when it comes to windowing, because of processing overhead and volume. Edge cases remain: a duplicate message can arrive late, and Dataflow treats it as a unique record because it no longer holds the record IDs needed to check uniqueness; or, due to network or worker-node failures, Dataflow processes the duplicates on different worker nodes, treats each as unique, and lets them flow into downstream systems such as a BigQuery table.

To mitigate such cases, a final deduplication check can occur at the sink level, such as in BigQuery or another data warehouse. This approach is useful when real-time deduplication isn’t critical and periodic deduplication suffices. It effectively filters out and eliminates all duplicate messages using SQL queries.

Based on the use case, there are two kinds of solutions for fixing duplicates.


First, use scheduled queries, either through a Cloud Composer DAG or from the BigQuery console, to build a deduplicated table periodically from partitions (daily or hourly). This is a simple process for anyone to set up: stage the deduplicated data in a staging table, then load the distinct records into the final table. A sketch of a Composer DAG for this is shown below.
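As an illustration of the scheduled-query option, here is a minimal sketch of a Cloud Composer (Airflow 2) DAG, assuming the Google provider package is installed; the project, dataset, and table names are placeholders, and the query mirrors the simple CREATE OR REPLACE approach shown later in this article.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Placeholder project/dataset/table names.
DEDUP_QUERY = """
CREATE OR REPLACE TABLE `my-project.analytics.transactions` AS
SELECT DISTINCT * FROM `my-project.analytics.raw_transactions`;
"""

with DAG(
    dag_id="daily_transaction_dedup",
    schedule_interval="@daily",      # run the cleanup once per day
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    dedup_transactions = BigQueryInsertJobOperator(
        task_id="dedup_transactions",
        configuration={
            "query": {
                "query": DEDUP_QUERY,
                "useLegacySql": False,   # standard SQL
            }
        },
    )
```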

Second, we can use a materialized view to serve deduplicated data in near real time, making it an ideal solution for getting business insights quickly.


The BigQuery SQL query is available at my GitHub dedup_sql link.

The BigQuery SQL code below walks through the two options we have discussed:

-- Use the SQL below to periodically deduplicate data in BigQuery tables.
CREATE OR REPLACE TABLE Transactions AS
SELECT DISTINCT * FROM raw_transactions;

-- OR use the incremental steps below to drop the affected partition
-- and re-insert the deduplicated data into the original table.

-- Step 1: Stage distinct records from the original table based on the max timestamp available.
CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS
SELECT DISTINCT *
FROM raw_transactions
WHERE event_timestamp >= (
  SELECT MAX(event_timestamp) FROM raw_transactions
);

-- Step 2: Drop the partition after deduplication.
DELETE FROM raw_transactions
WHERE event_timestamp >= (
  SELECT MAX(event_timestamp) FROM raw_transactions
);

-- Step 3: Insert deduplicated data into the main table.
INSERT INTO raw_transactions
SELECT DISTINCT * FROM STAGE_TRANSACTIONS;

-- OR use the MERGE below to fold new data into the table without duplicates.
MERGE INTO raw_transactions AS target
USING (
  SELECT * FROM STAGE_TRANSACTIONS
) AS source
ON target.transaction_id = source.transaction_id
   AND target.event_timestamp <= source.event_timestamp
WHEN MATCHED THEN
  UPDATE SET
    target.product = source.product,
    target.price = source.price,
    target.location = source.location,
    target.store = source.store,
    target.zipcode = source.zipcode,
    target.city = source.city,
    target.promotion = source.promotion,
    target.event_timestamp = source.event_timestamp
WHEN NOT MATCHED THEN
  INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp)
  VALUES (source.transaction_id, source.product, source.price, source.location, source.store,
          source.zipcode, source.city, source.promotion, source.event_timestamp);

-- OR, to get real-time data without duplicates, use the following materialized view,
-- the quickest way to retrieve deduplicated records.
CREATE MATERIALIZED VIEW raw_transactions_mv AS
SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp
FROM (
  SELECT
    transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp,
    ROW_NUMBER() OVER (
      PARTITION BY transaction_id
      ORDER BY event_timestamp DESC
    ) AS row_num
  FROM raw_transactions
)
WHERE row_num = 1;

Technical Trade-Offs

Each deduplication strategy comes with its own set of trade-offs. Here's a summary to help you choose the right approach:

| Method | Advantages | Disadvantages |
|----|----|----|
| Pub/Sub Message Attributes | Low latency, native to Pub/Sub | Limited to a 10-minute deduplication window |
| Apache Beam Deduplicate | Highly flexible, supports complex deduplication logic | Higher resource consumption due to state management |
| Sink-Based Deduplication | Suitable for batch or periodic updates, minimal logic | May introduce latency; orchestration tools may be needed |

In a nutshell

Deduplication is a cornerstone of effective data processing in streaming pipelines. The choice of strategy depends on your pipeline’s real-time needs, complexity, and resource constraints. By leveraging the strengths of Pub/Sub attributes, Apache Beam Deduplicate PTransform, or sink-based deduplication, you can ensure clean, reliable data for downstream systems. Explore these approaches, implement the examples provided, and adapt them to your use case for optimal results.

Are you interested in more in-depth guides on data analytics and machine learning? Follow me on Medium or LinkedIn for the latest articles, and feel free to share your thoughts or questions in the comments below. If you found this article helpful, share it with your network and help others unlock the potential of real-time analytics in retail.