66
MC970/MO644 Programação Paralela na Nuvem usando Spark Hervé Yviquel, Guido Araújo [email protected]

spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

  • Upload
    others

  • View
    2

  • Download
    0

Embed Size (px)

Citation preview

Page 1: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

MC970/MO644ProgramaçãoParalela

naNuvemusandoSparkHervé Yviquel,GuidoAraú[email protected]

Page 2: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

2

Page 3: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

VivemosemumMundodeDados

3

Page 4: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

EnormesVolumesdeDados...

Fonte:domo.comDataNever Sleeps 3.0

3.7bilhõesdepessoas

emJaneirode20174

Page 5: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Masnãoésó"volume"

Fonte: usr.uvic.catTheImportanceofBigData

5

Page 6: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Oqueé“BigData”

« BigDataéqualquerdadoqueécaroparasegerenciaredoqualédifícil

extrairvalor »Prof.ThomasSiebel,DiretordoAMPLab,UC-Berkeley

6

Page 7: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Odatacentercomo Computador

7

Page 8: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ComputaçãoemNuvem

Soluçãopara“TheRising of BigData”• Massivopoderdeprocessamento• Datacenter(50.000to 80.000servidores)• Clusterdecomputadores

• Podeserútilparaoutrosdomíniosdeaplicação• Aplicaçõescientíficas(HPC)• Aplicaçõesmobile(Mobilecloud offloading)• InternetdasCoisas(IoT)

Mascomoprogramararquiteturadistribuída??

8

Page 9: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Message Passing Interface

UtilizadoemHigh-PerformanceComputing• Programaçãodebaixonível

• Eficientemaiscomplicado• Exigeredescombaixalatência

• Nuvemsãoimprevisíveis(recursoscompartilhados)• Semtolerânciaafalhas

• Compartilharrecursosaumentafalhas9

Page 10: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Dividirparaconquistar!

Dadoscomopares(chave,valor)

m1 m2 m3

r1 r2

« Resultado »

Particione

Map

Combine

Reduce

Armazene

Map-Reduce [Google2004]

10

Page 11: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ContarPalavrascomMap-Reduce

11

Page 12: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

SobreMap-Reduce

• Confienosistemadearquivodistribuído• Dividearquivosemgrandesblocosdedados(e.g.64MB)• Usaralocalidadededadosparaaceleraroprocessamentodistribuído

• Independentedaarquitetura• Massivamenteparalelizável

• Permitetolerânciaafalhas

12

Page 13: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Arquiteturamestreeescravo

1processomestre(JobTracker)responsávelporescalonaraexecuçãodetarefaspelosescravos,monitorá-lasere-executá-las emcasodefalha

Nprocessosescravos(TaskTracker)– 1porcadanóresponsávelporexecutarastarefas(map oureduce)designadaspelomestre.

13

Page 14: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

SistemadeArquivoDistribuído

14

Page 15: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

TolerânciaaFalhas

• Replicaçãodosdados(HDFS)• Podereconstruirdadoscorrompidos

• MensagensdeHeartbeat• Informaostatusdasmáquinas• Podereexecutartarefasexpiradas

15

Page 16: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ApacheHadoop

FrameworkparacomputaçãodistribuídaHadoop Common

Corelibrary

Hadoop MapReduceHadoop Distributed FileSystemHadoop YARN

Job scheduling and clusterresource management

16

Page 17: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

BreveHistória(1)

2002– Creation of Nutch2003– GFSpaper by Google2004–MapReduce paper by Google2005– Nutch added support to GFSand MapReduce2006– Hadoop creation from Nutch project2008– ApacheHadoop top-level2009– Hadoop won Graysort Daytona competition

17

Page 18: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Map-Reduce nãoresolveTodososProblemas

ProcessamentodeconsultasBancodeDados

ProcessamentoiterativoDadosficamnamemoria

ProcessamentodefluxosNemsempreoarquivoéaformadaentrada

ProcessamentodegrafosProcessarestruturasirregulares

18

Page 19: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Spark

• Plataformadecomputaçãoemclusters.• Criadaparaserrápidaedepropósitogeral• Modelodeprogramaçãodealto-nível

