First Steps With Luigi Data Pipelines

Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer who deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts to run queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that lets you manage and modularize data pipelines in a consistent fashion. Spotify originally created Luigi to handle its data processing needs (billions of log messages and terabytes of files per day) and later released it as an open source project. Any service that has Python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, run pip install luigi, which should also install the necessary dependencies. You can also find the latest source in the Luigi repository on GitHub and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

  • run()
    • This tells Luigi what you want the task to do when it is run. This could be something like submitting a Map-Reduce job, querying a database, or running an external script. The main logic of the task goes here. Since Luigi is highly modular, we break up our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, if the database is unreachable), you won’t lose the progress made on other parts of the pipeline, and you can pinpoint exactly what failed.
  • output()
    • This describes where your Task puts its results. It returns a target, which could be a local file, a location on HDFS, or a record in a database. By default, Luigi treats the task as complete once that target exists.
  • requires()
    • This method defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() method is what ties your tasks together in Luigi and determines the order in which they execute.

Your First Task

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

class MyFirstTask(luigi.Task): # inherit from Luigi base class Task

    def run(self):
        with self.output().open('w') as f:
            f.write("This is my first task.\n")
            
    def output(self):
        # tell luigi where to output to
        return luigi.LocalTarget('MyTask.txt')

if __name__ == '__main__':

    # MyFirstTask is the main task here; it has no requirements,
    # so luigi runs it directly
    luigi.run(["--local-scheduler"], main_task_cls=MyFirstTask)

https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of writing a line to a file. Run it at the command line by typing python mytask.py. This is a very simple example just to give you the structure. Let’s add some dependencies.

Add a new task to your file:

class MySecondTask(luigi.Task):
    # MyFirstTask() needs to run first
    def requires(self):
        return MyFirstTask()

    def run(self):
        # here we're going to count the words in the first file
        # and output it to a second file
        numwords = 0
        # once MyFirstTask() has run, this file has the info we need
        # it could just as easily be database output in here
        with self.input().open('r') as f:
            for line in f:
                words = line.split(None) # this splits on whitespace of any length
                numwords += len(words)

        # write our results to the target declared in output()
        with self.output().open('w') as out:
            out.write(str(numwords)+"\n")

    def output(self):
        return luigi.LocalTarget('words.txt')

https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we use the requires() method to define a dependency. This means that when you run the second task, MySecondTask(), Luigi checks whether an output exists for MyFirstTask(). If it’s not there, Luigi runs the task returned by requires() before proceeding. Additionally, if a task fails for any reason, Luigi can be configured to send an error email to the address of your choosing, and the workflow picks up where it left off the next time you run it.
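To make the "check the output, run what is missing" behavior concrete, here is a minimal sketch in plain Python. It is a toy model only and not how Luigi is implemented: ToyTask, build, First, Second, and WORKDIR are hypothetical names invented for this illustration, and the files are written to a temporary directory.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for luigi.Task; not part of Luigi."""

    def requires(self):
        return []  # no dependencies by default

    def output(self):
        raise NotImplementedError  # path of the file this task produces

    def run(self):
        raise NotImplementedError  # the actual work of the task

    def complete(self):
        # a task counts as done once its output file exists
        return os.path.exists(self.output())


def build(task, log):
    """Run `task` after its dependencies, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(type(task).__name__)  # record the order in which tasks ran


WORKDIR = tempfile.mkdtemp()  # scratch directory for this sketch


class First(ToyTask):
    def output(self):
        return os.path.join(WORKDIR, "MyTask.txt")

    def run(self):
        with open(self.output(), "w") as f:
            f.write("This is my first task.\n")


class Second(ToyTask):
    def requires(self):
        return [First()]

    def output(self):
        return os.path.join(WORKDIR, "words.txt")

    def run(self):
        # count the words in the file produced by First
        with open(First().output()) as f:
            numwords = sum(len(line.split()) for line in f)
        with open(self.output(), "w") as out:
            out.write(str(numwords) + "\n")


log = []
build(Second(), log)          # nothing exists yet, so both tasks run
first_pass = list(log)
build(Second(), log)          # words.txt exists, so nothing runs
os.remove(Second().output())  # simulate a missing result
build(Second(), log)          # only Second runs again; First's output is still there
print(first_pass, log)
```

The third call is the "pick up where you left off" case: because the first task's output is still on disk, only the task whose output is missing runs again.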

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs go into more detail about configuring the daemon.

When you go to http://localhost:8082 (or whatever the IP of your server is), you should see Luigi running like this:

Let’s run our tasks and see what happens. Run python mytask.py again while you have luigid running. Make sure you change the luigi.run statement so that it no longer includes ['--local-scheduler'], and set main_task_cls to MySecondTask so that both tasks are scheduled. Without the local scheduler flag, Luigi goes to the central scheduler on our server.

After it runs, you should see this:

task2

One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and has built other extensions since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies big and small. Once you get the hang of the basics, you can submit Hadoop jobs, interface with MySQL, and store data in Redis, to name just a few.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!