Using Microservices efficiently:
Asynchronicity & Generators

Nils Adermann

Composer Co-Author
  http://getcomposer.org

phpBB Development Lead
 http://www.phpbb.com

CTO of Forumatic phpBB Hosting
 https://www.forumatic.com

Contractor at Imagine Easy
  http://www.imagineeasy.com

Web Application Architectures

Microservices

Monolithic Application

/**
 * Search controller of the monolithic application: all data for the
 * response is gathered in-process, one call after the other.
 */
class CitationController
{
    /**
     * Gathers everything the search page needs.
     *
     * @param mixed $term search term handed to the citation repository
     *
     * @return array with the keys 'projects', 'recommendations', 'results'
     */
    public function searchAction($term)
    {
        // Calls happen in the same order as the keys of the returned array.
        $projects = $this->user->getProjects();
        $recommendations = $this->recommender->suggestFor($this->project);
        $results = $this->citationRepository->search($term);

        return compact('projects', 'recommendations', 'results');
    }
}
            
/**
 * Citation lookup used by the monolithic controller.
 */
class CitationRepository
{
    /**
     * Searches citations for $term.
     *
     * The body is elided on the slide; per the note below it ultimately
     * issues a mysqli_query() call.
     */
    function search($term)
    {
        ...
        // comes down to: mysqli_query
    }
}
            

Accessing external services

Accessing external services

  1. write/send syscalls
  2. Data gets written onto shared memory/filesystem/network
  3. read/recv syscalls
  4. Process "sleeps"/waits until data is available: synchronous or blocking I/O

Accessing external services

Potential Problems

Traditional solution

» Waiting reduced to acceptable amount

Recap: Monolithic Application

/**
 * Recap of the monolithic search controller: the view data is collected
 * sequentially and then handed to render().
 */
class CitationController
{
    /**
     * Collects the search page data and renders it.
     *
     * Nothing is returned; the data is only passed to $this->render().
     *
     * @param mixed $term search term handed to the citation repository
     */
    public function searchAction($term)
    {
        $viewData = [];
        $viewData['projects'] = $this->user->getProjects();
        $viewData['recommendations'] = $this->recommender->suggestFor($this->project);
        $viewData['results'] = $this->citationRepository->search($term);

        $this->render($viewData);
    }
}
            
/**
 * Citation lookup used by the monolithic controller (recap slide).
 */
class CitationRepository
{
    /**
     * Searches citations for $term.
     *
     * The body is elided on the slide; per the note below it ultimately
     * issues a mysqli_query() call.
     */
    function search($term)
    {
        ...
        // comes down to: mysqli_query
    }
}
            

Naive Microservices

/**
 * Search controller after a naive move to microservices: the same three
 * lookups, but projects and citations now come from remote services and
 * are still fetched one after the other.
 */
class CitationController
{
    /**
     * Gathers everything the search page needs.
     *
     * @param mixed $term search term forwarded to the citation service
     *
     * @return array with the keys 'projects', 'recommendations', 'results'
     */
    public function searchAction($term)
    {
        return [
            'projects' =>
                // The user lives on the controller like every other
                // collaborator; a bare $user is undefined in this scope.
                $this->projectService->getAll($this->user->getId()),
            'recommendations' =>
                $this->recommender->suggestFor($this->project),
            'results' =>
                $this->citationService->search($term),
        ];
    }
}
            
/**
 * Client-side wrapper around the remote search microservice.
 */
class CitationService
{
    /**
     * Queries the search microservice and returns the decoded JSON body.
     *
     * Setup of $httpClient is elided on the slide.
     *
     * @param mixed $term search term; cast to string and URL-encoded
     */
    public function search($term)
    {
        ...
        // Encode the term so characters such as '&', '#' or spaces cannot
        // truncate or alter the query string.
        return $httpClient->get('/search?term='.rawurlencode((string) $term))->json();
    }
}
            

Search Microservice (Database)

// Search microservice backed by the database: look the term up in the
// citation repository and return whatever it finds.
$app->get('/search', function ($app, $request) {
    $repository = $app['citations.repository'];
    $term = $request->get('term');

    return $repository->search($term);
});
            

Search Microservice (APIs)

