76
Universidade Estadual de Campinas Instituto de Computação INSTITUTO DE COMPUTAÇÃO Augusto Rodrigues de Souza Mecanismos para Escalonamento de Aplicações MapReduce de Diferentes Prioridades CAMPINAS 2017

Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

  • Upload
    others

  • View
    4

  • Download
    0

Embed Size (px)

Citation preview

Page 1: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Universidade Estadual de CampinasInstituto de Computação

INSTITUTO DECOMPUTAÇÃO

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de AplicaçõesMapReduce de Diferentes Prioridades

CAMPINAS2017

Page 2: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de Aplicações MapReduce deDiferentes Prioridades

Dissertação apresentada ao Instituto deComputação da Universidade Estadual deCampinas como parte dos requisitos para aobtenção do título de Mestre em Ciência daComputação.

Orientadora: Profa. Dra. Islene Calciolari Garcia

Este exemplar corresponde à versão final daDissertação defendida por AugustoRodrigues de Souza e orientada pela Profa.Dra. Islene Calciolari Garcia.

CAMPINAS2017

Page 3: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Agência(s) de fomento e nº(s) de processo(s): Não se aplica.ORCID: http://orcid.org/0000-0001-9296-7769

Ficha catalográficaUniversidade Estadual de Campinas

Biblioteca do Instituto de Matemática, Estatística e Computação CientíficaAna Regina Machado - CRB 8/5467

Souza, Augusto Rodrigues de, 1985- So89m SouMecanismos para escalonamento de aplicações MapReduce de diferentes

prioridades / Augusto Rodrigues de Souza. – Campinas, SP : [s.n.], 2017.

SouOrientador: Islene Calciolari Garcia. SouDissertação (mestrado) – Universidade Estadual de Campinas, Instituto de

Computação.

Sou1. Sistemas distribuídos. 2. MapReduce (Programação paralela

distribuída). 3. Processamento eletrônico de dados - Processamentodistribuído. I. Garcia, Islene Calciolari,1971-. II. Universidade Estadual deCampinas. Instituto de Computação. III. Título.

Informações para Biblioteca Digital

Título em outro idioma: Scheduling mechanisms for MapReduce applications with distinctprioritiesPalavras-chave em inglês:Distributed systemsMapReduce (Distributed parallel programming)Electronic data processing - Distributed processingÁrea de concentração: Ciência da ComputaçãoTitulação: Mestre em Ciência da ComputaçãoBanca examinadora:Islene Calciolari Garcia [Orientador]Luiz Fernando BittencourtGustavo Maciel Dias VieiraData de defesa: 28-04-2017Programa de Pós-Graduação: Ciência da Computação

Powered by TCPDF (www.tcpdf.org)

Page 4: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Universidade Estadual de CampinasInstituto de Computação

INSTITUTO DECOMPUTAÇÃO

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de Aplicações MapReduce deDiferentes Prioridades

Banca Examinadora:

• Profa. Dra. Islene Calciolari GarciaInstituto de Computação — UNICAMP

• Prof. Dr. Gustavo Maciel Dias VieiraDepartamento de Computação — UFSCar-Sorocaba

• Prof. Dr. Luiz Fernando BittencourtInstituto de Computação — UNICAMP

A ata da defesa com as respectivas assinaturas dos membros da banca encontra-se noprocesso de vida acadêmica do aluno.

Campinas, 28 de abril de 2017

Page 5: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Agradecimentos

À minha esposa Carolina e minha filha Julieta pelo apoio e compreensão principalmentequando o escasso tempo que deveria ser gasto com a família foi dedicado a este trabalho.

Ao meu pai Nelson, minha mãe Vera Ruth e minhas irmãs Katia e Camila pela minhaformação como cidadão e estudante, por me fazerem me interessar por estudar.

Às comunidades do Hadoop e do Disco que nos ajudaram fornecendo os projetosque foram utilizados nesse trabalho de mestrado. Em especial, aos desenvolvedores daApache Carlo Curino e Chris Douglas que foram mentores no início do nosso trabalho noHadoop. Ao serviço de computação na nuvem Digital Ocean que nos forneceu créditospara executarmos nossos testes.

Por último, mas não menos importante, à minha orientadora Professora Doutora IsleneCalciolari Garcia pela orientação, paciência, ensinamentos e compreensão. Sem dúvidas,este foi um dos trabalhos mais complicados da minha vida e pude contar com uma orien-tação que o simplificou muito nos momentos mais necessários.

Page 6: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Resumo

Em 2004, o Google surpreendeu a comunidade de sistemas distribuídos ao divulgar comofuncionava seu framework de MapReduce e seu sistema de arquivos distribuído. Tal inova-ção rapidamente chamou a atenção das comunidades de software livre e diversos sistemasde MapReduce de código aberto foram criados implementando as ideias divulgadas peloGoogle. Neste trabalho, focamos nossa atenção em dois deles, chamados Hadoop e Disco.O primeiro é desenvolvido principalmente em Java e o segundo em Erlang e discutimosum pouco sobre como essas linguagens influenciaram a arquitetura e o funcionamentodestes sistemas.

Focamos em especial em como esses sistemas trabalham com cargas heterogêneas,ou seja, compostas por aplicações de pesquisa (baixa prioridade e longa duração) e deprodução (alta prioridade e baixa duração) em um mesmo cluster. Analisamos a fundocomo esses sistemas funcionam, principalmente, quais são os mecanismos que auxiliam natomada de decisão sobre agendamento de tarefas aos escalonadores. Por fim, propusemoso uso de uma política de escalonamento justa e baseada em preempção de tarefas paraauxiliar o escalonador do Disco a dar ênfase à aplicação de produção sem deixar de serjusto, ou seja, faz com que rapidamente a aplicação de produção atinja a quantidade derecursos do cluster que por justiça pertencem à ela em uma divisão justa dos mesmos.Para isso tarefas da aplicação menos prioritária (de pesquisa) sofrem preempção.

Contribuímos para as comunidades do Hadoop e do Disco ao longo desse mestrado:colaboramos na codificação de uma política de preempção e checkpointing ao Hadoop edesenvolvemos uma política justa e que se utiliza de preempção no Disco para priorizaras aplicações de produção com resultados significativos no tempo de execução da nossacarga experimental. Outra contribuição deste trabalho é uma interface Web auxiliar aoDisco para reprodução dos nossos experimentos.

Page 7: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Abstract

In 2004, Google released a paper in which they described how their MapReduce frameworkworked and how they structured their distributed file system. This innovation madethe open software communities to organize themselves in order to develop open sourcealternatives for the Google’s MapReduce framework. In this work, we focused in two ofthese systems: Hadoop and Disco. The former was developed mainly in Java and thelater in Erlang, and we also discuss how these languages influenced the architecture andthe behavior of these systems.

Especially we looked to how these systems address the problem related to hetero-geneous workloads which are composed of research applications (low priority and longduration) and production (high priority and short duration) in the same cluster. We an-alyzed deeply how these systems work and which are the mechanisms they use to help inthe scheduling decision making. Lastly, we proposed the use of a Fair scheduling policybased in using preemption of tasks to guide the scheduler of Disco to give emphasis tothe production application without giving up of being fair. As a consequence the produc-tion application quickly gets the resources designated to it in a Fair division. For this tohappen, tasks with the lower priority application (research) are affected by preemption.

We also contributed to the communities of Hadoop and Disco: we collaborated in thedevelopment of a preemption policy and checkpointing service for Hadoop and developeda Fair scheduling policy based on preemption for Disco to help it prioritize productionapplications with good results in the execution time for our experimental workload. An-other contribution of this work was a Web interface for Disco to help in the reproductionof our experiments.

Page 8: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Lista de Figuras

2.1 Execução do contador de palavras em um ambiente MapReduce . . . . . . 19

3.1 Execução de MapReduce (traduzido de [19]) . . . . . . . . . . . . . . . . . 243.2 Arquitetura clássica do Hadoop (traduzido de [50]) . . . . . . . . . . . . . 253.3 Arquitetura do YARN (traduzido de [17]) . . . . . . . . . . . . . . . . . . 263.4 Divisão de recursos utilizando-se o Hadoop Fair Scheduler . . . . . . . . . 273.5 Divisão de recursos com o Hadoop Capacity Scheduler . . . . . . . . . . . 28

4.1 Visão geral da arquitetura do Disco (traduzido de [13]) . . . . . . . . . . . 324.2 Troca de mensagens do protocolo Worker para execução de uma tarefa ([13]) 334.3 Pipeline do Disco: abstração baseada em uma sequência de estágios . . . . 344.4 Política de agrupamento: split ([54]) . . . . . . . . . . . . . . . . . . . . . 354.5 Política de agrupamento: group_all ([54]) . . . . . . . . . . . . . . . . . . 354.6 Política de agrupamento: group_label ([54]) . . . . . . . . . . . . . . . . . 364.7 Política de agrupamento: group_label_node ([54]) . . . . . . . . . . . . . 364.8 Política de agrupamento: group_node ([54]) . . . . . . . . . . . . . . . . . 374.9 MapReduce segundo as regras do pipeline . . . . . . . . . . . . . . . . . . 384.10 Comparação da distribuição dos recursos do cluster para as políticas FIFO

e Fair do Disco . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 39

5.1 Replicação de dados em um sistema de arquivos distribuído e sua relaçãocom o mecanismo de localicade dos dados . . . . . . . . . . . . . . . . . . . 42

5.2 Exemplo de execução de tarefas especulativas para o Hadoop . . . . . . . . 435.3 Armazenamento e transporte dos dados intermediários sem a presença de

I-File no sistema de arquivos distribuídos . . . . . . . . . . . . . . . . . . . 465.4 Armazenamento e transporte dos dados intermediários com a presença de

I-File no sistema de arquivos distribuídos . . . . . . . . . . . . . . . . . . . 475.5 Preempção sem checkpointing . . . . . . . . . . . . . . . . . . . . . . . . . 485.6 Preempção com checkpointing . . . . . . . . . . . . . . . . . . . . . . . . . 48

6.1 Melhor utilização dos recursos do cluster com o auxílio de preempção echeckpointing . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 55

6.2 Preempção das tarefas de uma aplicação menos prioritária pela política deescalonamento “Preemptive Fair”. . . . . . . . . . . . . . . . . . . . . . . . 58

6.3 Concessão dos recursos à aplicação mais injustiçada pela política de esca-lonamento “Preemptive Fair”. . . . . . . . . . . . . . . . . . . . . . . . . . 58

6.4 Tempos de execução das aplicações de produção . . . . . . . . . . . . . . . 596.5 Tempos de execução das aplicações de pesquisa. . . . . . . . . . . . . . . . 596.6 Utilização do cluster para a execução do experimento com a versão original

da política Fair. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 60

Page 9: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

6.7 Utilização do cluster para a execução do experimento com a política PreemptiveFair que implementamos . . . . . . . . . . . . . . . . . . . . . . . . . . . . 61

6.8 Interface do Jupyter Notebooks, famosa ferramenta da comunidade Python 626.9 Link para a ferramenta Notebooks exibido no canto superior direito da tela 636.10 Tela do Notebooks para a submissão de novas aplicações . . . . . . . . . . 646.11 Tela principal do Disco Web enquanto aplicações estão em execução . . . . 656.12 Tela do Notebooks com a listagem dos relatórios disponíveis . . . . . . . . 656.13 Tela do Notebooks com os resultados dos experimentos de um dado rela-

tório selecionado . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 66

Page 10: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Lista de Tabelas

4.1 Comparação de tamanhos dos projetos Hadoop e Disco . . . . . . . . . . . 30

Page 11: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Sumário

1 Introdução 131.1 Contribuições . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 151.2 Organização da dissertação . . . . . . . . . . . . . . . . . . . . . . . . . . . 16

2 MapReduce 172.1 Modelo de programação . . . . . . . . . . . . . . . . . . . . . . . . . . . . 182.2 Ambiente de execução . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 192.3 Exemplos de aplicações . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 21

3 Hadoop 233.1 Arquitetura . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 243.2 Escalonadores . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 26

3.2.1 Fair Scheduler . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 273.2.2 Capacity Scheduler . . . . . . . . . . . . . . . . . . . . . . . . . . . 28

3.3 Preempção de tarefas . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 29

4 Disco 304.1 Arquitetura . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 314.2 Protocolo do worker . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 324.3 Pipeline . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33

4.3.1 Split . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 344.3.2 Group_all . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 354.3.3 Group_label . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 364.3.4 Group_label_node . . . . . . . . . . . . . . . . . . . . . . . . . . . 364.3.5 Group_node . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 374.3.6 MapReduce como um pipeline . . . . . . . . . . . . . . . . . . . . . 37

4.4 Políticas de escalonamento . . . . . . . . . . . . . . . . . . . . . . . . . . . 37

5 Mecanismos Auxiliares aos Escalonadores 405.1 Localidade de dados . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 405.2 Tarefas especulativas . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 415.3 I-Files . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 445.4 Preempção e checkpointing . . . . . . . . . . . . . . . . . . . . . . . . . . 455.5 Outros trabalhos relacionados . . . . . . . . . . . . . . . . . . . . . . . . . 49

6 Contribuições e Resultados 516.1 Trabalho realizado no Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . 526.2 Trabalho realizado no Disco . . . . . . . . . . . . . . . . . . . . . . . . . . 56

6.2.1 Experimentos . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 58

Page 12: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

6.2.2 Notebooks . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 60

7 Conclusão 67

Referências Bibliográficas 69

A Comparação entre Java e Erlang 74A.1 Gerenciamento de memória . . . . . . . . . . . . . . . . . . . . . . . . . . 75A.2 Concorrência e paralelismo . . . . . . . . . . . . . . . . . . . . . . . . . . . 76

Page 13: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 1

Introdução

Atualmente, convivemos com coleções de dados tão grandes e complexas que ferramentascomputacionais tradicionais, como por exemplo bancos de dados relacionais, já não sãosuficientes para armazenar e analisar esses dados. Um dos principais gargalos é a veloci-dade de acesso ao disco. Enquanto a capacidade dos discos cresceu muito com o passardos anos — um HD típico de 1990 armazenava 1370 MB e atualmente é comum termosHDs de 1 TB — a velocidade de acesso ao disco não acompanhou tal crescimento (4,4MB/s em 1990 e 100 MB/s em 2012). Com essa capacidade e velocidade de acesso, emmédia um disco inteiro era lido em 5 minutos em 1990, enquanto no início desta décadamais do que duas horas e meia eram gastas para realizar tal atividade [19, Capítulo 1].

Uma solução simples para o problema de se armazenar e processar grande quantidadede dados é distribuí-los em vários discos, formando um cluster de máquinas; assim oconteúdo é lido de vários HDs ao mesmo tempo e a velocidade de acesso global é maior.Porém, isso cria uma série de dificuldades, como por exemplo:

• de que forma desenvolver aplicações que leem seus dados desse sistema de arquivosdistribuído;

• como aproveitar o poder de processamento do cluster, pois não seria eficiente utilizarapenas o poder de armazenamento;

• como identificar e se recuperar em casos de falhas de hardware e rede;

• como estruturar uma arquitetura com máquinas convencionais (commodity machi-nes) para diminuir os custos.

O modelo computacional MapReduce conforme o Google propôs [6] tinha como ob-jetivo principal facilitar o tratamento dessas questões comuns a diversas aplicações dis-tribuídas em um framework. Dessa maneira, os programadores podem fazer uso desseframework ao invés de reimplementar as soluções para esses problemas em cada aplicaçãoindividualmente. MapReduce se baseia em delegar ao programador a responsabilidade porimplementar as funções Map e Reduce. A primeira é responsável por dividir a entradaem vários agrupamentos pequenos e independentes, que são processados em paralelo nasdiversas máquinas do cluster. As saídas dos mappers (processos que executam a função

13

Page 14: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 1. INTRODUÇÃO 14

Map) são entradas para os reducers (processos que executam paralelamente a função Re-duce). Os reducers, por sua vez, combinam os dados para chegar à resposta final. Paraque MapReduce funcione adequadamente, é necessário o apoio de um sistema de arquivosdistribuído e escalável que forneça os dados de forma confiável e a altas taxas. No casodo Google, o GFS (Google File System) [9] se encarrega disto.

Baseando-se no MapReduce e GFS do Google, alguns projetos de código aberto sur-giram com o objetivo de resolver os mesmos problemas. Tratam-se de maneiras maisacessíveis, já que o sistema utilizado pelo Google é proprietário e somente seus engenhei-ros têm acesso a ele. Neste trabalho vamos analisar dois desses projetos: Hadoop [24] eDisco [33]. O primeiro é mantido pela Apache e tem como objetivo “Desenvolver soft-ware de código aberto para computação confiável, escalável e distribuída”. Além disso, oHadoop é altamente conhecido e utilizado na indústria inclusive por grandes empresascomo Facebook e Yahoo e usa principalmente a linguagem de programação Java em seunúcleo. O segundo, por sua vez, foi desenvolvido pela Nokia em 2008 é mais enxuto emtermos de quantidade de linhas de código e complexidade tanto para administração comopara adição de novas funcionalidades. O Disco é escrito em Erlang — uma linguagem deprogramação desenvolvida pela Ericsson especialmente para tratar problemas relacionadosa sistemas distribuídos como concorrência, comunicação entre as máquinas de um clustere tolerância a falhas [2]. Para escrever aplicações MapReduce, o Hadoop possibilita ouso de Java, Python, Ruby e C++, sendo Java a mais popularmente adotada. Aplicaçõesque rodem no Disco podem ser escritas em qualquer linguagem que possua uma bibliotecacompatível com o seu núcleo (worker), mas suporta e adota principalmente o Python paraessa finalidade.

A popularização das soluções de MapReduce fez com que diversos tipos de aplicaçõesfossem desenvolvidas seguindo seu modelo. Muitas decisões de negócios dependem hojeem dia de aplicações dessa natureza. Algumas dessas decisões são urgentes, como o pro-cessamento dos logs de cliques para medir a efetividade de anúncios em grandes sites; jáoutras nem tanto, como analisar os dados de uso para decidir onde posicionar anúnciosem uma página. É comum que clusters recebam aplicações com diferentes prioridades [5].Dividimo-nas em duas principais categorias: pesquisa e produção. Normalmente, as apli-cações de pesquisa possuem baixa prioridade e tarefas com tempo de duração longo,enquanto as de produção são mais prioritárias e rápidas no que diz respeito à duração.

O requisito que faz com que seja necessário respeitar as prioridades e rodar aplicaçõesem paralelo para melhor aproveitar um cluster gera a necessidade de mecanismos para queos escalonadores dos frameworks tomem boas decisões sobre qual aplicação deve ter suatarefa atribuída a um dado recurso que esteja disponível. O principal desses mecanismosé a preempção, que possibilita a interrupção de parte das tarefas de uma aplicação depesquisa quando uma de produção precisa ser executada. O escalonador principal doHadoop é o Capacity Scheduler, onde existem mecanismos de preempção e também derecuperação do estado de uma tarefa que sofreu preempção para que a computação nãoseja perdida quando a execução é retomada.

