如何使用PHP的swoole异步高扩展提高服务器并发能力上传超大文件
不知道有没有人考虑过通过 PHP 上传大文件,比如 1GB、10GB、100GB... ,又如果你运行机器的内存只有几百 M,总之低于目标大文件的大小。
如果这是在公司业务里,大多数会定一个上传上限,比如几 MB,或者否定这个需求。
在某些业务里确实也有用到过上传大型文件的需求,网上的解决方案基本上都是趋向于通过分片上传,或者将文件直接存储到第三方云存储,系统只保存一个链接。
举一个例子,比如 WEBDAV,他对于上传他是没有分片上传这一设计的,就算你实现了分片接收文件,总不能要求所有以 webdav 协议为标准的客户端都去实现分片上传。
但在此做一个结论,在复杂的互联网网络环境里,分片上传断点续传是比较可靠稳定的。那么以下的实现仅限于网络可靠的私有网络。
首先我们需要了解,PHP 默认配置中只支持 8M的上传,要支持更大的文件,你需要将 php.ini 中 upload_max_filesize 调整到大于等于目标文件大小,才能上传这个文件,这个配置也不能灵活动态调整。
之后最重要,PHP 的请求处理机制,他会把所有 HTTP 请求头、内容完整读取到内存才会分发请求,执行脚本文件。
试想一下,如果访客上传一个 10G 文件或者更大,内存岂不是会被一次性被吞掉。这是非常不合理的,而且这段时间速度会非常慢,处于假死状态。
这点在 Golang 基础库里做的不错,它几乎可以实现接近无消耗内存上传大文件,全 CPU 操作。
我们今天采用 Swoole 实现大文件上传,swoole 也有提供创建 HTTP 服务的方法,但依然有上传大小的限制。
修改最大数据包大小参数,package_max_length,不过这个是支持动态调整的,但依然是会将数据吞到内存里。
Swoole 底层是全内存的,因此如果设置过大可能会导致大量并发请求将服务器资源耗尽。
接下来,我们通过 Swoole 创建 TCP 服务,手动完成 HTTP 协议交互,实现大文件上传。
swoole入门到实战
第一步了解下,简单实现一个 TCP 服务。
$server = new Swoole\Server('0.0.0.0', 5555); $server->on('start', function ($server) { echo "TCP Server is started at tcp://127.0.0.1:9503\n";
}); $server->on('connect', function ($server, $fd){ echo "connection open: {$fd}\n";
}); $server->on('receive', function ($server, $fd, $reactor_id, $data) { $server->send($fd, "Swoole: {$data}");
}); $server->on('close', function ($server, $fd) { echo "connection close: {$fd}\n";
}); $server->start();
将以上代码保存为 myhttp.php ,并尝试运行
php myhttp.php
此时只能还不能作为 HTTP 服务使用,只能通过 telnet 尝试连接,测试连接和消息收发。
接下来,我们通过简单改造,在 onReceive 事件时,给连接返回一段 HTTP 基础相应信息。
$server = new Swoole\Server('0.0.0.0', 5555); $server->on('start', function ($server) { echo "TCP Server is started at tcp://127.0.0.1:9503\n";
}); $server->on('connect', function ($server, $fd){ echo "connection open: {$fd}\n";
}); $server->on('receive', function ($server, $fd, $reactor_id, $data) { echo sprintf("<< [收到数据请求:]\n%s\n", $data); $raw = <<; echo sprintf(">> [回应数请求:]\n%s\n", $raw); $server->send($fd, $raw);
}); $server->on('close', function ($server, $fd) { echo "connection close: {$fd}\n";
}); $server->start();
此时,你再通过浏览器访问,浏览器也能展示出来数据了。
相应的,你通过终端也能接收到浏览器的请求信息和你回应的信息了。
[root@localhost phpalbum]# php myhttp.php TCP Server is started at tcp://127.0.0.1:9503 connection open: 1 << [收到数据请求:] GET / HTTP/1.1 Host: 192.168.32.137:5555 Connection: keep-alive Cache-Control: max-age=0 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,ja;q=0.8 >> [回应数请求:] HTTP/1.1 200 OK Content-Type: text/html Content-Length: 11 Connection: close Server: eller-server hello world
接下来,只需要完善 HTTP 请求头的解析操作和相应操作就能实现一个基础的 HTTP 服务。
解析请求头可以使用 Swoole 提供的方法:
$req = Request::create(['parse_cookie' => false]); $req->parse($data);
之后,就可以按照 Swoole 的文档使用这对象,https://wiki.swoole.com/#/http_server?id=httprequest
我们也可以自己写一个简单解析方法,目的是从数据流中分离请求头,并解析出 Header 数据。
我们可以通过抓包或者将接收到的数据保存到文件,进行对比查看。
这里随便拿一个我测试的 HTTP 请求举例:
POST /api?ss=33 HTTP/1.1 Content-Type: application/json User-Agent: PostmanRuntime/7.26.8 Accept: */* Cache-Control: no-cache Postman-Token: 277cd4f6-8841-4bd7-b484-6569eb447a27 Host: 192.168.32.137:5555 Accept-Encoding: gzip, deflate, br Connection: keep-alive Content-Length: 14 {"name":"jax"}
具体如下:
首行请求行,是标注请求方法、请求 URI、请求协议,以及它是通过 \r\n 分割的,十六进制写作 "0d0a"接下来是请求头,存储着客户端的基本信息和附加的请求头数据,也就是我们熟知的 Header 数据,它由多行构成,也是由 \r\n 换行的。再后面会空一行,继续换行,这里实际上是两个换行 \r\n\r\n,十六进制写作 "0d0a0d0a"最后就是 payload(body) 数据了,从此刻起,获取数据的多少有 header 头的 Content-Length 长度决定,一直获取直至结束。注意:这里大请求可能会进行分包,需要多次获取。
以下是解析方法:
protected function preread_http_header(&$data) {//ff d8 ff e0 // $pos = strpos($data,"\r\n"); // var_dump(substr($data,0, $pos)); $pos = strpos($data, hexToStr("0d0a0d0a")); $header = substr($data, 0, $pos); // file_put_contents('./test.txt', $header.PHP_EOL, FILE_APPEND); if ($headers = explode("\r\n", $header)) { $key_map = ['Content-Length' => 0]; foreach ($headers as $item) { $buffer = explode(":", $item); if (count($buffer) >= 2) { $key_map[trim($buffer[0])] = trim($buffer[1]);
} unset($buffer);
} unset($headers); unset($header); unset($pos); return $key_map;
} else { return false;
}
} protected function preread_http_request(&$data){ $pos = strpos($data, hex2bin("0d0a")); $first_line = substr($data, 0, $pos); list($method, $path, $protocol_version) = explode(' ', $first_line); $request = [ 'method' => trim(strtoupper($method)), 'request_uri' => trim($path), 'protocol_version' => trim($protocol_version),
]; $request['headers'] = $this->preread_http_header($data); return $request;
}
解析后的结果:
array(4) {
["method"]=> string(3) "GET" ["request_uri"]=> string(12) "/favicon.ico" ["protocol_version"]=> string(8) "HTTP/1.1" ["headers"]=> array(9) {
["Host"]=> string(14) "192.168.32.137" ["Connection"]=> string(10) "keep-alive" ["Pragma"]=> string(8) "no-cache" ["Cache-Control"]=> string(8) "no-cache" ["User-Agent"]=> string(114) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36" ["Accept"]=> string(64) "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8" ["Referer"]=> string(4) "http" ["Accept-Encoding"]=> string(13) "gzip, deflate" ["Accept-Language"]=> string(23) "zh-CN,zh;q=0.9,ja;q=0.8" }
}
此时我们还需要再写一个解析 payload 数据的方法,也是一样,通过分割符定位 header 之后的内容,就认为是 body,将具体数据进行解析:
protected function parse_http_payload(array $request, string &$data) { $payload = []; $form = []; $raw = ''; $pos = strpos($data, hex2bin("0d0a0d0a")); if ($pos != -1) { $body = substr($data, $pos + strlen(hex2bin("0d0a0d0a")));
} else { $body = $data;
} if (isset($request['headers']['Content-Type'])) { if ($request['headers']['Content-Type'] == 'application/x-www-form-urlencoded') { $arr_data = explode('&', $body); foreach ($arr_data as $item_data) { $value_ret = explode('=', $item_data); $form[$value_ret[0]] = $value_ret[1] ?? '';
}
} else if (preg_match('#multipart/form-data; boundary=([^\b]+)\b#is', $request['headers']['Content-Type'], $matches)) { $boundary_str = $matches[1]; $arr_data = explode($boundary_str, $body); foreach ($arr_data as $item_data) { if (preg_match('#Content-Disposition: form-data; name="([^"]+)"\r\n\r\n(.*)\r\n#is', $item_data, $value_ret)) { $form[$value_ret[1]] = $value_ret[2];
} //Content-Disposition: form-data; name="file"; filename="222.png" //Content-Type: image/png elseif (preg_match('#Content-Disposition: form-data; name="file"; filename="([^"]+)"\r\nContent-Type: ([A-Za-z0-9/]+)\r\n\r\n(.*)\r\n#is', $item_data, $value_ret)) { $form[$value_ret[1]] = ['type' => $value_ret[2], 'stream' => $value_ret[3]]; file_put_contents('aa3.png', $value_ret[3]);
}
}
} else if (in_array($request['headers']['Content-Type'], [ 'text/plain', 'application/json', 'application/javascript', 'text/html', 'application/xml',
])) { $raw = $body;
} else { // 其他都认为是资源文件 $raw = $body;
}
} $payload = [ 'from' => $form, 'raw' => $raw ]; unset($body, $arr_data, $item_data, $boundary_str, $value_ret, $form, $raw); return $payload;
}
以上就是对请求体的简单解析,这时候还不能够实现大文件上传,依然会将文件吞到内存里。
我们通过对 onReceive方法进行改造,实现对数据包的拆分,读取完整的 HTTP 请求后再进行触发执行。
区别在于,我们设立一个 Content-Length 上限值,如果大于则将后续的数据存储到文件。
由于多文件会进行分包,多次进行接收数据,onReceive 会被执行多次,我们需要要接收的数据和已经接收的数据做记录。
比如第一次接收到数据包将 Content-Length 存起来,直到累计接收到大于等于这个数据包时则认为数据完整。
这里需要注意,如果你使用的是 SWOOLE_BASE 模式,实际 onReceive 会在多个进程里,你需要使用 Swoole 提供的内存 Table 来实现数据共享存储。
在这里我们使用 SWOOLE_PROCESS 模式创建服务,这种模式也是默认的,也就是 onReceive 对于一个连接来说是固定的进程,我们不需要跨进程通信。
我们只需要将数据存储到当前类成员就行了:
protected $fd_data = []; public function onReceive($server, $fd, $reactor_id, $data) { if(!isset($this->fd_data[$fd])){ $this->fd_data[$fd] = [ 'is_completed' => 0, // 是否完成 'is_parse_head' => 0, // 是否解析 head 'require_length' => 0, // 请求需要长度 'recv_length' => 0, // 已经接收长度 ];
}
}
首次执行 onReceive 时,要进行解析 header 操作。
这里有两种情况,一种是,首次解析 header 时就已经读取完 Content-Length 的所有数据了,那就直接触发请求 Request
还有一种,经过不断接收数据包,累计接收达到 Content-Length 的数据,此时再触发请求 Request
这是完成的代码:
public function onReceive($server, $fd, $reactor_id, $data) { if(!isset($this->fd_data[$fd])){ $this->fd_data[$fd] = [ 'is_completed' => 0, // 是否完成 'is_parse_head' => 0, // 是否解析 head 'require_length' => 0, // 请求需要长度 'recv_length' => 0, // 已经接收长度 ];
} if (!$this->fd_data[$fd]['is_parse_head']) { $this->debugText("<<<<<<<<<<尝试解析 header:$fd\n"); $request = $this->preread_http_request($data); $body_pos = strpos($data, hex2bin("0d0a0d0a")); $content_length = intval($request['headers']['Content-Length']); $recv_length = strlen($data) - $body_pos + strlen(hex2bin("0d0a0d0a")); $this->fd_data[$fd]= [ 'is_parse_head' => 1, 'require_length' => $request['headers']['Content-Length'] ?? 0, 'recv_length' => $recv_length ]; if ($recv_length >= $content_length) { // 接收完毕 $this->debugText(sprintf("<< [一次接收完毕,触发请求,$recv_length, $content_length]\n")); $this->onRequest($server, $fd, $request, $data);
} else { $this->debugText(sprintf("<< [未接收完毕]\n")); // 未接收完毕,需要多次接收 return false;
}
} else { $this->debugText(sprintf("<< [分段接收]\n")); // 这里接收数据 $this->fd_data[$fd]['recv_length'] += strlen($data); if ( $this->fd_data[$fd]['recv_length'] >= $this->fd_data[$fd]['require_length']) { $this->debugText(sprintf("<< [接收完毕,触发请求] %s == %s\n", $this->fd_data[$fd]['recv_length'], $this->fd_data[$fd]['require_length'])); $this->onRequest($server, $fd, [], '');
}else{ return false;
}
} // 请求完释放内存数据 $this->debugText(sprintf("释放数据拉 => %s\n", $fd)); unset($this->fd_data[$fd]);
}
仔细观察这里会发现,如果当最后一次回调到 onReceive 时,由于是多次处理的回调,他已经拿不到首次解析的 Header 请求数据了。
我们需要将数据存储起来等到最后一次回调后拿到数据去触发 request 请求,这个数据可以存储在共享内存、或者类成员也可以其他文件等等。
我不打算将数据存储在其他地方,打算通过协程挂起暂停恢复的形式,将首次的回调暂停下来,等到数据接收完毕后再恢复协程,这时候首次的协程继续执行,
Request Header 解析的数据就在当前作用域空间,通过这样实现请求的分发。
// 获取当前协程 ID $cid = Swoole\Coroutine::getuid(); // 暂停协程执行 Swoole\Coroutine::yield(); // 恢复协程执行 Swoole\Coroutine::resume($cid);
除此之外,我们还需要将完整数据包存储到文件,供 onRequest 时调用:
// 首次写入 file_put_contents('./tmp/'.$fd, substr($data, $body_pos + strlen(hex2bin("0d0a0d0a")))); // 之后每次追加 file_put_contents('./tmp/'.$fd, $data,FILE_APPEND);
经过对比,我们发现上传后的文件 MD5 一致。
为了加快性能,我们开启一键协程化,他会自动将所有耗时函数改成协程调用。
将以下代码放入 on Start 里(官方说 enableCoroutine应放到创建进程后,Co::set() 放在创建进程前):
Swoole\Runtime::enableCoroutine($flags = SWOOLE_HOOK_ALL);
实际运行来看,enableCoroutine 要比 Co::set() 设置的协程化性能要高,这点原因不清楚。
为了再加快速度,我们将调试代码统一到一个方法里,统一关闭输出,因为这在终端打印非常耗时,所以仅调试时开启输出。
还有一点就是最终在 onRequest 请求结束后,记得将缓存文件( tmp/* )在合理的机制下删除,否则会导致 onRequest 里的程序没有完整读完。
这是最终的 onReceive 代码:
public function onReceive($server, $fd, $reactor_id, $data) { if(!isset($this->fd_data[$fd])){ $this->fd_data[$fd] = [ 'is_completed' => 0, // 是否完成 'is_parse_head' => 0, // 是否解析 head 'require_length' => 0, // 请求需要长度 'recv_length' => 0, // 已经接收长度 ];
} if (!$this->fd_data[$fd]['is_parse_head']) { $this->debugText("<<<<<<<<<<尝试解析 header:$fd\n"); $request = $this->preread_http_request($data); $body_pos = strpos($data, hex2bin("0d0a0d0a")); $content_length = intval($request['headers']['Content-Length']); $recv_length = strlen($data) - $body_pos + strlen(hex2bin("0d0a0d0a")); $this->fd_data[$fd]= [ 'is_parse_head' => 1, 'require_length' => $request['headers']['Content-Length'] ?? 0, 'recv_length' => $recv_length ]; file_put_contents('./tmp/'.$fd, substr($data, $body_pos + strlen(hex2bin("0d0a0d0a")))); if ($recv_length >= $content_length) { // 接收完毕 $this->debugText(sprintf("<< [一次接收完毕,触发请求,$recv_length, $content_length]\n")); $this->onRequest($server, $fd, $request, $data);
} else { $this->debugText(sprintf("<< [未接收完毕]\n")); // 未接收完毕,需要多次接收 $cid = Swoole\Coroutine::getuid(); $this->debugText(sprintf("<< [让出协程] code => %s\n",$cid)); $this->fd_data[$fd]['cid'] = $cid; Swoole\Coroutine::yield(); $this->debugText(sprintf("<< [恢复协程,准备触发 Request]\n")); $this->onRequest($server, $fd, $request, $data); return false;
}
} else { $this->debugText(sprintf("<< [分段接收]\n")); // 这里接收数据 file_put_contents('./tmp/'.$fd, $data,FILE_APPEND); $this->fd_data[$fd]['recv_length'] += strlen($data); if ( $this->fd_data[$fd]['recv_length'] >= $this->fd_data[$fd]['require_length']) { $this->debugText(sprintf("<< [接收完毕,触发请求] %s == %s\n", $this->fd_data[$fd]['recv_length'], $this->fd_data[$fd]['require_length'])); $cid = $this->fd_data[$fd]['cid']; Swoole\Coroutine::resume($cid); $this->debugText(sprintf("<< [准备恢复协程,%s]\n",$cid));
}else{ return false;
}
} // 请求完释放内存数据 $this->debugText(sprintf("释放数据拉 => %s\n", $fd)); unset($this->fd_data[$fd]);
}
目前测试是通过 RAW 形式上传文件的,没有去解析 formdata 的数据,如果你需要,可以试着从 formdata 解析数据。
POSTMAN 截图测试如下:
HTTP 还有很多细节可以参照文档继续完善,比如其中的 keep alive 机制,目前我们都是短链接,一个请求打开一个链接关闭一个链接。
如果你尝试维护 keep alive 也可以实现更高性能的链接复用,减少握手次数,降低资源消耗。
蓝天采集器实战教程-PbootCMS采集发布直播
使用PHP的swoole扩展创建了一个TCP/HTTP服务,可以接收客户端的请求,并根据请求参数,使用task模型或多进程模型来异步执行爬虫采集百度搜索的任务,
可以根据业务需要通过拓展实现更多个并发功能。