The Open Source Data Platform
Part 1 — Orchestration: Prefect
I can already see the comments yelling at me now: “Kieran, you are crazy, there is no way you can run an entire data platform from your laptop!”, “You must be trying to sell me something!” I admit that Cloud SaaS offerings are tempting: they are easy to use, reduce data engineering overhead, simplify data science, ship with prebuilt security features, and let you take your product to market faster. Yet prebuilt offerings still usually require some level of manipulation to fit your business use case. In this eight-part series, I will build a comprehensive data platform piece by piece, adding layers as needed and reworking code as we go, so that as we move from article to article you can watch the platform mature in front of your very eyes. From elephants, to acronyms, to executive buzzword bingo, to knowing which technologies work and which ones do not: let's build a data stack together, open source, on your laptop, for free.
We should probably identify what kind of data platform we wish to build. More importantly, why build one at all? Most companies, and by extension most executives, want to start at the end: they want insight into their data. Who can blame them? They have been sold this narrative: “Data is the new oil!”, “The world is generating exabytes of data a day!”, “If you aren't data-driven, you are falling behind.” All of these statements are factually correct, and yet most get lost in the hype, either jumping from one technology to another or, worse, getting stuck with an aging tech stack no engineer wants to be associated with. I would like to propose an anti-hype mentality. Even though data can live up to the hype, you wouldn't start building a house by painting the wood on the ground. You must first pour the foundation of your data platform before you can gather any meaningful insights. For an enterprise to thrive and survive, you need to build the core functionality of a data platform. These are:
- Orchestration
- Extract and Load
- Transform
- Deploys
- Insights
- Streams
- Feature Stores
- Experimentation
This article is going to focus on the first item in the list: orchestration. Why start with orchestration, I hear you ask? I would make the analogy that the orchestration engine is the brain of the data platform's body. Nothing functions well without the brain. The brain schedules and runs autonomic functions as well as user-defined ones. While some might argue that the heart, the engine of your data platform, is more important, consider the following: life functions 'fine' without higher-order thinking, but can you truly express something beautiful without order? Complex thoughts, feelings, reasoning, analytics, and understanding beyond base instinct happen in the brain. That is why I chose to start here: not because the engine is unimportant, but so that the brain we choose does not limit how we scale out in the future.
Having said that, for some engineers a cron scheduler is all they need. Nowadays, though, there is complexity in everything we do. How do we keep an ordered track of our datasets? How do we ensure that datasets are available at the time they are needed? Is there a way to order these things natively and write data engineering and data science workflows without needing to learn DAG terminology? Let's dive into the deep end of this eight-part series with Prefect.
Prefect — Orchestration that just works
This may come off a little biased, but I am going to say it anyway: Prefect just works. Out of the box, it requires so little manual intervention when deploying locally that I genuinely questioned whether I had installed anything at all. To get started, all you need to do is run the following:
$ pip install prefect  # requires Python 3.7 or later
$ prefect backend server
$ prefect server start
The first command installs Prefect and its CLI, the second sets the backend to Prefect's locally hosted server mode (rather than Prefect Cloud), and the third spins up the server and UI (the server runs as a set of Docker containers, so you will need Docker installed). You should be able to check out the UI now by visiting localhost:8080, assuming you did not change the port number when starting the server; if you followed the instructions as-is, you should be good to go. Here is an example of what that looks like:
Your UI will appear empty at first. I have some flow runs from earlier in the month, but I thought this would be a good opportunity to preview what yours will look like after running your first flow. The last step before we can truly say Prefect is set up locally is installing a local agent. Open a new terminal and execute the following:
$ prefect agent local start
This will spin up a local agent that listens for registered flows. “What is a flow?” I hear you ask. Why, I am glad you asked! It is, in part, what makes Prefect a cut above the rest.
Flows: Abstracted Pythonic DAGs
Flows are, at their core, evolved DAG files. They allow for a tighter definition of the tasks and subtasks orchestrated within them, a better scheduling process, and better integration with popular Python machine learning libraries. Let's start with the code and I will demonstrate how this works. Simple examples that teach someone how to add two numbers together have their place, but they are pretty useless when you are trying to apply a technology to something as large as scheduling all of your data pipelines. When learning a new technology, I highly recommend building something that has applications beyond learning. What do you want to do with code? What problem are you trying to solve? These are the questions that help individuals learn for life, rather than for a test. Below, I will walk through the code I used to gather data from Twitter and move it into Mongo using Prefect, section by section.
Lines 1–10: The setup
First, we import a handful of useful things from the prefect package we installed earlier in this article. LocalRun is imported because we are running Prefect locally on our laptop. We then import Flow and Parameter, as they are needed to define our flows. Next, we import a schedule, because we want our job to run at specific intervals. Prefect has some amazing scheduling features, and the variety of ways to avoid cron is astonishingly cool. As an engineer, I breathe a sigh of relief not because I can't convert times, but because I remember a younger version of me struggling, opening Google, going to crontab.guru, and experimenting until I got the right schedule. In this example, we are going to use IntervalSchedule to load a 100-tweet JSON packet into MongoDB on a two-minute schedule. The last Prefect import is LocalResult, which keeps task results on the local filesystem so I don't have to manage writing intermediate results out to a file or a database myself.
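Concretely, the setup section boils down to something like the following sketch. It is not a line-for-line copy of my gist, but the import paths are the standard Prefect 1.x ones and the two-minute interval matches the schedule described above:

from datetime import timedelta

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.run_configs import LocalRun
from prefect.schedules import IntervalSchedule

import tweepy
from pymongo import MongoClient

# Run the flow every two minutes once the schedule is turned on.
schedule = IntervalSchedule(interval=timedelta(minutes=2))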
I then import Tweepy to access the Twitter API via Python. There are several Python Twitter packages, but I like this one. To get started ingesting Twitter data, you will need to create a developer account; there are plenty of tutorials on how to do this, and I will link one here. You will need to follow those instructions to create credentials that tie your API interactions to your account. Then you will be able to enter your credentials, as seen in the gist; a sketch of one way to do it follows.
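Here is one way to wire the credentials in, sketched with environment variables of my own naming; hard-coding the strings works too, it is just easier to leak:

import os

# These four values come from your Twitter developer account.
# The environment-variable names are illustrative; use whatever convention you prefer.
API_KEY = os.environ["TWITTER_API_KEY"]
API_SECRET = os.environ["TWITTER_API_SECRET"]
ACCESS_TOKEN = os.environ["TWITTER_ACCESS_TOKEN"]
ACCESS_SECRET = os.environ["TWITTER_ACCESS_SECRET"]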
Finally, the last import is pymongo, the Python driver for interfacing with MongoDB, a document-store database. Postgres is my usual go-to for these examples, but Twitter data natively comes back as JSON, so it makes sense to use Mongo: it is very easy to store JSON documents inside a collection. I have set up a Docker container running MongoDB on my machine; below is a link demonstrating how to set up a MongoDB container with Docker on your machine. You will need to set this up if you want the last task to work.
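If you just want something quick to follow along with, the official mongo image is enough; a single command along these lines starts a throwaway local instance, assuming Docker is already installed:

$ docker run -d --name mongodb -p 27017:27017 mongo

With that running, pymongo can reach the database at mongodb://localhost:27017.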
Lines 13–41: Defining the Tasks for our Flow
I am going to split this section up by task so each one is easier to understand. These tasks are (a simplified sketch of all four follows the list):
- authorize_connection (line 13) — This task authenticates with the Twitter API, confirming we are who we say we are.
- query_data (line 18) — This task asks the Twitter API for data related to a specific query. Note that this task is reusable and not hard-coded; given any valid query, you could easily ask for a different collection of tweets.
- print_data (line 23) — This task is here for debugging purposes. When using the local agent, I wanted to be able to see the tweets as they were grabbed from the API.
- save_data (line 34) — This task saves the data collected from Twitter to Mongo. It takes the tweets the user wants to save, the connection arguments, the database, and the collection.
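Put together, the four tasks look roughly like this. Treat it as a sketch rather than the gist verbatim: the credential names come from the setup above, the connection arguments are collapsed into a single URI string for brevity, and the exact Tweepy search method depends on your Tweepy version.

@task
def authorize_connection():
    # Build an authenticated Tweepy client from the credentials loaded earlier.
    auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    return tweepy.API(auth)

@task
def query_data(api, query):
    # Ask Twitter for up to 100 tweets matching the query and keep the raw JSON.
    # (tweepy 3.x exposes this as api.search; tweepy 4.x renamed it api.search_tweets.)
    return [status._json for status in api.search(q=query, count=100)]

@task
def print_data(tweets):
    # Debugging aid: with a local agent you can watch the tweets scroll by.
    for tweet in tweets:
        print(tweet["text"])

@task
def save_data(tweets, connection_uri, database, collection):
    # Write the raw tweet documents straight into a MongoDB collection.
    client = MongoClient(connection_uri)
    client[database][collection].insert_many(tweets)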
Lines 43–EOF: Defining the schedule for the flow and registering it with a project
The last lines are all about defining the flow, ordering the tasks within it, and then creating a project and registering the flow under that project so the agent can pick it up. Line 46 defines the name of the flow and the order of operations. Each task is called sequentially here, but Prefect does support multiple tasks running simultaneously and has the flexibility to define task graphs in a multitude of ways. I will leave some documentation below on how to define flows in more intricate ways, in case you want to augment and change the code I have provided.
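A sketch of that final section; the flow name and the parameter default are placeholders of mine, while the project name matches the one we create below:

with Flow("twitter_extract_flow",
          schedule=schedule,
          run_config=LocalRun(),
          result=LocalResult()) as flow:
    # The Parameter makes the search term configurable per run.
    query = Parameter("query", default="data engineering")
    api = authorize_connection()
    tweets = query_data(api, query)
    print_data(tweets)
    save_data(tweets, "mongodb://localhost:27017", "twitter", "tweets")

# Registering pushes the flow's metadata to the local server so the agent can run it.
flow.register(project_name="twitter_extract")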
https://docs.prefect.io/core/concepts/execution.html#triggers
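As a small taste of what that page covers, Prefect 1.x ships trigger functions that decide whether a downstream task should run at all. A hypothetical clean-up task that only fires when something upstream fails could look like this:

from prefect.triggers import any_failed

@task(trigger=any_failed)
def alert_on_failure():
    # Runs only when at least one upstream task has failed.
    print("A task in the Twitter flow failed; check the Prefect UI for details.")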
Register and run your Flow!
Now that this is all defined, assuming your flow is written and the agent is listening for flows, you need to create a project for your flow to reside in. Here is the command for creating a project. Please note that the project name needs to match the one you register against in your Python code (line 55):
$ prefect create project twitter_extract
You can then check in the UI that the project was successfully created. Here is an example of my UI; notice we haven't run any flows yet, so once we run a flow, data will populate. You can have multiple projects at once and can register multiple flows to the same project, which is a great feature because you can colocate all of your related flows within the same project.
Now you can run an individual instance of your flow from within the app by clicking the quick run icon at the top right (the one that looks like a blue rocket ship), and you will see your flow run! We are only consuming 100 tweets from Twitter per run, so performance is quite good. Now you can check the outputs of your run and even turn on the schedule set in the code right from the UI. Here is the UI after the run. If you've made it this far and are following along at home, click around the UI and check out all of the cool features. You can see the order of your tasks within your flow.
Conclusion
Congrats, you have now built a flow to ingest Twitter data using Prefect. Feel free to use this code as you see fit; I can see a lot of uses for a Twitter scraper. Just change the query and you can get different data based on your needs. I think that is enough of a taste for now, but hopefully you can see the power of Prefect and orchestration.
You could even set up multiple reads to run asynchronously and write to any database, but why stop there? You can schedule almost anything with flows. If you want to export and transform that data out of Mongo into Postgres, easy: write another save task, or better yet, create an abstract save class that can interpret where to save based on the config! What about exporting the data to Snowflake at the same time? Write a task that writes your JSON to Snowflake and you are assured that the same data gathered in the query task is being sent to two, three, or four different places. Then create another flow that depends on the success state of the extract and builds a feature store for a sentiment analysis tool to gauge how Twitter feels about certain users, while using another flow to check when the data is refreshed and run your standard business logic. The possibilities are truly endless.
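To make the "save anywhere" idea a little more concrete, here is one rough way to sketch it: a single task that dispatches on a small config dict. The dict's shape and keys are entirely my own invention, and the non-Mongo branches are left as stubs.

@task
def save_data_to(tweets, destination):
    # Decide where to write based on configuration instead of hard-coding a sink.
    if destination["kind"] == "mongo":
        client = MongoClient(destination["uri"])
        client[destination["database"]][destination["collection"]].insert_many(tweets)
    elif destination["kind"] == "postgres":
        raise NotImplementedError("write the rows with your Postgres driver of choice")
    else:
        raise ValueError("unknown destination kind: " + destination["kind"])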
I hope this was an enjoyable experience; I intend to start writing again, and I honestly miss the experience of building something cool, accessible, and useful. Please follow me if you enjoyed this article, as the next part will be about scheduling extract and load workflows with Prefect and Airbyte. Thanks for reading, and may you build awesome things!
UPDATE — Part 2 is available, link below!
https://kjhealey.medium.com/the-open-source-data-platform-33ff5da0b2db