Ao mecanismo de recuperação do estado das tarefas damos o nome de Checkpointing.As políticas de escalonamento do Disco — FIFO (First In First Out) e Fair (justiçana distribuição dos recursos disponíveis) — possuem limitações no que diz respeito a

Page 15: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 1. INTRODUÇÃO 15

mecanismos para execução de aplicações com diferentes prioridades. FIFO escalona todasas tarefas da primeira aplicação submetida antes de começar a escalonar as da segundaindependente da prioridade. Fair demora muito para fornecer à aplicação de produçãoseus recursos já que a aplicação de pesquisa possui tarefas longas que acabam por retê-loaté o término de sua execução na ausência de um mecanismo de preempção.

Para contribuir com tal discussão descrevemos nas páginas que se seguem o resultadode uma pesquisa sobre mecanimos que auxiliam os escalonadores do Hadoop e do Discoa trabalharem melhor com cargas heterogêneas de aplicações. Chamamos de carga he-terogênea esse uso dos clusters MapReduce com aplicações de prioridades e tempos deexecução diferentes em paralelo, ou seja, execução concorrentes de aplicações de pesquisae produção.

1.1 Contribuições

Além desta dissertação, nossa pesquisa produziu as seguintes contribuições:

• Apresentação do trabalho no IX WTD e consequente publicação do resumo expan-dido:

– Augusto Souza e Islene Garcia. Preempção de Tarefas MapReduce via Check-pointing. Em Anais do IX Workshop de Teses, Dissertações e Trabalhos deIniciação Científica em Andamento do IC-UNICAMP, páginas 86–91. Insti-tuto de Computação, Universidade Estadual de Campinas, agosto de 2014 [7].

• Artigo aceito no WPerformance 2016 em Porto Alegre e publicado nos Anais doXXXVI CSBC:

– Augusto Souza e Islene Garcia. A preemptive fair scheduler policy for discoMapReduce framework. Em Anais do XXXVI Congresso da Sociedade Brasi-leira de Computação, páginas 2758–2769. Pontifícia Universidade Católica doRio Grande do Sul (PUCRS), julho de 2016 [8].

• Trabalho junto à comunidade do projeto Hadoop evidenciado pelas seguintes ativi-dades:

– Atualizações em patch para adequar uma solução de checkpointing desenvol-vida por outros pesquisadores ao código corrente na época deste trabalho:

∗ Preemption of Reducer (and Shuffle) via checkpointinghttps://issues.apache.org/jira/browse/MAPREDUCE-5269

– Proposta de divisão de patch em componentes menores, mais facilmente atua-lizáveis e testáveis:

∗ Add support for PartialFileOutputCommiter when checkpointing is an op-tion during preemptionhttps://issues.apache.org/jira/browse/MAPREDUCE-6434

Page 16: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 1. INTRODUÇÃO 16

∗ Add a checkpointable version of shuffle and reduce context supported bya checkpoint manager which uses the HDFShttps://issues.apache.org/jira/browse/MAPREDUCE-6444

• Pesquisa e desenvolvimento de funcionalidades e ferramentas para o projeto Disco:

– Disponibilização do código da política de escalonamento Fair e com preempçãoà comunidade:

∗ Repositório no Github com a mesma licença do Disco e com o código donova política de escalonamento

https://github.com/augustorsouza/disco/tree/fair_policy_with_preemption

– Arquitetura e desenvolvimento de solução para execução facilitada de aplica-ções no Disco por meio de uma interface Web já pré-configurada para facilitara reprodução dos experimentos descritos nesta dissertação

∗ Repositório no Github com a mesma licença do Disco e com o código dainterface Web Notebookshttps://github.com/augustorsouza/disco/tree/notebooks

1.2 Organização da dissertação

O restante dessa dissertação se encontra organizado da seguinte forma:

• Capítulo 2: descreve o modelo de programação e o ambiente de execução paraaplicações MapReduce conforme proposto pelo Google [6];

• Capítulo 3: detalha como MapReduce foi implementado no Hadoop;

• Capítulo 4: detalha como MapReduce foi implementado no Disco;

• Capítulo 5: descreve alguns mecanismos auxiliares aos escalonadores de sistemasMapReduce;

• Capítulo 6: aqui apresentamos os resultados atingidos pela nossa pesquisa deta-lhando as contribuições para o Hadoop e para o Disco;

• Capítulo 7: explicita conclusões e trabalhos futuros;

• Apêndice A: auxilia no entendimento das diferenças entre Hadoop e Disco por meiodo entendimento das diferenças entre as principais linguagens de programação uti-lizadas por esses dois sistemas.

Page 17: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 2

MapReduce

Em 2004, pesquisadores do Google divulgaram uma pesquisa muito importante para a áreade sistemas distribuídos por auxiliar a comunidade científica a entender como essa em-presa conseguia produzir e computar quantidades tão massivas de dados [6]. De maneiraempírica, os pesquisadores do Google notaram que as aplicações distribuídas executadasnos clusters da empresa continham diversas similaridades no que diz respeito às operaçõessobre os dados e ao ambiente escalável propício para sua execução.

O formato dessas aplicações podia ser abstraído com o auxílio de duas operações co-muns em linguagens funcionais (como Lisp, por exemplo). A primeira operação chamadade Map, seria responsável por mapear os dados iniciais em conjuntos de valores que com-partilhavam a mesma chave. Na sequência, a operação de Reduce seria responsável poraplicar uma computação sobre o conjunto gerado por Map, produzindo uma saída fi-nal que seria uma resposta para a computação pretendida. Após notar essa organizaçaocomum às aplicações distribuídas, os pesquisadores do Google propuseram um modelode programação aos desenvolvedores que criavam aplicações distribuídas baseadas nessasduas operações. Dessa maneira, todos os detalhes referentes à execução distribuída nãoprecisariam mais serem implementados por cada aplicação, mas sim por um ambiente deexecução comum. Na Seção 2.1 mostramos esse modelo de programação com mais deta-lhes e um exemplo de aplicação expressa segundo essa abstração. Diversos problemas domundo real podem ser expressos seguindo esse formato.

O ambiente de execução desejado para as aplicações precisaria atender a uma sériede requisitos. O primeiro deles, executar de maneira paralelizada sobre máquinas con-vencionais. A quantidade de computadores e CPUs que cada um possui pode variar dealguns poucos a milhares. Máquinas podem ser adicionadas ou removidas do cluster seminterrupções de sua utilização, o que também facilita manutenções. Quando se trabalhacom milhares de máquinas, detalhes referentes à tolerância a falhas são essenciais, já queuma falha completa ou parcial de um dado computador é mais provável nessa situaçãoe isso não deve ser motivo para que o ambiente deixe de estar disponível ou que umadada execução seja interrompida. Por fim, deve-se escalonar os pedaços dessas aplicações(tarefas) de maneira eficiente, por meio da gerência da comunicação entre as máquinas.Na Seção 2.2 veremos como o Google atende esses requisitos.

Para finalizar o capítulo, na Seção 2.3, explicamos em altíssimo nível como algunsproblemas comuns podem ser implementados utilizando-se da lógica proposta por Map-

17

Page 18: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 2. MAPREDUCE 18

Reduce.

2.1 Modelo de programação

O usuário da biblioteca de MapReduce em sua aplicação expressa sua computação emtermos de duas funções: Map e Reduce. Map processa um arquivo de entrada e gera pareschave-valor. Esses constituem um conjunto intermediário de dados, que serão agrupadospelo ambiente de execução de acordo com a chave, e repassados para a função de Reduce.Reduce, por sua vez, recebe a chave intermediária anterior e o conjunto de valores mapeadoa essa chave e mescla-os tipicamente em zero ou um único valor de saída. É importantenotar que para serem computáveis, os dados intermediários são lidos em lotes para quenão possam ocupar mais memória do que há disponível.

Para entender melhor o modelo de programação MapReduce um exemplo pode auxi-liar. Considere o problema de contar a quantidade de ocorrências de cada palavra contidaem uma coleção de documentos. Nesse casso, o desenvolvedor poderia escrever o seguintepseudo-código:

Algoritmo 1 Exemplo de função Map para o problema do Contador de Palavras. Chave: nome do documento. Valor: conteúdo textual do documentofunction map(Chave, Valor)

for each palavra ∈ V alor doEmitaIntermediario(palavra, 1)

end forend function

Algoritmo 2 Exemplo de função Reduce para o problema do Contador de Palavras. Chave: uma palavra. Valores: uma lista de inteiros com valor 1function reduce(Chave, Valores)

resultado← 0for each valor ∈ V alores do

resultado← resultado+ 1end forEmitaResultado(Chave, resultado)

end function

A função Map emite para cada palavra de um dado documento essa mesma palavrae o valor 1. Reduce, por sua vez, recebe para cada palavra um conjunto com a listados inteiros emitidos pelo Map, anteriormente. Lembrando que o ambiente de execuçãotratou de agrupar as saídas do Map nesse formato de chave-valores. Para ser executável,nosso exemplo ainda precisa que o cliente que irá utilizar o sistema de MapReduce forneçainformações a respeito dos dados de entrada e de saída, como os nomes dos arquivos, porexemplo.

Page 19: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 2. MAPREDUCE 19

2.2 Ambiente de execução

O modelo de programação descrito na Seção 2.1 precisa de um ambiente de execuçãoescalável para tirar melhor proveito dos recursos de computação disponíveis. Apesar denesta dissertação focarmos em execuções de aplicações que geram ou consomem umagrande quantidade de dados em clusters de máquinas convencionais, existem adaptaçõesdo modelo de programação MapReduce para outros ambientes como o de memória com-partilhada, por exemplo [14]. Além disso, o tipo de sistema de MapReduce base paraesta dissertação é aquele que compartilha os recursos de computação, armazenamento ecomunicação entre as máquinas de maneira transparente aos desenvolvedores. Todos oscomputadores do cluster estão conectados e possuem informações sobre a localização deseus pares (se estão no mesmo rack, por exemplo). Cada uma dessas máquinas possuium disco rígido tradicional que é utilizado pelo sistema de arquivos distribuído que servecomo base para o MapReduce, replicação de dados é o principal mecanismo para disponi-bilidade e integridade dos dados. Linux é o sistema operacional mais usualmente utilizadoem tais máquinas e elas podem ou não ter múltiplos núcleos.

Uma aplicação MapReduce é constituída de tarefas que são unidades responsáveisprincipalmente por executar uma das duas funções primordiais do modelo de programação(Map ou Reduce). Cabe ao escalonador do ambiente de execução receber essas aplicaçõese decidir em que momento cada recurso será alocado para cada tarefa. Além disso, esseescalonador deve ser capaz de trabalhar com múltiplas aplicações em paralelo, decidindoquais de suas tarefas devem ser direcionadas a cada máquina sempre que existirem recursosdisponíveis no cluster.

Figura 2.1: Execução do contador de palavras em um ambiente MapReduce

A Figura 2.1 ilustra a execução de uma aplicação de contar palavras descrita na Se-ção 2.1 em um ambiente de execução MapReduce conforme descrito pelo Google. Primei-ramente, note que a arquitetura é mestre-escravo, ou seja, a uma das máquinas (mestre)é atribuída a responsabilidade de coordenar o trabalho das demais (escravos). Note tam-

Page 20: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 2. MAPREDUCE 20

bém que cada um dos escravos deve executar a função de Map ou a de Reduce. Alémdisso, existe um canal de comunicação sempre ativo entre o mestre e os escravos, repre-sentado pelas setas com linhas tracejadas. Dito isso, a aplicação segue o seguinte roteirode execução:

1. A quantidade de máquinas responsáveis por executar funções de Map e de Reduceé um dado previamente configurado no cluster. Vamos supor que tenhamos Mmáquinas designadas à Map e R à Reduce. Então, o cliente submete a aplicaçãoe as informações necessárias para a inicialização, como a localização dos arquivosde entrada, por exemplo. Múltiplas cópias desse programa são inicializadas pelasdiferentes máquinas do cluster. A entrada é dividida em M partes.

2. À uma das cópias do programa é dada a função de mestre (o restante são consi-deradas escravos e recebem comandos dele). M dos escravos que estão disponíveisrecebem a designação de executar Map e R a de executar Reduce.

3. Aqueles que receberam a designação de executar Map leem uma parte da entrada echamam a função correspondente com os parâmetros correspondentes. Os resultadosintermediários produzidos como saída dessa chamada são mantidos em memória.

4. Para não arriscar perder os dados intermediários, periodicamente eles são gravadosem arquivo e particionados em R pedaços. A localização desses arquivos é enviadado escravo correspondente ao mestre para que esse último possa depois repassar aosescravos que executarão Reduce.

5. O mestre notifica os escravos que irão executar Reduce que prontamente via cha-mada remota de funções pedem a parte do resultado da execução de Map que lhesinteressa aos escravos correspondentes. Com todos os dados lidos, o escravo respon-sável por Reduce ordena-os de acordo com a chave. Por fim, a função Reduce dousuário é chamada recebendo por parâmetro a chave e o conjunto de valores corres-pondente. A saída é então concatenada a um arquivo de saída que estará gravadono sistema de arquivos distribuído.

Uma importante restrição desse ambiente reside no mestre, pois ele precisa manteruma estrutura de dados que indique a localização para cada máquina que executou a fun-ção de Map das suas R saídas. Sendo assim, os valores de M e R precisam ser muito bemescolhidos para que a memória do mestre possa comportar tamanha estrutura de dados.Além disso, na arquitetura proposta pelo Google, o mestre é um ponto único de falha,ou seja, caso ele falhe não é possível que o sistema se recupere e que a computação atéo momento da falha não seja perdida. Apesar disso, a falha de mestre é bastante impro-vável estatisticamente. Por outro lado, os escravos tendem a falhar com mais frequência.Por esse motivo, quando falham o mestre consegue detectar isso por meio do canal decomunicação entre eles e imediatamente pode demandar a reexecução da atividade queo escravo estivesse executando. No caso da mesma atividade ter sido executada por doisescravos diferentes (por uma lentidão de resposta no canal de comunicação, por exemplo),a primeira a ser enviada como resposta ao mestre é aquela que é considerada. Essa úl-tima otimização é denominada Backup Task e auxilia no caso de atrasos pela presença no

Page 21: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 2. MAPREDUCE 21

cluster de máquinas que estejam lentas, porém não tenham falhado ainda (por um discodefeituoso, por exemplo).

Tanto no início, para a leitura dos dados iniciais, como no término, para a gravaçãodos dados finais em um ambiente confiável, a aplicação MapReduce faz uso de um sistemade arquivos distribuído. Esse tipo de sistema costuma distribuir um mesmo arquivo emdiversas máquinas para garantir a alta disponibilidade das informações. Esse detalhe deimplementação possibilita uma outra otimização garantida pelo ambiente de execuçãodescrito nesta seção denominada: localidade dos dados. Essa otimização busca diminuir aquantidade de dados trafegados na rede do cluster através da alocação de tarefas próximasaos seus dados, ou seja, suponha que uma dada máquina irá executar a função de Mape precise de um determinado arquivo para a computação a ser realizada, o ambientesabendo disso pode garantir que uma máquina onde o arquivo esteja gravado seja tambémonde a função será executada. Esse detalhe garante que o arquivo não precise ser lido etransportado pela rede.

2.3 Exemplos de aplicações

Nesta seção vamos exemplificar mais algumas aplicações sobre a ótica do modelo de pro-gramação MapReduce [6]:

Grep Distribuído: caso um padrão seja reconhecido em uma dada linha de texto afunção Map emite essa linha. Reduce apenas reemite o mesmo valor, servindoapenas para copiar o resultado intermediário para a saída final.

Contagem de frequência de acessos a URL: muito similar ao contador de palavras,nesse caso, Map se encarrega de emitir um par com as URLs e um valor 1, a partirdo processamento de um arquivo de log de requisições de páginas da Web. Reducedeve computar a soma das aparições das URLs.

Grafo reverso de links da Web: dada uma página da Web (fonte) com diversos linksem seu conteúdo textual, Map deve varrê-la para emitir pares compostos por link efonte. Reduce concatena as fontes que apontam para um determinado link gerandoum mapeamento entre link e a lista de fontes que apontam para ele.

Frequência de palavras em um host: um host pode conter muitos documentos e deseja-se saber as palavras mais frequentes dentro deles. Map gera para cada documentouma estrutura de dados contendo a palavra e sua frequência, no final possuirá umalista dessas estruturas chamada de vetor de termos. A saída de Map será um par ohost e esse vetor de termos. Reduce mescla os vetores de termos de cada documentopor meio da soma das frequências e descarta as palavras menos frequentes. No fi-nal, produz o vetor de termos final para um dado host que contêm a informaçãocompilada.

Índice inverso: esse programa é útil para saber em quais documentos algumas palavrasaparecem. Map emite para cada palavra que aparece no documento um identificador

Page 22: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 2. MAPREDUCE 22

único. Reduce se encarrega de ordenar os identificadores dos documentos e mesclá-los gerando uma lista de documentos para cada palavra. Facilmente, poderíamostambém monitorar o posicionamento das palavras.

Ordenação distribuída: a função de Map extrai a chave de cada registro e emite essasinformações. Como a computação intermédiaria entre as fase de Map e Reducese encarregam de ordenar os dados intermediários, Reduce pode apenas emitir aschaves e valores sem alterações.

Importante notar que neste trabalho de mestrado não focamos em uma aplicação espe-cífica mas em uma taxonomia de aplicações que as divide entre produção e pesquisa [5]. Asaplicações de produção tendem a ser mais urgentes, seus resultados precisam ser calcula-dos rapidamente, por isso possuem uma prioridade alta, porém sabe-se que são executadasrapidamente. As aplicações de pesquisa, por sua vez, são menos prioritárias e possuemuma execução mais demorada. Costuma-se exigir que os clusters consigam trabalhar comcargas heterogêneas, ou seja, constituídas por esses dois tipos de aplicações e é um desafioaos escalonadores se comportarem bem nessa situação.

Page 23: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 3

Hadoop

Hadoop contém uma implementação de MapReduce disponível livremente através da li-cença Apache [25]. Seu código base é primariamente escrito em Java assim como asaplicações MapReduce que rodam sobre sua plataforma. Utilizando a biblioteca HadoopStreaming [40] que faz uma interface com outras linguagens de programação, o desenvol-vedor pode optar também por Python, Ruby e C++.