• Oprocessamentoémulti-estágio• Representadocomografodirecionadoeacíclico(DAG)• Suporteprocessamentoiterativosedefluxos• Processamentoemmemoria(Ate100xmaisrápidoqueMap-Reduce)

19

Page 20: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

BreveHistória(2)

2002– Creation of Nutch2003– GFSpaper by Google2004–MapReduce paper by Google2005– Nutch added support to GFSand MapReduce2006– Hadoop creation from Nutch project2008– ApacheHadoop top-level2009– Hadoop won Graysort Daytona competition2009– Development of Spark at UC-Berkeley2010– Spark paper and open-source2013– Spark transferred to Apache2014– ApacheSpark top-level2014– Spark won Graysort Daytona competition2016– Spark version 2.0

20

Page 21: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

TendênciasparaMPI,Hadoop,eSpark

21

Page 22: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

PilhadeSoftwaredoSpark

Scala Java Python R

Spark SQLdados

estruturados

Mllibmachine learning

GraphXprocessamento

degrafos

Streamingtemporeal

Spark Core

YARN MesosSpark Scheduler

Local HDFS S3 NoSQL

Linguagem

Libraria

Núcleo

Gestão

Armazenamento ...

22

Page 23: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Scalavs Javavs Python

• Spark foiescritooriginalmenteemScala,quepermitesintaxedefunçãoconcisaeusointerativo• AAPIdaJavaadicionadaparaaplicativosautônomos• AAPIdoPythonfoiadicionadamaisrecentementecomumshell interativo

23

Page 24: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

SobreScala

LinguagemdealtonívelparaaJVM• Programaçãoorientadaaobjetosefuncional

Estaticamentetipada• ComparávelemvelocidadecomJava• Inferênciadetipo(nãoprecisaescrevertiposexplícitosemgeral)

InteroperabilidadecomJava• PodeusarqualquerclasseJava(herançade,etc.)• PodeserchamadoapartirdocódigoJava

24

Page 25: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

VisitaRápidadeScalaDeclararvariáveis:var x: Int = 7var x = 7 // type inferredval y = “hi” // read-only

Funções:def square(x: Int): Int = x*xdef square(x: Int): Int = {

x*x}def announce(text: String) = {

println(text)}

EquivalenteemJava:int x = 7;

final String y = "hi";

EquivalenteemJava:int square(int x) {

return x*x;}

void announce(String text) {System.out.println(text);

}

25

Page 26: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Funçoes emScala(Clausuras)

(x: Int) => x + 2 // versão completax => x + 2 // type inferred_ + 2 // argumento implícito x => { // corpo é um bloco de código

val numberToAdd = 2x + numberToAdd

}// função regular def addTwo(x: Int): Int = x + 2

26

Page 27: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

VisitaRápidadeScala(2)Processarcoleçõescomprogramaçãofuncionalval list = List(1, 2, 3)

list.foreach(x => println(x)) // prints 1, 2, 3

list.foreach(println) // mesmo

list.map(x => x + 2) // retorna nova List(3,4,5)

list.map(_ + 2) // mesmo

list.filter(x => x % 2 == 1) // retorna nova List(1, 3)

list.filter(_ % 2 == 1) // mesmo

list.reduce((x, y) => x + y) // => 6

list.reduce(_ + _) // mesmo

27

Page 28: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

FerramentasdeTrabalhocomSpark

• Investigaçãointerativa:spark shell

• Desenvolvimentodeaplicação:spark submit./bin/spark-submit --class SimpleApp --master localSimpleApp.jar

28

Page 29: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ConceitosBásicos

Umaaplicaçãoconsistedeumprogramachamadodriver• odriver disparatrabalho(localounocluster)• odriver tomacontroledorecursodoclusteratravésdeumobjetodecontexto(SparkContext).• odriver descreveofluxo(DAG)deumaaplicação,compostoporcoleçõesdedadosdistribuídas(RDDs)eseusrelacionamentos(operações).• nomodointerativo,odriver éopróprioshell emexecução.

29

Page 30: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

EscreveumSpark Job

