Join two DataFrames and update one DataFrame's records from the other

Question:

I have two DataFrames. DataFrame 1 looks like this:

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        4|       1|  I|!||
|4295877346|   136|        4|       1|  I|!||
|4295877341|   138|        2|       1|  I|!||
|4295877341|   141|        4|       1|  I|!||
|4295877341|   143|        2|       1|  I|!||
|4295877341|   145|       14|       1|  I|!||
| 123456789|   145|       14|       1|  I|!||
| 809580109|   145|        9|       9|  I|!||
+----------+------+---------+--------+------+

DataFrame 2 looks like this:

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
| 123456789|   145|         14|         1|    D|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
| 809580109|   145|       NULL|      NULL|    I|!||
+----------+------+-----------+----------+--------+

I need to join the two DataFrames and update the columns of DataFrame 1 with the matching records from DataFrame 2.

The key in both DataFrames is OrgId and ItemId.

The expected output is:

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|   136|        4|       1|  I|!||
|4295877341|   145|       14|       1|  I|!||
|4295877343|   149|       15|       2|  I|!||
|4295877341|   136|     null|    null|  I|!||
|4295877341|   138|       11|      22|  I|!||
|4295877341|   141|       10|       1|  I|!||
|4295877341|   143|        1|       1|  I|!||
| 809580109|   145|        9|       9|  I|!||
+----------+------+---------+--------+------+

So I need to update DataFrame 1 with the records from DataFrame 2. If a record in DataFrame 1 is not found in DataFrame 2, it must still be retained. If DataFrame 2 contains new records, those must be added to the output.

Here is what I am doing:

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()

But I am getting the output below:

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+

I am not getting the 4295877346| 136| 4| 1| I|!| record from DataFrame 1.

left_outer gives me the output below:

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+

Answer1:

Let me first explain what your mistake is.

If you only join, as below,

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
df3.show()

you will get

+----------+------+---------+--------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId|Sequence|Action|segmentId_1|Sequence_1|Action_1|
+----------+------+---------+--------+------+-----------+----------+--------+
|4295877346|   136|        4|       1|  I|!||       null|      null|    null|
|4295877341|   145|       14|       1|  I|!||       null|      null|    null|
|4295877343|   149|     null|    null|  null|         15|         2|    I|!||
|4295877341|   136|        4|       1|  I|!||       null|      null|    I|!||
| 123456789|   145|       14|       1|  I|!||         14|         1|    D|!||
|4295877341|   138|        2|       1|  I|!||         11|        22|    I|!||
|4295877341|   141|        4|       1|  I|!||         10|         1|    I|!||
|4295877341|   143|        2|       1|  I|!||          1|         1|    I|!||
+----------+------+---------+--------+------+-----------+----------+--------+

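It is evident that the filter in your code also removes the rows where Action_1 is null. For a null value, !$"Action_1".contains("D") evaluates to null rather than true, and filter keeps only the rows for which the condition is true.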
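The null behaviour described above can be reproduced in isolation. The following is a minimal sketch, assuming a local SparkSession; the three-row frame and the names `actions`, `strict` and `lenient` are made up for illustration and are not part of the question's data.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("null-filter-sketch").getOrCreate()
import spark.implicits._

// Hypothetical miniature of the joined Action_1 column:
// an insert, a delete, and an unmatched row (null after the outer join).
val actions = Seq(
  (1, Option("I|!|")),
  (2, Option("D|!|")),
  (3, Option.empty[String])
).toDF("id", "Action_1")

// For the null row the condition is null, not true, so the row is dropped.
val strict = actions.filter(!$"Action_1".contains("D"))

// Handling null explicitly keeps the unmatched row.
val lenient = actions.filter($"Action_1".isNull || !$"Action_1".contains("D"))

println(strict.count())  // 1
println(lenient.count()) // 2
```

With these three sample rows, `strict` keeps only the insert row, while `lenient` also keeps the unmatched one.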

So the fix is to replace the null values produced by the join with the valid data from the other table, wherever that data is present.

import org.apache.spark.sql.functions.when

// After the outer join, fall back to df1's value wherever the df2 column is null,
// then drop the records marked for deletion.
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .withColumn("segmentId_1", when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId"))
  .withColumn("Sequence_1", when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence"))
  .withColumn("Action_1", when($"Action_1".isNotNull, $"Action_1").otherwise($"Action"))
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()

You should get the desired output:

+----------+------+-----------+----------+--------+
|     OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877346|   136|          4|         1|    I|!||
|4295877341|   145|         14|         1|    I|!||
|4295877343|   149|         15|         2|    I|!||
|4295877341|   136|       null|      null|    I|!||
|4295877341|   138|         11|        22|    I|!||
|4295877341|   141|         10|         1|    I|!||
|4295877341|   143|          1|         1|    I|!||
+----------+------+-----------+----------+--------+
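One point to watch with a per-column fallback: if the nulls in DataFrame 2 are real nulls rather than the string "null", a matched row such as OrgId 4295877341, ItemId 136 would get its null segmentId_1 and Sequence_1 filled from DataFrame 1. If DataFrame 2 should win whenever the key matched, one possible variant decides per row instead. The sketch below assumes that Action_1 is never null in DataFrame 2, uses only a subset of the question's rows, and introduces the names `matched` and `merged` purely for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

val spark = SparkSession.builder().master("local[1]").appName("upsert-sketch").getOrCreate()
import spark.implicits._

// A subset of the question's rows; Option models the nullable columns.
val df1 = Seq(
  (4295877341L, 136, Option(4), Option(1), "I|!|"),
  (4295877346L, 136, Option(4), Option(1), "I|!|"),
  (123456789L, 145, Option(14), Option(1), "I|!|")
).toDF("OrgId", "ItemId", "segmentId", "Sequence", "Action")

val df2 = Seq(
  (4295877343L, 149, Option(15), Option(2), "I|!|"),
  (4295877341L, 136, Option.empty[Int], Option.empty[Int], "I|!|"),
  (123456789L, 145, Option(14), Option(1), "D|!|")
).toDF("OrgId", "ItemId", "segmentId_1", "Sequence_1", "Action_1")

// Assumption: Action_1 is never null in df2, so a non-null Action_1
// after the join means the key was found in df2.
val matched = $"Action_1".isNotNull

val merged = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .select(
    $"OrgId", $"ItemId",
    when(matched, $"segmentId_1").otherwise($"segmentId").as("segmentId"),
    when(matched, $"Sequence_1").otherwise($"Sequence").as("Sequence"),
    when(matched, $"Action_1").otherwise($"Action").as("Action"))
  .filter(!$"Action".contains("D")) // Action is non-null here, so no rows are lost to nulls

merged.show()
```

Here the matched row keeps the nulls from `df2`, the row found only in `df1` is retained, the new row from `df2` is added, and the record flagged with D is removed.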

Answer2:

Try left_outer instead of outer:

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "left_outer")
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()

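A left outer join retains all rows from the left side, including those with no match on the right. Those unmatched rows have a null Action_1, so the filter still has to account for nulls.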
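To see which keys each join type keeps, here is a small sketch, assuming a local SparkSession and two made-up frames (`left` and `right` are hypothetical names, not the question's data):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("join-type-sketch").getOrCreate()
import spark.implicits._

// Hypothetical keys: "a" exists only on the left, "b" on both sides, "c" only on the right.
val left = Seq(("a", 1), ("b", 2)).toDF("key", "leftValue")
val right = Seq(("b", 20), ("c", 30)).toDF("key", "rightValue")

// left_outer keeps every left key; outer keeps the keys from both sides.
val leftOuterKeys = left.join(right, Seq("key"), "left_outer").select("key").as[String].collect().sorted
val fullOuterKeys = left.join(right, Seq("key"), "outer").select("key").as[String].collect().sorted

println(leftOuterKeys.mkString(", ")) // a, b
println(fullOuterKeys.mkString(", ")) // a, b, c
```

The right-only key "c" appears only in the full outer result, which matters when new records from the second DataFrame have to show up in the output.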

A nice tutorial on Spark joins: http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/
