How to remove duplicates (more like filter based on multiple properties) with Spark RDD in Scala?


As a policy, we do not update our documents; instead, we recreate them with the updated values. When I process the events, I want to keep only the latest version of each document, so I need to filter the items in my RDD based on multiple properties. For instance, say an item is:

{ "name": "Sample", "someId": "123", "createdAt": "2016-09-21T02:16:32+00:00" }

and when it is updated:

{
  "name": "Sample-Updated",
  "someId": "123",                          # This remains the same
  "createdAt": "2016-09-21T03:16:32+00:00"  # Greater than the one above, since the update happens after the document was created
}
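
Deciding whether one document is "greater than" another comes down to comparing `createdAt`. Below is a minimal sketch, assuming the timestamps are always ISO-8601 with an explicit offset as in the samples above. `java.time.OffsetDateTime.parse` turns each timestamp into an instant, so the comparison stays correct even if different offsets appear. The `CreatedAt` object and its `isNewer` helper are hypothetical names used only for illustration.

```scala
import java.time.{Instant, OffsetDateTime}

object CreatedAt {
  // Parse an ISO-8601 timestamp with an offset, e.g. "2016-09-21T02:16:32+00:00".
  def toInstant(createdAt: String): Instant =
    OffsetDateTime.parse(createdAt).toInstant

  // True when `a` denotes a strictly later moment than `b`, regardless of offsets.
  def isNewer(a: String, b: String): Boolean =
    toInstant(a).isAfter(toInstant(b))
}
```

Comparing the raw strings, as `x.createdAt > y.createdAt` does, gives the same answer only while every timestamp uses the same format and offset. For example, `"2016-09-21T03:16:32+02:00"` sorts after `"2016-09-21T02:16:32+00:00"` as a string, yet it is the earlier moment (01:16:32 UTC).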

What I have been doing is:

    items = items.toList
      .sortBy(_.createdAt)
      .reverse
    items = items
      .groupBy(_.someId)
      .map(_._2.head)(breakOut)

but this converts the RDD into a local list, which pulls all the data onto the driver and defeats the point of using Spark. How do I achieve the same thing on the RDD?
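
For comparison, the keep-the-latest logic itself can be written on a local collection without sorting, reversing, or `breakOut`. This sketch assumes a hypothetical `Event` case class with `String` fields mirroring the JSON above, and compares `createdAt` as plain strings, which is only safe while all timestamps share one format and offset. Like the original, it still runs entirely on the driver; it only pins down the intended semantics: one element per `someId`, namely the one with the greatest `createdAt`.

```scala
// Hypothetical Event type mirroring the JSON documents above (an assumption).
final case class Event(name: String, someId: String, createdAt: String)

object LocalDedup {
  // Keep only the most recent Event per someId on a local collection.
  // String comparison of createdAt assumes a uniform ISO-8601 format and offset.
  def latestPerId(items: Seq[Event]): List[Event] =
    items
      .groupBy(_.someId)          // Map[someId, all versions of that document]
      .values
      .map(_.maxBy(_.createdAt))  // newest version within each group
      .toList
}
```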


Following the suggestions in the comments, I have got this far, but I am stuck on adding an event to the set:

    // Is this correct? (1)
    val initialSet = sc.parallelize(List[(String, Event)]())

    val addToSet = (eventSet: RDD[(String, Event)], event: Event) => {
      // What to do here? (2)
    }

    // Is this correct? (3)
    val mergeSets = (p1: RDD[(String, Event)], p2: RDD[(String, Event)]) => p1.union(p2)

    // resultSet is of type RDD[(String, RDD[(String, Event)])]. How to get it as RDD[(String, Event)]? (4)
    val resultSet = initialSet.aggregateByKey(initialSet)(addToSet, mergeSets)


You should be able to use reduceByKey here:

    rdd
      .keyBy(_.someId)
      .reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)
      .values

where the initial keyBy creates (id, object) pairs, reduceByKey keeps the most recent object for each id, and values drops the keys again.
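
A self-contained sketch of that pipeline, assuming a hypothetical `Event` case class with `String` fields and `spark-core` on the classpath. It uses `compareTo` instead of `>` so it does not rely on operator syntax for `String`; like the answer, it assumes all `createdAt` values share one ISO-8601 format and offset. Because `reduceByKey` combines values within each partition before the shuffle, at most one candidate per key and partition crosses the network, unlike `groupByKey`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical Event type mirroring the JSON documents in the question.
final case class Event(name: String, someId: String, createdAt: String)

object SparkDedup {
  // Keep the newest Event per someId without leaving the RDD API.
  def latestPerId(events: RDD[Event]): RDD[Event] =
    events
      .keyBy(_.someId)                          // RDD[(String, Event)]
      .reduceByKey { (x, y) =>                  // combined per partition, then across partitions
        if (x.createdAt.compareTo(y.createdAt) >= 0) x else y
      }
      .values                                   // drop the keys again

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dedup-sketch").setMaster("local[*]"))
    try {
      val events = sc.parallelize(Seq(
        Event("Sample", "123", "2016-09-21T02:16:32+00:00"),
        Event("Sample-Updated", "123", "2016-09-21T03:16:32+00:00")
      ))
      latestPerId(events).collect().foreach(println)
    } finally sc.stop()
  }
}
```

The function passed to `reduceByKey` should be associative and commutative. Picking the later of two events is, except on an exact `createdAt` tie, where whichever value arrives as `x` wins, so ties are resolved arbitrarily.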


First use map to get a pair RDD, (data) -> (someId, data), then use aggregateByKey, which performs an aggregation over all pairs that share the same key, much like a group-by.
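
A sketch of the `aggregateByKey` route under the same assumptions (hypothetical `Event` case class, `String` timestamps in one format, `spark-core` on the classpath). Keying by `someId` rather than `name` matters here, because `name` changes on update. The zero value is an ordinary value, here `Option.empty[Event]`, not an RDD: Spark does not support RDD operations nested inside other RDD transformations, which is why passing an RDD as the zero value cannot work. For this particular job `reduceByKey` is simpler; `aggregateByKey` is most useful when the accumulator type differs from the value type.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical Event type mirroring the JSON documents in the question.
final case class Event(name: String, someId: String, createdAt: String)

object AggregateDedup {
  // Later of two events; plain string comparison assumes every createdAt
  // uses the same ISO-8601 format and offset, as in the samples.
  private def newer(a: Event, b: Event): Event =
    if (a.createdAt.compareTo(b.createdAt) >= 0) a else b

  def latestPerId(events: RDD[Event]): RDD[Event] =
    events
      .map(e => (e.someId, e))                 // pair RDD keyed by someId
      .aggregateByKey(Option.empty[Event])(    // zero value: a plain Option, not an RDD
        // seqOp: fold one event into the partition-local accumulator
        (acc, e) => Some(acc.fold(e)(newer(_, e))),
        // combOp: merge accumulators coming from different partitions
        (a, b) => (a, b) match {
          case (Some(x), Some(y)) => Some(newer(x, y))
          case _                  => a.orElse(b)
        }
      )
      .values
      .flatMap(_.toList)                       // unwrap: every key ends up with Some(event)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregate-dedup").setMaster("local[*]"))
    try {
      val events = sc.parallelize(Seq(
        Event("Sample", "123", "2016-09-21T02:16:32+00:00"),
        Event("Sample-Updated", "123", "2016-09-21T03:16:32+00:00")
      ))
      latestPerId(events).collect().foreach(println)
    } finally sc.stop()
  }
}
```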

