DISASTER RECOVERY IN DISTRIBUTED STORAGE SYSTEMS USING HADOOP

Leonardo Ribas Machado das Neves

Undergraduate project presented to the Computer and Information Engineering course of the Escola Politécnica, Universidade Federal do Rio de Janeiro, as part of the requirements for the degree of Computer and Information Engineer.

Advisor: Marta Lima de Queirós Mattoso

Rio de Janeiro
May 2015



DISASTER RECOVERY IN DISTRIBUTED STORAGE SYSTEMS USING HADOOP

Leonardo Ribas Machado das Neves

PROJECT SUBMITTED TO THE FACULTY OF THE COMPUTER AND INFORMATION ENGINEERING COURSE OF THE ESCOLA POLITÉCNICA OF THE UNIVERSIDADE FEDERAL DO RIO DE JANEIRO AS PART OF THE REQUIREMENTS FOR THE DEGREE OF COMPUTER AND INFORMATION ENGINEER.

Examinadores:

Profa. Marta Lima de Queirós Mattoso, Dr.

Prof. Álvaro Luiz Gayoso de Azeredo Coutinho, Dr.

Prof. Alexandre de Assis Bento Lima, Dr.

RIO DE JANEIRO, RJ – BRAZIL
MAY 2015

Neves, Leonardo Ribas Machado das
Disaster Recovery in Distributed Storage Systems using Hadoop / Leonardo Ribas Machado das Neves. – Rio de Janeiro: UFRJ/POLI – COPPE, 2015.
XI, 39 p.: il.; 29,7 cm.
Advisor: Marta Lima de Queirós Mattoso
Undergraduate project – UFRJ / Escola Politécnica / Computer and Information Engineering Course, 2015.
References: p. 38–39.
1. Hadoop. 2. Disaster Recovery. 3. Big Data. 4. Distributed Storage Systems. I. Mattoso, Marta Lima de Queirós. II. Universidade Federal do Rio de Janeiro, Escola Politécnica / Computer and Information Engineering Course. III. Title.


Three things are certain in life:
Death, taxes and lost data.

David Dixon


Acknowledgments

To my parents, who unconditionally supported and believed in my success even when I did not believe in it myself, and to my grandmother who, in life, always radiated pride. This journey would have been impossible without you and all the love, support and care you gave me.

To Pedro Diverio, Bruno Monzani, Guilherme Vargas, Vitor Piffer, Fernanda Conde, Aline Lombardi and all my longtime friends who, despite my absence on so many occasions, always welcomed me when I needed them. Thank you for the beers, both the ones to celebrate and the ones to drown sorrows.

To my friends at Fluxo, especially CCE and those who formed my DP. I learned a lot with you, but the most important lesson was to always believe. I am very proud to say that I was once part of this team.

To my friends from ECI and Mechanical Engineering, thank you for sharing the challenges with me. I am glad to have met all of you along the way.

To Ana Maria Gurgel, Mariana Wink, Victor Silva, João Marcos Conradi and all my friends from Cornell, thank you for being my family while I was away from mine and for pushing me beyond my limits. To Julie Simmons-Lynch and Tim Devoogd, for providing the opportunity that definitively changed my life. To professors John Hopcroft and David Mimno, for advising me during my adaptation period and opening opportunities when I most needed them.

To my friends at Pivotal, for all the knowledge and support throughout the internship and this project. I thank you all for the trust and for welcoming me so well. I especially thank Shivram Mani, for making himself available as my mentor throughout the project and for always caring about my career.

To professor Jonas Knopman, for introducing me to computing and, consequently, transforming my professional future. To professor Alexandre Evsukoff and professor Marta Mattoso, for placing their trust in my research work and for guiding me throughout this journey. A special thanks to professor Marta Mattoso for advising me throughout this project, and to my father and Danielle Constancio for helping me revise the text.


Abstract of the Undergraduate Project presented to the Escola Politécnica/COPPE/UFRJ as partial fulfillment of the requirements for the degree of Computer and Information Engineer.

DISASTER RECOVERY IN DISTRIBUTED STORAGE SYSTEMS USING HADOOP

Leonardo Ribas Machado das Neves

Maio/2015

Advisor: Marta Lima de Queirós Mattoso

Course: Computer and Information Engineering

With the growing demand for data processing and storage, both in industry and academia, resulting from the large amount of data generated, whether by consumers, scientific experiments or any other process, applications capable of performing this task efficiently are increasingly used, demanded and improved. Apache Hadoop is one of the platforms aimed at the analysis and processing of large volumes of data, known as "Big Data", that has become increasingly popular over recent years and has proven very efficient both at performing these tasks and for its resilience in the case of failures of some nodes of its parallel machine. However, as efficient as this fault tolerance may be, Hadoop still lacks a standard mechanism for recovering from more serious failures, usually caused by natural disasters such as floods and earthquakes, in which an entire cluster may stop working. These failures, however rare, are extremely costly, not only because of application downtime but mainly because of the risk of losing often valuable data.

This work aims to study the current solutions and propose a new procedure capable of meeting the needs of institutions that depend on Hadoop to deliver their services and cannot risk the losses caused by disasters.

Keywords: Hadoop, Disaster Recovery, Big Data, Distributed Storage Systems.


Abstract of the Undergraduate Project presented to Poli/COPPE/UFRJ as a partialfulfillment of the requirements for the degree of Computer and Information Engineer.

A DISASTER RECOVERY SOLUTION FOR HADOOP CLUSTERS

Leonardo Ribas Machado das Neves

May/2015

Advisor: Marta Lima de Queirós Mattoso

Course: Computer and Information Engineering

With the growing demand for data processing and storage, both in industry and academia, due to the large amount of data generated, whether by consumers, scientific experiments or any other process, applications capable of performing this task efficiently are increasingly used, required and improved. Apache Hadoop is a platform aimed at the analysis and processing of large amounts of data, also called "Big Data", that has become popular over the past few years and has proven to be very efficient, not only at performing these tasks but also for its resilience in the case of node failures in its clusters. However, as efficient as this fault tolerance is, Hadoop does not yet have a standard mechanism for recovering from more serious problems, usually caused by natural disasters such as floods and earthquakes, where an entire cluster may stop working. These failures, however rare, are extremely costly, not only for stopping the applications from running but also for the loss of valuable data.

This project therefore aims to study the existing solutions and propose one that meets the needs of institutions that rely on Hadoop to perform their services and cannot afford the damage caused by the occurrence of disasters.

Keywords: Hadoop, Disaster Recovery, Big Data, Clusters.


Contents

List of Figures

Glossary

1 Introduction
1.1 Motivation
1.2 Objective
1.3 Organization

2 Apache Hadoop
2.1 Hadoop Overview
2.2 Components
2.2.1 Hadoop Common
2.2.2 HDFS (Hadoop Distributed File System)
2.2.3 MapReduce Framework
2.2.4 YARN

3 Literature and Industry Review
3.1 DistCp Backup
3.2 Apache Falcon
3.3 Proposed Solution on Apache Foundation JIRA
3.4 WANdisco Distributed Coordination Engine (DConE)

4 Proposed DR Solution
4.1 Architecture and Design

5 Building a Disaster Recovery Engine Prototype
5.1 The EventListener
5.2 The OpCommunicator Interface
5.2.1 RabbitMQ
5.2.2 Implementation
5.3 The OpApplier
5.3.1 The StatusTracker
5.3.2 The OperationHandler
5.4 The FileTransfer Interface
5.4.1 The SingleFileTransfer
5.4.2 The BulkFileTransfer
5.4.3 The DataValidator
5.5 Configuration Files

6 Results

7 Conclusion and Future Work

References

List of Figures

2.1 Simplified Hadoop Structure
2.2 YARN Architecture

4.1 Proposed Solution Design

5.1 RabbitMQ queue
5.2 Trie Data structure

6.1 Metadata Only
6.2 File Copy Operation
6.3 Performance Improvement
6.4 Log Scale Performance Comparison
6.5 Primary Cluster Performance Impact Comparison


Glossary

AWS Amazon Web Services

DFS Distributed File System

DR Disaster Recovery

HA High Availability

HDFS Hadoop Distributed File System

IaaS Infrastructure as a Service


Chapter 1

Introduction

In the information era, more and more data is generated every day. Companies are investing heavily in software to handle the large amounts of data their customers generate, creating the so-called Big Data. The challenge of increasing the business value of Big Data is changing the computer industry [1]. In academia, access to such a wealth of resources is making it possible to tackle problems that were inconceivable before. Machine learning and artificial intelligence studies became more relevant since their results can now be more accurate and converge faster. All this progress requires investment in infrastructure, improving the performance of distributed databases and making cloud applications cheaper and more reliable.

One widely used platform to handle distributed data is the Apache Hadoop open-source software library [2]. The project, almost ten years after its creation, provides mature, high-performance, reliable solutions capable of scaling to huge loads of data, even tera- or petabytes. It relies on a distributed file system inspired by the Google File System [3] and supports the MapReduce framework, also developed by Google and published as a paper [4]. Because it has a large user community, Hadoop is supported by several cloud computing platforms like AWS (Amazon Web Services), and some vendors even have their own proprietary versions of it, like Pivotal Software and Cloudera.


1.1 Motivation

Although Hadoop is designed to be fault-tolerant at the node level [5], making the failure of a single machine or even an entire rack transparent to client users, for really busy clusters with thousands or even tens of thousands of nodes, this might not be enough. In the case of a natural disaster, such as an earthquake or flood, or any other event capable of destroying or bringing down a whole cluster, like a fire or a power outage, the company will suffer downtime and/or data loss.

Companies invest millions of dollars in distributed systems and may even base their core business on them, like the IaaS (Infrastructure as a Service) products sold by large enterprises. In the case of losing data and facing a significant downtime, the responsible party will also suffer financial losses and might even see its reputation as a reliable service damaged.

1.2 Objective

The current proposal has the objective of designing, prototyping and testing a DR (Disaster Recovery) solution capable of minimizing the impact of a catastrophe on the operation of a Hadoop cluster, curtailing the decline in performance and the data loss. The project scheme will derive from analyzing the existing solutions, highlighting their advantages and disadvantages, and architecting an application that improves on them. The implemented solution will be tested on a controlled cluster provided by Pivotal Software Inc. as part of an agreement after the author of this project participated in an internship on their Hadoop team.

1.3 Organization

In chapter 2, an overview of Apache Hadoop is given, introducing the concepts needed for the proposal to be clearly understood. Chapter 3 presents and analyzes existing solutions for the disaster recovery problem, highlighting advantages and disadvantages. In chapter 4, the architecture and design of the proposed application are explained, discussing why decisions were made and what the alternatives were, while chapter 5 describes the implementation steps based on the planned scheme. Chapter 6 presents the achieved results and chapter 7 discusses the future work needed to turn this prototype into an actual industry-level project.


Chapter 2

Apache Hadoop

Apache Hadoop is an open-source platform designed for reliable, distributed, scalable applications. It allows the user to deal with large amounts of structured or unstructured data across computer clusters using simple programming models, such as MapReduce. It is designed to be fault-tolerant against single and multiple node failures and to scale to a huge number of machines.

2.1 Hadoop Overview

The increasing importance of big data studies arises from the fact that the internet is generating more and more data every day. In the last few years, human society has created more data than in the previous forty thousand years combined. The reason why handling this amount of information might be tricky is that most of it is composed of unstructured data like voice, images and text. Since unstructured data requires more processing to make it useful for applications, traditional databases perform poorly on big data sets, as moving petabytes of data in and out of disk is too costly. For a relational database management system to perform well in situations like this, expressive investment in hardware is necessary, together with multiple kinds of software to analyze and handle the data.

Hadoop, then, became a better solution when huge volumes of mostly unstructured data must be processed on a regular basis. Most of all, by allowing significant cost savings, Hadoop enabled a new range of possibilities for applications approaching intensive and unstructured data, such as clickstream analytics, sensor data and sentiment analysis, which were previously restricted by cost [6].

2.2 Components

A simplified structure of a Hadoop cluster is shown in figure 2.1. The main components are Hadoop Common, which is not shown in figure 2.1 because it works on all levels, the MapReduce framework, the YARN resource manager and the Hadoop Distributed File System. Moreover, some other related projects are built on top of this framework, like Hive, Pig and Mahout. The principal components and some of these side projects will be discussed in this section to give the reader a better understanding of the Hadoop ecosystem.

Figure 2.1: Simplified Hadoop Structure.

2.2.1 Hadoop Common

Hadoop Common contains the utilities and libraries that support the other modules. If an application intends to run on top of Hadoop, it will most likely use functions from the classes inside Hadoop Common.


2.2.2 HDFS (Hadoop Distributed File System)

The Apache Hadoop project is based on two important features, the MapReduce framework and the HDFS (Hadoop Distributed File System), also referred to simply as the DFS. Both were initially inspired by papers from Google [3] [4] and are fundamentally important for Hadoop.

Since Hadoop computations are distributed among several nodes on a cluster, or even between clusters, it would be extremely hard to manage all machines if each of them had a separate file system. Realizing that, Google engineers designed and implemented the Google File System, a distributed file system to handle massive parallel computation. A distributed file system relies on scalability, performance and reliability, and providing a high-quality service implies addressing these three concerns.

To handle terabytes or even petabytes of data in a distributed fashion, the file system must scale to thousands or even tens of thousands of nodes without losing much performance and throughput. Furthermore, it is designed to work on commodity hardware, so the DFS must be fault-tolerant enough to handle node failures, making them unnoticeable to clients and ensuring that no data is lost in the process.

Apache Hadoop followed the same concepts with its own DFS and based the implementation on three components: the Namenode, the Secondary Namenode and the Datanodes [7]. This section will discuss these components, their interaction and the information relevant to the proposed disaster recovery solution. As the focus of this discussion is not to go deep into the Hadoop internals but to give the overview necessary for the reader to understand the important concepts, only the standard configuration of a cluster will be discussed.

2.2.2.1 Namenode

The Namenode is responsible for handling the HDFS metadata operations (like copy, append, move, etc.) and the filesystem namespace. Under the master/slave architecture on which the DFS is designed, the Namenode is the master server, regulating access to files by clients. It also determines the block mapping of the Datanodes.

Regarding the proposed disaster recovery solution, two files are extremely important: the editlog and the FSImage. The first is a log file containing all the metadata operations (also called transactions) related to modifying a file, the so-called write operations. Among them are the copyFromLocal and appendToFile operations, which copy a file to the DFS or append content to a file on it, the operations that change permissions, owners and groups of files and folders, and the rename, move and delete operations, among others. The only transactions not persisted to the editlog are the read operations.
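The mechanics of such an append-only log can be sketched as follows. This is a simplified model for illustration only, not Hadoop's actual editlog format or API:

```python
# Simplified model of an append-only edit log: write operations are
# recorded in order and can be replayed to rebuild the namespace state.
# Illustrative only -- not Hadoop's actual editlog implementation.

class EditLog:
    def __init__(self):
        self.ops = []                      # append-only list of (op, args)

    def log(self, op, *args):
        self.ops.append((op, args))        # new transactions go at the end

def replay(ops):
    """Rebuild the namespace (path -> metadata) from the logged operations."""
    namespace = {}
    for op, args in ops:
        if op == "mkdir":
            namespace[args[0]] = {"type": "dir"}
        elif op == "create":
            namespace[args[0]] = {"type": "file"}
        elif op == "rename":
            namespace[args[1]] = namespace.pop(args[0])
        elif op == "delete":
            namespace.pop(args[0], None)
    return namespace

log = EditLog()
log.log("mkdir", "/data")
log.log("create", "/data/a.txt")
log.log("rename", "/data/a.txt", "/data/b.txt")
print(sorted(replay(log.ops)))   # ['/data', '/data/b.txt']
```

Because new transactions are only ever appended, a reader that remembers its last position can pick up exactly the operations applied since it last looked, which is the property the proposed solution exploits.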

The editlog is stored in a binary format and, consequently, is not human-readable. To deal with the persisted information, it is necessary to use methods from the Hadoop Common libraries. When decoding the transactions, the only accessible way found to read all the metadata stored for each operation was to obtain a comma-separated value (CSV) string of it. Possibly, using non-public methods of the Hadoop core libraries, there are other ways, but, during the solution study, this was the only accessible method.
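For illustration, once a transaction has been decoded to a CSV string it can be split into fields with standard tooling. The field layout below (opcode, transaction id, source path, target path) is hypothetical, not the exact layout produced by the Hadoop libraries:

```python
import csv
import io

# Hypothetical CSV-decoded transaction; the real field layout emitted by
# the Hadoop Common methods may differ.
record = "OP_RENAME_OLD,42,/data/a.txt,/data/b.txt"

# csv.reader handles quoting/escaping correctly, unlike a naive split(",")
opcode, txid, src, dst = next(csv.reader(io.StringIO(record)))
print(opcode, txid, src, dst)
```

Using a proper CSV parser rather than `str.split` matters if paths can contain commas or quotes.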

The FSImage file contains the last known good state of the filesystem. The Namenode reads it on startup and applies all pending edits from the editlog. This file is not directly important for the proposed solution, but it creates some restrictions that will be discussed in a later section.

2.2.2.2 Secondary Namenode

On a busy Hadoop cluster, the editlog files might get really big. In the case of failure, it would take a long time to merge the FSImage and the edits on the node, leading to a longer startup time. To avoid that, the Secondary Namenode applies checkpoints. Periodically, or based on configurable parameters, it merges the editlog into the FSImage so that, if the Namenode goes down, it only has to retrieve the most recent FSImage and apply the remaining edits.


2.2.2.3 Datanode

If the Namenode deals with the metadata, the Datanode stores the actual files. A file receives a block identification and is replicated on several Datanodes. The Datanode keeps communicating with the Namenode through heartbeats, signals that inform the monitor that the node is still active, and can talk directly to client applications.
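The heartbeat mechanism can be sketched as a monitor that considers a node dead once it has missed reports for longer than a timeout. This is an illustration of the idea only; Hadoop's actual protocol, intervals and timeouts differ:

```python
import time

# Illustrative heartbeat monitor: nodes report periodically; a node that
# has not reported within `timeout` seconds is considered dead.
class HeartbeatMonitor:
    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}                        # node -> last report time

    def heartbeat(self, node, now=None):
        self.last_seen[node] = now if now is not None else time.time()

    def live_nodes(self, now=None):
        now = now if now is not None else time.time()
        return {n for n, t in self.last_seen.items()
                if now - t <= self.timeout}

mon = HeartbeatMonitor(timeout=3)
mon.heartbeat("dn1", now=100)      # explicit timestamps for determinism
mon.heartbeat("dn2", now=102)
print(sorted(mon.live_nodes(now=104)))   # ['dn2'] -- dn1 timed out
```

When a node drops out of the live set, the Namenode re-replicates its blocks from the surviving replicas, which is what makes single-node failures transparent to clients.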

2.2.3 MapReduce Framework

MapReduce is a simple programming model to handle large amounts of data in parallel, dividing the work into small independent tasks. Based on the divide-and-conquer principle, the MapReduce framework comes from a paper written by Google engineers [4] and is widely used in big data projects. Most Hadoop side projects are built on top of MapReduce; therefore it is important for the disaster recovery scheme to look for corner cases originated by these jobs.

For the first prototype, DistCp was used: a distributed file transfer tool that copies files from one HDFS to another and uses MapReduce to move files in bulk.
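The model can be illustrated with a small local word-count sketch: independent map tasks emit key/value pairs, the pairs are grouped by key, and reduce tasks combine each group. A real framework distributes these phases across nodes; this runs in a single process:

```python
from collections import defaultdict

# Minimal local illustration of the MapReduce model (word count).

def map_phase(chunk):
    """Map: emit (word, 1) for each word in an input chunk."""
    return [(word, 1) for word in chunk.split()]

def shuffle(pairs):
    """Shuffle: group all emitted values by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Reduce: combine each key's values into a final result."""
    return {key: sum(values) for key, values in groups.items()}

chunks = ["big data big", "data big"]            # input splits
pairs = [p for c in chunks for p in map_phase(c)]
print(reduce_phase(shuffle(pairs)))   # {'big': 3, 'data': 2}
```

Because each map task depends only on its own chunk, the framework can run them on whichever nodes hold the data and rerun any task that fails, which is the fault-tolerance property DistCp inherits.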

2.2.4 YARN

It is easy to imagine that, in a cluster with thousands of nodes, a lot of MapReduce jobs are generated in parallel, and it is not trivial to make them run smoothly among all nodes. For that reason Hadoop has a resource manager and, since the release of Hadoop 2.0, it is called YARN. YARN manages the Datanodes, scheduling resources based on their availability. It is composed of four components: the Resource Manager, the Node Manager, the Application Master and the Container.


Figure 2.2: The YARN Architecture.

2.2.4.1 Resource Manager

The Resource Manager is composed of a scheduler and an applications manager. The first is, as the name suggests, a scheduler that arbitrates the available resources among the competing applications. The second is responsible for handling job submissions and allocating the container in which each Application Master first runs. Moreover, it provides the service that allows the Application Master's container to restart on failure.

2.2.4.2 Node Manager

A Node Manager is responsible for monitoring a node and reporting its status to the Resource Manager. The monitoring policies are set by the Resource Manager itself and relate to the resources used by the containers inside the node (CPU, memory, network, etc.).


2.2.4.3 Application Master

The Application Master is per-application and is responsible for negotiating resource containers from the Resource Manager, tracking their status and monitoring progress.

2.2.4.4 Container

A container is a specific amount of resources, like memory and CPU, which an application is granted the right to use on a specific host.


Chapter 3

Literature and Industry Review

At the time this project was written, there were no standard solutions for disaster recovery on Hadoop. As disasters are not so frequent, institutions tend to postpone their investment in DR so that they can improve other areas of their applications. Although it is an understandable decision, the huge losses involved should be taken into consideration. This chapter studies the most common disaster recovery approaches so far, looking for ways to better solve the problem.

3.1 DistCp Backup

A commonly used solution for disaster recovery is the backup of the whole primary cluster to a mirror cluster. A popular way of completing this task is the use of DistCp [8], a MapReduce-based tool that allows data to be transferred inter- or intra-cluster, between two distributed file systems. Since it relies on the MapReduce framework, it inherits its perks, like the error handling and recovery capability. DistCp is optimized to copy large files or a high number of files at once, which makes it an efficient tool for backing up an entire cluster. In addition, it is easy to use, making it fast to put into production. On the other hand, because it only copies all the modified files, it requires a scheduler application to trigger backups, which creates a time window where the primary cluster and the mirror cluster are not synchronized. Additionally, as DistCp causes overhead and memory use on the primary cluster, copying all the modified files of a busy cluster might strongly affect performance.
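As an illustration, such a scheduler essentially builds and periodically launches a command like the one constructed below. The Namenode addresses and path are placeholders; `-update`, which restricts the copy to files that changed, is a real DistCp flag. The sketch only builds the command; in production it would be launched (e.g. with `subprocess`) on a timer or from cron:

```python
import shlex

# Sketch of the command a DistCp-based backup scheduler would run.
# Cluster addresses and the path are illustrative placeholders.
def distcp_command(src_nn, dst_nn, path, update=True):
    cmd = ["hadoop", "distcp"]
    if update:
        cmd.append("-update")      # copy only files that changed
    cmd += [f"hdfs://{src_nn}{path}", f"hdfs://{dst_nn}{path}"]
    return cmd

cmd = distcp_command("primary-nn:8020", "mirror-nn:8020", "/data")
print(shlex.join(cmd))
```

Whatever triggers this command, the interval between two runs is exactly the window in which the mirror can diverge from the primary, which is the limitation discussed above.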

3.2 Apache Falcon

Although Apache Falcon [9] can be used as a disaster recovery solution, that is not the main reason it was developed. Falcon is a data management and pipeline processing framework that allows users to easily orchestrate data motion, retention and replication workflows and simplifies their configuration. By allowing HDFS files to be replicated between multiple clusters, it can also be used as a DR solution. The biggest problem with this approach is the same as with DistCp: the mirror cluster's inconsistency between backups.

3.3 Proposed Solution on Apache Foundation JIRA

JIRA [10] is a popular issue tracker used at major software companies to keep track of bugs and issues and better manage their projects. The Apache Software Foundation, the non-profit corporation responsible for supporting Hadoop and other Apache projects, is one of the organizations that use JIRA and host discussion topics with suggested solutions for known issues or improvements to existing projects.

In one of those discussions, in the HDFS part of the forum, a group of Intel researchers proposed a yet-to-be-implemented disaster recovery solution for Apache Hadoop [? ]. The solution, planned to be released with Hadoop 3.0.0 but still not confirmed, has an asynchronous and a synchronous mode of operation, to be selected by the cluster administrator. Both require changes to the HDFS structure so that it starts supporting the concept of a mirror cluster. Furthermore, Namenodes on both the primary and secondary clusters will act differently and communicate with each other. The block replication policies will also have to change, since they will now need to adapt to replicating data to the secondary cluster. An important improvement of this solution is the capability of having multiple mirror clusters.

Synchronous replication allows a zero data loss disaster recovery solution, which means the clusters are guaranteed to be in sync at any time and no data will be lost in case of a disaster. However, to allow this to happen, a performance drop is expected on the primary cluster due to the increase of work on the Namenode. If data availability is extremely important, this might be acceptable; otherwise, an asynchronous approach is more likely to be chosen by the cluster administrator. The asynchronous replication works by scheduling the file replication, falling into the same problems as Apache Falcon and DistCp: it relies on eventual consistency and allows a window of inconsistency between backups.

3.4 WANdisco Distributed Coordination Engine (DConE)

WANdisco is a company founded in 2005 and specialized in distributed computing. In addition to their own Hadoop version, they also provide distributed versions of version control software like Git and SVN, and a disaster recovery solution [11]. Across all their projects, they claim to reach 100% uptime and distributed access to data, all structured on a solution based on the Paxos algorithm [12], allowing a zero data loss service across multiple clusters. Despite providing close to zero downtime and ensuring consistency at all times, the additional work necessary for clusters to operate in a consensus-based environment is expected to affect performance, making it a great solution when high availability is needed but requiring more thought when it is not.


Chapter 4

Proposed DR Solution

As discussed in chapter 3, the existing solutions can be divided into synchronous replication approaches, where the primary cluster's performance is sacrificed for the guarantee that data in both clusters will be consistent at all times, also called zero data loss, and asynchronous data synchronization, which schedules the synchronization and relies on eventual consistency, allowing a time frame in which the data won't be synchronized.

This project proposes a solution that counts on eventual consistency to minimize the footprint on the primary cluster but, instead of waiting for a scheduled backup, synchronizes changes for each operation applied on the primary cluster, with low-latency inter-cluster communication and a high throughput of applied operations. It puts together the benefits of not adding much extra work on the Namenode and of maintaining the clusters in similar states at all times.

4.1 Architecture and Design

The proposed disaster recovery architecture has two clusters: the primary or active cluster and the secondary or mirror cluster. As shown in figure 4.1, the main idea is to have a component tracking performed transactions on the Namenode and flagging them to an operation filter. This block filters the relevant information and sends it to a transmitter that delivers it to the mirror cluster. There, the operation is applied and, if necessary, data is copied from the primary cluster's Datanodes to the secondary cluster.

As discussed in chapter 2, the Namenode has a file called the editlog. This file contains information regarding all write operations on the cluster, updated in incremental order like every other file on HDFS. Since all new operations are written to the end of the file, it is easy to keep track of the new transactions applied to the cluster, a task performed by an OperationTracker on the primary cluster.

Not all operations should be sent to the mirror cluster, though. Transactions related to block ids, for example, are not important to the secondary cluster because the machines are different and files are not organized in the same block disposition. In order to minimize the bandwidth used by the application, which is an important aspect of this architecture as the clusters might be geographically distant, only the important operations are transmitted. This creates some extra work on the primary cluster, but not enough to have a visible impact on performance.
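A sketch of this filtering step is shown below. The opcode names are illustrative examples; the actual set of replicated operations would come from analyzing the editlog, not from this hypothetical list:

```python
# Illustrative operation filter: only namespace-changing operations are
# forwarded to the mirror; block-level transactions are dropped.
# The opcode names here are examples, not an exhaustive Hadoop list.
REPLICATED_OPS = {"OP_ADD", "OP_MKDIR", "OP_RENAME_OLD", "OP_DELETE",
                  "OP_SET_PERMISSIONS", "OP_SET_OWNER"}

def filter_ops(ops):
    """Keep only the operations worth transmitting to the mirror cluster."""
    return [op for op in ops if op["opcode"] in REPLICATED_OPS]

ops = [
    {"opcode": "OP_MKDIR", "path": "/data"},
    {"opcode": "OP_SET_GENSTAMP_V2"},            # block-level, dropped
    {"opcode": "OP_DELETE", "path": "/tmp/x"},
]
print([o["opcode"] for o in filter_ops(ops)])    # ['OP_MKDIR', 'OP_DELETE']
```

Dropping irrelevant transactions before transmission is what keeps the inter-cluster bandwidth proportional to meaningful namespace changes rather than to total Namenode activity.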

The communicator block, shown in figure 4.1, must ensure that all operations are handled in the same order as they were sent. If a user has applied two operations to the same file, the last of them should be the one to persist; therefore a strict order must be followed. Moreover, in case the communicator goes down, it should be able to recover without losing information. File copying is the most expensive part of the architecture, since the necessary communication with the primary cluster impacts performance. To minimize this effect, the operation applier has to work together with the file copier to skip unnecessary copies or to copy multiple files together when possible. A bulk copy allows only one connection to be opened for multiple files, which helps minimize the decrease in performance.

Analyzing the editlog allows operations to be read just after they are applied. Filtering unnecessary transactions minimizes bandwidth use, and transmitting the operations serially ensures that the same state is kept on both clusters. In addition, the operations must be idempotent: if they need to be reapplied after a failure, the final state must be the same. Intelligent file copying is another important factor. The discussed architecture, then, takes all these problems into consideration to build a reliable disaster recovery solution.

Figure 4.1: Proposed Solution Design.


Chapter 5

Building a Disaster Recovery

Engine Prototype

5.1 The EventListener

As explained in Chapter 2, the Namenode has a file called editlog responsible for

keeping track of all operations taking place on the cluster. If a file is moved or

renamed, the editlog stores a RenameOldOp, when a folder is created, a log for an

MkdirOp is written to it and so it is done for all kinds of transactions. For the

design of the proposed application, dividing those operations into two groups was

an efficient way of approaching the two kinds of transactions.

The first group, the metadata operations, consists of operations that do not involve creating a file or changing its path, such as creating a folder (mkdir) or changing file properties (chmod, chown, chgrp, etc.). This distinction is important because it allows the application to synchronize the clusters in smarter ways. There is no need to copy a file just because its permissions have changed if the same commands can be applied on the Mirror cluster to obtain the same state, avoiding network traffic and reducing latency. In addition, using multiple threads for applying operations and copying files becomes feasible once it is known which metadata operations can be applied while a file is being copied and which ones have to wait for it to be available.

The second group, called file operations, involves the creation, renaming, moving and deletion of files. When any of these operations takes place, the path of a file changes and, because the proposed solution relies on eventual consistency, they have to be handled with caution. How the Mirror cluster handles these inconsistencies is explained later in this chapter.

Relying on the operations persisted to the editlog, a Daemon called EventListener runs on the Primary cluster checking for changes to the log file. Here, two solutions were tested. The first was to have the Daemon check continuously whether the size of the editlog had changed, allowing immediate propagation of new transactions. As with all HDFS files, the only way for the file to change size is for bytes to be added to its end, since only append operations are allowed; so, when the size changes, the new operations are at the end of the file. The second approach was to check the size at a timed interval, configurable in the dre-site.xml file. This introduces a larger synchronization gap but should use less memory on the primary cluster. After testing, the second option was chosen, as the purpose of the application is to minimize the Daemon's effect on the primary cluster.
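The size-based tracking idea can be sketched as follows, ignoring actual file I/O and scheduling (the real Daemon polls the editlog on disk at the configured interval); this is an illustrative model, not the project's code:

```java
import java.util.Arrays;

// Sketch of the append-only tracking behind the EventListener: since an HDFS
// editlog only grows by appends, comparing the current size with the last
// observed size is enough to isolate the new operations at the end of the log.
class AppendTracker {
    private int lastSize = 0;

    /** Given the current full log content, returns only the newly appended bytes. */
    byte[] newBytes(byte[] currentLog) {
        if (currentLog.length <= lastSize) {
            return new byte[0];               // nothing new since the last check
        }
        byte[] fresh = Arrays.copyOfRange(currentLog, lastSize, currentLog.length);
        lastSize = currentLog.length;
        return fresh;
    }
}
```

Each poll therefore hands the parser only the operations appended since the previous tick, which is what makes the timed-interval approach cheap.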

After detecting changes to the editlog, the Daemon uses methods from the Hadoop Commons library to read the transactions. By extracting the kind of operation and its associated paths and parameters with regular expressions, the EventListener can then propagate the new operation to the Communicator. As the clusters are not completely identical, some operations, such as those related to the disposition of blocks, are of no interest to the Mirror cluster. To minimize network usage, a filtering mechanism runs before the operations are sent to the other cluster, transmitting only the relevant ones. Although the filter results in more processing on the Primary cluster, the performance impact was measured and found not to be significant.

An important factor in Hadoop clusters is recovery. As explained in Chapter 2, the framework is based on commodity hardware with a small mean time to failure (MTTF), so creating a mechanism for efficient failure recovery is of utmost importance to the application. All three modules, the listener, the communicator and the operation receiver, have their own recovery mechanism. In this section, only the Listener's recovery engine is explained.

An editlog can be in one of two states, in progress or finalized. There is only one editlog in progress at a time, and it is responsible for receiving and persisting the most recent operations. This log is named edits_inprogress_0000000000000000003, where the trailing number is the transaction id of the first transaction persisted to the file. When an editlog grows too big, with over a million operations, or when the user has specified a persistence period in the Hadoop configuration files, it is renamed to edits_0000000000000000001-0000000000000000002, where the number before the hyphen is the transaction id of the first transaction in the file and the second number is the id of the last persisted transaction.
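Recovering the transaction range from those file names is a small parsing exercise; a minimal sketch (the class and field names are illustrative, not HDFS internals):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of parsing HDFS editlog file names into transaction-id ranges:
// "edits_inprogress_<firstTxId>" for the active log and
// "edits_<firstTxId>-<lastTxId>" for finalized logs.
class EditLogName {
    private static final Pattern IN_PROGRESS = Pattern.compile("edits_inprogress_(\\d+)");
    private static final Pattern FINALIZED = Pattern.compile("edits_(\\d+)-(\\d+)");

    final boolean inProgress;
    final long firstTxId;
    final long lastTxId;   // -1 for an in-progress log

    private EditLogName(boolean inProgress, long first, long last) {
        this.inProgress = inProgress;
        this.firstTxId = first;
        this.lastTxId = last;
    }

    static EditLogName parse(String fileName) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (m.matches()) {
            return new EditLogName(true, Long.parseLong(m.group(1)), -1);
        }
        m = FINALIZED.matcher(fileName);
        if (m.matches()) {
            return new EditLogName(false, Long.parseLong(m.group(1)), Long.parseLong(m.group(2)));
        }
        throw new IllegalArgumentException("Not an editlog name: " + fileName);
    }
}
```

Given a directory listing, comparing these ranges with the last processed transaction id tells the tracker exactly which files still contain unread operations.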

Running in parallel with the Daemon, an EventStatusTracker keeps track of all transactions read from the editlog. It takes advantage of the log naming format to determine which log file to read from and whether it is already up-to-date. Also, when reading from an old log, the status tracker reads all transactions until it is in sync with the most recent updates, even if they have already been split into several editlog files, persisting its progress to disk. As disk access is expensive and could cause the application to impact the primary cluster's performance, a parameter in dre-site.xml lets the user decide how many transactions should be read before the EventStatusTracker persists its status to disk. This is an important decision because, in case of failure, the listener will have to read all the non-persisted transactions again.

5.2 The OpCommunicator Interface

As the main goal of the project is to provide a disaster recovery solution, it is ex-

pected that the Primary and Mirror clusters are on different Geographical locations.

19

Are they in different cities, countries or even continents, the communication should

be efficient and reliable, having high throughput. To build such a communication

platform would be extremely complex and time consuming, going beyond the scope

of the project. In order to achieve the desired results, the open source message

broker software RabbitMQ was used to build the communication between clusters.

In order to make it possible for change the message broker in the future without too

much effort, the OpCommunicator interface was created to make the solution to be

modular.
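The broker-independence idea can be sketched as an interface plus a trivial in-memory implementation; the method names here are illustrative, not the project's actual API:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the OpCommunicator abstraction: the Daemons see only this
// interface, so the RabbitMQ-backed implementation can later be swapped
// for another broker without touching the rest of the engine.
interface OpCommunicator {
    void send(String serializedOperation);
    String receive();              // next operation in FIFO order, or null if empty
}

// Trivial in-memory implementation, useful for exercising the ordering contract.
class InMemoryOpCommunicator implements OpCommunicator {
    private final Queue<String> queue = new ArrayDeque<>();
    public synchronized void send(String op) { queue.add(op); }
    public synchronized String receive()     { return queue.poll(); }
}
```

The contract that matters for correctness is FIFO delivery: whatever broker backs the interface, operations must come out in the order they went in.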

5.2.1 RabbitMQ

RabbitMQ [13] is an open source message-oriented middleware that implements the AMQP [14] protocol. It has interfaces for all major programming languages, enabling applications written in different languages to communicate through the same broker. First developed by Rabbit Technologies Ltd. and now part of the Pivotal initiative, it provides scalable and robust messaging with flexible routing and high availability.

RabbitMQ was a good fit for the project primarily for its queue structure and for being capable of handling millions of operations per minute, enough even for large clusters. When a message fails, the queue retransmits all messages from that point onward. Since the state of the cluster is the same if all operations are reapplied, this serves as a good recovery mechanism for when the whole solution fails. Furthermore, the broker can persist the queue to disk so that, if the node hosting the RabbitMQ server fails, no operations are lost.


Figure 5.1: The broker sends all messages but keeps track of the last received ack.

5.2.2 Implementation

The OpCommunicator was created as a singleton on each Daemon and uses an OpCommunicationObject to keep track of operation status and parameters. When the EventListener sends a new operation to the Communicator, it encapsulates this object in the message and sets parameters such as the transaction id, the kind of operation, the necessary paths and related metadata.

After an operation is received by the Mirror cluster and applied correctly, a positive acknowledgement is sent back to the broker and its space in the queue is cleared. On the other hand, if a problem makes it impossible for the cluster to handle the operation, a negative acknowledgment is sent and the broker sends all pending operations again. This structure allows a robust failure recovery solution and guarantees that all messages are received and applied in the order they were sent, a vital characteristic of the solution.
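The acknowledgement protocol described above can be modeled in a few lines; this is an in-memory model of the behavior the engine relies on, not RabbitMQ's actual client API:

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the acknowledgement protocol: the broker keeps the
// unacknowledged operations in order; a positive ack drops the head of the
// queue, while a negative ack causes every pending operation to be
// redelivered, oldest first.
class AckQueueModel {
    private final List<String> pending = new ArrayList<>();

    void publish(String op) { pending.add(op); }

    /** The mirror applied the oldest pending operation successfully. */
    void positiveAck() {
        if (!pending.isEmpty()) {
            pending.remove(0);
        }
    }

    /** On a negative ack, all pending operations are redelivered in order. */
    List<String> negativeAck() {
        return new ArrayList<>(pending);
    }
}
```

Because reapplying the same operation sequence leaves the cluster in the same state, redelivering everything from the failed point is safe, which is exactly why this simple scheme suffices.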

5.3 The OpApplier

The most important requirement of the solution is that the Mirror cluster stays in sync with the Primary cluster without significantly impacting its performance, minimizing the inconsistency window between an operation happening on the first cluster and being applied on the second. For this to happen, the Daemon running on the Mirror cluster needs to optimize the way operations are applied and files are copied. Also, like the other two modules, the OpApplier must be able to recover from failure. Since recovering from a long crash may take a while, it is sometimes more efficient to simply check for inconsistencies and copy the affected files than to apply millions of operations; the application must be aware of this and capable of choosing the best option based on user parameters.

5.3.1 The StatusTracker

To keep track of the Daemon's status and persist to disk the information needed in case of failure, the StatusTracker stores in memory the oldest successful transaction or the last negative acknowledgment sent. This was easy to implement for the first version of the solution, which handled all operations serially, but for the final version, where operations are applied in parallel, the notion of a last transaction became vague. For this reason, the concept of the last negative acknowledgment was introduced.

Suppose two different files are being modified. File F1 receives operation O1 and file F2 receives operation O2 a few seconds later. Since they are different files, the mirror cluster handles the operations in parallel. O2 is successfully applied to F2, but O1 fails just before that happens and sends a negative ack without persisting it to disk; then O2 persists its id to the log. Before the Communicator is able to resend all operations in response to the negative ack, the system crashes. In this case, the Daemon would restart without knowing about O1's failure, since a more recent transaction was persisted. To handle this case, storing the negative ack id proved an efficient solution. When the log is created, at the cluster's first start, the value is set to a dummy value. On a negative ack, the operation id is stored before the ack is sent, ensuring no ack is sent without being persisted; after a system failure, the two ids, the negative ack id and the last operation id, are compared and the older one is used as the starting point. Once the ack is sent and the operations are reapplied without failure, the dummy value is persisted again, indicating that no negative acknowledgments are outstanding.
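The restart decision reduces to comparing the two persisted ids; a minimal sketch, with the dummy value and names chosen here for illustration:

```java
// Sketch of the StatusTracker restart logic: on recovery, compare the
// persisted last-applied transaction id with the persisted negative-ack id
// (a dummy value, here -1, means "no outstanding nack") and resume from the
// older of the two, so a failed parallel operation is never skipped.
class StatusTrackerRecovery {
    static final long NO_NACK = -1;   // dummy value written at cluster creation

    static long restartTxId(long lastAppliedTxId, long persistedNackTxId) {
        if (persistedNackTxId == NO_NACK) {
            return lastAppliedTxId + 1;   // normal restart: continue after the log
        }
        // An unrecovered negative ack exists: start from the older id.
        return Math.min(persistedNackTxId, lastAppliedTxId + 1);
    }
}
```

In the scenario above, O1's persisted nack id is older than O2's persisted transaction id, so the Daemon restarts from O1 and nothing is lost.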

Another important feature of the StatusTracker is the cold start. On a busy cluster, millions of operations may happen per hour and, after a failure, it can be faster to synchronize the clusters directly than to reapply all operations on the mirror cluster. The cold start mechanism checks the number of operations stored in the Communicator queue before applying them. If it is greater than a threshold set in the configuration file, the Daemon ignores all operations queued up to that moment and runs a checksum comparison between all files on the primary cluster and all files on the mirror cluster. Files that differ or that do not exist on the mirror cluster are copied. Experiments have shown that, above roughly 10,000 (ten thousand) operations, it is usually more efficient to trigger a cold start than to reapply all operations. However, this metric depends on the specific characteristics of the cluster, so it was made configurable in the configuration file.

5.3.2 The OperationHandler

On start, if not a cold start, the OperationHandler reads all operations from the Communicator queue into memory before applying them. This step is needed because of the most difficult problem faced by the application: files that have been moved or renamed before their pending operations have been applied.

Suppose a file named F1 is created by a MapReduce job. Just after its creation, a permission change is applied to it and then it is renamed to F2. The three operations, the creation, the permission change and the rename, are propagated to the mirror cluster. When the mirror tries to copy file F1, it will not find the path on the primary cluster, since the file is now named F2. Without an algorithm capable of finding the right path, the application would fail to synchronize the clusters and consistency would never be achieved. To handle this situation, common in MapReduce jobs and therefore in all Hadoop clusters, two algorithms were implemented and tested.

The first proposed algorithm used the trie data structure. With the folder "/" as the root, each internal node of the trie is a folder and each leaf is a file.

Figure 5.2: Trie of file paths.

In the situation above, when loading the rename operation, the trie would change the pointer from F1 to F2. When the FileTransfer tries to copy the file, it looks for the new path instead of the old one and copies the new file. The copied file will be in a more advanced state than the rest of the cluster, but after all operations are applied, relying on eventual consistency, both clusters end up in the same state. After the rename operation is applied, the pointer from history/ to F1 is deleted to avoid keeping unnecessary information in memory.

Although the trie is an efficient data structure, its implementation is more complex, and handling pointer changes in a multithreaded environment would require locks and a more advanced mechanism to guarantee consistency. To reduce the system's complexity and make changes easier during the first versions of the project, a simpler but still efficient solution was implemented.

As a second approach, a HashMap was built with the previous path of a file as the key and its actual path, after being renamed or moved, as the value. Whenever a rename or move operation arrives in the cluster's in-memory queue, a new entry is inserted into the HashMap, and once the operation is applied to a file, the entry is removed. In addition, all values of the HashMap are checked and updated when necessary, to cover the case where a renamed folder contains tracked files. To exemplify this problem, suppose the folder "history" in figure 5.2 is renamed to "history2". An operation on file F1 or F2 would then not find the file, because the actual path would be "/user/history2/F1" instead of the stored path "/user/history/F1".
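The HashMap approach, including the folder-rename pass over the values, can be sketched as follows (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the second approach: keys are the path a file had when its
// operation was logged, values are its current path. A folder rename must
// also rewrite every value under the renamed folder, which is the O(n)
// traversal over the values.
class RenameTable {
    private final Map<String, String> oldToCurrent = new HashMap<>();

    void recordRename(String oldPath, String newPath) {
        // Update existing values first: files under a renamed folder move with it.
        String prefix = oldPath.endsWith("/") ? oldPath : oldPath + "/";
        for (Map.Entry<String, String> e : oldToCurrent.entrySet()) {
            if (e.getValue().startsWith(prefix)) {
                e.setValue(newPath + "/" + e.getValue().substring(prefix.length()));
            } else if (e.getValue().equals(oldPath)) {
                e.setValue(newPath);
            }
        }
        oldToCurrent.put(oldPath, newPath);
    }

    /** The path to use now for a file first seen at {@code pathAtLogTime}. */
    String resolve(String pathAtLogTime) {
        return oldToCurrent.getOrDefault(pathAtLogTime, pathAtLogTime);
    }

    /** Called once the pending operation for this entry has been applied. */
    void forget(String pathAtLogTime) {
        oldToCurrent.remove(pathAtLogTime);
    }
}
```

Resolving "/user/history/F1" after the folder rename yields "/user/history2/F2", which is exactly the lookup the FileTransfer needs before copying.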

Although checking whether a path is in a HashMap is an O(1) operation, checking that all paths are up-to-date requires traversing all values, making that operation O(n); this affects performance and requires the HashMap to handle concurrent access when dealing with multiple threads. The second implementation was nevertheless used in the project because the performance difference was negligible. At production level, the trie could be revisited and reimplemented with better testing.

Another important property of the OperationHandler is its ability to handle operations in parallel. Because files can be renamed or moved and the order of the operations applied to each file matters, it is not trivial to make multiple threads work simultaneously. To solve this problem, a semaphore approach was used. The in-memory operation queue can be accessed by only one thread at a time. The active thread applies operations until a file operation arrives. A file operation implies a call to the FileTransfer to copy the file from the Primary cluster. Before starting the copy, the thread reads operations from the head of the queue until it finds one that is not a file operation. Thus the FileTransfer copies all those files at once, minimizing the overhead on the Primary cluster. Moreover, each file being copied has its path added to a HashSet, making it easy to check whether a file is being copied by another thread.

After triggering the FileTransfer, the thread releases the semaphore and a new thread starts reading operations from the queue. When it finds a rename or move operation, it checks whether the old path is in the HashSet and, if so, adds the new path to the set. This way, a thread can ignore all metadata operations applied to files that are being copied just by checking the HashSet. When the file has been copied, or when the rename or move operation is about to be applied, the entry is removed from the set and a thread resumes, as soon as the semaphore is free, from the operation following the file operation, applying all the metadata operations related to that file.

An important optimization arises from the use of the HashSet. When another file operation arrives for a file that is already being copied, it is ignored by the OpApplier as long as the transfer succeeds, avoiding multiple copies of the same file. When a file is being incrementally changed, the FileTransfer copies only the most up-to-date version, avoiding unnecessary work and network use.
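The in-flight bookkeeping described above can be sketched like this, with thread-safety reduced to synchronized methods for brevity (names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the HashSet used to coordinate threads: paths currently being
// copied are tracked so that later file operations on the same file can be
// skipped (the transfer fetches the newest version anyway) and renames keep
// the set up to date.
class InFlightCopies {
    private final Set<String> copying = new HashSet<>();

    /** Returns false if the path is already being copied, so the copy is skipped. */
    synchronized boolean tryStartCopy(String path) {
        return copying.add(path);
    }

    /** A rename or move arrived while the file was in flight. */
    synchronized void renameWhileCopying(String oldPath, String newPath) {
        if (copying.remove(oldPath)) {
            copying.add(newPath);
        }
    }

    synchronized void finishCopy(String path) {
        copying.remove(path);
    }

    synchronized boolean isCopying(String path) {
        return copying.contains(path);
    }
}
```

A second file operation on an in-flight path simply fails `tryStartCopy` and is dropped, which is the deduplication the text describes.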

5.4 The FileTransfer Interface

Like the OpCommunicator, the FileTransfer must be efficient even when the clusters are in different geographical locations. DistCp, the standard tool for copying files between Hadoop clusters, is the only free option available for the job and was used for this project, even though it creates overhead and affects performance on the cluster the files are copied from. For copying a large number of files the overhead is not too significant, but for single files or small batches it is not an efficient transfer mechanism for DR. To better prepare the proposed solution for further studies and improvements, two FileTransfer types were created: the SingleFileTransfer and the BulkFileTransfer.

5.4.1 The SingleFileTransfer

The SingleFileTransfer was created to minimize the overhead when copying a small number of files. On Hadoop clusters, DistCp is usually the only way of moving files between two clusters. When a thread requests files to be transferred, the Daemon checks whether the number of files to copy is below a user-defined threshold, normally set to one file. If it is, the SingleFileTransfer copies each file separately; otherwise the BulkFileTransfer is used. For the project, despite its overhead, DistCp was used for both FileTransfer implementations and became the bottleneck. Creating a new file transfer mechanism would go beyond the scope of the current project, but a modular design, easy to modify and improve, is essential for a successful solution. In addition, since operations involving multiple files are more common in MapReduce workloads than single-file moves, the bottleneck does not significantly affect the overall performance of the application.

5.4.2 The BulkFileTransfer

Copying multiple files together is an efficient way to reduce transfer overhead. The OpApplier tries to batch as many files as possible, but faces a limitation: DistCp allows multiple source paths but only one destination path. To handle this, the OpApplier was first built to copy all files into a temporary folder. The name of this folder was a hash value, to avoid collisions when two copies pull files from the same folder, which happens mostly when more files are created while the first ones are being copied.

The difficulty with this first attempt arises when a large number of files, hundreds or thousands, are being copied to the temporary folder and some of their paths are renamed during the copy. When a path is renamed, DistCp restarts the copy with the updated paths, and having many different folder paths can cause many restarts of a single transfer, making the overhead even larger than splitting the copies per folder. The implemented solution, therefore, copies only files from the same folder in each transfer. When saving the source paths, the thread checks that all file paths are within the same folder before triggering the FileTransfer call, skipping files in other folders. This way each copy has, in case of failure, only one path to update instead of hundreds or thousands and, as experiments have shown, it will rarely fail more than once or twice even on a busy cluster, in a worst-case scenario.
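The same-folder batching amounts to grouping pending source paths by parent folder before each DistCp invocation; a small sketch of that grouping (names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of same-folder batching: pending source paths are grouped by parent
// folder, so each DistCp invocation covers a single source folder and, on
// failure, has only one path that may need updating.
class FolderBatcher {
    static Map<String, List<String>> groupByFolder(List<String> paths) {
        Map<String, List<String>> byFolder = new LinkedHashMap<>();
        for (String p : paths) {
            int slash = p.lastIndexOf('/');
            String folder = slash <= 0 ? "/" : p.substring(0, slash);
            byFolder.computeIfAbsent(folder, k -> new ArrayList<>()).add(p);
        }
        return byFolder;
    }
}
```

Each resulting group is then a candidate for one bulk transfer, matching the one-folder-per-copy rule described above.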

While testing both approaches, the overall overhead of the second solution was less than 1% larger, considered a small impact on performance compared with the larger time window and the overhead from multiple failures incurred by the first solution.

5.4.3 The DataValidator

To check that files are copied without errors, a separate Daemon verifies that files on the Primary and Mirror clusters are in the same state after being copied. The DataValidator uses the checksum functionality of the Hadoop Common library to determine whether two files are identical. The Daemon runs after a file is copied, to check that source and destination are identical; after a cold start, to check that both clusters are in sync; and periodically on random files, to make sure the other Daemons are working correctly. DistCp also has a checksum feature, but some inconsistencies regarding file permissions, owners and groups were found, so a tailor-made tool was developed to assure consistency.
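The core of the check is a digest comparison; a minimal sketch operating on in-memory bytes (the real Daemon compares HDFS file checksums via the Hadoop Common library and also verifies permissions, owner and group):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Sketch of the DataValidator's content check: two files are considered
// identical when their digests match. MD5 is used here for illustration.
class ChecksumCompare {
    static boolean sameContent(byte[] a, byte[] b) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digestA = md.digest(a);
            md.reset();
            byte[] digestB = md.digest(b);
            return Arrays.equals(digestA, digestB);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 unavailable", e);
        }
    }
}
```

Comparing digests rather than full contents is what makes the periodic random-file checks cheap enough to run continuously.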

5.5 Configuration Files

When installing Hadoop on a cluster, two important configuration files are defined for HDFS: hdfs-site.xml and hdfs-default.xml. The first contains parameters set by the user to customize the distributed file system, such as the addresses of the Namenode, Secondary Namenode and Datanodes, the checkpoint intervals and other per-cluster settings. The second defines the default configuration of the cluster, intended to be changed only by advanced users in special cases, such as the replication level and block sizes.

To make the proposed DR solution easy for Hadoop administrators to use, two configuration files were defined in a similar way. dre-site.xml, dre standing for disaster recovery engine, holds the user-defined parameters of the application, including the addresses of the Primary and Mirror clusters and of the RabbitMQ broker, the checkpoint interval for persisting each Daemon's status to disk, the folders to be ignored by the application or, in case only some folders are to be watched, the paths to those folders, the time window between reads of the editlog, the operation threshold for a cold start and other parameters the DR application needs to work properly in all situations. The same holds for dre-default.xml, where default paths and parameters are stored. This way, configuring the disaster recovery application is easy and similar to configuring HDFS.
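An illustrative dre-site.xml fragment in the Hadoop configuration style; the property names and values below are assumptions for the sake of example, as the text fixes the file's role but not its exact keys:

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Illustrative parameters only; the names are assumed, not the project's. -->
  <property>
    <name>dre.mirror.namenode.address</name>
    <value>mirror-nn.example.com:8020</value>
  </property>
  <property>
    <name>dre.listener.poll.interval.ms</name>
    <!-- gap between editlog size checks -->
    <value>5000</value>
  </property>
  <property>
    <name>dre.coldstart.operation.threshold</name>
    <!-- queue size above which a cold start is cheaper than replay -->
    <value>10000</value>
  </property>
  <property>
    <name>dre.statustracker.checkpoint.transactions</name>
    <!-- transactions read before the status is persisted to disk -->
    <value>1000</value>
  </property>
</configuration>
```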


Chapter 6

Results

During the development of the project and for the experiments used to test the prototype, a virtualized cluster provided by Pivotal Software Inc. was used. The cluster consisted of five virtual machines: one for the Admin Node, responsible for installing Hadoop and monitoring the whole cluster, one for the Namenode, one for the Secondary Namenode and two for Datanodes. All machines had the same specification: 64-bit CentOS 6.4 with 4 GB of RAM, 40 GB of disk space and a 1.8 GHz processor. For the tests, this setup was used as the Primary cluster. For the Mirror cluster, a MacBook Pro running 64-bit OS X v10.9, with a processor of up to 3 GHz, 4 GB of RAM and 256 GB of SSD, was set up as a single-node host running four virtual machines that mimic a cluster: one for the Namenode, one for the Secondary Namenode and two for Datanodes.

With the environment defined, two performance tests were created. A DistCp backup tool was implemented in which a file is copied after each modification, be it a metadata operation or a file operation. This implementation differs from the commonly used DistCp backup by not allowing a large time window between backups, working closer to how the proposed DR solution does. This way, although more network use is required, the state gap between clusters is smaller.

The first test was composed only of metadata operations. As the biggest improvement of the solution comes from not copying unnecessary files and from applying metadata operations directly on the Mirror cluster, the proposed solution would be expected to be much faster than the DistCp backup. For this first test, 150 operations were reproduced on the primary cluster using a shell script, consisting of creating folders and renaming, changing permissions, changing groups and changing owners of each folder. The results are shown in figure 6.1 and, as expected, the DRE outperforms the DistCp backup. As presented in figure 6.3, the proposed solution finishes 172.6 times faster than the existing approach, an extremely satisfactory result.

Figure 6.1: Performance Comparison for Metadata Operations only.

For the second test, a more common scenario was set up. Again, around 150 operations were reproduced, but now most of them were file operations. First, the same file was changed 25 times, appending information to it until it reached 50 MB of data. After this step, three word count MapReduce jobs were submitted on the file, in which files were created in specific folders and then moved and renamed by Hadoop.


Figure 6.2: Performance Comparison for File Copy Operation.

As exhibited in figures 6.2 and 6.3, the results were 13.3 times better for the proposed solution, since it did not have to copy the file each time it was changed, but only once, and could copy all the files in the folder in parallel.


Figure 6.3: Performance Improvement.

To measure how much better the solution is, figure 6.4 shows the plots from figures 6.1 and 6.2 on a log scale, demonstrating that the improvement is of about two orders of magnitude for metadata operations and one order of magnitude for file operations, a significant improvement in either scenario.


Figure 6.4: Log Scale Performance Comparison

Another important factor in assessing the results is the impact on the Primary cluster's performance, measured with the Unix top command; performance here means the CPU usage of the application. As presented in figure 6.5, DistCp had an almost constant performance impact of 5%. While just checking whether new operations had been applied, or while transmitting only metadata operations, the DRE solution caused less than 1% of performance impact. On the other hand, given the limitations of adopting DistCp as the file transfer tool, file copying incurs overhead and, for single file copy requests, the impact is the same for both applications, as expected since both use the same mechanism. During the second test, when copying tens of files in parallel for the three MapReduce jobs, the Daemon peaked at 7% CPU usage. Although the footprint was greater for the DRE, it is important to consider that one application was copying a single file while the other was copying tens of files from MapReduce jobs. After the experiments, it became clear that DistCp is a bottleneck of the proposed solution and that developing a more efficient way of transferring files is important for a production-grade application.

Figure 6.5: Primary Cluster Performance Impact Comparison


Chapter 7

Conclusion and Future Work

This project consisted of developing a disaster recovery engine for Apache Hadoop to be used as a free, eventual-consistency-based alternative to the existing DR solutions, pursuing low latency, high operation throughput and a minimal footprint on the primary cluster. After reviewing the existing work and choosing adequate data structures, a modular design was put into practice, using concepts of both algorithm design and software engineering to make the software scalable and easy to improve. Distributed systems principles, together with a deep understanding of the Hadoop environment, contributed to a result far superior to the naive approach used by most companies and universities that do not want to, or cannot, invest money in disaster recovery solutions.

The proposed design was tested on a controlled cluster, in two specific and recurrent situations, to assess the achieved improvements. The results have shown that significant gains are possible by minimizing the number of copies and dividing the work among multiple threads. This outcome confirms the value of further study of the problem and of the option of choosing performance over strong consistency, providing an alternative for users to better fill their needs.

As a first prototype, some improvements must still be made for it to become a

production level application. First and most important, the existing resources for

testing and comparison of results were limited and, although the outcome was very

good and overcame other free and popular applications, more testing should be done

36

to ensure the solution is a viable alternative to the WANdisco software, the most widely accepted DR engine so far. In addition, implementing a separate file transfer, independent of DistCp, would improve performance when copying single files and minimize the overhead footprint when copying a large number of files, tens or hundreds. Testing the application on a real production environment, rather than on two simulated clusters, could reveal limitations not found in an artificial scenario. To make it a completely generic solution for Hadoop, adapting and testing the Daemons to run on HA (High Availability) HDFS [15] configurations and on secure clusters [16] would allow more companies to use the service. Finally, although not a requirement, creating a recovery mechanism for the Communicator, instead of relying on RabbitMQ's, would grant more independence and the flexibility to replace the communicator module should a better library or custom solution emerge.

Even with all the work still to be done before the project becomes a real industrial product, a lot of progress was made towards an alternative to the existing DR solutions. Furthermore, the result can already be used in situations where sacrificing cluster performance is worse than relying on eventual consistency.


References

[1] WALL, M. "Big Data: Are you ready for blast-off?". http://www.bbc.com/news/business-26383058. Accessed September 2014.

[2] APACHE SOFTWARE FOUNDATION. "Apache Hadoop". https://hadoop.apache.org/. Accessed September 2014.

[3] GHEMAWAT, S., GOBIOFF, H., LEUNG, S.-T. "The Google File System". In: Proceedings of the nineteenth ACM symposium on Operating systems principles, pp. 29–43, NY, USA, 2003. Association for Computing Machinery - ACM.

[4] DEAN, J., GHEMAWAT, S. "MapReduce: simplified data processing on large clusters", Communications of the ACM - 50th anniversary issue: 1958-2008, v. 51, n. 1, pp. 107–113, January 2008.

[5] LIU, H. H. Hadoop 2 Essentials: An End-to-End Approach. 1 ed. Danver, MA, CreateSpace Independent Publishing Platform, 2014. ISBN: 9781495496127.

[6] WAGMOB. Big Data and Hadoop. 1.5 ed. CreateSpace Independent Publishing Platform, 2014. ISBN: 9781500721275.

[7] APACHE SOFTWARE FOUNDATION. "HDFS Architecture". http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html. Accessed September 2014.


[8] APACHE SOFTWARE FOUNDATION. "DistCp Guide". http://hadoop.apache.org/docs/r1.2.1/distcp.html. Accessed September 2014.

[9] HORTONWORKS. "Apache Falcon - A framework for managing data processing in Hadoop clusters". http://br.hortonworks.com/hadoop/falcon/. Accessed October 2014.

[10] DIAN, F., WENJIE, J., HAIFENG, C., et al. "Zero loss HDFS data replication for multiple datacenters". https://issues.apache.org/jira/browse/HDFS-5442. Accessed October 2014.

[11] WANDISCO. "The Distributed Coordination Engine (DConE)". http://www.wandisco.com/system/files/documentation/WANdisco_DConE_White_Paper.pdf. Accessed October 2014.

[12] LAMPORT, L. The Part-Time Parliament. Technical report, 1990.

[13] PIVOTAL SOFTWARE, INC. "RabbitMQ - Messaging that just works". http://www.rabbitmq.com/. Accessed September 2014.

[14] OASIS. "AMQP - Advanced Message Queuing Protocol". https://www.amqp.org/. Accessed September 2014.

[15] APACHE SOFTWARE FOUNDATION. "HDFS High Availability". https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-site/HDFSHighAvailabilityWithNFS.html. Accessed April 2015.

[16] APACHE SOFTWARE FOUNDATION. "Hadoop in Secure Mode". http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html. Accessed April 2015.
