Install Conscript, as giter8 needs it:
curl https://raw.githubusercontent.com/n8han/conscript/master/setup.sh | sh
Install giter8:
cs n8han/giter8
Create a basic sbt project structure:
g8 chrislewis/basic-project
... and you'll see a tree structure like this:
[root@nodo1 ejemplo-test-1]# ls -lrt
total 4
drwxr-xr-x. 4 root root 28 Mar 19 10:12 src
-rw-r--r--. 1 root root 354 Mar 19 10:12 build.sbt
drwxr-xr-x. 3 root root 42 Mar 19 10:19 project
drwxr-xr-x. 5 root root 75 Mar 19 10:21 target
Download the twitter4j 3.0.3 jar and place it under ./target
Install sbt if you haven't already:
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt
cd ejemplo-test-1
sbt
> compile
> package (produces the *.jar)
> run
[info] Running com.example.ejemplotest1.App
Hello com.example.ejemplo test 1!
[success] Total time: 0 s, completed Mar 19, 2015 10:22:09 AM
Under ./project, create assembly.sbt:
[root@nodo1 project]# more assembly.sbt
resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
Under ./, create build.sbt:
[root@nodo1 ejemplo-test-1]# more build.sbt
name := "ejemplo test 1"
organization := "com.example"
version := "0.1.0-SNAPSHOT"
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
scalaVersion := "2.11.2"
crossScalaVersions := Seq("2.10.4", "2.11.2")
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
initialCommands := "import com.example.ejemplotest1._"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
$ sbt assembly (you may need to run sbt compile first)
$ spark-submit --class com.example1.CollectTweets /opt/spark/ejemplo1/ejemplo-test-1/target/scala-2.11/ejemplo_test_1-assembly-0.1.0-SNAPSHOT.jar XX YY ZZ WW
(XX YY ZZ WW stand for the four Twitter OAuth credentials: consumer key, consumer secret, access token, access token secret)
Friday, March 6, 2015
Spark Streaming - Scala TwitterPopularTags
[root@nodo1 spark]# pwd
/usr/local/spark
[root@nodo1 spark]# bin/spark-submit \
--class org.apache.spark.examples.streaming.TwitterPopularTags \
--master local[2] \
./lib/spark-examples-1.2.1-hadoop2.4.0.jar \
pTNQtr6CSXdu YIDalLhnh5ys BbpliqLm3j5Ilzn GzjjzTmid
Friday, February 6, 2015
PySpark example: parse, filter and aggregation
PYSPARK
file = sc.textFile("hdfs://nodo1/claudio/partesin.csv")
header = file.take(1)[0]                          # the first line is the header
rows = file.filter(lambda line: line != header)   # drop the header row
enTuples = rows.map(lambda x: x.split(","))       # split each CSV line into columns
# key on column 16, sum column 18 per key, and take the first 10 results
enKeyValuePairs = enTuples.map(lambda x: (x[16], int(x[18]))).reduceByKey(lambda a, b: a + b).take(10)
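The same chain can be traced in plain Python, without Spark, on a couple of hypothetical rows (the column positions 16 and 18 match the snippet above; the row contents here are made up):

```python
from collections import defaultdict

# Hypothetical CSV rows with 20 columns; only columns 16 (key) and 18 (value) matter.
rows = [
    ",".join(["x"] * 16 + ["ATL", "x", "10", "x"]),
    ",".join(["x"] * 16 + ["ATL", "x", "5", "x"]),
    ",".join(["x"] * 16 + ["ORD", "x", "7", "x"]),
]

tuples = [r.split(",") for r in rows]   # like rows.map(lambda x: x.split(","))
sums = defaultdict(int)
for cols in tuples:                     # like reduceByKey(lambda a, b: a + b)
    sums[cols[16]] += int(cols[18])

print(sorted(sums.items()))             # [('ATL', 15), ('ORD', 7)]
```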
Wednesday, February 4, 2015
Spark, scala group by example
The source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html
and this is how I worked out parsing the CSV file and reducing by airport (column 17, i.e. index 16):
[root@nodo1 spark]# ./bin/spark-shell --master spark://nodo1:7077
scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> val myfile = sc.textFile("hdfs://nodo1/claudio/2008.csv").cache()
15/02/04 16:37:35 INFO storage.MemoryStore: ensureFreeSpace(139132) called with curMem=0, maxMem=311387750
15/02/04 16:37:35 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.9 KB, free 296.8 MB)
myfile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
scala> myfile.mapPartitions(lines => {
val parser = new CSVParser(',')
lines.map(line => {
parser.parseLine(line).mkString(",")
})
}).take(5).foreach(println)
scala> def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) lines.drop(1) else lines // drop the header only in the first partition
})
}
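The idea behind dropHeader can be sketched in plain Python: each partition is an iterator of lines, and only partition 0 skips its first element. A minimal stand-in for the mapPartitionsWithIndex logic (not Spark, just an illustration):

```python
def drop_header(partitions):
    """partitions: list of iterables of lines; drop line 0 of partition 0 only."""
    out = []
    for idx, lines in enumerate(partitions):
        it = iter(lines)
        if idx == 0:
            next(it, None)  # skip the header line, like lines.drop(1)
        out.extend(it)
    return out

parts = [["header", "a,1", "b,2"], ["c,3", "d,4"]]
print(drop_header(parts))  # ['a,1', 'b,2', 'c,3', 'd,4']
```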
scala> val withoutHeader: RDD[String] = dropHeader(myfile)
scala> withoutHeader.mapPartitions(lines => {
val parser = new CSVParser(',')
lines.map(line => {
parser.parseLine(line).mkString(",")
})
}).take(5).foreach(println)
scala> withoutHeader.mapPartitions(lines => {
val parser=new CSVParser(',')
lines.map(line => {
val columns = parser.parseLine(line)
Array(columns(16)).mkString(",")
})
}).countByValue().toList.sortBy(-_._2).foreach(println)
(ATL,414513)
(ORD,350380)
(DFW,281281)
(DEN,241443)
(LAX,215608)
(PHX,199408)
(IAH,185172)
(LAS,172876)
(DTW,161989)
(SFO,140587)
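The countByValue plus descending sort above amounts to counting column 16 and ranking by count; in plain Python (with made-up rows standing in for the 2008.csv data) that is:

```python
from collections import Counter

# Hypothetical rows; column 16 stands in for the origin airport code.
rows = [
    ["x"] * 16 + ["ATL"] + ["x"] * 12,
    ["x"] * 16 + ["ATL"] + ["x"] * 12,
    ["x"] * 16 + ["ORD"] + ["x"] * 12,
]

counts = Counter(r[16] for r in rows)                    # like countByValue()
ranked = sorted(counts.items(), key=lambda kv: -kv[1])   # like sortBy(-_._2)
for airport, n in ranked:
    print((airport, n))
# ('ATL', 2)
# ('ORD', 1)
```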
Data science example: predicting flight delays
import warnings
warnings.filterwarnings('ignore')
import sys
import random
import numpy as np
from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
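As a rough sketch of how these imports fit together for a delay-prediction model (the features and labels below are synthetic stand-ins, not the real flight records):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(0)

# Synthetic features (e.g. departure hour, distance); label 1 = delayed.
X = rng.rand(200, 2)
y = (X[:, 0] + 0.1 * rng.randn(200) > 0.5).astype(int)  # delay loosely tied to feature 0

X_scaled = StandardScaler().fit_transform(X)             # scale features
clf = RandomForestClassifier(n_estimators=50, random_state=0)
clf.fit(X_scaled[:150], y[:150])                         # train on the first 150 rows
print("accuracy: %.2f" % clf.score(X_scaled[150:], y[150:]))
```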
Wednesday, January 28, 2015
# Python library imports: numpy, random, sklearn, pandas, etc
yum install python-pip
yum install gcc-c++
pip install pandas
yum install numpy scipy python-matplotlib python-nose
pip install -U scikit-learn
pip install "ipython[notebook]"
For installing Pydoop, first check that the JDK is installed, then follow the steps below:
$ readlink -f $(which java)
/usr/lib/jvm/java-6-oracle/jre/bin/java
$ export JAVA_HOME=/usr/lib/jvm/java-6-oracle
$ export HADOOP_HOME=/opt/hadoop
$ git clone https://github.com/crs4/pydoop.git
$ cd pydoop
$ export LD_LIBRARY_PATH="${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}"
$ python setup.py build
$ sudo python setup.py install --skip-build
Monday, January 19, 2015
Pain in the ass setting up IPython on CentOS 6.5
The first pain is that you are tied to Python 2.6: if you upgrade to 2.7 (because the newest versions of IPython, i.e. 2.x, are supported only on Python 2.7), you break Yum, and that is not a good idea. CentOS 6.5 ships with Python 2.6 and Yum depends on it, so it can't be changed.
IPython recommends installing 1.0 if you need to stay on Python 2.6, so you have to download the source and install it manually (don't forget to install all the dependencies first):
Untar the tarball and run: python setup.py install
The next step is to start the IPython server listening on any IP, as by default it listens only on localhost:8888:
ipython notebook --ip='*'
The "TypeError: super() argument 1 must be type, not classobj" errors in the logging output
are not relevant; you can still use IPython without issues.
If you are using Vagrant, remember to forward the port so you can reach the notebook from your browser:
http://127.0.0.1:8888
Et voilà!