
How to pull data from a remote database into Apache Kafka?

Question:

I want to build a real-time data pipeline in Apache Kafka. I have a database at a remote location, and that database is continuously being updated. Can anybody tell me which Kafka Connect API I should use to pull the data from the database and ingest it into a Kafka broker in real time? Later on I would use Kafka Streams and KSQL to run ad-hoc queries and compute the metrics.

Any help would be highly appreciated!

Answer1:

If you want to create a real-time data pipeline you need to use a Change Data Capture (CDC) tool that is able to stream changes from MySQL. I would suggest <a href="https://debezium.io/" rel="nofollow">Debezium</a>, an open-source distributed platform for change data capture.

<strong>Capturing Inserts</strong>

When a new record is added to a table, a JSON message similar to the one below is produced:

{ "payload":{ "before":null, "after":{ "id":1005, "first_name":"Giorgos", "last_name":"Myrianthous", "email":"giorgos@abc.com" }, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500369632, "gtid":null, "file":"mysql-bin.000003", "pos":364, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"c", "ts_ms":1500369632095 } }

The before object is null and the after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.
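To make use of such an event downstream, you only need to parse the payload. A minimal sketch in Python (the raw_message variable, standing in for a message value consumed from Kafka, is assumed for illustration):

import json

event = json.loads(raw_message)   # raw_message: the JSON event shown above
payload = event["payload"]

if payload["op"] == "c":          # CREATE event
    new_row = payload["after"]    # the newly inserted values
    print(new_row["email"])       # giorgos@abc.com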

<strong>Capturing Updates</strong>

Assuming that the email attribute has been updated, a JSON message similar to the one below is produced:

{ "payload":{ "before":{ "id":1005, "first_name":"Giorgos", "last_name":"Myrianthous", "email":"giorgos@abc.com" }, "after":{ "id":1005, "first_name":"Giorgos", "last_name":"Myrianthous", "email":"newEmail@abc.com" }, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500369929, "gtid":null, "file":"mysql-bin.000003", "pos":673, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"u", "ts_ms":1500369929464 } }

Notice that op is now u, indicating that this was an UPDATE event. The before object shows the row state before the update, and the after object captures the current state of the updated row.
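Since both states are included, you can, for example, compute which columns actually changed. A small sketch, continuing with the payload dictionary parsed as above:

# Columns whose values differ between the old and the new row state
changed = {
    column: (payload["before"][column], payload["after"][column])
    for column in payload["after"]
    if payload["before"][column] != payload["after"][column]
}
print(changed)   # {'email': ('giorgos@abc.com', 'newEmail@abc.com')}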

<strong>Capturing deletes</strong>

Now assume that the row has been deleted:

{ "payload":{ "before":{ "id":1005, "first_name":"Giorgos", "last_name":"Myrianthous", "email":"newEmail@abc.com" }, "after":null, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500370394, "gtid":null, "file":"mysql-bin.000003", "pos":1025, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"d", "ts_ms":1500370394589 } }

op is now equal to d, indicating a DELETE event. The after object is null and the before object contains the state of the row before it was deleted.
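Putting the three event types together, a downstream consumer can dispatch on the op code. Below is a minimal sketch using the kafka-python library; the broker address and the topic name (Debezium names topics serverName.databaseName.tableName, so dbserver1.inventory.customers for the examples above) should be adjusted to your setup:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dbserver1.inventory.customers",       # <server>.<database>.<table>
    bootstrap_servers="kafka:9092",        # adjust to your broker address
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for message in consumer:
    if message.value is None:              # tombstone record emitted after a delete
        continue
    payload = message.value["payload"]
    op = payload["op"]
    if op == "c":                          # CREATE: new row is in "after"
        print("inserted:", payload["after"])
    elif op == "u":                        # UPDATE: old and new state
        print("updated:", payload["before"], "->", payload["after"])
    elif op == "d":                        # DELETE: old row is in "before"
        print("deleted:", payload["before"])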

You can also have a look at the <a href="https://debezium.io/docs/tutorial/" rel="nofollow">extensive tutorial</a> provided on their website.

<strong>EDIT:</strong> <a href="https://debezium.io/docs/connectors/mysql/#example-configuration" rel="nofollow">Example configuration</a> for a MySQL database

{ "name": "inventory-connector", (1) "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2) "database.hostname": "192.168.99.100", (3) "database.port": "3306", (4) "database.user": "debezium", (5) "database.password": "dbz", (6) "database.server.id": "184054", (7) "database.server.name": "fullfillment", (8) "database.whitelist": "inventory", (9) "database.history.kafka.bootstrap.servers": "kafka:9092", (10) "database.history.kafka.topic": "dbhistory.fullfillment" (11) "include.schema.changes": "true" (12) } } <blockquote>

(1) The name of our connector when we register it with a Kafka Connect service.
(2) The name of this MySQL connector class.
(3) The address of the MySQL server.
(4) The port number of the MySQL server.
(5) The name of the MySQL user that has the required privileges.
(6) The password for the MySQL user that has the required privileges.
(7) The connector's identifier, which must be unique within the MySQL cluster and is similar to MySQL's server-id configuration property.
(8) The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
(9) A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
(10) The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
(11) The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
(12) The flag specifying that the connector should generate events on the schema change topic named fullfillment with the DDL changes that can be used by consumers.

</blockquote>

Answer2:

If you're reading from a MySQL database, use Confluent's JDBC Source connector: <a href="https://github.com/confluentinc/kafka-connect-jdbc/" rel="nofollow">https://github.com/confluentinc/kafka-connect-jdbc/</a>. You'll also need to download the MySQL JDBC driver and put it with the Kafka JARs: <a href="https://dev.mysql.com/downloads/connector/j/5.1.html" rel="nofollow">https://dev.mysql.com/downloads/connector/j/5.1.html</a>
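For reference, a JDBC Source connector configuration might look roughly like the sketch below; the connection details, the id incrementing column and the last_modified timestamp column are placeholders you would replace with your own schema:

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://192.168.99.100:3306/inventory",
    "connection.user": "connect_user",
    "connection.password": "connect_password",
    "table.whitelist": "customers",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "last_modified",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql-"
  }
}

Note that, unlike a log-based CDC tool such as Debezium, the JDBC connector polls the tables periodically, so it cannot capture deletes and may miss intermediate updates that happen between two polls.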
