用Rust流gRPC

最近,我一直在进行一个附带项目,该项目涉及汇总媒体库的索引,并允许一个库将文件“借给”另一个。 例如,如果我想让我的住宅塔楼在上飞机之前将文件“借出”到笔记本电脑上,我可能会使用此系统。 我很快将这个项目的正在进行中的工作提交给github(只是想先让MVP工作并进行一些重构),但是我想我会退后一会儿并分享我的想法。该项目的方面: gRPC。 特别是: gRPC

如果您不熟悉gRPC流传输,那么可以说HTTP / 2(支持gRPC)的全双工性质允许您同时通过连接发送和接收消息流。 这具有启用各种新服务通信模式的潜力。

为gRPC客户端和服务器构建应用程序代码时,我要做的第一件事就是弄清楚要使用哪个库。 对于Rust而言,这意味着三个主要竞争者:

  • Stepan Koltsov的gRPC实施-https://github.com/stepancheg/grpc-rust
  • PingCAP的gRPC实施-https://github.com/pingcap/grpc-rs
  • Tower-gRPC -https://github.com/tower-rs/tower-grpc

我第一次接触gRPC和Rust是通过Stepan Koltsov的实现。 我发现入门非常容易,但是一旦遇到双向流这类更复杂的情况,我就会很快陷入困境。 他的库很棒,但是我觉得我需要一个库,其中需要更多的主动维护和更强大的示例。

然后,我尝试了Tower-gRPC库。 这感觉要先进得多,它在协议缓冲区库中使用了prost ,并且看起来确实做出了权衡利于性能的决策。 存储库本身警告您,API尚不稳定,尽管有可用的流示例,但我发现它们几乎不可理解。 我不是Rust专家,但我也不是新手。 我无法获得双向流样本来按我想要的方式工作。 我会说使用此库需要您自担风险。 比我自己更有经验的Rust专家可能会发现此库比我更易于使用。 Tower-gRPC用作Conduit Kubernetes服务网格的数据平面的基础。

有趣的是,Stepan Koltsov的gRPC库和PingCAP的gRPC库都使用Stepan Koltsov的协议缓冲区库。 最终,我发现PingCAP库已经足够成熟,拥有足够的维护者,并且有足够清晰的示例可供我摸索。 它并非没有那么令人费解的时刻,但是对于我而言,一个有效的应用程序比不完整的应用程序具有更高的价值。

好的,既然我在早期的实验和原型中放着一堆闷热的代码,现在该写一些实际的gRPC流代码了。

我需要在服务器端做的第一件事是保存我的数据存储并从gRPC生成的代码实现特征的结构:

  #[派生(克隆)] 
pub struct AlexandriaApiServer
哪里
T: “静态 +数据存储+发送+同步,
{
data_store:Arc ,
}

impl AlexandriaApiServer
哪里
T: “静态 +数据存储+发送+同步,
{
pub fn new (商店:T)-> AlexandriaApiServer {
AlexandriaApiServer {
data_store:弧:: 新建 (存储),
}
}
}

这是非常基本的Rust,尚未纳入gRPC。 刚刚将这个结构设置为容纳任何线程安全的东西,并实现了我的DataStore特性。 值得指出的是,我使用的是Arc而不是Arc<Mutex> 。 这是因为我的数据存储区是无状态的,并且仅与Redis建立连接,所以我不需要为互斥量保护而阻塞它。 另外,我真的不喜欢互斥锁在RPC调用堆栈中阻塞这种情况,因为这会对性能造成严重影响。

我具有流式传输的第一个功能是服务器功能get_bindings 。 在我的应用中,绑定实质上是自治代理程序之间的借出协议。 因此,如果我的塔式计算机和笔记本电脑之间存在文件绑定,那么合作代理将确保笔记本电脑上存在该文件的副本。

  fn get_bindings(& self ,ctx:RpcContext,req:BindingQuery, 
响应:ServerStreamingSink ){
info!(“处理获取绑定请求:{}”,req.get_agent_id());

匹配自我 .data_store.get_bindings(){
// TODO:按代理过滤
好的 (raw_bindings)=> {
绑定:Vec = raw_bindings
.into_iter()
.map(| binding |(binding.into(),
WriteFlags :: 默认 ()))
。收藏();
f = resp.send_all(
流:: iter_ok :: (绑定))
.map(| _ |())
.map_err(| e | log_fail(“获取绑定”,e));
ctx.spawn(f);
}
错误 (_)=> {
f = resp.fail(
RpcStatus :: (RpcStatusCode :: InternalNone ))
.map_err(| e | log_fail(“获取绑定”,e));
ctx.spawn(f);
}
}
}