// Create a Scala Spark Context. val conf = new SparkConf().setAppName("wordCount") val sc = new SparkContext(conf)// Load our input data.val input = sc.textFile(inputFile) // Split it up into words. val words = input.flatMap(line => line.split(" "))// Transform into word and count.val counts = words.map(word => (word, 1))

.reduceByKey{case (x, y) => x + y} // Save the word count back out to a text file. counts.saveAsTextFile(outputFile)

SimpleWordCount Scalaexample

30

Page 31: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

AplicaçãoSpark

Conjuntosdeprocessosindependentes(“executor”)emnós(“worker”)doclustercoordenadospeloprogramaprincipal(“driver”)

31

Page 32: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

AplicaçãoSpark

Oobjeto“SparkContext”1. Conectacomogerenciadordocluster2. Adquireexecutoresdosnósnocluster3. Enviaocódigodoaplicativoparaosexecutores4. Enviatarefasaosexecutoresparaseremexecutados

32

Page 33: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

AplicaçãoAuto-Contida

import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.SparkContext._

object SimpleApp { def main(args: Array[String]) {val conf = new SparkConf()conf.setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) ...

}}

33

Page 34: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

CompilarsuaAplicaçãoSpark

UsandoScalaBuildTool• EscreversuaAplicaçãoSpark emScala• Escrevero“build.sbt”• Executar“sbt”napasta

// build.sbtname := "WordCount" version := "0.0.1" scalaVersion := "2.11.8" // Additional librarieslibraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.1.0" % "provided")

34

Page 35: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Coleção dedados em Spark

Resilient Distributed Dataset (RDD)• Umaabstraçãoparatrabalharcomgrandesconjuntosdedados(dataset)• Coleçãodistribuídadeelementos• Divideempartições• Podeconterqualquerobjeto

35

RDD

p1 i1 i2 i3pr ik in... ......

Page 36: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Criação deRDDs:arquivos

• Neste exemplo,afontededados externa é umarquivo.• oprefixo file://indica osistema dearquivos local.• hdfs:// é outraopção comum,seoarquivo estivernosistema dearquivos doHadoop (HDFS).

36

val lines = sc.textFile(file:///path/to/README.md")

Page 37: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Criação deRDDs:paralelizar

OSparkContext « sc »é capaz deparalelizar/distribuir coleções locais ao programadriver

37

val frases = List("pandas", "i like pandas"))val lines = sc.parallelize(frases)

Page 38: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

TarefaemSpark

TodaacomputaçãoaconteceemfunçãodeRDDs1. CriarnovosRDDs2. TransformarRDDs existentes3. ChamaroperaçãosobreRDDs paracalcular

resultados

38

Page 39: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Operações deRDD

Qualquer operação sobreum RDDseenquadra emuma das categorias• Transformação• Criaum novoRDDapartirdeoutro• Avaliação é preguiçosa (lazy)

• Ação• Retornaresultadoparaodriver.• Avaliaçãoéimediata.

39

Page 40: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

OperaçõesdeRDD

• map(f) - transformação• Aplicaafunçãof()acadaelementox doRDD,• gerandoumRDDcontendoosvaloresdef(x)

• reduce(f) - ação• Aplicaafunçãof()atodososelementosdoRDD"deumavez".• Porexemplo,(_+_)significa"sometodososelem"• Afunçãotemqueserassociativa.

40

Page 41: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Transformação

RDD ={1,2,3,4}• RDD.map(x =>x*x )

{1,4,9,16}

• RDD.filter(x =>x!=1){2,3,4}

41

Page 42: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Pseudo-set Operation (1)

RDD1 ={coffee,coffee,panda,monkey,tea}RDD2 ={coffee,monkey,kitty}• RDD1.distinct()

{coffee,panda,monkey,tea}• RDD1.union(RDD2)

{coffee,coffee,coffee,panda,monkey,tea,kitty}• RDD1.intersection(RDD2)

{coffee,monkey}• RDD1.substract(RDD2)

{panda,tea}

42

Page 43: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Pseudo-set Operation (2)

RDD1 ={User(1),User(2),User(3)}RDD2 ={Venue(“BarDoZe”),Venue(“Bardana”),

Venue (“Bronco”)}• RDD1.cartesian(RDD2)

