16
+ Tecnologia Web Edição - 2017/01 Edição Big Data Prof. Altigran Soares da Silva MapReduce e Hadoop Básico Baseado nos Slides do Professor Jimmy Lin 1 + Problema Típico de Big Data n Iterar sobre um grande número de registros n Extrair “algo” de interesse de cada registro (MAP) n Distribuir e ordenar resultados intermediários n Agregar resultados intermediários (REDUCE) n Gerar a saída final Ideia geral: prover um abstração funcional destas duas operações. (Dean and Ghemawat, OSDI 2004) + MapReduce n Programador especifica duas funções: map (k 1 , v 1 ) [<k 2 , v 2 >] reduce (k 2 , [v 2 ]) [<k 3 , v 3 >] n Todos os valores com a mesta chave são enviados ao mesmo reducer n O arcabouço de execução cuida de todo o resto. map map map map Distribui e organiza valores agregados reduce reduce reduce k 1 k 2 k 3 k 4 k 5 k 6 v 1 v 2 v 3 v 4 v 5 v 6 b a 1 2 c c 3 6 a c 5 2 b c 7 8 a 1 5 b 2 7 c 2 3 6 8 r 1 s 1 r 2 s 2 r 3 s 3

Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

  • Upload
    donga

  • View
    219

  • Download
    0

Embed Size (px)

Citation preview

Page 1: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+

Tecnologia Web Edição - 2017/01Edição Big Data Prof. Altigran Soares da Silva

MapReduce e Hadoop BásicoBaseado nos Slides do Professor Jimmy Lin 1

+Problema Típico de Big Data

nIterar sobre um grande número de registros

nExtrair “algo” de interesse de cada registro (MAP)

nDistribuir e ordenar resultados intermediários

nAgregar resultados intermediários (REDUCE)

nGerar a saída final

Ideia geral: prover um abstraçãofuncional destas duas operações.

(Dean and Ghemawat, OSDI 2004)

+MapReduce

nProgramador especifica duas funções:map (k1, v1) → [<k2, v2>]reduce (k2, [v2]) → [<k3, v3>]n Todos os valores com a mesta chave são enviados ao mesmo

reducer

nO arcabouço de execução cuida de todo o resto.

mapmap map map

Distribui e organiza valores agregados

reduce reduce reduce

k1 k2 k3 k4 k5 k6v1 v2 v3 v4 v5 v6

ba 1 2 c c3 6 a c5 2 b c7 8

a 1 5 b 2 7 c 2 3 6 8

r1 s1 r2 s2 r3 s3

Page 2: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+MapReduce “Runtime”

n Escalonador: Atribui tarefas map e reduce aos nodos workers

n “Distribuição” : Move tarefas para os nodos onde estão os dados

n Sincronização: Reúne, organiza e ordena dados intermediários

n Erros e Falhas: Detecta falhas nos workers e os re-ininicia

n Tudo ocorre em cima de um sistema de arquivos distribuído

+MapReduce

nEventualmente, pode-se especificar:

partition (k’, no. de partições) → partição para k’n Divide o espaço de chaves para operações reduce paralelasn Ex: hash(k’) mod n

combine (k’, v’) → <k’, v’>*n ”Mini-reducers” que rodam em memória depois de uma fase de

mapn Otimização para reduzir o tráfego na rede

+MapReduce com Partition & Combine

n Programadores devem specificar:n map (k, v) → <k’, v’>*

n reduce (k’, v’) → <k’, v’>*

n Todos os valores com a mesma chave são reduzidos juntos