Hadoop foi promovido pela Apache a um projeto de nível significativo em 2008, massua história começa muito antes. Em 2002, Doug Cutting, criador do projeto ApacheNutch, o fez com o objetivo de dar uma solução de busca na Web mais democráticae aberta. O Nutch pode ser considerado o pai do Hadoop, pois esse projeto utilizavasistemas distribuídos como base para um rastreador e indexador das páginas da Web.Porém, em 2003 já era notável que a solução de Cutting não seria escalável. Nesse mesmoano, o Google publicou o artigo sobre seu sistema de arquivos — o Google File System(GFS) [9]. Uma solução similar a desse sistema de arquivos ajudaria o projeto Nutch aarmazenar a imensa quantidade de dados gerados pelo seu algoritmo de rastreamento dainternet. Em 2004, eles finalizaram a implementação do NDFS (Nutch Distributed FileSystem) que foi a base para o HDFS (Hadoop Distributed File System) do Hadoop [16].

Em 2005, depois de se inspirar novamente em um trabalho do Google, dessa vez arespeito de MapReduce [17], os desenvolvedores do Nutch criaram uma versão similarde computação dentro do projeto e portaram diversos de seus algoritmos para esse novosistema. Novamente, o ganho de escalabilidade e desempenho foi impressionante e decidiu-se por modulizar os componentes de computação MapReduce e sistemas de arquivos doNutch, criando-se as bases para o projeto Hadoop. Graças ao investimento do Yahoo noHadoop, que veio a contratar Doug Cutting e a prover um time e recursos para incrementare acelerar o desenvolvimento da solução, foi que o projeto ganhou ainda mais notoriedade.Prova disso é que além do Yahoo, em janeiro de 2008, empresas como Last.fm, Facebook eNew York Times já haviam divulgado casos de sucesso com o uso da plataforma. Destaqueespecial para o projeto do New York Times junto à Amazon EC2 para digitalizar o seuacervo [51].

Atualmente, Hadoop pode ser considerado o projeto de MapReduce com maior adoçãoe de maior sucesso dentro da comunidade de sistemas distribuídos, sendo utilizado pordezenas de empresas como a sua solução padrão para armazenamento e computação degrandes quantidades de dados. Isso pode ser notado pela quantidade de produtos que

23

Page 24: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 24

utilizam Hadoop de alguma maneira, existem distribuições feitas pela EMC, IBM, Mi-crosoft e Oracle, além de grandes empresas especializadas na plataforma como Cloudera,Hortonworks e MapR [19].

Neste capítulo falamos sobre a arquitetura do Hadoop (Seção 3.1), sobre seus princi-pais escalonadores (Capacity e Fair) (Seção 3.2) e sobre como esse sistema trabalha compreempção de tarefas (Seção 3.3).

3.1 Arquitetura

Figura 3.1: Execução de MapReduce (traduzido de [19])

Um arquivo de entrada previamente gravado no HDFS é utilizado em conjunto comfunções de Map e Reduce para formar o que chamamos de aplicação no Hadoop. Conformeilustrado na Figura 3.1, primeiramente, os mappers leem os dados de entrada do HDFS eproduzem sua saída (chave-valor). Então, cada reducer computa um intervalo de chavese seus respectivos valores, para isso é necessário que os reducers copiem a saída dosmappers diretamente do disco local de cada máquina que as processou. Essa fase decópia é denominada Shuffle. Após a fase de Reduce, múltiplas saídas são concatenadasem um arquivo que é gravado no HDFS.

A primeira proposta do time do Hadoop para possibilitar a execução de tarefas Map-Reduce foi dividir as máquinas do cluster dentre as seguintes responsabilidades: Job-Tracker, TaskTracker, NameNode e DataNode — os dois primeiros pertencem ao Map-Reduce e os dois últimos ao HDFS. Destacamos que uma arquitetura mestre-escravo foiproposta, tanto para o ambiente de execução (MapReduce), como para o sistema de ar-quivos distribuído (HDFS) — sendo os mestres JobTracker e NameNode responsáveis porcoordenar os escravos TaskTracker e DataNode, respectivamente. O principal problema

Page 25: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 25

Figura 3.2: Arquitetura clássica do Hadoop (traduzido de [50])

desse tipo de arquitetura é o ponto único de falha: o mestre. A presença de tal vulne-rabilidade é algo muito ruim, pois é difícil para o sistema se recuperar em caso de falhadesse mestre.

A arquitetura clássica do Hadoop é ilustrada na Figura 3.2. Nesse modelo, após oJobTracker dividir a aplicação em tarefas para distribuir seu processamento, ele cria ummapper por pedaço do arquivo de entrada e um número pré-configurado de tarefas deReduce. Tarefas são designadas para os nós baseados no tamanho de suas filas internas.Além disso, quando uma tarefa de Map é designada para um nó, preferencialmente sãoescolhidos aqueles que contenham os blocos dos arquivos de entrada, em seguida nósno mesmo rack que os dados de entrada, por fim no mesmo cluster e mesmo datacenter.Chama-se a esta otimização de localidade dos dados [6]. Os TaskTrackers além de realizara computação em si, enviam mensagens de heartbeat para o JobTracker regularmente.Essas mensagens transportam informações sobre a porcentagem de pedaços da entradaque já foram processados pelas tarefas locais (se alguma estiver sendo executada). Alémdisso, sinalizam quando uma tarefa finaliza e alertam o JobTracker sobre condições deerros. Respostas a heartbeats podem carregar novas tarefas para rodar no nó.

Uma mudança arquitetural mostrou-se necessária à medida que o Hadoop passou a seramplamente adotado em grandes clusters. Com mais de 4000 nós, o MapReduce mostrou-se não escalável [17]. O principal gargalo era o JobTracker e foi por isso que um grupo doYahoo trabalhou em um novo modelo arquitetural denominado YARN (ou MapReduce2), que significa “Yet Another Resource Negotiator”. A maior mudança no YARN foia divisão das responsabilidade do JobTracker entre duas novas entidades: o Resource-Manager e o ApplicationMaster. O primeiro gerencia os recursos do cluster, enquanto osegundo gerencia o ciclo de vida da aplicação. Além disso, introduziu-se o conceito de

Page 26: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 26

containers, divisão lógica de memória e CPU dos nós, utilizada pelo ResourceManagerna alocação de recursos. A responsabilidade de gerenciar os containers é de uma novaentidade denominada NodeManager.

Figura 3.3: Arquitetura do YARN (traduzido de [17])

A arquitetura do YARN é ilustrada na Figura 3.3, onde duas aplicações (representadaspelas cores preto e cinza) estão sendo executadas em um mesmo cluster. No YARN, aexecução de uma aplicação de MapReduce se inicia quando um cliente do cluster sub-mete uma aplicação ao ResourceManager, em seguida, o escalonador aloca um containerpara executar o ApplicationMaster que obtém a entrada necessária para sua execução.Ele então negocia com o ResourceManager por mais containers repassando informaçõessobre a localidade dos dados. Além disso, o ResourceManager também recebe dados arespeito da quantidade de memória e CPU necessários para a execução das tarefas paraserem utilizadas no escalonamento. Depois que o escalonador mapeia tarefas a recursos,o ApplicationMaster inicia os containers através de uma requisição ao NodeManager quebusca os dados necessários para realizar a execução propriamente dita da tarefa.

3.2 Escalonadores

Hadoop começou com uma abordagem bastante simplificada para seu escalonador, o cha-mado escalonador “First In First Out” (FIFO) fez parte da primeira solução que o sistemaadotou. Normalmente, utilizando-se da política FIFO, cada aplicação tem 100% do clus-ter dedicado a ela durante sua execução. Além disso, enquanto uma aplicação está sendoexecutada as outras devem aguardar em uma fila ordenada pela sua ordem de submissão.Claramente, esse escalonador não é ideal para casos com múltiplos usuários tendo acessoao mesmo tempo aos recursos computacionais do cluster. É muito comum que os clusterscom Hadoop tenham a necessidade de suportar configurações multiusuários.

Page 27: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 27

A primeira ideia para auxiliar a execução das aplicações em ambientes compartilhadosfoi a inserção de prioridades às aplicações do Hadoop. Assim, cada aplicação poderia rece-ber um parâmetro complementar no momento de sua submissão indicando qual seu nívelde prioridade. As possibilidades indo da mais prioritária até a menos prioritária eram:VERY_HIGH, HIGH, NORMAL, LOW e VERY_LOW. Mesmo assim, com o escalona-dor FIFO do Hadoop, preempção não é algo suportado. A falta desse mecanismo acarretana possibilidade de uma aplicação de baixíssima prioridade tomar para si todos os recur-sos do cluster fazendo com que uma aplicação de altíssima prioridade necessite aguardarseu término. Basta que a mais prioritária seja submetida após a menos prioritária.

Dois escalonadores surgiram no Hadoop para tratar clusters multiusuários, são eles oFair Scheduler e o Capacity Scheduler. Nas próximas seções vamos apresentar ambos commaiores detalhes.

3.2.1 Fair Scheduler

O principal objetivo do Fair Scheduler é fornecer a cada usuário uma fatia justa dos re-cursos do cluster, para isso se utiliza do conceito de pools [39]. Sendo assim, se em umapool que utilize uma distribuição Fair dos recursos, em dado momento uma aplicaçãoestiver sendo executada sozinha, ela deve estar consumindo 100% dos recursos disponí-veis, porém se duas estiverem sendo executadas, 50% dos recursos devem ser dedicados acada uma delas, e assim por diante. Suponha que 100% dos recursos estão destinados àaplicação A e então a aplicação B é submetida na mesma pool, a passagem dos recursosantes destinados a A para B é feita de maneira incremental, ou seja, à medida que tarefasde A vão terminando, o escalonador garante que recursos agora vagos sejam atribuídosàs tarefas de B. Esse processo continua até que a quantidade justa de recursos tenha sidodestinada a cada aplicação, ou seja, dentro de uma mesma pool não há preempção. Comessa distribuição, ambas estarão progredindo com o passar do tempo sem que uma neces-site aguardar o término da outra. Esse é o principal diferencial que o Fair Scheduler trazem relação ao FIFO Scheduler e é o que o torna uma opção para clusters multiusuários.

Figura 3.4: Divisão de recursos utilizando-se o Hadoop Fair Scheduler

A estrutura que auxilia a coordenação das aplicações pelo Fair Scheduler são suas poolse entre elas, inclusive suporta-se preempção. Cada usuário possui uma pool e os recursossão divididos igualmente dentre as pools que possuem aplicações em execução. Cadapool, por sua vez, pode dividir seus recursos dentre as aplicações que contêm utilizando-se de uma política Fair ou FIFO. Além disso, esse escalonador suporta preempção entrediferentes pools, então se uma pool estiver com menos recursos do que o justo para ela,

Page 28: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 28

tarefas de aplicações em outras pools serão mortas para garantir que a primeira recebaseus recursos.

A Figura 3.4 ilustra uma divisão de recursos do cluster onde dois usuários submete-ram 5 aplicações para execução. O primeiro usuário submeteu duas aplicações (A e B),enquanto o segundo submeteu três (C, D e E). Perceba que cada usuário ganha 50% dosrecursos do cluster que por sua vez são divididos com suas respectivas aplicações. Nofinal, teremos as aplicações A e B com 25% do total de recursos cada e C, D e E com16.6666% aproximadamente, caso Fair também esteja sendo utilizado para a distribuiçãodos recursos internos das pools.

3.2.2 Capacity Scheduler

Com o Capacity Scheduler a abordagem de escalonamento é um pouco diferente quandocomparada a do Fair Scheduler. Também é muito apropriado para ambientes multiusuá-rios e nele o cluster é dividido em filas [38]. Cada fila pode possuir filas filhas, formandouma árvore com configurações e características compartilhadas. Cada uma dessas filastem uma capacidade destinada a ela, ou seja, uma porcentagem dos recursos do cluster,podendo ser um intervalo de minímo e máximo percentual.

Até o momento o Capacity e o Fair Scheduler têm estruturas análogas e muito simila-res, no primeiro temos as filas enquanto no último as pools. Porém, nas filas do CapacityScheduler uma lógica FIFO com prioridades é utilizada para selecionar qual a próximaaplicação a ser executada. A Figura 3.5 ilustra esse comportamento.

Figura 3.5: Divisão de recursos com o Hadoop Capacity Scheduler

Esse escalonador é muito utilizado e é o padrão na versão estudada do Hadoop (2.4.1).É muito útil quando os usuários querem enxergar a sua fila de forma a simular um clustercompleto onde FIFO é a política de escalonamento. Esse é um cenário comum às orga-nizações que utilizam Hadoop [19]. Diferentemente do Fair Scheduler, não ocorre nessecaso o compartilhamento dos recursos internos à pool, cada fila é independente.

Page 29: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 3. HADOOP 29

3.3 Preempção de tarefas

Tipicamente se trabalha com dois tipos de aplicações no Hadoop, as de produção e as depesquisa. Essa heterogeneidade de prioridades criou a necessidade do suporte à preempçãonos escalonadores do Hadoop.

Na versão estudada do Hadoop (2.4.1), a preempção de uma tarefa corresponde afinalizar a sua execução imediatamente para liberar seus recursos para serem ocupadospor uma nova tarefa (de uma aplicação de maior pioridade). Nenhum tipo de estado ésalvo, por isso, todo o trabalho já realizado por aquela tarefa é perdido e quando ela voltara ser executada precisará ser refeito.

Em condições convencionais, o retrabalho realizado quando as tarefas de uma aplicaçãode menor prioridade retomarem suas execuções pode não significar grande parte do tempode processamento. Porém, em casos com tarefas de longa duração isso pode representarum problema grave. Para exemplicar isso, imagine um cluster onde uma aplicação deprioridade baixa está sendo executada e que ela seja composta de tarefas de longa duração.Agora, imagine que nesse mesmo cluster, aplicações de maior prioridade passem a sersubmetidas com frequência. Uma perda de recursos significativa poderá ser observada acada chegada de uma aplicação de maior prioridade, pois toda a computação realizadapelas tarefas da aplicação de menor prioridade que forem suspensas não será reaproveitada.Para piorar, como essas tarefas são longas, isso consistirá em um desperdício grande detempo gasto em computar dados que anteriomente já haviam sido processados.

Page 30: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 4

Disco

Enquanto o Yahoo olhava para a implementação de MapReduce proposta pelo Google e in-vestia no Hadoop, outras empresas também resolveram investir no paradigma MapReducee criaram outros projetos de código aberto. Esse foi o caso da Nokia, que desenvolveu em2008 em seu centro de pesquisas em Palo Alto um projeto chamado Disco [13]. Trata-sede uma implementação simples e leve que também suporta a computação paralela sobregrandes quantidades de dados em clusters de máquinas convencionais. Quando dizemosque o Disco é simples isso na verdade é uma qualidade do sistema, pois é muito maisflexível e ao invés de tentar resolver todos os problemas relacionados a computação distri-buída internamente, ele delega muito disso à máquina virtual do Erlang. Veja a Tabela 4.1comparando o tamanho dos projetos Disco e Hadoop em termos de arquivos e linhas decódigo para entender melhor o quão mais fácil é implementar algo novo no Disco.

A ferramenta é muito utilizada em atividades como análise de logs, modelagem proba-bilística, mineração de dados e indexação de textos. O núcleo principal do Disco é escritoem Erlang, uma linguagem de programação funcional famosa pelas suas funcionalida-des associadas a sistemas distribuídos como imutabilidade, um modelo de concorrênciasimplificado e a facilidade na comunicação entre as máquinas do cluster que estiveremexecutando sua máquina virtual. Sobre essa sólida fundação, o Disco pode fornecer aoprogramador um ambiente que cuide de entraves técnicos da execução de aplicações Map-Reduce, como protocolos de comunicação, balanceamento de carga, escalonamento dastarefas e tolerância a falhas.

Os programadores, usualmente, utilizam Python para escrever suas aplicações. Lin-guagem simples e poderosa, com um ambiente rico de bibliotecas para computação ci-entífica, estatística e de análise de dados, muito apropriada para aplicações MapReduce.Além disso, o projeto almeja permitir que o programador que não se sinta confortávelcom Python também possa escrever aplicações na linguagem de sua preferência, para isso

Tabela 4.1: Comparação de tamanhos dos projetos Hadoop e DiscoHadoop Disco

As duas linguagens mais utilizadas Java e XML Erlang e PythonQuantidade de arquivos nessas duas linguagens 6.474 190Quantidade de linhas de código nessas duas linguagens 1.688.407 19.134

30

Page 31: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 31

fornece uma abstração chamada worker para lidar com a comunicação de seu núcleo coma aplicação MapReduce.

Assim como o HDFS atua no Hadoop como seu sistema de arquivos distribuídos paraauxiliar o ambiente de execução com a persistência dos dados, o Disco possui seu equi-valente, o DDFS (Disco Distributed Filesystem). Esse sistema de arquivos se utilizado conceito de tags ao invés de diretórios para mapear a localização de seus arquivos etambém fornece alta disponibilidade e confiabilidade ao cluster.

Nas próximas seções vamos no aprofundar um pouco mais no Disco. Vamos analisarsua arquitetura (Seção 4.1), o protocolo do worker (Seção 4.2), o pipeline (Seção 4.3) eas políticas de escalonamento (Seção 4.4).

4.1 Arquitetura

A arquitetura do Disco estudada neste trabalho é a da versão 0.5.4. Nessa versão, exa-tamente da mesma maneira que o Hadoop clássico (antes do YARN), um padrão demestre-escravo é utilizado para organizar as máquinas do cluster. Nessa arquitetura omestre é responsável por duas atividades principais:

• inicialização de todos os componentes do sistema como o mecanismo de log, a in-terface Web e os workers que são executados nos nós escravos;

• escalonamento, monitoramento e alocação dos recursos para as tarefas de novasaplicações submetidas pelos clientes.

O mestre precisa de poucas configurações para inicializar seus escravos. O administra-dor do cluster apenas deve informar via interface adminstrativa quantos workers cada nóirá executar e quais os hostnames desses nós. O mais usual é configurar cada host parautilizar um worker por core de CPU que possua. Após essa configuração, o mestre usaos hostnames para inicializar um escravo único em cada nó escravo através da máquinavirtual do Erlang. Os nós escravos serão responsáveis por executar e monitorar a execuçãodas tarefas especificadas pelas aplicações submetidas pelo cliente. Além disso, os escra-vos são as unidades de armazenamento utilizadas pelo DDFS [30]. Os administradoresdo cluster podem adicionar e remover nós escravos em tempo real via a interface Webadministrativa do Disco.

