1/42

Using Microservices efficiently:
Asynchronicity & Generators

Nils Adermann

Composer Co-Author
  http://getcomposer.org

phpBB Development Lead
 http://www.phpbb.com

CTO of Forumatic phpBB Hosting
 https://www.forumatic.com

Contractor at Imagine Easy
  http://www.imagineeasy.com

Web Application Architectures

  • Traditional static page per Resource
  • Single Page
    Resources loaded via AJAX
  • Page per Resource with AJAX for dynamic changes

Microservices

  • "SOA without the commercialization and perceived baggage of WS* and ESB"
  • Individual service code simpler
  • Deployments independent
    » Continuous Deployment
  • Teams can work in parallel
  • Explicit service interfaces
    » Prevent tight-coupling
    » Require coordination
  • BUT: communication overhead

Monolithic Application

01class CitationController
02{
03    public function searchAction($term)
04    {
05        return [
06            'projects' =>
07                $this->user->getProjects(),
08            'recommendations' =>
09                $this->recommender->suggestFor($this->project),
10            'results' =>
11                $this->citationRepository->search($term),
12        ];
13    }
14}
1class CitationRepository
2{
3    function search($term)
4    {
5        ...
6        // comes down to: mysqli_query
7    }
8}

Accessing external services

Accessing external services

  1. write/send syscalls
  2. Data gets written onto shared memory/filesystem/network
  3. read/recv syscalls
  4. Process "sleeps"/waits until data available: synchronous or blocking I/O

Accessing external services

Potential Problems

Traditional solution

» Waiting reduced to acceptable amount

Recap: Monolithic Application

01class CitationController
02{
03    public function searchAction($term)
04    {
05        $this->render([
06            'projects' =>
07                $this->user->getProjects(),
08            'recommendations' =>
09                $this->recommender->suggestFor($this->project),
10            'results' =>
11                $this->citationRepository->search($term),
12        ]);
13    }
14}
1class CitationRepository
2{
3    function search($term)
4    {
5        ...
6        // comes down to: mysqli_query
7    }
8}

Naive Microservices

/**
 * Search controller of the "naive microservices" variant.
 *
 * The action builds its result from three calls (project service,
 * recommender, citation service) that are evaluated in array order.
 */
class CitationController
{
    /**
     * Collects the data for the citation search page.
     *
     * @param mixed $term Search term, passed on unchanged to the citation service.
     *
     * @return array Map with the keys 'projects', 'recommendations' and 'results'.
     */
    public function searchAction($term)
    {
        return [
            'projects' =>
                // $this->user, not a bare $user: no local $user exists in this method.
                $this->projectService->getAll($this->user->getId()),
            'recommendations' =>
                $this->recommender->suggestFor($this->project),
            'results' =>
                $this->citationService->search($term),
        ];
    }
}
1class CitationService
2{
3    public function search($term)
4    {
5        ...
6        return $httpClient->get('/search?term='.$term)->json();
7    }
8}

Search Microservice (Database)

1$app->get('/search', function ($app, $request) {
2    return $app['citations.repository']->search(
3        $request->get('term')
4    );
5});

Search Microservice (APIs)

/**
 * GET /search: queries both upstream APIs for the requested term and
 * returns the combined, sorted result list.
 */
$app->get('/search', function ($app, $request) {
    // The term is request input; encode it so characters such as '&', '#'
    // or spaces cannot alter the upstream query string.
    $term = rawurlencode((string) $request->get('term'));

    $results = $app['fooapi.client']
        ->get('/find?keywords='.$term)->json();
    // array_merge() appends the second list. The array union operator (+=)
    // would silently drop every entry whose integer key already exists in
    // the first list. NOTE(review): assumes both json() calls return lists.
    $results = array_merge(
        $results,
        $app['barapi.client']
            ->get('/query?search='.$term)->json()
    );

    usort($results, function ($a, $b) { /* comparison elided in the original listing */ });

    return $results;
});

Result

Let's work while we wait

Asynchronicity

Asynchronicity

