First Steps With Luigi Data Pipelines


Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer who deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts to run queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that allows you to manage and modularize data pipelines in a consistent fashion. Luigi was originally created by Spotify to handle their data processing needs (billions of log messages and terabytes of files per day). They later released it into the world as an open source project. Any service that has Python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, run pip install luigi, which should also pull in the necessary dependencies. You can also get the latest source from the Luigi repository on GitHub and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

  • run()
    • This tells Luigi what you want the task to do when it is run. This could be something like submitting a Map-Reduce job, querying a database, or running an external script in Python. The main logic of the task goes here. Since Luigi is highly modular, we break up our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, if the database is unreachable), you won’t lose the progress made on other parts of the pipeline, and you can pinpoint exactly what failed.
  • output()
    • This tells Luigi where the output of your Task goes. It could be a local file, a file on HDFS, or an update to a database. Luigi also uses the output to decide whether the Task has already been completed.
  • requires()
    • This function defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() function is what ties your tasks together in Luigi and adds a sense of chronology to the execution.

Your First Task

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

class MyFirstTask(luigi.Task): # inherit from Luigi base class Task

    def run(self):
        with self.output().open('w') as f:
            f.write("This is my first task.\n")
            
    def output(self):
        # tell luigi where to output to
        return luigi.LocalTarget('MyTask.txt')

if __name__ == '__main__':

    # since we are setting MyFirstTask to be the main task,
    # it will check for the requirements first, then run
    luigi.run(["--local-scheduler"], main_task_cls=MyFirstTask)

https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of writing a line to a file. Run it at the command line by typing python mytask.py. This is a very simple example just to give you the structure, so let’s add some dependencies.

Add a new task to your file:

class MySecondTask(luigi.Task):
    # MyFirstTask() needs to run first
    def requires(self):
        return MyFirstTask()

    def run(self):
        # here we're going to count the words in the first file
        # and output it to a second file
        numwords = 0
        # once MyFirstTask() has run, this file has the info we need
        # it could just as easily be database output in here
        with self.input().open('r') as f:
            for line in f:
                words = line.split(None) # this splits on whitespace of any length
                numwords += len(words)

        # print our results
        with open('words.txt', 'w') as out:
            out.write(str(numwords)+"\n")

    def output(self):
        return luigi.LocalTarget('words.txt')

https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we utilize the requires() method to define a dependency. This means that when you run the second task, MySecondTask(), it will check to see if an output exists for MyFirstTask(). If it’s not there, it will run the task under requires() before proceeding. Additionally, Luigi can be configured so that if the dependent task fails for any reason, an error email is sent to the address of your choosing. The next time you run the workflow, it picks up where it left off, because tasks whose output already exists are skipped.
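To make the check-then-run behaviour a bit more concrete, here is a toy sketch in plain Python. It is not Luigi's scheduler and does not use Luigi at all; ToyTask and build are made-up names for illustration, and the only idea borrowed from Luigi is that a task counts as complete when its output exists.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a Luigi task, for illustration only."""

    def __init__(self, path, deps=()):
        self.path = path          # file this task produces
        self.deps = list(deps)    # tasks that must be complete first

    def requires(self):
        return self.deps

    def complete(self):
        # a task counts as done when its output already exists
        return os.path.exists(self.path)

    def run(self):
        with open(self.path, "w") as f:
            f.write("done\n")


def build(task, log):
    """Run `task` after its dependencies, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(os.path.basename(task.path))


workdir = tempfile.mkdtemp()
first = ToyTask(os.path.join(workdir, "MyTask.txt"))
second = ToyTask(os.path.join(workdir, "words.txt"), deps=[first])

log = []
build(second, log)   # runs first, then second
build(second, log)   # nothing left to do: both outputs exist
print(log)
```

The second call to build does nothing, which is the same reason a re-run of a half-finished workflow only redoes the parts that are missing.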

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs go into more detail about configuring the daemon here.

When you go to http://localhost:8082 (or whatever the IP of your server is), you should see Luigi running like this:

Let’s run our tasks and see what happens. Run python mytask.py again while you have luigid running. Make sure you change the luigi.run statement so that it no longer includes ['--local-scheduler']. This will tell it to go to the central scheduler on our server.

After it runs, you should see this:

task2

One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and has built in other extensions since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies big and small. Just a few things you can do once you get the hang of the basics are submit Hadoop jobs, interface with MySQL, and store data in Redis.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!

 

A Quick Introduction to HDFS

The Hadoop filesystem is called HDFS, and today I’m going to give a short introduction to how it works for a beginner.

The Hadoop Distributed File System (HDFS) sits on top of a Hadoop cluster and facilitates the distributed storage and access of files. When a file is stored in HDFS, it is split into chunks called “blocks”. The block size is configurable (128 MB by default in recent versions of Hadoop), and the last block of a file can be smaller than the rest. The blocks are scattered between the nodes. These nodes have a daemon running called a datanode. There is one node called the namenode that has metadata about the blocks and their whereabouts.

To protect against network or disk failure, each block is replicated in three places across the cluster by default. This makes the data redundant. Therefore, if one datanode goes down, there are other copies of the data elsewhere. When this happens, a new copy of the data is created, so that there are always three.

The namenode is even more important, because it has metadata about all the files. If a network issue makes the namenode unreachable, all of the data will be unavailable until it comes back. However, if the disk on the namenode fails, the data may be lost forever, because the namenode has all the information about how the pieces of the files go together. We’d still have all the chunks on the datanodes, but we’d have no idea what file they go to.
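Here is a toy model of those ideas in plain Python, just to show how the pieces relate. Everything in it is an assumption made for illustration: the function names, the tiny block size, and the round-robin placement of replicas (real HDFS has its own placement policy and talks over the network, none of which is modelled here).

```python
import itertools

BLOCK_SIZE = 4      # bytes, tiny on purpose; real HDFS blocks are far larger
REPLICATION = 3     # copies kept of each block


def store(data, datanodes, namenode, filename):
    """Split data into blocks and record on the 'namenode' where each copy lives."""
    blocks = [data[i:i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
    node_cycle = itertools.cycle(sorted(datanodes))
    namenode[filename] = []
    for index, block in enumerate(blocks):
        block_id = f"{filename}#{index}"
        holders = [next(node_cycle) for _ in range(REPLICATION)]
        for node in holders:
            datanodes[node][block_id] = block
        namenode[filename].append((block_id, holders))


def read(filename, datanodes, namenode, dead=()):
    """Reassemble a file using any surviving replica of each block."""
    out = b""
    for block_id, holders in namenode[filename]:
        alive = [n for n in holders if n not in dead]
        out += datanodes[alive[0]][block_id]
    return out


datanodes = {"dn1": {}, "dn2": {}, "dn3": {}, "dn4": {}}
namenode = {}
data = b"hello hdfs!"

store(data, datanodes, namenode, "greeting.txt")
print(read("greeting.txt", datanodes, namenode, dead={"dn1"}))
```

Losing dn1 does not hurt, because every block has other replicas. Throw away the namenode dictionary, though, and the datanodes are left holding blocks with nothing to say which file they belong to or in what order.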

To get around this issue, one solution is to also mount the drive on a network file system (NFS). Another way to approach this (which is a better alternative) is to have an active namenode and a standby namenode. This way, there is a “backup” if something goes wrong.

Some commands:
  • To list files on HDFS:
    • $ hadoop fs -ls
  • To put files on HDFS:
    • $ hadoop fs -put filename
    • this takes a local file and puts it on HDFS
  • To display the end of a file :
    • $ hadoop fs -tail filename
  • Most bash commands will work if you put a dash in front of them
    • $ hadoop fs -cat
    • $ hadoop fs -mv
    • $ hadoop fs -mkdir
    • etc…

A Genetic Algorithm for Computing Ramsey Numbers: Update

Friends_strangers_graph

All the 78 possible friends-strangers graphs with 6 nodes. For each graph the red/blue nodes show a sample triplet of mutual friends/strangers.

In my last post on this topic, I discussed how I was working on a genetic algorithm to search mathematical graphs for elusive properties called Ramsey Numbers. (For a refresher on genetic algorithms,  visit here, and for a refresher on Ramsey Numbers, visit here). I’ve been doing some work on it since then (check out the code here), and I thought I would describe some improvements and further progress I’ve made in this area.

New features:

  • colorings dumped to a file at the end of each run
  • ability to load in data sets from file, further refining of the data than starting from scratch each time

The next problem I ran up against while working through this was that even if I am able to load in previously analyzed data, I still only have one fitness function that checks a static set of edges. As I see it, there are two ways to solve this:

  • Make the current fitness function dynamic; that is, it tests a different set of edges every time. This works against the program’s purpose of “eliminating” certain sets of edges in each “round”, but it would be easier to maintain than the other option, which is
  • Make a “FitnessHandler” method that takes in a value for which method to run, and uses that to determine what set of edges to test. However, this would lead to a lot of extra code and overhead. I’m thinking of having a static variable at the beginning of each run that records which “fitness method” to start on, so that it doesn’t have to start on round one each time.

I haven’t fully decided which of these I will go with. I feel like the second one fills my purpose of methodically “weeding out” the improbable graphs, but it’s going to be a lot of extra work. Oh well, nothing worthwhile ever came easy…

Leave a note here or on my github if you have suggestions!

Introduction to P vs. NP

800px-Jigsaw_puzzle_01_by_Scouten

It’s a Millennium Problem (reward $1,000,000) and the subject of a movie. But what is this mysterious problem, and what makes it so hard?

Creation vs. Verification

Pop quiz: Choose which one of these tasks takes longer.

  • A) someone hands you a jigsaw puzzle and asks you if it is finished or not
  • B) someone hands you a jigsaw puzzle and asks you to put it together

Answer: B. While you can determine if a puzzle is finished or not in a split second (it is easy to tell if pieces are missing), it is not so easy to put all those pieces together yourself. We have problems like this in computer science and mathematics as well. Think about this: what if there was a method that could solve a jigsaw puzzle as fast as you could check if it’s right? Wouldn’t that be amazing? That’s what this problem seeks to find out: if such a method exists.

The Class P

There is a class of problems that can be solved in “polynomial time”, and we call this P. Polynomial time means that as the size of the problem’s input increases, the time it takes to solve increases at a rate no greater than a polynomial in that size. Informally, we can say that for P problems, we can solve the problem “quickly”.

Here’s an example: Say we have a list of numbers in front of us, and we want to pick out the number that is the greatest. At the very worst, we could start at the beginning of the list and compare every number until we got to the end of the list. Yes, the list may be very very long. But the time it takes to check every number increases in a linear fashion. More numbers does not make it exponentially more difficult. If we compare the growth rates of linear time O(n) (graph A) and quadratic time O(n^2) (graph B), we can clearly see that linear time grows more slowly. Linear time is itself polynomial time (a polynomial of degree 1), thus this problem is in P.
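The scan described above can be sketched in a few lines of Python. The function name largest and the comparison counter are my own additions for illustration; the counter is only there to show that a list of n numbers needs n - 1 comparisons.

```python
def largest(numbers):
    """Return the largest value and the number of comparisons made."""
    best = numbers[0]
    comparisons = 0
    for value in numbers[1:]:
        comparisons += 1
        if value > best:
            best = value
    return best, comparisons


print(largest([3, 41, 7, 19, 8]))   # (41, 4)
```

Doubling the length of the list roughly doubles the number of comparisons, which is what linear growth means.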

Graph A: linear growth, O(n)

Graph B: quadratic growth, O(n^2)

The Class NP

cookies

“Mom, she got more than me!”

The class of NP problems, put simply, contains problems whose solutions can be verified in polynomial time. NP stands for “nondeterministic polynomial time” and refers to a theoretical model of computation known as a nondeterministic Turing machine. Consider this example: a woman is dividing up cookies of different sizes into two groups for her two young children. Naturally, they are very picky about things being fair, so she needs to make sure that each child has exactly the same amount of cookies (by weight). While we can show that this problem can get quite hard to sit down and solve, if someone showed us two piles of cookies we could quickly add up the weights of the two piles and tell if they were the same or not.

If you have only 2 cookies to separate out, sure, it’s easy. Put one cookie in each pile and that’s the best you can do. But what if you had 5 cookies? 10? Keep in mind that each cookie weighs a slightly different amount and we want the two groups of cookies to be as close to equal in weight as possible. Adding more cookies to work with increases our options for separating them exponentially. This is not good. If she has 5 cookies, the number of different ways to separate them is 2^5 = 32, but if she doubles the amount of cookies to 10, the possibilities skyrocket because 2^{10} = 1024.
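Here is a small Python sketch of that contrast between checking and searching. The function names and the cookie weights are made up for illustration, and the search is the plainest brute force I could write, not a clever algorithm.

```python
from itertools import product


def is_fair(pile_a, pile_b):
    """Verification: one pass over each pile, so the time grows linearly."""
    return sum(pile_a) == sum(pile_b)


def find_fair_split(weights):
    """Search: try every way of assigning each cookie to pile A or pile B."""
    tried = 0
    for choice in product((0, 1), repeat=len(weights)):
        tried += 1
        pile_a = [w for w, c in zip(weights, choice) if c == 0]
        pile_b = [w for w, c in zip(weights, choice) if c == 1]
        if is_fair(pile_a, pile_b):
            return pile_a, pile_b, tried
    return None, None, tried


cookies = [31, 28, 35, 30, 34]        # made-up weights in grams
print(find_fair_split(cookies))       # no fair split exists: all 32 options tried
print(find_fair_split([10, 20, 30]))  # a fair split is found early
```

When no perfectly fair split exists, the search has to look at all 2^5 = 32 assignments before it can say so, while checking any single proposed split is just two sums.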

Let’s consider, for fun, the number of possibilities if she had 100 cookies! 2^{100}=1,267,650,600,228,229,401,496,703,205,376. This…is pretty large. But wait! Computers can do that in a heartbeat, right? They’re way faster than we are! This is all fine and good, until you realize that the number of seconds in the age of the entire universe is (only) about 450,000,000,000,000,000. So even if our computer could go through thousands or millions of possibilities per second, checking them all would still take longer than the entire age of the universe. This is what makes these sorts of problems so hard.
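If you want to check that arithmetic yourself, Python’s integers are big enough to do it directly. The speed of one million checks per second is an assumption I picked for the sake of the example; the age of the universe is the rough figure quoted above.

```python
possibilities = 2 ** 100
age_of_universe_seconds = 450_000_000_000_000_000   # rough figure used in the text
checks_per_second = 1_000_000                       # assumed machine speed

seconds_needed = possibilities // checks_per_second
universe_lifetimes = seconds_needed // age_of_universe_seconds

print(possibilities)
print(universe_lifetimes)   # roughly 2.8 million lifetimes of the universe
```

Making the machine a thousand times faster only knocks three zeros off the answer, which still leaves thousands of universe lifetimes.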

P = NP?

So now that you know a little about what P and NP are, you may be wondering what all this business about “P=NP” is. What does it mean, after all? If someone were to prove that P equals NP, this would mean that every problem in the class NP (including the hardest ones) can be solved in polynomial time, just like the P problems. This would have huge implications for all sorts of applications, not just in the mathematical world. For instance, a cornerstone of many security systems rests on the difficulty of factoring very large numbers. If P=NP, it would mean that polynomial-time methods exist for factoring these numbers, and if such methods turned out to be practical, financial systems everywhere would be vulnerable.

This can also be applied to many logistical and optimization problems (see my posts on intractable problems here and here). For the hardest problems in NP, known as NP-complete problems, an “easy” method for solving one of them can be converted into an easy method for every problem in NP. We don’t need to show an easy method to solve every problem in the world. We only need one. That shouldn’t be too hard, right? Unfortunately, researchers have been working on this problem for decades without success. There have been many attempted “proofs” but they all ultimately have fallen short. The problem is so difficult that the Clay Mathematics Institute is offering $1,000,000 for a correct proof of this problem one way or another.

Although this question has not been settled one way or another, many professionals believe that P \neq NP. This would mean that the hardest problems in NP are inherently “harder” than problems in P, and that no polynomial-time method for them exists, so in the worst case we may be stuck with something close to an exhaustive (brute force) search. No matter how good our computers get, these problems will still be very difficult for us to solve, especially for large cases.

I hope you enjoyed this little introduction to P and NP and if you have any comments, questions or corrections, please leave them in the comments below!

Intractable Problems — Part Two: Data Storage

This post continues my series on intractable problems. In this installment, I will talk about problems relating to Data Storage. As a refresher, remember that an intractable problem is one that is very computationally complex and very difficult to solve using a computer without some sort of novel thinking. I will discuss two famous problems related to Data Storage below, as well as provide a few examples and references.

Part Two — Data Storage

486px-Knapsack.svg

Knapsack — given a set of items with weights w and values v, and a knapsack with capacity C, maximize the value of the items in the knapsack without going over capacity.

To start with, here is a small example that I refer to in this previous post. If you’re trying to place ornaments on a tree, you want to get the max amount of coverage possible. However, the tree can only hold so much weight before it falls over. What is the best way to pick ornaments and decorations such that you can cover as much of the tree as possible without it falling over?

I actually saw a really interesting example of this problem in this video (watch the first few minutes). The professor here sets up a situation of Indiana Jones trying to grab treasures from a temple before it collapses. He wants to get the most valuable treasures he can, but he can only carry so much.

Class: There are several different types of knapsack problems, but the most common one (the one discussed above) is one-dimensional knapsack. The decision problem (can we get to a value V without exceeding weight W?) is NP-Complete. However, the optimization problem (what is the most value we can get without exceeding the capacity?) is NP-Hard.
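For small capacities, a common textbook approach is dynamic programming, sketched below in Python. This is one standard technique rather than anything specific to the references in this post, it assumes whole-number weights, and the item list is made up for illustration.

```python
def knapsack(items, capacity):
    """Best total value for 0/1 knapsack; items are (weight, value) pairs."""
    best = [0] * (capacity + 1)   # best[c] = max value achievable with capacity c
    for weight, value in items:
        # go downwards so each item is used at most once
        for c in range(capacity, weight - 1, -1):
            best[c] = max(best[c], best[c - weight] + value)
    return best[capacity]


treasures = [(1, 1), (3, 4), (4, 5), (5, 7)]   # made-up (weight, value) pairs
print(knapsack(treasures, 7))                  # 9: take the items weighing 3 and 4
```

The running time grows with the number of items times the capacity. That does not contradict the hardness result above, because the capacity itself can be exponentially large compared to the number of digits it takes to write it down.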

References: 

Wikipedia Page – General discussion of the Knapsack problem, different types, complexity, and a high level view of several algorithms for solving

Coursera Course on Discrete Optimization – The source for the above video and a great discussion of not only Knapsack but quite a few of these problems

Knapsack Problem at Rosetta Code – a good example data set and a variety of implementations in different languages

============================

TetrisPieces2

Bin Packing — given a set of items with weights w and a set of n bins with capacity c each, place the items into the bins such that the minimum amount of bins are used.

This problem is very similar to the knapsack problem found above, but this time we don’t care how much the items are worth. We just want to pack them in the smallest area possible. Solving this problem is invaluable for things like shipping and logistics. Obviously, companies want to be able to ship more with less space.

A more commonplace example could be thinking of this problem as Tetris in real life. Consider that you’re moving to a new place, and you have your car and your friend’s car in which to move things. How can you place all the items in the cars such that you use the space most efficiently?

Good approximate solutions can be found on reasonably large data sets by using a heuristic called the “first fit decreasing algorithm”. It does not always find the minimum number of bins, but it usually gets close. First fit means that you pick up an item, and place it into the first bin that it will fit in. If it can’t fit in any of the current bins, make a new bin for it. Decreasing means that before you start placing items, you sort them all from biggest to smallest. You probably do this in your everyday life. If you want to pack a box, you start with the big items, right? No need to put lots of small items on the bottom. By getting the big items out of the way first, you can be more flexible with the remaining space because you will then have smaller items.
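The steps above translate almost directly into Python. This is a minimal sketch: the function name and the example weights are my own, and it assumes that no single item is heavier than a bin’s capacity.

```python
def first_fit_decreasing(weights, capacity):
    """Pack weights into bins of the given capacity using first fit decreasing."""
    bins = []
    for weight in sorted(weights, reverse=True):   # biggest items first
        for contents in bins:
            if sum(contents) + weight <= capacity:
                contents.append(weight)            # first bin with enough room
                break
        else:
            bins.append([weight])                  # nothing fits: open a new bin
    return bins


print(first_fit_decreasing([4, 8, 1, 4, 2, 1], capacity=10))
# [[8, 2], [4, 4, 1, 1]]
```

Here the items add up to 20, so two bins of capacity 10 is the best anyone could do, and the heuristic happens to find it.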

Class: This problem is NP-Hard.

References:

Wikipedia Page – a high level description of the problem

First Fit Decreasing Paper (pdf) – this is a technical paper describing computational bounds for using the first fit decreasing algorithm. Not for beginners.

3D Bin Packing Simulation – looks like a resource for companies to use to pack boxes and such

============================

I hope this provided a little taste of why these problems are so important. If you know any other good resources please let me know.