A decisão de utilizar a máquina virtual do Erlang se mostra uma ótima escolha quandose observa a maneira como os nós interagem uns com os outros. As conexões entre os mes-tres e os escravos são monitoradas e mantidas pela infraestrutura de monitoramento doErlang. Como a linguagem e sua máquina virtual foram desenhadas especialmente paratrabalhar com sistemas distribuídos, existem funcionalidades muito úteis que proveem demaneira transparente o gerenciamento de comunicação, tolerância a falhas, alta disponi-bilidade e escalabilidade. Dessa maneira, quando um nó falha, o mestre é notificado epode rescalonar as tarefas deste nós em outro.

Um resumo da arquitetura mestre-escravo do Disco e a forma como a comunicaçãoentre os nós é feita são ilustrados pela Figura 4.1, em que quadrados cinzas denotam oscomponentes principais (mestre, escravos e workers).

Page 32: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 32

Figura 4.1: Visão geral da arquitetura do Disco (traduzido de [13])

4.2 Protocolo do worker

Um dos objetivos da arquitetura do Disco é permitir que aplicações escritas em qualquerlinguagem sejam executáveis em seu ambiente MapReduce. Para atingir esse objetivo dedesign, existe um protocolo de comunicação entre os nós escravos do Disco e seus wor-kers. Hoje em dia, além de Python existem implementações do protocolo do worker emOCaml [52], Golang [36], LFE [48], e Haskell [41]. Isso dá aos programadores a possi-bilidade de utilizar a linguagem de programação com a qual se sintam mais confortáveisna hora de programar sua aplicação MapReduce. O protocolo se utiliza do descritor dearquivos standard error (stderr) para mensagem do worker para o nó escravo (caracteri-zado por estar executando a máquina virtual Erlang com Disco) e o descritor de arquivosstandard input (stdin) para mensagem do nó escravo ao worker. O fluxo de mensagens ésempre inicializado pelo worker e as mensagens principais do protocolo são:

WORKER: anuncia o início da execução;

TASK: requisita informações a respeito da tarefa;

INPUT: requisita as entradas para a tarefa;

INPUT_ERR: avisa que ocorreram falhas enquanto as entradas estavam sendo requi-sitadas;

MSG: utilizada para mensagens gerais que serão apresentadas na interface Web, porexemplo;

Page 33: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 33

OUTPUT: indica que a saída será gravada;

DONE: o worker finalizou sua execução;

ERROR: falha com a entrada ou erro transitório;

FATAL: erro fatal;

PING: mensagem heartbeat (utilizada para garantir que o mestre está funcional e exe-cutando);

O seguinte diagrama, ilustrado pela Figura 4.2 mostra algumas dessas trocas de men-sagens para o início da execução de uma tarefa.

Figura 4.2: Troca de mensagens do protocolo Worker para execução de uma tarefa ([13])

4.3 Pipeline

O pipeline no Disco é uma abstração adicionada para permitir que o Disco execute apli-cações além de MapReduce, ou seja, que respeite outros paradigmas. Trata-se de umadivisão lógica dos estágios computacionais que definem uma aplicação em duas etapas:

Page 34: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 34

política de agrupamento seguida de função de tranformação dos dados. Um pipeline éuma sequência desses estágios sendo a saída de um estágio a entrada do próximo comoilustra a Figura 4.3.

Figura 4.3: Pipeline do Disco: abstração baseada em uma sequência de estágios

Um estágio consiste de um conjunto de tarefas que executam a mesma computação,porém em diferentes entradas. Por exemplo, um estágio de Map consiste de um conjuntode tarefas Map, cada uma recebe uma entrada única, enquanto um estágio de Reducetipicamente consiste de uma única tarefa Reduce que processa todas as saídas de Map noestágio de Reduce.

Uma política de agrupamento é uma operação que especifica como as entradas paraum estágio devem ser divididas e agrupadas. O agrupamento de um conjunto de entradasé executado utilizando uma informação que o Disco permite ser adicionada a cada entradachamada de label.

As saídas geradas pelas tarefas em um estágio são as entradas para as tarefas dopróximo estágio na sequência. Sempre que uma tarefa gera uma saída ela adiciona umlabel a ela. Utilizando-se dessa informação a política de agrupamento sabe como operarsobre as saídas para torná-las a entrada do próximo estágio.

Em outras palavras, um pipeline é uma sequência de estágios e cada estágio executaum agrupamento de suas entradas demarcadas com um label gerando um ou mais grupos.Esses grupos, por sua vez, são as entradas a serem processadas por uma tarefa cujadefinição também compõe o próximo estágio no pipeline.

Existem cinco diferentes políticas de agrupamento: split, group_node, group_label,group_node_label e group_all. Cada uma dessas políticas recebe um conjunto de entra-das e as agrupa utilizando-se de duas informações importantes: os labels e a localidade donó do cluster onde a entrada está armazenada. Vamos detalhar cada uma dessa políticasiniciando nossa explicação por aquelas que não dependem da informação referente ao nódo cluster onde a entrada está armazenada.

4.3.1 Split

A operação de split simplesmente recebe a entrada e independentemente do label a repassaa tarefa para processá-la. A Figura 4.4 ilustra tal comportamento, onde os retângulos

Page 35: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 35

maiores com bordas pretas representam os nós do cluster onde a entrada está armazenada,enquanto as cores das entradas representam seus labels (cinza claro, cinza escuro e preto,por exemplo) e os círculos representam as tarefas que irão utilizar as entradas.

Figura 4.4: Política de agrupamento: split ([54])

4.3.2 Group_all

A política group_all coloca todas as entradas em um único grupo, independentemente deseus labels ou nós onde estão armazenadas. A Figura 4.5 ilustra essa técnica.

Figura 4.5: Política de agrupamento: group_all ([54])

Page 36: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 36

4.3.3 Group_label

A política group_label, por sua vez, leva em consideração os labels das entradas e nãoos nós onde estão armazenadas para agrupá-las. Cada label diferente se torna um grupoque conterá as saídas correspondentes como mostra a Figura 4.6

Figura 4.6: Política de agrupamento: group_label ([54])

4.3.4 Group_label_node

A política group_label_node ilustrada na Figura 4.7 tem basicamente o comportamentoda política group_label apresentada anteriormente, mas levando em consideração a loca-lidade dos dados. Nesse caso, o agrupamento ocorre por label porém internamente emcada nó. Perceba que um estágio utilizando-se de tal política pode ser útil em uma fase in-termediária entre as fases de Map e Reduce para condensar os dados antes de transferi-lospela rede.

Figura 4.7: Política de agrupamento: group_label_node ([54])

Page 37: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 37

4.3.5 Group_node

Essa política equivale a group_all, porém realizada internamente em cada nó também,assim como a anterior. A Figura 4.8 a ilustra.

Figura 4.8: Política de agrupamento: group_node ([54])

4.3.6 MapReduce como um pipeline

Por fim, vamos ver como expressar uma computação MapReduce segundo essa lógica depipeline do Disco.

O estágio de Map pode ser representado pelo agrupamento do tipo split, onde cadaentrada é repassada diretamente para a função (no caso, à função Map). O label dadopela função da Map as suas saída deve ser calculado de maneira a distribuir os gruposde chaves entre os reducers pois o estágio de Reduce, por sua vez, pode ser representadocomo um agrupamento do tipo group_label. Nesse tipo de agrupamento, a informaçãoreferente ao label é utilizada para agrupar as entradas antes de repassá-las à função (nocaso a função Reduce).

Note que a quantidade de labels nesse caso será igual à quantidade de reducers. Notetambém que a operação de agrupamento group_label é o que muitas vezes chamamos deShuffle no MapReduce. A quantidade de dados trafegados pode ser otimizada utilizando-se de mais estágios intermediários entre o estágio de Map e de Reduce, como o group_-label_node, por exemplo.

A Figura 4.9 ilustra essa computação de MapReduce descrito dentro das regras dopipeline.

4.4 Políticas de escalonamento

O escalonador do Disco possui um mecanismo bem definido e facilmente adaptável depolíticas de escalonamento. Duas políticas de escalonamento são compiladas junto com ocódigo do Disco por padrão: FIFO e Fair.

A política FIFO trabalha com uma fila simples onde a primeira aplicação será prova-velmente a única em execução em um dado momento. Ela se apossa de todos os recursos

Page 38: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 38

Figura 4.9: MapReduce segundo as regras do pipeline

disponíveis do cluster e executa até seu término enquanto as outras devem aguardar.Com a política FIFO, quando existem recursos disponíveis a primeira aplicação da fila ésempre priorizada para recebê-los. Caso ela não possua mais tarefas prontas para seremexecutadas, a próxima aplicação da fila é selecionada pelo escalonador para ocupar osrecursos com suas tarefas.

Por sua vez, a política Fair tenta dividir o cluster em fatias iguais, fornecendo às apli-cações que estiverem sendo executadas a mesma quantidade de recursos em média com opassar do tempo. Para implementar tal comportamento, o escalonador Fair circula dentreas aplicações em execução quando existirem recursos disponíveis no cluster. Ele tambémmantêm um cálculo de déficit, ou seja, de quanto tempo uma aplicação foi injustiçadacom menos recursos do que deveriam ter sido destinados a ela porque outras estavam comuma fatia maior do que deveriam. O objetivo de manter tal cálculo é compensar as apli-cações que ficarem desfavorecidas com o passar do tempo. A política Fair padrão sempretenta não possuir recursos inutilizados enquanto existirem tarefas a serem escalonadas.Um demonstrativo dessa divisão pode ser visto na Figura 4.10.

Ambas as políticas possuem problemas quando tentam trabalhar com um carga detrabalho composta de aplicações de pesquisa e de produção. FIFO não prioriza a divisãodos recursos do cluster entre as aplicações em execução, dessa forma, se a aplicação deprodução for submetida após a de pesquisa teremos problemas. A aplicação de produçãosó poderá executar depois de um longo período de espera, porque a aplicação de pesquisase apossa de todos os recursos do cluster pelo tempo que precisar, porém a aplicação deprodução possui uma prioridade maior, por isso essa distribuição dos recursos não é ideal.Por outro lado, Fair também não se sai muito melhor com tal carga de trabalho. Issoporque, Fair não possui o conceito de preempção das tarefas em execução, dessa forma, sea aplicação de pesquisa se apossa de todos os recursos do cluster e possui tarefas longas,somente após o término dessas tarefas é que a aplicação de produção poderá iniciar suaexecução.

Outro problema pode ocorrer no escalonador Fair do Disco entre transições de fases naaplicação de produção. Nesse contexto, chamamos de transição de fases o estágio entre ofim da fase de Map e início da fase de Shuffle ou entre o fim da fase de Shuffle e o começo da

Page 39: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 4. DISCO 39

Figura 4.10: Comparação da distribuição dos recursos do cluster para as políticas FIFOe Fair do Disco

fase de Reduce. Esse período é especial durante a execução da aplicação porque o mestreestá calculando a quantidade de trabalho remanescente para a aplicação de produção.Caso não seja monitorado que a aplicação se encontra em uma transição de fases, durantetal período pode ocorrer de o escalonador se perder durante a distribuição dos recursos efornecê-los novamente à aplicação de pesquisa, já que ele ainda não terminou de calculara quantidade de trabalho remanescente para a aplicação de produção e por isso consideraque o melhor é priorizar aquela que ele já sabe que possui trabalho remanescente (no casoa de pesquisa). Como a aplicação de pesquisa pode possuir tarefas muito longas, ela podeacabar por travar os recursos por um longo tempo para si, novamente impedindo que aaplicação de produção execute e finalize rapidamente sua execução prioritária.

Neste trabalho propomos uma nova política de escalonamento que utiliza preempçãoe atraso no escalonamento durante transições de fases para possibilitar a rápida execuçãode aplicações de produção nesse cenário de carga heterogênea do cluster.

Page 40: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 5

Mecanismos Auxiliares aosEscalonadores

Os escalonadores de MapReduce se valem de diversos mecanismos auxiliares para ajudarna tomada da decisão de escalonamento.

Nas próximas seções, vamos descrever em detalhes alguns mecanismos já relatados poroutros pesquisadores que auxiliam os escalonadores de MapReduce a tomarem melhoresdecisões de distribuição das tarefas junto aos recursos do cluster. Nesse caso, melhoresdecisões podem ser aquelas que acelerem a execução de todas as aplicações ou que le-vem em conta a prioridade entre as aplicações para acelarar preferencialmente aquelasmais prioritárias. Além disso, para cada mecanismo descrito neste capítulo vamos comen-tar brevemente em qual dos frameworks MapReduce estudados nesta dissertação têm-serelatos de que o mecanismo foi implementado.

Neste capítulo, começaremos com o mecanismo de localidade de dados (Seção 5.1) ede tarefas especulativas (Seção 5.2), os dois mais populares e já descritos pelo Googleno trabalho inicial sobre MapReduce [6]. Depois detalhamos os I-Files (Seção 5.3) e omecanismo de preempção e checkpointing (Seção 5.4), assuntos co-relacionados, já queesses arquivos são estruturas de dados que auxiliam os serviços de checkpointing. Outrostrabalhos, não menos importantes, mas para os quais não nos aprofundamos tanto fechamo capítulo (Seção 5.5).

5.1 Localidade de dados

Os três frameworks MapReduce que estudamos neste trabalho utilizam um mecanismorelacionado a localidade dos dados no cluster onde o ambiente de execução MapReduceestá configurado para auxiliar no escalonamento das tarefas. A proposta desse mecanismovem do artigo original do MapReduce [6] com o objetivo de reduzir a utilização de bandade rede, que é o recurso mais crítico para um cluster.

O conceito de localidade de dados está relacionado à proximidade dos dados. Nãose trata de uma proximidade física essencialmente, mas sim de uma proximidade lógica.Por exemplo, um arquivo consegue ser mais rapidamente acessado por máquinas quecompartilhem o mesmo switch de rede com o computador que o hospeda.

40

Page 41: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 41

Para fornecer suporte ao escalonador com informações a respeito da localidade dedados, o papel do sistema de arquivos distribuído é fundamental, pois é ele quem sabeonde os dados estão efetivamente armazenados nos nós do cluster. É interessante notar queuma das funcionalidades que auxiliam o sistema de arquivos distribuído em caso de falhas,também favorece a localidade dos dados: a replicação automática dos arquivos. Paragarantir que os dados estejam disponíveis, acessíveis e íntegros, todo arquivo é copiado esuas réplicas são gravadas em máquinas distintas no mesmo cluster. Normalmente, pelomenos três cópias são armazenadas. Dessa maneira, as chances do arquivo necessáriopara a execução de uma tarefa estar logicamente próximo a uma máquina apta a recebera responsabilidade de executar essa mesma tarefa aumentam.

A Figura 5.1 ilustra o conceito de replicação de dados para um caso onde dois arquivos,A e B estão sendo salvos no cluster. Cada arquivo é replicado três vezes no exemplo dessafigura. Essa replicação permitiria que a execução de uma tarefa que dependa do arquivoA seja muito rápida nos nós 1, 2 e 3; o equivalente ocorre para uma tarefa que dependade B nos nós 2, 3 e 4. Caso não seja possível escalonar uma tarefa que dependa de A nosnós 1, 2 e 3 a segunda melhor opção seria o nó 4, pois ele está no mesmo rack do nó 3, epor esse motivo a requisição pelo arquivo seria mais rápida do que se fosse feita a um nósque não estivesse no mesmo rack. Cenário semelhante ocorre para o caso de uma tarefaque dependa do arquivo B e não possa ser escalonada em 2, 3 ou 4, a segunda melhoropção seria o nó 1. Somente em último caso, tarefas que dependam de A ou de B seriamescalonadas em algum nó do Rack X, pois ele não está próximo a esses arquivos e de suasréplicas.

Durante a fase de Map uma das atividades mais custosas do ponto de vista de consumode largura de banda ocorre quando uma tarefa é escalonada em um nó que não possua osdados necessários a sua execução. Isso acontece porque para iniciar a execução do Map,o nó deverá requisitar ao sistema de arquivos distribuído um dado arquivo de entrada,e os dados desse arquivo deverão trafegar pela rede até chegarem à máquina que seráresponsável pela sua execução. Agora imagine isso para uma aplicação com centenas denós responsáveis pela execução de Map. Além disso, é importante notar que a fase deMap normalmente é a que consome a maior parte do tempo de execução de uma aplicaçãoMapReduce. Por esses motivos, é tão importante que o escalonador possua informaçõessobre a localização dos dados que serão necessários para a computação que está prestes ainiciar antes de tomar sua decisão de escalonamento.

O MapReduce do Google, Hadoop e o Disco implementaram esse mecanismo de ma-neira a utilizar-se da distância lógica dos dados durante o escalonamento de tarefas paraexecutar de maneira mais rápida a computação de aplicações MapReduce.

5.2 Tarefas especulativas

Quando se trabalha com centenas e às vezes milhares de máquinas em um cluster —como é comum em ambientes que estão executando MapReduce — uma série de fatorespodem gerar comportamentos fora do padrão em algumas das máquinas fazendo com otempo que as tarefas levem para executar quando escalonadas em uma dessas máquinas

Page 42: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 42

Nó 1

Arquivo A

Nó 2

Arquivo A Arquivo B

Rack I

Nó 3

Arquivo A

Nó 4

Arquivo B

Rack II

Arquivo B

Nó Y

Nó Z

Rack X…

Figura 5.1: Replicação de dados em um sistema de arquivos distribuído e sua relação como mecanismo de localicade dos dados

seja prolongado. Chamadas em diversos trabalhos de stragglers, diversos fatores podemtornar essas máquinas mais lentas:

• Falhas de hardware, relacionadas à rede ou ao disco, por exemplo;

• Competição por recursos com outras tarefas escalonadas no mesmo nó;

• Competição por recursos com outras máquinas virtuais, em ambientes virtualizados.

A presença de stragglers em um cluster pode afetar significativamente o tempo deexecução das aplicações e por isso a maioria dos frameworks MapReduce oferece suporteao que chamamos de execução especulativa (backup tasks no caso do MapReduce propostopelo Google [6]). Trata-se de um mecanismo no qual alguma métrica sobre as tarefas emexecução é mantida para que por meio de comparação defina-se se alguma das tarefas estáexecutando com desempenho abaixo do esperado. Caso alguma tarefa nessas condiçõesseja encontrada, o escalonador se encarrega de agendar a execução de uma cópia da mesmaem um outro nó. O escalonador passa então a monitorar a execução de ambas e somenteo resultado da que terminar primeiro será aproveitado ao final da computação.

O Google MapReduce realiza a execução especulativa das últimas tarefas que conti-nuam a ser executadas quando uma aplicação está quase terminando. Após uma série de

Page 43: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 43

calibrações, os pesquisadores responsáveis pelo projeto garantiram que esse mecanismonão aumente muito a utilização de recursos computacionais. Além disso, notaram umamelhoria significativa no tempo de execução médio das aplicações com essa abordagem:com esse mecanismo desativado um programa de ordenação é executado com 44% a maisde tempo quando comparado ao mesmo programa com esse mecanismo ativado.

