Back
Featured image of post Guzzle Promise 链式(嵌套)请求和控制并发数量

Guzzle Promise 链式(嵌套)请求和控制并发数量

Guzzle采用promise方式,通过链式调用requestAsync、getAsync、postAsync等函数嵌套异步请求,并且通过Pool控制并发数量的方法。

昨天用异步请求修改了我的fast-mail-bomber这个项目的update-nodes模块,在这里记录一下踩坑过程。

需求:

  • 需要能够发送异步请求,并且控制一个并发数量。
  • 在一个请求发送后需要用到这个请求返回的内容,才能发送下一个请求

类似场景:异步获取指定的一个图像URL列表的图片内容,并把每一张图片都上传到另一个图床上。

需要用到的环境:PHP 7.2以上版本+Guzzle v7,Guzzle v6的async系列函数有问题,有时候会莫名其妙地输出responseBody,好坑啊。

这里就用httpbin.org来做演示。

首先创建guzzle对象:

$guzzle=new \GuzzleHttp\Client();

正确姿势

$promises=[];// 1
$respList=[];
foreach (range(0,49) as $i){
    $promises[]=function() use ($guzzle,&$respList,$i){// 1, 2
        $promise=$guzzle->getAsync('https://httpbin.org/uuid');
        $promise->then(function($resp) use ($guzzle,&$respList,$i){
            $uuid=json_decode($resp->getBody())->uuid;
            if(empty($uuid)){
                return;// 4
            }
            echo $i.'获取uuid成功:'.$uuid.PHP_EOL;
            $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
            $promise2->then(function($resp2) use (&$respList,$i){// 2
                $respList[]=json_decode($resp2->getBody());
                echo $i.'上传uuid成功'.PHP_EOL;
            },function($reason){
                echo '上传uuid失败 '.$reason.PHP_EOL;
            });
            return $promise2;// 3
        },function($reason){
            echo '获取uuid失败 '.$reason.PHP_EOL;
        });
        return $promise;// 3
    };
}
$pool=new \GuzzleHttp\Pool($guzzle,$promises,[
    'concurrency'=>5,
    'fulfilled'=>function($index){

    },
    'rejected'=>function($reason,$index){

    }
]);
$pool->promise()->wait();
echo count($respList);

要点,对应上面代码中的注释:

  1. 用一个promises数组来接收,在循环里面添加一个调用之后返回async promise对象的函数。不要直接添加promise对象
  2. 对外部变量进行写操作的时候,需要加上&引用,类似指针,不然外部变量不会实质更改
  3. 新建promise、配置promise->then、返回promise三步走,包括里层的promise也是
  4. 这个地方IDE会提示缺少参数,原因是需要返回一个promise,但其实是不一定需要的。

参考:https://stackoverflow.com/questions/43487856/how-to-chain-two-http-requests-in-guzzle

To create a chain of actions you just need to return a new promise from ->then() callback.

错误姿势

1.没有用一个function来yield async 请求

$promises=[];
$respList=[];
foreach (range(0,49) as $i){
    $promise=$guzzle->getAsync('https://httpbin.org/uuid');
    $promise->then(function($resp) use ($guzzle,&$respList,$i){
        $uuid=json_decode($resp->getBody())->uuid;
        if(empty($uuid)){
            return;
        }
        echo $i.'获取uuid成功:'.$uuid.PHP_EOL;
        $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
        $promise2->then(function($resp2) use (&$respList,$i){
            $respList[]=json_decode($resp2->getBody());
            echo $i.'上传uuid成功'.PHP_EOL;
        },function($reason){
            echo '上传uuid失败 '.$reason.PHP_EOL;
        });
        return $promise2;
    },function($reason){
        echo '获取uuid失败 '.$reason.PHP_EOL;
    });
    $promises[]=$promise;
}

参考:https://github.com/guzzle/guzzle/issues/1506#issuecomment-232124029

You need to yield promises from your generator or do as @kkopachev suggests, yield functions.

When you initiate an async transfer with requestAsync(), Guzzle will create a curl handle and add it to a shared curl multi instance. By queueing up a large list of promises, you’re adding all of your promises to the multi handle at once, which means when you eventually call wait, you’re waiting on all of the promises at once and not limiting your queue size at all.

2.在第一个promise的then里面调用了第二个promise的wait

foreach (range(0,49) as $i){
    $promises[]=function() use ($guzzle,&$respList,$i){
        $promise=$guzzle->getAsync('https://httpbin.org/uuid');
        $promise->then(function($resp) use ($guzzle,&$respList,$i){
            $uuid=json_decode($resp->getBody())->uuid;
            if(empty($uuid)){
                return;
            }
            echo $i.'获取uuid成功:'.$uuid.PHP_EOL;
            $promise2=$guzzle->getAsync('https://httpbin.org/anything?uuid='.$uuid);
            $promise2->then(function($resp2) use (&$respList,$i){
                $respList[]=json_decode($resp2->getBody());
                echo $i.'上传uuid成功'.PHP_EOL;
            },function($reason){
                echo '上传uuid失败 '.$reason.PHP_EOL;
            });
            $promise2->wait();// 1
        },function($reason){
            echo '获取uuid失败 '.$reason.PHP_EOL;
        });
        return $promise;
    };
}

见代码中的注释1处。guzzle的promise在没有调用wait函数的时候是不会真正执行的。而由于PHP的promise并不是真正的多线程,在promise里面调用另一个promise的wait仍然会阻塞。

参考:https://github.com/guzzle/promises/issues/69#issuecomment-311160782

This is intentional. You need to call wait on a promise, tick the promise queue manually, or allow the queue to be ticked on shutdown to fire callbacks. This prevents recursion when resolving promises because recursion in callbacks of promises could cause a stack overflow. This is recommended by the promises/A+ spec: https://promisesaplus.com/#point-34.

You can tick the queue using the following code:

GuzzleHttp\Promise\queue->run();

3.误用了EachPromise代替Pool

$pool=new \GuzzleHttp\Promise\EachPromise($promises,[
    'concurrency'=>5,
    'fulfilled'=>function($index){

    },
    'rejected'=>function($reason,$index){

    }
]);

如果这个错误和前面提到的第一个错误同时犯,就会产生看起来concurrency无效的情况。

参考:https://github.com/guzzle/guzzle/issues/1506#issuecomment-229024769

When creating the Pool object, the Iterable and the config are inserted and an EachPromise object is created after some additional work. After that, the promise method is called on the EachPromise object which calls the createPromise method. In this method itself, nowhere concurrency is checked but I have the impression all promises are handled in there and added to one big Promise, while for each promise the wait method is called (which fires them as far as I know). After that, the iterable is rewinded (in the promise method) and the refillPending method is called (in the promise method), in which concurrency is checked. But, there are no pending promises anymore, as they’ve already been processed in the createPromise method.

So:

  • Create Pool object with Iterable and Config settings
  • Pool creates EachPromise object
  • Code calls EachPromise->promise() which does: createPromise(), rewind() Iterable, refillPending()

As far as I can see the createPromise() triggers on all promises the wait method, so refillPending has nothing to add anymore. Concurrency check is only done in refillPending method.

Licensed under CC BY-NC-SA 4.0
-1