Kafka Streams join produces duplicates

Question:

I have two topics:

// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'}
{'id': 2, 'user_id': 2, 'url': 'url#2'}
{'id': 3, 'user_id': 2, 'url': 'url#3'}

// users
{'id': 1, 'name': 'user#1'}
{'id': 1, 'name': 'user#1'}
{'id': 1, 'name': 'user#1'}

I map the photos by user:

KStream<Integer, Photo> photo_by_user = ...
photo_by_user.to("photo_by_user");

Then I try to join the two tables:

KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();
results.to("results");

The result looks like this:

{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}
{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}

I see that the results are duplicated. Why does this happen, and how can I avoid it?

Answer1:

You might be hitting a known bug: on a cache "flush", a KTable-KTable join can emit some duplicates. Note that those duplicates are, strictly speaking, not incorrect, because the result is an update stream and updating "A" to "A" does not change the result. They are of course undesirable. Try disabling caching; without caching, the flush issue should not occur.
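As a minimal sketch of the "disable caching" suggestion: in Kafka Streams, the record cache is sized by the `cache.max.bytes.buffering` setting (the key behind `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`), and setting it to 0 turns caching off. The class name, method name, application id, and broker address below are hypothetical placeholders, not taken from the question. Newer Kafka releases may expose this setting under a different name, so check the documentation for your version.

```java
import java.util.Properties;

public class DisableCachingExample {

    // Builds a Kafka Streams configuration with record caching turned off.
    // applicationId and bootstrapServers are illustrative placeholders.
    static Properties cachingDisabledConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // A cache size of 0 bytes disables caching, so every update is
        // forwarded downstream immediately instead of on cache flush.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = cachingDisabledConfig("photo-user-join", "localhost:9092");
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor together with the built topology. Keep in mind that with caching off, every intermediate update is forwarded, so downstream topics can see more records overall.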
