Netflix Conductor:工作流程的控制反转

2016年12月,我们开源了Netflix Conductor。

从那时起,我们一直在努力添加更多功能,增强用于监视工作流的用户界面并加强系统功能。 我们已经看到用法的增加,新的用例,功能请求以及社区的极大兴趣。 在本文中,我们将讨论本季度添加到Conductor的几个主要功能。

导体中的IoC

Conductor可以重用工作流程并将其作为子工作流程嵌入。 子工作流是促进流程重用和模块化的好方法。 随着Netflix Conductor的采用率增加,我们观察到了一些有趣的用例,其中基于事件和其他工作流中的状态更改启动了业务流程。

传统上,应用程序使用发布/订阅系统实现此类用例,并在应用程序状态更改并订阅感兴趣的事件以启动操​​作时发布事件。

我们寻求具有以下特征的解决方案:

  1. 工作流程之间的松耦合
  2. 允许根据其他工作流程中的状态更改启动工作流程
  3. 通过SQS / SNS与外部系统集成,以生成/使用事件

输入控制反转-用于工作流。 这个想法是能够基于其他工作流程中的状态更改来链接工作流程操作,例如启动工作流程,完成工作流程中的任务等。

为了进一步说明这一点,让我们看一下提取文件后触发“ QC工作流”的两种解决方案。 提取文件后,将触发多个工作流(由单独的应用程序拥有); 其中之一是文件验证工作流程(QC工作流程),该工作流程被触发以识别文件的类型(音频,视频等),运行适当的检查并开始文件的编码过程。

使用Conductor,有两种单独的方法可以实现此目的; 两种解决方案都可以完成工作,但是两种方法都存在细微的差异。

启动另一个工作流程作为子工作流程

假设子工作流程是任务,则其输出可以由QC工作流程之后的File Ingest工作流程中的其他任务使用,例如QC流程的结果。 因此,当业务流程之间存在紧密的依赖关系和耦合时,子工作流将非常有用。

在事件上启动工作流程

在以上方法中,“文件提取”工作流会产生“提取完成”事件。 一个或多个事件处理程序可以使用此事件来执行操作,包括启动工作流程。

  • 文件提取工作流程没有任何直接的工作流程依赖性
  • 可以基于促进松耦合的事件来触发工作流
  • QC Task工作流程的结果不会影响File Ingest工作流程
  • 每个事件可以执行多个工作流程或动作

活动任务

我们介绍了一种称为EVENT的新型任务。 可以将事件任务添加到工作流定义中。 执行时,它将在指定的“接收器”中产生一个事件。 接收器是事件系统,例如SQS,Conductor或任何其他受支持的系统。 接收器遵循可插拔的体系结构,在该体系结构中,可以通过实现所需的接口并在Conductor服务器JVM的类路径中使用该实现来添加JMS,Kafka等系统。

根据导体的任务输入/输出模型,将计算EVENT任务的输入并将其作为有效负载发送。 接收器可以是导体,也可以是外部系统,例如SQS。 支持各种类型的Sink的体系结构是基于插件的,并且可以通过实现所需的接口并将其在Conductor服务器的JVM类路径中轻松添加新类型(如JMS或Kafka)。

以下是事件任务的示例,该事件将事件发布到按名称标识的SQS队列。

  { 
“ name”:“ example_event”,
“ taskReferenceName”:“ event0”,
“输入”:{
“ filename”:“ $ {workflow.input.filename}”,
“ location”:“ $ {anothertask.output.location}”
},
“ type”:“ EVENT”,
“ sink”:“ sqs:sqs_queue_name”
}

事件处理程序

事件处理程序是在特定事件到达时执行的侦听器。 处理程序侦听各种事件源,包括Conductor和SQS / SNS,并允许用户通过API插入其他源。 导体针对每种事件类型支持多个事件处理程序。 事件处理程序使用/ event API端点与Conductor一起管理。 一个事件源可能有多个事件处理程序。

活动条件

条件是有效负载上的JavaScript表达式,对于事件处理程序执行操作,必须将其评估为true。 条件充当过滤器,以对符合条件的事件子集选择性地采取措施。 条件过滤器是可选的,如果未指定,则处理来自源的所有事件。

事件动作

每个事件处理程序都有一个或多个与之关联的动作。 当与事件关联的条件被评估为“真”时,将执行操作。 支持的操作是:

  • 开始工作流程
  • 以失败或成功完成任务

在样本事件处理程序的下方,该事件处理程序侦听SQS队列并根据条件启动工作流程

  { 
“ name”:“ conductor_event_test”,
“ event”:“ sqs:example_sqs_queue_name”,
“ condition”:“ $ .file_type =='image'”,
“动作”:[
{
“ action”:“ start_workflow”,
“ start_workflow”:{
“ name”:“ workflow_name”,
“输入”:{
“ file_name”:“ $ {filename}”,
“ file_type”:“ $ {file_type}”
}
}
}
],
“有效”:正确
}

导体UI

可以在UI中检查所有已注册的事件处理程序。 可以通过REST API或通过swagger API页面进行更新。

JSON数据转换

Conductor现在支持基于JSONPath的数据转换。 在输入配置中将JSON路径用于任务可以实现复杂的数据转换,并减少了编写一次性任务的工作,而这些任务无需进行数据转换。

https://netflix.github.io/conductor/metadata/#wiring-inputs-and-outputs

前进的道路

虽然这是一个繁忙的季度,但我们尚未完成。 展望第二季度,我们的重点将放在简化开发人员的工作流程以及支持记录任务执行上下文方面,以帮助理解和解决跨多个工作人员和应用程序的工作流程。

如果您喜欢构建分布式系统的挑战,并且对大规模构建Netflix工作室生态系统和内容管道感兴趣,请查看我们的职位空缺。

薇伦·巴拉雅(Viren Baraiya)