First Steps With Luigi Data Pipelines

Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer that deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts to run queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that allows you to manage and modularize data pipelines in a consistent fashion. Luigi was originally created by Spotify to handle their data processing needs (billions of log messages and terabytes of files per day). They later released into the world as an open source project. Any service that has python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, simply run pip install luigi and it should install the necessary dependencies. You can also find the latest source at Luigi and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

• run()
• This tells Luigi what you want the task to do when it is run. This could be something like submitting a Map-Reduce job, querying a database, or running an external script in Python. The main logic of the task goes here. Since Luigi is highly modular, we break up our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, if the database is unreachable), then it won’t lose the progress on other parts of the pipeline and you can pinpoint what exactly failed.
• output()
• This describes the method of output of your Task. It could be writing to a file, HDFS, or simply updating a database.
• requires()
• This function defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() function is what ties your tasks together in Luigi and adds a sense of chronology to the execution.

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

def run(self):
with self.output().open('w') as f:

def output(self):
# tell luigi where to ouptut to

if __name__ == '__main__':

# it will check for the requirements first, then run


https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of printing a line to a file. But that’s easy. Let’s add in some dependencies. Run this at the command line by typing python mytask.py. This is a very simple example just to give you the structure. Let’s add some dependencies.

class MySecondTask(luigi.Task):
# MyFirstTask() needs to run first
def requires(self):

def run(self):
# here we're going to count the words in the first file
# and output it to a second file
numwords = 0
# once MyFirstTask() has run, this file has the info we need
# it could just as easily be database output in here
for line in f:
words = line.split(None) # this splits on whitespace of any length
numwords += len(words)

# print our results
with open('words.txt', 'w') as out:
out.write(str(numwords)+"\n")

def output(self):
return luigi.LocalTarget('words.txt')


https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we utilize the method to define a dependency. This means that when you run the second method, MySecondTask(), it will check to see if an output exists for MyFirstTask(). If it’s not there, it will run the task under requires() before proceeding. Additionally, if the dependent task fails for any reason, an error email will be sent to the address of your choosing and the workflow will start up where you left off the next time you run it.

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs go into more detailed documentation about configuring the daemon here.

When you go to http://localhost:8082(or whatever the ip of your server is), then you should see Luigi running like this:

Let’s run our tasks and see what happens. Run python mytask.py again while you have luigid running. Make sure you change the luigi.run statement so that it no longer includes ['--local-scheduler']. This will tell it to go to the central scheduler on our server.

After it runs, you should see this:

One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and has built in other extensions since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies big and small. Just a few things you can do once you get the hang of the basics are submit Hadoop jobs, interface with mySQL, and store data in Redis.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!

Pretty Print JSON From The Command Line

JSON can come in all shapes and sizes, and sometimes you want to see it in a structured format that’s easier on the eyes. This is called pretty printing. But how do you accomplish that, especially if you have a really large JSON file? While there are some converter tools online to show json files in a pretty format, they can get very slow and even freeze if your file is too large. Let’s do it locally.

Requirements: Have Python installed, and a terminal environment.

cat file.json | python -m json.tool > prettyfile.json

And that’s all there is to it! Python comes built in with a JSON encoding/decoding library, and you can use it to your advantage to get nice formatted output. Alternatively, if you are receiving JSON from an API or HTTP request, you can pipe your results from a curl call directly into this tool as well.

Hope this helps!

A Quick Introduction to HDFS

The Hadoop filesystem is called HDFS, and today I’m going to give a short introduction to how it works for a beginner.

The Hadoop File System (HDFS) sits on top of a Hadoop cluster and facilitates the distributed storage and access of files.  When a file is stored in HDFS, it is split into chunks called “blocks”. They can be of different sizes. The blocks are scattered between the nodes. These nodes have a daemon running called a datanode. There is one node called the namenode that has metadata about the blocks and their whereabouts.

To protect against network or disk failure data is replicated in three places across the cluster. This makes the data redundant. Therefore if one datanode goes down, there are other copies of the data elsewhere. When this happens a new copy of the data is created, so that there are always three.

The namenode is even more important, because it has metadata about all the files. If there is a network issue, all of the data will be unavailable. However, if the disk on the namenode fails, the data may be lost forever, because the namenode has all the information about how the pieces of the files go together. We’d still have all the chunks on the data nodes, but we’d have no idea what file they go to.

To get around this issue, one solution is to also mount the drive on a network file system (NFS). Another way to approach this (which is a better alternative) is to have an active namenode and a standby namenode. This way, there is a “backup” if something goes wrong.

Some commands:
• To list files on HDFS:
• $hadoop fs -ls • To put files on HDFS: • $ hadoop fs -put filename
• this takes a local file and puts on HDFS
• To display the end of a file :
• $hadoop fs -tail filename • Most bash commands will work if you put a dash in front of them • $ hadoop fs -cat
• $hadoop fs -mv • $ hadoop fs -mkdir
• etc…

Happy Halloween From  The Muse Garden!

In the tradition of my Holiday Math Puzzles, I’m here with an appropriately themed puzzle for this time of year.

Candy Distribution

It’s that time of year all right. You’re out and about, trick or treating with your friends or family and when you come home, you decide to dump all the candy out on the floor to sort through it. But then, as siblings often do, you begin to bicker about who has “more” than the other. In fact, there are some candies that you really like, and some you don’t like. You’d rather have a bunch of chocolate than a bunch of peppermints, for instance.  But wait a minute! Of course you can’t like the same thing! Your sibling actually likes peppermints!

Here’s a table of the different candies you have. Each candy has a “value” to it; that is, how much you “want” it. Try to split up the candies such that you and your sibling both have as equal value as possible at the end. And no fighting!

 Candy Quantity Your Value (per piece) Sibling’s Value (per piece) Candy Corn 150 25 50 Peppermints 50 5 50 Peanut Butter Cups 10 100 75 Hershey Bars 25 50 10 Kit Kat 20 75 30

Chopsticks Game – A Combinatorial Challenge

So I don’t know if anyone else is familiar with this game, but I just remember playing it with friends in middle school and it occurred to me the other day that it would be an interesting game to analyze combinatorially, and perhaps write a game playing algorithm for. This game can be found in more detail here: http://en.wikipedia.org/wiki/Chopsticks_(hand_game)

Players: 2+.

Rules of Play: Players each begin with two “piles” of points, and each pile has 1 point to begin with. We used fingers to represent this, one finger on each hand.

On each turn, a player can choose to do one of two things:

1. Send points from one of the player’s pile to one of the opponents piles. So if Player 1 wanted to send 1 point to Player 2’s left pile, then Player 2 would have 2 points in the left pile and Player 1 would still have 1 point in his left pile. Player 1 does not lose points, they are simply “cloned” over to the opponents pile.
2. If the player has an even number of points in one pile and zero points in the other pile, the player may elect to split his points evenly between the two piles. This consumes the player’s turn. Example: if Player 1 has (0     4) then he can use his turn to split his points, giving him (2     2).

If a player gets exactly 5 points in either pile, that pile loses all of its points and reverts to 0. However, if points applied goes over 5 (such as adding 2 points to a 4 point pile), then the remainder of points are added. (meaning that 4 + 2 = 1). The opponent simply gets points mod 5.

If a player gets to 0 points in both their piles, then they lose. The last person that has points remaining wins.
$=========================$
Okay, so let’s break this down. Here’s an example game for those of you that are more visually oriented (follow the turns by reading left to right, moves are marked with red arrows):

• On turn 7, player 2 adds 3 points to 3 points. $3+3=6$ as we all know, but $6 \equiv 1 \mod 5$, so player 1 now has one point in his pile.