Last Updated: 2019-05-02

What you'll build

In this codelab, you're going to build a BigQuery pipeline and schedule it with cron.

Your app will run a set of templated SQL scripts against BigQuery on a cron schedule and write the results to destination tables.

What you'll learn

This codelab focuses on running example queries to demonstrate the BigQuery command-line and query-scheduling features.

What you'll need

Click the button below to get started.

Cloud Shell

Check out the code

git clone git@github.com:openx/bq-analysts-team-bq-scheduled-jobs.git

First time only setup

The quickest way to set up GitHub authentication is with a personal access token: enter your GitHub username, then follow the instructions in this post to create a personal access token and paste it as your password. You can name this personal access token cloud-shell.

Alternatively, to avoid entering a token every time, you can configure passwordless authentication with an SSH key:

# If you don't already have a key pair, generate one first:
# ssh-keygen -t rsa -b 4096 -C "first.last@openx.com"
chmod +r ~/.ssh/id_rsa.pub
cat ~/.ssh/id_rsa.pub
# Copy the output and add it as an SSH key in your GitHub settings.
chmod -r ~/.ssh/id_rsa.pub

Additionally, there's a one-time setup step for git to associate your name and email address with commits:

git config --global user.email "first.last@openx.com"
git config --global user.name "First Last"

Create a virtual environment using the steps below. This will keep your Python dependency management simpler.

sudo apt-get install python3-venv -y && \
python3 -m venv python/bq-pipeline-venv

Activate your virtual environment (from the repository root).

source python/bq-pipeline-venv/bin/activate

Install the dependencies defined in requirements.txt. We've included several tools to make your development workflow better.

If you need additional Python dependencies, add them to this file.

python3 -m pip install -r python/requirements.txt

Note: You can safely ignore the warnings about "bdist_wheel".

Each Next Login

You're good to go! The next time you log into Cloud Shell, simply activate your virtual environment and reinstall (again from the repository root) to ensure your installed packages are up to date.

source python/bq-pipeline-venv/bin/activate
python3 -m pip install -r python/requirements.txt

Add A Bash Profile (Optional)

vi ~/.bash_profile

Copy/paste the following content into the .bash_profile file you just opened.

# Git branch in prompt.
parse_git_branch() {
  git branch 2> /dev/null | sed -e '/^[^*]/d' -e 's/* \(.*\)/ (\1)/'
}
export PS1="\u@\h \W\[\033[32m\]\$(parse_git_branch)\[\033[00m\] $ "

alias init_bq_venv="source ~/bq-analysts-team-bq-scheduled-jobs/python/bq-pipeline-venv/bin/activate"

Now, after reloading your profile (source ~/.bash_profile) or starting a new session, you can activate your virtual environment by simply running the following:

init_bq_venv

Create a branch for your development.

git checkout -b feature/myNewScheduledQuery-$(whoami)
git add . 
git commit --all --message='this message describes the change'
git push --set-upstream origin feature/myNewScheduledQuery-$(whoami)

# The output of this push includes a shortcut for creating a pull request (PR). It will look like the following:
remote: Create a pull request for 'feature/myNewScheduledQuery-<your-username>' on GitHub by visiting:
remote:      https://github.com/openx/analytics-team-repo/pull/new/feature/myNewScheduledQuery-brandonjacob

Directory structure of the pipeline

Each team shares a repo for its scheduled pipelines. The repo is made up of a crontab file at the repository root, a sql/ directory containing the query files, and a python/ directory containing requirements.txt and the Python pipeline wrappers.

In this section you will define a BigQuery pipeline that materializes a races table from race_splits.sql, computes each runner's finish time with finish_times.sql, and runs on a cron schedule.

Define some SQL Scripts

First, let's create two SQL files, race_splits.sql and finish_times.sql.

Ensure you are in the bq-analysts-team-bq-scheduled-jobs directory (the repository root). Create the files ./sql/race_splits.sql and ./sql/finish_times.sql:

touch ./sql/race_splits.sql ; cloudshell edit ./sql/race_splits.sql
touch ./sql/finish_times.sql ; cloudshell edit ./sql/finish_times.sql

Set the contents of ./sql/race_splits.sql to the query below, which builds a single row with a repeated STRUCT column of participants, each with a name and an array of splits:

./sql/race_splits.sql

SELECT "800M" AS race,
   [STRUCT("Rudisha" as name, [23.4, 26.3, 26.4, 26.1] as splits),
    STRUCT("Makhloufi" as name, [24.5, 25.4, 26.6, 26.1] as splits),
    STRUCT("Murphy" as name, [23.9, 26.0, 27.0, 26.0] as splits),
    STRUCT("Bosse" as name, [23.6, 26.2, 26.5, 27.1] as splits),
    STRUCT("Rotich" as name, [24.7, 25.6, 26.9, 26.4] as splits),
    STRUCT("Lewandowski" as name, [25.0, 25.7, 26.3, 27.2] as splits),
    STRUCT("Kipketer" as name, [23.2, 26.1, 27.3, 29.4] as splits),
    STRUCT("Berian" as name, [23.7, 26.1, 27.0, 29.3] as splits),
    STRUCT("Nathan" as name, ARRAY<FLOAT64>[] as splits),
    STRUCT("David" as name, NULL as splits)]
    AS participants

Set the contents of ./sql/finish_times.sql to the query below, which flattens each participant's splits array and sums it to get a finish time (for example, Rudisha: 23.4 + 26.3 + 26.4 + 26.1 = 102.2):

./sql/finish_times.sql

SELECT name, sum(duration) AS finish_time
FROM `{{ project }}.{{ dataset }}`.races, races.participants LEFT JOIN participants.splits duration
GROUP BY name;

Define a Python Wrapper

Start by making a copy of example_pipeline.py and opening it in your editor:

cp python/example_pipeline.py python/my_first_pipeline.py
cloudshell edit python/my_first_pipeline.py

Now we'll walk through my_first_pipeline.py line by line in the Cloud Shell code editor; at a minimum, edit the job name and target dataset (a sketch of a full wrapper follows the snippet below):

 ...
    JOB_NAME = "race-analysis"
    DATASET = "scratch"
...
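For orientation, here is a minimal, self-contained sketch of what such a wrapper could look like. Treat it as an illustration only: it assumes direct use of the google-cloud-bigquery and jinja2 libraries rather than whatever shared pipeline utilities example_pipeline.py actually imports, and the finish_times destination table name is an assumption.

# my_first_pipeline.py -- hypothetical sketch, not the repo's real example_pipeline.py.
from pathlib import Path

import jinja2
from google.cloud import bigquery

JOB_NAME = "race-analysis"
PROJECT = "ox-data-analytics-prod"
DATASET = "scratch"
SQL_DIR = Path(__file__).resolve().parent.parent / "sql"


def render(sql_file):
    """Render the {{ project }} / {{ dataset }} placeholders in a SQL template."""
    template = jinja2.Template((SQL_DIR / sql_file).read_text())
    return template.render(project=PROJECT, dataset=DATASET)


def run(client, sql_file, destination_table):
    """Run one query and write its result to a destination table."""
    job_config = bigquery.QueryJobConfig(
        destination=f"{PROJECT}.{DATASET}.{destination_table}",
        write_disposition="WRITE_TRUNCATE",
    )
    client.query(render(sql_file), job_config=job_config).result()
    print(f"{JOB_NAME}: {sql_file} -> {DATASET}.{destination_table}")


def main():
    client = bigquery.Client(project=PROJECT)
    run(client, "race_splits.sql", "races")          # materialize the source table
    run(client, "finish_times.sql", "finish_times")  # depends on the races table


if __name__ == "__main__":
    main()

Whatever the real example_pipeline.py provides, the two things to carry over are rendering the {{ project }} / {{ dataset }} placeholders and running the queries in dependency order (races before finish_times).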

Edit the crontab file

In the Cloud Shell editor, edit the crontab file at the root of the repository to get your pipeline scheduled. Each entry consists of a standard five-field cron schedule followed by the command to run; the existing example below runs example_pipeline.py every 30 minutes:

*/30 * * * * python3 /home/cronuser/cronjob_repo/python/example_pipeline.py
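An entry for the pipeline built in this codelab might look like the following; the checkout path mirrors the example above, and both it and the daily-at-06:00 schedule are assumptions to adapt to your setup:

# Hypothetical entry: run my_first_pipeline.py daily at 06:00.
0 6 * * * python3 /home/cronuser/cronjob_repo/python/my_first_pipeline.py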

Linting your SQL

Format your query using sqlformat. This tool automates making your SQL queries look pretty and consistent, similar to the BigQuery Format button in the UI. If you've never used this tool within Cloud Shell, install it using:

sudo apt-get install sqlformat -y

Invoke sqlformat from the repository root:

sqlformat \
  --reindent \
  --keywords upper \
  --identifiers lower \
  sql/race_splits.sql -o sql/race_splits.sql

sqlformat \
  --reindent \
  --keywords upper \
  --identifiers lower \
  sql/finish_times.sql -o sql/finish_times.sql

Validate your query

bq query --use_legacy_sql=false --dry_run --flagfile sql/race_splits.sql

bq query --use_legacy_sql=false --dry_run --flagfile sql/finish_times.sql

Successfully validating finish_times.sql requires the races table to exist and the {{ project }} / {{ dataset }} template variables to be rendered first:

# Create this table in the scratch dataset. (Not necessary for existing tables)
bq query --use_legacy_sql=false \
--destination_table=ox-data-analytics-prod:scratch.races \
--flagfile sql/race_splits.sql

# Set environment variables for templated values.
export project=ox-data-analytics-prod
export dataset=scratch

# Use j2 to render the template and dry run it.
j2 sql/finish_times.sql | bq query --use_legacy_sql=false --dry_run

Check your Python style. Note that the Cloud Shell code editor gives you hints on Python style as you develop! pylint is simply a CLI tool to use as a final check.

Strongly Recommended: Strive to achieve a 10/10 score!

python3 -m pylint python/my_first_pipeline.py

Validate your Python wrapper by running the command below locally, and observe the expected logs and outputs in the GCP console. Note that this is not a dry run: each query in your pipeline will actually be executed in BigQuery, and destination tables will be created as needed.

python3 python/my_first_pipeline.py

Submitting a git Pull Request

Use the git commands below to commit and push your code (check out this reference for learning git!):

git add sql/ python/ crontab
git commit -m "Adding <name of your scheduled job>"
git push --set-upstream origin feature/myNewScheduledQuery-$(whoami)

Strongly Recommended: Please do not add / commit your virtual environment to the repo!

You can now create a Pull Request in your browser on GitHub.

Once your PR is merged to this repo, your work is done.

The scheduling VM pulls this repo once every 15 mins. This means that you may need to wait up to 15 mins before your first scheduled run occurs.

Note that merging to the devint branch effectively deploys those changes to the devint project.

The BigQuery Pipeline utility class is configured to log to files matching /home/ubuntu/bq-pipeline-*.log*. The CronBox is configured with fluentd to write the logs in this directory to Stackdriver, tagging each log with bq-analyst-cron.

You can look at the logs with the command below, using the --freshness flag to control how far back in the logs you want to look.

gcloud logging read \
--freshness 3h \
logName:"projects/ox-data-analytics-devint/logs/bq-analysts-bq-scheduled-jobs"'

In this codelab, we learned how to write SQL scripts for a BigQuery pipeline, wrap them in Python, validate and lint them, and schedule the pipeline with cron.

Once your Pull Request is merged to the branch for the appropriate environment, it will be scheduled on that environment's BigQuery CronBox VM. This means the process for promoting from devint to qa is to create a Pull Request that merges the devint branch to qa.