O Hadoop, por sua vez, possui um mecanismo para detecção de stragglers e execuçãode tarefas especulativas baseado na comparação do progresso das tarefas de uma mesmaaplicação. Sendo assim, sempre que uma tarefa está com seu progresso abaixo da médiadas outras tarefas da mesma aplicação por um valor constante, escalona-se a execuçãoespeculativa da mesma. A Figura 5.2 contém um exemplo de tal cenário. Nessa ilustra-ção, as Tarefas A, B e C estão sendo executadas em paralelo. Então, no tempo T1 dafigura, o escalonador detecta que o progresso da Tarefa A está muito abaixo do progressomédio e decide por escalonar uma cópia de A (Tarefa A’, também conhecida como tarefaespeculativa). No tempo T2, o escalonador observa que a tarefa especulativa terminouantes de A e utiliza então o resultado da cópia. Por fim, ele aborta a execução da tarefaoriginal, já que caso ela terminasse chegaria no mesmo resultado que já foi armazenado.

Figura 5.2: Exemplo de execução de tarefas especulativas para o Hadoop

Essa estratégia funciona muito bem em ambientes homogêneos, ou seja, onde todas asmáquinas do cluster possuem a mesma configuração. Atualmente, grande parte dos clus-ters que executam Hadoop não podem garantir a homogeneidade. Veja o caso da AmazonEC2 [34], por exemplo, que é um ambiente muito utilizado e naturalmente heterogêneo.Pensando em ambientes heterogêneos, outras abordagens surgiram para auxiliar os esca-lonadores a decidirem quando a execução especulativa de uma dada tarefa é interessante.Vamos descrever três dessas abordagens: LATE [21], MCP [3] e Mantri [1].

LATE significa Longest Approximate Time to End é uma proposta para o Hadoopna qual não se utiliza mais um valor constante para detectar se o progresso de umatarefa está abaixo da média. Esse trabalho propõe utilizar uma informação de tempoestimado para a conclusão das tarefas. Quando uma aplicação possui poucos mappersou reducers, a tarefa lenta, ou seja, que nesse momento ainda estiver sendo executada,com o maior tempo restante para a conclusão é selecionada e escalona-se uma cópia da

Page 44: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 44

mesma de forma especulativa. A taxa de progresso de uma dada tarefa é calculada pelarazão entre o progresso da tarefa e o tempo de execução. Com essa informação estima-seo tempo restante para a conclusão como a razão entre o progresso remanescente e a taxade progresso.

MCP significa Maximum Cost Performance. Esse trabalho pode ser considerado umaevoluação do LATE. A principal diferença está no modelo de análise da forma comoas tarefas progridem. Enquanto LATE considera uma evolução das tarefas em ritmoconstante, o MCP usa um conjunto de outras estatísticas. Para uma tarefa se escalonadade maneira especulativa ela deve atender as seguintes condições:

• Ela já foi executada por um tempo mínimo (speculative lag);

• A taxa de progresso e a estimativa de evolução estão baixas;

• Vale a pena do ponto de vista de tempo total de execução criar a tarefa especulativa,visto que em um cluster com poucas tarefas em execução pode valer a pena, enquantoem um cluster super atarefado pode não valer;

• A estimativa de tempo de execução remanescente é maior do que a previsão detempo para término da tarefa especulativa no nó disponível para sua execução;

• Ela possui o tempo de execução remanescente maior do que o de todas as tarefasque satisfazem as condições anteriores.

O conceito de tarefas especulativas existe no framework de MapReduce do Google etambém no Hadoop. As pesquisas e artigos sobre LATE e MCP foram feitas sobre oHadoop. Para o Disco, não tivemos conhecimento de nenhuma otimização relacionadaa tarefas especulativas. Existe uma outra pesquisa interessante que vale a pena descre-vermos rapidamente sobre um framework MapReduce que não iremos nos aprofundar: oDryad [11] da Microsoft.

Mantri está para o framework Dryad assim como o LATE está para o Hadoop. Ouseja, da mesma maneira que o LATE, o Mantri propõe uma forma de se detectar stragglersbaseada no tempo restante para o término das tarefas. A diferença entre os mecanismosestá na forma como o tempo restante é calculado. Para o Mantri, tempo restante é a razãoentre a quantidade de dados restantes para serem processados e a latência do processo.Essa última é calculada como a razão entre a quantidade de dados já processados e otempo de execução da tarefa.

5.3 I-Files

As implementações convencionais dos ambientes de execução de aplicações MapReducecostumam armazenar os dados intermediários localmente. Chamamos de dados interme-diários aqueles produzidos como resultado da execução da função de Map e que posteri-ormente serão consumidos pelos nós que irão executar Reduce. Sendo assim, quando aexecução domapper finaliza, seu resultado é gravado no disco local da máquina que foi res-ponsável pela computação. Os I-Files são uma proposta dos pesquisadores do Yahoo [15]

Page 45: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 45

para atacar alguns dos problemas que tal abordagem possui, como por exemplo lenti-dão na leitura e perda de informação caso o disco da máquina responsável pelo arquivointermediário fique inacessível.

I-Files nada mais são do que um formato simples para os arquivos utilizados no arma-zenamento dos dados intermediários das aplicações MapReduce que privilegia não mais agravação em discos locais, mas sim no sistema de arquivos distribuído. Foi implementadoprimeiramente para o KFS (Kosmos distributed filesystem) [47] e depois para o HDFS doHadoop.

Normalmente, para se trabalhar com dados intermediários existem dois tipos de custosprincipais: os de leitura e escrita dos dados e os de transferência desses dados pela rede.Os I-Files auxiliam na melhoria do desempenho da leitura e escrita dos dados por meioda diminuição da quantidade de seeks. Esse mecanismo foi implementado como umaextensão do KFS inicialmente suportando a escrita e leitura dos dados em lote, ou seja,por múltiplos mappers e reducers. Além disso, esse formato propõe a ordenação dosdados e a geração de um índice durante a escrita para auxiliar na busca por informaçõesdepois. Tanto a ordenação como o índice auxiliam na diminuição dos seeks, pois fazemcom que os dados interessantes ao reducer estejam próximos fisicamente no disco e sejamrapidamente encontrados. Isso faz com que cada seek seja melhor aproveitado, ou seja,mais dados sejam lidos por vez.

Tradicionalmente, a fase de Reduce só pode iniciar quando todos os mappers termi-narem sua computação, pois a saída de todos é necessária aos reducers para que cadaum realize a ordenação e obtenha as chaves à ele endereçadas. Como se tem um únicoI-File com os dados já devidamente organizados por reducer, ele não precisa mais obter eaguardar a saída de todos os mappers antes de iniciar sua computação.

As Figuras 5.3 e 5.4 ilustram o funcionamento dos I-Files. Perceba a mudança comrelação ao armazenamento das saídas dos mappers, como mostra a Figura 5.3, sem I-Filesno sistema de arquivos distribuído a saída é mantida no disco local do mapper. Já com I-File, como mostra a Figura 5.4, a saída é armazenada no sistemas de arquivos distríbuidojá ordenada e com índices para auxiliar a busca. Além disso, perceba que pode-se manterum canal aberto entre o reducer e o I-File para que a medida que as saídas vão sendoproduzidas já possam ir sendo consumidas pelo reducer correspondente, no caso padrão(sem I-File), tal processo não é possível e cada reducer é responsável por contatar osmappers a fim de que crie-se um canal para transferência das saídas direcionadas a ele.

5.4 Preempção e checkpointing

Tipicamente se trabalha com dois tipos de aplicação em clusters MapReduce, as de pro-dução e as de pesquisa [5]. Essas aplicações têm características distintas, normalmente asde produção tendem a ter maior prioridade e por isso devem ser escalonados assim quesubmetidas e devem ter grande parte dos recursos do cluster dedicados a elas para seremexecutadas mais rapidamente. Por outro lado, as aplicações de pesquisa apesar de teremprioridade menor não podem sofrer por falta de recursos a ponto de não conseguirem serexecutadas por muito tempo (starvation). Nos datacenters do Google, as aplicações de

Page 46: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 46

Figura 5.3: Armazenamento e transporte dos dados intermediários sem a presença deI-File no sistema de arquivos distribuídos

produção correspondem a 30% do total [4]. Essa heterogeneidade criou a necessidade dosuporte à preempção em diversos escalonadores de MapReduce.

Em condições convencionais, o retrabalho realizado quando as tarefas de uma aplicaçãode menor prioridade retomarem suas execuções pode não significar grande parte do tempode processamento. Porém, em casos com tarefas de longa duração isso pode representar umproblema grave. Para exemplicar, imagine um cluster onde uma aplicação de prioridadebaixa está sendo executada e que ela seja composta de tarefas de longa duração. Agora,imagine que nesse mesmo cluster, aplicações de maior prioridade passem a ser submetidascom frequência. Uma perda de recursos significativa poderá ser observada a cada chegadade uma aplicação de maior prioridade, pois toda a computação realizada pelas tarefas daaplicação de menor prioridade que forem suspensas não será reaproveitada. Para piorar,como essas tarefas podem ser longas, isso ocasionará um grande desperdício de tempo quefoi gasto para computar dados que anteriomente já haviam sido processados.

As Figuras 5.5 e 5.6 nos ajudam a observar tal fenômeno. Elas ilustram o caso deuma aplicação A menos prioritária e composta de duas tarefas e uma aplicação B maisprioritária e composta de uma única tarefa. No cluster hipotético das figuras existemapenas duas unidades computacionais para execução de tarefas em paralelo. No tempo

Page 47: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 47

Figura 5.4: Armazenamento e transporte dos dados intermediários com a presença deI-File no sistema de arquivos distribuídos

T1, a aplicação B é submetida ao cluster e por ser mais prioritária acaba por demandarque alguma tarefa de A sofra preempção para que uma unidade computacional possaser dedicada à ela. No tempo T2, a tarefa de A que não sofreu preempção termina suaexecução liberando a unidade computacional para alguma tarefa de outra aplicação queesteja aguardando para ser executada, no caso a tarefa de A que anteriormente sofreupreempção. É após esse tempo T2 que as figuras ilustram a diferença entre se ter ummecanismo de checkpointing ou não. A reexecuçaão da tarefa 1 da aplicação A em umcenário com checkpointing (Figura 5.6) é mais rápida do que a mesma tarefa em umcenário sem checkpointing (Figura 5.5). Isso ocorre porque quando existe um mecanismode checkpointing a aplicação é capaz de retomar a computação do ponto em que parouatravés da gravação e posterior leitura de informações de estado da tarefa. Por outrolado, sem tal mecanismo, só resta à aplicação refazer toda a computação já executadaanteriormente pela tarefa através de uma reinicialização. É importante notar que todosos frameworks de MapReduce estudados neste trabalho possuem pelo menos a capacidadede reinicializar tarefas.

A capacidade de reinicializar tarefas em frameworks MapReduce é essencial. Issoocorre porque mesmo em casos na qual preempção e checkpointing não sejam suportados

Page 48: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 48

Figura 5.5: Preempção sem checkpointing

Figura 5.6: Preempção com checkpointing

pelo ambiente de execução essa capacidade é a base da tolerância a falhas do MapReduce.Podemos exemplificar dois tipos de falhas na qual essa capacidade é importante: falhade rede que evite que um nó mestre possa entrar em contato com um nó escravo e falhade disco de um nó escravo que impossibilita a computação da tarefa ou a degrade demaneira expressiva. No caso da falha de rede, é necessário que o mestre reinicialize aexecução da tarefa em um nó comunicável, pois um nó incomunicável não conseguiriarepassar informações a respeito da sua computação aos interessados. No caso da falha dedisco, como vimos na Seção 5.2, uma cópia da mesma tarefa é inicializada em um outronó, note que o mecanismo para possibilitar isso é o mesmo que permite que uma tarefaseja reinicializada no caso de uma preempção.

A preempção no ambientes de execução MapReduce deve ser analisada pelo escalona-dor em duas etapas:

1. Escolher uma ou mais aplicações dentre as que estão em execução para ter parte deseus recursos liberados;

2. Escolher tarefas dentre as aplicações selecionadas na etapa anterior para efetiva-mente sofrerem a preempção.

Cada uma dessas etapas, envolve um conjunto diferente de dados que o escalonador

Page 49: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 49

deve coletar e utilizar. Para a primeira etapa, o escalonador deve estar ciente dos per-centuais de recursos que cada aplicação está ocupando e no caso de uma política que sepreocupe em ser justa se essa quantidade é desproporcional para com as outras aplicaçõesem execução. Para a segunda etapa, o escalonador deve estimar o progresso de cada tarefaseja por meio de um percentual de completude, tempo em execução ou tempo restantepara execução.

Na versão 2.4.1 do Hadoop, que foi aquela que estudamos nesta dissertação, a pre-empção de uma tarefa corresponde a abortar a sua execução imediatamente para liberarseus recursos para serem ocupados por novas tarefas (pertencentes a uma aplicação demaior pioridade). Nenhum tipo de estado é salvo (checkpointing), por isso, todo o tra-balho já realizado por aquela tarefa é perdido e precisará ser refeito quando ela voltar aser executada. Um mecanismo de preempção e checkpointing do YARN já foi propostoe testado [17]. Ele trabalha com o Capacity Scheduler (Seção 3.2.2) do Hadoop, umaversão mais robusta do Fair Scheduler (Seção 3.2.1) baseado em ser justo com as aplica-ções dentro da mesma fila enquanto cada uma dessas filas possui uma porcentagem dosrecursos do cluster dedicada a si. Também preserva o trabalho das tarefas que vão sofrerpreempção com um serviço de checkpointing. Finalmente, ele tenta escolher as tarefasque progrediram pouco para sofrerem preempção. Essas mudanças ainda são trabalhosem andamento no código base do Hadoop, como pode ser visto no sistema de controle dealterações JIRA utilizado pelo projeto [55, 58]. Logo, mesmo tais funcionalidades tendosido testadas e publicadas em um artigo [17], no momento da escrita desta dissertação,essas mudanças importantes não estão disponíveis para a maioria dos usuários do Hadoop.

O Disco, por sua vez, na versão que estudamos (0.5.4) não possui nenhum mecanismopara preempção de tarefas ou checkpointing integrado a seu código fonte.

5.5 Outros trabalhos relacionados

A ideia de fazer com a decisão de escalonamento seja postergada por um curto períodode tempo impedindo que uma decisão possa provar-se não ideal já foi explorada tambémno Hadoop [20]. Isso é similar ao que nosso trabalho propõe durante as transições deestágio nas aplicações do Disco. Diferentemente da nossa decisão de escalonamento queaguarda para ter mais informações sobre a quantidade de trabalho remanescente, ou seja,a quantidade de tarefas ainda a ser executada, o trabalho citado prefere aguardar quealgumas tarefas completem e assim possa utilizar os recursos que vierem a ser liberadospara alocar às outras tarefas que poderão de beneficiar da localidade dos dados parareceber as entradas necessárias à suas computações. Mesmo o Disco tendo implementadoo conceito de localidade dos dados para auxiliar em melhores políticas de escalonamento,nós decidimos não olhar para essa informações antes de tomar a decisão de preempçãopor ora. Outra informação interessante que melhora o escalonamento do Hadoop é aestimativa de quanto tempo falta para o término das aplicações antes de decidir se umatarefa especulativa poderia melhorar seu desempenho [21]. Isso é interessante para oHadoop, mas não para o Disco, pois ele não trabalha com o conceito de tarefa especulativa.Nenhum dos trabalhos citados nesse parágrafo suporta prempção como o nosso.

Page 50: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 5. MECANISMOS AUXILIARES AOS ESCALONADORES 50

O problema que observamos com o fato do escalonador alocar recursos à tarefas comlongos tempos de duração que acabam por reter tais recursos de maneira gulosa tambémocorre no Hadoop [18]. Nesse trabalho utiliza-se preempção de tarefas durante a fase deReduce para ajudar a resolver tal problema mesmo após uma decisão de escalonamentoque se prove ruim. Nosso trabalho demonstra que esse problema pode ocorrer no Discotambém e introduzimos um mecanismo de preempção e um pequeno atraso na decisãode escalonamento durante fases transitórias para melhorar o desempenho das aplicaçõesde produção. Porém, como consequência acabamos por subutilizar o cluster durante umpequeno período de tempo.

Finalmente, o problema da carga de trabalho heterogênea e suas consequências ruinspara as políticas de escalonamento também ocorrem no Hadoop. Existem trabalhos queabordam esse problema caracterizado pela competição por recursos entre aplicações deprodução e de pesquisa, com diferentes tempos de execução e prioridades. Uma técnicadenominada Global Preemption (GP) [4] age principalmente sob a decisão de quais ta-refas devem sofrer preempção do escalonador. Na implementação padrão do Hadoop,escolhem-se as tarefas mais recentes, que ocasionam que suas aplicações ultrapassem aquantidade justa de recursos que devam ser dedicados a elas, liberando recursos para astarefas da aplicação que acabe de ser submetida. GP propõe não se checar as tarefasde cada aplicação separadamente, mas sim checar cada tarefa que esteja em execução nocluster, independente de qual aplicação ela pertença, e disparar a preempção das maisrecentes, mesmo que a justiça seja perdida durante o processo por um curto período.Nosso trabalho prefere fazer como a implementação padrão já que ainda não checamos otempo de execução das tarefas antes de optar pela preempção das mesmas.

Todos os trabalhos relacionados descritos aqui foram implementados para o Hadoop.Nosso trabalho estudou alguns dos mecanismos interessantes do Hadoop e os implementouem um sistema menos pesquisado: o Disco.

Page 51: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 6

Contribuições e Resultados

Um dos objetivos principais desse mestrado foi realizar uma contribuição para um sistemade código aberto. Sabíamos que assim poderíamos vivenciar todo o fluxo de trabalho deuma comunidade em torno de um projeto de software livre. Seria também uma oportu-nidade de unir conhecimentos acadêmicos a uma implementação que não se limitasse aosartigos científicos, sendo utilizada também em outras realidades, como na indústria, porexemplo. Nossa escolha inicial foi por tentar trabalhar em algo relacionado ao Hadoop.Como vamos relatar em detalhes na Seção 6.1 o trabalho no Hadoop não teve o alcanceque esperávamos. Porém, serviu como base para a busca por outras soluções de sistemasdistribuídos e MapReduce.

