
Create a Spark UDF to iterate over an array of bytes and convert it to numeric

Question:

I have a DataFrame with an array of bytes in Spark (Python):

    DF.select(DF.myfield).show(1, False)
    +----------------+
    |myfield         |
    +----------------+
    |[00 8F 2B 9C 80]|
    +----------------+

I'm trying to convert this array to a string

'008F2B9C80'

and then to the numeric value

    int('008F2B9C80', 16) / 1000000  # = 2402.0
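In plain Python, the whole chain I'm after looks like this (assuming the field arrives as a Python bytes/bytearray object, which is how Spark hands BinaryType values to a UDF):

    # The full chain in plain Python (values from the example row above)
    raw = bytes([0x00, 0x8F, 0x2B, 0x9C, 0x80])
    as_hex = raw.hex()                   # '008f2b9c80'
    value = int(as_hex, 16) / 1000000    # 2402.0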

I found some UDF samples, so I can already extract part of the array, like this:

    u = f.udf(lambda a: format(a[1], 'x'))
    DF.select(u(DF['myfield'])).show()
    +-----------------+
    |<lambda>(myfield)|
    +-----------------+
    |               8f|
    +-----------------+

Now, how do I iterate over the whole array? Is it possible to do all the operations I need inside the UDF?

Or maybe there is a better way to do the cast?

Thanks for your help

Answer1:

Here is the Scala DataFrame solution. You need to import scala.math.BigInt.

    scala> val df = Seq((Array("00","8F","2B","9C","80"))).toDF("id")
    df: org.apache.spark.sql.DataFrame = [id: array<string>]

    scala> df.withColumn("idstr", concat_ws("", 'id)).show
    +--------------------+----------+
    |                  id|     idstr|
    +--------------------+----------+
    |[00, 8F, 2B, 9C, 80]|008F2B9C80|
    +--------------------+----------+

    scala> import scala.math.BigInt
    import scala.math.BigInt

    scala> def convertBig(x: String): String =
             BigInt(x.sliding(2,2).map(x => Integer.parseInt(x,16)).map(_.toByte).toArray).toString
    convertBig: (x: String)String

    scala> val udf_convertBig = udf( convertBig(_: String): String )
    udf_convertBig: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

    scala> df.withColumn("idstr", concat_ws("", 'id)).withColumn("idBig", udf_convertBig('idstr)).show(false)
    +--------------------+----------+----------+
    |id                  |idstr     |idBig     |
    +--------------------+----------+----------+
    |[00, 8F, 2B, 9C, 80]|008F2B9C80|2402000000|
    +--------------------+----------+----------+

There is no Spark type equivalent to Scala's BigInt, so I'm converting the udf() result to a string.

Answer2:

I found a Python solution too:

    from pyspark.sql.functions import udf

    spark.udf.register('ByteArrayToDouble',
                       lambda x: int.from_bytes(x, byteorder='big', signed=False) / 10e5)

    spark.sql('select myfield, ByteArrayToDouble(myfield) myfield_python, convert_binary(hex(myfield))/1000000 myfield_scala from my_table').show(1, False)
    +-------------+--------------+-------------+
    |myfield      |myfield_python|myfield_scala|
    +-------------+--------------+-------------+
    |[52 F4 92 80]|1391.76       |1391.76      |
    +-------------+--------------+-------------+
    only showing top 1 row
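If you prefer the DataFrame API over spark.sql, the same lambda can be wrapped with udf() directly; a minimal sketch, assuming myfield is a BinaryType column as in the question:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    # Same conversion as the registered SQL function, exposed for the DataFrame API
    byte_array_to_double = udf(
        lambda b: int.from_bytes(b, byteorder='big', signed=False) / 1000000,
        DoubleType())

    DF.select(byte_array_to_double(DF['myfield'])).show(1, False)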

I'm now able to benchmark the two solutions.

Thank you for your precious help

Answer3:

I came across this question while answering your newest one.

Suppose you have the df as:

    +--------------------+
    |             myfield|
    +--------------------+
    |[00, 8F, 2B, 9C, 80]|
    |    [52, F4, 92, 80]|
    +--------------------+

Now you can use the following function, wrapped in a UDF (with the required imports):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType

    def func(val):
        return int("".join(val), 16) / 1000000

    func_udf = udf(lambda x: func(x), FloatType())

And to create the output, use

    df = df.withColumn("myfield1", func_udf("myfield"))

This yields:

    +--------------------+--------+
    |             myfield|myfield1|
    +--------------------+--------+
    |[00, 8F, 2B, 9C, 80]|  2402.0|
    |    [52, F4, 92, 80]| 1391.76|
    +--------------------+--------+
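One detail worth noting: FloatType is single precision, so values with many significant digits may lose precision. If that matters, DoubleType is a drop-in replacement; a sketch under the same assumptions as above:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    # Same conversion, but returning a double-precision result
    func_udf_double = udf(lambda val: int("".join(val), 16) / 1000000, DoubleType())
    df = df.withColumn("myfield1", func_udf_double("myfield"))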