{(User(1),Venue(“BarDoZe”)),(User(1),Venue(“Bardana)),(User(1),Venue (“Bronco”)),

(User(2),Venue(“BarDoZe”)),(User(2),Venue(“Bardana”)),(User(2),Venue (“Bronco”)),

(User(3),Venue(“BarDoZe”)),(User(3),Venue(“Bardana”)),(User(3),Venue (“Bronco”))}

43

Page 44: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

MaisAçõesRDD ={1,2,3,3}• RDD.reduce( _+_)

9

• RDD.collect(){1,2,3,3}

• RDD.count()4

• RDD.first()1

• RDD.take(2){1,2}

• foreach(func)Nothing

• saveAsTextFile(path),saveAsSequenceFile(path),saveAsObjectFile(path)

44

Page 45: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

WordCount em Sparkobject SimpleApp {

def main(args: Array[String]) {// Create a Scala Spark Context. val conf = new SparkConf().setAppName("wordCount") val sc = new SparkContext(conf)val lines = sc.textFile(inputFile)// cada item do RDD é uma linha do arquivo (String)val words = lines.flatMap(line => line.split (" "))// cada item do RDD é uma palavra do arquivoval intermData = words.map(word => (word,1))// cada item do arquivo é um par (palavra,1)val wordCount = intermData.reduceByKey(_ + _)// cada item do RDD contém ocorrência final de cada palavraval 5contagens = wordCount.take(5)// 5 resultados no programa driver

}}

45

Page 46: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

WordCount em Map-Reduce (1)public class WordCount {

public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

} }

}

46

Page 47: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

WordCount em Map-Reduce (2)public static class IntSumReducer extends

Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

47

Page 48: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

WordCount em Map-Reduce (3)

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

48

Page 49: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Estimando Pi(denovo)

val pi =

println("Pi is roughly " + pi)49

Page 50: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Estimando Pi(denovo)

val pi = 4 * sc.parallelize(0 to N).map{ i =>(if(i%2==0) 1 else 1).toDouble/(2*i+1)

}.reduce( _+_ )println("Pi is roughly " + pi)

50

Page 51: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Mecanismo dePersistência

RDDs sãoavaliadospreguiçosamenteval cachedRDD = anyRDD.persist(<nível>)• <nível>indicaseocaching deveserfeitoemmemória,disco,serializadooumisturas

val cachedRDD = anyRDD.cache()• nívelpadrão• cache()=persist(StorageLevel.MEMORY_ONLY)

51

Page 52: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ExemplodePersistência

GerarumRDDdenúmerosaleatórios(pseudo)

val recompRDD = sc.parallelize(1 to 1000000).map(_ => Math.random())for(i <- 1 to 10) println(recompRDD.reduce(_+_))

Vamos testar no spark-shell !!

52

Page 53: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Persistência (níveis)

nível consumoespaço

consumoCPU

emmemória

em disco

MEMORY_ONLY muito pouco tudo nada

MEMORY_ONLY_SER pouco muito tudo nada

MEMORY_AND_DISK muito médio parte parte

MEMORY_AND_DISK_SER pouco muito parte parte

DISK_ONLY pouco muito nada tudo

53

val cachedRDD = anyRDD.persist(<nível>)

Page 54: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

TransformaçãodePares

rdd ={(1, 2), (3, 4), (3, 6)} • rdd.groupByKey()

{(1,[2]),(3,[4,6])}• rdd.reduceByKey(_+_)

{(1,2),(3,10)}• rdd.mapValues(x=>x+1)

{(1,3),(3,5),(3,7)}• rdd.flatMapValues(x=>x.to(5))

{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}• rdd.keys()

{1,3 ,3}• rdd.values()

{2,4,6}• rdd.sortByKey()

{(1,2),(3,4),(3,6)}

54

Page 55: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

TransformaçãodePares

rdd ={(1, 2), (3, 4), (3, 6)}other ={(3,9)}• rdd.groupByKey(other)

{1,2}• rdd.join(other)

{(3, (4, 9)), (3, (6, 9))}• rdd.cogroup(other)

