Dowemo
0 0 0 0


Question:

I'm new to spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to RDD. My further operations are based on the heading provided in CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 
    @Override public Iterable<String> call(String s) { 
    return Arrays.asList(EOL.split(s)); 
    } 
});
final String heading=lines.first().toString();

I can get the header values like this. I want to map this to each record in CSV file.

final String[] header=heading.split(" "); 

I can get the header values like this. I want to map this to each record in CSV file.

In java I’m using CSVReader record.getColumnValue(Column header) to get the particular value. I need to do something similar to that here.


Best Answer:


We can use the new DataFrameRDD for reading and writing the CSV data. There are few advantages of DataFrameRDD over NormalRDD:

  • DataFrameRDD are bit more faster than NormalRDD since we determine the schema and which helps to optimize a lot on runtime and provide us with significant performance gain.
  • Even if the column shifts in CSV it will automatically take the correct column as we are not hard coding the column number which was present in reading the data as textFile and then splitting it and then using the number of column to get the data.
  • In few lines of code you can read the CSV file directly.
  • You will be required to have this library: Add it in build.sbt

    libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"

    Spark Scala code for it:

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val csvInPath = "/path/to/csv/abc.csv"
    val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
    //format is for specifying the type of file you are reading
    //header = true indicates that the first line is header in it

    To convert to normal RDD by taking some of the columns from it and

    val rddData = df.map(x=>Row(x.getAs("colA")))
    //Do other RDD operation on it

    Saving the RDD to CSV format:

    val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
    aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")

    Since the header is set to true we will be getting the header name in all the output files.




    Copyright © 2011 Dowemo All rights reserved.    Creative Commons   AboutUs