編程學(xué)習(xí)網(wǎng) > PHP技術(shù) > swoole > Swoole RPC 的實(shí)現(xiàn)
2019
12-07

Swoole RPC 的實(shí)現(xiàn)

Swoole RPC 的實(shí)現(xiàn)

文章實(shí)現(xiàn)了一個簡單的 RPC 遠(yuǎn)程調(diào)用,在實(shí)現(xiàn)之前需要先了解什么是 RPC,不清楚的可以看下之前發(fā)的這篇文章 《我眼中的 RPC》。


下面的演示代碼主要使用了 Swoole 的 Task 任務(wù)池,通過 OnRequest/OnReceive 獲得信息交給 Task 去處理。

舉個工作中的例子吧,在電商系統(tǒng)中的兩個模塊,個人中心模塊和訂單管理模塊,這兩個模塊是獨(dú)立部署的,可能不在一個機(jī)房,可能不是一個域名,現(xiàn)在個人中心需要通過 用戶ID 和 訂單類型 獲取訂單數(shù)據(jù)。

實(shí)現(xiàn)效果
客戶端

HTTP 請求


//代碼片段
<?php
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Order',
'method' => 'get_list',
'param' => [
'uid' => 1,
'type' => 2,
],
],
];
$ch = curl_init();
$options = [
CURLOPT_URL => 'http://10.211.55.4:9509/',
CURLOPT_POST => 1,
CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);
curl_exec($ch);
curl_close($ch);
TCP 請求


//代碼片段
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Order',
'method' => 'get_list',
'param' => [
'uid' => 1,
'type' => 2,
],
],
];
$this->client->send(json_encode($demo));




請求方式
SW 單個請求,等待結(jié)果

發(fā)出請求后,分配給 Task ,并等待 Task 執(zhí)行完成后,再返回。

SN 單個請求,不等待結(jié)果

發(fā)出請求后,分配給 Task 之后,就直接返回。

發(fā)送數(shù)據(jù)

$demo = [

'type' => 'SW',

'token' => 'Bb1R3YLipbkTp5p0',

'param' => [

'class' => 'Order',

'method' => 'get_list',

'param' => [

'uid' => 1,

'type' => 2,

],

],

];

 

 

type 同步/異步設(shè)置

token 可進(jìn)行權(quán)限驗(yàn)證

class 請求的類名

method 請求的方法名

uid 參數(shù)一

type 參數(shù)二

返回?cái)?shù)據(jù)


request_method 請求方式

request_time 請求開始時間

response_time 請求結(jié)束時間

code 標(biāo)識

msg 標(biāo)識值

data 約定數(shù)據(jù)

query 請求參數(shù)

代碼
OnRequest.php

<?php

if (!defined('SERVER_PATH')) exit("No Access");

class OnRequest
{
private static $query;
private static $code;
private static $msg;
private static $data;

public static function run($serv, $request, $response)
{
try {
$data = decrypt($request->rawContent());
self::$query = $data;
if (empty($data)) {
self::$code = '-1';
self::$msg = '非法請求';
self::end($request, $response);
}

//TODO 驗(yàn)證Token

switch ($data['type']) {
case 'SW': //單個請求,等待結(jié)果
$task = [
'request' => $data,
'server' => 'http'
];
$rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) {
self::$code = '1';
self::$msg = '成功';
self::$data = $rs_data['response'];
self::end($request, $response);
});
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
self::end($request, $response);
}
break;

case 'SN': //單個請求,不等待結(jié)果
$task = [
'request' => $data,
'server' => 'http'
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
} else {
self::$code = '1';
self::$msg = '成功';
}
self::end($request, $response);
break;
default:
self::$code = '-1';
self::$msg = '非法請求';
self::end($request, $response);
}
} catch(Exception $e) {
}
}

private static function end($request = null, $response = null)
{
$rs['request_method'] = $request->server['request_method'];
$rs['request_time'] = $request->server['request_time'];
$rs['response_time'] = time();
$rs['code'] = self::$code;
$rs['msg'] = self::$msg;
$rs['data'] = self::$data;
$rs['query'] = self::$query;
$response->end(json_encode($rs));
self::$data = [];
return;
}
}
OnReceive.php




<?php
if (!defined('SERVER_PATH')) exit("No Access");
class OnReceive
{
private static $request_time;
private static $query;
private static $code;
private static $msg;
private static $data;
public static function run($serv, $fd, $reactor_id, $data)
{
try {
self::$request_time = time();
$data = decrypt($data);
self::$query = $data;
//TODO 驗(yàn)證Token

switch ($data['type']) {
case 'SW': //單個請求,等待結(jié)果
$task = [
'fd' => $fd,
'request' => $data,
'server' => 'tcp',
'request_time' => self::$request_time,
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
self::handlerTask($serv, $fd);
}
break;
case 'SN': //單個請求,不等待結(jié)果
$task = [
'fd' => $fd,
'request' => $data,
'server' => 'tcp',
'request_time' => self::$request_time,
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
} else {
self::$code = '1';
self::$msg = '成功';
}
self::handlerTask($serv, $fd);
break;
default:
self::$code = '-1';
self::$msg = '非法請求';
self::handlerTask($serv, $fd);
}
} catch(Exception $e) {
}
}
private static function handlerTask($serv, $fd)
{
$rs['request_method'] = 'TCP';
$rs['request_time'] = self::$request_time;
$rs['response_time'] = time();
$rs['code'] = self::$code;
$rs['msg'] = self::$msg;
$rs['data'] = self::$data;
$rs['query'] = self::$query;
$serv->send($fd, json_encode($rs));
}
}

小結(jié)
Demo 代碼僅供參考,里面有很多不嚴(yán)謹(jǐn)?shù)牡胤剑?br />
服務(wù)的調(diào)用方與提供方中間需要有一個服務(wù)注冊中心,很顯然上面的代碼中沒有,需要自己去實(shí)現(xiàn)。

服務(wù)注冊中心,負(fù)責(zé)管理 IP、Port 信息,提供給調(diào)用方使用,還要能負(fù)載均衡和故障切換。

根據(jù)自己的情況,服務(wù)注冊中心實(shí)現(xiàn)可容易可復(fù)雜,用 Redis 也行,用 Zookeeper、Consul 也行。

感興趣的也可以了解下網(wǎng)關(guān) Kong ,包括 身份認(rèn)證、權(quán)限認(rèn)證、流量控制、監(jiān)控預(yù)警...

再推薦一個 Swoole RPC 框架 Hprose,支持多語言。


掃碼二維碼 獲取免費(fèi)視頻學(xué)習(xí)資料

Python編程學(xué)習(xí)

查 看2022高級編程視頻教程免費(fèi)獲取