{(1,([2],[])), (3, ([4, 6],[9]))}

55

Page 56: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Estudodecaso:pagerank

Exemploclássicoquemostra2pontosfortesdeSpark:cachingecomputaçãoiterativa• Propósito:criarumranqueamentodeimportânciadenósemumgrafo.• Ondeéusado?• GoogleutilizaoPageRank(propostoporLarryPage)

• SabeaordemdelinksqueaparecememumabuscaquevocêfaznoGoogle?

56

Page 57: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Premissa doPageRank

Aimportanciadeumapáginaédeterminadapelaimportanciadaspáginasqueapontamparaela

57

Page 58: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

DescriçãodoAlgoritmo

• Todasiniciamcomimportância1.0,ouseja,100%.• Acadaiteraçãotodapáginadistribuisuaimportânciaigualmenteparaosvizinhos.• Agoracadapáginatemumanovaimportância,queéasomadosvaloresrecebidos.

58

1 2

4 3

1.0 1.0

1.0 1.0

0.3

0.30.3

1.0

0.50.5

1.01.5 0.8

1.30.3

Page 59: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

ImplementaçãodoPageRankobject PageRank { def main(args: Array[String]) { val links = // RDD de pares (página, lista de adjacência) links.cache() var rankings = // RDD de pares (página, 1.0) for (i <- 1 to ITERATIONS) { val contribs = links.join (rankings).flatMap { case (url, (adjList, rank)) => adjList.map (dest => (dest, rank / adjList.size)) } rankings = contribs.reduceByKey (_ + _) } }}

• Parteiterativa• oforprincipal,queocorrenodriver

• Parteparalela• astransformaçõesacadaiteração

59

Page 60: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

EntenderClausura

Comportamentoindefinido• LocalversusCluster

var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter)

60

Page 61: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

MelhorarDesempenho

Particionamentointeligente• Distribuiçãodaschaves• Balanceamentodacargadetrabalho

Usoconscientedememória• Tamanhoderesultadosretornadosparaodriveratravésdeações• Capacidadeversusdemandaparacaching

Variáveiscompartilhadas• Broadcast• Acumuladores

61

Page 62: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

VariáveisCompartilhadas(1)

Variáveisdetransmissão(broadcast)• Únicacopiaemcadanó• Transmissãoeficiente(BitTorrent)• Paragrandeconjuntodedadosdeentrada

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value

62

Page 63: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

VariáveisCompartilhadas(2)

Acumuladores• “adicionado”aatravésdeumaoperaçãoassociativo,eficientementesuportadosemparalelo• Porexemploparaimplementarcontador

scala> val accum = sc.accumulator(0) scala> sc.parallelize(Array(1, 2, 3, 4))

.foreach(x => accum += x)scala> accum.valueres2: Int = 10

63

Page 64: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

BibliotecasEspecíficas

• MLlib• Estatística:testesdehipóteses,amostragem• Classificação/Regressão/Agrupamento• Extraçãodecaracterísticas• Mineraçãodepadrõesfrequentes,etc.

• SparkStreaming• Processamentodedadosemtempo(quase)real

• SparkSQL• DataFramesparadadosestruturados.

• GraphX• Abstraçõesparagrafosetrocademensagens

Tudoissoimplementadosobreosmesmosconceitos!

64

Page 65: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Conclusão

• ModeloMap-Reduce adaptadoparanuvem• Grandeescalabilidade• Tolerânciaafalhas

• Spark éumageneralizaçãodoMap-Reduce• “Better,Stronger,Faster”• Programaçãoalto-nívelemaisflexível• Desenvolvimentomuitoativo

65

Page 66: spark - Unicampoxent2.ic.unicamp.br/.../files/spark_0.pdfComputação em Nuvem Solução para “The RisingofBig Data” •Massivo poder de processamento •Datacenter (50.000 to80.000

Bibliografia

• “LearningSpark - Lightning-Fast BigDataAnalysis”deH.Karau,A.Konwinski,P.Wendell,eM.Zaharia

• DocumentaçãodoApacheSpark (latest)• Palestra doViniciusDias (UFMG)• ScalaCrashCourse (Databricks)

66