
Debezium in Embedded Mode

In this tutorial, learn how to run Debezium through a Java application without the need for a standalone Debezium server running or a streaming component.

By Emmanouil Gkatziouras · Jan. 23, 24 · Tutorial


In a previous blog, we set up a Debezium server reading events from a PostgreSQL database. Then we streamed those changes to a Redis instance through a Redis stream.

This might give the impression that, in order to run Debezium, we need two extra components running in our infrastructure:

  • A standalone Debezium server instance.
  • A software component with streaming capabilities and various integrations, such as Redis or Kafka.

This is not always the case, since Debezium can also run in embedded mode. In embedded mode, Debezium reads directly from the database’s transaction log, and it is up to you how to handle the entries retrieved. The process of reading the entries from the transaction log can live inside any Java application, so there is no need for a standalone deployment.

Apart from reducing the number of components, the other benefit is that we can alter the entries as we read them from the database and take action directly in our application. Sometimes we might only need a subset of the capabilities offered.

Let’s use the same PostgreSQL configurations we used previously.

Properties files
 
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3


Also, we shall create an initialization script for the table we want to focus on.

Shell
 
#!/bin/bash

set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL

  create schema test_schema;

  create table test_schema.employee(
      id        SERIAL PRIMARY KEY,
      firstname TEXT   NOT NULL,
      lastname  TEXT   NOT NULL,
      email     TEXT   NOT NULL,
      age       INT    NOT NULL,
      salary    REAL,
      UNIQUE(email)
  );

EOSQL


Our Docker Compose file will look like this.

YAML
 
version: '3.1'
  
services:
 
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432


The configuration files we created are mounted onto the PostgreSQL Docker container. Provided we run docker compose up (the Docker Compose V2 CLI), a PostgreSQL server with a schema and a table will be up and running. That server will also have logical decoding enabled, so Debezium will be able to track changes on that table through the transaction log. We now have everything needed to proceed with building our application.

First, let’s add the dependencies needed:

XML
 
<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <version.debezium>2.3.1.Final</version.debezium>
    <logback-core.version>1.4.12</logback-core.version>
</properties>
 
<dependencies>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage-jdbc</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
</dependencies>


We also need to create the Debezium embedded properties:

Properties files
 
name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=127.0.0.1
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
database.server.name=embedded-debezium
debezium.source.plugin.name=pgoutput
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=test_schema
table.include.list=test_schema.employee


Apart from establishing the connection to the PostgreSQL database, we also decided to store the offset in a file. The offset is how Debezium keeps track of the progress made in processing the events.
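
The file-based store also needs a file path. In this tutorial the path is set programmatically when the engine is built (shown later), but it could just as well be declared in the properties file itself; the path below is only an example:

Properties files
 
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/embedded-debezium-offset.dat
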

For each change that happens on the table test_schema.employee, we shall receive an event, and our code needs to handle it. To handle the events we create a DebeziumEngine.ChangeConsumer, which consumes the events emitted.

Java
 
package com.egkatzioura;
 
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
 
import java.util.List;
 
public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for (RecordChangeEvent<SourceRecord> record : records) {
            System.out.println(record.record().toString());
            // Mark the record as processed so its offset can be flushed to the offset store.
            committer.markProcessed(record);
        }
        // Signal that the whole batch has been handled.
        committer.markBatchFinished();
    }

}
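
The consumer above simply prints each record. As mentioned earlier, one advantage of embedded mode is that we can filter or transform the entries in-process before acting on them. The sketch below is only illustrative (the class name FilteringChangeConsumer is not part of the original code); it assumes the Connect envelope format shown in the console output later in this article, where the "after" section carries the new row state:

Java
 
package com.egkatzioura;

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

// Illustrative consumer: extracts fields from the "after" image instead of printing raw records.
public class FilteringChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for (RecordChangeEvent<SourceRecord> record : records) {
            Object value = record.record().value();
            // Delete tombstones carry a null value; data events carry a Struct envelope with an "after" image.
            if (value instanceof Struct envelope && envelope.schema().field("after") != null) {
                Struct after = envelope.getStruct("after");
                if (after != null) {
                    System.out.println("Employee changed: " + after.getString("email"));
                }
            }
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

}
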


With CustomChangeConsumer in place, every incoming event will be printed on the console. Now we can add our main class, where we set up the engine.

Java
 
package com.egkatzioura;
 
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;
 
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
 
public class Application {
 
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
 
        try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath());
 
        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();
        engine.run();
 
    }
 
}
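
One thing worth noting about the main class: DebeziumEngine implements Runnable and Closeable, and engine.run() blocks the calling thread until the engine stops, which is fine for this demo. In a long-running service you would typically hand the engine to an executor and close it on shutdown so that offsets get flushed. Below is a minimal sketch of that variation (the class name AsyncApplication is chosen here purely for illustration):

Java
 
package com.egkatzioura;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncApplication {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();

        try (final InputStream stream = AsyncApplication.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename", new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();

        // Run the engine on its own thread instead of blocking main.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        // Close the engine on JVM shutdown so in-flight offsets are flushed, then stop the executor.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                engine.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                executor.shutdown();
            }
        }));
    }

}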


Provided our application is running as well as the PostgreSQL database we configured previously, we can start inserting data.

SQL
 
docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.
 
postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);


Also, we can see the change in the console.

Shell
 
SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}


We did it: we managed to run Debezium from within a Java application, without a standalone Debezium server or a separate streaming component. You can find the code on GitHub.