Durante algumas de nossas pesquisas sobre linguagens de programação e sistemasdistribuídos, uma linguagem que chamou nossa atenção foi Erlang. Arquitetada para aconstrução de sistemas distribuídos, cada vez que estudávamos Erlang mais víamos seupotencial para lidar com problemas e complexidades vistos no código do Hadoop. Não queo código do Hadoop seja mal escrito, pelo contrário, é muito bem arquiteturado, mas éperceptível que Java e a Java Virtual Machine (JVM) não dão aos programadores as basesde tolerância a falhas e comunicação que Erlang e sua máquina virtual propiciam. Foiisso que nos levou a buscar por soluções dentro da comunidade de Erlang relacionadas aMapReduce e consequemente ao Disco. Sem as facilidades do Erlang, o código do Hadoopé bastante poluído com detalhes de comunicação e tolerância a falhas, algo que não vemosno Disco. Essa simplicidade, facilita muito a implementação de alterações, o teste rápidode mudanças e a pesquisa como um todo. Por esse motivo, o Disco foi nossa escolha finalpara aplicação das técnicas previamente pesquisadas no Hadoop.

Com o intuito de embutir um sistema de preempção e checkpointing ao Disco inicia-mos nossos trabalhos nesse projeto. Porém, diferentemente do Hadoop, o Disco é bastantedesprovido de opções avançadas para tratar aplicações MapReduce. Isso ocorre por seuintuito de se manter enxuto e simples. O Disco resolve o problema de MapReduce deuma maneira bastante direta, sem muitas configurações ou opções avançadas aos admi-nistradores. Por esse motivo, novamente foi necessário focar nossos esforços em algo umpouco menor do que o plano inicial, ao invés de trabalhar tanto com preempção comocom checkpointing, decidimos reduzir nossos esforços a apenas permitir a preempção detarefas no Disco — funcionalidade que não existia em tal sistema.

Além disso, trabalhamos em uma ferramenta para o Disco que se chama Notebooks.

51

Page 52: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 52

Trata-se de uma interface Web para facilitar a submissão de aplicações MapReduce aoDisco. O principal objetivo dessa ferramenta é facilitar o trabalho de outros pesquisadoresque queiram refazer nossos experimentos. Da mesma maneira que fizemos, gostaríamosque o restante da comunidade que se interesse por esse tipo de facilitador contribua comos seus experimentos também nessa ferramenta para assim criarmos um ecossistema fértilde pesquisa em cima do Disco.

Nas próximas seções vamos descrever em detalhes nossos trabalhos, experimentos eresultados tanto no Hadoop (Seção 6.1) como no Disco (Seção 6.2).

6.1 Trabalho realizado no Hadoop

O Hadoop possui um sistema de controle de alterações chamado JIRA. Trata-se de umaplataforma Web para auxiliar a mapear as alterações realizadas e planejadas para o pro-jeto. Como o acesso ao JIRA é totalmente aberto, as mudanças planejadas podem serestudadas e analisadas por qualquer pessoa. Nossa pesquisa se iniciou com uma buscanesse sistema por assuntos do nosso interesse como preempção, tolerância a falhas e check-pointing.

Por meio das pesquisas no JIRA, nos deparamos com uma série de propostas demelhorias nos mecanismos de preempção e a implementação de um checkpointing paratarefas interrompidas no Hadoop [55, 59, 27]. Uma coisa era comum a todas essas entradasno JIRA: não estavam resolvidas.

O intuito das propostas de melhorias relatadas no JIRA e relacionadas à preempçãoe checkpointing era permitir que caso o cluster tenha suporte e o desenvolvedor autorize,quando uma aplicação de maior prioridade for submetida ao cluster, ela poderá tomar olugar da aplicação de mais baixa prioridade. Se a aplicação a sofrer preempção estiver nafases de Shuffle ou Reduce, será possível gravar os estados das tarefas a serem supensasno HDFS (checkpointing) e restaurá-los assim que elas retomarem a execução. Inclusivealgumas delas possuíam patches anexos, ou seja, propostas de melhorias sob a forma decódigo aguardando revisão da comunidade para serem incorporadas ao código principaldo projeto.

Uma alteração relacionada com o tópico preempção e checkpointing está incompleta eexistem patches anexos a ela que precisam de algumas correções para serem incorporadosà base de código do Hadoop [55]. Visando auxiliar a comunidade do Hadoop e trabalharem algo relevante durante esse mestrado, entramos em contato com os desenvolvedoresCarlo Curino e Chris Douglas, responsáveis por esse pedido de mudança. Recebemosa mentoria deles que além de contribuir para a Apache tiveram passagens em times dedesenvolvimento focados no Hadoop dentro do Yahoo e da Microsoft. Nosso trabalhoconsistia em atualizar o patch, deixando-o compatível com a arquitetura mais atual docódigo fonte do Hadoop que se encontra no branch trunk do sistema de controle de versõesGit utilizado pela Apache [37].

O fluxo de trabalho da comunidade do Hadoop se divide nas seguintes etapas:

1. Proposta de alteração com descrição sobre quais benefícios a alteração trará, casoo proponente possua dados, diagramas, referências, etc que auxiliem na aceitação

Page 53: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 53

da proposta pela comunidade é bem visto que anexe essas informação à entrada noJIRA;

2. Designação de um responsável para implementar a alteração;

3. O responsável submete a alteração por meio de um patch do Git;

4. Um robô que visa garantir a qualidade do código é executado aplicando-se o patchno branch trunk do Hadoop, ou seja, executa seus testes sobre uma visão de comoficaria o projeto caso o patch fosse aceito. Esse robô busca por quebras em tes-tes unitários, falta de padronização na escrita do código, desrespeito ao estilo decodificação proposto pela comunidade, etc;

5. Os outros contribuidores do Hadoop via JIRA podem ver o patch e sugerir mudan-ças como quebra do patch em alterações menores, uma nova arquitetura para asmudanças, uma relação não mapeada entre essa entrada do JIRA e uma outra, etc;

6. Passada essa bateria inicial de aceitação do patch, um desenvolvedor mais experienteda comunidade se responsabiliza por incorporar o patch ao branch principal doprojeto (trunk).

Agora vamos detalhar a funcionalidade que os patches relacionados à preempção echeckpointing intencionavam adicionar ao Hadoop. Relembrando que chamamos de Shuf-fle a fase posterior ao Map e anterior ao Reduce responsável por buscar junto aos mapperssuas saídas e tranferi-las via rede às máquinas que posteriormente executarão a fase deReduce. Ainda nessa fase, é feita uma seleção das chaves e valores da saída do Map queinteressam à função de Reduce designada para a máquina correspondente.

Segue uma descrição em alto nível de como salvar o estado da computação durante afase de Shuffle propostas no patch na qual trabalhamos:

1. Uma subclasse de Shuffle é fornecida pelo patch, utilizando-se de uma arquiteturaplugável, ela é chamada CheckpointableShuffle. O primeiro passo de um Shuffle compreempção é fazer com que todas as threads de fetchers — aquelas responsáveis porse conectar ao mapper e trazer suas saídas ao reducer interessado — entrem emuma condição segura para serem desligadas;

2. Grava-se quais as saídas de Maps foram transferidas até o momento, assim sabere-mos quais não o foram também;

3. Finaliza-se a ordenação local dos Maps que foram transferidos com sucesso;

4. Escreve-se utilizando a estrutura de dados I-File (descrita em detalhes na Seção 5.3)um arquivo de checkpoint que conterá um cabeçalho com a lista de identificadoresdos Maps que foram transferidos e seus conteúdos ordenados.

Além disso, o patch propunha o seguinte para salvar o estado da computação durantea fase de Reduce:

Page 54: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 54

1. As funções definidas pelo usuários para Reduce e para gravação dos resultados doreducer (OutputCommiter) devem ter sido marcadas como “passíveis de sofrerempreempção” (via a anotação @Checkpointable nos métodos em Java);

2. Aguarda-se a conclusão da execução das funções definidas pelo usuários para umdeterminado grupo de chaves;

3. Grava-se utilizando uma versão alterada do FileOutputCommiter — o PartialFile-OutputCommiter — a saída parcial da computação do Reduce (concluída no passoanterior);

4. Grava-se utilizando a mesma estrutura de dados I-File utilizado para gravação docheckpoint de Shuffle descrito anteriormente os dados restantes a serem processados.Novamente o arquivo conterá um cabeçalho de identificadores de maps e uma listade pares chave valor que ainda não passaram pela função Reduce.

Por fim, agora que foi descrito como salvar os estados durante Shuffle e Reduce, veja-mos como o patch propunha recuperar tal estado:

1. Durante a inicialização dos objetos referentes ao Shuffle ou ao Reduce, é checada aexistência de um checkpoint para a tarefa correspondente;

2. Se existir tal checkpoint inicializam-se as estruturas de dados correspondentes (listade maps finalizados, por exemplo), com o valor contidos nos arquivos de checkpoint;

3. Retoma-se a computação tanto do Shuffle como do Reduce com as correspondentesestruturas de dados com valores que estariam caso a computação tivesse chegadoali por vias normais (sem uma preempção e gravação de estado, no caso);

4. Caso exista algum problema durante a leitura dos checkpoints, como mecanismo detolerância a falhas, descarta-se tal arquivo e inicia-se novamente a computação doinício para a correspondente tarefa; basicamente volta-se a uma reexecução comoera feito anteriormente.

Além dos benefícios já citados, os desenvolvedores do Hadoop previam que a preemp-ção e o checkpointing auxiliariam o sistema também a ganhar com uma melhor utilizaçãodo cluster como ilustra a Figura 6.1. Note que nesse caso, tem-se 5 mappers e 3 máquinas,o que faria com que uma das máquinas ficasse ociosa após finalizar a atividade de Mapdedicada a ela, pois deveria aguardar com que as outras terminassem sua fase de Map paraentão todas juntas iniciarem a fase de Reduce. Porém, como há preempção no cenárioilustrado na figura, é possível que a máquina não fique ociosa, mas sim realize os Shufflesque serão necessários aos reducers enquanto aguarda, à medida que novas saídas de Mapvão sendo produzidas por outras máquinas, a máquina que em um cenário convencionalestaria ociosa pode ir aproveitando essas novas saídas também. Sequência de eventos daFigura 6.1:

1. Mappers 1, 2 e 3 são executados e gravam suas saídas;

Page 55: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 55

Figura 6.1: Melhor utilização dos recursos do cluster com o auxílio de preempção e check-pointing

2. Reducer 1 é executado no nó 3, coleta sua porção das saídas dos mappers 1, 2e 3 e realiza o Shuffle. Acaba ficando sem mais trabalho para realizar pois estáaguardando a finalização dos mappers 4 e 5, portanto faz um checkpoint do seuestado e finaliza sua execução;

3. Reducer 2 é executado no nó 3 e segue os mesmos passos do reducer 1 descritos nopasso anterior;

4. Map 5 completa sua execução;

5. Reducer 1 volta a ser executado na máquina 3: agora recuperando seu checkpointgravado anteriormente e realizando um Shuffle com a saída do Map 5;

6. Reducer 2 é escalonado no nó 2, também recupera seu estado a partir do checkpointe realiza um Shuffle com a saída do Map 5;

7. Map 5 completa sua execução;

8. Reducer 1 e 2 buscam a saída do Map 5 que era a última faltante em suas entradase assim finalizam sua execução.

Quando assumimos a alteração de preempção de reducers e shufflers, teoricamente pre-cisávamos apenas atualizar o patch que já se encontrava praticamente completo, faltandotambém a execução de alguns testes e experimentos. O primeiro passo seria adequar opatch que tinha ficado antigo, já que o projeto no branch trunk evoluiu e o patch ficoucompátivel com uma versão antiga do mesmo branch. Na realidade, esse trabalho se mos-trou muito maior e desgastante do que previmos, pois as alterações eram muitas e muitofrequentes, então a rotina de manter o patch atualizado era bastante intensa. Ao mesmotempo, ser aceito pelo robô não vinha sendo uma tarefa fácil, pois a mudança era tãogrande que alterava múltiplos sub-projetos do projeto Hadoop, fato que gerava um erro

Page 56: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 56

no robô por não saber trabalhar com a versão mais atualizada de múltiplos sub-projetos.Sendo assim, propusemos uma divisão da atividade em partes menores [23, 22] que foibem vista pelos nossos mentores e trabalhamos também nessas alterações. Nossas ativi-dades no projeto Hadoop foram interrompidas por decisão própria à medida que vimosque a quantidade de trabalho necessária para concluí-las com qualidade não caberia notempo hábil para um projeto de mestrado, já que as mudanças que ocorreram no trunkforam tantas que para termos o serviço de checkpointing para reducers uma reformulaçãocompleta do patch seria necessária. Isso porque uma funcionalidade relacionada à encrip-tação de dados do Shuffle [56] foi aceita pela comunidade e têm impacto direto com asalterações do patch em questão.

A comunidade do Hadoop tem uma série de mudanças propostas a respeito do temapreempção [58], o que mostra sua importância. O trabalho junto aos desenvolvedoresCarlo e Chris nos ajudou a focar naquilo que eles apontam como necessidades mais pri-oritárias para o projeto nesse momento. Baseado em nosso aprendizado, gostaríamos derelatar dois possíveis trabalhos a respeito desse tema que poderiam ser assuntos parapesquisas futuras. O primeiro consistiria em alterar o local de armazenamento das saídasda fase de Map, passando a utilizar o HDFS para as persistir, ao invés do disco localonde o mapper foi executado. O principal benefício disso será que estado a ser gravadono checkpoint diminuirá significativamente. Seria necessário gravar somente qual foi aúltima chave processada para se restaurar um reducer, por exemplo. Uma outra possívelexpansão desse trabalho seria atuar em preempção com checkpointing para os mapperstambém, já que após a alteração que descrevemos nesse trabalho somente tarefas de Re-duce e Shuffle estarão sendo preemptíveis via checkpointing.

Apesar de não termos conseguido terminar a implementação da alteração ou realizarexperimentos em cima das mudanças que realizamos, essas atividades foram valiosas parapodermos olhar para um framework de MapReduce menor e consequentemente mais ma-leável e propor mudanças também relacionadas ao mecanismos de preempção de tarefas.O framework que escolhemos foi o Disco e nas próximas seções relatamos nosso trabalhosobre ele.

6.2 Trabalho realizado no Disco

Conseguimos dois resultados interessantes no Disco: uma nova política de escalonamentoFair com preempção e uma ferramenta para execução e visualização de resultados paraos nossos experimentos, chamada Notebooks.

O escalonador do Disco possui um sistema de políticas de escalonamento bastantemodular. Ele consiste em repassar eventos que venham a ocorrer com as aplicações soba forma de um contrato bem definido entre escalonador e o módulo que implementasua política de escalonamento. São exemplos de tais eventos: a submissão de uma novaaplicação (como o caso da aplicação B na Figura 6.2), o término de uma execução ou umrecurso que ficou disponível para a execução de alguma tarefa (como pode ser visto naFigura 6.2 e na Figura 6.3).

FIFO e Fair já são duas políticas que estão embutidas ao código do Disco e seguem as

Page 57: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 57

regras dessa arquitetura modular. Nosso trabalho em cima do Disco foi o de implementaruma nova política que adicione a preempção de tarefas ao escalonador. Essa política focouem ser justa por meio da rápida alocação de recursos disponíveis no cluster à aplicaçãoque esteja sendo injustiçada, através de preempção de tarefas de aplicações que estejamutilizando mais recursos do que a quantia justa. Essa nova politíca ainda garante queum número justo de recursos vai estar disponível à aplicação necessitada mesmo duranteas fases transitórias — quando as informações sobre ocupação do cluster podem não serconfiáveis — mesmo que ao custo de alguma espera para se ter informações confiáveis.Chamamos essa política de “Preemptive Fair”.

A política “Preemptive Fair” age em quatro situações, onde esses eventos servem comogatilho:

1. Uma nova aplicação é submetida ao cluster: para destinar a quantidade justa de re-cursos à recém-submetida aplicação o quanto antes, a política calcula a quantidadejusta desses recursos e mata tarefas das aplicações em execução de maneira iguali-tária para liberar recursos, um exemplo desse cenário é ilustrado pela Figura 6.2;

2. Um worker está disponível para executar uma nova tarefa: primeiramente, a políticaverifica se uma aplicação recém-submetida necessita obter seus primeiros recursos,se sim ela aloca-os à aplicação assim que ela esteja pronta para iniciar sua execução(para garantir recursos rapidamente à aplicação recém submetida como mostra aFigura 6.2); se não, ela cria uma lista de aplicações ordenada pela quantidade detarefas em execução de cada uma delas, depois filtra essa lista deixando apenasaquelas cujo número de tarefas em execução é menor do que a quantidade justa, aaplicação mais desfavorecida será a primeira dessa nova lista e será a escolhida parautilizar os recursos que se tornaram disponíveis, esse caso é ilustrado pela Figura 6.3;

3. Um worker está disponível para executar uma nova tarefa, porém existe uma apli-cação em execução que está em uma fase transitória: a política sabe que é melhoraguardar a aplicação sair da fase transitória antes de tomar uma decisão, isso é feitoatravés de uma resposta ao escalonador de que não há aplicação a ser escalonada,essa é a melhor decisão pois a quantidade de trabalho remanescente para a tarefaem fase transitória é desconhecida e a decisão poderia ser equivocada;

4. Uma aplicação finalizou sua execução: a política remove essa aplicação de todas asestruturas de dados internas que ela mantem para auxiliá-la em suas decisões (porexemplo: lista de aplicações ordenadas por número de tarefas em execução).

Outra vantagem da arquitetura modular das políticas de escalonamento no Disco é quecom uma simples configuração, o administrador do cluster pode garantir que as máquinastrabalhem com qualquer uma dessas políticas. Para executar nossos experimentos nosbastou alterar essa configuração para que a “Preemptive Fair” passasse a ser utilizada aoinvés da FIFO ou da Fair padrões.

Page 58: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 58

Figura 6.2: Preempção das tarefas de uma aplicação menos prioritária pela política deescalonamento “Preemptive Fair”.

Figura 6.3: Concessão dos recursos à aplicação mais injustiçada pela política de escalo-namento “Preemptive Fair” .

6.2.1 Experimentos

Nossos experimentos foram executados com patrocínio da Digital Ocean [28]. Nós con-figuramos um cluster em rede privada em seu serviço de cloud com 5 nós fisicamentelocalizados no data center de Nova Iorque. Cada nó possui 2 núcleos de CPU, 2 GB dememória RAM e 40 GB de disco SSD. Eles executaram o Disco no sistema operacionalUbuntu Linux 14.04.3 LTC utilizando a máquina virtual Erlang/OTP R16B03 e o inter-pretador Python 2.7.6. Nós realizamos nossos experimentos comparando duas instalaçõesdo Disco: a original da versão 0.5.4 obtida através do repositório oficial [29] com a po-lítica Fair e a adaptação que implementamos executando o escalonador com a política“Preemptive Fair”, explicada anteriormente [31].