Asynchronicity

  1. write/send syscalls
  2. Data gets written onto shared memory/filesystem/network
  3. Continue with other work
    » Process ready while data unavailable: asynchronous or non-blocking I/O
  4. Use poll/select/kqueue/libevent/libev to wait once result required

cURL: My slow server

1<?php
2  
3sleep(2);
4  
5echo "Hello World!\n";

cURL

01$ch1 = curl_init();
02  
03curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
04curl_setopt($ch1, CURLOPT_HEADER, 0);
05curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
06  
07$mh = curl_multi_init();
08curl_multi_add_handle($mh,$ch1);
09  
10$active = null;
11do {
12    $mrc = curl_multi_exec($mh, $active);
13} while ($active &&
14    ($mrc == CURLM_CALL_MULTI_PERFORM || $mrc == CURLM_OK));
15 
16// Simulated independent workload
17sleep(1);
18  
19curl_multi_remove_handle($mh, $ch1);
20curl_multi_close($mh);

cURL: Busy waiting (DON'T!)

01$ch1 = curl_init();
02  
03curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
04curl_setopt($ch1, CURLOPT_HEADER, 0);
05curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
06  
07$mh = curl_multi_init();
08curl_multi_add_handle($mh,$ch1);
09  
10$active = null;
11do {
12    $mrc = curl_multi_exec($mh, $active);
13} while ($active &&
14    ($mrc == CURLM_CALL_MULTI_PERFORM || $mrc == CURLM_OK));
15 
16// Simulated independent workload
17sleep(1);
18  
19curl_multi_remove_handle($mh, $ch1);
20curl_multi_close($mh);

cURL: Synchronous HTTP request

01$ch1 = curl_init();
02  
03curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
04curl_setopt($ch1, CURLOPT_HEADER, 0);
05curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
06  
07$mh = curl_multi_init();
08curl_multi_add_handle($mh,$ch1);
09  
10$active = null;
11do {
12    $mrc = curl_multi_exec($mh, $active);
13} while ($mrc == CURLM_CALL_MULTI_PERFORM);
14  
15while ($active && $mrc == CURLM_OK) {
16    if (curl_multi_select($mh) != -1) {
17        do {
18            $mrc = curl_multi_exec($mh, $active);
19        } while ($mrc == CURLM_CALL_MULTI_PERFORM);
20    }
21}
22  
23// Simulated independent workload
24sleep(1);
25  
26curl_multi_remove_handle($mh, $ch1);
27curl_multi_close($mh);

cURL: Asynchronous HTTP request

01$ch1 = curl_init();
02  
03curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
04curl_setopt($ch1, CURLOPT_HEADER, 0);
05curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
06  
07$mh = curl_multi_init();
08curl_multi_add_handle($mh,$ch1);
09  
10$active = null;
11do {
12    $mrc = curl_multi_exec($mh, $active);
13} while ($mrc == CURLM_CALL_MULTI_PERFORM);
14  
15// Simulated independent workload
16sleep(1);
17  
18while ($active && $mrc == CURLM_OK) {
19    if (curl_multi_select($mh) != -1) {
20        do {
21            $mrc = curl_multi_exec($mh, $active);
22        } while ($mrc == CURLM_CALL_MULTI_PERFORM);
23    }
24}
25  
26curl_multi_remove_handle($mh, $ch1);
27curl_multi_close($mh);

cURL: Comparison synchronous vs. asynchronous

01naderman@Montsoreau:~/tmp$ time php -f client-parallel.php
02  
03real    0m2.029s
04user    0m0.012s
05sys     0m0.012s
06naderman@Montsoreau:~/tmp$ time php -f client-linear.php
07  
08real    0m3.029s
09user    0m0.012s
10sys     0m0.012s

Futures

Guzzle5 / RingPHP: Futures

Explicit dereferencing

1$response = $client->get(
2    'http://httpbin.org',
3    ['future' => true]);
4 
5// perform other work here
6 
7$response->wait();
8echo $response->getStatusCode(); // 200

Implicit dereferencing

1$response = $client->get(
2    'http://httpbin.org',
3    ['future' => true]);
4 
5// perform other work here
6 
7echo $response->getStatusCode(); // 200

Promises

Promises

https://github.com/reactphp/promise

01function getAwesomeResultPromise()
02{
03    $deferred = new React\Promise\Deferred();
04     
05    computeAwesomeResultAsynchronously(
06        function ($error, $result) use ($deferred) {
07            if ($error) {
08                $deferred->reject($error);
09            } else {
10                $deferred->resolve($result);
11            }
12        });
13    return $deferred->promise();
14}
15 
16getAwesomeResultPromise()
17    ->then(
18        function ($value) {
19            // Deferred resolved, do something with $value
20        },
21        function ($reason) {
22            // Deferred rejected, do something with $reason
23        },
24        function ($update) {
25            // Progress triggered, do sth with $update
26        }
27    );

Guzzle5 / RingPHP: Promises

01$response = $client->get(
02    'http://httpbin.org',
03    ['future' => true]);
04 
05// Use the response asynchronously
06$response->then(function ($response) {
07    echo $response->getStatusCode(); // 200
08});
09 
10// define other work here

React

http://reactphp.org     http://github.com/reactphp

Generators: 0 to 200

01function range()
02{
03    for ($i = 0; $i <= 100; $i++) {
04        yield $i => $i*2;
05    }
06}
07 
08foreach (range() as $i => $v) {
09    echo $v;
10}
11// 0 2 4 6 8 10 ... 200

Processing HTTP responses with generators

1function filterBadUrls($urlBlacklist, $items)
2{
3    foreach ($items as $key => $item) {
4        if (!isset($urlBlacklist[$item['url']])) {
5            yield $key => $item;
6        }
7    }
8}
/**
 * Passes every item through unchanged and increments the StatsD counter
 * 'elements-processed' once per item, right before the item is yielded.
 *
 * @param array|\Traversable $items Items to count; defaults to an empty list
 *                                  so existing zero-argument calls keep working.
 *
 * @return \Generator Yields the items in their original order (keys are not preserved).
 */
function statsdCount($items = array())
{
    foreach ($items as $item) {
        \StatsD::increment('elements-processed');
        yield $item;
    }
}

Processing HTTP responses with generators

/**
 * Groups a stream of items into arrays ("bulks") of at most $size items.
 *
 * Full bulks are yielded as soon as they are complete; a final, smaller bulk
 * is yielded if items remain once the input is exhausted. Empty input yields
 * nothing.
 *
 * @param array|\Traversable $items Items to group; defaults to an empty list
 *                                  so existing zero-argument calls keep working.
 * @param int                $size  Maximum number of items per bulk (default 100).
 *
 * @return \Generator Yields list arrays of items in input order.
 */
function bulkify($items = array(), $size = 100)
{
    $bulk = array();
    foreach ($items as $item) {
        $bulk[] = $item;

        // >= rather than ==, so a non-positive $size still flushes per item
        // instead of collecting the whole input into one bulk.
        if (count($bulk) >= $size) {
            yield $bulk;
            $bulk = array();
        }
    }

    // Flush the remainder that did not fill a complete bulk.
    if (!empty($bulk)) {
        yield $bulk;
    }
}
17 
18foreach (bulkify($results) as $documents) {
19    $elasticaType->addDocuments($documents);
20}

nikic/iter

https://github.com/nikic/iter

Iteration primitives implemented using generators

Accepts all iterables: arrays, Traversable, Iterator, IteratorAggregate

1Iterator map(callable $function, iterable $iterable)           
2Iterator filter(callable $predicate, iterable $iterable)
3Iterator flatten(iterable $iterable)
4Iterator slice(iterable $iterable, int $start, int $length)
5...
1$nums = iter\range(1, 10);
2$numsTimesTen = iter\map(iter\fn\operator('*', 10), $nums);
3// 10 20 30 40 50 60 70 80 90 100
01public function downloadFile($filename, $callback)
02{
03    $contents = get the file asynchronously...
04    $callback($contents);
05}
06 
07public function processDownloadResult($filename, $contents)
08{
09    echo "The file $filename contained a lot of stuff:\n";
10    echo $contents;
11}
12 
13public function handleDownload($filename)
14{
15    $this->downloadFile($filename,
16        [$this, 'processDownloadResult']);
17    // how to get $filename in?
18}

react/partial

https://github.com/reactphp/partial

"Dependency Injection for functions"

1public function handleDownload($filename)
2{
3    $this->downloadFile($filename,
4        Partial\bind(
5            [$this, 'processDownloadResult'],
6            $filename
7        )
8    );
9}

Example: OAI Importer

01$c['drop'] = function ($c) {
02    return igorw\pipeline(
03        Partial\bind([Process\Report::class, 'statsd'],
04            'import-skip'),
05        Partial\bind([Process\Report::class, 'debugPrintItem'],
06            $c['logger'])
07    );
08};
09$c['process'] = $c->protect(function ($job) use ($c) {
10    return igorw\pipeline(
11        $c['crossref.oai'],
12        $c['crossref.mapper'],
13        Partial\bind([Process\Report::class, 'statsd'],
14            'import-read'),
15        Partial\bind([Process\Report::class, 'write'],
16            'Read   ', $c['logger']),
17        Partial\bind([Process\Filter::class, 'requiredFields'],
18            $c['drop']),
19        Partial\bind([Process\Transform::class, 'bulkify'],
20            $c['config']['elasticsearch']['bulksize']),
21        Partial\bind([Process\ElasticSearch::class, 'write'],
22            $c['elasticsearch.type']),
23        [Process\Transform::class, 'unbulkify'],
24        Partial\bind([Process\Doctrine::class, 'updateJob'],
25            $job, $c['doctrine.orm.entity_manager']),
26        Partial\bind([Process\Report::class, 'statsd'],
27            'import-write'),
28        Partial\bind([Process\Report::class, 'write'],
29            'Written', $c['logger'])
30    );
31});

Example: OAI Importer

// Input for the pipeline: a date period stepping one day at a time
// from $from towards $until.
$input = new \DatePeriod(
    $from, new \DateInterval('P1D'), $until
);
// Build the processing pipeline for this job from the container.
$pipeline = $container['process']($job);

// consume all input through the pipeline
foreach ($pipeline($input) as $result) {}

Example: Citation Search

01$app['search'] = $app->protect(function ($term) use ($app) {
02    $responses = $results = [];
03 
04    foreach ($app['search.backends'] as $backend) {
05        $response = $backend->search($term);
06        $response->then(function ($response) use ($backend) {
07            foreach ($response->json() as $value) {
08                yield $backend->citationFromJson($value);
09            }
10        })
11        ->then(function ($citations) use (&$results) {
12            foreach ($citations as $citation) {
13                $results[] = json_encode(
14                    $citation->getStandardData()
15                );
16            }
17        });
18        $responses[] = $response;
19    }
20    foreach ($responses as $response) {
21        $response->wait();
22    }
23    return $results;
24});
25 
26$app->get('/search', function (
27        Application $app, Request $request) {
28    $term = $request->get('term');
29    return json_encode($app['search']($term));
30});

Example: Citation controller

1CitationController::searchAction($term)
2            $searchResults =
3                $this->citationService->search($term)
4...
5            'results' =>
6                $searchResults->deref(),
01class CitationService
02{
03    public function search($term)
04    {
05        ...
06        return new CitationListFuture(
07            $httpClient->get('/search?term='.$term));
08    }
09}
10 
/**
 * Wraps a future HTTP response so that waiting for it and decoding its
 * JSON body only happens when deref() is called.
 */
class CitationListFuture
{
    /**
     * Response handed to the constructor.
     *
     * Declared explicitly instead of being created dynamically; kept public
     * because the previously undeclared property was implicitly public.
     *
     * @var FutureResponse
     */
    public $response;

    /**
     * @param FutureResponse $response Pending response to wrap.
     */
    public function __construct(FutureResponse $response) {
        $this->response = $response;
    }

    /**
     * Waits for the wrapped response and returns its decoded JSON body.
     *
     * @return mixed Whatever json() returns for the response.
     */
    public function deref()
    {
        // Use the stored property: there is no local $response in this method.
        $this->response->wait();
        return $this->response->json();
    }
}

Summary

Thank you.

Questions?

@naderman