Luigi:Spark作业的工作流管理系统

Luigi是一个工作流管理系统,可以有效地启动一组任务,并在它们之间定义依赖关系。 这是Spotify®开发的基于Python的API,用于构建和执行Hadoop作业的管道,但是它也可以用于创建使用R或Scala或Spark编写的任何外部作业的工作流。

它提供的一些很酷的功能包括:工作流定义,失败处理,常见事件处理,任务跟踪,常规任务和Spark作业的平滑集成

以下是Luigi的工作原理的概述,我们将深入研究这些部件。

1.安装luigi

由于luigi是用Python编写的,因此您可以使用pip进行安装:

 点安装路易吉 

2.定义路易吉任务

这是在文件luigi_example.py中定义的示例任务,该任务检查文件是否存在:

6.启动Spark作业

luigi最有趣的功能之一是能够将Spark作业作为luigi任务启动。 Luigi API提供了一个PySparkTask类,可以将其扩展以编写自定义Spark作业。 下面是一个这样的任务的示例,其中我们尝试将文件从常规文件系统复制到HDFS。

在启动这些任务之前,我们需要将Hadoop和Spark库的路径添加到luigi.cfg配置文件:

 [email] 
receiver:receiver_email_address
sender:sender_email_address
 [smtp] 
host:your_company_smtp_server
  [核心] 
default-scheduler-host:name_of_your_luigi_server
python-home-dir:$PYTHON_HOME
 [spark] 
spark-submit:$SPARK_HOME/bin/spark-submit
hadoop-conf-dir:$HADOOP_HOME
yarn-conf-dir:$YARN_HOME
master:yarn
num-executors:10

您必须知道$PYTHON_HOME$HADOOP_HOME$SPARK_HOME$YARN_HOME才能启动基于Spark的任务。 您现在可以按以下方式启动Spark任务:

 PYTHONPATH='$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.1-src.zip:.' luigi --module luigi_example MoveFileTask --input-filepath test_data.csv --output-filepath test_data_hdfs