Para simular uma carga de trabalho heterogênea composta por aplicações de pesquisa ede produção, nós utilizamos o exemplo mais comum de MapReduce: contador de palavras.Esse é um programa onde grandes arquivos de texto simples servem como entrada e aaplicação calcula o número de vezes que cada uma das palavras dos documentos apareceno texto. Nossa aplicação de pesquisa possui 25 mappers que “dormem” por um intervalode tempo para transformar suas tarefas simples em atividades de longa duração, comosão as tarefas de aplicações de pesquisa muitas vezes. Por outro lado, nossa aplicaçãode produção possui o mesmo número de mappers, porém sem essa configuração paratornar suas tarefas longas. Nosso experimento se constituiu em submeter a aplicaçãode produção 20 segundos após a de pesquisa ser submetida ao nosso cluster configuradoora com a versão oficial do Disco ora com a política “Preemptive Fair” . Os gráficos daFigura 6.4 comparam o tempo total de execução para a aplicação de produção após dezexecuções desse experimento.

Page 59: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 59

Figura 6.4: Tempos de execução das aplicações de produção

O novo mecanismo de preempção e o tratamento especial que a nova política deu àsfases transitórias das aplicações produziram um resultado interessante: a aplicação deprodução é executada 8.4 vezes mais rapidamente em média. Porém, essa melhoria vemcom um custo: um impacto no tempo de execução da aplicação de pesquisa, como mostraa Figura 6.5. Em média, com a política “Preemptive Fair”, a aplicação de pesquisa é4% mais devagar. Ela teve o tempo de execução aumentado em 9 segundos em nossosexperimentos.

Figura 6.5: Tempos de execução das aplicações de pesquisa.

A Figura 6.6 mostra a divisão dos recursos do cluster durante a execução das aplicaçõesde pesquisa e produção com a versão original do Disco sendo executada no cluster, ouseja, com a utilização da política Fair. É possível notar que existem dois grandes temposde espera para iniciar a execução da aplicação de produção, apesar de ser a prioritária. Aprimeira espera se dá no começo da execução da aplicação de produção. Imediatamenteapós ela ser submetida ela só começa a ser executada após 1 minuto de espera (01:20sno gráfico, relembrando que ela foi submetida 20 segundos após a aplicação de pesquisa).A segunda espera ocorre entre o término da fase de Map e o inicio da fase de Shuffle: aaplicação de produção não possui tarefas sendo executadas entre 01:41s e 02:42s.

Page 60: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 60

Figura 6.6: Utilização do cluster para a execução do experimento com a versão originalda política Fair.

A política Preemptive Fair tenta sanar o problema que essas duas esperas geram notempo de execução final da aplicação de produção. Isso pode ser visto na Figura 6.7. Paraisso, a aplicação de produção recebe seus recursos imediatamente após ser submetida,eliminando a primeira espera. A quantidade de trabalho perdido por causa da preempçãonecessária para evitar essa espera é proporcional a quantidade justa de recursos e aointervalo anterior a submissão da aplicação de produção, em nossos experimentos 100segundos de computação são perdidos porque temos 5 workers para cada aplicação e 20segundos de intervalo (tempo de execução da aplicação de pesquisa no momento que a deprodução é submetida). Porém, em um caso geral a quantidade de trabalho perdido podevariar já que cada tarefa poderia estar em um estágio diferente da computação. Alémdisso, nossa política prefere aguardar por um pequeno intervalo tendo parte do clustersub-utilizado durante as fases de Shuffle e Reduce da aplicação de produção a alocarrecursos de maneira incorreta a aplicação de pesquisa, que além de tudo pode ser gulosae segurar esses recursos por muito mais tempo, impendindo que a aplicação de produçãoprioritária continue sua execução. A Figura 6.6 ilustra o que ocorre caso não se aguardepor informações mais precisas sobre a utilização dos recursos do cluster durante fases detransição: a aplicação de pesquisa toma para si todos os recursos disponíveis e faz comque a de produção tenha que aguardar por um longo intervalo de tempo. Cabe notar quea área dos gráficos não é a mesma pois o trabalho total executado depende também dastarefas subjacentes de escalonamento do Disco.

6.2.2 Notebooks

Durante toda a pesquisa descrita nesta dissertação, uma dificuldade recorrente semprefoi encontrar uma maneira de refazer experimentos contidos em artigos que líamos arespeito de MapReduce. Mais especificamente, como reexecutar os experimentos e como

Page 61: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 61

Figura 6.7: Utilização do cluster para a execução do experimento com a políticaPreemptive Fair que implementamos

visualizar os resultados com gráficos e tabelas depois de colhidos os dados. Durante aconferência WPerformance 2016, notamos que essa dificuldade não era somente nossa,vimos alunos interessados em utilizar MapReduce em seus Trabalhos de Conclusão deCurso e que temiam mais a dificuldade de se coletar dados e de expor os resultadoscolhidos de maneira visual do que a criação de aplicações ou as mudanças no Hadoop ouDisco propriamente ditas. Isso ocorre pois para se colher resultados, faltam ferramentas einformações online, enquanto para se criar aplicações existem diversos códigos de exemploe tutoriais na internet e para se alterar o Hadoop e o Disco basta ler o código que é muitobem arquiteturado e a documentação que geralmente é muito bem feita e atualizada.

Para atacar esses problemas buscamos inspiração em uma ferramenta comum entre oscientistas de dados que trabalham com Python chamada Jupyter Notebooks [57]. Essa fer-ramenta permite ao usuário executar um ambiente completo de desenvolvimento Pythoncomo um servidor local em sua máquina. Acessando um endereço local no navegador,o usuário pode então digitar comandos Python em células que são executados imediata-mente mostrando a saída desses comandos sejam elas impressões na tela de um comandoprint, por exemplo, ou até gráficos complexos produzidos com o auxílio de bibliotecas doPython famosas no mundo da Ciência de Dados, como a matplotlib [49], por exemplo. AFigura 6.8 mostra uma tela do Jupyter Notebooks para exemplificar a interatividade queesse sistemas entrega aos desenvolvedores Python.

Esse tipo de ferramenta auxilia na rápida visualização de resultados e experimentaçãode possíveis caminhos em busca de uma determinada solução. E inspirados por essasideias que criamos uma versão minimalista desses Notebooks para o Disco. Adicionamosà interface Web padrão do Disco — utilizada pelos mantenedores do cluster para visualizarinformações sobre as aplicações enviadas ao Disco e sobre os nós ativos e seus workers —um link para a nossa ferramenta. Assim, é facilmente acessível pelos desenvolvedores epesquisadores que desejem reproduzir nossos experimentos, como ilustra a Figura 6.9.

Page 62: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 62

Figura 6.8: Interface do Jupyter Notebooks, famosa ferramenta da comunidade Python

Ao entrar na nossa plataforma, pode-se optar por submeter novos trabalhos para ocluster ou visualizar resultados dos trabalhos já executados. Como fizemos a plataformabastante voltada para a nossa pesquisa sobre os escalonadores do Disco em especial paratestar o nosso escalonador o Preemptive Fair Scheduler a tela de submissão de novasaplicações (ilustrada na Figura 6.10) mostra os seguintes campos:

• Notebook name: nome que o usuário deve dar para esse notebook para que depoispossa encontrá-lo na lista de relatórios gerados;

• Interval: tempo de intervalo entre as submissões da aplicação A (geralmente a depesquisa) e a aplicação B (geralmente a de produção)

• Number of runs: quantidade de vezes que o experimento deve ser executado;

• Job A: espaço para que o usuário escreva as funções Map e Reduce da aplicação A(geralmente a de pesquisa), esse campo vem pré-populado com a versão do word-count que utilizamos para obtenção dos resultados, aquela que contém um sleeppara simular uma aplicação lenta;

• Job B: espaço para que o usuário escreva as funções Map e Reduce da aplicação B(geralmente a de produção), esse campo vem pré-populado com a versão do word-count original do Disco, baseado nos códigos de exemplo do Disco;

• Botão “mais”: utilizado para submeter aplicações ao cluster.

Page 63: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 63

Figura 6.9: Link para a ferramenta Notebooks exibido no canto superior direito da tela

Após submetido um notebook, o usuário deve aguardar as execuções do experimento.Isso pode ser feito na tela principal da interface Web do Disco que como ilustra a Fi-gura 6.11 exibe um menu na lateral direita da tela com informações sobre aplicações emexecução e finalizadas. Quando todas as aplicações estiverem com o ícone de estado verde,o usuário saberá que foram finalizadas as execuções do seu experimento.

Assim que concluído ele poderá observar na tela de relatórios a lista de todos osnotebooks que já foram executados nesse cluster (ilustrada na Figura 6.12). O mesmonome dado ao notebook na tela de submissão será utilizado na listagem. E assim que umitem dessa listagem é selecionado uma nova tela com os gráficos de utilização do clusterao longo do tempo do experimento com a quantidade de nós dedicados à cada aplicaçãono decorrer do experimento é exibida.

Para desenvolver essas telas utilizamos uma tecnologia de componentes Web do Googlechamada Polymer [60]. Essa abstração nos permite escrever pouco código, aproveitarmuito componentes já criados e disponibilizados de maneira livre na Internet e ter umdesign minimamente agradável baseado em alguns padrões criados pelo Google e chamadosde Material [42]. Os principais componentes que utilizamos foram:

chart-elements [26]: para criação dos gráficos de maneira dinâmica;

juicy-ace-editor [46]: para campos de código no formulário com colorações de códigoPython semelhantes aos editores mais populares do mercado;

paper-elements [53] : para criação de formulários e da aplicação no padrão Material;

iron-ajax [43]: para nos auxiliar com as requisições e tratamento de respostas entre aaplicação Polymer e a API do Disco;

Page 64: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 64

Figura 6.10: Tela do Notebooks para a submissão de novas aplicações

Para criar as informações e trazê-las novamente à aplicação Polymer via AJAX (Asyn-chronous JavaScript and XML) [35] desenvolvemos os seguintes métodos remotos no mó-dulo Web do Disco:

HTTP POST para criar um notebook com os parâmetros colhidos no formulários desubmissão de aplicacões: ao ser executado o notebook gera log em uma pasta padrãopré-definida em nosso código;

HTTP GET para listar os notebooks disponíveis para visualização de relatórios: o ser-vidor faz uma consulta ao diretório padrão para verificar quais logs foram geradosnaquele diretório e devolve essa lista de logs ao cliente;

HTTP GET para trazer o conteúdo do arquivo de log : como o arquivo pode ser muitogrande, essa requisição contém parâmetros referentes à linha corrente desejada e umoffset da quantidade de linhas a partir da corrente se deseja, além disso é devolvidauma flag ao cliente que indica se o log acabou ou não de ser lido de acordo comesses parâmetros, assim o cliente pode fazer diversos requests até que chegue ao fimdo log para então formatar seu gráfico.

Vale salientar que todos esses métodos remotos são acessíveis via HTTP utilizandoJSON [45] como a forma de serialização. Ao receber um POST com os dados de umnotebook, executa-se um comando Python que acaba por utilizar os parâmetros recebidose executar todo o código sem checagem alguma. Apesar dessa arquitetura abrir portaspara diversas falhas de segurança, foi a maneira mais rápida de atingirmos nosso objetivo.O código dessa aplicação Polymer é bastante simples e componentizável, além de opensource [32]. Esperamos que possa servir de base para que outros pesquisadores façam o

Page 65: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 65

Figura 6.11: Tela principal do Disco Web enquanto aplicações estão em execução

Figura 6.12: Tela do Notebooks com a listagem dos relatórios disponíveis

mesmo com seus experimentos, ou seja, criem formulários para submissão de notebooksda mesma maneira que fizemos e telas para visualização dos principais dados. Um co-nhecimento básico de HTML, CSS e Javascript já é o suficiente para utilizar-se do nossomodelo para a criação de algo semelhante.

Page 66: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 6. CONTRIBUIÇÕES E RESULTADOS 66

Figura 6.13: Tela do Notebooks com os resultados dos experimentos de um dado relatórioselecionado

Page 67: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Capítulo 7

Conclusão

Em um mundo onde grandes quantidades de dados são produzidas diariamente, a neces-sidade de mecanismos eficazes de armazenamento e processamento desses dados culminouno desenvolvimento do GFS [9] e do MapReduce pelo Google [6]. A necessidade de siste-mas similares originou o Apache Hadoop, projeto de código aberto que hoje em dia já podeser considerado o padrão do mercado para se trabalhar com Big Data. Os clusters que uti-lizam Hadoop já são tão grandes (da ordem de milhares de máquinas) que sua arquiteturaprecisou ser revista, incluindo o gerenciador de recursos denominado YARN [17].

Hadoop é primordialmente escrito em Java, a resposta da comunidade de Erlang —linguagem especialmente desenhada para sistemas distribuídos e tolerância a falhas — foio projeto Disco [13]. Trata-se de um robusto e elegante projeto também de código abertodesenvolvido pelo Nokia, inicialmente.

Neste mestrado, focamos nossos esforços no problema que esses frameworks de Map-Reduce possuem ao tentar trabalhar com as chamadas cargas heterogêneas [4]. Essassão cargas especialmente difíceis para os escalonadores já que as aplicações de produção(mais prioritárias e de execução rápida) estão misturadas às aplicações de pesquisa (menosprioritárias e de execução demorada). Tanto Hadoop como Disco possuem mecanismosplugáveis de escalonadores ou políticas de escalonamento, onde pudemos experimentarcom possíveis alterações em seus códigos fontes para tentar melhores resultados com otempo de execução médio de cargas heterogêneas.

O principal mecanismo que abordamos foi o de preempção, ou seja, na presença detarefas de uma aplicação de produção, por possuir maior prioridade, esse mecanismopermite ao escalonador que pratique a preempção de tarefas de uma aplicação de menorprioridade (pesquisa) para liberar recursos para a primeira. Estratégias de checkpointingdo estado das tarefas antes de sofrerem preempção para poupar o cluster do posteriorretrabalho para recomputar o resultado perdido pela preempção forma deixadas paratrabalhos futuros.

No Hadoop, soluções a esses problemas foram requisitadas pela comunidade por meioda mudança [55]. Ela consiste em inserir mecanismos para preempção e checkpointingàs tarefas de Reduce e Shuffle. Nossa primeira atividade nessa pesquisa foi trabalharna atualização do código do patch anexo à esse pedido de mudança em conjunto comos desenvolvedores responsáveis (Carlo Curino e Chris Douglas). Conseguimos contribuircom uma nova proposta de divisão da solução [22, 23] e com atualizações do patch, porém,

67

Page 68: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

CAPÍTULO 7. CONCLUSÃO 68

não finalizamos a implementação já que uma mudança estrutural muito grande relacionadaà encriptação dos dados do Shuffle [56] tornaria inviável termos algo finalizado no prazodeste mestrado.

Após as tentativas no Hadoop, abrimos nosso leque ao projeto Disco e tentamos im-plementar os mecanismos de preempção e checkpointing nele. Conseguimos mitigar osproblemas das cargas heterogêneas nesse sistema por meio da criação da política de escalo-namento Preemptive Fair [8]. Para a implementarmos, necessitamos de poucas alteraçõesno código do Disco, permitindo que esse projeto continue sendo uma alternativa enxutaao Hadoop. Usando o conhecimento adquirido por meio do estudo de pesquisas anteriores(em sua maioria feitas em cima do Hadoop), conseguimos adaptar algumas das melho-res práticas relacionadas à preempção e ao atraso do escalonamento (em busca de maisinformações para se tomar melhores decisões) originando uma nova política de escalona-mento para o Disco que além de justa possui preempção. As aplicações prioritárias emnossa carga de testes foram executadas 8,4 vezes mais rapidamente em média com a nossapolítica Preemptive Fair quando comparada à política Fair padrão do Disco. Porém, aotimização veio com um pequeno custo: subutilização do cluster durante a fase final daexecução da aplicação de produção e um acréscimo em 4% no tempo de execução daaplicação de pesquisa.

Outro ponto que observamos durante nossa pesquisa foi o quanto é complicado re-produzir experimentos anteriores nesses sistemas de MapReduce, tanto no Disco como noHadoop. Como uma forma de permitir uma reprodução facilitada dos experimentos querealizamos, pondo nosso política de escalonamento à prova, criamos a interface Web No-tebooks para o Disco. Inspirada nos Jupyter Notebooks do Python, essa interface facilitapara futuros pesquisadores submeter aplicações de pesquisa e produção e um intervaloentre a submissão da aplicação de pesquisa e de produção. Além disso, por meio da nossainterface as mesmas aplicações podem ser submetidas diversas vezes para se calcular umamédia dos tempos de execução e exibir relatórios com os resultados das execuções: medi-ções relativas ao tempo de execução e quantidade de recursos dedicados a cada aplicaçãocom o passar do tempo da execução.

Deixamos como trabalho futuro a possibilidade de preservar o estado das tarefas doDisco antes de sofrerem preempção (checkpoiting). Além disso, gostaríamos de impedir asubutilização do cluster durante as fases de Shuffle e Reduce para a aplicação de produção:nos nossos experimentos, a aplicação de pesquisa poderia ter utilizado mais recursos doque aqueles que o escalonador direcionou à ela. Outra melhoria interessante seria umaforma de submeter aplicações ao Disco com algum valor de prioridade, para nossas cargasde testes consideramos a primeira aplicação como a de pesquisa e a segunda como a deprodução, mas uma forma facilitada de se definir prioridades seria de grande valor para oescalonador desse sistema.

Page 69: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Referências Bibliográficas

[1] Ganesh Ananthanarayanan, Srikanth Kandula, Albert Greenberg, Ion Stoica, Yi Lu,Bikas Saha, and Edward Harris. Reining in the outliers in Map-Reduce clusters usingMantri. In Proceedings of the 9th USENIX Conference on Operating Systems Designand Implementation, OSDI’10, pages 265–278, Berkeley, CA, USA, 2010. USENIXAssociation.

[2] Joe Armstrong. Making reliable distributed systems in the presence of software errors,2003.

[3] Q. Chen, C. Liu, and Z. Xiao. Improving MapReduce performance using smartspeculative execution strategy. IEEE Transactions on Computers, 63(4):954–967,April 2014.

[4] Lu Cheng, Qi Zhang, and Raouf Boutaba. Mitigating the negative impact ofpreemption on heterogeneous mapreduce workloads. In Proceedings of the 7th In-ternational Conference on Network and Services Management, CNSM ’11, pages189–197, Laxenburg, Austria, Austria, 2011. International Federation for InformationProcessing.

[5] Brian Cho, Muntasir Rahman, Tej Chajed, Indranil Gupta, Cristina Abad, NathanRoberts, and Philbert Lin. Natjam: Design and evaluation of eviction policies forsupporting priorities and deadlines in Mapreduce clusters. In Proceedings of the 4thAnnual Symposium on Cloud Computing, SOCC ’13, pages 6:1–6:17, New York, NY,USA, 2013. ACM.