// Search microservice backed by two external APIs: query both one after
// the other, combine the hits and sort them.
$app->get('/search', function ($app, $request) {
    // Encode once so '&', '#', spaces etc. cannot break either query string.
    $term = rawurlencode((string) $request->get('term'));

    // array_merge() appends the second list. The array union operator (+=)
    // would silently drop every entry of the second result whose numeric
    // key already exists in the first one.
    $results = array_merge(
        $app['fooapi.client']
            ->get('/find?keywords='.$term)->json(),
        $app['barapi.client']
            ->get('/query?search='.$term)->json()
    );

    usort($results, function ($a, $b) {...});

    return $results;
});
            

Result

Let's work while we wait

Asynchronicity

Asynchronicity

Asynchronicity

  1. write/send syscalls
  2. Data gets written onto shared memory/filesystem/network
  3. Continue with other work
    » Process stays runnable while data is unavailable: asynchronous or non-blocking I/O
  4. Use poll/select/kqueue/libevent/libev to wait once result required

cURL: My slow server

<?php
 
// Deliberately slow endpoint for the cURL examples: wait two seconds
// before answering.
sleep(2);
 
echo "Hello World!\n";
            

cURL

// Request slow.php through the curl_multi interface.
$ch1 = curl_init();
 
curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
 
$mh = curl_multi_init();
curl_multi_add_handle($mh,$ch1);
 
// Busy waiting: curl_multi_exec() is called over and over with nothing
// in between that blocks, so the loop spins until the transfer is no
// longer active or an error code is returned.
$active = null;
do {
    $mrc = curl_multi_exec($mh, $active);
} while ($active &&
    ($mrc == CURLM_CALL_MULTI_PERFORM || $mrc == CURLM_OK));

// Simulated independent workload
// (only starts once the loop above has finished)
sleep(1);
 
curl_multi_remove_handle($mh, $ch1);
curl_multi_close($mh);
            

cURL: Busy waiting (DON'T!)

// Anti-pattern demo: drive a curl_multi transfer by polling in a tight loop.
$ch1 = curl_init();
 
curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);
 
$mh = curl_multi_init();
curl_multi_add_handle($mh,$ch1);
 
// There is no curl_multi_select() here: the loop re-runs
// curl_multi_exec() immediately for as long as the handle is active and
// the status is CURLM_OK / CURLM_CALL_MULTI_PERFORM.
$active = null;
do {
    $mrc = curl_multi_exec($mh, $active);
} while ($active &&
    ($mrc == CURLM_CALL_MULTI_PERFORM || $mrc == CURLM_OK));

// Simulated independent workload
// (runs after the polling loop has ended, not alongside the transfer)
sleep(1);
 
curl_multi_remove_handle($mh, $ch1);
curl_multi_close($mh);
            

cURL: Synchronous HTTP request

// Synchronous variant: wait for the whole transfer first, then do the
// independent work.
$ch1 = curl_init();

curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);

$mh = curl_multi_init();
curl_multi_add_handle($mh,$ch1);

// Start the transfer.
$active = null;
do {
    $mrc = curl_multi_exec($mh, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);

// Sleep in curl_multi_select() until there is activity, then let cURL
// process it. Repeat until the transfer is done or an error occurs.
while ($active && $mrc == CURLM_OK) {
    if (curl_multi_select($mh) == -1) {
        // select() could not wait on anything. Back off briefly and still
        // run curl_multi_exec() below; skipping it would leave $active
        // and $mrc unchanged and turn this loop into an endless spin.
        usleep(1000);
    }

    do {
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);
}

// Simulated independent workload
sleep(1);

curl_multi_remove_handle($mh, $ch1);
curl_multi_close($mh);
            

cURL: Asynchronous HTTP request

// Asynchronous variant: start the transfer, do the independent work
// while the request is in flight, and only then wait for the result.
$ch1 = curl_init();

curl_setopt($ch1, CURLOPT_URL, "http://localhost/~naderman/slow.php");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, true);

$mh = curl_multi_init();
curl_multi_add_handle($mh,$ch1);

