Luigi基础概念
基础模块
想要构建一个基本的Luigi工作流, 需要创建Task和Target类, 还有Parameter类.
使用这些类来定义任务的好处是在代码里定义依赖, 而不是使用DSL.
Target
Target是Task output返回的结果. Target类对应磁盘上的一个文件, HDFS上的一个文件或者某种checkpoint(比如数据库的条目). 理论上只需要实现exists方法,用于返回文件是否存在就可以实现这个类. Target有多个子类:LocalTarget, HdfsTarget, S3Target, ssh.RemoteTarget, ftp.RemoteTarget, mysqldb.MysqlTarget, redshift.RedshiftTarget, 所以基本上不需要自己subclass.Target类是对文件的映射, 如果只有一个target支持原子性操作, 也支持open()和Gzip. 多个targets需要用户保持文件的原子性操作.
Task
Task是实际做任务的地方. 通过run(), output(), requires()设置任务的行为. Task通过其它Task产生的Targets作为输入, 结果产生也是Target.
任务之间可以通过requires()指定依赖.
每个任务通过output()指定输出, input()指定输入.
requires()
返回本task需要的其它tasks, 可以是task对象或封装的dicts, lists, tuples.如果需要依赖外部task, 那么可以封装
ExternalTask, 然后把这个task作为当前task的requires
1  | class LogFiles(luigi.ExternalTask):  | 
- run()
 
run()函数是实际的任务运行地方, 如果有requires那么就会先解决依赖, 然后跑run的逻辑. input()会把requires的输出封装成targets, 用作run()的输入.
1  | class TaskWithManyInputs(luigi.Task):  | 
- task的事件和回调
 
luigi有事件系统能够注册事件回调, 然后使用自定义的task触发任务.
1  | 
  | 
Parameter
Parameter可以给每个task增加参数, 用于定制化一些额外信息.
- 使用@inherits, @requires来传递多个task直接的参数, 考虑如下问题:
 
1  | class TaskA(luigi.ExternalTask):  | 
对上述代码,下游的task将会需要写上所有上游需要的参数, 这样就会产生参数爆炸, 如果想要简化参数, 可以是使用@inherits和requires
1  | import luigi  | 
Luigi的模式
Luigi没有中间文件的概念, 所以如果两个依赖的任务运行一半失败, 中间结果将会被保留.
如何触发多个任务
在每个不相关的任务链的结尾加一个相同的dummy task, 这样只需要触发这个任务就会触发多个任务, 类似make.
实际使用时, 在Luigi中使用WrapperTask来封装和唤起其它tasks就行了, 它不会有输出output.
1  | class AllReports(luigi.WrapperTask):  | 
Luigi的执行模型
luigi的执行模型很简单, 一个worker的进程执行所有tasks, 所以如果有成千上万个tasks, 扩展性将成为问题.
调度
luigi的调度由单独的luigid中心化管理, 多个worker执行run()时, 每次都会从依赖树从头向下遍历, 找到需要执行的task运行, 跳过已完成的task. 见
gif
- Title: Luigi基础概念
 - Author: Kopei
 - Created at : 2018-07-20 00:00:00
 - Updated at : 2025-08-13 18:15:58
 - Link: https://kopei.github.io/2018/07/19/python-2018-07-20-luiqi-base/
 - License: This work is licensed under CC BY-NC-SA 4.0.