How does Consumer.endOffsets work in Kafka?

Question:

Assume I have a timer task that runs indefinitely, iterates over all the consumer groups in the Kafka cluster, and outputs the lag, committed offset and end offset for every partition of each group. It is similar to how the Kafka console consumer group script works, except that it covers all groups.

Something like

Single Consumer - Not Working - Doesn't return offsets for some of the provided topic partitions (e.g. 10 provided, 5 offsets returned)

    static Consumer consumer;

    static {
        consumer = createConsumer();
    }

    run() {
        List<String> groupIds = getConsumerGroups();
        for (String groupId : groupIds) {
            List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
            // Not working: offsets are missing for some partitions of some groups (10 in, 5 out)
            consumer.endOffsets(topicPartitions);
        }
    }

Multiple Consumers - Working

    run() {
        List<String> groupIds = getConsumerGroups();
        for (String groupId : groupIds) {
            List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
            // A new consumer per group: this works
            Consumer consumer = createConsumer();
            consumer.endOffsets(topicPartitions);
        }
    }

Version: Kafka client 2.0.0

Am I using the consumer API incorrectly? Ideally I would like to use a single consumer.

Let me know if you need more details.

Answer1:

I think you're almost there. First collect all the topic partitions you're interested in, and then issue a single consumer.endOffsets call.

Bear in mind that I haven't tried to run it, but something like this should work:

    run() {
        Consumer consumer = createConsumer();
        List<String> groupIds = getConsumerGroups();
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (String groupId : groupIds) {
            topicPartitions.addAll(getTopicPartitions(groupId));
        }
        consumer.endOffsets(topicPartitions);
    }
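Several groups often consume the same topic, so a list built this way can contain the same partition more than once. A minimal sketch of collecting the partitions into a set instead, written against a generic element type so that it runs without a Kafka dependency. The class and method names are hypothetical; with the real client the element type would be TopicPartition, and the resulting set could be passed to consumer.endOffsets, which accepts any Collection.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client API.
final class PartitionCollector {

    /** Merges per-group partition collections into one duplicate-free set, keeping first-seen order. */
    static <P> Set<P> collectDistinct(Collection<? extends Collection<P>> perGroup) {
        Set<P> all = new LinkedHashSet<>();
        for (Collection<P> partitions : perGroup) {
            all.addAll(partitions);
        }
        return all;
    }

    public static void main(String[] args) {
        // Strings stand in for TopicPartition objects in this demo.
        List<String> groupA = Arrays.asList("orders-0", "orders-1");
        List<String> groupB = Arrays.asList("orders-1", "payments-0");
        System.out.println(collectDistinct(Arrays.asList(groupA, groupB)));
        // prints [orders-0, orders-1, payments-0]
    }
}
```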

Answer2:

It's a bug in Fetcher.fetchOffsetsByTimes(), specifically inside the groupListOffsetRequests method: when the leader for a partition was unknown or unavailable at the time its offset was requested, the logic did not add that partition to the set to be retried.

This is more noticeable when a single consumer is used across the partitions of all consumer groups. When endOffsets is requested, the consumer already has leader information for the topic partitions of some groups, while the partitions whose leader is unknown or unavailable are left out of the result because of the bug.

Later I realized it was not a good idea to pull the topic partitions from each consumer group. Instead, I changed the code to read the topic partitions from AdminClient.listTopics and AdminClient.describeTopics and pass them all at once to Consumer.endOffsets.

This doesn't completely resolve the issue, though, as topics/partitions may still be unavailable or unknown between multiple runs.
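On a client that still has the bug, one possible workaround is to compare the returned map with the requested partitions and ask again only for the ones that are missing. The sketch below is an assumption rather than anything taken from the Kafka code base: the class and method names are hypothetical, and the lookup is passed in as a function so the example runs without a Kafka dependency. With the real client the lookup would presumably be consumer::endOffsets, and a short pause between attempts may be needed to give the metadata time to refresh.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper, not part of the Kafka client API.
final class EndOffsetRetry {

    /**
     * Calls lookup repeatedly, each time asking only for the partitions that have
     * not been answered yet, until all are resolved or maxAttempts is reached.
     * Partitions that never get an answer are simply absent from the result.
     */
    static <P> Map<P, Long> fetchWithRetry(Collection<P> partitions,
                                           Function<Collection<P>, Map<P, Long>> lookup,
                                           int maxAttempts) {
        Map<P, Long> resolved = new HashMap<>();
        Set<P> missing = new HashSet<>(partitions);
        for (int attempt = 0; attempt < maxAttempts && !missing.isEmpty(); attempt++) {
            Map<P, Long> result = lookup.apply(new HashSet<>(missing));
            for (Map.Entry<P, Long> entry : result.entrySet()) {
                // Accept only answers for partitions that are still outstanding.
                if (entry.getValue() != null && missing.remove(entry.getKey())) {
                    resolved.put(entry.getKey(), entry.getValue());
                }
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake lookup: the first call answers only for "t-0", later calls answer everything asked.
        Function<Collection<String>, Map<String, Long>> flaky = asked -> {
            Map<String, Long> out = new HashMap<>();
            for (String p : asked) {
                if (calls[0] > 0 || p.equals("t-0")) {
                    out.put(p, 100L);
                }
            }
            calls[0]++;
            return out;
        };
        Map<String, Long> offsets = fetchWithRetry(Arrays.asList("t-0", "t-1"), flaky, 3);
        System.out.println(offsets.size() + " of 2 partitions resolved after " + calls[0] + " calls");
        // prints: 2 of 2 partitions resolved after 2 calls
    }
}
```

The caller can detect partitions that are still unresolved by comparing the key set of the returned map with the partitions it asked for.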

More information can be found in KAFKA-7044 (https://issues.apache.org/jira/browse/KAFKA-7044) and the pull request (https://github.com/apache/kafka/pull/5627). The bug has been fixed, and the fix is scheduled for the 2.1.0 release.
