
Guzzle Promises: Chaining (Nesting) Requests and Limiting Concurrency

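This post shows how to use Guzzle's promise API to chain (nest) asynchronous requests made with requestAsync, getAsync, postAsync and similar functions, and how to limit the number of concurrent requests with Pool.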

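Yesterday I rewrote the update-nodes module of my fast-mail-bomber project to use asynchronous requests. These are my notes on the pitfalls I ran into along the way.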

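Requirements:

  • Send requests asynchronously while capping the number of concurrent requests.
  • Use the content returned by one request to build and send the next request.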

A similar scenario: asynchronously download the images from a given list of image URLs, then upload each image to another image host.

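Environment: PHP 7.2 or later with Guzzle v7. In my experience the async functions in Guzzle v6 misbehave and sometimes print the response body for no apparent reason, which is a nasty trap.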

The examples below use httpbin.org for demonstration.

First, create the Guzzle client:

$guzzle=new \GuzzleHttp\Client();

The right way

$promises=[];// 1
$respList=[];
foreach (range(0,49) as $i){
    $promises[]=function() use ($guzzle,&$respList,$i){// 1, 2
        $promise=$guzzle->getAsync('https://httpbin.org/uuid');
        $promise->then(function($resp) use ($guzzle,&$respList,$i){
            $uuid=json_decode($resp->getBody())->uuid;
            if(empty($uuid)){
                return;// 4
            }
            echo $i.' fetched uuid: '.$uuid.PHP_EOL;
            $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
            $promise2->then(function($resp2) use (&$respList,$i){// 2
                $respList[]=json_decode($resp2->getBody());
                echo $i.' uploaded uuid'.PHP_EOL;
            },function($reason){
                echo 'Failed to upload uuid: '.$reason.PHP_EOL;
            });
            return $promise2;// 3
        },function($reason){
            echo 'Failed to fetch uuid: '.$reason.PHP_EOL;
        });
        return $promise;// 3
    };
}
$pool=new \GuzzleHttp\Pool($guzzle,$promises,[
    'concurrency'=>5,
    // The fulfilled callback receives the response first, then the index.
    'fulfilled'=>function($response,$index){

    },
    'rejected'=>function($reason,$index){

    }
]);
$pool->promise()->wait();
echo count($respList);

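Key points, matching the numbered comments in the code above:

  1. Collect the tasks in a promises array. Inside the loop, add a function that returns the async promise when called. Do not add the promise object itself.
  2. To write to an outer variable from inside a closure, capture it by reference with &. Otherwise the closure works on its own copy and the outer variable is never actually changed.
  3. Follow three steps: create the promise, configure it with promise->then, and return the promise. The same goes for the inner promise.
  4. The IDE may warn here that a return value is missing, because the callback is expected to return a promise, but returning one is not strictly required.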
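Point 2 is plain PHP closure behaviour and can be checked without Guzzle. The sketch below is only an illustration; the variable and function names ($byValue, $byRef, $addByValue, $addByRef) are made up for this example and do not appear in the original project.

```php
<?php
// Two result arrays: one captured by value, one captured by reference.
$byValue = [];
$byRef = [];

$addByValue = function (int $i) use ($byValue) {
    // $byValue is a copy taken when the closure was created,
    // so this append never reaches the outer array.
    $byValue[] = $i;
};
$addByRef = function (int $i) use (&$byRef) {
    // $byRef is bound by reference, so the outer array is modified.
    $byRef[] = $i;
};

foreach (range(0, 2) as $i) {
    $addByValue($i);
    $addByRef($i);
}

echo count($byValue) . PHP_EOL; // prints 0
echo count($byRef) . PHP_EOL;   // prints 3
```

This is why $respList is captured as &$respList in every closure that appends to it.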

Reference: https://stackoverflow.com/questions/43487856/how-to-chain-two-http-requests-in-guzzle

To create a chain of actions you just need to return a new promise from ->then() callback.
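A variant worth considering, sketched here under my own assumptions rather than taken from the original project: return the promise produced by then() instead of the first request's promise. As far as I can tell, the Pool only tracks the promise that each function returns, so returning the chained promise keeps the pool slot occupied until the second request has settled, and the concurrency limit then covers both stages. The sketch assumes Guzzle 7 installed via Composer and network access to httpbin.org. The name $tasks and the count of 10 are illustrative.

```php
<?php
require 'vendor/autoload.php'; // assumption: Guzzle 7 installed via Composer

use GuzzleHttp\Client;
use GuzzleHttp\Pool;

$guzzle = new Client();
$respList = [];
$tasks = [];

foreach (range(0, 9) as $i) {
    $tasks[] = function () use ($guzzle, &$respList, $i) {
        // Return the end of the chain, not the first promise.
        return $guzzle->getAsync('https://httpbin.org/uuid')
            ->then(function ($resp) use ($guzzle) {
                $uuid = json_decode((string) $resp->getBody())->uuid;
                // Returning a promise from then() chains the second request.
                return $guzzle->getAsync('https://httpbin.org/anything?uuid=' . $uuid);
            })
            ->then(function ($resp2) use (&$respList, $i) {
                $respList[$i] = json_decode((string) $resp2->getBody());
            });
    };
}

$pool = new Pool($guzzle, $tasks, [
    'concurrency' => 5,
    'rejected' => function ($reason, $index) {
        // A failure in either stage of the chain ends up here.
        $message = $reason instanceof \Throwable ? $reason->getMessage() : 'unknown error';
        echo $index . ' failed: ' . $message . PHP_EOL;
    },
]);
$pool->promise()->wait();
echo count($respList) . PHP_EOL;
```

With this shape, errors from both requests reach the pool's rejected callback, so the per-request error callbacks become optional.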

The wrong ways

1. Not using a function to yield the async request

$promises=[];
$respList=[];
foreach (range(0,49) as $i){
    // Wrong: getAsync is called right here, so every request is started in the loop.
    $promise=$guzzle->getAsync('https://httpbin.org/uuid');
    $promise->then(function($resp) use ($guzzle,&$respList,$i){
        $uuid=json_decode($resp->getBody())->uuid;
        if(empty($uuid)){
            return;
        }
        echo $i.' fetched uuid: '.$uuid.PHP_EOL;
        $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
        $promise2->then(function($resp2) use (&$respList,$i){
            $respList[]=json_decode($resp2->getBody());
            echo $i.' uploaded uuid'.PHP_EOL;
        },function($reason){
            echo 'Failed to upload uuid: '.$reason.PHP_EOL;
        });
        return $promise2;
    },function($reason){
        echo 'Failed to fetch uuid: '.$reason.PHP_EOL;
    });
    $promises[]=$promise;
}

Reference: https://github.com/guzzle/guzzle/issues/1506#issuecomment-232124029

You need to yield promises from your generator or do as @kkopachev suggests, yield functions.

When you initiate an async transfer with requestAsync(), Guzzle will create a curl handle and add it to a shared curl multi instance. By queueing up a large list of promises, you’re adding all of your promises to the multi handle at once, which means when you eventually call wait, you’re waiting on all of the promises at once and not limiting your queue size at all.

2. Calling wait on the second promise inside the first promise's then callback

foreach (range(0,49) as $i){
    $promises[]=function() use ($guzzle,&$respList,$i){
        $promise=$guzzle->getAsync('https://httpbin.org/uuid');
        $promise->then(function($resp) use ($guzzle,&$respList,$i){
            $uuid=json_decode($resp->getBody())->uuid;
            if(empty($uuid)){
                return;
            }
            echo $i.' fetched uuid: '.$uuid.PHP_EOL;
            $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
            $promise2->then(function($resp2) use (&$respList,$i){
                $respList[]=json_decode($resp2->getBody());
                echo $i.' uploaded uuid'.PHP_EOL;
            },function($reason){
                echo 'Failed to upload uuid: '.$reason.PHP_EOL;
            });
            $promise2->wait();// 1
        },function($reason){
            echo 'Failed to fetch uuid: '.$reason.PHP_EOL;
        });
        return $promise;
    };
}

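See comment 1 in the code. A Guzzle promise does not actually make progress until wait is called on it (or the task queue is ticked). And because PHP promises are not real multithreading, calling wait on a second promise from inside the first promise's callback still blocks.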

Reference: https://github.com/guzzle/promises/issues/69#issuecomment-311160782

This is intentional. You need to call wait on a promise, tick the promise queue manually, or allow the queue to be ticked on shutdown to fire callbacks. This prevents recursion when resolving promises because recursion in callbacks of promises could cause a stack overflow. This is recommended by the promises/A+ spec: https://promisesaplus.com/#point-34.

You can tick the queue using the following code:

GuzzleHttp\Promise\queue()->run();
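The deferred callback behaviour described in the quote can be observed with a bare promise and no HTTP at all. The sketch below assumes guzzlehttp/promises 1.4 or later installed via Composer, where the queue is reachable through the Utils class (to my knowledge the function form quoted above was deprecated in 1.4 and dropped in 2.0). The $log array is just an illustrative way to record the order of events.

```php
<?php
require 'vendor/autoload.php'; // assumption: guzzlehttp/promises >= 1.4 via Composer

use GuzzleHttp\Promise\Promise;
use GuzzleHttp\Promise\Utils;

$log = [];

$promise = new Promise();
$promise->then(function ($value) use (&$log) {
    $log[] = 'callback got ' . $value;
});

// Resolving only schedules the callback on the task queue.
$promise->resolve('hello');
$log[] = 'resolved';

// Tick the queue manually: the callback fires now.
Utils::queue()->run();
$log[] = 'queue ticked';

print_r($log); // order: resolved, callback got hello, queue ticked
```

Inside a Pool this manual ticking is normally unnecessary, since waiting on the pool's promise drives the queue.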

3. Using EachPromise where Pool is needed

$pool=new \GuzzleHttp\Promise\EachPromise($promises,[
    'concurrency'=>5,
    // The fulfilled callback receives the value first, then the index.
    'fulfilled'=>function($value,$index){

    },
    'rejected'=>function($reason,$index){

    }
]);

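EachPromise expects an iterable of promises, whereas Pool also accepts functions that return promises and calls them on demand. If you make this mistake together with the first one above, concurrency will appear to have no effect.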

Reference: https://github.com/guzzle/guzzle/issues/1506#issuecomment-229024769

When creating the Pool object, the Iterable and the config are inserted and an EachPromise object is created after some additional work. After that, the promise method is called on the EachPromise object which calls the createPromise method. In this method itself, nowhere concurrency is checked but I have the impression all promises are handled in there and added to one big Promise, while for each promise the wait method is called (which fires them as far as I know). After that, the iterable is rewinded (in the promise method) and the refillPending method is called (in the promise method), in which concurrency is checked. But, there are no pending promises anymore, as they’ve already been processed in the createPromise method.

So:

  • Create Pool object with Iterable and Config settings
  • Pool creates EachPromise object
  • Code calls EachPromise->promise() which does: createPromise(), rewind() Iterable, refillPending()

As far as I can see the createPromise() triggers on all promises the wait method, so refillPending has nothing to add anymore. Concurrency check is only done in refillPending method.
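If EachPromise is used directly, one way to keep the concurrency limit meaningful is to hand it a generator that creates each promise only when the iterator is advanced. This is a sketch under my own assumptions (Guzzle 7 via Composer, network access to httpbin.org, an illustrative total of 20 requests), not something taken from the quoted issue. The $statuses array is a hypothetical name used to collect one outcome per request.

```php
<?php
require 'vendor/autoload.php'; // assumption: Guzzle 7 installed via Composer

use GuzzleHttp\Client;
use GuzzleHttp\Promise\EachPromise;

$guzzle = new Client();
$statuses = [];

// EachPromise iterates over promises, not over functions, so the
// generator creates each promise lazily when it is advanced.
$promises = (function () use ($guzzle) {
    foreach (range(0, 19) as $i) {
        yield $guzzle->getAsync('https://httpbin.org/uuid');
    }
})();

$each = new EachPromise($promises, [
    'concurrency' => 5,
    'fulfilled' => function ($response, $index) use (&$statuses) {
        $statuses[$index] = $response->getStatusCode();
    },
    'rejected' => function ($reason, $index) use (&$statuses) {
        $statuses[$index] = 'failed';
    },
]);

$each->promise()->wait();
echo count($statuses) . PHP_EOL;
```

Passing a pre-built array of promises instead would start every request before EachPromise sees them, which is the combination of mistakes 1 and 3 described above.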

Licensed under CC BY-NC-SA 4.0