Extract specific columns from a text file to make a DataFrame in Scala


I need to clean some data in Scala. I have the following raw data in a text file:

    06:36:15.718068 IP > Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
    06:36:15.718078 IP > Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160

I need all of them in a DataFrame, like this:

    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |06:36:15.718068|           |             |5001    |41516   |346     |163     |0    |
    |06:36:15.718078|           |             |41516   |5001    |0       |58      |65160|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+

I used the following code to build the above DataFrame:

    val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true),
      StructField("s_port_3", StringType, true),
      StructField("r_port_4", StringType, true),
      StructField("acknum_5", StringType, true),
      StructField("winnum_6", StringType, true),
      StructField("len_7", IntegerType, true)))

    ///////////////////////////////////////////////////make train dataframe
    val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
    val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
      val fifth = Try(array(4).trim.split(" ")(0)) getOrElse ""
      val sixth = Try(array(5).trim.split(" ")(0)) getOrElse ""
      val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
      val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""
      val firstFixed = first.take(first.lastIndexOf("."))
      val secondfix = second.take(second.lastIndexOf("."))
      val thirdFixed = third.take(third.lastIndexOf("."))
      Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth, fifth, sixth, seventh, eighth))
    })
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)

But the problem is that nothing is extracted from the third column onward! Can you please help me understand why the third column comes out empty? Thanks


Your input records do not have a fixed layout (the second one has an extra seq field), so getting the result you want is a bit tricky. Given the input data you provided, the following could be a solution; you can adapt it as your needs grow.

    import scala.util.Try
    import org.apache.spark.sql.Row

    val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
      val array1 = array(0).trim.split("IP")   // timestamp | "<src>.<port> > <dst>.<port>: Flags [.]"
      val array2 = array1(1).split(">")        // "<src>.<port>" | "<dst>.<port>: Flags [.]"
      val array3 = array2(1).split(":")        // "<dst>.<port>" | " Flags [.]"
      // a record with a "seq" field has one extra comma-separated element before "ack"
      val hasSeq = array(1).contains("seq")
      val acknum5 = if (hasSeq) array(2) else array(1)
      val winnum6 = if (hasSeq) array(3) else array(2)
      val first = Try(array1(0).trim) getOrElse ""
      val second = Try(array2(0).trim) getOrElse ""
      val third = Try(array3(0).trim) getOrElse ""
      val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
      val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
      // the last comma-separated element is always "length N"
      val eighth = Try(array.last.trim.split(" ")(1).toInt) getOrElse 0
      // split "<ip>.<port>" at the last dot
      val secondfix = second.take(second.lastIndexOf("."))
      val sport3 = second.substring(second.lastIndexOf(".") + 1)
      val thirdFixed = third.take(third.lastIndexOf("."))
      val rport4 = third.substring(third.lastIndexOf(".") + 1)
      Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3, rport4, sixth, seventh, eighth))
    })
    val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

You will get the following output:

    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |06:36:15.718068|           |             |5001    |41516   |346     |163     |0    |
    |06:36:15.718078|           |             |41516   |5001    |0       |58      |65160|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+

I hope the solution is helpful


After your first map,

    file.map(line => line.split(">")).collect

the output you get is:

    Array[Array[String]] = Array(
      Array("06:36:15.718068 IP ", " Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"),
      Array("06:36:15.718078 IP ", " Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"))

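As you can see, each line yields an array of only two elements, so in your next map step every index beyond array(1) throws an ArrayIndexOutOfBoundsException. Because each access is wrapped in Try(...) getOrElse "", the exception is silently replaced by an empty string, which is why those columns come out empty.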
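The effect can be reproduced in plain Scala, without Spark, using the first sample record from the question exactly as it appears above (with the IP addresses missing). SplitDemo is just an illustrative name; this is a minimal sketch, not part of the original code:

```scala
import scala.util.Try

object SplitDemo {
  // First sample record from the question, as shown above
  val line: String = "06:36:15.718068 IP > Flags [.], ack 346, win 163, " +
    "options [nop,nop,TS val 1654418 ecr 1654418], length 0"

  // There is only one '>' in the record, so split yields two elements
  val parts: Array[String] = line.split(">")

  // Index 2 does not exist: the exception is caught by Try and replaced with ""
  val third: String = Try(parts(2).trim.split(" ")(0)) getOrElse ""

  def main(args: Array[String]): Unit = {
    println(parts.length)  // 2
    println(third.isEmpty) // true
  }
}
```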

You need to look more closely at the input and find characters you can split on. This should work to extract the third column:

    val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
    val third = Dstream_Train
      .map(line => line.split(">"))
      .map { x =>
        val receiver = x(1).split(":")(0)               // " <receiver ip>.<port>"
        receiver.splitAt(receiver.lastIndexOf("."))._1  // drop ".<port>"
      }
      .collect

    third: Array[String] = Array("", "")

Similarly, you can extract the other columns, but as suggested, a regular expression would be cleaner and easier.
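A minimal regex-based sketch in plain Scala, assuming the records follow tcpdump's usual "<time> IP <src ip>.<port> > <dst ip>.<port>: ..." layout. The IP addresses are missing from the sample above, so the test addresses 192.0.2.1 and 198.51.100.2 are made up, and Packet and PacketParser are hypothetical names. A single pattern matches the whole record, and the lazy ".*?" lets the optional seq field sit in front of ack without shifting any column:

```scala
import scala.util.matching.Regex

// Hypothetical container for one parsed record
final case class Packet(
    timestamp: String,
    senderIp: String,
    receiverIp: String,
    senderPort: String,
    receiverPort: String,
    ack: String,
    win: String,
    length: Int)

object PacketParser {
  // Assumed layout: "<time> IP <src ip>.<port> > <dst ip>.<port>: Flags ..., ack N, win N, ..., length N"
  // The optional "seq a:b," part is skipped by the lazy ".*?" before "ack".
  private val Record: Regex = (
    """^(\d+:\d+:\d+\.\d+) IP (\d+\.\d+\.\d+\.\d+)\.(\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+):""" +
    """.*?\back (\d+), win (\d+),.*\blength (\d+)""").r

  // Pattern matching on a Regex requires the whole string to match
  def parse(line: String): Option[Packet] = line.trim match {
    case Record(ts, sip, sport, rip, rport, ack, win, len) =>
      Some(Packet(ts, sip, rip, sport, rport, ack, win, len.toInt))
    case _ => None // the record does not fit the assumed layout
  }
}
```

In Spark, something like Dstream_Train.flatMap(line => PacketParser.parse(line).toList) would drop non-matching lines instead of filling columns with empty strings; calling .toDF() on the result needs import session.implicits._ and Packet defined at the top level.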


There are a number of issues with this code. The first is the schema: you declare len_7 as IntegerType, but the value you put in that position (eighth) is always a String, and an empty String when the field is missing. Spark rejects a String in an IntegerType column when the DataFrame is evaluated, so either the schema or the value has to change.
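If you would rather keep len_7 as IntegerType, one option, sketched here with a hypothetical LenField helper, is to put a boxed Integer into the Row, or null when the value is missing, instead of a String. A null is acceptable because the column is declared nullable:

```scala
import scala.util.Try

object LenField {
  // Hypothetical helper: converts the extracted text to a boxed Integer for a
  // nullable IntegerType column. Missing or non-numeric input becomes null
  // rather than "" (a String would not match the IntegerType in the schema).
  def parseLen(raw: String): java.lang.Integer =
    Try(Int.box(raw.trim.toInt)).getOrElse(null)
}
```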

Also, as pointed out above, splitting on ">" gives only two elements per line, so any index beyond array(1) is out of bounds.

I just saw that Ramesh posted an answer before me, but here is another way, using regular expressions.

If you look at your example, you will notice that the two rows are structurally different: the second one contains an extra seq field, so the same value does not always sit at the same position.

Here is what I did to get the result (the regexes may still need more testing). Start with the patterns:

    import scala.util.matching.Regex

    // defined in a standalone object to avoid serialisation errors
    object RegexPatterns {
      val patternTS: Regex = "([0-9]+:[0-9]+:[0-9]+.[0-9]+)".r
      val patternSIP1: Regex = "(?<=\\b IP \\b)([0-9]+.[0-9].[0-9].[0-9])(?=.[0-9]+)".r
      val patternRIP2: Regex = "(?<=\\b > \\b)([0-9]..[0-9].[0-9].[0-9])(?=.[0-9]+)".r
      val patternSP3: Regex = "(?<=\\b IP \\b[0-9]..[0-9].[0-9].[0-9].)([0-9]+)".r
      val patternRP4: Regex = "(?<=\\b > \\b[0-9]..[0-9].[0-9].[0-9].)([0-9]+)".r
      val patternAN5: Regex = "(?<=\\back \\b)([0-9]+)".r
      val patternWN6: Regex = "(?<=\\bwin \\b)([0-9]+)".r
      val patternL7: Regex = "(?<=\\blength \\b)([0-9]+)".r
    }
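Since the patterns may need more testing, one way to check them is in plain Scala, without Spark, against the second sample record from the question. Its IP addresses are missing, so only the non-IP patterns are exercised here; PatternCheck is a hypothetical name, and the three patterns are copied unchanged from the object above:

```scala
import scala.util.matching.Regex

object PatternCheck {
  // Copies of three of the patterns above, unchanged
  val patternTS: Regex = "([0-9]+:[0-9]+:[0-9]+.[0-9]+)".r
  val patternAN5: Regex = "(?<=\\back \\b)([0-9]+)".r
  val patternL7: Regex = "(?<=\\blength \\b)([0-9]+)".r

  // Second sample record from the question (IP addresses missing, as shown above)
  val line: String = "06:36:15.718078 IP > Flags [.], seq 1:65161, ack 0, win 58, " +
    "options [nop,nop,TS val 1654418 ecr 1654418], length 65160"

  def main(args: Array[String]): Unit = {
    println(patternTS.findAllIn(line).mkString(","))  // 06:36:15.718078
    println(patternAN5.findAllIn(line).mkString(",")) // 0
    println(patternL7.findAllIn(line).mkString(","))  // 65160
  }
}
```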

Then plug them into your existing code:

    import scala.util.Try
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import RegexPatterns._

    val Dstream_Train: RDD[String] = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")

    // len_7 is a StringType here, so every value in the Row is a String and matches the schema
    val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, nullable = true),
      StructField("sender_ip_1", StringType, nullable = true),
      StructField("receiver_ip_2", StringType, nullable = true),
      StructField("s_port_3", StringType, nullable = true),
      StructField("r_port_4", StringType, nullable = true),
      StructField("acknum_5", StringType, nullable = true),
      StructField("winnum_6", StringType, nullable = true),
      StructField("len_7", StringType, nullable = true)))

    ///////////////////////////////////////////////////make train dataframe
    val Row_Dstream_Train: RDD[Row] = Dstream_Train.map { line: String =>
      // each field is located by its own pattern, so the optional "seq" field does not shift anything
      val first = Try((patternTS findAllIn line).mkString(",")) getOrElse ""
      val second = Try((patternSIP1 findAllIn line).mkString(",")) getOrElse ""
      val third = Try((patternRIP2 findAllIn line).mkString(",")) getOrElse ""
      val fourth = Try((patternSP3 findAllIn line).mkString(",")) getOrElse ""
      val fifth = Try((patternRP4 findAllIn line).mkString(",")) getOrElse ""
      val sixth = Try((patternAN5 findAllIn line).mkString(",")) getOrElse ""
      val seventh = Try((patternWN6 findAllIn line).mkString(",")) getOrElse ""
      val eighth = Try((patternL7 findAllIn line).mkString(",")) getOrElse ""
      Row.fromSeq(Seq(first, second, third, fourth, fifth, sixth, seventh, eighth))
    }
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
    Frist_Dataframe.show(false)

This yields:

    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+
    |06:36:15.718068|           |             |5001    |41516   |346     |163     |0    |
    |06:36:15.718078|           |             |41516   |5001    |0       |58      |65160|
    +---------------+-----------+-------------+--------+--------+--------+--------+-----+

