• Spark RDD to CSV with headers

    We have some Spark jobs whose results we want stored as CSV with headers so they can be used directly. Saving the data as CSV is pretty straightforward: just map the values into CSV lines.

    The trouble starts when you want that data in one file. FileUtil.copyMerge is the key for that. It takes all the files in a directory, like those output by saveAsTextFile, and merges them into one file.

    Great, now we just need a header line. My first attempt was to union an RDD containing the header with the output RDD. This works sometimes, if you get lucky. Since union just smashes everything together, more often than not the CSV has the header row somewhere in the middle of the results.

    No problem! I’ll just prepend the header after the copyMerge. Nope. Hadoop filesystems are generally write-once; you can get append to work, but prepending is still not a great option.

    The solution was to write the header as a file BEFORE the copyMerge using a name that puts it first in the resulting CSV! Here’s what we ended up using:

    (ns roximity.spark.output
      (:require [sparkling.conf :as conf]
                [sparkling.core :as spark]
                [sparkling.destructuring :as de]
                [clojure.data.csv :as csv]
                [clojure.java.io :as io]
                [clojure.string :as string])
      (:import [org.apache.hadoop.fs FileUtil FileSystem Path]))

    (defn- csv-row
      "Render one sequence of values as a single CSV line, without the trailing newline."
      [values]
      (let [writer (java.io.StringWriter.)]
        (csv/write-csv writer [values])
        (string/trimr (.toString writer))))

    (defn save-csv
      "Convert to CSV and save at URL.csv. URL should be a directory.
       Headers should be a vector of keywords that match the map in a tuple value,
       and should be in the order you want the data written out in."
      [url headers sc rdd]
      (let [header (str (csv-row (map name headers)) "\n")
            file url
            dest (str file ".csv")
            conf (org.apache.hadoop.conf.Configuration.)
            srcFs (FileSystem/get (java.net.URI/create file) conf)]
        ;; Clear any output left over from a previous run.
        (FileUtil/fullyDelete (io/as-file file))
        (FileUtil/fullyDelete (io/as-file dest))
        (->> rdd
             (spark/map (de/value-fn (fn [value]
                                       (let [values (map value headers)]
                                         (csv-row values)))))
             (spark/coalesce 1 true)
             (#(.saveAsTextFile % file)))
        ;; Write the header into the output directory before merging;
        ;; the name _header puts it ahead of the part files.
        (with-open [out-file (io/writer (.create srcFs (Path. (str file "/_header"))))]
          (.write out-file header))
        (FileUtil/copyMerge srcFs (Path. file) srcFs (Path. dest) true conf nil)
        (.close srcFs)))
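
    To see why the _header name works, here's a rough local sketch in plain shell. It is not the Hadoop code path: merge_dir is a made-up helper standing in for copyMerge, and it assumes the merge walks the directory in sorted name order, which is what the header trick depends on. The file names and sample rows are invented for the demo.

```shell
#!/bin/sh
# merge_dir SRC_DIR DEST
# Hypothetical stand-in for copyMerge: concatenate every file in SRC_DIR
# into DEST, in byte-sorted name order.
merge_dir() {
  src_dir=$1
  dest=$2
  # LC_ALL=C makes ls sort by byte value, so "_header" (0x5F) lands
  # ahead of "part-00000" (0x70).
  LC_ALL=C ls "$src_dir" | while IFS= read -r name; do
    cat "$src_dir/$name"
  done > "$dest"
}

# Demo: fake a saveAsTextFile output directory, then merge it.
work=$(mktemp -d)
mkdir "$work/out"
printf '1,alice\n' > "$work/out/part-00000"
printf '2,bob\n'   > "$work/out/part-00001"
: > "$work/out/_SUCCESS"                    # empty marker file, adds nothing
printf 'id,name\n' > "$work/out/_header"    # written last, still merged first

merge_dir "$work/out" "$work/out.csv"
cat "$work/out.csv"
```

    The header is written after the part files here on purpose: the order of writes doesn't matter, only the name does.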

    This works for local files and S3, and it should work for HDFS. Since we’re using S3 and the results are not huge, we use (coalesce 1 true) so that only one part file is written to S3; without that we had issues with too many requests. We could probably use a higher number and find a happy medium, but we just use 1.

  • Why ROXIMITY Selected MongoDB – Compose.io

    When we initially started development of ROXIMITY, I decided to go with MongoDB. There were three reasons for this choice: geospatial support, redundancy and scalability, and a lack of schema. If you are thinking about MongoDB, these are still all valid reasons for considering it, and our experience should aid your decision making.

  • Capistrano 2 rolling task

    This took a long time to track down, but this will allow rolling deploys or tasks with Capistrano 2.x.

    task :rolling, roles: :web do
      find_servers_for_task(current_task).each do |s|
        # Narrow the :web role down to just this server.
        puts roles[:web].clear()
        server s.host, :web
        puts "Deploying to #{s.host}..."
        # Run the deploy or per-server task here.
      end
    end

    Ref: https://groups.google.com/forum/#!topic/capistrano/H-tizsMN2Tk

  • AngularJS dynamic filter

    Use this to dynamically pass a filter, as a string, from a controller or somewhere else. It uses DI to look up the filter and apply it.

    In the template

    {{row.label | date | dynamic:nameFilter }}

    In the controller

    $scope.nameFilter = 'mycustomerfilter';

    app.filter('dynamic', ['$injector', function($injector) {
      return function(text, filter) {
        var result = text;
        if (filter) {
          // Filters are registered with the injector under "<name>Filter".
          var f = $injector.get(filter + "Filter");
          if (f) { result = f(text); }
        }
        return result;
      };
    }]);

  • Run GIT GC in all subdirectories

    Find the release directories and run git gc in each.

    find ./ -mindepth 1 -maxdepth 1 -type d -exec git --git-dir={}/.git --work-tree={} gc \;
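
    If some of the subdirectories aren't git repositories, a small loop can skip them. This is only a sketch: list_repos and gc_repos are names made up for the example, the path in the usage comment is a placeholder, and it assumes a git recent enough to support the -C flag.

```shell
#!/bin/sh
# list_repos PARENT
# Print each immediate subdirectory of PARENT that contains a .git directory.
list_repos() {
  for dir in "$1"/*/; do
    if [ -d "${dir}.git" ]; then
      printf '%s\n' "${dir%/}"
    fi
  done
}

# gc_repos PARENT
# Run git gc in every repository that list_repos finds under PARENT.
gc_repos() {
  list_repos "$1" | while IFS= read -r repo; do
    echo "gc: $repo"
    git -C "$repo" gc --quiet
  done
}

# Usage (hypothetical path):
#   gc_repos /path/to/releases
```

    Directories without a .git directory are passed over silently instead of producing a git error.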

  • Semi Forward Foot Controls for Sportsters ’86 and ’03

    These controls place your feet in between the stock mid position and forward controls.

  • Short MongoDB Fields with Mongoid

    Need shorter field names in MongoDB while still using readable names in code?

    # timestamps (stored as c_at and u_at)
    include Mongoid::Timestamps::Short
    # fields
    field :aid, as: :application_id, type: String
    # embedded one to one
    embeds_one :address, store_as: :ad
    # references: alias the foreign key field, then point the relation at it
    field :aid, as: :application_id
    belongs_to :application, foreign_key: :aid