// Start the transfer without waiting for it.
$active = null;
do {
    $mrc = curl_multi_exec($mh, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);

// Simulated independent workload
sleep(1);

// Now the result is needed: sleep in curl_multi_select() until there is
// activity, then let cURL process it, until the transfer is done.
while ($active && $mrc == CURLM_OK) {
    if (curl_multi_select($mh) == -1) {
        // select() could not wait on anything. Back off briefly and still
        // run curl_multi_exec() below; skipping it would leave $active
        // and $mrc unchanged and turn this loop into an endless spin.
        usleep(1000);
    }

    do {
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);
}

curl_multi_remove_handle($mh, $ch1);
curl_multi_close($mh);
            

cURL: Comparison synchronous vs. asynchronous

naderman@Montsoreau:~/tmp$ time php -f client-parallel.php
 
real    0m2.029s
user    0m0.012s
sys     0m0.012s
naderman@Montsoreau:~/tmp$ time php -f client-linear.php
 
real    0m3.029s
user    0m0.012s
sys     0m0.012s
            

Futures

Guzzle5 / RingPHP: Futures

Explicit dereferencing

// Issue the request with the 'future' option enabled; the returned
// object is then used as a future for the response.
$response = $client->get(
    'http://httpbin.org',
    ['future' => true]);

// perform other work here

// Explicit dereferencing: wait() is called before the response is read.
$response->wait();
echo $response->getStatusCode(); // 200
            

Implicit dereferencing

// Issue the request with the 'future' option enabled; the returned
// object is then used as a future for the response.
$response = $client->get(
    'http://httpbin.org',
    ['future' => true]);

// perform other work here

// Implicit dereferencing: there is no wait() call, the response is
// simply used.
echo $response->getStatusCode(); // 200
            

Promises

Promises

https://github.com/reactphp/promise

/**
 * Wraps the callback-style computeAwesomeResultAsynchronously() in a
 * promise.
 *
 * The promise is rejected with the callback's $error when one is given,
 * otherwise it is resolved with $result.
 *
 * @return mixed the promise belonging to a new React\Promise\Deferred
 */
function getAwesomeResultPromise()
{
    $deferred = new React\Promise\Deferred();

    // Node-style callback: error first, result second.
    $settle = function ($error, $result) use ($deferred) {
        if (!$error) {
            $deferred->resolve($result);
            return;
        }

        $deferred->reject($error);
    };
    computeAwesomeResultAsynchronously($settle);

    return $deferred->promise();
}

// Consume the promise: then() is given three callbacks, in this order:
// one for the resolved value, one for the rejection reason and one for
// progress updates.
getAwesomeResultPromise()
    ->then(
        function ($value) {
            // Deferred resolved, do something with $value
        },
        function ($reason) {
            // Deferred rejected, do something with $reason
        },
        function ($update) {
            // Progress triggered, do something with $update
        }
    );
            

Guzzle5 / RingPHP: Promises

// Issue the request with the 'future' option enabled.
$response = $client->get(
    'http://httpbin.org',
    ['future' => true]);

// Use the response asynchronously
// (the callback passed to then() receives the response as its argument)
$response->then(function ($response) {
    echo $response->getStatusCode(); // 200
});

// define other work here

// define other work here
            

React

http://reactphp.org     http://github.com/reactphp

Generators: 0 to 200

/**
 * Lazily yields the integers 0..100 as keys with their doubles as values.
 *
 * Named doubledRange() because a global function called range() collides
 * with PHP's built-in range() and fails with "Cannot redeclare".
 */
function doubledRange()
{
    for ($i = 0; $i <= 100; $i++) {
        yield $i => $i * 2;
    }
}

foreach (doubledRange() as $i => $v) {
    // Print a separator so the output matches the sequence shown below.
    echo $v, ' ';
}
// 0 2 4 6 8 10 ... 200
            

Processing HTTP responses with generators

/**
 * Lazily passes through every item whose 'url' is not blacklisted.
 *
 * @param array    $urlBlacklist map keyed by URL; an item is dropped when
 *                               isset() finds its URL in this map
 * @param iterable $items        items, each offering a 'url' entry
 *
 * @return \Generator yields the surviving items under their original keys
 */
function filterBadUrls($urlBlacklist, $items)
{
    foreach ($items as $key => $item) {
        if (isset($urlBlacklist[$item['url']])) {
            // Blacklisted: skip it.
            continue;
        }

        yield $key => $item;
    }
}
            
/**
 * Pass-through pipeline stage that counts every item in StatsD.
 *
 * The items are now an explicit parameter; the function previously
 * iterated an undefined local $items and therefore never yielded
 * anything. The empty default keeps argument-less calls working.
 *
 * @param iterable $items items to pass through
 *
 * @return \Generator yields each item unchanged (keys are not preserved)
 */
function statsdCount($items = array())
{
    foreach ($items as $item) {
        \StatsD::increment('elements-processed');
        yield $item;
    }
}
            

Processing HTTP responses with generators

/**
 * Groups a stream of items into bulks of at most $size items.
 *
 * The items are now an explicit parameter; the function previously
 * iterated an undefined local $items, so a call like bulkify($results)
 * yielded nothing. The bulk size was a hard-coded 100 and is now
 * configurable with the same default.
 *
 * @param iterable $items items to group; the empty default keeps
 *                        argument-less calls working
 * @param int      $size  maximum number of items per bulk, at least 1
 *
 * @return \Generator yields arrays of up to $size items; the last bulk
 *                    may be smaller, and empty input yields nothing
 *
 * @throws \InvalidArgumentException on first iteration if $size < 1
 */
function bulkify($items = array(), $size = 100)
{
    if ($size < 1) {
        throw new \InvalidArgumentException('Bulk size must be at least 1.');
    }

    $bulk = array();
    foreach ($items as $item) {
        $bulk[] = $item;

        if (count($bulk) >= $size) {
            yield $bulk;
            $bulk = array();
        }
    }

    // Flush the incomplete last bulk, if any.
    if (!empty($bulk)) {
        yield $bulk;
    }
}

// Hand each bulk produced by bulkify() to $elasticaType->addDocuments(),
// one call per bulk instead of one per document.
foreach (bulkify($results) as $documents) {
    $elasticaType->addDocuments($documents);
}
            

nikic/iter

https://github.com/nikic/iter

Iteration primitives implemented using generators

Accepts all iterables: arrays, Traversable, Iterator, IteratorAggregate

Iterator map(callable $function, iterable $iterable)            
Iterator filter(callable $predicate, iterable $iterable)
Iterator flatten(iterable $iterable)
Iterator slice(iterable $iterable, int $start, int $length)
...
            
// Map the numbers 1..10 through a "multiply by 10" operator.
// NOTE(review): newer nikic/iter releases appear to expose these helpers
// under iter\func instead of iter\fn; confirm against the installed version.
$nums = iter\range(1, 10);
$numsTimesTen = iter\map(iter\fn\operator('*', 10), $nums);
// 10 20 30 40 50 60 70 80 90 100
            
/**
 * Fetches a file and passes its contents to $callback.
 *
 * The download itself is pseudo-code on the slide. Note that the
 * callback is invoked with the contents as its only argument.
 */
public function downloadFile($filename, $callback)
{
    $contents = get the file asynchronously...
    $callback($contents);
}

/**
 * Prints a short header naming the file, followed by the file contents.
 */
public function processDownloadResult($filename, $contents)
{
    echo "The file $filename contained a lot of stuff:\n", $contents;
}

/**
 * Starts the download and registers processDownloadResult() as callback.
 *
 * This is the problem statement: processDownloadResult() takes
 * ($filename, $contents), but downloadFile() calls its callback with the
 * contents only, so the file name never reaches the callback.
 */
public function handleDownload($filename)
{
    $this->downloadFile($filename,
        [$this, 'processDownloadResult']);
    // how to get $filename in?
}
            

react/partial

https://github.com/reactphp/partial

"Dependency Injection for functions"

/**
 * Starts the download with a callback that already carries the file name.
 */
public function handleDownload($filename)
{
    // Partial\bind() fixes $filename as the leading argument of
    // processDownloadResult(); the download supplies the remaining one.
    $onDownloaded = Partial\bind(
        [$this, 'processDownloadResult'],
        $filename
    );

    $this->downloadFile($filename, $onDownloaded);
}
            

Example: OAI Importer

// Pipeline for items that get dropped: report them to statsd under
// 'import-skip', then debug-print them using the logger.
$c['drop'] = function ($c) {
    $reportSkip = Partial\bind(
        [Process\Report::class, 'statsd'],
        'import-skip'
    );
    $printItem = Partial\bind(
        [Process\Report::class, 'debugPrintItem'],
        $c['logger']
    );

    return igorw\pipeline($reportSkip, $printItem);
};
// Factory for the import pipeline of one job. It is wrapped in protect()
// and later invoked with the job as its argument. The stages are listed
// in the order they are passed to igorw\pipeline(); most are static
// methods with their leading arguments pre-bound via Partial\bind().
$c['process'] = $c->protect(function ($job) use ($c) {
    return igorw\pipeline(
        // Source and mapping stages taken from the container.
        $c['crossref.oai'],
        $c['crossref.mapper'],
        // Report what was read.
        Partial\bind([Process\Report::class, 'statsd'],
            'import-read'),
        Partial\bind([Process\Report::class, 'write'],
            'Read   ', $c['logger']),
        // Required-fields filter, bound to the 'drop' pipeline.
        Partial\bind([Process\Filter::class, 'requiredFields'],
            $c['drop']),
        // Bulk up using the configured bulk size, write, then unbulk.
        Partial\bind([Process\Transform::class, 'bulkify'],
            $c['config']['elasticsearch']['bulksize']),
        Partial\bind([Process\ElasticSearch::class, 'write'],
            $c['elasticsearch.type']),
        [Process\Transform::class, 'unbulkify'],
        // Update the job through the Doctrine entity manager.
        Partial\bind([Process\Doctrine::class, 'updateJob'],
            $job, $c['doctrine.orm.entity_manager']),
        // Report what was written.
        Partial\bind([Process\Report::class, 'statsd'],
            'import-write'),
        Partial\bind([Process\Report::class, 'write'],
            'Written', $c['logger'])
    );
});
    

Example: OAI Importer


            // One input element per day between $from and $until.
            $oneDay = new \DateInterval('P1D');
            $input = new \DatePeriod($from, $oneDay, $until);

            // Build the pipeline for this job.
            $pipeline = $container['process']($job);

            // Iterating is what drives the pipeline; the yielded values
            // themselves are not used.
            foreach ($pipeline($input) as $result) {
            }
            

Example: Citation Search

// Search service: queries every backend and collects the citations of
// all of them as JSON-encoded strings.
$app['search'] = $app->protect(function ($term) use ($app) {
    $responses = $results = [];

    // First pass: start a search on every backend and attach the
    // processing callbacks. Nothing is waited for inside this loop.
    foreach ($app['search.backends'] as $backend) {
        $response = $backend->search($term);
        // This callback contains a yield, so it is a generator function:
        // calling it returns a generator that converts the JSON values
        // to citations as they are iterated.
        $response->then(function ($response) use ($backend) {
            foreach ($response->json() as $value) {
                yield $backend->citationFromJson($value);
            }
        })
        // $results is captured by reference so that every backend
        // appends to the same list.
        ->then(function ($citations) use (&$results) {
            foreach ($citations as $citation) {
                $results[] = json_encode(
                    $citation->getStandardData()
                );
            }
        });
        $responses[] = $response;
    }
    // Second pass: wait for each response in turn.
    foreach ($responses as $response) {
        $response->wait();
    }
    return $results;
});

// HTTP endpoint of the search microservice.
$app->get('/search', function (
        Application $app, Request $request) {
    $term = $request->get('term');

    // $app['search'] returns citations that are already JSON-encoded
    // strings. Passing that list through json_encode() again would produce
    // an array of escaped JSON strings, so join the documents into one
    // JSON array instead.
    return '['.implode(',', $app['search']($term)).']';
});
            

Example: Citation controller

CitationController::searchAction($term)
            $searchResults =
                $this->citationService->search($term)
...
            'results' =>
                $searchResults->deref(),
            
/**
 * Client-side wrapper around the search microservice that hands back a
 * future instead of a finished result.
 */
class CitationService
{
    /**
     * Starts a search and wraps the HTTP response in a CitationListFuture.
     *
     * Setup of $httpClient is elided on the slide.
     *
     * @param mixed $term search term; cast to string and URL-encoded
     *
     * @return CitationListFuture
     */
    public function search($term)
    {
        ...
        // Encode the term so characters such as '&', '#' or spaces cannot
        // truncate or alter the query string.
        return new CitationListFuture(
            $httpClient->get('/search?term='.rawurlencode((string) $term)));
    }
}

/**
 * Future for a list of citations, backed by a pending HTTP response.
 */
class CitationListFuture
{
    /** @var FutureResponse the pending response this future wraps */
    private $response;

    /**
     * @param FutureResponse $response response to dereference later
     */
    public function __construct(FutureResponse $response) {
        $this->response = $response;
    }

    /**
     * Waits for the response and returns its decoded JSON body.
     *
     * @return mixed whatever the response's json() returns
     */
    public function deref()
    {
        // Use the stored response: a bare $response is an undefined local
        // here and calling wait() on it would be a fatal error.
        $this->response->wait();
        return $this->response->json();
    }
}
            

Summary

Thank you.

Questions?

@naderman