Swoole RPC 的實(shí)現(xiàn)
文章實(shí)現(xiàn)了一個簡單的 RPC 遠(yuǎn)程調(diào)用,在實(shí)現(xiàn)之前需要先了解什么是 RPC,不清楚的可以看下之前發(fā)的這篇文章 《我眼中的 RPC》。
下面的演示代碼主要使用了 Swoole 的 Task 任務(wù)池,通過 OnRequest/OnReceive 獲得信息交給 Task 去處理。
舉個工作中的例子吧,在電商系統(tǒng)中的兩個模塊,個人中心模塊和訂單管理模塊,這兩個模塊是獨(dú)立部署的,可能不在一個機(jī)房,可能不是一個域名,現(xiàn)在個人中心需要通過 用戶ID 和 訂單類型 獲取訂單數(shù)據(jù)。
實(shí)現(xiàn)效果
客戶端
HTTP 請求
//代碼片段 <?php $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $ch = curl_init(); $options = [ CURLOPT_URL => 'http://10.211.55.4:9509/', CURLOPT_POST => 1, CURLOPT_POSTFIELDS => json_encode($demo), ]; curl_setopt_array($ch, $options); curl_exec($ch); curl_close($ch);TCP 請求
//代碼片段 $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $this->client->send(json_encode($demo));
請求方式
SW 單個請求,等待結(jié)果
發(fā)出請求后,分配給 Task ,并等待 Task 執(zhí)行完成后,再返回。
SN 單個請求,不等待結(jié)果
發(fā)出請求后,分配給 Task 之后,就直接返回。
發(fā)送數(shù)據(jù)
$demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ];
type 同步/異步設(shè)置
token 可進(jìn)行權(quán)限驗(yàn)證
class 請求的類名
method 請求的方法名
uid 參數(shù)一
type 參數(shù)二
返回?cái)?shù)據(jù)

request_method 請求方式
request_time 請求開始時間
response_time 請求結(jié)束時間
code 標(biāo)識
msg 標(biāo)識值
data 約定數(shù)據(jù)
query 請求參數(shù)
代碼
OnRequest.php
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnRequest { private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $request, $response) { try { $data = decrypt($request->rawContent()); self::$query = $data; if (empty($data)) { self::$code = '-1'; self::$msg = '非法請求'; self::end($request, $response); } //TODO 驗(yàn)證Token switch ($data['type']) { case 'SW': //單個請求,等待結(jié)果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) { self::$code = '1'; self::$msg = '成功'; self::$data = $rs_data['response']; self::end($request, $response); }); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; self::end($request, $response); } break; case 'SN': //單個請求,不等待結(jié)果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; } else { self::$code = '1'; self::$msg = '成功'; } self::end($request, $response); break; default: self::$code = '-1'; self::$msg = '非法請求'; self::end($request, $response); } } catch(Exception $e) { } } private static function end($request = null, $response = null) { $rs['request_method'] = $request->server['request_method']; $rs['request_time'] = $request->server['request_time']; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $response->end(json_encode($rs)); self::$data = []; return; } }
OnReceive.php
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnReceive { private static $request_time; private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $fd, $reactor_id, $data) { try { self::$request_time = time(); $data = decrypt($data); self::$query = $data; //TODO 驗(yàn)證Token switch ($data['type']) { case 'SW': //單個請求,等待結(jié)果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; self::handlerTask($serv, $fd); } break; case 'SN': //單個請求,不等待結(jié)果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; } else { self::$code = '1'; self::$msg = '成功'; } self::handlerTask($serv, $fd); break; default: self::$code = '-1'; self::$msg = '非法請求'; self::handlerTask($serv, $fd); } } catch(Exception $e) { } } private static function handlerTask($serv, $fd) { $rs['request_method'] = 'TCP'; $rs['request_time'] = self::$request_time; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $serv->send($fd, json_encode($rs)); } }
小結(jié)
Demo 代碼僅供參考,里面有很多不嚴(yán)謹(jǐn)?shù)牡胤剑?br />
服務(wù)的調(diào)用方與提供方中間需要有一個服務(wù)注冊中心,很顯然上面的代碼中沒有,需要自己去實(shí)現(xiàn)。
服務(wù)注冊中心,負(fù)責(zé)管理 IP、Port 信息,提供給調(diào)用方使用,還要能負(fù)載均衡和故障切換。
根據(jù)自己的情況,服務(wù)注冊中心實(shí)現(xiàn)可容易可復(fù)雜,用 Redis 也行,用 Zookeeper、Consul 也行。
感興趣的也可以了解下網(wǎng)關(guān) Kong ,包括 身份認(rèn)證、權(quán)限認(rèn)證、流量控制、監(jiān)控預(yù)警...
再推薦一個 Swoole RPC 框架 Hprose,支持多語言。
掃碼二維碼 獲取免費(fèi)視頻學(xué)習(xí)資料
- 本文固定鏈接: http://www.wangchenghua.com/post/6783/
- 轉(zhuǎn)載請注明:轉(zhuǎn)載必須在正文中標(biāo)注并保留原文鏈接
- 掃碼: 掃上方二維碼獲取免費(fèi)視頻資料
查 看2022高級編程視頻教程免費(fèi)獲取