
Spark (Scala): Writing (and reading) to the local file system from the driver

<strong>1st Question:</strong> I have a 2-node virtual cluster with Hadoop. I have a jar that runs a Spark job. This jar accepts as a CLI argument a path to a commands.txt file, which tells the jar which commands to run.

I run the job with spark-submit, and I noticed that my slave node wasn't running the job because it couldn't find the commands.txt file, which was local on the master.

This is the command I used to run it:

./spark-1.6.1-bin-hadoop2.6/bin/spark-submit --class univ.bigdata.course.MainRunner \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 1g \
    --num-executors 4 \
    final-project-1.0-SNAPSHOT.jar commands commands.txt

Do I need to upload commands.txt to HDFS and give the HDFS path instead, as follows?

hdfs://master:9000/user/vagrant/commands.txt

<strong>2nd Question:</strong> How do I write to a file on the driver machine in the cwd? I used a normal Scala FileWriter to write output to queries_out.txt, and it worked fine when using spark-submit with

--master local[*]

But, when running in

--master yarn

I can't find the file. No exceptions are thrown, but I just can't locate the file; it doesn't exist, as if it was never written. Is there a way to write the results to a file on the driver machine locally? Or should I only write results to HDFS?

Thanks.

Answer 1:

<strong>Question 1:</strong> Yes, uploading it to HDFS (or any network-accessible file system) is how you solve your problem.
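
For example, once the file is on HDFS, the driver can read it with sc.textFile. A minimal sketch, reusing the class name and HDFS path from the question:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MainRunner"))

// Read the commands file from HDFS instead of the master's local disk,
// so every node in the cluster can reach it.
val commands: Array[String] =
  sc.textFile("hdfs://master:9000/user/vagrant/commands.txt").collect()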

<strong>Question 2:</strong>

This is a bit tricky. Assuming your results are in an RDD, you could call collect(), which will aggregate all the data in your driver process. Then you have a standard collection in your hands, which you can simply write to disk. Note that you should give your driver process enough memory to be able to hold all the results, and do not forget to also increase the maximum result size. The parameters are:

--driver-memory 16G --conf "spark.driver.maxResultSize=15g"
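
A minimal sketch of this approach; resultsRdd is a placeholder for your RDD of result strings, and the output file name follows the question:

import java.io.PrintWriter

// Pull the entire result RDD back to the driver. This only works while
// the collected data fits in driver memory (see the settings above).
val results: Array[String] = resultsRdd.collect()

// A plain JVM writer; in client mode this file lands in the cwd of the
// machine that launched the driver.
val out = new PrintWriter("queries_out.txt")
try results.foreach(out.println) finally out.close()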

This has absolutely poor scaling behaviour, in both communication complexity and memory (both grow with the size of the result RDD). It is the easiest way, and perfectly fine for a toy project or when the data set is always small. In all other cases it will certainly blow up at some point.

The better way, as you mentioned yourself, is to use the built-in "saveAs" methods to write to, e.g., HDFS (or another storage system). You can check the documentation for that: http://spark.apache.org/docs/latest/programming-guide.html#actions
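
For instance, reusing the hypothetical resultsRdd from above and the HDFS location from the question:

// Each partition is written as its own part-NNNNN file under the target
// directory, so no single machine has to hold the whole result.
resultsRdd.saveAsTextFile("hdfs://master:9000/user/vagrant/queries_out")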

Note that if you only want to persist the RDD because you are reusing it in several computations (like cache, but holding it on disk instead of in memory), there is also a persist method on RDDs, shown in the sketch below.
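
A one-line sketch, again with the hypothetical resultsRdd:

import org.apache.spark.storage.StorageLevel

// Like cache(), but spills to local disk instead of holding the data in memory.
resultsRdd.persist(StorageLevel.DISK_ONLY)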

Answer 2:

The solution was very simple: I changed --deploy-mode cluster to --deploy-mode client, and then the file writes were done correctly on the machine where I ran the driver.

Answer 3:

<strong>Answer to Question 1:</strong> Submitting the Spark job with the --files flag followed by the path to a local file ships the file from the driver node to the cwd of all the worker nodes, where it can then be accessed just by using its name.
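
A sketch of reading such a shipped file, assuming the job was submitted with --files /local/path/commands.txt; SparkFiles.get is Spark's helper for resolving the localized copy:

import scala.io.Source
import org.apache.spark.SparkFiles

// On YARN the --files file is localized into each container's working
// directory, so opening it by bare name usually works; SparkFiles.get
// resolves the localized path explicitly and is the safer option.
val source = Source.fromFile(SparkFiles.get("commands.txt"))
val commandLines = try source.getLines().toList finally source.close()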
