First Steps With Luigi Data Pipelines


Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer who deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts running queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that allows you to manage and modularize data pipelines in a consistent fashion. Luigi was originally created by Spotify to handle their data processing needs (billions of log messages and terabytes of files per day). They later released it to the world as an open source project. Any service that has Python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, simply run pip install luigi and it should install the necessary dependencies. You can also find the latest source at Luigi and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

  • run()
    • This tells Luigi what you want the task to do when it runs. This could be something like submitting a Map-Reduce job, querying a database, or running an external script from Python. The main logic of the task goes here. Since Luigi is highly modular, we break our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, the database is unreachable), you won’t lose progress on other parts of the pipeline, and you can pinpoint exactly what failed.
  • output()
    • This describes the method of output of your Task. It could be writing to a file, HDFS, or simply updating a database.
  • requires()
    • This function defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() function is what ties your tasks together in Luigi and adds a sense of chronology to the execution.

Your First Task

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

class MyFirstTask(luigi.Task): # inherit from Luigi base class Task

    def run(self):
        with self.output().open('w') as f:
            f.write("This is my first task.\n")
            
    def output(self):
        # tell luigi where to output to
        return luigi.LocalTarget('MyTask.txt')

if __name__ == '__main__':

    # since we are setting MyFirstTask to be the main task,
    # luigi will check for its requirements first, then run it
    luigi.run(["--local-scheduler"], main_task_cls=MyFirstTask)

https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of writing a line to a file. Run it at the command line by typing python mytask.py. This example just gives you the basic structure, so let’s add some dependencies.

Add a new task to your file:

class MySecondTask(luigi.Task):
    # MyFirstTask() needs to run first
    def requires(self):
        return MyFirstTask()

    def run(self):
        # here we're going to count the words in the first file
        # and output the count to a second file
        numwords = 0
        # once MyFirstTask() has run, its output file has the info we need
        # it could just as easily be database output in here
        with self.input().open('r') as f:
            for line in f:
                words = line.split() # this splits on whitespace of any length
                numwords += len(words)

        # write our results
        with self.output().open('w') as out:
            out.write(str(numwords) + "\n")

    def output(self):
        return luigi.LocalTarget('words.txt')

https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we utilize the requires() method to define a dependency. This means that when you run the second task, MySecondTask(), Luigi will check whether an output exists for MyFirstTask(). If it’s not there, it will run the task listed under requires() before proceeding. Additionally, if a dependent task fails for any reason, an error email can be sent to an address of your choosing (if configured), and the workflow will pick up where it left off the next time you run it.

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs cover configuring the daemon in more detail here.

When you go to http://localhost:8082 (or whatever the IP of your server is), you should see Luigi running like this:

Let’s run our tasks and see what happens. Run python mytask.py again while you have luigid running. Make sure you change the luigi.run statement so that it no longer includes ['--local-scheduler']; this tells it to go to the central scheduler on our server.
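For example, the bottom of mytask.py might now look like this (a sketch using the same luigi.run call as before, with MySecondTask as the main task so the whole pipeline runs):

if __name__ == '__main__':

    # no --local-scheduler flag: luigi will contact the central
    # scheduler daemon (luigid) instead of scheduling locally
    luigi.run([], main_task_cls=MySecondTask)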

After it runs, you should see this:


One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and other extensions have been built in since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies, big and small. Just a few things you can do once you get the hang of the basics: submit Hadoop jobs, interface with MySQL, and store data in Redis.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!

 

Pretty Print JSON From The Command Line

JSON can come in all shapes and sizes, and sometimes you want to see it in a structured format that’s easier on the eyes. This is called pretty printing. But how do you accomplish that, especially with a really large JSON file? While there are converter tools online that display JSON files in a pretty format, they can get very slow and even freeze if your file is too large. Let’s do it locally.

Requirements: Have Python installed, and a terminal environment.

cat file.json | python -m json.tool > prettyfile.json

And that’s all there is to it! Python comes built in with a JSON encoding/decoding library, and you can use it to your advantage to get nice formatted output. Alternatively, if you are receiving JSON from an API or HTTP request, you can pipe your results from a curl call directly into this tool as well.
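For example, piping straight from curl (the URL here is just a placeholder):

curl -s https://api.example.com/data.json | python -m json.tool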

Hope this helps!

Chopsticks Game – A Combinatorial Challenge


I don’t know if anyone else is familiar with this game, but I remember playing it with friends in middle school, and it occurred to me the other day that it would be an interesting game to analyze combinatorially, and perhaps to write a game-playing algorithm for. The game is described in more detail here: http://en.wikipedia.org/wiki/Chopsticks_(hand_game)

Players: 2+.

Rules of Play: Players each begin with two “piles” of points, and each pile has 1 point to begin with. We used fingers to represent this, one finger on each hand.

On each turn, a player can choose to do one of two things:

  1. Send points from one of the player’s piles to one of the opponent’s piles. So if Player 1 sends 1 point to Player 2’s left pile, then Player 2 now has 2 points in his left pile and Player 1 still has 1 point in his own left pile. Player 1 does not lose points; they are simply “cloned” over to the opponent’s pile.
  2. If the player has an even number of points in one pile and zero points in the other pile, the player may elect to split his points evenly between the two piles. This consumes the player’s turn. Example: if Player 1 has (0, 4), then he can use his turn to split his points, giving him (2, 2).

If a pile reaches exactly 5 points, it loses all of its points and reverts to 0. However, if the points applied would go over 5 (such as adding 2 points to a 4-point pile), the pile keeps only the remainder (meaning that 4 + 2 = 1). In other words, the pile simply gets its total mod 5.

If a player gets to 0 points in both their piles, then they lose. The last person that has points remaining wins.
Okay, so let’s break this down. Here’s an example game for those of you who are more visually oriented (follow the turns by reading left to right; moves are marked with red arrows):

Okay, so let’s point out a few things about this game.

  • On turn 4, player 2 adds 3 points to player 1’s 2 points, making 5. The rules state that any pile with exactly 5 points reverts to zero.
  • On turn 7, player 2 adds 3 points to 3 points. 3+3=6 as we all know, but 6 \equiv 1 \mod 5, so player 1 now has one point in his pile.
  • On turn 13, player 2 decides to split his points, turning his one pile of 4 into two piles of 2. This consumes his turn.
  • On turn 15, player 2 adds 2 points to player 1’s 3, thus reverting his pile to zero. Player 1 now has no more points to play with, so player 2 wins.

We can think about this game as a combinatorial problem. What are the optimal positions to play? How would one program a computer to play this game? I plan to create an interactive web game where players can try this for themselves.
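As a first step toward a game-playing program, here is a minimal Python sketch of the rules above (the state representation and function names are my own assumptions, not a finished engine):

# a position is (my_piles, opponent_piles) from the point of view of
# the player about to move; each side is a tuple like (1, 1)

def attacks(state):
    # states reachable by sending points from one of my piles to one
    # of the opponent's piles (results are from the opponent's view)
    mine, theirs = state
    results = []
    for i in range(2):
        if mine[i] == 0:        # can't send from an empty pile
            continue
        for j in range(2):
            if theirs[j] == 0:  # assuming an empty pile can't be targeted
                continue
            new_theirs = list(theirs)
            new_theirs[j] = (theirs[j] + mine[i]) % 5  # exactly 5 reverts to 0
            results.append((tuple(new_theirs), mine))
    return results

def splits(state):
    # the split move: an even pile may be divided evenly when the
    # other pile is empty; this consumes the turn
    mine, theirs = state
    results = []
    for i in range(2):
        other = 1 - i
        if mine[i] == 0 and mine[other] > 0 and mine[other] % 2 == 0:
            half = mine[other] // 2
            results.append((theirs, (half, half)))
    return results

def legal_moves(state):
    return attacks(state) + splits(state)

print(legal_moves(((1, 1), (1, 1))))  # the opening position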

The Collatz Conjecture and Hailstone Sequences: Deceptively Simple, Devilishly Difficult

Here is a very simple math problem:

If a number is even, divide the number by two. If a number is odd, multiply the number by three, and then add one. Do this over and over with your results. Will you always get down to the value 1, no matter what positive whole number you choose?

Go ahead and test this out for yourself. Plug a few numbers in. Try it out. I’ll wait.

Back? Good. Here’s my example: I start with the number 12.

12 is even, so I divide it by 2.

12/2=6.

6/2=3.

3 is odd, so we multiply it by 3 then add 1.

(3*3)+1=10.

10/2=5.

(5*3)+1=16.

16/2=8.

8/2=4.

4/2=2.

2/2=1.

And we have arrived at one, just like we thought we would. But is this ALWAYS the case, for ANY number I could possibly dream of?

This may sound easy, but in fact it is an unsolved mathematics problem, with some of the best minds in the field doing research on it. It’s easy enough to check low values, but numbers are infinite, and can get very, very, very large. How do we KNOW that it works for every single number that exists?

That is what is so difficult about this problem. Every single number we have checked (which is a LOT) ends up at 1 sooner or later, though some numbers bounce around, rising and falling for a very long time before settling, which is why these are called “hailstone sequences”. Still, we have no method to prove that it works for every number. Mathematicians have been looking for some way to predict this, but despite bringing some pretty heavy mathematics to bear on the problem, we still do not know for sure.

This problem is known as the Collatz Conjecture and is very interesting to mathematicians young and old because it is so easy to explain and play with, yet so tough to exhaustively prove. What do you think? Will this problem ever be solved? And what would be the implications if it was?

Here is some simple Python code that displays the hailstone sequence for any number you type in:

#!/usr/bin/env python3
num = int(input("Enter a number: "))
while num != 1:
    if num % 2 == 0:
        num = num // 2
    else:
        num = (num * 3) + 1
    print(num)  # show each step of the sequence

A Genetic Algorithm for Computing Ramsey Numbers: Update


All the 78 possible friends-strangers graphs with 6 nodes. For each graph the red/blue nodes shows a sample triplet of mutual friends/strangers.

In my last post on this topic, I discussed how I was working on a genetic algorithm to search mathematical graphs for elusive properties called Ramsey numbers. (For a refresher on genetic algorithms, visit here, and for a refresher on Ramsey numbers, visit here.) I’ve been doing some work on it since then (check out the code here), and I thought I would describe some improvements and further progress I’ve made in this area.

New features:

  • colorings dumped to a file at the end of each run
  • ability to load in data sets from a file, refining the data further rather than starting from scratch each time

The next problem I ran up against while working through this was that even if I am able to load in previously analyzed data, I still only have one fitness function that checks a static set of edges. As I see it, there are two ways to solve this:

  • Make the current fitness function dynamic; that is, have it test a different set of edges every time. However, this works against the program’s purpose of “eliminating” certain sets of edges in each “round”. On the other hand, it would be easier to maintain than the other option, which is to
  • Make a “FitnessHandler” method that takes in a value indicating which method to run, and uses that to determine what set of edges to test. However, this would lead to a lot of extra code and overhead. I’m thinking of keeping a static variable at the beginning of each run recording which “fitness method” to start on, so that it doesn’t have to start at round one each time.

I haven’t fully decided which of these I will go with. I feel like the second one fits my purpose of methodically “weeding out” the improbable graphs, but it’s going to be a lot of extra work; a rough sketch of what I mean is below. Oh well, nothing worthwhile ever came easy…
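Here is a minimal sketch of the FitnessHandler idea; the names, edge sets, and scoring rule are all placeholders, not the real program’s values:

START_ROUND = 1  # static variable: which "fitness method" to start on

EDGE_SETS = {
    1: [(0, 1), (0, 2), (1, 2)],  # hypothetical edges tested in round 1
    2: [(0, 3), (1, 3), (2, 3)],  # hypothetical edges tested in round 2
}

def fitness_handler(coloring, round_num):
    # dispatch to the edge set for this round and score a coloring;
    # `coloring` maps an edge (u, v) to a color, and as a stand-in
    # measure we count how many tested edges share the same color
    edges = EDGE_SETS[round_num]
    colors = [coloring[e] for e in edges]
    return max(colors.count(c) for c in set(colors))

# example: a red/blue coloring scored against round 1's edges
example = {(0, 1): 'red', (0, 2): 'blue', (1, 2): 'red', (0, 3): 'blue'}
print(fitness_handler(example, START_ROUND))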

Leave a note here or on my github if you have suggestions!

Twitter FriendCloud

Today I’m going to talk about a personal project I’ve been working on recently. I was trying to come up with a cool project using natural language processing, and I had also noticed that with the rise of social networks, there is a treasure trove of data out there waiting to be analyzed. I’m a fairly active user of Twitter, and it’s a fun way to get short snippets of info from people or topics you’re interested in. Personally, I have found a lot of math, science, and tech people who have Twitter accounts and post about their work or the latest news in the field. Even on a short inspection, I find that the people I follow tend to fall into certain “groups”:

  • math
  • computer science
  • general science
  • academia
  • authors
  • tech bloggers
  • feminism
  • various political and social activist figures (civil rights, digital privacy, etc)

This list gives a pretty good insight into the things I am most interested in and like hearing about on my twitter timeline. Now I thought to myself, what if I could automate this process and analyze any user’s timeline to find out what “groups” their friends fell into as well? I had my project.

Currently in the beginning stages, I decided to tentatively call my project “FriendCloud” and registered with the Twitter API to start messing around. I’m using Python-Twitter to interact with the Twitter API, and it’s helping me get practice with Python at the same time. The first thing I wanted to do was pull down a list of all the people I follow. Since I follow a little over 1000 people, this proved to be a daunting task with the rate limiting that Twitter has built into its API. At the moment, what I had to do was have my script pull as many user objects as possible until the rate limit ran out, then put the program to sleep for 15 minutes until the limits refreshed and I could download more.
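The core of that loop looks roughly like this (a sketch, not my exact script; the credentials are placeholders and the error handling is simplified):

import time
import twitter  # the python-twitter package

api = twitter.Api(consumer_key='...', consumer_secret='...',
                  access_token_key='...', access_token_secret='...')

friend_ids = api.GetFriendIDs()  # ids come cheap; full user objects do not
friends = []
for fid in friend_ids:
    while True:
        try:
            friends.append(api.GetUser(user_id=fid))
            break
        except twitter.TwitterError:
            # rate limit exhausted: sleep until the 15-minute window resets
            time.sleep(15 * 60)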

It took a little over an hour to get the list of all my friends, and I am looking into ways to do this quicker in the future. After that, I can go through users and pull down a selection of their tweets. Once this is done, I have a corpus of text that I can analyze. I have been using NLTK (a Python NLP toolkit) to pick out some of the most common keywords and themes. There is a lot of extraneous data to deal with, but as I pare it down I’ve noticed some interesting trends just in my own tweets.
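The keyword-counting step looks something like this (a sketch with stand-in tweets; the real corpus comes from the API pull above):

import nltk
from nltk.corpus import stopwords

nltk.download('punkt')      # tokenizer models
nltk.download('stopwords')  # common-word list

# stand-in data; in practice this is the list of downloaded tweets
tweets = ["Reading about Ramsey numbers again", "New NLTK release is out!"]

words = []
for tweet in tweets:
    words.extend(nltk.word_tokenize(tweet.lower()))

# drop punctuation and filler words, then count what's left
stop = set(stopwords.words('english'))
keywords = [w for w in words if w.isalpha() and w not in stop]
print(nltk.FreqDist(keywords).most_common(10))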

I hope in the future to extend this to the people I follow on Twitter and to place them into rough “groups” based on their most commonly tweeted keywords (similar to how a word cloud works). That way, a user can get an at-a-glance look at the topics a person is most likely interested in and the sort of people they may be likely to follow in the future.