Tuesday, March 24, 2015

Spark Streaming - Using twitter4j package

Install conscript, as giter8 needs it:
curl https://raw.githubusercontent.com/n8han/conscript/master/setup.sh | sh

Install giter8
cs n8han/giter8

Create a basic project structure for sbt
g8 chrislewis/basic-project

... and you'll see a directory tree like this:

[root@nodo1 ejemplo-test-1]# ls -lrt
total 4
drwxr-xr-x. 4 root root  28 Mar 19 10:12 src
-rw-r--r--. 1 root root 354 Mar 19 10:12 build.sbt
drwxr-xr-x. 3 root root  42 Mar 19 10:19 project
drwxr-xr-x. 5 root root  75 Mar 19 10:21 target

Download the twitter4j 3.0.3 jar and place it under ./target

Install sbt if you haven't already
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

cd ejemplo-test-1
sbt
> compile
> package (produces the *.jar)
> run
[info] Running com.example.ejemplotest1.App
Hello com.example.ejemplo test 1!
[success] Total time: 0 s, completed Mar 19, 2015 10:22:09 AM

Under ./project, create assembly.sbt:

[root@nodo1 project]# more assembly.sbt
resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

Under ./, create build.sbt:

[root@nodo1 ejemplo-test-1]# more build.sbt
name := "ejemplo test 1"

organization := "com.example"

version := "0.1.0-SNAPSHOT"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

scalaVersion := "2.11.2"

crossScalaVersions := Seq("2.10.4", "2.11.2")

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"

libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"

initialCommands := "import com.example.ejemplotest1._"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

$ sbt assembly (you may need to run sbt compile first)

$ spark-submit --class com.example1.CollectTweets /opt/spark/ejemplo1/ejemplo-test-1/target/scala-2.11/ejemplo_test_1-assembly-0.1.0-SNAPSHOT.jar  XX YY ZZ WW

Friday, March 6, 2015

Spark Streaming - scala Twitter Popular Tags

[root@nodo1 spark]# pwd
/usr/local/spark

[root@nodo1 spark]# bin/spark-submit \
--class org.apache.spark.examples.streaming.TwitterPopularTags \
--master local[2]  \
./lib/spark-examples-1.2.1-hadoop2.4.0.jar \
pTNQtr6CSXdu YIDalLhnh5ys BbpliqLm3j5Ilzn GzjjzTmid

Friday, February 6, 2015

Pyspark example: parse, filter and aggregate

PYSPARK

file = sc.textFile("hdfs://nodo1/claudio/partesin.csv")
header = file.take(1)[0]
rows = file.filter(lambda line: line != header)
enTuples = rows.map(lambda x: x.split(","))
enKeyValuePairs = enTuples.map(lambda x: (x[16], int(x[18]))).reduceByKey(lambda a,b: a+b).take(10)
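The same parse/filter/aggregate pipeline can be sketched in plain Python on an in-memory sample, which is handy for checking the logic before running it on the cluster (column indices 16 and 18 are kept from the example above; the sample rows are made up):

```python
# Plain-Python sketch of the PySpark pipeline above: skip the header,
# split each CSV line, then sum column 18 grouped by column 16.
# The sample rows are made up; the real ones come from partesin.csv on HDFS.
from collections import defaultdict

lines = [
    "h0," * 18 + "h18",                        # header row (19 columns)
    ",".join(["x"] * 16 + ["ATL", "x", "5"]),
    ",".join(["x"] * 16 + ["ORD", "x", "3"]),
    ",".join(["x"] * 16 + ["ATL", "x", "2"]),
]

header = lines[0]
rows = (l for l in lines if l != header)    # filter(lambda line: line != header)
tuples = (l.split(",") for l in rows)       # map(lambda x: x.split(","))
totals = defaultdict(int)
for cols in tuples:                         # reduceByKey(lambda a, b: a + b)
    totals[cols[16]] += int(cols[18])

print(sorted(totals.items()))  # [('ATL', 7), ('ORD', 3)]
```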

Wednesday, February 4, 2015

Spark, scala group by example

our source dataset resides here : http://stat-computing.org/dataexpo/2009/the-data.html

and this is how I parsed the csv file and reduced by airport (column 17):

[root@nodo1 spark]# ./bin/spark-shell --master spark://nodo1:7077
scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val myfile = sc.textFile("hdfs://nodo1/claudio/2008.csv").cache()
15/02/04 16:37:35 INFO storage.MemoryStore: ensureFreeSpace(139132) called with curMem=0, maxMem=311387750
15/02/04 16:37:35 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.9 KB, free 296.8 MB)
myfile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> myfile.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) lines.drop(1) else lines
         })
       }
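What dropHeader does can be sketched in plain Python: the input stands in for a list of partitions (each an iterable of lines), and only partition 0 loses its first line. The partition layout below is made up for illustration:

```python
def drop_header(partitions):
    # Mimic mapPartitionsWithIndex: skip the first line of partition 0 only,
    # since the CSV header lives in the first partition of the file.
    for idx, lines in enumerate(partitions):
        it = iter(lines)
        if idx == 0:
            next(it, None)  # drop the header line
        for line in it:
            yield line

parts = [["col_a,col_b", "1,2"], ["3,4", "5,6"]]  # made-up partitions
print(list(drop_header(parts)))  # ['1,2', '3,4', '5,6']
```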

scala> val withoutHeader: RDD[String] = dropHeader(myfile)
scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)

scala> withoutHeader.mapPartitions(lines => {
         val parser=new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(16)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)

(ATL,414513)
(ORD,350380)
(DFW,281281)
(DEN,241443)
(LAX,215608)
(PHX,199408)
(IAH,185172)
(LAS,172876)
(DTW,161989)
(SFO,140587)
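The countByValue plus descending sort can be sketched in plain Python with collections.Counter, using a few made-up airport codes:

```python
from collections import Counter

# Made-up origin-airport column values extracted from parsed rows.
airports = ["ATL", "ORD", "ATL", "DFW", "ATL", "ORD"]

counts = Counter(airports)                                # countByValue()
ranking = sorted(counts.items(), key=lambda kv: -kv[1])   # sortBy(-_._2)
for airport, n in ranking:
    print((airport, n))
# ('ATL', 3)
# ('ORD', 2)
# ('DFW', 1)
```

Counter.most_common() gives the same ranking directly.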


data science example: Predicting flight delays

import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
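With those imports in place, a minimal end-to-end sketch looks like this. The data is synthetic and clearly separable, just to show the scale-fit-score flow; note that in current scikit-learn, train_test_split lives in sklearn.model_selection rather than the old sklearn.cross_validation:

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

# Synthetic, separable data standing in for real flight features.
rng = np.random.RandomState(0)
X = np.vstack([rng.normal(0, 1, (50, 3)), rng.normal(5, 1, (50, 3))])
y = np.array([0] * 50 + [1] * 50)

X_scaled = StandardScaler().fit_transform(X)   # standardize the features
clf = RandomForestClassifier(n_estimators=10, random_state=0)
clf.fit(X_scaled, y)
print(accuracy_score(y, clf.predict(X_scaled)))  # training accuracy
```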

Wednesday, January 28, 2015

# Python library imports: numpy, random, sklearn, pandas, etc

yum install python-pip
yum install gcc-c++
pip install pandas
yum install numpy scipy python-matplotlib python-nose
pip install -U scikit-learn
pip install "ipython[notebook]"
To install Pydoop, first check that the JDK is installed, then follow the steps below:
$ readlink -f $(which java)
/usr/lib/jvm/java-6-oracle/jre/bin/java
$ export JAVA_HOME=/usr/lib/jvm/java-6-oracle
export HADOOP_HOME=/opt/hadoop
git clone https://github.com/crs4/pydoop.git
cd pydoop
export LD_LIBRARY_PATH="${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}"
python setup.py build
sudo python setup.py install --skip-build

Monday, January 19, 2015

Pain in the neck: setting up IPython on CentOS 6.5

The first pain is that you are tied to Python 2.6: the newest versions of IPython (2.x) are supported only on Python 2.7, but if you upgrade you break yum, and that is not a good idea. CentOS 6.5 ships with Python 2.6 and yum is based on it, so it can't be changed.

IPython recommends installing 1.0 if you need to stay on Python 2.6, so you have to download the source and install it manually (don't forget to install all the dependencies first):
untar the tarball and run: python setup.py install

The next step is to start the IPython server listening on any IP, since by default it listens only on localhost:8888:

ipython notebook --ip='*'
The logging errors "TypeError: super() argument 1 must be type, not classobj"
are not relevant; you can still use IPython without issue.
If you are using Vagrant, remember to forward the port so you can reach it from the browser:
http://127.0.0.1:8888
Et voilà!


Sunday, January 18, 2015

Quick trick: update Python to version 2.7 on CentOS 6.5

yum install centos-release-SCL
yum install python27
Then, if you want to use it in your shell, you would run something like:
scl enable python27 bash

Disclaimer: this is a trick for using Python 2.7 in a separate environment, without removing 2.6, which would break yum on this CentOS version.

Thursday, January 15, 2015

Heads up: starting hadoop

If you see this when starting or stopping Hadoop:


[hadoop@dev ~]$ start-dfs.sh 
13/10/25 22:21:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable Starting namenodes on [Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /home/hadoop/2.2.0/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now....... 
sed: -e expression #1, char 6: unknown option to `s' HotSpot(TM): ssh: 
Could not resolve hostname HotSpot(TM): Name or service not known 64-Bit: ssh: 
Could not resolve hostname 64-Bit: 
Name or service not known 

the workaround is to set the following environment variables: 
export HADOOP_HOME=/usr/local/hadoop 
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native 
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

Cheat sheet for Hadoop, up, up!

[root@hulk sbin]# hadoop version
Hadoop 2.4.1
Subversion http://svn.apache.org/repos/asf/hadoop/common -r 1604318
Compiled by jenkins on 2014-06-21T05:43Z
Compiled with protoc 2.5.0
From source with checksum bb7ac0a3c73dc131f4844b873c74b630
This command was run using /usr/local/hadoop-2.4.1/share/hadoop/common/hadoop-common-2.4.1.jar

Check Hadoop is running
[root@hulk sbin]# hdfs dfsadmin -report
List the content of home directory
$ hdfs dfs -ls /user/claudio
Upload file from local file to HDFS
$ hdfs dfs -put songs.txt /user/claudio
Cat the content of the file from HDFS
$ hdfs dfs -cat /user/claudio/songs.txt
Change permissions to file
$ hdfs dfs -chmod 700 /user/claudio/songs.txt
Set the replication factor of the file to 4
$ hdfs dfs -setrep -w 4 /user/claudio/songs.txt
Check the file size
$ hdfs dfs -du -h /user/claudio/songs.txt
Create subdirectory in your home directory
$ hdfs dfs -mkdir songs
Move files across
$ hdfs dfs -mv songs.txt songs/
Remove directory from HDFS
$ hdfs dfs -rm -r songs
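If you drive these commands from Python, one simple option is to build the hdfs dfs argument list and hand it to subprocess (just a sketch; the helper name is made up):

```python
import subprocess

def hdfs_dfs(action, *args):
    """Build the argument list for an `hdfs dfs` subcommand."""
    return ["hdfs", "dfs", action] + list(args)

cmd = hdfs_dfs("-put", "songs.txt", "/user/claudio")
print(cmd)  # ['hdfs', 'dfs', '-put', 'songs.txt', '/user/claudio']

# On a box with Hadoop installed you would then run:
# subprocess.check_call(cmd)
```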