This is how I worked out how to parse the CSV file and count the records per airport (column 17, which is zero-based index 16 in the code below).
[root@nodo1 spark]# ./bin/spark-shell --master spark://nodo1:7077
scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> val myfile = sc.textFile("hdfs://nodo1/claudio/2008.csv").cache()
15/02/04 16:37:35 INFO storage.MemoryStore: ensureFreeSpace(139132) called with curMem=0, maxMem=311387750
15/02/04 16:37:35 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.9 KB, free 296.8 MB)
myfile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
scala> myfile.mapPartitions { rows =>
  // One CSVParser per partition, shared by every line in that partition.
  val csv = new CSVParser(',')
  // Parse each raw line into fields, then re-join them with commas for display.
  rows.map(row => csv.parseLine(row).mkString(","))
}.take(5).foreach(println)
scala> def dropHeader(data: RDD[String]): RDD[String] = {
  // Returns `data` without the first line of partition 0; every other partition is passed
  // through unchanged. Assumes the CSV header is the first line of partition 0.
  data.mapPartitionsWithIndex((idx, lines) => {
    // The if/else expression is the closure's result, so the iterator produced by drop(1)
    // is what gets returned. An iterator must not be reused after drop has been called on
    // it, and on Scala 2.12+ drop is lazy, so discarding its result would keep the header.
    if (idx == 0) lines.drop(1) else lines
  })
}
scala> val withoutHeader: RDD[String] = dropHeader(myfile)
scala> withoutHeader.mapPartitions { rows =>
  // One CSVParser per partition, shared by every line in that partition.
  val csv = new CSVParser(',')
  // Parse each line into fields, then re-join them with commas for display.
  rows.map(row => csv.parseLine(row).mkString(","))
}.take(5).foreach(println)
scala> withoutHeader.mapPartitions { rows =>
  // One CSVParser per partition, shared by every line in that partition.
  val csv = new CSVParser(',')
  // Keep only the 17th field (zero-based index 16) of each record; wrapping a single
  // value in an Array and joining it with mkString would yield the same string.
  rows.map(row => csv.parseLine(row)(16))
}.countByValue().toList.sortBy(-_._2).foreach(println) // count per value, print highest count first
(ATL,414513)
(ORD,350380)
(DFW,281281)
(DEN,241443)
(LAX,215608)
(PHX,199408)
(IAH,185172)
(LAS,172876)
(DTW,161989)
(SFO,140587)
No comments:
Post a Comment