LEONARDO BORGES • SENIOR CLOJURE ENGINEER • @LEONARDO_BORGES

The future has arrived: Concurrent programming with futures

QCon SP


ABOUT

A little about me
• Senior Clojure Engineer at Atlassian, Sydney
• Founder of the Sydney Clojure User Group
• Author of Clojure Reactive Programming - http://bit.ly/cljRp
* QCon discount code: CRP10

CONCURRENCY

FUTURES

What?

ABSTRACTION

COMPOSITION

Futures in Java <= 1.7

FUTURES IN JAVA <= 1.7

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

static ExecutorService es = Executors.newCachedThreadPool();

static Integer doubler(Integer n) {
    return 2 * n;
}

// Each service simulates a slow call by sleeping before it answers.
static Future<Integer> serviceA(Integer n) {
    return es.submit(() -> {
        Thread.sleep(1000);
        return n;
    });
}

static Future<Integer> serviceB(Integer n) {
    return es.submit(() -> {
        Thread.sleep(1500);
        return Double.valueOf(Math.pow(n, 2)).intValue();
    });
}

static Future<Integer> serviceC(Integer n) {
    return es.submit(() -> {
        Thread.sleep(2000);
        return Double.valueOf(Math.pow(n, 3)).intValue();
    });
}

FUTURES IN JAVA <= 1.7

Integer doubled = doubler(serviceA(10).get());   // blocks the thread
System.out.println("Couldn't do anything else while the line above was being executed...");
System.out.println("Result: "
    + serviceB(doubled).get()     // blocks the thread
    + " - "
    + serviceC(doubled).get());   // blocks the thread

Problems
• Wasted processing
• Poor composability

What about Java 8?

FUTURES IN JAVA 8

final CompletableFuture<Integer> doubled = serviceA(10).thenApply(CompletableFutures::doubler);
final CompletableFuture<Integer> resultB = doubled.thenCompose(CompletableFutures::serviceB);
final CompletableFuture<Integer> resultC = doubled.thenCompose(CompletableFutures::serviceC);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(resultB, resultC);

allFutures.whenComplete((v, ex) -> {
    try {
        System.out.println("Result: " + resultB.get() + " - " + resultC.get());
    } catch (Exception e) {}
});

System.out.println("Doing other important things...");

Does not block the thread
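A self-contained sketch of the same non-blocking pipeline may help when trying it out. The services here are hypothetical stand-ins built with `CompletableFuture.supplyAsync`, the delays are shortened, and `thenCombine` is swapped in for `allOf` plus `whenComplete` so that the combined result is itself a future; none of these choices come from the slides.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingPipeline {

    // Hypothetical stand-ins for the slide's services, with shorter delays.
    static CompletableFuture<Integer> serviceA(Integer n) {
        return CompletableFuture.supplyAsync(() -> { pause(100); return n; });
    }

    static CompletableFuture<Integer> serviceB(Integer n) {
        return CompletableFuture.supplyAsync(() -> { pause(150); return n * n; });
    }

    static CompletableFuture<Integer> serviceC(Integer n) {
        return CompletableFuture.supplyAsync(() -> { pause(200); return n * n * n; });
    }

    static Integer doubler(Integer n) {
        return 2 * n;
    }

    // Sleeps without forcing callers to handle the checked exception.
    static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Describes the whole computation; nothing here blocks the caller.
    static CompletableFuture<String> pipeline(int input) {
        CompletableFuture<Integer> doubled =
            serviceA(input).thenApply(NonBlockingPipeline::doubler);
        CompletableFuture<Integer> resultB = doubled.thenCompose(NonBlockingPipeline::serviceB);
        CompletableFuture<Integer> resultC = doubled.thenCompose(NonBlockingPipeline::serviceC);
        // thenCombine runs the function once both futures have completed.
        return resultB.thenCombine(resultC, (b, c) -> "Result: " + b + " - " + c);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = pipeline(10);
        System.out.println("Doing other important things...");
        // join() blocks only here, at the very end, so the JVM does not exit early.
        System.out.println(result.join());
    }
}
```

The only blocking call is the final `join()` in `main`; everything before it just wires callbacks together.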

Do these combinators look familiar?

STREAMS IN JAVA 8

List<Integer> ns = Arrays.asList(1, 2, 3, 4);

Function<Integer, Integer> doubler = (i) -> i * 2;
System.out.println(ns.stream().map(doubler).collect(Collectors.toList()));
// [2, 4, 6, 8]

Function<Integer, Stream<? extends Integer>> toRange = (i) -> IntStream.range(0, i).boxed();
Stream<Integer> combined = ns.stream()
    .map(doubler)
    .flatMap(toRange);
System.out.println(combined.collect(Collectors.toList()));
// [0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 7]

Streams vs Futures

<R> Stream<R> map(Function<? super T, ? extends R> mapper) {…}
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {…}

<U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {…}
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {…}
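The pairing between these signatures can be tried out in a small sketch: `map` lines up with `thenApply`, and `flatMap` lines up with `thenCompose`. The class and method names below are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CombinatorShapes {

    // map / thenApply: apply a plain function to the value(s) inside.
    static List<Integer> mapStream(List<Integer> ns) {
        return ns.stream().map(n -> n * 2).collect(Collectors.toList());
    }

    static CompletableFuture<Integer> mapFuture(CompletableFuture<Integer> f) {
        return f.thenApply(n -> n * 2);
    }

    // flatMap / thenCompose: the function returns a container itself,
    // and the combinator flattens the nesting away.
    static List<Integer> flatMapStream(List<Integer> ns) {
        return ns.stream().flatMap(n -> Stream.of(n, n * 10)).collect(Collectors.toList());
    }

    static CompletableFuture<Integer> flatMapFuture(CompletableFuture<Integer> f) {
        return f.thenCompose(n -> CompletableFuture.completedFuture(n * 10));
    }

    public static void main(String[] args) {
        System.out.println(mapStream(Arrays.asList(1, 2, 3)));                          // [2, 4, 6]
        System.out.println(flatMapStream(Arrays.asList(1, 2)));                         // [1, 10, 2, 20]
        System.out.println(mapFuture(CompletableFuture.completedFuture(21)).join());    // 42
        System.out.println(flatMapFuture(CompletableFuture.completedFuture(4)).join()); // 40
    }
}
```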

What if we want to write functions that work with both Streams and Futures?

SEQUENCING FUTURES

CompletableFuture<Collection<Integer>> result =
    sequence(serviceA(10), serviceB(10), serviceC(10));
// java.util.concurrent.CompletableFuture[10, 100, 1000]

SEQUENCING FUTURES

static <A> CompletableFuture<Collection<A>> sequence(CompletableFuture<A>... cfs) {
    return Arrays.asList(cfs).stream().reduce(
        CompletableFuture.completedFuture(new ArrayList<>()),
        (acc, future) -> acc.thenCompose((xs) -> future.thenApply((x) -> { xs.add(x); return xs; })),
        (a, b) -> a.thenCompose((xs) -> b.thenApply((ys) -> { xs.addAll(ys); return xs; })));
}
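The fold on this slide mutates the accumulator list inside the callbacks. Below is a sketch of the same reduce that copies the list at each step instead, which is one possible way to keep the accumulator free of shared mutable state. The helpers `append` and `concat` and the `@SafeVarargs` annotation are additions for illustration, not part of the slides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequenceFutures {

    // Hypothetical helper: returns a new list with x appended, leaving xs untouched.
    static <A> List<A> append(List<A> xs, A x) {
        List<A> copy = new ArrayList<>(xs);
        copy.add(x);
        return copy;
    }

    // Hypothetical helper: returns a new list holding xs followed by ys.
    static <A> List<A> concat(List<A> xs, List<A> ys) {
        List<A> copy = new ArrayList<>(xs);
        copy.addAll(ys);
        return copy;
    }

    // Turns many futures of A into one future of a list of A, keeping argument order.
    @SafeVarargs
    static <A> CompletableFuture<List<A>> sequence(CompletableFuture<A>... cfs) {
        CompletableFuture<List<A>> empty = CompletableFuture.completedFuture(new ArrayList<>());
        return Arrays.stream(cfs).reduce(
            empty,
            (acc, future) -> acc.thenCompose(xs -> future.thenApply(x -> append(xs, x))),
            (a, b) -> a.thenCompose(xs -> b.thenApply(ys -> concat(xs, ys))));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> result = sequence(
            CompletableFuture.supplyAsync(() -> 10),
            CompletableFuture.supplyAsync(() -> 100),
            CompletableFuture.supplyAsync(() -> 1000));
        System.out.println(result.join()); // [10, 100, 1000]
    }
}
```

The input futures are already running when `sequence` is called, so the fold only decides the order in which results are collected, not the order in which the work is done.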

SEQUENCING STREAMS

Stream<Integer> s1 = Arrays.asList(1).stream();
Stream<Integer> s2 = Arrays.asList(2).stream();
Stream<Integer> s3 = Arrays.asList(3).stream();

sequenceS(s1, s2, s3)
// [[1, 2, 3]]

SEQUENCING STREAMS

// Caveat: a Java Stream can be consumed only once, so reusing coll inside
// flatMap is safe only while acc holds a single element.
static <A> Stream<Stream<A>> sequenceS(Stream<A>... cfs) {
    return Arrays.asList(cfs).stream().reduce(
        Stream.of(Stream.empty()),
        (acc, coll) -> acc.flatMap((xs) -> coll.map((x) -> Stream.concat(xs, Stream.of(x)))),
        (acc, coll) -> acc.flatMap((xs) -> coll.map((x) -> Stream.concat(xs, x))));
}
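Because a Java `Stream` can be traversed only once, the same idea is easier to experiment with over `List`, which can be re-read for every prefix. The following is a sketch under that substitution; `sequenceL` and `append` are hypothetical names, and a plain loop replaces `reduce` for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class SequenceLists {

    // Hypothetical helper: returns a new list with x appended, leaving xs untouched.
    static <A> List<A> append(List<A> xs, A x) {
        List<A> copy = new ArrayList<>(xs);
        copy.add(x);
        return copy;
    }

    // Combines every element of each input list with every prefix built so far,
    // which yields the cartesian product of the inputs.
    @SafeVarargs
    static <A> List<List<A>> sequenceL(List<A>... colls) {
        List<List<A>> acc = Collections.singletonList(Collections.emptyList());
        for (List<A> coll : colls) {
            acc = acc.stream()
                     .flatMap(xs -> coll.stream().map(x -> append(xs, x)))
                     .collect(Collectors.toList());
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(sequenceL(Arrays.asList(1), Arrays.asList(2), Arrays.asList(3)));
        // [[1, 2, 3]]
        System.out.println(sequenceL(Arrays.asList(1, 2), Arrays.asList(3, 4)));
        // [[1, 3], [1, 4], [2, 3], [2, 4]]
    }
}
```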

Did you notice any similarity?

FUTURES VS STREAMS

// sequenceS: accumulator step over Streams
(acc, coll) -> acc.flatMap((xs) -> coll.map((x) -> Stream.concat(xs, Stream.of(x))))

// sequence: accumulator step over CompletableFutures
(acc, future) -> acc.thenCompose((xs) -> future.thenApply((x) -> { xs.add(x); return xs; }))

FlatMappable

FLATMAPPABLE

<M extends FlatMappable, A> M<List<A>> sequence(M<A>... ma) { … }

• Java does not support higher-kinded types
• Higher-kinded types are indispensable when implementing such abstractions

Reaching the limits of the type system

Calling things by their proper names

FlatMappable is called Monad

trait Monad[F[_]] {
  def point[A](a: => A): F[A]
  def bind[A, B](a: F[A])(f: A => F[B]): F[B]
  def map[A, B](a: F[A])(f: A => B): F[B] = bind(a)(b => point(f(b)))
}

Higher-kinded types in action

MONADS IN SCALA

The Future Monad

implicit def FutureMonad: Monad[Future] = new Monad[Future] {
  def point[A](a: => A) = Future.successful(a)

  def bind[A, B](a: Future[A])(f: A => Future[B]): Future[B] = a flatMap f
}

MONADS IN SCALA

The List Monad

implicit def ListMonad: Monad[List] = new Monad[List] {
  def point[A](a: => A) = List(a)
  def bind[A, B](a: List[A])(f: A => List[B]): List[B] = a flatMap f
}

MONADS IN SCALA

Implementing sequence

def sequence[M[_] : Monad, A](ma: List[M[A]]): M[List[A]] = {
  ma.foldLeft(Monad[M].point(List(): List[A]))((acc, m) =>
    acc.flatMap((xs) => m.map((x) => xs :+ x))
  )
}

“Being abstract is something profoundly different from being vague … The purpose of abstraction is not to be vague, but to create a new semantic level in which one can be absolutely precise.” EDSGER W. DIJKSTRA

MONADS IN SCALA

Sequencing

val resultF: Future[List[Integer]] = sequence(List(serviceA(10), serviceB(10), serviceC(10)))
println(Await.result(resultF, Duration(2, "seconds")))
// List(10, 100, 1000)

val resultL: List[List[Int]] = sequence(List(List(1,2,3), List(4,5,6), List(7,8,9)))
println(resultL)
// List(List(1, 4, 7), List(1, 4, 8), List(1, 4, 9), List(1, 5, 7), ...)

Awesome! What else can we do?

Folding

FOLDING

List(2, 3, 4).reduce(_+_)
// 9

val intFutures = List(Future.successful(1), Future.successful(2), Future.successful(3))
val result: Future[Int] = sequence(intFutures).map((x) => x.reduce(_ + _))
//…Future[6]

Is there something in common?

Introducing Foldable

trait Foldable[F[_]] { self =>
  …
  def fold[M: Monoid](t: F[M]): M = ???
}

Introducing Monoids

trait Monoid[F] { self =>
  def zero: F
  def append(f1: F, f2: => F): F
}

Introducing Monoids: Ints

implicit def intMonoid: Monoid[Int] = new Monoid[Int] {
  def zero: Int = 0
  def append(f1: Int, f2: => Int): Int = f1 + f2
}

Foldable[List].fold(List(2, 3, 4))
// 9
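The monoid idea itself does not need higher-kinded types, so it can also be sketched in Java. The interface and `fold` below are hypothetical illustrations that mirror the Scala trait; unlike the Scala version, the second argument of `append` is evaluated eagerly.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MonoidSketch {

    // A type with an identity element and an associative combine operation.
    interface Monoid<F> {
        F zero();
        F append(F f1, F f2);
    }

    // Integers under addition, with 0 as the identity.
    static final Monoid<Integer> INT_SUM = new Monoid<Integer>() {
        public Integer zero() { return 0; }
        public Integer append(Integer f1, Integer f2) { return f1 + f2; }
    };

    // Strings under concatenation, with the empty string as the identity.
    static final Monoid<String> STRING_CONCAT = new Monoid<String>() {
        public String zero() { return ""; }
        public String append(String f1, String f2) { return f1 + f2; }
    };

    // Folds a list using whichever monoid is supplied.
    static <M> M fold(List<M> ms, Monoid<M> monoid) {
        M acc = monoid.zero();
        for (M m : ms) {
            acc = monoid.append(acc, m);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(fold(Arrays.asList(2, 3, 4), INT_SUM));             // 9
        System.out.println(fold(Arrays.asList("a", "b", "c"), STRING_CONCAT)); // abc
        System.out.println(fold(Collections.<Integer>emptyList(), INT_SUM));   // 0
    }
}
```

The monoid is passed explicitly here, whereas Scala finds it through the implicit scope.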

Introducing Monoids: Futures

implicit def futureFreeMonoid[A] = new Monoid[Future[List[A]]] {
  def zero: Future[List[A]] = Future.successful(List())

  def append(f1: Future[List[A]], f2: => Future[List[A]]) = for {
    a1 <- f1
    a2 <- f2
  } yield a1 ++ a2
}

Foldable[List].fold(List(Future.successful(List(2)), Future.successful(List(3)), Future.successful(List(4))))
//…Future[List(2, 3, 4)]
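A Java counterpart of a monoid over futures of lists can be sketched with `CompletableFuture.thenCombine`. Under this monoid, folding concatenates the lists held inside the futures. All names below are hypothetical, and the `Monoid` interface is repeated so the sketch stands alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FutureListMonoid {

    interface Monoid<F> {
        F zero();
        F append(F f1, F f2);
    }

    // Futures of lists form a monoid: the identity is an already completed
    // empty list, and append concatenates once both futures have finished.
    static <A> Monoid<CompletableFuture<List<A>>> futureListMonoid() {
        return new Monoid<CompletableFuture<List<A>>>() {
            public CompletableFuture<List<A>> zero() {
                return CompletableFuture.completedFuture(new ArrayList<>());
            }

            public CompletableFuture<List<A>> append(CompletableFuture<List<A>> f1,
                                                     CompletableFuture<List<A>> f2) {
                return f1.thenCombine(f2, (a1, a2) -> {
                    List<A> out = new ArrayList<>(a1);
                    out.addAll(a2);
                    return out;
                });
            }
        };
    }

    static <M> M fold(List<M> ms, Monoid<M> monoid) {
        M acc = monoid.zero();
        for (M m : ms) {
            acc = monoid.append(acc, m);
        }
        return acc;
    }

    // Folds three single-element futures into one future holding all three values.
    static List<Integer> demo() {
        List<CompletableFuture<List<Integer>>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> Arrays.asList(2)),
            CompletableFuture.supplyAsync(() -> Arrays.asList(3)),
            CompletableFuture.supplyAsync(() -> Arrays.asList(4)));
        Monoid<CompletableFuture<List<Integer>>> monoid = futureListMonoid();
        return fold(futures, monoid).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [2, 3, 4]
    }
}
```

Summing the values afterwards is then a plain fold over the resulting list.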

Monad, Foldable and Monoid are just the beginning

In Scala, many of these abstractions are already implemented in Scalaz

Category theory can have a big impact on library design

References

• Clojure Reactive Programming - http://bit.ly/cljRp
• Java 8 CompletableFuture - http://bit.ly/j8Future
• Java 8 Streams - http://bit.ly/j8stream
• Category Theory - http://amzn.to/1NfL08U
• Free Monoids - http://en.wikipedia.org/wiki/Free_monoid
• Scalaz - https://github.com/scalaz/scalaz
• Fluokitten (Clojure) - https://github.com/uncomplicate/fluokitten

Thank you!

LEONARDO BORGES • SENIOR CLOJURE DEVELOPER • @LEONARDO_BORGES

Q&A

We are hiring!
