Upload
others
View
3
Download
0
Embed Size (px)
Citation preview
MapReduce
Capítulo 2
Renato Gomes Borges Júnior
Sumário
● Introdução ● Exemplo: um conjunto de dados climáticos ● Analisando os dados● Map and Reduce● Java MapReduce● Fluxo de dados● Executando MapReduce distribuído● Hadoop streaming
Introdução
● Modelo de programação para processamento de dados.
● Inerentemente paralelo.
● Análise de dados de larga escala.
Exemplo: um conjunto de dados climáticos
● Sensores climáticos coletando dados a cada hora em muitos locais do globo geram grande quantidade de dados.
● Bom candidato para análise com MapReduce pois:
○ semi-estruturado;○ orientado a registros.
● Dados são armazenados usando um formato ASCII.
● Cada linha é um registro.
Exemplo: um conjunto de dados climáticos 0057332130 # USAF weather station identifier99999 # WBAN weather station identifier19500101 # observation date0300 # observation time4+51317 # latitude (degrees x 1000)+028783 # longitude (degrees x 1000)FM-12+0171 # elevation (meters)99999V020320 # wind direction (degrees)1 # quality codeN0072100450 # sky ceiling height (meters)1 # quality codeCN010000 # visibility distance (meters)1 # quality code
N9 0128 # air temperature (degrees Celsius x 10)1 # quality code-0139 # dew point temperature (degrees Celsius x 10)1 # quality code10268100450 # sky ceiling height (meters)1 # quality codeCN010000 # visibility distance (meters)1 # quality codeN9-0128 # air temperature (degrees Celsius x 10)1 # quality code-0139 # dew point temperature (degrees Celsius x 10)1 # quality code10268 # atmospheric pressure (hectopascals x 10)1 # quality code
Registro dividido em múltiplas linhas para mostrar o que cada campo representa.
Exemplo: um conjunto de dados climáticos
● Cada arquivo armazena as leituras de um único ano.
● Exemplo de um arquivo (algumas colunas foram retiradas):
0067011990999991950051507004...9999999N9+00001+99999999999...0043011990999991950051512004...9999999N9+00221+99999999999...0043011990999991950051518004...9999999N9-00111+99999999999...
● Estamos interessados em encontrar a maior temperatura para cada ano.
Exemplo: um conjunto de dados climáticos
● Cada arquivo armazena as leituras de um único ano.
● Exemplo de um arquivo (algumas colunas foram retiradas):
0067011990999991950051507004...9999999N9+00001+99999999999...0043011990999991950051512004...9999999N9+00221+99999999999...0043011990999991950051518004...9999999N9-00111+99999999999...
● Estamos interessados em encontrar a maior temperatura para cada ano.
Analisando os dados
● Usando ferramentas Unix para processar os dados (de forma serial), o processamento completo para o século levou 42 minutos.
● Alocando cada ano para um processo diferente ainda leva
tempo, pois o tamanho dos arquivos variam muito.
● A melhor abordagem é dividir a entrada em partes de tamanho fixo e alocar cada uma para um processo. No final combinam-se as soluções para obter a temperatura máxima.
● Para tirar vantagem do processamento paralelo do Hadoop
é necessário expressar nossa busca como uma função MapReduce.
Map and Reduce
● Quebra o processamento em duas fases: map and reduce.
● Cada fase tem pares chave-valor como entrada e saída (os tipos são escolhidos pelo programador).
● O programador especifica duas funções: a função de
mapeamento e a função de redução.
Map and Reduce
● Map
chave de entrada: deslocamento do inicio do arquivo até a linha com uma leitura climática.valor de entrada: string contendo todo o texto de um arquivo.
função: prepara os dados para a função de redução, separando em cada linha o ano e a temperatura obtida. Também filtra campos faltando, dados incorretos ou suspeitos.
chave de saída: string representando o ano.valor de saída: inteiro representando a temperatura.
Map and Reduce
● Reduce
chave de entrada: string representando o ano.valor de entrada: inteiro representando a temperatura.
função: itera sobre os valores de entrada procurando a temperatura máxima para cada ano.
chave de saída: string representando o ano.valor de saída: inteiro representando a temperatura máxima.
Map and Reduce
Figure 1. Fluxo de dados lógico do MapReduce.
Java MapReduce
import java.io.IOException;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*; public class MaxTemperature { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus sign airTemperature = Integer.parseInt(line.substring(88, 92); } else { airTemperature = Integer.parseInt(line.substring(87, 92); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01495]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } }
Java MapReduce (cont.)
static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }
public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); FileInputFormat.addInputPath(job, new Path(args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1]);
Java MapReduce (cont.)
job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
Fluxo de dados
● Para executar a aplicação em uma escala maior é necessário armazenar os dados em um sistema de arquivos distribuído (tipicamente HDFS).
● Permite o Hadoop mover o processamento do MapReduce
para cada maquina que contém parte dos dados.
Fluxo de dados
● Job - unidade de trabalho que o cliente quer que seja executado:○ dados de entrada;○ programa MapReduce;○ informações de configuração.
● Tasks - divisão de um job, que é processada individualmente.
○ map tasks;○ reduce tasks.
● Jobtracker - nó que coordena toda a execução do job no sistema. Agenda tasks para serem executadas nos tasktrackers.
● Tasktracker - nós que executam tasks e enviam o relatório de
seu progresso para o jobtracker.
Fluxo de dados
● Input splits - divisão dos dados de entrada em partes de tamanho fixo. Para cada split, é criado uma map task que executa a função de map para cada registro no split.
Fluxo de dados
Figura 2. Fluxo de dados MapReduce com um único reduce task.
Fluxo de dados
Figura 3. Fluxo de dados MapReduce com múltiplos reduce tasks.
Fluxo de dados
Figura 4. Fluxo de dados sem reduce tasks.
Executando MapReduce Distribuído
● O mesmo programa levou 6 minutos.
Hadoop Streaming
● Provê API que permite escrever as funções map e reduce em outras linguagens além de Java.
● Usa o Unix como interface entre o Hadoop e o seu
programa.
● Função map recebe o arquivo de entrada, processa linha por linha e gera um arquivo de saída com os pares chave-valor separados por um caractere de tabulação.
● Função reduce recebe o arquivo de saída da função map,
processa e escreve o resultado na saída padrão.
Referências
● Tom White, "Hadoop the definitive guide". O'Reilly, 2ª edição.
● Documentação:
http://hadoop.apache.org/common/docs/r0.17.0/api/org/apache/hadoop/mapred/package-summary.html
● Tutorial MapReduce:
http://hadoop.apache.org/common/docs/current/mapred_tutorial.html