首先要注意的是,响应的类型为ServerStreamingSinkBinding是我的protobuf IDL中的数据类型之一。 此流要求每个项目都是一个元组,第一个元素是项目( Binding ),第二个元素是WriteFlags ,在我的情况下,我正在使用WriteFlags::default() 。 这使我可以控制流的缓冲行为。

这段代码中的binding.into()将内部域模型绑定结构转换为protobuf生成的结构。

当数据存储操作成功时,我在接收器上ctx.spawn调用send_all 。 如果失败,我们ctx.spawn调用resp.fail 。 我仍然被这里对ctx.spawn的重复调用困扰。 尽管这两个匹配臂都产生了可能会成为未来的东西,但它们都不兼容,因此我无法将它们分配给大小值,因此无法将其减少为对ctx.spawn的单个调用。

让我们看一下调用get_bindings的客户端:

  pub fn get_bindings(& self )->结果<Vec > { 
let mut results = Vec :: :: new ();
let mut req = BindingQuery :: new ();
req.set_agent_id(“ agent007” .to_string());
让mut bindings = self .client.get_bindings(&req) 吗? ;
循环 {
f = bindings.into_future();
匹配 f.wait(){
(( Some (binding),s))=> {
绑定= s;
client_binding = self :: Binding :: from (binding);
results.push(client_binding);
}
(( ,_))=> 休息
错误 ((e,_))=> 返回 错误 (ClientError :: RPC (e)),
}
}
info!(“查询绑定成功。”);
好的 (结果)
}

BindingQuery结构也来自我的protobuf IDL。 protobufs的这种实现方式的工作方式(来自Stepan Koltsov的库),您必须创建一个新的可变对象并调用一堆setter。 这感觉非常“不耐烦”,而prost profbuf库则更多地采用了“普通旧结构”方法。 如果我可以在PingCAP更易接近且更易于生产的gRPC库下使用prost的更简单的protobuf API,我想我最终会很高兴的。

从服务器接收响应流的客户端模式是将远程调用转换为将来的调用,然后循环以耗尽该流。 如果获得Ok(Some(value), stream)则提取该值,然后为下一个循环迭代重新分配 future(这使future可以在循环中移动并避免违反借用检查器)。 如果得到Ok((None, _))那么我们知道流为空,没有错误。 最后,获得Err((e, _))表示接收流时出错。 每当您在流的接收端时,都会看到此模式。

我在很大程度上依赖于Into特质的使用来创建一个小的反腐败层,这样我的客户端和服务器上的业务逻辑就不会直接在protobuf生成的结构上进行操作,而是直接在它们所属的类型上进行操作相对领域。 这是一项额外的工作,但是我喜欢它给我带来的隔离感,并且使我的各阶层免受“流血”的困扰。 如果我使用的是prost ,其中的结构携带的protobuf专用行李较少,那么使用ACL可能不会那么积极。 您可能还注意到我仍在对某些内容进行硬编码(例如“ agent007”代理ID),这是我尚未将此项目推送到github的另一个原因。

现在让我们看一下双向流。 在此功能中,服务器正在接收(包含LibraryEntry实例的)媒体库更新流,其中包含从监视媒体库的单个代理程序上载的文件哈希,路径和文件大小,并通过确认流进行回复,指示该库条目是否已成功保留:

  fn update_library_entries( 
自我
ctx:RpcContext,
条目:RequestStream ,
响应:DuplexSink ,
){
信息!(“处理库更新”);

store = self .data_store.clone();

to_send = entry.map( move | entry | {
let mut ack = LibraryEntryAck :: from (&entry);
如果让 Ok (_)= store.put_library_entry(entry){
ack.set_saved( true );
}
(ack,WriteFlags :: 默认 ())
});

f = resp.send_all(to_send)
.map(| _ |())
.map_err(| e | log_fail(“库更新”,e));
ctx.spawn(f);
}

