First Steps With Luigi Data Pipelines

Luigi makes managing your data pipelines easy and fun.

Are you a data scientist or engineer that deals with large amounts of data in your daily work? Do you have an unmanageable mess of cron jobs and scripts to run queries and ETL tasks? Does dependency management make your head spin?

Enter Luigi. Named after the famous character, Luigi is a Python utility that allows you to manage and modularize data pipelines in a consistent fashion. Luigi was originally created by Spotify to handle their data processing needs (billions of log messages and terabytes of files per day). They later released it into the world as an open source project. Any service that has Python bindings (from SQL to MongoDB to Hadoop to Spark to graphing and analysis in Pandas) can be tied together as a Task in Luigi.

To install, simply run pip install luigi and it should install the necessary dependencies. You can also find the latest source in the Luigi repository on GitHub and build it yourself.

How does it work?

Building a data pipeline in Luigi is similar to creating a makefile. Actions or steps of your process are contained in Tasks, and each Task has a few simple methods to tell Luigi what to do with it.

  • run()
    • This tells Luigi what you want the task to do when it is run. This could be something like submitting a MapReduce job, querying a database, or running an external script from Python. The main logic of the task goes here. Since Luigi is highly modular, we break up our work into chunks that we can move around, edit, and maintain later. This way, if one part of the process goes down (say, if the database is unreachable), you won’t lose the progress made on other parts of the pipeline, and you can pinpoint exactly what failed.
  • output()
    • This tells Luigi where the output of your Task goes. It could be a local file, a file on HDFS, or simply an updated database. Luigi checks whether this output exists to decide if the Task has already run.
  • requires()
    • This function defines the dependencies of your task. Maybe you need to wait for something else to run first. Maybe you need to make sure that a certain dataset is updated before your task runs. The requires() function is what ties your tasks together in Luigi and adds a sense of chronology to the execution.

Your First Task

Let’s start with a simple Task and then we’ll grow it out to be a more complex pipeline.

import luigi

class MyFirstTask(luigi.Task): # inherit from Luigi base class Task

    def run(self):
        with self.output().open('w') as f:
            f.write("This is my first task.\n")
            
    def output(self):
        # tell luigi where to output to
        return luigi.LocalTarget('MyTask.txt')

if __name__ == '__main__':

    # since we are setting MyFirstTask to be the main task,
    # Luigi will check for its requirements first, then run it
    luigi.run(["--local-scheduler"], main_task_cls=MyFirstTask)

https://gist.github.com/nelsonam/3a17d1f540a749dbff89

All right, so this does the very simple task of writing a line to a file. Run it at the command line by typing python mytask.py. This is a very simple example just to give you the structure, so let’s add some dependencies.

Add a new task to your file, and point main_task_cls at MySecondTask so that it becomes the main task:

class MySecondTask(luigi.Task):
    # MyFirstTask() needs to run first
    def requires(self):
        return MyFirstTask()

    def run(self):
        # here we're going to count the words in the first file
        # and output it to a second file
        numwords = 0
        # once MyFirstTask() has run, this file has the info we need
        # it could just as easily be database output in here
        with self.input().open('r') as f:  # input() is the output of requires()
            for line in f:
                words = line.split(None) # this splits on whitespace of any length
                numwords += len(words)

        # print our results to the target declared in output()
        with self.output().open('w') as out:
            out.write(str(numwords)+"\n")

    def output(self):
        return luigi.LocalTarget('words.txt')

https://gist.github.com/nelsonam/5dc77ffcdfa5556d47a8

As you can see here, we use the requires() method to define a dependency. This means that when you run the second task, MySecondTask(), Luigi will check to see if an output exists for MyFirstTask(). If it’s not there, it will run the task under requires() before proceeding. Additionally, if a task fails for any reason, Luigi can be configured to send an error email to the address of your choosing, and the workflow will pick up where it left off the next time you run it.
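To make the "check for the output, then run whatever is missing" idea concrete, here is a toy model written in plain Python. It is not Luigi's implementation and does not use the Luigi API: the ToyTask, First, Second, build and demo names are all made up for this sketch, and it assumes every task writes exactly one local file.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a pipeline task (this is not luigi.Task)."""

    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return []  # no dependencies unless a subclass says otherwise

    def output(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError


class First(ToyTask):
    def output(self):
        return os.path.join(self.workdir, "MyTask.txt")

    def run(self):
        with open(self.output(), "w") as f:
            f.write("This is my first task.\n")


class Second(ToyTask):
    def requires(self):
        return [First(self.workdir)]

    def output(self):
        return os.path.join(self.workdir, "words.txt")

    def run(self):
        # count the words in the file produced by the dependency
        with open(self.requires()[0].output()) as f:
            numwords = sum(len(line.split()) for line in f)
        with open(self.output(), "w") as out:
            out.write(str(numwords) + "\n")


def build(task, log):
    """Run `task` after its requirements, skipping tasks whose output exists."""
    if os.path.exists(task.output()):
        return  # output is already there, so there is nothing to do
    for dep in task.requires():
        build(dep, log)  # dependencies go first
    task.run()
    log.append(type(task).__name__)


def demo():
    """Build Second twice in a scratch directory and report what actually ran."""
    with tempfile.TemporaryDirectory() as workdir:
        log = []
        build(Second(workdir), log)  # runs First, then Second
        build(Second(workdir), log)  # words.txt exists now, so nothing runs
        with open(os.path.join(workdir, "words.txt")) as f:
            return log, f.read()
```

Calling demo() shows that each task runs only once, even though build() is called twice. That is the same reason a Luigi workflow can resume after a failure: anything whose output already exists is treated as done.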

The Central Scheduler

All right, let’s take this online! The Central Scheduler is a Luigi daemon that runs on your server. To set it up, run luigid.

The official docs go into more detail about configuring the daemon.

When you go to http://localhost:8082 (or whatever the IP of your server is), you should see the Luigi web interface running.

Let’s run our tasks and see what happens. Run python mytask.py again while you have luigid running. Make sure you change the luigi.run statement so that it no longer includes ['--local-scheduler']. This will tell it to go to the central scheduler on our server.

After it runs, you should see your tasks in the scheduler’s web interface.

One of the best features of the central scheduler is that it provides this visualization of running tasks. It shows tasks in queue, tasks running, tasks failed, and where all the dependencies point. Our example is small, but as your workflows grow more complex, this is a very useful tool indeed.

Extensions

Luigi isn’t just for straight Python, either! Spotify uses Luigi to run thousands of Hadoop jobs per day, and has built in other extensions since then as well.

According to the official Luigi docs, you can now implement tasks that talk to all kinds of data technologies big and small. Once you get the hang of the basics, you can submit Hadoop jobs, interface with MySQL, and store data in Redis, to name just a few.

Best of all, Luigi is open source and contributions are welcomed. Go check it out today!

 


Pretty Print JSON From The Command Line

JSON can come in all shapes and sizes, and sometimes you want to see it in a structured format that’s easier on the eyes. This is called pretty printing. But how do you accomplish that, especially if you have a really large JSON file? While there are some converter tools online that show JSON files in a pretty format, they can get very slow and even freeze if your file is too large. Let’s do it locally.

Requirements: Have Python installed, and a terminal environment.

cat file.json | python -m json.tool > prettyfile.json

And that’s all there is to it! Python comes with a built-in JSON encoding/decoding library, and you can use it to your advantage to get nicely formatted output. Alternatively, if you are receiving JSON from an API or HTTP request, you can pipe your results from a curl call directly into this tool as well.
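If you would rather do the same thing from inside a script, the json module behind that command can be called directly. This is a minimal sketch of roughly what the one-liner does; the pretty and pretty_file helper names are made up for illustration, and the four-space indent is a choice made here.

```python
import json


def pretty(text, indent=4):
    """Parse a JSON string and return it re-serialized with indentation."""
    return json.dumps(json.loads(text), indent=indent)


def pretty_file(src_path, dst_path, indent=4):
    """Read JSON from src_path and write the pretty-printed form to dst_path."""
    with open(src_path) as src:
        data = json.load(src)
    with open(dst_path, "w") as dst:
        json.dump(data, dst, indent=indent)
        dst.write("\n")  # finish the file with a newline
```

For example, pretty_file("file.json", "prettyfile.json") mirrors the shell command above. Both helpers load the whole document into memory, so the same size limits apply as with the command-line tool.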

Hope this helps!

A Quick Introduction to HDFS

The Hadoop filesystem is called HDFS, and today I’m going to give a short introduction to how it works for a beginner.

The Hadoop Distributed File System (HDFS) sits on top of a Hadoop cluster and facilitates the distributed storage and access of files. When a file is stored in HDFS, it is split into chunks called “blocks”. Blocks have a fixed, configurable size (128 MB by default in recent Hadoop versions), although the last block of a file can be smaller. The blocks are scattered across the nodes of the cluster, each of which runs a daemon called a datanode. One more node, called the namenode, holds metadata about the blocks and their whereabouts.

To protect against network or disk failure, each block is replicated in three places across the cluster by default. This makes the data redundant: if one datanode goes down, there are other copies of its blocks elsewhere. When this happens, new copies are created so that there are always three.
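The splitting and replication described above can be modelled in a few lines of Python. This is only a toy simulation: the datanode names, the 128 MB block size, and the simple round-robin placement are assumptions made for the sketch, and real HDFS uses a more careful placement policy.

```python
import math
from itertools import cycle

BLOCK_SIZE = 128 * 1024 * 1024  # assumed block size: 128 MB
REPLICATION = 3                 # three copies of every block


def place_blocks(file_size, datanodes, block_size=BLOCK_SIZE, replication=REPLICATION):
    """Return {block index: set of datanodes holding a copy} for one file.

    This mapping is the kind of metadata the namenode keeps track of.
    Assumes there are at least `replication` datanodes.
    """
    num_blocks = math.ceil(file_size / block_size)
    ring = cycle(datanodes)  # hand out nodes round-robin
    return {b: {next(ring) for _ in range(replication)} for b in range(num_blocks)}


def fail_node(block_map, dead, datanodes, replication=REPLICATION):
    """Remove `dead` from every block, then copy blocks to surviving nodes."""
    survivors = [n for n in datanodes if n != dead]
    for holders in block_map.values():
        holders.discard(dead)
        for node in survivors:
            if len(holders) >= replication:
                break
            holders.add(node)  # re-replicate onto a node that lacks a copy
    return block_map
```

With four hypothetical datanodes, place_blocks(300 * 1024 * 1024, ["dn1", "dn2", "dn3", "dn4"]) splits a 300 MB file into three blocks, and calling fail_node on the result brings every block back up to three copies on the remaining nodes.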

The namenode is even more important, because it has metadata about all the files. If the namenode can’t be reached because of a network issue, all of the data will be unavailable until it comes back. However, if the disk on the namenode fails, the data may be lost forever, because the namenode has all the information about how the pieces of the files go together. We’d still have all the blocks on the datanodes, but we’d have no idea what file they go to.

To get around this issue, one solution is to also write the namenode’s metadata to a network file system (NFS). Another approach (which is a better alternative) is to have an active namenode and a standby namenode. This way, there is a “backup” if something goes wrong.

Some commands:
  • To list files on HDFS:
    • $ hadoop fs -ls
  • To put files on HDFS:
    • $ hadoop fs -put filename
    • this takes a local file and puts it on HDFS
  • To display the end of a file:
    • $ hadoop fs -tail filename
  • Many familiar shell commands will work if you put a dash in front of them
    • $ hadoop fs -cat
    • $ hadoop fs -mv
    • $ hadoop fs -mkdir
    • etc…

Halloween Themed Math Puzzles

Happy Halloween From The Muse Garden!

In the tradition of my Holiday Math Puzzles, I’m here with an appropriately themed puzzle for this time of year.

Candy Distribution

It’s that time of year all right. You’re out and about, trick or treating with your friends or family, and when you come home, you decide to dump all the candy out on the floor to sort through it. But then, as siblings often do, you begin to bicker about who has “more” than the other. In fact, there are some candies that you really like, and some you don’t like. You’d rather have a bunch of chocolate than a bunch of peppermints, for instance. But wait a minute! Of course you don’t both like the same things! Your sibling actually likes peppermints!

Here’s a table of the different candies you have. Each candy has a “value” to it; that is, how much you “want” it. Try to split up the candies such that you and your sibling both have as equal value as possible at the end. And no fighting!

Candy | Quantity | Your Value (per piece) | Sibling’s Value (per piece)
Candy Corn | 150 | 25 | 50
Peppermints | 50 | 5 | 50
Peanut Butter Cups | 10 | 100 | 75
Hershey Bars | 25 | 50 | 10
Kit Kat | 20 | 75 | 30
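If you want to check a split without adding everything up by hand, a small scoring helper does the arithmetic. This sketch only scores a split that you propose; it does not search for the best one. The CANDY dictionary and the score function are names made up here, and it assumes your sibling gets every piece you don’t keep.

```python
# name: (quantity, your value per piece, sibling's value per piece)
CANDY = {
    "Candy Corn": (150, 25, 50),
    "Peppermints": (50, 5, 50),
    "Peanut Butter Cups": (10, 100, 75),
    "Hershey Bars": (25, 50, 10),
    "Kit Kat": (20, 75, 30),
}


def score(my_share):
    """Return (your total, sibling's total, gap) for a proposed split.

    `my_share` maps a candy name to the number of pieces you keep;
    anything not listed goes entirely to your sibling.
    """
    mine = theirs = 0
    for name, (qty, my_val, sib_val) in CANDY.items():
        kept = my_share.get(name, 0)
        if not 0 <= kept <= qty:
            raise ValueError(f"cannot keep {kept} of {qty} {name}")
        mine += kept * my_val
        theirs += (qty - kept) * sib_val
    return mine, theirs, abs(mine - theirs)
```

For instance, score({"Peanut Butter Cups": 10, "Hershey Bars": 25, "Kit Kat": 20}) returns (3750, 10000, 6250), so keeping all the chocolate and handing over everything else leaves your sibling far ahead. The puzzle is to get that last number as close to zero as you can.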

The Collatz Conjecture and Hailstone Sequences: Deceptively Simple, Devilishly Difficult

Here is a very simple math problem:

If a number is even, divide the number by two. If a number is odd, multiply the number by three, and then add one. Do this over and over with your results. Will you always get down to the value 1 no matter what positive whole number you choose?

Go ahead and test this out for yourself. Plug a few numbers in. Try it out. I’ll wait.

Back? Good. Here’s my example: I start with the number 12.

12 is even, so I divide it by 2.

12/2=6.

6/2=3.

3 is odd, so we multiply it by 3 then add 1.

(3*3)+1=10.

10/2=5.

(5*3)+1=16.

16/2=8.

8/2=4.

4/2=2.

2/2=1.

And we have arrived at one, just like we thought we would. But is this ALWAYS the case, for ANY number I could possibly dream of?
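The hand calculation above can be reproduced with a short function that collects every value along the way. This is just a sketch of the two rules; the hailstone name is made up here, and it assumes the input is a positive whole number.

```python
def hailstone(n):
    """Return the list of values visited from n down to 1."""
    if n < 1:
        raise ValueError("n must be a positive whole number")
    sequence = [n]
    while n != 1:
        if n % 2 == 0:
            n = n // 2      # even: divide by two
        else:
            n = 3 * n + 1   # odd: multiply by three, then add one
        sequence.append(n)
    return sequence
```

Calling hailstone(12) gives [12, 6, 3, 10, 5, 16, 8, 4, 2, 1], which matches the steps worked out above.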

This may sound easy, but in fact it is an unsolved mathematics problem, with some of the best minds in the field doing research on it. It’s easy enough to check low values, but there are infinitely many numbers, and they can get very, very, very large. How do we KNOW that it works for every single number that exists?

That is what is so difficult about this problem. While every single number we have checked (which is a LOT) ends up at 1 sooner or later (some numbers can bounce around for a very long time, which is why these are called “hailstone sequences”), we still have no method to prove that it works for every number. Mathematicians have been looking for some method to predict this, but despite getting into some pretty heavy mathematics in an attempt to attack this problem, we still do not know for sure.

This problem is known as the Collatz Conjecture and is very interesting to mathematicians young and old because it is so easy to explain and play with, yet so tough to exhaustively prove. What do you think? Will this problem ever be solved? And what would be the implications if it was?

Here is some simple Python code that can display the sequence for any number you type in:

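#!/usr/bin/env python3
# Print the hailstone sequence for a number typed in by the user.
num = int(input("Enter a number: "))
if num < 1:
    raise SystemExit("Please enter a positive whole number.")
while num != 1:
    if num % 2 == 0:
        num = num // 2  # integer division keeps num a whole number
    else:
        num = (num * 3) + 1
    print(num)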

A Genetic Algorithm for Computing Ramsey Numbers: Update

All 78 possible friends-strangers graphs with 6 nodes. For each graph, the red/blue nodes show a sample triplet of mutual friends/strangers.

In my last post on this topic, I discussed how I was working on a genetic algorithm to search mathematical graphs for elusive properties called Ramsey Numbers. (For a refresher on genetic algorithms,  visit here, and for a refresher on Ramsey Numbers, visit here). I’ve been doing some work on it since then (check out the code here), and I thought I would describe some improvements and further progress I’ve made in this area.

New features:

  • colorings dumped to a file at the end of each run
  • ability to load in data sets from file, so the data can be refined further rather than starting from scratch each time

The next problem I ran up against while working through this was that even if I am able to load in previously analyzed data, I still only have one fitness function that checks a static set of edges. As I see it, there are two ways to solve this:

  • Make the current fitness function dynamic; that is, have it test a different set of edges every time. This would be easier to maintain than the other option, but it is counterproductive to the program’s purpose of “eliminating” certain sets of edges in each “round”.
  • Make a “FitnessHandler” method that takes in a value for which method to run, and uses that to determine what set of edges to test. This would lead to a lot of extra code and overhead, though. I’m thinking of having a static variable at the beginning of each run that says which “fitness method” to start on, so that it doesn’t have to start on round one each time.
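To picture the second option, here is a rough Python sketch of a handler that picks the edge set by round number. Everything in it is hypothetical: the edge sets, the START_ROUND variable, and the placeholder score are invented for illustration and are not taken from the actual project code.

```python
# Hypothetical edge sets, one per "round"; the real program's sets will differ.
EDGE_SETS = {
    1: [(0, 1), (1, 2), (0, 2)],
    2: [(2, 3), (3, 4), (2, 4)],
}

START_ROUND = 1  # the "static variable" chosen at the beginning of a run


def fitness_handler(coloring, round_number=START_ROUND):
    """Score `coloring` against the edge set selected by `round_number`.

    `coloring` maps an edge (u, v) to a color, 0 or 1. The score used here,
    the size of the largest same-colored group among the tested edges, is
    only a placeholder for whatever the real fitness function measures.
    """
    edges = EDGE_SETS[round_number]
    colors = [coloring[edge] for edge in edges]
    return max(colors.count(0), colors.count(1))
```

The point of the design is that a new round only needs a new entry in the table, and a run loaded from file can begin at any round by changing START_ROUND instead of starting from round one.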

I haven’t fully decided which of these I will go with. I feel like the second one fills my purpose of methodically “weeding out” the improbable graphs, but it’s going to be a lot of extra work. Oh well, nothing worthwhile ever came easy…

Leave a note here or on my github if you have suggestions!