Kafka Sink Connector: Sharded MongoDB Cluster

Interested in learning how to migrate data from CosmosDB to Atlas? This article walks through the tasks and challenges you will face when migrating a substantial dataset. A demo video is included so you can follow along while migrating your own data.

Recently, I was working on an IoT data migration project that involved moving data from CosmosDB to Atlas. The initial snapshot of roughly 1 TB of data took about a day, and while it was in progress, the ongoing changes were captured with a change stream and published to a Kafka topic.

The change stream captured 12K messages per second on average, with occasional spikes during the day. By the time the initial snapshot completed, the topic had accumulated approximately 20 million messages.

Below are some of the challenges encountered during the migration.

Challenges

  1. Sink the accumulated messages from the topic to the target cluster.
  2. Identify the correct write model strategy in the sink connector so that documents could be upserted.
  3. The default ReplaceOneDefaultStrategy write model strategy does not work on a sharded cluster.
  4. Parallelise the sink connector processing so that accumulated and ongoing changes could be processed in parallel.

The rest of this article shows how we solved these challenges.

Partitioned Write

The change stream application was modified to create the topic with 10 partitions. Below is a snapshot of the code showing how the number of partitions is specified when the topic is created.
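The original code snapshot is not reproduced here. As a minimal sketch, topic creation with the Kafka AdminClient could look like the following; the bootstrap server address and the replication factor of 3 are assumptions, not values from the project:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with your bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 10 partitions so that up to 10 sink tasks can consume in parallel;
            // a replication factor of 3 is assumed here.
            NewTopic topic = new NewTopic("POC-TEST-POCDB-POCCOLL", 10, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}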

The number of partitions can be decided based on the number of messages streamed per second and the amount of parallelisation required while sinking.

Below is a snapshot of the topic (“POC-TEST-POCDB-POCCOLL”) with 10 partitions.

When a producer sends a message, it specifies the topic the message should go to. But does it care about partitions?

The producer decides which partition to place each message in based on:

  • The partition id, if one is specified in the message
  • hash(key) % number of partitions, if no partition id is given but a key is present
  • Round robin, if neither a partition id nor a key is available and only the value is present

To publish a change event, the application creates a record to be sent to the Kafka topic. The producer API takes three arguments.

Parameters:

topic – The topic the record will be appended to

key – The key that will be included in the record

value – The record contents
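As an illustration (not the project's actual code), a producer call with these three arguments might look like the sketch below; the broker address, the choice of the vin as the key, and the payload are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ChangeEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // topic, key, value - no partition is specified, so the default
            // partitioner hashes the key and all events for the same vin
            // land on the same partition.
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "POC-TEST-POCDB-POCCOLL",
                    "5YJ3E1EA7JF000123",
                    "{\"vin\":\"5YJ3E1EA7JF000123\",\"setOn\":\"2023-01-01T10:00:00Z\",\"speed\":62}");
            producer.send(record);
        }
    }
}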

Because no partition is specified but a key is included, messages with the same key will always end up in the same partition. Below is a snapshot of messages distributed across the partitions of a topic.

Sharded Atlas Cluster & Sink Connector

The target Atlas cluster was sharded using the “vin” and “setOn” properties from the message. Below are the commands used to shard the collection.

sh.enableSharding("POCDB");

db.POCCOLL.createIndex({"vin":1,"setOn":1});

sh.shardCollection("POCDB.POCCOLL",{"vin":1,"setOn":1});

Below is a sample message containing the shard key fields.
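The original sample is not reproduced here; an illustrative message containing the two shard key fields could look like the following (all field names other than vin and setOn, and all values, are made up):

{
  "_id": "c0a8012b-4f2e-4d61-9c3e-1a2b3c4d5e6f",
  "vin": "5YJ3E1EA7JF000123",
  "setOn": "2023-01-01T10:00:00Z",
  "speed": 62,
  "fuelLevel": 74
}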

Sink Connector Configuration

The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes it to MongoDB. The sink connector supports several write model strategies; more details about the various strategies can be found here.

The default write model strategy (ReplaceOneDefaultStrategy) works for an unsharded collection, but on a sharded collection I encountered the error below.

Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}].

My intent was to make upserts behave the same way as the default ReplaceOneDefaultStrategy. The configuration below was required to get that behavior on the sharded collection.

"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.overwrite.existing": "true",
"document.id.strategy.partial.value.projection.type": "allowlist",
"document.id.strategy.partial.value.projection.list": "_id,vin,setOn"
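For context, here is a sketch of how these properties might fit into a complete sink connector configuration; the connector name, connection URI, converters, and tasks.max value are assumptions and should be adapted to your environment:

{
  "name": "mongodb-atlas-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "POC-TEST-POCDB-POCCOLL",
    "connection.uri": "mongodb+srv://<user>:<password>@<cluster>.mongodb.net",
    "database": "POCDB",
    "collection": "POCCOLL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy.partial.value.projection.type": "allowlist",
    "document.id.strategy.partial.value.projection.list": "_id,vin,setOn"
  }
}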

Once the connector is set up, the consumer starts processing the messages left behind on the topic. In the snapshot below, roughly 227K messages are waiting to be sunk.

With only one task in the sink connector, messages were being sunk at a much lower rate. Since the topic has 10 partitions, we can parallelise by creating up to 10 tasks.

In the snapshot below, the sink connector was configured with five tasks, so the topic's partitions are distributed across five tasks and consumed in parallel.
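Assuming the connector is managed through its JSON configuration (as sketched earlier), the degree of parallelism is controlled by a single property; Kafka Connect then redistributes the topic's partitions across the running tasks:

"tasks.max": "5"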

Below is a snapshot showing the messages being written in the target Atlas cluster.

The snapshot below shows that all the pending messages have been processed; the production and consumption rates are now almost the same, enabling near real-time sync.

The image below depicts how messages in the target are constantly synced.

Demo

Below is a demo of the complete workflow covered in this article, including topic creation, sink connector configuration, and connectors with multiple tasks.


Rajesh Vinayagam
Distinguished Architect
