Last Updated: 2019-05-02
In this codelab, you're going to build a BigQuery pipeline and schedule it with cron.
Your pipeline will create a BigQuery table from a scheduled query and export the results to a CSV file on GCS. This codelab is focused on running example queries to demonstrate the command-line and query-scheduling features.

To get started, clone the team repository:
git clone git@github.com:openx/bq-analysts-team-bq-scheduled-jobs.git
The quickest way to set up GitHub authentication is to enter your GitHub username, then follow the instructions in this post to create a personal access token and paste it as your password. You can name this personal access token cloud-shell.
Alternatively, to avoid having to do that every time, you can configure passwordless login with an SSH key.
Note that xclip will not work for copying your public key. Instead, just cat the file, highlight the output, and copy/paste it into the browser:

chmod +r ~/.ssh/id_rsa.pub
cat ~/.ssh/id_rsa.pub
# Copy the output.
chmod -r ~/.ssh/id_rsa.pub
Additionally, there's a one-time setup step for git to associate your username and email address with commits:
git config --global user.email "first.last@openx.com"
git config --global user.name "First Last"
Create a virtual environment using the steps below. This will keep your python dependency management simpler.
cd python && sudo apt-get install python3-venv -y && \
python3 -m venv bq-pipeline-venv
Activate your virtual environment.
source python/bq-pipeline-venv/bin/activate
Install the dependencies defined in requirements.txt. We've included several tools to make your development workflow better. If further Python dependencies are needed, this file will need to be updated:
python3 -m pip install -r python/requirements.txt
Note: You can safely ignore warnings about "bdist_wheel".
You're good to go! The next time you log into Cloud Shell, you can simply activate your virtual environment and install again (from the python/ directory) to ensure your installed packages are up to date:
source bq-pipeline-venv/bin/activate
python3 -m pip install -r requirements.txt
Next, open ~/.bash_profile in an editor:

vi ~/.bash_profile

Copy/paste the following content into ~/.bash_profile. It adds your current git branch to the prompt and defines an alias for activating the virtual environment:
# Git branch in prompt.
parse_git_branch() {
git branch 2> /dev/null | sed -e '/^[^*]/d' -e 's/* \(.*\)/ (\1)/'
}
export PS1="\u@\h \W\[\033[32m\]\$(parse_git_branch)\[\033[00m\] $ "
alias init_bq_venv="source ~/bq-analysts-team-bq-scheduled-jobs/python/bq-pipeline-venv/bin/activate"
Now you can activate your virtual environment by simply running the following:
init_bq_venv
Create a branch for your development.
git checkout -b feature/myNewScheduledQuery-$(whoami)
When you git push (shown below), the output contains a link to create a pull request. This is where you will request a review from a teammate.

git add .
git commit --all --message='this message describes the change'
git push --set-upstream origin feature/myNewScheduledQuery-$(whoami)
# Contained in the output of this push should be a shortcut to create a pull request (PR). It will look like the following:
remote: Create a pull request for 'feature/myNewScheduledQuery-<your-username>' on GitHub by visiting:
remote: https://github.com/openx/analytics-team-repo/pull/new/feature/myNewScheduledQuery-brandonjacob
Once your pull request is merged to the devint branch, any cron job you've added will get picked up and scheduled on the cron box auto-magically! We call this Continuous Deployment.

Each team shares a repo of scheduled pipelines, and the repo comprises the following elements:
- sql/ directory to define SQL queries. These can optionally use Jinja templating.
- python/ directory to define Python wrappers. These should take advantage of the BQPipeline utility class (docs).
- crontab file in the root of the repo to define your cron entry. This should reference your Python wrapper script.

In this section you will define a BigQuery pipeline that does the following:
- Creates a races table containing information about the 2012 Olympic 800m championship race in a nested structure.
- Queries the races table to compute each runner's finish time into a finish_times table.
- Exports the finish_times table to a CSV file on GCS.

First, let's create two SQL files, race_splits.sql and finish_times.sql. Note that {{ project }} and {{ dataset }} in the second query are Jinja templates; these values will be replaced by your Python wrapper.

Ensure you are in the analytics-team-repo directory (the repository root), then create the files ./sql/race_splits.sql and ./sql/finish_times.sql:
touch ./sql/race_splits.sql ; cloudshell edit ./sql/race_splits.sql
touch ./sql/finish_times.sql ; cloudshell edit ./sql/finish_times.sql
Set the contents of ./sql/race_splits.sql to be:
SELECT "800M" AS race,
[STRUCT("Rudisha" as name, [23.4, 26.3, 26.4, 26.1] as splits),
STRUCT("Makhloufi" as name, [24.5, 25.4, 26.6, 26.1] as splits),
STRUCT("Murphy" as name, [23.9, 26.0, 27.0, 26.0] as splits),
STRUCT("Bosse" as name, [23.6, 26.2, 26.5, 27.1] as splits),
STRUCT("Rotich" as name, [24.7, 25.6, 26.9, 26.4] as splits),
STRUCT("Lewandowski" as name, [25.0, 25.7, 26.3, 27.2] as splits),
STRUCT("Kipketer" as name, [23.2, 26.1, 27.3, 29.4] as splits),
STRUCT("Berian" as name, [23.7, 26.1, 27.0, 29.3] as splits),
STRUCT("Nathan" as name, ARRAY<FLOAT64>[] as splits),
STRUCT("David" as name, NULL as splits)]
AS participants
Set the contents of ./sql/finish_times.sql to be:
SELECT name, sum(duration) AS finish_time
FROM `{{ project }}.{{ dataset }}`.races, races.participants LEFT JOIN participants.splits duration
GROUP BY name;
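For context, here's roughly how the {{ project }} and {{ dataset }} placeholders get filled in. This is an illustrative sketch using the jinja2 package directly; the BQPipeline utility performs the equivalent substitution for you, and the project/dataset values below are just example placeholders:

from jinja2 import Template

# Render finish_times.sql the way the Python wrapper would (values are illustrative).
with open('sql/finish_times.sql') as f:
    query_template = Template(f.read())

rendered_sql = query_template.render(project='ox-data-analytics-devint', dataset='scratch')
print(rendered_sql)  # fully qualified query, ready to submit to BigQuery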
Start by making a copy of example_pipeline.py and opening it in your editor:
cp python/example_pipeline.py python/my_first_pipeline.py
cloudshell edit python/my_first_pipeline.py
Now we'll walk through line by line and edit my_first_pipeline.py
in the Cloud Shell Code Editor:
...
JOB_NAME = "race-analysis"
DATASET = "scratch"
...
The bqp object is of class BQPipeline. This provides us convenience methods for interacting with BigQuery.

Next come the table variables: races, avg_speed_csv, etc. We're only working with two tables for this scheduled job, so delete all but races and avg_speed_csv; both should be set to races.

To bqp.run_queries we pass a list of (query, destination table) pairs:
- race_splits.sql into destination table races
- finish_times.sql into GCS destination: 'gs://analytics-scratch/csv_export/finish_times/'
The destination table in each pair can be referenced in any of these forms:
- table_id (project and dataset inferred from bqp)
- dataset_id.table_id (project inferred from bqp)
- project_id.dataset_id.table_id (fully qualified path)

Finally, use the bqp.delete_tables expression to delete the races table.
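Putting those edits together, my_first_pipeline.py ends up looking roughly like the sketch below. Treat it as an illustration only: the import path, BQPipeline constructor arguments, and the exact shapes of run_queries and delete_tables are assumptions here, so defer to example_pipeline.py and the class docs for the real signatures.

# Sketch of my_first_pipeline.py -- illustrative; copy real imports/signatures from example_pipeline.py.
from bqpipeline import BQPipeline  # import path is an assumption

JOB_NAME = "race-analysis"
DATASET = "scratch"

# Constructor arguments are assumptions based on the walkthrough above.
bqp = BQPipeline(job_name=JOB_NAME, default_dataset=DATASET)

races = 'races'  # bare table_id: project and dataset are inferred from bqp
finish_times_gcs = 'gs://analytics-scratch/csv_export/finish_times/'  # GCS destination for the CSV export

# (query, destination table) pairs, as described above.
bqp.run_queries([
    ('sql/race_splits.sql', races),
    ('sql/finish_times.sql', finish_times_gcs),
])

# Clean up the intermediate races table (argument shape is an assumption).
bqp.delete_tables([races])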
In the Cloud Shell editor, edit the crontab file at the root of the repo to get your pipeline scheduled. Each entry consists of a cron schedule followed by the command to run, referencing your Python wrapper by its absolute path on the CronBox (/home/cronuser/cronjob_repo/python/). For example, the entry below runs example_pipeline.py every 30 minutes:
*/30 * * * * python3 /home/cronuser/cronjob_repo/python/example_pipeline.py
Format your query using sqlformat.
This tool automates making your SQL queries look pretty and consistent, similar to the BigQuery Format button in the UI. If you've never used this tool within Cloud Shell, install it using:
sudo apt-get install sqlformat -y
Invoke sqlformat from the sql/ directory:
sqlformat \
--reindent \
--keywords upper \
--identifiers lower \
race_splits.sql -o race_splits.sql
sqlformat \
--reindent \
--keywords upper \
--identifiers lower \
finish_times.sql -o finish_times.sql
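The sqlformat CLI is the command-line front end of the Python sqlparse library, so you can also apply the same formatting from Python if that fits your workflow better. A minimal sketch, assuming sqlparse is available in your virtual environment:

import sqlparse

# Run from the repository root; mirrors the CLI flags used above.
with open('sql/race_splits.sql') as f:
    raw_sql = f.read()

formatted_sql = sqlparse.format(
    raw_sql,
    reindent=True,            # --reindent
    keyword_case='upper',     # --keywords upper
    identifier_case='lower',  # --identifiers lower
)

with open('sql/race_splits.sql', 'w') as f:
    f.write(formatted_sql)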
The --dry_run flag will quickly validate syntax and check that the tables you have referenced exist. Run each query with the --dry_run flag:

bq query --use_legacy_sql=false --dry_run --flagfile sql/race_splits.sql
bq query --use_legacy_sql=false --dry_run --flagfile sql/finish_times.sql

Successfully validating the second query takes a couple of extra steps, since it references the races table and contains Jinja templates:
# Create this table in the scratch dataset. (Not necessary for existing tables)
bq query --use_legacy_sql=false \
--destination_table=ox-data-analytics-prod:scratch.races \
--flagfile sql/race_splits.sql
# Set environment variables for templated values.
export project=ox-data-analytics-prod
export dataset=scratch
# Use j2 to render the template and dry run it.
j2 sql/finish_times.sql | bq query --use_legacy_sql=false --dry_run
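If you prefer to dry-run from Python instead of the bq CLI (handy inside a test), the google-cloud-bigquery client supports the same check. A minimal sketch, assuming the client library and jinja2 are installed and using illustrative project/dataset values:

from google.cloud import bigquery
from jinja2 import Template

# Render the templated query first, then ask BigQuery to validate it without running it.
with open('sql/finish_times.sql') as f:
    sql = Template(f.read()).render(project='ox-data-analytics-prod', dataset='scratch')

client = bigquery.Client(project='ox-data-analytics-prod')
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=job_config)  # dry-run jobs complete immediately

print('Query is valid; it would process {} bytes.'.format(job.total_bytes_processed))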
Check your Python style. Note that the Cloud Shell code editor will give you hints on Python style as you develop! pylint is just a CLI tool to use as a final check:
python3 -m pylint python/my_first_pipeline.py
Finally, run your pipeline once manually to make sure it works end to end:

python3 my_first_pipeline.py
Submitting a git Pull Request
Use the git commands below to commit and push your code (check out this reference for learning git!):
git add sql/ python/ crontab
git commit -m "Adding <name of your scheduled job>"
git push --set-upstream origin feature/myNewScheduledQuery-$(whoami)
You can now create a Pull Request in your browser with Github.
Once your PR is merged to this repo, your work is done.
The scheduling VM pulls this repo once every 15 mins. This means that you may need to wait up to 15 mins before your first scheduled run occurs.
Note: merging to the devint branch is effectively deploying those changes in the devint project.
The BigQuery Pipeline utility class is configured to log to files in /home/ubuntu/bq-pipeline-*.log*. The CronBox is configured with fluentd to write logs in this directory to Stackdriver, tagging each log with bq-analyst-cron.

You can look at the logs with the command below, using the --freshness flag to control how far back in the logs you want to look:
gcloud logging read \
--freshness 3h \
'logName:"projects/ox-data-analytics-devint/logs/bq-analysts-bq-scheduled-jobs"'
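You can also pull the same logs from Python with the google-cloud-logging client. A minimal sketch, assuming the library is installed, using the same log name and a three-hour lookback (the exact filter string is an assumption worth double-checking):

from datetime import datetime, timedelta, timezone
from google.cloud import logging

# Look back three hours, mirroring --freshness 3h above.
cutoff = (datetime.now(timezone.utc) - timedelta(hours=3)).isoformat()
log_filter = (
    'logName:"projects/ox-data-analytics-devint/logs/bq-analysts-bq-scheduled-jobs" '
    'AND timestamp>="{}"'.format(cutoff)
)

client = logging.Client(project='ox-data-analytics-devint')
for entry in client.list_entries(filter_=log_filter, order_by=logging.DESCENDING):
    print(entry.timestamp, entry.payload)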
Once your Pull Request is merged to the branch for the appropriate environment, it will be scheduled on that environment's BigQuery CronBox VM. This means the process for promoting from devint to qa is to create a Pull Request that merges the devint branch to qa.