在这里,我必须克隆我的数据存储(因为这是一条Arc所以我实际上只是创建一个引用计数的指针,而不是克隆真正的存储),因为to_send变量是一个未来,在我的函数超出范围之前可能无法完成。 如果您不执行此克隆操作,Rust的借位检查器将用俱乐部击中您。

现在,我可以在接收器上调用send_all ,它接收一个元组 。 和以前一样,该元组是流的项目类型和WriteFlags值。 客户端返回确认流。 因为我们现在正在进行双向流传输,所以客户端将在继续发送更多库条目更新的同时开始获得确认。 就是gRPC流媒体开始令人惊奇的地方。 如果这只是单个流,则请求仍将是RequestStream ,而响应将是UnarySink ,与简单请求-答复语义中使用的类型相同。

首先,对send_all返回值的第一次map调用使我感到困惑。 语法.map(|_|())本质上意味着我们正在将任何输入值映射到一个单位或空响应中。 我们必须这样做,因为ctx.spawn将来使用单位项目类型,并且send_all返回一个元组。

这是客户端代码,显示了接收流响应的同时发送流:

  pub fn update_library(& self ,条目:Vec )-> 
结果<Vec > {
letmut接收器, mut接收器)=
自我 .client.update_library_entries() 吗? ; h = thread :: spawn( move || {
用于输入条目{
let entry: 超级 :: bemisapi :: LibraryEntry = entry.into();
sink = sink.send((entry,WriteFlags :: default ()))
。等待()
.unwrap();
}
未来::: poll_fn(|| sink.close())。wait()。unwrap();
});

让mut acks:Vec = Vec :: new ();
循环 {
匹配 receiver.into_future()。wait(){
(( Some (ack),r))=> {
acks.push(ack.into());
接收器= r;
},
(( ,_))=> 休息
错误 ((e,_))=> 返回 错误 (ClientError :: RPC (e)),
}
}

h.join()。unwrap();
println!(“ acks:{:?}”,acks);
还行 (ack)
}

我在这里要做的第一件事是创建一个新的后台线程。 在此线程中,我在接收器上调用send来传输库条目(再次以流元组形式)。 然后,我将对sink.close()未来进行轮询,这将使线程死亡。

在这一点上,我认为我可能可以将for循环重构forsend_all entries.into_iter().map(|e| (e.into(), WriteFlags::default())结果的send_all调用entries.into_iter().map(|e| (e.into(), WriteFlags::default())还没有尝试过

接下来,我设置了一个接收器循环(这看起来很像服务器自己的接收器循环),将接收到的acks添加到acks向量中。 我们通过在发送线程上调用join来等待一切完成。

一旦超越了构建“ hello world”示例的范围,我就需要一些可以使我构建坚如磐石,稳定,可靠的gRPC服务器和客户端的工具。 无论我使用什么库,都需要快速且实用,但还必须要符合人体工程学,这样我在使用该库时编写的代码将很容易地从6个月后开始阅读,并且对项目的其他贡献者也很容易了解有限的github存储库。

我尚未编写此部分,但是我将编写一些函数,这些函数使我可以将整个文件从客户端流传输到服务器(例如,从塔式计算机传输到笔记本电脑)。 现在,我已经弄清楚了构建流服务器和客户端的一些模式,我相信我可以继续前进并完成项目的这一部分。

在理想的世界中,我将能够在PingCAP grpcio库的强大功能和易用性的帮助下,从prost库中获得更简单的protobuf结构,但是现在我对折衷方案感到满意,并将继续使用PingCAP gRPC库。 您的工作量可能会有所不同,因此,我绝对鼓励您尝试每种替代方法,而不仅仅是查看其存储库中的示例。 除非您实际上是在构建RouteGuide示例,否则应该浏览并查看您的域与这些库生成的代码的匹配程度。 我在每个库中苦苦挣扎所获得的知识是可以在我的项目的其余部分中继续发展的价值。