n Opcionalmente, pode-se também especificar:n partition (k’, número de partições → partição para k’

n Geralmente é um simples hash sobre a chave: hash(k’) mod n

n Divide o espaço de chaves para reduces paralelos

n combine (k’, v’) → <k’, v’>*

n Mini-reducers que rodam em memória depois da fase map

n Usados como otimização para reduzir o trafégo na rede

n O framework de execução cuida do resto …

combinecombine combine combine

ba 1 2 c 9 a c5 2 b c7 8

partition partition partition partition

mapmap map map

k1 k2 k3 k4 k5 k6v1 v2 v3 v4 v5 v6

ba 1 2 c c3 6 a c5 2 b c7 8

Shuffle and Sort: aggregate values by keys

reduce

reduce

reduce

a 1 5 b 2 7 c 2 9 8

r1 s1 r2 s2 r3 s3

Page 3: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+O "Resto”n O framework de execução cuida de todo o resto

n Escalonamento: atribuir tarefas de map e reduce aos workers

n ”Distribuição de Dados”: mover os processos para os dados

n Sincronização: agrupa, ordena e distribui dados intermediários

n Erros e falhas: Detecta falhas dos workers e reinicia

n Controle limitado sobre o fluxo de dados e execuçãon Todo o algoritmo deve estar expresso em m,r,c,p

n Não se sabe nem se controla:n Onde os mappers e reducers rodam

n Quando um mapper ou reducer inicia e termina

n Que entrada um mapper em particular está processando

n Que chave intermediária um reducer em particular está processando

+Sincronização

nEm MR a sincronização é conseguida através de uma barreira ente as fases map e reduce.

nPares intermediários de chave-valor devem ser agrupados pela chave

nShuffle & Sort: n Grande processo distribuído de ordenação

n Envolve todos os nodos do cluster que executam tarefas map e todos os que executam tarefas reduce.

n Envolve copia de dados intermediários pela rede

10

+Sincronização

nPara garantir que todos os valores associados a uma chave serão reunidos, computação no reduce só pode iniciar depois quen (1) Todos os mappers tenham emitido os pares desta chave

n (2) Todos os pares tenham sido ordenados e distribuídos

nO reducer recebe todos os valores associados a uma mesma chave ao mesmo tempo.

nNo entanto, os mappers podem copiar os pares intermediários assim que possível.

11

combinecombine combine combine

ba 1 2 c 9 a c5 2 b c7 8

partition partition partition partition

mapmap map map

k1 k2 k3 k4 k5 k6v1 v2 v3 v4 v5 v6

ba 1 2 c c3 6 a c5 2 b c7 8

Shuffle and Sort: aggregate values by keys

reduce reduce reduce

a 1 5 b 2 7 c 2 9 8

r1 s1 r2 s2 r3 s3

Barreira deSincronização

Page 4: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Contador de Palavras

Map(String docid, String text):for each word w in text:

Emit(w, 1);

Reduce(String term, Iterator<Int> values):int sum = 0;for each v in values:

sum += v;Emit(term, sum);

+Implementações MapReduce

n Google: implementação proprietária em C++n Bindings em Java, Python

n Hadoop: implementação open-source em Javan Desenvolvida pela Yahoo, agora um projeto Apache

n Usado em produção: Yahoo, FB, Twitter, LinkedIn, Netflix, …

n Plataforma de facto para processamento de big data

n Ecosystema de sofware amplo e em expansão

split 0split 1split 2split 3split 4

worker

worker

worker

worker

worker

Master

UserProgram

outputfile 0

outputfile 1

(1) submit

(2) schedule map (2) schedule reduce

(3) read(4) local write

(5) remote read(6) write

Inputfiles

Mapphase

Intermediate files(on local disk)

Reducephase

Outputfiles

Adapted from (Dean and Ghemawat, OSDI 2004)

+Sistema de Arquivos Distribuído

nGFS (Google File System):n Google MapReduce

nHDFS (Hadoop Distributed File System) nHadoop

Page 5: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Sistema de Arquivos Distribuído

nWorkers movidos para onde estão os dadosn Dados são armazenados nos discos locais dos nodos do clustern Inicia o workers no nó que contém o dado local

nPorque?n Dados possivelmente não cabem RAMn Acesso ao disco é lento, mas o disk throughput é razoável

+GFS: Hipóteses

n Commodity hardware ao invés hardware caro: Scale “out”, not “up”

n Altas taxas de falhas de componentes: Componentes baratos falham com frequência

n Existe um número pequeno de grandes arquivos a serem processados.

n Arquivos são para escrita, principalmente append

n Dados são lidos de longas streamsn Não há acessos aleatórios a disco

GFS slides adapted from material by (Ghemawat et al., SOSP 2003)

+GFS: Decisões de Projeto

nArquivos armazenados em chunksn Tamanho fixo (64MB)

nConfiabilidade por replicaçãon Cada chunk replicado em 3+ chunkservers

nNó master único: coordena acesso e mantém meta-dados

+GFS vs. HDFS

nMesmas características básicas

nDiferenças na terminologia:n GFS master = Hadoop namenoden GFS chunkservers = Hadoop datanodes

nModelo de consistência para appends em arquivo

n Implementação

nPerformance

Page 6: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+

Adapted from (Ghemawat et al., SOSP 2003)

(file name, block id)

(block id, block location)

instructions to datanode

datanode state(block id, byte range)

block data

HDFS namenode

HDFS datanode

Linux file system

HDFS datanode

Linux file system

File namespace/foo/bar

block 3df2

Application

HDFS Client

Arquitetura HDFS+

Responsabilidades dos Namenodes

n Gerenciamento do sistema de arquivon Guarda a estutura de arquivos/diretórios, metadados, mapeamento arquivo-

bloco, permissão de acesso, etc.

n Coordena as operações de arquivon Direciona clientes para os datanodes para leituras e escritas

n Não há movimento de dados através do namenode!

n Manutenção geral:n Comunicação periódica com os datanodes

n Replicação e rebalanceamento de blocos

n Garbage collection

+Em resumo …

datanode daemon

Linux file system

tasktracker

slave node

datanode daemon

Linux file system

tasktracker

slave node

datanode daemon

Linux file system

tasktracker

slave node

namenode

namenode daemon

job submission node

jobtracker

Alternativa: YARN – veremos depois

+RDBMS vs. MapReduce

n Giga a Terabytes

n Interativo e Batch

n Muitos updates e reads

n Esquema estático

n Alto grau de integridade

n ACID

n Tera a Petabytes

n Batch

n Poucos updates, muitos reads

n Esquema irregular e dinâmico

n Baixo grau de integridade

n Sem ACID

RDBMS MapReduce

24

Page 7: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Exemplo: Contador de Palavras

n Considere um grande arquivo de palavras, sendo que cada palavra ocorre em um linha

n Deseja-se contar o número de vezes em que cada palava distinta ocorre no arquivo

n Exemplo de aplicação: analisar o log de um servidor Web para determinar URLs populares

+Contador de Palavras (2)

n Caso 1: O arquivo inteiro cabe na memória

n Caso 2: O arquivo não cabe na memória, mas todos os pares <palavra, contador> cabem

n Case 3: O arquivo está em disco, mas a memória não é suficiente para armazenar os pares

nsort datafile | uniq –c

+Contador de Palavras (3)

n Para dificultar um pouco, suponha que temos uma grande coleção de documentos

n Contar o número de vezes que cada palavra distinta ocorre na coleçãon words(docs/*) | sort | uniq –c

n words : dado um arquivo, gera uma lista das palavras nele contidas, uma por linha

n Considerando a arquitetura de nodos, o problema pode ser resolvido usando MapReducen O problema e naturalmente paralelizável

+MapReduce: Passo “Map”

vk

k v

k vmap

vk

vk

…k v

map

EntradaPares chave-valor

IntermediárioPares chave-valor

k v

Chaves não são chaves “únicas”

map

Page 8: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+MapReduce: Passo “Reduce”

k v

k v

k v

k v

IntermediárioPares chave-valor

group

reduce

reducek v

k v

k v

k v

k v

k v v

v v

Grupos chave-valorSaída Pares chave-valor

reduce

+MapReduce

n Entrada: um conjunto de pares chave-valor

n O programador fornece duas funçõesn map(k,v) à list(k1,v1)

n reduce(k1, list(v1)) à v2

n (k1,v1) para chave-valor intermediário

n A saída é um conjunto de pares chave-valor (k1,v2)

+Contador de Palavras

Map(String docid, String text):for each word w in text:

Emit(w, 1);

Reduce(String term, Iterator<Int> values):int sum = 0;for each v in values:

sum += v;Emit(term, sum);

+Execução Distribuída

UserProgram

Worker

Worker

Master

Worker

Worker

Worker

fork fork fork

assignmap

assignreduce

readlocalwrite

remoteread,sort

OutputFile 0

OutputFile 1

writeSplit 0Split 1Split 2

Input Data

Page 9: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Fluxo de Dados

n A entrada e a saída final são armazenadas no sistema de arquivos distribuídon O escalonador tende escalonar tarefas “map” nodos “próximos” ao local de

armazenamento físico dos dados de entrada.

n Resultados intermediários são armazenados no sistema de arquivos locais onde rodam os workers de map e reduce.

n Muitas vezes, a saída serve de entrada para outra tarefa MapReduce

+Coordenação

nEstruturas de dados do Mastern Estado da tarefa: (livre, em execução, completada)

n Tarefas livres são escalonadas quando os Workes ficam disponíveis

n Quando a tarefa map termina, o Master recebe o tamanho e a localização de seus arquivos intermediários.

n Essa informação é enviada aos reducers

nO Master “pinga” os workers periodicamente para detectar falhas.

+Falhas

n Falha em um Map workern Tarefa Map completada ou em execução no worker é re-setada para livre

n Workers Reduce são notificados quando a tarefa é re-escalonada para outro worker

n Falha em um Reduce workern Somente tarefas em execução são re-setadas para livre

n Falha no Master n A tarefa MapReduce é abortada e o cliente notificado

+Quantas tarefas Map e Reduce?

nM tarefas map , R tarefas reduce

nRegra prática:n Usar M e R muito maior que o número de nós no cluster

n Um chunk de arquivo por tarefa map é comum

n Melhor o balanceamento dinâmico da carga e acelera a recuperação em caso de falha

nGeramente, R é menor que M, porque a saída é distribuída entre R arquivos

Page 10: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Combinadores

n Em geral, uma tarefa map produzirá muitos pares (k,v1), (k,v2), … para a mesma chave kn Ex., palavras frequentes no contador de palavras

n É possível economizar tráfego na rede fazendo uma pré-agregação no mappern combine(k1, list(v1)) à v2n Em geral é a mesma combinação usada no reducen Funciona apelas se a função reduce é comutativa e associativa

+Função de Partição

n As entradas para o map são criadas a partir de divisões contíguas no arquivo de entrada

n Para o reduce, é necessário garantir que os pares com a mesma chave intermediária serão processados pelo mesmo worker

n O sistema usa um função default de particionamento ex., hash(key) mod R

n Às vezes pode ser útil substituí-la por uma função específican Ex. hash(hostname(URL)) mod R garante que todas as URLs de um mesmo host

estarão na mesma saída

+Hadoop Básico

39

40

This is the final output: the maximum global temperature recorded in each year.

The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unixpipeline, which mimics the whole MapReduce flow and which we will see again later inthis chapter when we look at Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

Java MapReduceHaving run through how the MapReduce program works, the next step is to express itin code. We need three things: a map function, a reduce function, and some code to runthe job. The map function is represented by the Mapper class, which declares an abstractmap() method. Example 2-3 shows the implementation of our map function.

Example 2-3. Mapper for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93);

24 | Chapter 2: MapReduce

if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}

The Mapper class is a generic type, with four formal type parameters that specify theinput key, input value, output key, and output value types of the map function. For thepresent example, the input key is a long integer offset, the input value is a line of text,the output key is a year, and the output value is an air temperature (an integer). Ratherthan using built-in Java types, Hadoop provides its own set of basic types that are op‐timized for network serialization. These are found in the org.apache.hadoop.io pack‐age. Here we use LongWritable, which corresponds to a Java Long, Text (like JavaString), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containingthe line of input into a Java String, then use its substring() method to extract thecolumns we are interested in.

The map() method also provides an instance of Context to write the output to. In thiscase, we write the year as a Text object (since we are just using it as a key), and thetemperature is wrapped in an IntWritable. We write an output record only if the tem‐perature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.

Example 2-4. Reducer for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}

Analyzing the Data with Hadoop | 25

Classe Mapper declara um método asbstrato map

Page 11: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

41

This is the final output: the maximum global temperature recorded in each year.

The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unixpipeline, which mimics the whole MapReduce flow and which we will see again later inthis chapter when we look at Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

Java MapReduceHaving run through how the MapReduce program works, the next step is to express itin code. We need three things: a map function, a reduce function, and some code to runthe job. The map function is represented by the Mapper class, which declares an abstractmap() method. Example 2-3 shows the implementation of our map function.

Example 2-3. Mapper for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93);

24 | Chapter 2: MapReduce

if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}

The Mapper class is a generic type, with four formal type parameters that specify theinput key, input value, output key, and output value types of the map function. For thepresent example, the input key is a long integer offset, the input value is a line of text,the output key is a year, and the output value is an air temperature (an integer). Ratherthan using built-in Java types, Hadoop provides its own set of basic types that are op‐timized for network serialization. These are found in the org.apache.hadoop.io pack‐age. Here we use LongWritable, which corresponds to a Java Long, Text (like JavaString), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containingthe line of input into a Java String, then use its substring() method to extract thecolumns we are interested in.

The map() method also provides an instance of Context to write the output to. In thiscase, we write the year as a Text object (since we are just using it as a key), and thetemperature is wrapped in an IntWritable. We write an output record only if the tem‐perature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.

Example 2-4. Reducer for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}

Analyzing the Data with Hadoop | 25

key input value input key output value output

+Tipos de Dados Hadoop

Writable Define um protocolo de (de)serialização. Todo tipo dados Hadoop é um Writable.

WritableComprable Define uma ordem. Todas as chaves tem que ser deste tipo (mas não os valores)

IntWritableLongWritableText…

Classes concretas para diferentes tiposde dados.

SequenceFiles Codificação binária para uma sequênciade pares chaves/valor

43

This is the final output: the maximum global temperature recorded in each year.

The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unixpipeline, which mimics the whole MapReduce flow and which we will see again later inthis chapter when we look at Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

Java MapReduceHaving run through how the MapReduce program works, the next step is to express itin code. We need three things: a map function, a reduce function, and some code to runthe job. The map function is represented by the Mapper class, which declares an abstractmap() method. Example 2-3 shows the implementation of our map function.

Example 2-3. Mapper for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93);

24 | Chapter 2: MapReduce

if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}

The Mapper class is a generic type, with four formal type parameters that specify theinput key, input value, output key, and output value types of the map function. For thepresent example, the input key is a long integer offset, the input value is a line of text,the output key is a year, and the output value is an air temperature (an integer). Ratherthan using built-in Java types, Hadoop provides its own set of basic types that are op‐timized for network serialization. These are found in the org.apache.hadoop.io pack‐age. Here we use LongWritable, which corresponds to a Java Long, Text (like JavaString), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containingthe line of input into a Java String, then use its substring() method to extract thecolumns we are interested in.

The map() method also provides an instance of Context to write the output to. In thiscase, we write the year as a Text object (since we are just using it as a key), and thetemperature is wrapped in an IntWritable. We write an output record only if the tem‐perature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.

Example 2-4. Reducer for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}

Analyzing the Data with Hadoop | 25

key input value input output

output

44

if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}

The Mapper class is a generic type, with four formal type parameters that specify theinput key, input value, output key, and output value types of the map function. For thepresent example, the input key is a long integer offset, the input value is a line of text,the output key is a year, and the output value is an air temperature (an integer). Ratherthan using built-in Java types, Hadoop provides its own set of basic types that are op‐timized for network serialization. These are found in the org.apache.hadoop.io pack‐age. Here we use LongWritable, which corresponds to a Java Long, Text (like JavaString), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containingthe line of input into a Java String, then use its substring() method to extract thecolumns we are interested in.

The map() method also provides an instance of Context to write the output to. In thiscase, we write the year as a Text object (since we are just using it as a key), and thetemperature is wrapped in an IntWritable. We write an output record only if the tem‐perature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.

Example 2-4. Reducer for the maximum temperature exampleimport java.io.IOException;

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}

Analyzing the Data with Hadoop | 25

key input value input key output value output

Page 12: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Job

n Um objeto “Job” contém a especificação de uma tarefa e nos permite controlar sua execução

n Para rodar o job em um cluster Hadoop, vamos empacotar o código em um arquivo JAR, que será distribuído pelo Hadoop entre os nodos do cluster.

n Ao invés de explicitamente especificar o nome do JAR, podemos uma classe no método setJarByClass(). O Hadoop usa essa informação para localizar o JAR.

45 46

Again, four formal type parameters are used to specify the input and output types, thistime for the reduce function. The input types of the reduce function must match theoutput types of the map function: Text and IntWritable. And in this case, the outputtypes of the reduce function are Text and IntWritable, for a year and its maximumtemperature, which we find by iterating through the temperatures and comparing eachwith a record of the highest found so far.

The third piece of code runs the MapReduce job (see Example 2-5).

Example 2-5. Application to find the maximum temperature in the weather datasetimport org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); job.setJobName("Max temperature");

FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}

A Job object forms the specification of the job and gives you control over how the jobis run. When we run this job on a Hadoop cluster, we will package the code into a JARfile (which Hadoop will distribute around the cluster). Rather than explicitly specifyingthe name of the JAR file, we can pass a class in the Job’s setJarByClass() method,which Hadoop will use to locate the relevant JAR file by looking for the JAR file con‐taining this class.

26 | Chapter 2: MapReduce

Entrada:Arquivo, Diretório ou Pattern

Saída:Arquivo único

Classes Map e Reduce

Submete o Job e espera pelo términotrue: mensagens de progresso enviadas

pro console

Criação/Definiçãodo Job

47

hadoop MaxTemperature input/ncdc/sample.txt output

+Mapper

public class DemoWordCount extends Configured implements Tool {private static final Logger LOG = Logger.getLogger(DemoWordCount.class);

// Mapper: emits (token, 1) for every word occurrence.private static class MyMapper extends

Mapper<LongWritable,Text,Text,IntWritable> {

// Reuse objects to save overhead of object creation.private final static IntWritable ONE = new IntWritable(1);private final static Text WORD = new Text();

@Overridepublic void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {String line = ((Text) value).toString();StringTokenizer itr = new StringTokenizer(line);while (itr.hasMoreTokens()) {WORD.set(itr.nextToken());context.write(WORD, ONE);

}}

}

Page 13: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Reducer

// Reducer: sums up all the counts.private static class MyReducer extends

Reducer<Text,IntWritable,Text,IntWritable> {

// Reuse objects.private final static IntWritable SUM = new IntWritable();

@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context)

throws IOException, InterruptedException {// Sum up values.Iterator<IntWritable> iter = values.iterator();int sum = 0;while (iter.hasNext()) {sum += iter.next().get();

}SUM.set(sum);context.write(key, SUM);

}}

+JOB

public DemoWordCount() {}.....

Configuration conf = getConf();Job job = Job.getInstance(conf);job.setJobName(”Word Count");job.setJarByClass(DemoWordCount.class);

FileInputFormat.setInputPaths(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));

job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);

job.setMapperClass(MyMapper.class);job.setCombinerClass(MyReducer.class);job.setReducerClass(MyReducer.class);

job.waitForCompletion(true);}

+Dados Complexos in Hadoop

n Jeito Fácil:n Codificar como texto, e.x., (a, b) = “a:b”

n Usar expressões regulares para ler e extrair os dados

n Funciona, mas é meio gambiarra

n Jeito Difícil:n Definir formalmente uma implementação de Writable(Comprable)

n Deve implmentar: readFields, write, (compareTo)

n Computacionalmente eficiente, mas complicado

n Implementação de WritableComparator deve ser eficiente

n No meio do do caminho:n Bibliotecas como Cloud9 e Bespin tem suporte para JSON e vários tipos úteis para usar com Hadoop

+Componentes Básicos do Cluster

n Um de cada:n Namenode (NN): Nó principal (master) do HDFS

n Jobtracker (JT): Nó principal para submissão de tarefas

n Conjunto de máquinas escravas:n Tasktracker (TT): contêm múltiplos slots de tarefas

n Datanode (DN): serve blocos de dados HDFS

* Not quite… leaving aside YARN for now

Page 14: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Componentes Básicos - Cluster

datanode daemon

Linux file system

tasktracker

slave node

datanode daemon

Linux file system

tasktracker

slave node

datanode daemon

Linux file system

tasktracker

slave node

namenode

namenode daemon

job submission node

jobtracker

+Anatomia de um Job

n Programa MapReduce em Hadoop = Job Hadoopn Jobs são dividos em tarefas Map e tarefas reduce

n Uma instância de uma tarefa em execução ocupa um slot

n Chamade de "task attempt”

n Múltiplos Jobs podem ser executados em um workflow

n Submissão de Jobs:n Um cliente cria um Job, configura, e submete para o jobtracker

n Pronto! O cluster Hadoop toma conta…

+Anatomia de um Job

n Nos bastidores:n As divisões da entrada são computadas, no lado do cliente

n Informações do job (jar, conf. XML) são enviadas ao JobTracker

n JobTracker coloca essa informação em um local compartilhado em enfileira as tarefas

n Os TaskTrackers retiram as tarefas das suas filas

n O job entra em execução …

Page 15: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

InputSplit

Source: redrawn from a slide by Cloduera by Jimmy lin, cc-licensed

InputSplit InputSplit

Input File Input File

InputSplit InputSplit

RecordReader RecordReader RecordReader RecordReader RecordReader

Mapper

Intermediates

Mapper

Intermediates

Mapper

Intermediates

Mapper

Intermediates

Mapper

Intermediates

Inpu

tFor

mat

… …

InputSplit InputSplit InputSplit

Client

Records

Mapper

RecordReader

Mapper

RecordReader

Mapper

RecordReader

Source: redrawn from a slide by Cloduera by Jimmy lin, cc-licensed

Mapper Mapper Mapper Mapper Mapper

Partitioner Partitioner Partitioner Partitioner Partitioner

Intermediates Intermediates Intermediates Intermediates Intermediates

Reducer Reducer Reduce

Intermediates Intermediates Intermediates

(combiners omitted here)

Source: redrawn from a slide by Cloduera by Jimmy lin, cc-licensed

Reducer Reducer Reduce

Output File

RecordWriter

Outp

utFo

rmat

Output File

RecordWriter

Output File

RecordWriter

Source: redrawn from a slide by Cloduera by Jimmy lin, cc-licensed

Page 16: Problema T pico de Big Data MapReduce Iterar sobre um ... · Manuten o geral:! Comunica o peri dica com os datanodes! ... Exemplo de aplica o: analisar o log de um servidor Web para

+Input e Output

n InputFormat:n TextInputFormat

n KeyValueTextInputFormat

n SequenceFileInputFormat

n …

n OutputFormat:n TextOutputFormat

n SequenceFileOutputFormat

n …

+Distribuição e Ordenação

n Provavelmente o aspecto mais complexo do MapReduce

n No Mapn Saídas do Map são colocadas na memória em um buffer circular

n Quando o buffer enche, o conteúdo é ”ejetado” para o disco

n O "jatos" sofrem merge em um único arquivo particionado, que é ordenado dentro de cada partição. O combiner roda durante os merges

n No Reducen Primeiro, as saídas dos maps são copiadas nas máquinas dos reduces.

n A ordenação é um merge de vários passos destas saídas. Pode ser executada na memória e no disco.

n O combiner roda durantes os merges

n O passo final do merge vai diretamente para o reducer

+Distribuição e Ordenação

Mapper

Reducer

other mappers

other reducers

circular buffer

(in memory)

spills (on disk)

merged spills (on disk)

intermediate files (on disk)

Combiner

Combiner