DZone

High Throughput vs. Low Latency in Data Writing: A Way to Have Both

Moving your data to Doris based on Flink streaming.

By Frank Z · Feb. 26, 23 · Tutorial

This article explains how Apache Doris, combined with Flink streaming, imports data and performs Change Data Capture (CDC) from upstream databases such as MySQL. But first of all, you might ask: What is Apache Doris, and why would I bother to do so?

Well, Apache Doris is an open-source real-time analytical data warehouse that supports both high-concurrency point queries and high-throughput complex analysis. It provides sub-second analytic query capabilities and comes in handy in multi-dimensional analysis, dashboarding, and other real-time data services.

Overview

  • How to perform end-to-end data synchronization within seconds
  • How to ensure real-time data visibility
  • How to smooth the writing of massive numbers of small files
  • How to ensure end-to-end Exactly-Once processing

Real-Timeliness

  • Stream Write
The Flink-Doris Connector used to follow a "Cache and Batch Write" method for data ingestion. However, that approach requires a carefully chosen batch size and batch write interval; otherwise things can go wrong. If the batch size is too large, the cached data can cause out-of-memory (OOM) errors. If writes are too frequent, they generate too many data versions.

To avoid these problems, Doris implements a Stream Write method, which works as follows:

  1. A Flink task, once started, asynchronously initiates a Stream Load HTTP request.
  2. The data is transmitted to Doris via the chunked transfer encoding mechanism of HTTP.
  3. The HTTP request ends at the checkpoint, which completes the Stream Load task. Meanwhile, the next Stream Load request is initiated asynchronously.
  4. Repeat the above steps.
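The steps above can be sketched in Python as a toy model. This is not the Flink-Doris Connector's code: it only shows how each record is framed with HTTP/1.1 chunked transfer encoding (size in hex, CRLF, data, CRLF, with a zero-length chunk ending the body) and how a checkpoint closes one Stream Load request body and opens the next. The `CHECKPOINT` marker and the newline-delimited record format are assumptions made for illustration.

```python
from typing import Iterable, List

# Hypothetical marker standing in for a Flink checkpoint barrier (not a connector API).
CHECKPOINT = object()

# A zero-length chunk (with no trailers) terminates a chunked request body.
LAST_CHUNK = b"0\r\n\r\n"


def encode_chunk(payload: bytes) -> bytes:
    """Frame one HTTP/1.1 chunk: size in hex, CRLF, the data, CRLF."""
    return f"{len(payload):x}\r\n".encode("ascii") + payload + b"\r\n"


def stream_write(events: Iterable[object]) -> List[bytes]:
    """Return the bodies of the Stream Load requests completed at each checkpoint.

    A request body is open from the start and each record is sent as one chunk.
    A checkpoint terminates the current body (ending that Stream Load) and a new
    body is opened for the records that follow. Records after the last
    checkpoint stay in the still-open request and are not returned.
    """
    completed: List[bytes] = []
    current = bytearray()
    for event in events:
        if event is CHECKPOINT:
            current += LAST_CHUNK
            completed.append(bytes(current))
            current = bytearray()  # the next Stream Load request starts here
        else:
            current += encode_chunk(f"{event}\n".encode("utf-8"))
    return completed


if __name__ == "__main__":
    for body in stream_write(["a", "bb", CHECKPOINT, "c", CHECKPOINT, "pending"]):
        print(body)
```

A real connector writes chunks to an open HTTP connection as records arrive rather than assembling the body in memory; returning byte strings here only makes the framing easy to inspect.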
  • Transaction Processing
  • Quick Aggregation of Data Versions
Highly concurrent writing of small files can generate too many data versions in Doris and slow down data queries. Thus, Doris has enhanced its data compaction capability in order to quickly aggregate data.

First, Doris introduced Quick Compaction: compaction is triggered as soon as the number of data versions increases. In addition, by scanning tablet metadata, Doris identifies tablets with too many data versions and compacts them accordingly.
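A minimal sketch of these two triggers, assuming a single per-tablet version-count threshold. `MAX_VERSIONS`, `TabletMeta`, `on_new_version`, and `scan_for_candidates` are hypothetical names, and Doris's real scheduling is more involved: a tablet is queued as soon as a new version pushes it over the threshold, and a periodic metadata scan returns over-limit tablets, worst first.

```python
from dataclasses import dataclass
from typing import List

MAX_VERSIONS = 50  # hypothetical threshold, chosen only for illustration


@dataclass
class TabletMeta:
    tablet_id: int
    version_count: int


def on_new_version(tablet: TabletMeta, queue: List[int]) -> None:
    """Event-driven trigger: after a load adds a version, queue the tablet if it is over the limit."""
    tablet.version_count += 1
    if tablet.version_count > MAX_VERSIONS and tablet.tablet_id not in queue:
        queue.append(tablet.tablet_id)


def scan_for_candidates(tablets: List[TabletMeta]) -> List[int]:
    """Periodic trigger: scan tablet metadata and return over-limit tablets, most versions first."""
    over = [t for t in tablets if t.version_count > MAX_VERSIONS]
    over.sort(key=lambda t: t.version_count, reverse=True)
    return [t.tablet_id for t in over]


if __name__ == "__main__":
    tablets = [TabletMeta(1, 10), TabletMeta(2, 80), TabletMeta(3, 55), TabletMeta(4, 50)]
    queue: List[int] = []
    on_new_version(tablets[3], queue)  # tablet 4 goes from 50 to 51 versions
    print(queue, scan_for_candidates(tablets))
```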

Second, for small files written with high concurrency and frequency, Doris implements Cumulative Compaction and schedules these tasks separately from the heavyweight Base Compaction so that the two do not interfere with each other.

Last but not least, Doris adopts a tiered data aggregation method, which ensures that each aggregation only involves files of similar sizes. This greatly reduces the total number of aggregation tasks and the CPU usage of the system.
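To illustrate the tiered idea, here is a minimal size-tiered compaction sketch. It is a generic model, not Doris's implementation: files are grouped into tiers by size (each tier spans a factor of `FANOUT`), and only groups of `FANOUT` files from the same tier are merged, so small files are never repeatedly rewritten together with large ones. `FANOUT`, `tier_of`, `compact_round`, and `compact` are hypothetical names, and sizes are in arbitrary units.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

FANOUT = 4  # hypothetical: files per merge, and the size ratio between adjacent tiers


def tier_of(size: int) -> int:
    """Tier = how many times the size can be divided by FANOUT (integer log, no float error)."""
    tier = 0
    while size >= FANOUT:
        size //= FANOUT
        tier += 1
    return tier


def compact_round(sizes: List[int]) -> Tuple[List[int], int]:
    """Merge every full group of FANOUT same-tier files once; return new sizes and merge count."""
    tiers: Dict[int, List[int]] = defaultdict(list)
    for size in sizes:
        tiers[tier_of(size)].append(size)
    out: List[int] = []
    merges = 0
    for files in tiers.values():
        full = len(files) - len(files) % FANOUT
        for i in range(0, full, FANOUT):
            out.append(sum(files[i:i + FANOUT]))  # one merge of similar-sized files
            merges += 1
        out.extend(files[full:])  # leftovers wait for more files of their tier
    return sorted(out), merges


def compact(sizes: List[int]) -> Tuple[List[int], int]:
    """Run rounds until nothing is mergeable; return the final sizes and total merges."""
    total = 0
    while True:
        sizes, merges = compact_round(sizes)
        if merges == 0:
            return sizes, total
        total += merges


if __name__ == "__main__":
    print(compact([1] * 16))         # → ([16], 5)
    print(compact([1] * 5 + [100]))  # → ([1, 4, 100], 1)
```

Sixteen unit-size files are reduced to one in five merges, each combining four files of similar size. In the second call the large file is never rewritten, because nothing else shares its tier.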

Exactly-Once

Exactly-Once semantics means that each piece of data is processed once and only once: it is neither reprocessed nor lost, even if a machine or the application fails.

Flink uses a two-phase commit (2PC) protocol to provide Exactly-Once semantics for sink operators. Building on this, the Flink-Doris Connector implements Stream Load 2PC to deliver Exactly-Once processing. The details are as follows:

  1. Once started, a Flink task initiates a Stream Load PreCommit request. A transaction is opened, and data is continuously sent to Doris via the chunked mechanism of HTTP.

  2. The HTTP request ends at the checkpoint and the Stream Load is completed. The transaction status is set to Pre-Committed. At this point, the data has been written to the BE but is still invisible to users.

  3. When the checkpoint completes, a commit request changes the transaction status to Committed. After this, the data becomes visible to users.

  4. In the case of Flink application failures, if the previous transaction is still in Pre-Committed status, a rollback request is initiated at the checkpoint and the transaction status is changed to Aborted.
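The transaction life cycle above can be modeled as a small state machine. The sketch below is an in-memory toy under stated assumptions: `InMemoryDoris` and `run_checkpoint` are hypothetical names, no real HTTP or Flink is involved, and a failed checkpoint is modeled simply as aborting the pre-committed transaction, as step 4 describes. It shows why replaying the same records after a failure does not duplicate them: rows only become visible once their transaction is committed.

```python
from enum import Enum
from typing import Dict, List


class TxnStatus(Enum):
    OPEN = "open"                    # Stream Load in progress, data streaming in
    PRE_COMMITTED = "pre-committed"  # written to the BE, not yet visible
    COMMITTED = "committed"          # visible to queries
    ABORTED = "aborted"              # rolled back, never visible


class InMemoryDoris:
    """Hypothetical stand-in for a Doris table: rows become visible only on commit."""

    def __init__(self) -> None:
        self.status: Dict[int, TxnStatus] = {}
        self.rows: Dict[int, List[str]] = {}
        self._next_id = 1

    def begin(self) -> int:
        txn_id = self._next_id
        self._next_id += 1
        self.status[txn_id] = TxnStatus.OPEN
        self.rows[txn_id] = []
        return txn_id

    def write(self, txn_id: int, row: str) -> None:
        self._require(txn_id, TxnStatus.OPEN)
        self.rows[txn_id].append(row)

    def pre_commit(self, txn_id: int) -> None:
        self._require(txn_id, TxnStatus.OPEN)
        self.status[txn_id] = TxnStatus.PRE_COMMITTED

    def commit(self, txn_id: int) -> None:
        self._require(txn_id, TxnStatus.PRE_COMMITTED)
        self.status[txn_id] = TxnStatus.COMMITTED

    def abort(self, txn_id: int) -> None:
        if self.status[txn_id] is TxnStatus.COMMITTED:
            raise RuntimeError(f"txn {txn_id} is already committed")
        self.status[txn_id] = TxnStatus.ABORTED

    def visible_rows(self) -> List[str]:
        return [row for txn_id, s in self.status.items()
                if s is TxnStatus.COMMITTED for row in self.rows[txn_id]]

    def _require(self, txn_id: int, expected: TxnStatus) -> None:
        if self.status[txn_id] is not expected:
            raise RuntimeError(
                f"txn {txn_id} is {self.status[txn_id].value}, expected {expected.value}")


def run_checkpoint(db: InMemoryDoris, rows: List[str], checkpoint_ok: bool) -> int:
    """One checkpoint interval of a sink subtask using Stream Load 2PC."""
    txn_id = db.begin()       # step 1: open the transaction and stream rows
    for row in rows:
        db.write(txn_id, row)
    db.pre_commit(txn_id)     # step 2: the request ends at the checkpoint
    if checkpoint_ok:
        db.commit(txn_id)     # step 3: checkpoint completed, data becomes visible
    else:
        db.abort(txn_id)      # step 4: failure, roll back the pre-committed data
    return txn_id


if __name__ == "__main__":
    db = InMemoryDoris()
    run_checkpoint(db, ["a", "b"], checkpoint_ok=True)
    run_checkpoint(db, ["c"], checkpoint_ok=False)  # job fails, "c" is rolled back
    run_checkpoint(db, ["c"], checkpoint_ok=True)   # "c" is replayed after recovery
    print(db.visible_rows())
```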

Performance of Doris in High-Concurrency Scenarios

Scenario Description

Flink consumes data from Kafka, performs ETL, and then ingests the results into Doris in real time through the Flink-Doris Connector.

Requirements

The upstream data is written into Doris at a high frequency of 100,000 records per second. To achieve real-time data visibility, the upstream and downstream data must be synchronized within about 5 seconds.

Flink Configurations

Concurrency: 20

Checkpoint Interval: 5s
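As a back-of-the-envelope check of this setup, the sketch below works out how much data each checkpoint carries. It assumes (our assumption, not stated in the test description) that the 100,000 records per second are spread evenly across the 20 parallel sink subtasks, each of which runs one Stream Load per checkpoint, and it ignores commit time when estimating the visibility delay. `load_plan` is a hypothetical helper.

```python
from typing import Dict


def load_plan(records_per_sec: int, parallelism: int, checkpoint_interval_s: int) -> Dict[str, int]:
    """Estimate per-checkpoint batch sizes when each sink subtask runs one Stream Load per checkpoint."""
    per_checkpoint = records_per_sec * checkpoint_interval_s
    return {
        "records_per_checkpoint": per_checkpoint,
        # Each subtask's Stream Load carries its share, assuming an even distribution.
        "records_per_stream_load": per_checkpoint // parallelism,
        "stream_loads_per_checkpoint": parallelism,
        # A record arriving just after a checkpoint waits up to one full interval to be committed.
        "max_visibility_delay_s": checkpoint_interval_s,
    }


if __name__ == "__main__":
    print(load_plan(100_000, 20, 5))
```

Under these assumptions, every 5-second checkpoint commits 20 Stream Loads of about 25,000 records each, and no record waits much longer than 5 seconds to become visible, which matches the requirement above.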

Here's how Doris does it:

Compaction Real-Timeliness

As the results show, Doris aggregates data quickly and keeps the number of data versions in each tablet below 50. Meanwhile, the Compaction Score remains stable.


CPU Usage

After its compaction strategy for small files was optimized, Doris uses 25% less CPU.


Query Latency

With lower CPU usage and fewer data versions, Doris keeps the data better organized, which significantly lowers query latency.


Performance of Doris in Low-Latency Scenarios (High-Level Stress Test)

Description

  • Single-BE, single-tablet Stream Load stress test on the client side
  • Data real-timeliness <1s

Here are the Compaction Scores before and after optimization:

Suggestions for Using Doris

Low-Latency Scenarios

In scenarios that require real-time data visibility (such as data synchronization within seconds), each ingestion usually produces small files. In this case, it is recommended to reduce cumulative_size_based_promotion_min_size_mbyte from its default value of 64 to 8 (in MB), which can greatly improve compaction performance.
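A simplified model may help show why this parameter matters. It assumes (this is our reading of Doris's size-based cumulative compaction policy, not something stated in this article) that a rowset produced by cumulative compaction is promoted out of the cumulative layer once it reaches a promotion size, computed as a percentage of the base rowset size and clamped between a minimum (`cumulative_size_based_promotion_min_size_mbyte`) and a maximum. The 5% ratio and the 1,024 MB maximum below are assumptions, and `promotion_size_mb` and `loads_until_promotion` are hypothetical helpers.

```python
def promotion_size_mb(base_rowset_mb: int, min_mb: int = 64, max_mb: int = 1024,
                      ratio_pct: int = 5) -> int:
    """Assumed model: ratio_pct% of the base rowset size, clamped to [min_mb, max_mb]."""
    return max(min_mb, min(max_mb, base_rowset_mb * ratio_pct // 100))


def loads_until_promotion(threshold_mb: int, load_mb: int) -> int:
    """How many small ingested files a cumulative rowset absorbs before reaching the threshold."""
    return -(-threshold_mb // load_mb)  # ceiling division on integers


if __name__ == "__main__":
    base_mb = 100  # hypothetical base rowset; 5% of it is 5 MB, below either minimum
    for min_mb in (64, 8):
        threshold = promotion_size_mb(base_mb, min_mb=min_mb)
        print(min_mb, threshold, loads_until_promotion(threshold, load_mb=1))
```

Under this model, with 1 MB per ingestion, the default minimum keeps a growing cumulative rowset in the cumulative layer until it has absorbed about 64 small files, so the same data is merged again and again; a minimum of 8 promotes it after about 8, leaving each cumulative compaction less data to rewrite.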

High-Concurrency Scenarios

For highly concurrent writes, it is recommended to reduce the frequency of Stream Load by increasing the checkpoint interval to 5–10s. This not only increases the throughput of Flink tasks but also reduces the number of small files generated, which avoids extra pressure on compaction. For scenarios with less strict real-time requirements (such as data synchronization within minutes), the checkpoint interval can be increased to 5–10 minutes. The Flink-Doris Connector still ensures data integrity in this case through its 2PC + checkpoint mechanism.
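The trade-off can be made concrete with a quick calculation. Assuming that each parallel sink subtask runs one Stream Load per checkpoint, the number of loads per hour falls in proportion to the checkpoint interval, while the worst-case delay before data becomes visible grows with it. `loads_per_hour` is a hypothetical helper, and the parallelism of 20 is taken from the test configuration above.

```python
def loads_per_hour(parallelism: int, checkpoint_interval_s: int) -> int:
    """Stream Loads started per hour when every sink subtask runs one load per checkpoint."""
    return parallelism * 3600 // checkpoint_interval_s


if __name__ == "__main__":
    parallelism = 20
    for interval_s in (1, 5, 10, 300, 600):
        # Worst-case visibility delay is roughly one checkpoint interval.
        print(f"interval {interval_s:>3}s: {loads_per_hour(parallelism, interval_s):>6} loads/hour, "
              f"data visible within ~{interval_s}s")
```

Moving from a 1-second to a 10-second interval cuts the number of loads by a factor of ten, and moving to 5 minutes cuts it by a factor of 300, at the cost of proportionally staler data.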

Conclusion

Apache Doris achieves real-time data ingestion through its Stream Write method, transaction processing capability, and quick aggregation of data versions. These techniques reduce memory and CPU usage, which in turn lowers latency. For data integrity and consistency, Doris implements Stream Load 2PC to guarantee that all data is processed exactly once. This is how Doris enables fast and safe data ingestion.

Opinions expressed by DZone contributors are their own.
