Tuesday, March 24, 2015

Spark Streaming - Using twitter4j package

Install conscript, as giter8 needs it:
curl https://raw.githubusercontent.com/n8han/conscript/master/setup.sh | sh

Install giter8:
cs n8han/giter8

Create the basic project structure for sbt:
g8 chrislewis/basic-project

... and you'll see a directory tree like the one below:

[root@nodo1 ejemplo-test-1]# ls -lrt
total 4
drwxr-xr-x. 4 root root  28 Mar 19 10:12 src
-rw-r--r--. 1 root root 354 Mar 19 10:12 build.sbt
drwxr-xr-x. 3 root root  42 Mar 19 10:19 project
drwxr-xr-x. 5 root root  75 Mar 19 10:21 target

Download twitter4j 3.0.3 and place the jar under ./target

Install sbt if you haven't already:
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

cd ejemplo-test-1
sbt
> compile
> package (produces the *.jar)
> run
[info] Running com.example.ejemplotest1.App
Hello com.example.ejemplo test 1!
[success] Total time: 0 s, completed Mar 19, 2015 10:22:09 AM

Under ./project, create assembly.sbt:

[root@nodo1 project]# more assembly.sbt
resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

Under the project root (./), create build.sbt:

[root@nodo1 ejemplo-test-1]# more build.sbt
name := "ejemplo test 1"

organization := "com.example"

version := "0.1.0-SNAPSHOT"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

scalaVersion := "2.11.2"

crossScalaVersions := Seq("2.10.4", "2.11.2")

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"

libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"

initialCommands := "import com.example.ejemplotest1._"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

$ sbt assembly (you may need to run sbt compile first)

$ spark-submit --class com.example1.CollectTweets /opt/spark/ejemplo1/ejemplo-test-1/target/scala-2.11/ejemplo_test_1-assembly-0.1.0-SNAPSHOT.jar  XX YY ZZ WW
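
The class being submitted, com.example1.CollectTweets, is never shown in the post; the four trailing arguments (XX YY ZZ WW) are the Twitter OAuth credentials. As a rough, hypothetical sketch (the class name comes from the spark-submit line above; the batch interval and the simple print output are my own assumptions), a minimal Spark Streaming job could look like this:

package com.example1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object CollectTweets {
  def main(args: Array[String]): Unit = {
    // the four command-line arguments are the Twitter OAuth credentials
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val conf = new SparkConf().setAppName("CollectTweets")
    val ssc = new StreamingContext(conf, Seconds(10))

    // None = let twitter4j build its Authorization from the system properties above
    val tweets = TwitterUtils.createStream(ssc, None)
    tweets.map(_.getText).print()

    ssc.start()
    ssc.awaitTermination()
  }
}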

Friday, March 6, 2015

Spark Streaming - Scala Twitter Popular Tags

[root@nodo1 spark]# pwd
/usr/local/spark

[root@nodo1 spark]# bin/spark-submit \
--class org.apache.spark.examples.streaming.TwitterPopularTags \
--master local[2]  \
./lib/spark-examples-1.2.1-hadoop2.4.0.jar \
<consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>
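
For reference, the heart of that bundled example boils down to extracting hashtags and counting them over a sliding window, roughly like this (a simplified sketch, assuming ssc is an already-created StreamingContext and the OAuth system properties have been set from the four arguments):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.twitter.TwitterUtils

val stream = TwitterUtils.createStream(ssc, None)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

// count each hashtag over the last 60 seconds and sort by count
val topCounts60 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (tag, count) => (count, tag) }
  .transform(_.sortByKey(false))

topCounts60.foreachRDD { rdd =>
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
}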

Friday, February 6, 2015

PySpark example: parse, filter and aggregation

Run the following in the pyspark shell:

file = sc.textFile("hdfs://nodo1/claudio/partesin.csv")
# grab the header line and filter it out of the data
header = file.take(1)[0]
rows = file.filter(lambda line: line != header)
# split each CSV row into fields
enTuples = rows.map(lambda x: x.split(","))
# sum the field at index 18, grouped by the field at index 16, and take 10 results
enKeyValuePairs = enTuples.map(lambda x: (x[16], int(x[18]))).reduceByKey(lambda a,b: a+b).take(10)

Wednesday, February 4, 2015

Spark, Scala group by example

Our source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html

and this is how I worked out how to parse the CSV file and count rows by airport (column 17, i.e. index 16):

[root@nodo1 spark]# ./bin/spark-shell --master spark://nodo1:7077
scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val myfile = sc.textFile("hdfs://nodo1/claudio/2008.csv").cache()
15/02/04 16:37:35 INFO storage.MemoryStore: ensureFreeSpace(139132) called with curMem=0, maxMem=311387750
15/02/04 16:37:35 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.9 KB, free 296.8 MB)
myfile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> myfile.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           // drop the header line from the first partition only;
           // the dropped iterator must be the value returned
           if (idx == 0) lines.drop(1) else lines
         })
       }

scala> val withoutHeader: RDD[String] = dropHeader(myfile)
scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)

scala> withoutHeader.mapPartitions(lines => {
         val parser=new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(16)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)

(ATL,414513)
(ORD,350380)
(DFW,281281)
(DEN,241443)
(LAX,215608)
(PHX,199408)
(IAH,185172)
(LAS,172876)
(DTW,161989)
(SFO,140587)
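
Note that countByValue() ships the whole map of counts back to the driver. For a larger keyspace you can keep the aggregation distributed with reduceByKey; a sketch along the same lines:

scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => (parser.parseLine(line)(16), 1))
       }).reduceByKey(_ + _).sortBy(-_._2).take(10).foreach(println)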


Data science example: predicting flight delays

Imports for the example (meant to be run in an IPython notebook, hence the %matplotlib inline magic):

import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

Wednesday, January 28, 2015

# Python library imports: numpy, random, sklearn, pandas, etc

yum install python-pip
yum install gcc-c++
pip install pandas
yum install numpy scipy python-matplotlib python-nose
pip install -U scikit-learn
pip install "ipython[notebook]"
For installing Pydoop, first check that a JDK is installed, then follow the steps below:
$ readlink -f $(which java)
/usr/lib/jvm/java-6-oracle/jre/bin/java
$ export JAVA_HOME=/usr/lib/jvm/java-6-oracle
$ export HADOOP_HOME=/opt/hadoop
$ git clone https://github.com/crs4/pydoop.git
$ export LD_LIBRARY_PATH="${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}"
$ cd pydoop
$ python setup.py build
$ sudo python setup.py install --skip-build

Monday, January 19, 2015

Pain in the ass setting up IPython on CentOS 6.5

The first pain in the arse is that you are tied to Python 2.6: upgrading to 2.7 (needed because the newest versions of IPython, i.e. 2.x, are supported only on Python 2.7) breaks Yum, and that is not a good idea. To be clear, CentOS 6.5 ships with Python 2.6, Yum is built on it, and it can't be changed.

IPython then recommends installing 1.0 if you need to stay on Python 2.6, so you have to download the source and install it manually (don't forget to install all the dependencies first):
untar the tarball and run: python setup.py install

The next step is to start the IPython server listening on any IP, since by default it listens only on localhost:8888:

ipython notebook --ip='*'
The logging errors in the output ("TypeError: super() argument 1 must be type, not classobj")
are not relevant; you can still use IPython without issue.
If you are using Vagrant, remember to forward the port so you can reach it from the browser:
http://127.0.0.1:8888
et voilà!!