PYSPARK

# read the CSV from HDFS and keep the header line aside
file = sc.textFile("hdfs://nodo1/claudio/partesin.csv")
header = file.take(1)[0]
# filter the header row out, then split each line into columns
rows = file.filter(lambda line: line != header)
enTuples = rows.map(lambda x: x.split(","))
# key on column 17, sum column 19 per key, and take 10 results
enKeyValuePairs = enTuples.map(lambda x: (x[16], int(x[18]))).reduceByKey(lambda a, b: a + b).take(10)
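Outside a cluster, the same header-drop plus group-and-sum can be sketched in plain Python (the keys and values below are invented for illustration, not taken from partesin.csv):

```python
from collections import defaultdict

# toy stand-in for the CSV file: a header line followed by data rows
lines = [
    "key,value",
    "ATL,10",
    "ORD,5",
    "ATL,7",
]
header = lines[0]
rows = [line.split(",") for line in lines if line != header]

# equivalent of map + reduceByKey: sum the values per key
totals = defaultdict(int)
for cols in rows:
    totals[cols[0]] += int(cols[1])
```

This is what reduceByKey does per key across partitions; Spark just distributes the summation.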
Friday, February 6, 2015
Wednesday, February 4, 2015
Spark, scala group by example
Our source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html
This is how I worked out parsing the CSV file and reducing by airport (column 17):
[root@nodo1 spark]# ./bin/spark-shell --master spark://nodo1:7077
scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> val myfile = sc.textFile("hdfs://nodo1/claudio/2008.csv").cache()
15/02/04 16:37:35 INFO storage.MemoryStore: ensureFreeSpace(139132) called with curMem=0, maxMem=311387750
15/02/04 16:37:35 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.9 KB, free 296.8 MB)
myfile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
scala> myfile.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    parser.parseLine(line).mkString(",")
  })
}).take(5).foreach(println)
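The point of using CSVParser instead of a naive split(",") is that quoted fields may themselves contain commas. Python's standard csv module makes the same distinction; a small sketch (the sample lines are made up):

```python
import csv

# hypothetical raw CSV lines; the quoted field contains a comma
lines = ['a,"b,c",d', 'e,f,g']

# one reader call per line mirrors parser.parseLine inside mapPartitions
parsed = [",".join(next(csv.reader([line]))) for line in lines]
```

A plain line.split(",") would have broken the quoted field "b,c" into two columns.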
scala> def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) lines.drop(1) else lines
  })
}
scala> val withoutHeader: RDD[String] = dropHeader(myfile)
scala> withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    parser.parseLine(line).mkString(",")
  })
}).take(5).foreach(println)
scala> withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    columns(16)
  })
}).countByValue().toList.sortBy(-_._2).foreach(println)
(ATL,414513)
(ORD,350380)
(DFW,281281)
(DEN,241443)
(LAX,215608)
(PHX,199408)
(IAH,185172)
(LAS,172876)
(DTW,161989)
(SFO,140587)
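Locally, countByValue followed by the descending sortBy amounts to a Counter sorted by count (the origin codes below are a made-up sample, not the 2008 data):

```python
from collections import Counter

# hypothetical origin-airport column extracted from each row
origins = ["ATL", "ORD", "ATL", "DFW", "ATL", "ORD"]

# countByValue + sortBy(-count): tally each value, sort descending
counts = sorted(Counter(origins).items(), key=lambda kv: -kv[1])
```

Spark performs the tally in a distributed way and collects the per-value totals to the driver.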
data science example: Predicting flight delays
import warnings
warnings.filterwarnings('ignore')
import sys
import random
import numpy as np
from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
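Those imports set up a classification workflow. A minimal sketch of the shape it takes, using synthetic features rather than the flight dataset (note that later scikit-learn releases replaced the cross_validation module imported above with model_selection, which this sketch uses):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

rng = np.random.RandomState(0)

# synthetic stand-in features (imagine departure hour, distance, day of week)
X = rng.rand(400, 3)
# toy label: pretend flights with a late departure and long distance are delayed
y = ((X[:, 0] > 0.6) & (X[:, 1] > 0.4)).astype(int)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0)

clf = RandomForestClassifier(n_estimators=50, random_state=0)
clf.fit(X_train, y_train)
acc = accuracy_score(y_test, clf.predict(X_test))
```

On the real data the features would come from the parsed CSV columns above, and the label from the arrival-delay column.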