
Question:
I have two DataFrames. DataFrame 1 looks like this:
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 4| 1| I|!||
|4295877346| 136| 4| 1| I|!||
|4295877341| 138| 2| 1| I|!||
|4295877341| 141| 4| 1| I|!||
|4295877341| 143| 2| 1| I|!||
|4295877341| 145| 14| 1| I|!||
| 123456789| 145| 14| 1| I|!||
| 809580109| 145| 9| 9| I|!||
+----------+------+---------+--------+------+
DataFrame 2 looks like this:
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
| 123456789| 145| 14| 1| D|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
| 809580109| 145| NULL| NULL| I|!||
+----------+------+-----------+----------+--------+
I need to join both DataFrames and update the columns of DataFrame 1 with the matching records from DataFrame 2.
The key in both DataFrames is OrgId and ItemId.
The expected output is:
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346| 136| 4| 1| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
| 809580109| 145| 9| 9| I|!||
+----------+------+---------+--------+------+
So I need to update DataFrame 1 with the records from DataFrame 2. If a record in DataFrame 1 is not found in DataFrame 2, it must be retained. If a new record is found in DataFrame 2, it must be added to the output.
Here is what I am doing:
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()
But I am getting the output below:
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
I am not getting the record 4295877346| 136| 4| 1| I|!| from DataFrame 1.
left_outer gives me the output below:
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
Answer1: Let me first explain your mistake.
If you only join, as below:
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
df3.show()
you will get:
+----------+------+---------+--------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId|Sequence|Action|segmentId_1|Sequence_1|Action_1|
+----------+------+---------+--------+------+-----------+----------+--------+
|4295877346| 136| 4| 1| I|!|| null| null| null|
|4295877341| 145| 14| 1| I|!|| null| null| null|
|4295877343| 149| null| null| null| 15| 2| I|!||
|4295877341| 136| 4| 1| I|!|| null| null| I|!||
| 123456789| 145| 14| 1| I|!|| 14| 1| D|!||
|4295877341| 138| 2| 1| I|!|| 11| 22| I|!||
|4295877341| 141| 4| 1| I|!|| 10| 1| I|!||
|4295877341| 143| 2| 1| I|!|| 1| 1| I|!||
+----------+------+---------+--------+------+-----------+----------+--------+
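It is fully evident that the filter in your code also removes the rows where Action_1 is null: for a null Action_1 the expression !$"Action_1".contains("D") evaluates to null, and filter keeps only the rows where the predicate is true.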
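The null behaviour can be checked in isolation. The following is a minimal sketch, assuming a local SparkSession; the `actions` DataFrame and the names `strict` and `nullSafe` are made up for illustration and are not part of the original code.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is good enough for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("null-filter-sketch").getOrCreate()
import spark.implicits._

// Three rows: an insert, a delete, and a null (as produced by an unmatched outer join).
val actions = Seq("I|!|", "D|!|", null).toDF("Action_1")

// Same predicate as in the question: the null row is dropped together with the "D" row.
val strict = actions.filter(!$"Action_1".contains("D"))

// Explicitly letting nulls through keeps the null row as well.
val nullSafe = actions.filter($"Action_1".isNull || !$"Action_1".contains("D"))

println(s"strict: ${strict.count()}, nullSafe: ${nullSafe.count()}")
```

Letting nulls through the filter is only one possible way around the problem; it would still leave the null values in the selected columns.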
So the working code is to replace the null values that you get after the join with valid data from the other table, where that data is present.
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
  .withColumn("segmentId_1", when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId"))
  .withColumn("Sequence_1", when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence"))
  .withColumn("Action_1", when($"Action_1".isNotNull, $"Action_1").otherwise($"Action"))
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()
You should get the desired output:
+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877346| 136| 4| 1| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+
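One caveat, offered as a sketch rather than a definitive fix: a when(...isNotNull)...otherwise(...) fallback cannot tell "no matching row in df2" apart from "df2 matched but holds a null". If the nulls for 4295877341 / 136 in df2 are real nulls (and not the string "null") and should win over the values from df1, something has to record whether df2 had a row at all. The version below assumes a marker column for that; `inDf2` and `preferDf2` are hypothetical names, and the two small DataFrames are stand-ins built from a few rows of the question.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

// Assumption: a local session is good enough for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("upsert-sketch").getOrCreate()
import spark.implicits._

// A few rows from the question; Option models the nullable columns.
val df1 = Seq[(Long, Int, Option[Int], Option[Int], String)](
  (4295877346L, 136, Some(4), Some(1), "I|!|"),
  (4295877341L, 136, Some(4), Some(1), "I|!|"),
  (123456789L, 145, Some(14), Some(1), "I|!|")
).toDF("OrgId", "ItemId", "segmentId", "Sequence", "Action")

val df2 = Seq[(Long, Int, Option[Int], Option[Int], String)](
  (4295877343L, 149, Some(15), Some(2), "I|!|"),
  (4295877341L, 136, None, None, "I|!|"),
  (123456789L, 145, Some(14), Some(1), "D|!|")
).toDF("OrgId", "ItemId", "segmentId_1", "Sequence_1", "Action_1")

// Hypothetical helper: take the df2 value whenever df2 had a matching row,
// even if that value is null; otherwise fall back to the df1 value.
def preferDf2(updated: String, original: String): Column =
  when($"inDf2".isNotNull, col(updated)).otherwise(col(original)).as(original)

val merged = df1
  .join(df2.withColumn("inDf2", lit(true)), Seq("OrgId", "ItemId"), "outer")
  .select(
    $"OrgId", $"ItemId",
    preferDf2("segmentId_1", "segmentId"),
    preferDf2("Sequence_1", "Sequence"),
    preferDf2("Action_1", "Action"))
  .filter(!$"Action".contains("D")) // drop records flagged as deleted

merged.show()
```

With these sample rows, 4295877346 / 136 is kept from df1, 4295877343 / 149 is added from df2, 4295877341 / 136 keeps the nulls from df2, and 123456789 / 145 is removed by the filter. The marker column costs one extra column during the join but avoids guessing from the data columns whether a match occurred.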
Answer2: Try left_outer instead of outer:
val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "left_outer")
  .select($"OrgId", $"ItemId", $"segmentId_1", $"Sequence_1", $"Action_1")
  .filter(!$"Action_1".contains("D"))
df3.show()
A left outer join should retain all unmatched rows from the left DataFrame.
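To see which keys each join type keeps, here is a minimal sketch with two toy DataFrames; `left`, `right` and the column names `fromLeft` / `fromRight` are invented for the illustration, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is good enough for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("join-type-sketch").getOrCreate()
import spark.implicits._

val left = Seq((1L, "left-only"), (2L, "both")).toDF("OrgId", "fromLeft")
val right = Seq((2L, "both"), (3L, "right-only")).toDF("OrgId", "fromRight")

// left_outer: every key from the left side, matched or not.
val leftOuterIds = left.join(right, Seq("OrgId"), "left_outer")
  .select("OrgId").as[Long].collect().sorted

// outer: keys from both sides, including those present only on the right.
val fullOuterIds = left.join(right, Seq("OrgId"), "outer")
  .select("OrgId").as[Long].collect().sorted

println(leftOuterIds.mkString(", "))
println(fullOuterIds.mkString(", "))
```

Key 3, which exists only on the right, shows up only in the full outer join. For the question's requirement that new records from DataFrame 2 be added to the output, a left outer join alone would therefore not be enough, and the columns coming from the right side are still null for unmatched left rows.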
A nice tutorial is available here: http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/