[6] Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing onLarge Clusters. In Proceedings of the 6th Conference on Symposium on OpeartingSystems Design & Implementation - Volume 6, OSDI’04, pages 10–10, Berkeley, CA,USA, 2004. USENIX Association.

[7] Augusto Souza e Islene Garcia. Preempção de Tarefas MapReduce via Checkpointing.In Anais do IX Workshop de Teses, Dissertações e Trabalhos de Iniciação Científicaem Andamento do IC-UNICAMP, pages 86–91, 2014.

[8] Augusto Souza e Islene Garcia. A preemptive fair scheduler policy for disco Map-Reduce framework. In Anais do XXXVI Congresso da Sociedade Brasileira de Com-putação, pages 2758–2769, 2016.

69

Page 70: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

REFERÊNCIAS BIBLIOGRÁFICAS 70

[9] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google File System.In Proceedings of the nineteenth ACM symposium on Operating systems principles,SOSP ’03, pages 29–43, New York, NY, USA, 2003. ACM.

[10] Carl Hewitt, Peter Bishop, and Richard Steiger. A universal modular actor formalismfor artificial intelligence. In Proceedings of the 3rd International Joint Conferenceon Artificial Intelligence, IJCAI’73, pages 235–245, San Francisco, CA, USA, 1973.Morgan Kaufmann Publishers Inc.

[11] Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, and Dennis Fetterly. Dryad:Distributed data-parallel programs from sequential building blocks. In Proceedingsof the 2007 Eurosys Conference, Lisbon, Portugal, March 2007. Association for Com-puting Machinery, Inc.

[12] Erik Johansson, Konstantinos Sagonas, and Jesper Wilhelmsson. Heap archi-tectures for concurrent languages using message passing. SIGPLAN Not., 38(2supplement):88–99, June 2002.

[13] Prashanth Mundkur, Ville Tuulos, and Jared Flatow. Disco: A computing platformfor large-scale data analytics. In Proceedings of the 10th ACM SIGPLAN Workshopon Erlang, Erlang ’11, pages 84–89, New York, NY, USA, 2011. ACM.

[14] C. Ranger, R. Raghuraman, A. Penmetsa, G. Bradski, and C. Kozyrakis. Evalua-ting MapReduce for multi-core and multiprocessor systems. In High PerformanceComputer Architecture, 2007. HPCA 2007. IEEE 13th International Symposium on,pages 13–24, Feb 2007.

[15] Sriram Rao, Raghu Ramakrishnan, Adam Silberstein, Mike Ovsiannikov, and Da-mian Reeves. Sailfish: A framework for large scale data processing. In Proceedingsof the Third ACM Symposium on Cloud Computing, SoCC ’12, pages 4:1–4:14, NewYork, NY, USA, 2012. ACM.

[16] K. Shvachko, Hairong Kuang, S. Radia, and R. Chansler. The Hadoop DistributedFile System. In Mass Storage Systems and Technologies (MSST), 2010 IEEE 26thSymposium on, pages 1–10, 2010.

[17] Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, Sharad Agarwal, Maha-dev Konar, Robert Evans, Thomas Graves, Jason Lowe, Hitesh Shah, SiddharthSeth, Bikas Saha, Carlo Curino, Owen O’Malley, Sanjay Radia, Benjamin Reed, andEric Baldeschwieler. Apache Hadoop YARN: Yet Another Resource Negotiator. InProceedings of the 4th Annual Symposium on Cloud Computing, SOCC ’13, pages5:1–5:16, New York, NY, USA, 2013. ACM.

[18] Yandong Wang, Jian Tan, Weikuan Yu, Li Zhang, Xiaoqiao Meng, and Xiaobing Li.Preemptive reducetask scheduling for fair and fast job completion. In Proceedingsof the 10th International Conference on Autonomic Computing (ICAC 13), pages279–289, San Jose, CA, 2013. USENIX.

Page 71: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

REFERÊNCIAS BIBLIOGRÁFICAS 71

[19] Tom White. Hadoop: The Definitive Guide. O’Reilly Media, Inc., 2012.

[20] Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, ScottShenker, and Ion Stoica. Delay scheduling: A simple technique for achieving localityand fairness in cluster scheduling. In Proceedings of the 5th European Conference onComputer Systems, EuroSys ’10, pages 265–278, New York, NY, USA, 2010. ACM.

[21] Matei Zaharia, Andy Konwinski, Anthony D. Joseph, Randy Katz, and Ion Stoica.Improving MapReduce performance in heterogeneous environments. In Proceedingsof the 8th USENIX Conference on Operating Systems Design and Implementation,OSDI’08, pages 29–42, Berkeley, CA, USA, 2008. USENIX Association.

[22] Add a checkpointable version of shuffle and reduce context supported by a checkpointmanager which uses the hdfs - jira - mapreduce-6444. https://issues.apache.org/jira/browse/MAPREDUCE-6444. Último acesso em 15 de junho de 2017.

[23] Add support for partialfileoutputcommiter when checkpointing is an option duringpreemption - jira - mapreduce-6434. https://issues.apache.org/jira/browse/MAPREDUCE-6434. Último acesso em 15 de junho de 2017.

[24] Apache hadoop website. http://hadoop.apache.org/. Último acesso em 15 dejunho de 2017.

[25] Apache license 2.0. http://www.apache.org/licenses/LICENSE-2.0. Últimoacesso em 15 de junho de 2017.

[26] Chart elements at github.com. https://github.com/robdodson/chart-elements.Último acesso em 15 de junho de 2017.

[27] Checkpoint shuffle aggregation as map output - jira - mapreduce-4585. https://issues.apache.org/jira/browse/MAPREDUCE-4585. Último acesso em 15 de junhode 2017.

[28] Digital ocean website. https://www.digitalocean.com. Último acesso em 15 dejunho de 2017.

[29] Disco at github.com. https://github.com/discoproject/disco. Último acessoem 15 de junho de 2017.

[30] Disco distributed filesystem website. http://disco.readthedocs.org/en/latest/howto/ddfs.html. Último acesso em 15 de junho de 2017.

[31] Disco fork at github.com. https://github.com/augustorsouza/disco. Últimoacesso em 15 de junho de 2017.

[32] Disco notebooks at github.com. https://github.com/augustorsouza/disco/tree/notebooks. Último acesso em 15 de junho de 2017.

[33] Disco project website. http://discoproject.org. Último acesso em 15 de junhode 2017.

Page 72: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

REFERÊNCIAS BIBLIOGRÁFICAS 72

[34] Elastic compute cloud (ec2) – servidor e hospedagem na nuvem – aws. https://aws.amazon.com/pt/ec2/. Último acesso em 15 de junho de 2017.

[35] Getting started - ajax | mdn. https://developer.mozilla.org/en-US/docs/AJAX/Getting_Started. Último acesso em 15 de junho de 2017.

[36] Go worker at github.com. https://github.com/discoproject/goworker. Últimoacesso em 15 de junho de 2017.

[37] Hadoop at github.com. https://github.com/apache/hadoop. Último acesso em 15de junho de 2017.

[38] Hadoop mapreduce next generation - capacity scheduler. https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html.Último acesso em 15 de junho de 2017.

[39] Hadoop mapreduce next generation - fair scheduler. https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html. Úl-timo acesso em 15 de junho de 2017.

[40] Hadoop streaming at apache wiki. https://wiki.apache.org/hadoop/HadoopStreaming. Último acesso em 15 de junho de 2017.

[41] Haskell worker at github.com. https://github.com/zuzia/haskell_worker. Úl-timo acesso em 15 de junho de 2017.

[42] Introduction - material design - material design guidelines. https://material.google.com/. Último acesso em 15 de junho de 2017.

[43] Iron ajax at github.com. https://github.com/PolymerElements/iron-ajax. Úl-timo acesso em 15 de junho de 2017.

[44] Java theory and practice: Garbage collection in the hotspot jvm. http://www.ibm.com/developerworks/library/j-jtp11253/. Último acesso em 15 de junhode 2017.

[45] Json. http://www.json.org/. Último acesso em 15 de junho de 2017.

[46] Juicy ace editor at github.com. https://github.com/Juicy/juicy-ace-editor.Último acesso em 15 de junho de 2017.

[47] Kosmos distributed filesystem. https://code.google.com/archive/p/kosmosfs/.Último acesso em 15 de junho de 2017.

[48] Lfe worker at github.com. https://github.com/oubiwann/lfe-disco. Últimoacesso em 15 de junho de 2017.

[49] http://matplotlib.org/. Último acesso em 15 de junho de 2017.

[50] Moving ahead with hadoop yarn. https://www.ibm.com/developerworks/library/bd-hadoopyarn/. Último acesso em 15 de junho de 2017.

Page 73: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

REFERÊNCIAS BIBLIOGRÁFICAS 73

[51] New york times - self-service, prorated supercomputing fun! https://open.blogs.nytimes.com/2007/11/01/self-service-prorated-super-computing-fun/. Úl-timo acesso em 15 de junho de 2017.

[52] Ocaml worker at github.com. https://github.com/discoproject/odisco. Últimoacesso em 15 de junho de 2017.

[53] Paper elements at github.com. https://github.com/Polymer/paper-elements.Último acesso em 15 de junho de 2017.

[54] Pipeline data flow in disco jobs. http://disco.readthedocs.io/en/develop/howto/pipeline.html. Último acesso em 15 de junho de 2017.

[55] Preemption of reducer (and shuffle) via checkpointing - jira - mapreduce-5269.https://issues.apache.org/jira/browse/MAPREDUCE-5269. Último acesso em 15de junho de 2017.

[56] Support for encrypting intermediate data and spills in local filesystem - jira- mapreduce-5890. https://issues.apache.org/jira/browse/MAPREDUCE-5890.Último acesso em 15 de junho de 2017.

[57] The jupyter notebook — ipython. https://ipython.org/notebook.html. Últimoacesso em 15 de junho de 2017.

[58] Umbrella: Preemption and restart of mapreduce tasks - jira - mapreduce-4584.https://issues.apache.org/jira/browse/MAPREDUCE-4584. Último acesso em 15de junho de 2017.

[59] Umbrella: Preemption and restart of mapreduce tasks - jira - mapreduce-4584.https://issues.apache.org/jira/browse/MAPREDUCE-4584. Último acesso em 15de junho de 2017.

[60] Welcome - polymer project. https://www.polymer-project.org/1.0/. Últimoacesso em 15 de junho de 2017.

Page 74: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

Apêndice A

Comparação entre Java e Erlang

Em várias partes do texto desta dissertação argumentamos sobre como a linguagem deprogramação Erlang, utilizada na Disco, auxilia seus programadores a se preocuparem me-nos com aspectos específicos de sistemas distríbuidos do que Java, utilizada no Hadoop.Como as duas linguagens são compiladas em bytecode e executadas em máquinas virtu-ais, vamos nesse anexo analisar as características dessas linguagens e de seus ambientes deexecução para entender como Erlang se destaca em relação ao Java em sistemas distribuí-dos. Além disso, vamos ver como esses ambientes de execução acabam por influenciar namaneira como o código é escrito, no paradigma de programação utilizado por cada umadessas linguagens e em como concorrência e paralelismo podem ser atingidos em ambosos casos.

Chamamos a máquina virtual responsável pela execução de bytecode Java de JVM(Java Virtual Machine). Trata-se de uma das máquinas virtuais mais conhecidas e quetalvez mais tenha recebido investimentos de grandes empresas (por exemplo: Sun, Oraclee Google). A funcionalidade que talvez tornou Java tão popular e que foi um dos trunfosda JVM é a garbage collection, ou seja, a delegação da responsabilidade de gerenciamentode memória do programa ao ambiente de execução e não ao programador. Evitandoassim falhas humanas nesse processo custoso que é gerenciar memória em um programade computador.

O equivalente da JVM para o Erlang é a BEAM que curiosamente também oferecegarbage collection. Essa máquina virtual porém não conquistou a mesma fama que Javanos ambientes da indústria e mesmo da academia, apesar do investimento em sua criaçãode uma grande empresa também: a Ericsson. Diferentemente da JVM, a BEAM além degarbage collection traz a tona um conceito muito comum em sistema operacionais masque ela se encarrega de endereçar internamente: processos. Diferentemente dos processosde sistemas operacionais que podem ser pesados, a BEAM possui processos extremamenteleves (certa de 2KB) e regras muito bem definidas para comunicação (chamadas de ActorModel [10]) e gerenciamento de memória que veremos nas próximas seções deste anexo.

Costuma-se dizer que Erlang é para os locks o que Java é para os ponteiros. Essaé uma afirmação que vem do fato de que Java revolucionou o mundo da programaçãode computadores quando tirou do programador o fardo de gerenciar memória com seugarbage collector, isso possibilitou que a máquina se encarrega de gerenciar a sua própriamemória e que o programador se preocupe em resolver o problema que seu código visa

74

Page 75: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

APÊNDICE A. COMPARAÇÃO ENTRE JAVA E ERLANG 75

endereçar com uma responsabilidade a menos. Da mesma maneira, Erlang endereça aquestão dos locks, ou seja, ao invés de fazer com que o programador tenha a obrigação desaber se uma região crítica está sendo utilizada por uma thread e por isso não pode serutilizada por uma outra, por exemplo, lógica que complica o entendimento e a manutençãode seu código, Erlang olha para esse problema de uma outra maneira: e se não existissemregiões críticas em nossos programas?

A.1 Gerenciamento de memória

Quando a JVM é executada vemos uma divisão da memória em uma área de heap com-partilhada por todos os objetos, uma pilha (stack) para memória que não seja alocadadinamicamente e uma área denominada non heap para armazenar meta informações dasclasses e também código nativo que o Just In Time Compiler (JIT) venha a gerar. O JITé uma otimização importante, trechos de código que sejam executados frequentementesão automaticamente identificados pela JVM e compilados para código nativo para umamelhor performance.

A área do heap na JVM é por sua vez subdividida. Cada uma dessas subdivisõesarmazena objetos segundo suas “idades”. Essa lógica permite que para os objetos maisnovos o garbage collector seja executado mais vezes, pois pautado em observações amaioria dos objetos tende a ser alocado e desalocado rapidamente da memória. Alémdisso, essa técnica permite que uma área menor do heap seja varrida por execução dogarbage collector já que essa varredura é custosa e exige que a execução do programa emsi pare por um tempo até que ela termine [44].

Quando olhamos para a BEAM vemos que a maior diferença em termos de gerencia-mento de memória dela em relação à JVM é que cada processo possui seu próprio heape sua própria stack. Além disso, cada processo possui seu bloco de controle que é utili-zado para armazenar meta-informações e uma fila de mensagens — que como veremos napróxima seção é a base da comunicação no Actor Model. O garbage collector nessa estru-tura é executado para cada processo isoladamente, o que tem vários benefícios como umaexecução rápida e que não trave a execução de outros processos. Além disso, quando umprocesso acaba toda sua memória pode ser desalocada imediatamente. Diferentemente daJVM não existem pausa globais por motivos de limpeza de memória na BEAM [12].

Toda essa lógica de memória não compartilhada acaba por influenciar nos paradigmasde programação que devem ser seguidos em cada uma das linguagens. Java se debruçasobre o paradigma Orientado a Objetos que permite que estrutura mutáveis (objetos)sejam utilizadas encapsulando seu estado em atributos e permitindo por meio de métodose suas visibilidades que outros objetos alterem esse estado. Por outro lado Erlang abraça oparadigma Funcional, pautado na inexistência de estado compartilhado e na imutabilidadedas estruturas de dados, ou seja, não se alteram valores em memória, mas sim se criamnovos valores baseadas nas estruturas antigas. Por exemplo, uma variável numérica quevalha 2 não pode ter seu valor alterado para 3 ao se somar 1 a ela, mas sim pode-se criaruma nova variável que seja igual a primeira mais 1.

Page 76: Augusto Rodrigues de Souza Mecanismos para ...repositorio.unicamp.br/bitstream/REPOSIP/322699/1/Souza...Sou 1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída)

APÊNDICE A. COMPARAÇÃO ENTRE JAVA E ERLANG 76

A.2 Concorrência e paralelismo

Trabalhar com unidades de processamento que sejam executadas de maneira concorrenteou paralela traz consigo uma série de possíveis problemas quando se tem uma estruturasemalhante a da JVM de gerenciamento de memória, pois ela é compartilhada entre asthreads. A maioria dessas problemas tem que ser resolvida e pensada pelo programadorutilizando-se do recurso de semáforos (locks) para controlar o acesso às regiões críticas.Alguns dos problemas mais comuns são:

Deadlocks: ocorre quando duas threads necessitam dos recursos A e B, a primeira travaA e tenta obter B e a segunda faz o mesmo na ordem inversa, dessa forma os recursosacabam por ficar travados e nunca são liberados por nenhuma das threads;

Race conditions: ocorre quando a ordem de execução de duas threads não foi garantidapelo programador, em Java no caso as threads são mapeadas uma a uma para threadsa nível de sistema operacional (SO), ou seja, o escalonador padrão do SO acaba pordecidir a ordem de execução podendo causar comportamentos não esperados;

Starvation: ocorre quando uma thread fica tempo demais na região crítica não dando aoportunidade a outras de utilizá-la também.

Erlang trabalha quase sempre com imutabilidade e sem memória compartilhada, aúnica forma das unidades de processamento (no caso do Erlang, os processos) se comuni-carem e compartilharem informações é através da passagem de mensagens. Essa é umadas regras do Actor Model e que faz com que concorrência em Erlang seja muito menospassível de problemas a serem endereçados pelo programador do que em Java. Passagemde mensagens está um nível acima no quesito abstração do que locks.

Outro aspecto interessante é que os processos podem ser distribuídos dentro de umamesma máquina executando concorrentemente (em máquinas com mais de 1 core) ou emparalelo, ou seja, em outra máquina totalmente diferente, sem grandes configurações porparte dos programadores responsáveis. Até mesmo nesses casos o ambiente de execuçãose encarrega de passar as mensagens de um processo ao outro, independentemente deonde ele esteja sendo executado fisicamente. Uma limitação que esse modelo ocasionaé que as mensagens não podem ser passadas por referência de um processo a outro,elas obrigatoriamente precisam ser copiadas de uma caixa de saída para uma caixa deentrada de mensagens. Como os processadores não vão ficar mais rápido e arquiteturasmulticore são a realidade para máquinas termos mais potentes, além de que clusters paracomputação de aplicações distribuídas são cada vez mais necessários, podemos dizer queErlang possui características interessantes para esses tipos de sistemas ao permitir umnível maior de abstração aos programadores que precisem de concorrência e paralelismo.