Profile picture Schedule a Meeting
c a n d l a n d . n e t

Spark RDD to CSV with headers

Dusty Candland | | clojure, spark

We have some Spark jobs that we want the results stored as a CSV with headers so they can be directly used. Saving the data as CSV is pretty straight forward, just map the values into CSV lines.

The trouble starts when you want that data in one file. FileUtil.copyMerge is the key for that. It takes all the files in a directly, like those output by saveAsTextFile and merges them into one file.

Great, now we just need a header line. My first attempt was to union an RDD w/ the header and the output RDD. This works sometimes, if you get lucky. Since union just smashes everything together, more often then not, the CSV has the header row somewhere in the middle of the results.

No problem! I'll just prepend the header after the copyMerge. Nope, generally Hadoop is write only, you can get append to work, but still not a great option.

The solution was to write the header as a file BEFORE the copyMerge using a name that puts it first in the resulting CSV! Here's what we ended up using:

(ns roximity.spark.output
  (:require [sparkling.conf :as conf]
            [sparkling.core :as spark]
            [sparkling.destructuring :as de]
            [clojure.data.csv :as csv]
            [clojure.java.io :as io]
    )
  (:import [org.apache.hadoop.fs FileUtil FileSystem Path]
    )
  )

(defn- csv-row
  [values]
  (let [writer (java.io.StringWriter.)]
    (clojure.data.csv/write-csv writer [values])
    (clojure.string/trimr (.toString writer))
    )
  )

(defn save-csv
  "Convert to CSV and save at URL.csv. URL should be a directory.
   Headers should be a vector of keywords that match the map in a tuple value.
   and should be in the order you want the data writen out in."
  [url headers sc rdd]
  (let [
      header (str (csv-row (map name headers)) "\n")
      file url
      dest (str file ".csv")
      conf (org.apache.hadoop.conf.Configuration.)
      srcFs (FileSystem/get (java.net.URI/create file) conf)
    ]
    (FileUtil/fullyDelete (io/as-file file))
    (FileUtil/fullyDelete (io/as-file dest))
    (->> rdd
      (spark/map (de/value-fn (fn [value]
        (let [values (map value headers)]
          (csv-row values)
          )
        )))
      (spark/coalesce 1 true)
      (#(.saveAsTextFile % file))
      )
    (with-open [out-file (io/writer (.create srcFs (Path. (str file "/_header"))))]
      (.write out-file header)
      )
    (FileUtil/copyMerge srcFs (Path. file) srcFs (Path. dest) true conf nil)
    (.close srcFs)
    )
  )

This works for local files and s3, and it should work for HDFS. Since we're using s3 and the results are not huge, we use (coalesce 1 true) so that only one part file is written to s3, without that we had issues with too many requests. Could probably use a higher number and find a happy medium, but we just use 1.

Webmentions

These are webmentions via the IndieWeb and webmention.io. Mention this post from your site: