First Steps With Luigi Data Pipelines


Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer that deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts to run queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that allows you to manage and modularize data pipelines in a consistent fashion. Luigi was originally created by Spotify to handle their data processing needs (billions of log messages and terabytes of files per day). They later released it into the world as an open source project. Any service that has Python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, simply run pip install luigi and it should install the necessary dependencies. You can also find the latest source at Luigi and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

  • run()
    • This tells Luigi what you want the task to do when it is run. This could be something like submitting a Map-Reduce job, querying a database, or running an external script in Python. The main logic of the task goes here. Since Luigi is highly modular, we break up our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, if the database is unreachable), then it won’t lose the progress on other parts of the pipeline and you can pinpoint what exactly failed.
  • output()
    • This tells Luigi where the output of your Task goes, returned as a Target. It could be a local file, a file on HDFS, or simply an updated record in a database.
  • requires()
    • This function defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() function is what ties your tasks together in Luigi and adds a sense of chronology to the execution.

Your First Task

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

class MyFirstTask(luigi.Task): # inherit from Luigi base class Task

    def run(self):
        with self.output().open('w') as f:
            f.write("This is my first task.\n")
            
    def output(self):
        # tell luigi where to output to
        return luigi.LocalTarget('MyTask.txt')

if __name__ == '__main__':

    # since we are setting MyFirstTask to be the main task,
    # it will check for any requirements first, then run
    luigi.run(["--local-scheduler"], main_task_cls=MyFirstTask)

https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of writing a line to a file. Run it at the command line by typing python mytask.py. This is a very simple example just to give you the structure. But that’s easy, so let’s add in some dependencies.

Add a new task to your file:

class MySecondTask(luigi.Task):
    # MyFirstTask() needs to run first
    def requires(self):
        return MyFirstTask()

    def run(self):
        # here we're going to count the words in the first file
        # and output it to a second file
        numwords = 0
        # once MyFirstTask() has run, this file has the info we need
        # it could just as easily be database output in here
        with self.input().open('r') as f:
            for line in f:
                words = line.split(None) # this splits on whitespace of any length
                numwords += len(words)

        # write our results to the target declared in output()
        with self.output().open('w') as out:
            out.write(str(numwords)+"\n")

    def output(self):
        return luigi.LocalTarget('words.txt')

https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we use the requires() method to define a dependency. This means that when you run the second task, MySecondTask(), Luigi will check whether the output of MyFirstTask() exists. If it’s not there, it will run the task listed under requires() before proceeding. Additionally, if a task fails for any reason, Luigi can be configured to send an error email to the address of your choosing, and because finished tasks already have their output in place, the workflow will pick up where it left off the next time you run it.
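The "check for output, run what is missing" behavior described above can be sketched in a few lines of plain Python. This is only an illustration of the idea and not how Luigi is implemented: ToyTask, build, and the file names are made-up names for this example, and a real scheduler does much more (parameters, retries, workers).

```python
import os
import tempfile


class ToyTask:
    """Stand-in for a Luigi-style task: a name, an output path, and dependencies."""

    def __init__(self, name, output_path, requires=()):
        self.name = name
        self.output_path = output_path
        self.requires = list(requires)

    def complete(self):
        # A task counts as done when its output file already exists.
        return os.path.exists(self.output_path)

    def run(self):
        with open(self.output_path, "w") as f:
            f.write(self.name + " finished\n")


def build(task, log):
    """Run any missing dependencies first, then the task itself."""
    if task.complete():
        return
    for dep in task.requires:
        build(dep, log)
    task.run()
    log.append(task.name)


workdir = tempfile.mkdtemp()
first = ToyTask("first", os.path.join(workdir, "first.txt"))
second = ToyTask("second", os.path.join(workdir, "second.txt"), requires=[first])

first_pass = []
build(second, first_pass)   # nothing exists yet, so both tasks run in order

second_pass = []
build(second, second_pass)  # both outputs exist now, so nothing runs again
```

Running the build a second time does no work at all, which is the same reason a half-finished pipeline can resume where it stopped.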

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs go into more detail about configuring the daemon.

When you go to http://localhost:8082 (or whatever the IP of your server is), you should see the Luigi web interface running.

Let’s run our tasks and see what happens. First, change the luigi.run statement so that it no longer includes ['--local-scheduler'], which tells Luigi to go to the central scheduler on our server, and set main_task_cls to MySecondTask so that the dependency gets exercised. Then run python mytask.py again while you have luigid running.

After it runs, you should see both tasks in the web interface.

One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and has built in other extensions since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies big and small. Just a few of the things you can do once you get the hang of the basics are submitting Hadoop jobs, interfacing with MySQL, and storing data in Redis.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!


A Quick Introduction to HDFS

The Hadoop filesystem is called HDFS, and today I’m going to give a short introduction to how it works for a beginner.

The Hadoop Distributed File System (HDFS) sits on top of a Hadoop cluster and facilitates the distributed storage and access of files. When a file is stored in HDFS, it is split into chunks called “blocks”. The block size is configurable, and the last block of a file can be smaller than the rest. The blocks are scattered across the nodes of the cluster. Each of these nodes runs a daemon called a datanode. One node, called the namenode, holds metadata about the blocks and their whereabouts.

To protect against network or disk failure, data is replicated in three places across the cluster by default. This makes the data redundant: if one datanode goes down, there are other copies of the data elsewhere. When this happens, a new copy of the data is created so that there are always three.
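To make the block and replica bookkeeping concrete, here is a toy simulation in plain Python. It is a sketch under loud assumptions: the tiny block size, the round-robin placement, and the names split_into_blocks, place_blocks, and handle_failure are all invented for illustration, and real HDFS chooses replica locations with a more careful policy.

```python
import itertools

BLOCK_SIZE = 4      # bytes per block in this toy; real clusters use far larger blocks
REPLICATION = 3     # copies kept of every block


def split_into_blocks(data, block_size=BLOCK_SIZE):
    """Cut a byte string into fixed-size blocks; the last one may be shorter."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(num_blocks, nodes, replication=REPLICATION):
    """Build namenode-style metadata: block id -> set of datanodes holding a copy.

    Assumes there are at least `replication` nodes, so round-robin picks are distinct.
    """
    ring = itertools.cycle(nodes)
    return {b: {next(ring) for _ in range(replication)} for b in range(num_blocks)}


def handle_failure(block_map, dead_node, nodes, replication=REPLICATION):
    """Forget a dead datanode and copy its blocks onto surviving nodes."""
    alive = [n for n in nodes if n != dead_node]
    for holders in block_map.values():
        holders.discard(dead_node)
        for candidate in alive:
            if len(holders) >= replication:
                break
            holders.add(candidate)   # adding a node that already holds it is a no-op
    return block_map


nodes = ["dn1", "dn2", "dn3", "dn4", "dn5"]
blocks = split_into_blocks(b"hello hdfs world")   # 16 bytes -> 4 blocks
block_map = place_blocks(len(blocks), nodes)
handle_failure(block_map, "dn2", nodes)           # every block is back to 3 copies
```

The dictionary returned by place_blocks plays the role of the namenode's metadata: the datanodes hold the bytes, but only this mapping says which blocks exist and where the copies live.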

The namenode is even more important, because it has metadata about all the files. If there is a network issue with the namenode, all of the data will be unavailable until it can be reached again. However, if the disk on the namenode fails, the data may be lost forever, because the namenode has all the information about how the pieces of the files go together. We’d still have all the chunks on the datanodes, but we’d have no idea what file they go to.

To get around this issue, one solution is to also write the namenode’s metadata to a drive mounted over a network file system (NFS). Another way to approach this (which is a better alternative) is to have an active namenode and a standby namenode. This way, there is a “backup” if something goes wrong.

Some commands:
  • To list files on HDFS:
    • $ hadoop fs -ls
  • To put files on HDFS:
    • $ hadoop fs -put filename
    • this takes a local file and puts it on HDFS
  • To display the end of a file:
    • $ hadoop fs -tail filename
  • Many familiar shell commands will work if you put a dash in front of them
    • $ hadoop fs -cat
    • $ hadoop fs -mv
    • $ hadoop fs -mkdir
    • etc…

Chopsticks Game – A Combinatorial Challenge


So I don’t know if anyone else is familiar with this game, but I just remember playing it with friends in middle school and it occurred to me the other day that it would be an interesting game to analyze combinatorially, and perhaps write a game playing algorithm for. This game can be found in more detail here: http://en.wikipedia.org/wiki/Chopsticks_(hand_game)

Players: 2+.

Rules of Play: Players each begin with two “piles” of points, and each pile has 1 point to begin with. We used fingers to represent this, one finger on each hand.

On each turn, a player can choose to do one of two things:

  1. Send points from one of the player’s piles to one of the opponent’s piles. So if Player 1 sends the 1 point in his left pile to Player 2’s left pile, then Player 2 has 2 points in the left pile and Player 1 still has 1 point in his left pile. Player 1 does not lose points; they are simply “cloned” over to the opponent’s pile.
  2. If the player has an even number of points in one pile and zero points in the other pile, the player may elect to split his points evenly between the two piles. This consumes the player’s turn. Example: if Player 1 has (0 4) then he can use his turn to split his points, giving him (2 2).

If a player gets exactly 5 points in either pile, that pile loses all of its points and reverts to 0. However, if the total goes over 5 (such as adding 2 points to a 4 point pile), then only the remainder is kept (meaning that 4 + 2 = 6 leaves 1). In other words, the opponent’s pile simply ends up with its total mod 5.

If a player gets to 0 points in both their piles, then they lose. The last person that has points remaining wins.
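The rules above translate almost directly into code. Here is a minimal sketch in Python; the function names (attack, can_split, split, has_lost) and the choice to store a player's piles as a pair of integers are my own assumptions for illustration.

```python
def attack(source_points, target_points):
    """Add the attacker's pile to the target pile; totals wrap around modulo 5."""
    return (target_points + source_points) % 5


def can_split(piles):
    """A split needs one empty pile and an even, non-zero number in the other."""
    low, high = sorted(piles)
    return low == 0 and high > 0 and high % 2 == 0


def split(piles):
    """Divide the single non-empty pile evenly between both piles."""
    if not can_split(piles):
        raise ValueError("split is not allowed from %r" % (piles,))
    half = max(piles) // 2
    return (half, half)


def has_lost(piles):
    """A player with no points in either pile is out."""
    return all(points == 0 for points in piles)


wrapped = attack(2, 4)      # 4 + 2 = 6, which leaves 1
emptied = attack(3, 2)      # 2 + 3 = 5, so the pile reverts to 0
after_split = split((0, 4)) # (0 4) becomes (2 2)
```

Note that the attacker's own pile never changes, which matches the "cloned" wording in rule 1.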
=========================
Okay, so let’s break this down. Here’s an example game for those of you that are more visually oriented (follow the turns by reading left to right, moves are marked with red arrows):

A few things are worth pointing out about this game.

  • On turn 4, player 2 adds 3 points to player 1’s 2 points, making 5. The rules state that any pile with exactly 5 points reverts to zero.
  • On turn 7, player 2 adds 3 points to 3 points. 3+3=6 as we all know, but 6 ≡ 1 (mod 5), so player 1 now has one point in his pile.
  • On turn 13, player 2 decides to split his points, turning his one pile of 4 into two piles of 2. This consumes his turn.
  • On turn 15, player 2 adds 2 points to player 1’s 3, thus reverting his pile to zero. Player 1 now has no more points to play with, so player 2 wins.

We can think about this game as a combinatorial problem. What are the optimal positions to play? How would one program a computer to play this game? I plan to create an interactive web game where players can try this for themselves.
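A first step toward the combinatorial view is simply listing every position that can occur in a two-player game. The sketch below does a breadth-first search from the starting position. It rests on assumptions the rules above do not spell out: an empty pile cannot be used to attack, an emptied pile cannot be targeted, and left/right piles are treated as interchangeable. The names next_positions and reachable are made up for this example.

```python
from collections import deque


def next_positions(position):
    """Return every position reachable in one move.

    A position is (mover_piles, opponent_piles), each a sorted pair. The result
    is seen from the next player's side, so the two entries swap after a move.
    """
    mover, opponent = position
    results = set()
    # Option 1: send one of the mover's non-empty piles to an opponent pile.
    for points in mover:
        if points == 0:
            continue  # assumption: an empty pile cannot attack
        for i in range(2):
            if opponent[i] == 0:
                continue  # assumption: an emptied pile cannot be targeted
            hit = list(opponent)
            hit[i] = (hit[i] + points) % 5
            results.add((tuple(sorted(hit)), mover))
    # Option 2: split an even pile when the other pile is empty.
    low, high = mover  # piles are kept sorted, so low is the smaller one
    if low == 0 and high > 0 and high % 2 == 0:
        results.add((opponent, (high // 2, high // 2)))
    return results


def reachable(start=((1, 1), (1, 1))):
    """Breadth-first search over every position reachable from the start."""
    seen = {start}
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for nxt in next_positions(current):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


positions = reachable()
lost = [p for p in positions if p[0] == (0, 0)]
print(len(positions), "reachable positions,", len(lost), "lost for the player to move")
```

Because positions can repeat, a game-playing algorithm built on this would need to handle cycles (for example by working backwards from the lost positions) rather than recursing naively.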

The Collatz Conjecture and Hailstone Sequences: Deceptively Simple, Devilishly Difficult

Here is a very simple math problem:

If a number is even, divide the number by two. If a number is odd, multiply the number by three, and then add one. Do this over and over with your results. Will you always get down to the value 1 no matter what number you choose?

Go ahead and test this out for yourself. Plug a few numbers in. Try it out. I’ll wait.

Back? Good. Here’s my example: I start with the number 12.

12 is even, so I divide it by 2.

12/2=6.

6/2=3.

3 is odd, so we multiply it by 3 then add 1.

(3*3)+1=10.

10/2=5.

(5*3)+1=16.

16/2=8.

8/2=4.

4/2=2.

2/2=1.

And we have arrived at one, just like we thought we would. But is this ALWAYS the case, for ANY number I could possibly dream of?

This may sound easy, but in fact it is an unsolved mathematics problem, with some of the best minds in the field doing research on it. It’s easy enough to check low values, but there are infinitely many numbers, and they can get very, very, very large. How do we KNOW that it works for every single number that exists?

That is what is so difficult about this problem. While every single number we have checked (which is a LOT) ends up at 1 sooner or later (some numbers can bounce around for a very long time, hence why they are called “hailstone sequences”), we still have no method to prove that it works for every number. Mathematicians have been looking for some method to predict this, but despite getting into some pretty heavy mathematics in an attempt to attack this problem, we still do not know for sure.

This problem is known as the Collatz Conjecture and is very interesting to mathematicians young and old because it is so easy to explain and play with, yet so tough to exhaustively prove. What do you think? Will this problem ever be solved? And what would be the implications if it was?

Here is some simple Python code that prints the sequence for any number you type in:

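#!/usr/bin/env python3
# Print the hailstone sequence for a starting number until it reaches 1.
num = int(input("Enter a number: "))
while num != 1:
    if num % 2 == 0:
        num = num // 2       # even: halve (integer division keeps num an int)
    else:
        num = (num * 3) + 1  # odd: triple and add one
    print(num)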
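To get a feel for how long some numbers "bounce around", it helps to count the steps instead of printing them. The following is a small sketch; the function name collatz_steps is my own, and it simply applies the two rules from the top of this post until it reaches 1.

```python
def collatz_steps(n):
    """Return (number of steps to reach 1, highest value seen along the way)."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    steps, peak = 0, n
    while n != 1:
        n = n // 2 if n % 2 == 0 else 3 * n + 1
        steps += 1
        peak = max(peak, n)
    return steps, peak


print(collatz_steps(12))  # (9, 16), matching the worked example above
print(collatz_steps(27))  # (111, 9232)
```

The number 27 is a nice one to try: it starts small, yet climbs above 9,000 and takes 111 steps to come back down to 1.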

A Genetic Algorithm for Computing Ramsey Numbers: Update

All the 78 possible friends-strangers graphs with 6 nodes. For each graph the red/blue nodes show a sample triplet of mutual friends/strangers.

In my last post on this topic, I discussed how I was working on a genetic algorithm to search mathematical graphs for elusive properties called Ramsey Numbers. (For a refresher on genetic algorithms,  visit here, and for a refresher on Ramsey Numbers, visit here). I’ve been doing some work on it since then (check out the code here), and I thought I would describe some improvements and further progress I’ve made in this area.

New features:

  • colorings dumped to a file at the end of each run
  • ability to load data sets from a file, so a run can further refine earlier data rather than starting from scratch each time

The next problem I ran up against while working through this was that even if I am able to load in previously analyzed data, I still only have one fitness function that checks a static set of edges. As I see it, there are two ways to solve this:

  • Make the current fitness function dynamic; that is, it tests a different set of edges every time. This is counterproductive to the purpose of the program “eliminating” certain sets of edges in each “round”, but it would be easier to maintain than the other option, which is
  • Make a “FitnessHandler” method that takes in a value for which method to run, and uses that to determine what set of edges to test. However, this would lead to a lot of extra code and overhead. I’m thinking of having a static variable at the beginning of each run with what “fitness method” to start on, so that it doesn’t have to start on round one each time.
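To make the second option more concrete, here is a rough sketch of what such a handler could look like. Everything in it is hypothetical and not taken from my actual code: the names monochromatic_triangles, make_fitness_handler, and START_ROUND, the choice to store a coloring as a dict from edge to color, and the choice to score a coloring by counting one-colored triangles (lower is better) are all assumptions made for illustration.

```python
from itertools import combinations


def monochromatic_triangles(coloring, edges):
    """Count triangles whose three edges are all in `edges` and share one color."""
    edge_set = set(edges)
    vertices = sorted({v for edge in edge_set for v in edge})
    count = 0
    for a, b, c in combinations(vertices, 3):
        triangle = [(a, b), (a, c), (b, c)]
        if all(e in edge_set for e in triangle):
            if len({coloring[e] for e in triangle}) == 1:
                count += 1
    return count


def make_fitness_handler(edge_sets):
    """Return a function that scores a coloring against the edge set for a round."""
    def fitness(coloring, round_number):
        return monochromatic_triangles(coloring, edge_sets[round_number])
    return fitness


# Hypothetical setup: K4 with edges stored as (low, high) vertex pairs.
all_edges = list(combinations(range(4), 2))
edge_sets = {
    0: [e for e in all_edges if 3 not in e],  # round 0: only the triangle 0-1-2
    1: all_edges,                             # round 1: every edge
}
coloring = {e: 0 for e in all_edges}          # every edge gets the same color
fitness = make_fitness_handler(edge_sets)

START_ROUND = 1                               # resume from a later round
score = fitness(coloring, START_ROUND)        # 4: every triangle in K4 is one color
```

Keeping the edge sets in one dictionary means that adding a round is a data change rather than a new method, which might cut down on some of the extra code I was worried about.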

I haven’t fully decided which of these I will go with. I feel like the second one fills my purpose of methodically “weeding out” the improbable graphs, but it’s going to be a lot of extra work. Oh well, nothing worthwhile ever came easy…

Leave a note here or on my github if you have suggestions!