
Transactional Outbox Patterns Step by Step With Spring and Kotlin

A step-by-step guide to implementing the Transactional Outbox pattern in distributed microservices using Reactive Spring and Kotlin with coroutines.

By Alexander Bryksin · May 30, 2023 · Tutorial


The reason we need a Transactional Outbox is that a service often needs to publish messages as part of a transaction that updates the database. Both the database update and the sending of the message must happen within a transaction; if the service doesn't perform these two operations atomically, a failure could leave the system in an inconsistent state.

The source code for this article is available in the GitHub repository.

Transactional Outbox

In this article, we will implement it using Reactive Spring and Kotlin with coroutines. Here is the full list of dependencies used: Kotlin with coroutines, Spring Boot 3, WebFlux, R2DBC, Postgres, MongoDB, Kafka, Grafana, Prometheus, Zipkin, and Micrometer for observability.

The Transactional Outbox pattern solves the problem of the naive implementation, where a transaction updates the database table, then publishes a message to the broker, and finally commits. The problem: if that last step fails, the transaction rolls back the database changes, but the event has already been published to the broker. So we need a way to guarantee that both the database write and the publish to the broker happen. The idea is this: in one transaction, save the order to the orders table and, in the same transaction, save the event to the outbox table, then commit. Afterward, we publish the saved events from the outbox table to the broker.
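
Before diving into the Spring code, here is a toy, in-memory sketch of that idea in plain Kotlin (no Spring, no real database). It only illustrates the two writes sharing one transaction and a separate publish step; it is not the article's actual implementation:

Kotlin
 
import java.util.UUID

// Toy in-memory stand-ins for the orders and outbox tables.
data class ToyOrder(val id: UUID, val email: String)
data class ToyOutboxEvent(val eventId: UUID, val aggregateId: UUID, val type: String, val payload: String)

object ToyDatabase {
    val orders = mutableListOf<ToyOrder>()
    val outbox = mutableListOf<ToyOutboxEvent>()

    // Both inserts succeed or both are rolled back: the essence of the outbox pattern.
    fun <T> transaction(block: () -> T): T {
        val ordersSnapshot = orders.toList()
        val outboxSnapshot = outbox.toList()
        return try {
            block()
        } catch (ex: Exception) {
            orders.clear()
            orders.addAll(ordersSnapshot)
            outbox.clear()
            outbox.addAll(outboxSnapshot)
            throw ex
        }
    }
}

// 1. Save the order and its outbox event in the same transaction, then commit.
fun createOrder(email: String): ToyOrder = ToyDatabase.transaction {
    val order = ToyOrder(UUID.randomUUID(), email)
    ToyDatabase.orders += order
    ToyDatabase.outbox += ToyOutboxEvent(UUID.randomUUID(), order.id, "ORDER_CREATED", """{"email":"$email"}""")
    order
}

// 2. A separate (scheduled) publisher drains the outbox and sends events to the broker.
fun pollOutbox(publish: (ToyOutboxEvent) -> Unit) {
    ToyDatabase.outbox.toList().forEach { event ->
        publish(event)                   // if this throws, the row stays and is retried on the next poll
        ToyDatabase.outbox.remove(event)
    }
}

fun main() {
    createOrder("alex@example.com")
    pollOutbox { println("published: $it") }
}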

There are two ways to do that: using a CDC (change data capture) tool like Debezium, which continuously monitors your databases and lets your applications stream every row-level change in the same order it was committed to the database, or using a polling publisher. For this project, we use a polling publisher. I highly recommend Chris Richardson's book Microservices Patterns, where the Transactional Outbox pattern is explained very well.

One more important thing: we have to be ready for cases when the same event is published more than once, so the consumer must be idempotent. Idempotence describes the reliability of messages in a distributed system, specifically the handling of duplicated messages. Because of retries or message broker features, a message sent once can be received multiple times by consumers. A service is idempotent if processing the same event multiple times results in the same state and output as processing that event just a single time; the reception of a duplicated event does not change the application state or behavior. Most of the time, an idempotent service detects these events and ignores them. Idempotence can be implemented using unique identifiers.
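
As a minimal illustration of deduplication by unique identifier, here is an in-memory stand-in only; in a real service the processed-event check would live in the database, and this article's code instead relies on the version checks shown later:

Kotlin
 
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// In-memory stand-in for a persistent deduplication store. In a real service this would be
// a table with a unique constraint on event_id, checked in the same transaction that applies the event.
class IdempotentEventHandler(private val handle: (eventId: UUID, payload: String) -> Unit) {
    private val processed = ConcurrentHashMap.newKeySet<UUID>()

    fun onEvent(eventId: UUID, payload: String) {
        // Set.add returns false when the id was already seen, so only the first delivery is applied.
        if (!processed.add(eventId)) {
            println("duplicate event $eventId ignored")
            return
        }
        handle(eventId, payload)
    }
}

fun main() {
    val handler = IdempotentEventHandler { id, payload -> println("applying $id: $payload") }
    val eventId = UUID.randomUUID()
    handler.onEvent(eventId, "ORDER_CREATED")
    handler.onEvent(eventId, "ORDER_CREATED") // redelivery: ignored, state and output stay the same
}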

So, let's implement it. The business logic of our example microservice is simple: orders with product shop items, which means two tables for simplicity, plus, of course, an outbox table. In the outbox table's data field we store serialized events; the most common format is JSON, but it's up to you and the concrete microservice. As the data field we can store state changes, or we can simply store the full, last-updated order domain entity every time; state changes take much less space, but again, it's up to you. Other fields in the outbox table usually include the event type, timestamp, version, and other metadata. The exact set depends on each concrete implementation, but this is often the required minimum. The version field is used for concurrency control.
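
For illustration, the shape of an outbox row could look roughly like the following data class. The field names mirror the ones referenced by the repository code later in the article (eventId, aggregateId, eventType, data, version, timestamp), but the exact types are an assumption:

Kotlin
 
import java.time.LocalDateTime
import java.util.UUID

// Illustrative shape only; the concrete OutboxRecord type lives in the project repository.
data class OutboxRecordSketch(
    val eventId: UUID?,           // unique event id, also useful for consumer-side deduplication
    val aggregateId: UUID,        // id of the order the event belongs to (later used as the Kafka message key)
    val eventType: String,        // e.g. ORDER_CREATED, PRODUCT_ITEM_ADDED
    val data: String,             // serialized payload, most commonly JSON
    val version: Long,            // aggregate version, used for concurrency control and ordering checks
    val timestamp: LocalDateTime, // when the event was recorded
)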

All UI interfaces will be available locally:

  • Swagger UI
  • Grafana UI
  • Zipkin UI
  • Kafka UI
  • Prometheus UI

The docker-compose file for this article includes Postgres, MongoDB, ZooKeeper, Kafka, Kafka UI, Zipkin, Prometheus, and Grafana.

For local development, run make local or make develop: the first starts only the docker-compose infrastructure, the second additionally includes the microservice image.

YAML
 
version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zoo1:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
    volumes:
      - "./kafka_data:/kafka"
    networks: [ "microservices" ]

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8086:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092
    networks: [ "microservices" ]

  zipkin-all-in-one:
    image: openzipkin/zipkin:latest
    restart: always
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

  mongo:
    image: mongo
    restart: always
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: bank_accounts
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]


networks:
  microservices:
    name: microservices


The Postgres database schema for this project is:

Postgres database schema

Orders domain REST Controller has the following methods:

Kotlin
 
@RestController
@RequestMapping(path = ["/api/v1/orders"])
class OrderController(private val orderService: OrderService, private val or: ObservationRegistry) {

    @GetMapping
    @Operation(method = "getOrders", summary = "get order with pagination", operationId = "getOrders")
    suspend fun getOrders(
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "20") size: Int,
    ) = coroutineScopeWithObservation(GET_ORDERS, or) { observation ->
        ResponseEntity.ok()
            .body(orderService.getAllOrders(PageRequest.of(page, size))
                .map { it.toSuccessResponse() }
                .also { response -> observation.highCardinalityKeyValue("response", response.toString()) }
            )
    }

    @GetMapping(path = ["{id}"])
    @Operation(method = "getOrderByID", summary = "get order by id", operationId = "getOrderByID")
    suspend fun getOrderByID(@PathVariable id: String) = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
        ResponseEntity.ok().body(orderService.getOrderWithProductsByID(UUID.fromString(id)).toSuccessResponse())
            .also { response ->
                observation.highCardinalityKeyValue("response", response.toString())
                log.info("getOrderByID response: $response")
            }
    }

    @PostMapping
    @Operation(method = "createOrder", summary = "create new order", operationId = "createOrder")
    suspend fun createOrder(@Valid @RequestBody createOrderDTO: CreateOrderDTO) = coroutineScopeWithObservation(CREATE_ORDER, or) { observation ->
        ResponseEntity.status(HttpStatus.CREATED).body(orderService.createOrder(createOrderDTO.toOrder()).toSuccessResponse())
            .also {
                log.info("created order: $it")
                observation.highCardinalityKeyValue("response", it.toString())
            }
    }

    @PutMapping(path = ["/add/{id}"])
    @Operation(method = "addProductItem", summary = "add to the order product item", operationId = "addProductItem")
    suspend fun addProductItem(
        @PathVariable id: UUID,
        @Valid @RequestBody dto: CreateProductItemDTO
    ) = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.addProductItem(dto.toProductItem(id)))
            .also {
                observation.highCardinalityKeyValue("CreateProductItemDTO", dto.toString())
                observation.highCardinalityKeyValue("id", id.toString())
                log.info("addProductItem id: $id, dto: $dto")
            }
    }

    @PutMapping(path = ["/remove/{orderId}/{productItemId}"])
    @Operation(method = "removeProductItem", summary = "remove product from the order", operationId = "removeProductItem")
    suspend fun removeProductItem(
        @PathVariable orderId: UUID,
        @PathVariable productItemId: UUID
    ) = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.removeProductItem(orderId, productItemId))
            .also {
                observation.highCardinalityKeyValue("productItemId", productItemId.toString())
                observation.highCardinalityKeyValue("orderId", orderId.toString())
                log.info("removeProductItem orderId: $orderId, productItemId: $productItemId")
            }
    }

    @PutMapping(path = ["/pay/{id}"])
    @Operation(method = "payOrder", summary = "pay order", operationId = "payOrder")
    suspend fun payOrder(@PathVariable id: UUID, @Valid @RequestBody dto: PayOrderDTO) = coroutineScopeWithObservation(PAY_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.pay(id, dto.paymentId).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("payOrder result: $it")
            }
    }

    @PutMapping(path = ["/cancel/{id}"])
    @Operation(method = "cancelOrder", summary = "cancel order", operationId = "cancelOrder")
    suspend fun cancelOrder(@PathVariable id: UUID, @Valid @RequestBody dto: CancelOrderDTO) = coroutineScopeWithObservation(CANCEL_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.cancel(id, dto.reason).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("cancelOrder result: $it")
            }
    }

    @PutMapping(path = ["/submit/{id}"])
    @Operation(method = "submitOrder", summary = "submit order", operationId = "submitOrder")
    suspend fun submitOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(SUBMIT_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.submit(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("submitOrder result: $it")
            }
    }

    @PutMapping(path = ["/complete/{id}"])
    @Operation(method = "completeOrder", summary = "complete order", operationId = "completeOrder")
    suspend fun completeOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(COMPLETE_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.complete(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("completeOrder result: $it")
            }
    }
}


As I mentioned earlier, the main idea of the transactional outbox implementation is that, as the first step, we write to the orders and outbox tables in one transaction and commit it. There is also an additional, optional optimization: in the same methods, after the transaction has committed successfully, we can immediately publish the event and delete it from the outbox table. If either publishing to the broker or deleting from the outbox table fails here, that's fine, because the polling producer runs as a scheduled process anyway. It's a small optimization and improvement, and it's not mandatory for implementing the outbox pattern; try both variants and choose the best one for your case. In our case we are using Kafka, so we have to remember that producers have an acks setting:

When acks=0, producers consider messages as “written successfully” the moment the message is sent, without waiting for the broker to accept it at all. If the broker goes offline or an exception happens, we won't know and will lose data, so be careful with this setting and don't use acks=0.

When acks=1, producers consider messages as “written successfully” when the message was acknowledged by only the leader.

When acks=all, producers consider messages as “written successfully” when the message is accepted by all in-sync replicas (ISR).
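
For reference, here is a hedged sketch of how acks=all could be configured with Spring Kafka. This is not the repository's actual configuration; the bootstrap address and bean names are assumptions:

Kotlin
 
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerSketchConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, ByteArray> {
        val props = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",          // assumption: local broker from the compose file
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "all",                                  // wait for all in-sync replicas
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,                     // avoid broker-side duplicates on producer retries
        )
        return DefaultKafkaProducerFactory(props)
    }

    @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<String, ByteArray>): KafkaTemplate<String, ByteArray> =
        KafkaTemplate(producerFactory)
}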


In the simplified sequence diagram for the service layer business logic, steps 5 and 6 are an optional optimization, not a requirement, because we have the polling publisher anyway:

simplified sequence diagram

The order service implementation:

Kotlin
 
interface OrderService {
    suspend fun createOrder(order: Order): Order
    suspend fun getOrderByID(id: UUID): Order
    suspend fun addProductItem(productItem: ProductItem)
    suspend fun removeProductItem(orderID: UUID, productItemId: UUID)
    suspend fun pay(id: UUID, paymentId: String): Order
    suspend fun cancel(id: UUID, reason: String?): Order
    suspend fun submit(id: UUID): Order
    suspend fun complete(id: UUID): Order

    suspend fun getOrderWithProductsByID(id: UUID): Order
    suspend fun getAllOrders(pageable: Pageable): Page<Order>

    suspend fun deleteOutboxRecordsWithLock()
}
Kotlin
 
@Service
class OrderServiceImpl(
    private val orderRepository: OrderRepository,
    private val productItemRepository: ProductItemRepository,
    private val outboxRepository: OrderOutboxRepository,
    private val orderMongoRepository: OrderMongoRepository,
    private val txOp: TransactionalOperator,
    private val eventsPublisher: EventsPublisher,
    private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
    private val or: ObservationRegistry,
    private val outboxEventSerializer: OutboxEventSerializer
) : OrderService {

    override suspend fun createOrder(order: Order): Order = coroutineScopeWithObservation(CREATE, or) { observation ->
        txOp.executeAndAwait {
            orderRepository.insert(order).let {
                val productItemsEntityList = ProductItemEntity.listOf(order.productsList(), UUID.fromString(it.id))
                val insertedItems = productItemRepository.insertAll(productItemsEntityList).toList()

                it.addProductItems(insertedItems.map { item -> item.toProductItem() })

                Pair(it, outboxRepository.save(outboxEventSerializer.orderCreatedEventOf(it)))
            }
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun addProductItem(productItem: ProductItem): Unit = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(UUID.fromString(productItem.orderId))
            order.incVersion()

            val updatedProductItem = productItemRepository.upsert(productItem)

            val savedRecord = outboxRepository.save(
                outboxEventSerializer.productItemAddedEventOf(
                    order,
                    productItem.copy(version = updatedProductItem.version).toEntity()
                )
            )

            orderRepository.updateVersion(UUID.fromString(order.id), order.version)
                .also { result -> log.info("addOrderItem result: $result, version: ${order.version}") }

            savedRecord
        }.run {
            observation.highCardinalityKeyValue("outboxEvent", this.toString())
            publishOutboxEvent(this)
        }
    }

    override suspend fun removeProductItem(orderID: UUID, productItemId: UUID): Unit = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
        txOp.executeAndAwait {
            if (!productItemRepository.existsById(productItemId)) throw ProductItemNotFoundException(productItemId)

            val order = orderRepository.findOrderByID(orderID)
            productItemRepository.deleteById(productItemId)

            order.incVersion()

            val savedRecord = outboxRepository.save(outboxEventSerializer.productItemRemovedEventOf(order, productItemId))

            orderRepository.updateVersion(UUID.fromString(order.id), order.version)
                .also { log.info("removeProductItem update order result: $it, version: ${order.version}") }

            savedRecord
        }.run {
            observation.highCardinalityKeyValue("outboxEvent", this.toString())
            publishOutboxEvent(this)
        }
    }

    override suspend fun pay(id: UUID, paymentId: String): Order = coroutineScopeWithObservation(PAY, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.getOrderWithProductItemsByID(id)
            order.pay(paymentId)

            val updatedOrder = orderRepository.update(order)
            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderPaidEventOf(updatedOrder, paymentId)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun cancel(id: UUID, reason: String?): Order = coroutineScopeWithObservation(CANCEL, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(id)
            order.cancel()

            val updatedOrder = orderRepository.update(order)
            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCancelledEventOf(updatedOrder, reason)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun submit(id: UUID): Order = coroutineScopeWithObservation(SUBMIT, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.getOrderWithProductItemsByID(id)
            order.submit()

            val updatedOrder = orderRepository.update(order)
            updatedOrder.addProductItems(order.productsList())

            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderSubmittedEventOf(updatedOrder)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun complete(id: UUID): Order = coroutineScopeWithObservation(COMPLETE, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(id)
            order.complete()

            val updatedOrder = orderRepository.update(order)
            log.info("order submitted: ${updatedOrder.status} for id: $id")

            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCompletedEventOf(updatedOrder)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    @Transactional(readOnly = true)
    override suspend fun getOrderWithProductsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
        orderRepository.getOrderWithProductItemsByID(id).also { observation.highCardinalityKeyValue("order", it.toString()) }
    }

    override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
        orderMongoRepository.getAllOrders(pageable).also { observation.highCardinalityKeyValue("pageResult", it.toString()) }
    }

    override suspend fun deleteOutboxRecordsWithLock() = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
        outboxRepository.deleteOutboxRecordsWithLock {
            observation.highCardinalityKeyValue("outboxEvent", it.toString())
            eventsPublisher.publish(getTopicName(it.eventType), it)
        }
    }

    override suspend fun getOrderByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
        orderMongoRepository.getByID(id.toString())
            .also { log.info("getOrderByID: $it") }
            .also { observation.highCardinalityKeyValue("order", it.toString()) }
    }

    private suspend fun publishOutboxEvent(event: OutboxRecord) = coroutineScopeWithObservation(PUBLISH_OUTBOX_EVENT, or) { observation ->
        try {
            log.info("publishing outbox event: $event")

            outboxRepository.deleteOutboxRecordByID(event.eventId!!) {
                eventsPublisher.publish(getTopicName(event.eventType), event.aggregateId.toString(), event)
            }

            log.info("outbox event published and deleted: $event")
            observation.highCardinalityKeyValue("event", event.toString())
        } catch (ex: Exception) {
            log.error("exception while publishing outbox event: ${ex.localizedMessage}")
            observation.error(ex)
        }
    }
}
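
The EventsPublisher used above is not shown in this article; its real implementation is in the GitHub repository. A minimal sketch of such a component with Spring Kafka 3 (CompletableFuture-returning send) and coroutines might look like the following. Only the method shapes are taken from the calls in OrderServiceImpl; the Jackson serialization and the class name are assumptions:

Kotlin
 
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.future.await
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

// Hypothetical sketch of an events publisher for outbox records.
@Component
class EventsPublisherSketch(
    private val kafkaTemplate: KafkaTemplate<String, ByteArray>,
    private val objectMapper: ObjectMapper,
) {
    // Keying by aggregate id keeps all events of a single order in one partition,
    // which preserves their relative order for consumers.
    suspend fun publish(topic: String, key: String, event: Any) {
        kafkaTemplate.send(topic, key, objectMapper.writeValueAsBytes(event)).await()
    }

    suspend fun publish(topic: String, event: Any) {
        kafkaTemplate.send(topic, objectMapper.writeValueAsBytes(event)).await()
    }
}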

Zipkin

Apache Kafka

The order and product item Postgres repositories are a combination of CoroutineCrudRepository and a custom implementation using DatabaseClient and R2dbcEntityTemplate, supporting optimistic and pessimistic locking depending on each method's requirements.

Kotlin
 
@Repository
interface OrderRepository : CoroutineCrudRepository<OrderEntity, UUID>, OrderBaseRepository

@Repository
interface OrderBaseRepository {
    suspend fun getOrderWithProductItemsByID(id: UUID): Order
    suspend fun updateVersion(id: UUID, newVersion: Long): Long
    suspend fun findOrderByID(id: UUID): Order
    suspend fun insert(order: Order): Order
    suspend fun update(order: Order): Order
}

@Repository
class OrderBaseRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val entityTemplate: R2dbcEntityTemplate,
    private val or: ObservationRegistry
) : OrderBaseRepository {

    override suspend fun updateVersion(id: UUID, newVersion: Long): Long = coroutineScopeWithObservation(UPDATE_VERSION, or) { observation ->
        dbClient.sql("UPDATE microservices.orders SET version = (version + 1) WHERE id = :id AND version = :version")
            .bind(ID, id)
            .bind(VERSION, newVersion - 1)
            .fetch()
            .rowsUpdated()
            .awaitSingle()
            .also { log.info("for order with id: $id version updated to $newVersion") }
            .also {
                observation.highCardinalityKeyValue("id", id.toString())
                observation.highCardinalityKeyValue("newVersion", newVersion.toString())
            }
    }

    override suspend fun getOrderWithProductItemsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
        dbClient.sql(
            """SELECT o.id, o.email, o.status, o.address, o.version, o.payment_id, o.created_at, o.updated_at, 
            |pi.id as productId, pi.price, pi.title, pi.quantity, pi.order_id, pi.version as itemVersion, pi.created_at as itemCreatedAt, pi.updated_at as itemUpdatedAt
            |FROM microservices.orders o 
            |LEFT JOIN microservices.product_items pi on o.id = pi.order_id 
            |WHERE o.id = :id""".trimMargin()
        )
            .bind(ID, id)
            .map { row, _ -> Pair(OrderEntity.of(row), ProductItemEntity.of(row)) }
            .flow()
            .toList()
            .let { orderFromList(it) }
            .also {
                log.info("getOrderWithProductItemsByID order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }

    override suspend fun findOrderByID(id: UUID): Order = coroutineScopeWithObservation(FIND_ORDER_BY_ID, or) { observation ->
        val query = Query.query(Criteria.where(ID).`is`(id))
        entityTemplate.selectOne(query, OrderEntity::class.java).awaitSingleOrNull()?.toOrder()
            .also { observation.highCardinalityKeyValue("order", it.toString()) }
            ?: throw OrderNotFoundException(id)
    }

    override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
        entityTemplate.insert(order.toEntity()).awaitSingle().toOrder()
            .also {
                log.info("inserted order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }

    override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
        entityTemplate.update(order.toEntity()).awaitSingle().toOrder()
            .also {
                log.info("updated order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }
}


Kotlin
 
interface ProductItemBaseRepository {
    suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity
    suspend fun insertAll(productItemEntities: List<ProductItemEntity>): List<ProductItemEntity>
    suspend fun upsert(productItem: ProductItem): ProductItem
}

@Repository
class ProductItemBaseRepositoryImpl(
    private val entityTemplate: R2dbcEntityTemplate,
    private val or: ObservationRegistry,
) : ProductItemBaseRepository {

    override suspend fun upsert(productItem: ProductItem): ProductItem = coroutineScopeWithObservation(UPDATE, or) { observation ->
        val query = Query.query(
            Criteria.where("id").`is`(UUID.fromString(productItem.id))
                .and("order_id").`is`(UUID.fromString(productItem.orderId))
        )
        
        val product = entityTemplate.selectOne(query, ProductItemEntity::class.java).awaitSingleOrNull()
        if (product != null) {
            val update = Update
                .update("quantity", (productItem.quantity + product.quantity))
                .set("version", product.version + 1)
                .set("updated_at", LocalDateTime.now())
            
            val updatedProduct = product.copy(quantity = (productItem.quantity + product.quantity), version = product.version + 1)
            val updateResult = entityTemplate.update(query, update, ProductItemEntity::class.java).awaitSingle()
            log.info("updateResult product: $updateResult")
            log.info("updateResult updatedProduct: $updatedProduct")
            return@coroutineScopeWithObservation updatedProduct.toProductItem()
        }
        
        entityTemplate.insert(ProductItemEntity.of(productItem)).awaitSingle().toProductItem()
            .also { productItem ->
                log.info("saved productItem: $productItem")
                observation.highCardinalityKeyValue("productItem", productItem.toString())
            }
    }

    override suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity = coroutineScopeWithObservation(INSERT, or) { observation ->
        val product = entityTemplate.insert(productItemEntity).awaitSingle()

        log.info("saved product: $product")
        observation.highCardinalityKeyValue("product", product.toString())
        product
    }

    override suspend fun insertAll(productItemEntities: List<ProductItemEntity>) = coroutineScopeWithObservation(INSERT_ALL, or) { observation ->
        val result = productItemEntities.map { entityTemplate.insert(it) }.map { it.awaitSingle() }
        log.info("inserted product items: $result")
        observation.highCardinalityKeyValue("result", result.toString())
        result
    }
}


The important detail here is handling the case of multiple pod instances processing the outbox table in parallel. We have idempotent consumers, but we still have to avoid processing the same outbox events more than once. To prevent multiple instances from selecting and publishing the same events, we use FOR UPDATE SKIP LOCKED. This combination selects a batch of outbox events; if another instance has already selected and locked these records, the query skips the locked rows and selects the next available, unlocked ones, and so on.

Kotlin
 
@Repository
interface OutboxBaseRepository {
    suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long
    suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit)
}

class OutboxBaseRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val txOp: TransactionalOperator,
    private val or: ObservationRegistry,
    private val transactionalOperator: TransactionalOperator
) : OutboxBaseRepository {

    override suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long =
        coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_BY_ID, or) { observation ->
            withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
                txOp.executeAndAwait {

                    callback()

                    dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
                        .bind("eventId", id)
                        .fetch()
                        .rowsUpdated()
                        .awaitSingle()
                        .also {
                            log.info("outbox event with id: $it deleted")
                            observation.highCardinalityKeyValue("id", it.toString())
                        }
                }
            }
        }

    override suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) =
        coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
            withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
                txOp.executeAndAwait {

                    dbClient.sql("SELECT * FROM microservices.outbox_table ORDER BY timestamp ASC LIMIT 10 FOR UPDATE SKIP LOCKED")
                        .map { row, _ -> OutboxRecord.of(row) }
                        .flow()
                        .onEach {
                            log.info("deleting outboxEvent with id: ${it.eventId}")

                            callback(it)

                            dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
                                .bind("eventId", it.eventId!!)
                                .fetch()
                                .rowsUpdated()
                                .awaitSingle()

                            log.info("outboxEvent with id: ${it.eventId} published and deleted")
                            observation.highCardinalityKeyValue("eventId", it.eventId.toString())
                        }
                        .collect()
                }
            }
        }
}


The polling producer implementation is a scheduled process that, at a given interval, does the same publish-and-delete job described earlier, using the same service method:

Kotlin
 
@Component
@ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true")
class OutboxScheduler(private val orderService: OrderService, private val or: ObservationRegistry) {

    @Scheduled(initialDelayString = "\${schedulers.outbox.initialDelayMillis}", fixedRateString = "\${schedulers.outbox.fixedRate}")
    fun publishAndDeleteOutboxRecords() = runBlocking {
        coroutineScopeWithObservation(PUBLISH_AND_DELETE_OUTBOX_RECORDS, or) {
            log.debug("starting scheduled outbox table publishing")
            orderService.deleteOutboxRecordsWithLock()
            log.debug("completed scheduled outbox table publishing")
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(OutboxScheduler::class.java)
        private const val PUBLISH_AND_DELETE_OUTBOX_RECORDS = "OutboxScheduler.publishAndDeleteOutboxRecords"
    }
}


Usually, the transactional outbox is required to guarantee data consistency between microservices; here, for simplicity, consumers in the same microservice process the events and save them to MongoDB. One more important detail: since we process Kafka events in multiple consumer processes, the order in which events are processed can vary. Kafka's message keys help us here, because messages with the same key are sent to the same partition. If your broker does not have this feature, you have to handle ordering manually, for example when one consumer tries to process event #6 before events #4 and #5 have been processed. For this reason, we have a domain entity version field in the outbox events: we can simply look at the version and validate it. If our database holds order version #3 but we are processing an event with version #6, we first have to wait for #4 and #5 and process them. Of course, these details depend on the concrete business logic of each application; the point here is only that this case is possible. One more important detail is retry topics: if we need to retry processing of messages, it is better to create a retry topic and handle retries there; how many times to retry and other advanced logic depend on your concrete case. In the example, we have two listeners, one of which handles the retry topic message processing:

Kotlin
 
@Component
class OrderConsumer(
    private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
    private val serializer: Serializer,
    private val eventsPublisher: EventsPublisher,
    private val orderEventProcessor: OrderEventProcessor,
    private val or: ObservationRegistry,
) {

    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:order-service-group-id}",
        topics = [
            "\${topics.orderCreated.name}",
            "\${topics.productAdded.name}",
            "\${topics.productRemoved.name}",
            "\${topics.orderPaid.name}",
            "\${topics.orderCancelled.name}",
            "\${topics.orderSubmitted.name}",
            "\${topics.orderCompleted.name}",
        ],
        id = "orders-consumer"
    )
    fun process(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>) = runBlocking {
        coroutineScopeWithObservation(PROCESS, or) { observation ->
            try {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))

                processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
                ack.acknowledge()

                log.info("committed record: ${getConsumerRecordInfo(consumerRecord)}")
            } catch (ex: Exception) {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
                observation.error(ex)

                if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
                    log.error("ack not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    ack.acknowledge()
                    return@coroutineScopeWithObservation
                }

                if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
                    publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
                    ack.acknowledge()
                    log.warn("ack concurrency write or version exception ${ex.localizedMessage}")
                    return@coroutineScopeWithObservation
                }

                publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
                ack.acknowledge()
                log.error("ack exception while processing record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}", ex)
            }
        }
    }


    @KafkaListener(groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = ["\${topics.retryTopic.name}"], id = "orders-retry-consumer")
    fun processRetry(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>): Unit = runBlocking {
        coroutineScopeWithObservation(PROCESS_RETRY, or) { observation ->
            try {
                log.warn("processing retry topic record >>>>>>>>>>>>> : ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))

                processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
                ack.acknowledge()

                log.info("committed retry record: ${getConsumerRecordInfo(consumerRecord)}")
            } catch (ex: Exception) {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
                observation.error(ex)

                val currentRetry = String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()).toInt()
                observation.highCardinalityKeyValue("currentRetry", currentRetry.toString())

                if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
                    publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry)
                    log.warn("ack concurrency write or version exception ${ex.localizedMessage},record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    ack.acknowledge()
                    return@coroutineScopeWithObservation
                }

                if (currentRetry > MAX_RETRY_COUNT) {
                    publishRetryTopic(kafkaTopicsConfiguration.deadLetterQueue.name, consumerRecord, currentRetry + 1)
                    ack.acknowledge()
                    log.error("MAX_RETRY_COUNT exceed, send record to DLQ: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    return@coroutineScopeWithObservation
                }

                if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
                    ack.acknowledge()
                    log.error("commit not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    return@coroutineScopeWithObservation
                }

                log.error("exception while processing: ${ex.localizedMessage}, record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry + 1)
                ack.acknowledge()
            }
        }
    }


    private suspend fun publishRetryTopic(topic: String, record: ConsumerRecord<String, ByteArray>, retryCount: Int) =
        coroutineScopeWithObservation(PUBLISH_RETRY_TOPIC, or) { observation ->
            observation.highCardinalityKeyValue("topic", record.topic())
                .highCardinalityKeyValue("key", record.key())
                .highCardinalityKeyValue("offset", record.offset().toString())
                .highCardinalityKeyValue("value", String(record.value()))
                .highCardinalityKeyValue("retryCount", retryCount.toString())

            record.headers().remove(RETRY_COUNT_HEADER)
            record.headers().add(RETRY_COUNT_HEADER, retryCount.toString().toByteArray())

            mono { publishRetryRecord(topic, record, retryCount) }
                .retryWhen(Retry.backoff(PUBLISH_RETRY_COUNT, Duration.ofMillis(PUBLISH_RETRY_BACKOFF_DURATION_MILLIS))
                    .filter { it is SerializationException })
                .awaitSingle()
        }
}


The role of the order events processor in this microservice is to validate the version of the events and update MongoDB:


Kotlin
 
interface OrderEventProcessor {
    suspend fun on(orderCreatedEvent: OrderCreatedEvent)
    suspend fun on(productItemAddedEvent: ProductItemAddedEvent)
    suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent)
    suspend fun on(orderPaidEvent: OrderPaidEvent)
    suspend fun on(orderCancelledEvent: OrderCancelledEvent)
    suspend fun on(orderSubmittedEvent: OrderSubmittedEvent)
    suspend fun on(orderCompletedEvent: OrderCompletedEvent)
}

@Service
class OrderEventProcessorImpl(
    private val orderMongoRepository: OrderMongoRepository,
    private val or: ObservationRegistry,
) : OrderEventProcessor {

    override suspend fun on(orderCreatedEvent: OrderCreatedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CREATED_EVENT, or) { observation ->
        orderMongoRepository.insert(orderCreatedEvent.order).also {
            log.info("created order: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(productItemAddedEvent: ProductItemAddedEvent): Unit =
        coroutineScopeWithObservation(ON_ORDER_PRODUCT_ADDED_EVENT, or) { observation ->
            val order = orderMongoRepository.getByID(productItemAddedEvent.orderId)
            validateVersion(order.id, order.version, productItemAddedEvent.version)

            order.addProductItem(productItemAddedEvent.productItem)
            order.version = productItemAddedEvent.version

            orderMongoRepository.update(order).also {
                log.info("productItemAddedEvent updatedOrder: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
        }

    override suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent): Unit =
        coroutineScopeWithObservation(ON_ORDER_PRODUCT_REMOVED_EVENT, or) { observation ->
            val order = orderMongoRepository.getByID(productItemRemovedEvent.orderId)
            validateVersion(order.id, order.version, productItemRemovedEvent.version)

            order.removeProductItem(productItemRemovedEvent.productItemId)
            order.version = productItemRemovedEvent.version

            orderMongoRepository.update(order).also {
                log.info("productItemRemovedEvent updatedOrder: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
        }

    override suspend fun on(orderPaidEvent: OrderPaidEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PAID_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderPaidEvent.orderId)
        validateVersion(order.id, order.version, orderPaidEvent.version)

        order.pay(orderPaidEvent.paymentId)
        order.version = orderPaidEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderPaidEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderCancelledEvent: OrderCancelledEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CANCELLED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderCancelledEvent.orderId)
        validateVersion(order.id, order.version, orderCancelledEvent.version)

        order.cancel()
        order.version = orderCancelledEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderCancelledEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderSubmittedEvent: OrderSubmittedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_SUBMITTED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderSubmittedEvent.orderId)
        validateVersion(order.id, order.version, orderSubmittedEvent.version)

        order.submit()
        order.version = orderSubmittedEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderSubmittedEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderCompletedEvent: OrderCompletedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_COMPLETED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderCompletedEvent.orderId)
        validateVersion(order.id, order.version, orderCompletedEvent.version)

        order.complete()
        order.version = orderCompletedEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderCompletedEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    private fun validateVersion(id: Any, currentDomainVersion: Long, eventVersion: Long) {
        log.info("validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
        if (currentDomainVersion >= eventVersion) {
            log.warn("currentDomainVersion >= eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
            throw AlreadyProcessedVersionException(id, eventVersion)
        }
        if ((currentDomainVersion + 1) < eventVersion) {
            log.warn("currentDomainVersion + 1) < eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
            throw InvalidVersionException(eventVersion)
        }
    }
}


The MongoDB repository code is quite simple:

Kotlin
 
interface OrderMongoRepository {
    suspend fun insert(order: Order): Order
    suspend fun update(order: Order): Order
    suspend fun getByID(id: String): Order
    suspend fun getAllOrders(pageable: Pageable): Page<Order>
}

@Repository
class OrderMongoRepositoryImpl(
    private val mongoTemplate: ReactiveMongoTemplate,
    private val or: ObservationRegistry,
) : OrderMongoRepository {

    override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
        withContext(Dispatchers.IO) {
            mongoTemplate.insert(OrderDocument.of(order)).awaitSingle().toOrder()
                .also { log.info("inserted order: $it") }
                .also { observation.highCardinalityKeyValue("order", it.toString()) }
        }
    }

    override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
        withContext(Dispatchers.IO) {
            val query = Query.query(Criteria.where(ID).`is`(order.id).and(VERSION).`is`(order.version - 1))

            val update = Update()
                .set(EMAIL, order.email)
                .set(ADDRESS, order.address)
                .set(STATUS, order.status)
                .set(VERSION, order.version)
                .set(PAYMENT_ID, order.paymentId)
                .set(PRODUCT_ITEMS, order.productsList())

            val options = FindAndModifyOptions.options().returnNew(true).upsert(false)
            val updatedOrderDocument = mongoTemplate.findAndModify(query, update, options, OrderDocument::class.java)
                .awaitSingleOrNull() ?: throw OrderNotFoundException(order.id.toUUID())

            observation.highCardinalityKeyValue("order", updatedOrderDocument.toString())
            updatedOrderDocument.toOrder().also { orderDocument -> log.info("updated order: $orderDocument") }
        }
    }

    override suspend fun getByID(id: String): Order = coroutineScopeWithObservation(GET_BY_ID, or) { observation ->
        withContext(Dispatchers.IO) {
            mongoTemplate.findById(id, OrderDocument::class.java).awaitSingle().toOrder()
                .also { log.info("found order: $it") }
                .also { observation.highCardinalityKeyValue("order", it.toString()) }
        }
    }

    override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
        withContext(Dispatchers.IO) {
            val query = Query().with(pageable)
            val data = async { mongoTemplate.find(query, OrderDocument::class.java).collectList().awaitSingle() }.await()
            val count = async { mongoTemplate.count(Query(), OrderDocument::class.java).awaitSingle() }.await()
            PageableExecutionUtils.getPage(data.map { it.toOrder() }, pageable) { count }
                .also { observation.highCardinalityKeyValue("pageResult", it.pageable.toString()) }
        }
    }
}


You can find more details and the source code of the full project in the GitHub repository. In real-world applications, we have to implement many more necessary features, like k8s health checks, rate limiters, etc. Depending on the project, they can be implemented in different ways; for example, you can use Kubernetes and Istio for some of them. I hope this article is useful and helpful, and I'm happy to receive any feedback or questions. Feel free to contact me by email or any messenger. :)


Published at DZone with permission of Alexander Bryksin. See the original article here.
