28
Pré-processamento de Grandes Dados com Apache Spark Rio Big Data Meetup - Novembro 2015 Felipe Almeida ([email protected] | http://queirozf.com ) https://github.com/queirozfcom/rio-big-data-meetup-nov-2015

Pré processamento de grandes dados com Apache Spark

Embed Size (px)

Citation preview

Page 1: Pré processamento de grandes dados com Apache Spark

Pré-processamento de Grandes Dados com Apache SparkRio Big Data Meetup - Novembro 2015

Felipe Almeida ([email protected] | http://queirozf.com)

https://github.com/queirozfcom/rio-big-data-meetup-nov-2015

Page 2: Pré processamento de grandes dados com Apache Spark

Estrutura da palestra

● Introdução● O problema● Spark● Spark Dataframes● Spark UDFs● Spark ml● Estudo de caso

2/28

Page 3: Pré processamento de grandes dados com Apache Spark

Introdução

● Tarefas de mineração de dados e aprendizado de máquina só são possíveis após uma etapa de pré-processamento.

● Pergunta: como pré-processar dados da ordem de muitos GB ou TB?● Hadoop e Spark

● As versões recentes do Spark adicionaram recursos úteis para essas tarefas.

3/28

Page 4: Pré processamento de grandes dados com Apache Spark

O problema

● Vamos tratar especificamente do problema de extração de features○ Isto é, transformar uma massa de dados em um conjunto de

dados onde cada amostra é representada por um vetor de features

4/28

Page 5: Pré processamento de grandes dados com Apache Spark

Spark

● Framework para computação em cluster, feito em Scala.

● Nasceu para resolver dois problemas do Hadoop:○ Necessidade de persistir em disco após cada job○ Difícil utilização para tarefas interativas

● Abstração principal: RDD○ Um dataset distribuído nos nós do cluster

● Suporta várias operações além das famosas map e reduce5/28

Page 6: Pré processamento de grandes dados com Apache Spark

Spark

6/28

carrega um arquivo texto do HDFS

cada linha é transformada em uma lista de palavras

exemplo de programa Spark usando Python (wordcount)cada palavra é transformada em uma tupla

(palavra,1)

os pares com a mesma chave (i.e. mesma palavra) são agrupados e somados

Page 7: Pré processamento de grandes dados com Apache Spark

Spark DataFrames

● O módulo spark-sql adicionou o conceito de DataFrames○ Como usado na linguagem R ou na biblioteca pandas (Python)

● Um DataFrame é comumente utilizado para representar um dataset:○ Linhas representam cada amostra (ponto)○ Colunas representam cada feature

● Também é um RDD○ Do tipo RDD[sql.Row]○ Suporta também todas as operações de RDD

7/28

Page 8: Pré processamento de grandes dados com Apache Spark

Spark DataFrames

● Esquema de um DataFrame NxD (N amostras, D features)

8/28

feature #1 feature #2 feature #3 ... feature #D

amostra #1 ...

amostra #2 ...

amostra #3 ...

... ... ... ... ... ...

amostra #N ...

Page 9: Pré processamento de grandes dados com Apache Spark

Spark DataFrames

// dataframes moram neste módulo

import org.apache.spark.sql._

// construindo um sqlcontext a partir do sparkcontext

val sqlContext = new SQLContext(sc)

// carregando dados do S3 (schema inferido automaticamente)

val df = sqlContext.read.json("s3://path/to/dataset")

// select from dataframe where overall > 4.0

df.where(df("overall") > 4.0) 9/28

Page 10: Pré processamento de grandes dados com Apache Spark

Spark DataFrames - transformações

Exemplo: feature scaling (normalização de features)

DataFrame original DataFrame modificado

aplicação da transformação

de normalização

10/28

feature #1 feature #2

amostra #1 2 400

amostra #2 4 200

amostra #3 6 400

feature #1 feature #2

amostra #1 0 1

amostra #2 0.5 0

amostra #3 1 1

Page 11: Pré processamento de grandes dados com Apache Spark

Spark DataFrames - transformações

Exemplo: tokenização (separar texto em tokens)

DataFrame original DataFrame modificado

11/28

feature #1

amostra #1 “olá mundo!!”

amostra #2 “spark para grandes dados”

amostra #3 “usando spark”

aplicação da transformação de tokenização

feature #1’

amostra #1 [“olá”, “mundo”]

amostra #2 [“spark”, “para”, “grandes”, “dados”]

amostra #3 [“usando”,”spark”]

Page 12: Pré processamento de grandes dados com Apache Spark

Spark UDFs

● UDFs: User-defined functions○ São funções que atuam sobre uma coluna de um DataFrame

● Note a diferença com relações a funções como filter, map, reduce que atuam sobre uma linha de um DataFrame

12/28

Page 13: Pré processamento de grandes dados com Apache Spark

Spark UDFs

● UDFs podem ser usadas para criar colunas em um DataFrame:

import org.apache.spark.sql.functions.udf

// uma UDF que retorna o tamanho de uma string

val len = udf { str:String => str.length }

// aplicar a UDF len à coluna texto e criar uma

// nova coluna com o resultado

val df1 = df.withColumn("tamanho", len(df("texto"))

13/28

Page 14: Pré processamento de grandes dados com Apache Spark

Spark UDFs

Resultado da aplicação da UDF len :

DataFrame df DataFrame df1

14/28

texto

amostra #1 “foo”

amostra #2 “foo bar”

amostra #3 “oi”

aplicação da UDF len

tamanho

amostra #1 3

amostra #2 7

amostra #3 2

Page 15: Pré processamento de grandes dados com Apache Spark

Spark ml

15/28

● spark.ml é um componente do módulo spark-mllib

● Disponibiliza, entre outras coisas, várias transformações baseadas em UDFs para manipular um DataFrame

● Entre elas:○ RegexTokenizer○ OneHotEncoder○ StringIndexer○ MinMaxScaler○ Bucketizer

Page 16: Pré processamento de grandes dados com Apache Spark

Spark ml

● RegexTokenizer (String => Vector[String])○ Tokeniza strings usando uma regex como delimitador

DataFrame df DataFrame df1

16/28

texto

amostra #1 “foo,bar”

amostra #2 “bar baz”

amostra #3 “foo.bar.baz”

aplicação do RegexTokenizer usando a regex /\s+|,|\./ como delimitador

tokens

amostra #1 [“foo”,”bar”]

amostra #2 [“bar”,”baz”]

amostra #3 [“foo”,”bar”,”baz”]

Page 17: Pré processamento de grandes dados com Apache Spark

Spark ml

● StringIndexer (String => Double)○ Transforma valores únicos em índices numéricos

DataFrame df DataFrame df1

17/28

feature #1

amostra #1 “amarelo”

amostra #2 “azul”

amostra #3 “verde”

amostra #4 “amarelo”

aplicação do StringIndexer

feature #1 mod

amostra #1 0.0

amostra #2 1.0

amostra #3 2.0

amostra #4 0.0

Page 18: Pré processamento de grandes dados com Apache Spark

Spark ml

● OneHotEncoder (Double => Vector[Double])○ Transforma índices de features em um domínio em um vetor one-hot

DataFrame df DataFrame df1

18/28

aplicação do OneHotEncoder

feature #1

amostra #1 0.0

amostra #2 1.0

amostra #3 2.0

amostra #4 0.0

feature #1 mod

amostra #1 [1.0,0.0,0.0]

amostra #2 [0.0,1.0,0.0]

amostra #3 [0.0,0.0,1.0]

amostra #4 [1.0,0.0,0.0]

Page 19: Pré processamento de grandes dados com Apache Spark

Spark ml

● MinMaxScaler (Vector[Double] => Vector[Double])○ Normaliza os valores de uma coluna para o range [0,1]

DataFrame df DataFrame df1

19/28

aplicação do MinMaxScaler

feature #1

amostra #1 [2.0,800]

amostra #2 [4.0,1200]

amostra #3 [8.0,600]

amostra #4 [16.0,200]

feature #1

amostra #1 [0.0,0.6]

amostra #2 [0.14,1.0]

amostra #3 [0.43,0.4]

amostra #4 [1.0,0.0]

Page 20: Pré processamento de grandes dados com Apache Spark

Spark ml

● Bucketizer (Double => Double)○ Discretiza features contínuas, usando limites

DataFrame df DataFrame df1

20/28

aplicação do Bucketizer usando os limites 100, 200, 500, 2000 e 5000

feature #1

amostra #1 329.23

amostra #2 123.38

amostra #3 3289.57

amostra #4 1982.376

feature #1’

amostra #1 1.0

amostra #2 0.0

amostra #3 3.0

amostra #4 2.0

Page 21: Pré processamento de grandes dados com Apache Spark

Estudo de caso: Reviews Amazon

● Um estudo de caso mostrando várias etapas possíveis para um dataset real

● Não é um processo completo, apenas alguns passos para motivar os ouvintes e dar uma ideia das quantidades envolvidas○ Usa UDFs e também o módulo spark-mllib

● Código completo em https://github.com/queirozfcom/rio-big-data-meetup-nov-2015○ Incluindo a criação dos clusters usando AWS EMR

21/28

Page 22: Pré processamento de grandes dados com Apache Spark

O dataset

Cada elemento é uma review de um produto na Amazon.com

● ~ 55 Gb (após desempacotamento)

● ~ 82 Milhões de Reviews

● Maio/1996 - Julho/2014

● Formato Json

● Link: http://jmcauley.ucsd.edu/data/amazon/22/28

Page 23: Pré processamento de grandes dados com Apache Spark

O dataset

● Elemento de exemplo:{ "reviewerID": "A2SUAM", "asin": "00000", "reviewerName": "J. McDonald", "helpful": [2, 3], "reviewText": "I bought this for ...", "overall": 5.0, "summary": "Heavenly Highway!", "unixReviewTime": 1252800000}

23/28

Page 24: Pré processamento de grandes dados com Apache Spark

Passos do Pipeline de exemplo

1. Retirar linhas com elementos null2. Adicionar features para:

a. O tamanho do texto do reviewText e do summaryb. Período do dia em que a review for feita (dia/noite)c. Período da semana em que a review foi feita (dia útil/fim

de semana)3. Juntar todo o texto corrido (reviewText + summary) em um único

campo, passar para minúsculas, tokenizar e transformar em um vetor de frequências (TF)

4. Normalizar a nota (atributo overall)5. Transformar o atributo helpful em um float e normalizar

24/28

Page 25: Pré processamento de grandes dados com Apache Spark

Dicas Gerais / Lições aprendidas

● É melhor criar uma máquina EC2, baixar o dataset para ela e, de lá, fazer upload para o S3○ Se estiverem na mesma área, o processo é muito mais rápido

● Se usar AWS EMR, cuidado para não esquecer o cluster ligado!○ Use --auto-terminate

● Habilitar dynamicAllocation

● Recomendo a API Scala pois as outras às vezes ficam defasadas25/28

Page 27: Pré processamento de grandes dados com Apache Spark

FIM

27/28

Page 28: Pré processamento de grandes dados com Apache Spark

Agradecimentos

● Professor Alexandre Assis DSc. (orientador do trabalho original)○ PESC/COPPE UFRJ

● Rosângela Oliveira (colega no trabalho original)○ [email protected]

● Julian McAuley PhD. (fornecimento do dataset)

28/28