Fitbit steps: an end-to-end data engineering project

Introduction

I’ve been paying more attention to health metrics from my Fitbit tracker lately, and it occurred to me that building a data engineering pipeline to visualize them could be a neat little project (and an excuse to try out new tech 😎). So, let’s implement a pipeline that extracts, loads, transforms, and visualizes Fitbit data. For simplicity, in this post, we’ll only focus on step data.

Here’s how the pipeline is going to look:

[Figure: Data pipeline diagram]

First, we’ll extract data from the Fitbit Web API and load it into BigQuery using dlt. Then, we’ll transform the data with dbt and visualize it with Streamlit. We’ll automate the pipeline runs with GitHub Actions and host the dashboard on Streamlit Community Cloud.

Here’s the final dashboard and GitHub repository implementing all the above.

Authentication

If you decide to clone/fork fnery/fitbit-steps and want to try this pipeline yourself, follow the next couple of sections which go through the process of authenticating with the services we’ll use.

We’ll create files containing sensitive information and store them locally. I’ve added all these to a .gitignore file (here) to avoid accidentally pushing them to a GitHub repo.

Fitbit Web API

Follow these docs to get started with the Fitbit APIs. This involves creating an account, registering an application, and authorizing access to user data (see this OAuth 2.0 tutorial).

Once you have a Fitbit access token, create a .env file at the root of the repository and assign the token value to a FITBIT_ACCESS_TOKEN variable:

FITBIT_ACCESS_TOKEN=eyJ...

Google BigQuery

Create a new Google Cloud project and ensure the BigQuery API is enabled. Then, create a service account and assign it the following roles (as specified in the dlt docs):

  • BigQuery Data Editor
  • BigQuery Job User
  • BigQuery Read Session User

Finally, create access credentials for the service account. This will generate a JSON key file, which you should rename to GCLOUD_SERVICE_ACCOUNT_KEY_FILE.json and store at the root of the repository; using exactly this name and location avoids authentication issues later on.

Streamlit

Follow these docs to create a .streamlit/secrets.toml and populate it with the data from the GCLOUD_SERVICE_ACCOUNT_KEY_FILE.json service account key file. This will allow our Streamlit app to read data from BigQuery.
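As an illustration of how the app can consume these secrets, here’s a minimal sketch of building a BigQuery client from them (the [gcp_service_account] section name follows the pattern in the Streamlit docs and is an assumption about how secrets.toml is structured):

# Minimal sketch: build a BigQuery client from Streamlit secrets.
# Assumes the service account JSON was copied into .streamlit/secrets.toml
# under a [gcp_service_account] section (as in the Streamlit docs).
import streamlit as st
from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_info(
    st.secrets["gcp_service_account"]
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)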

GitHub Actions workflow

The main focus of this post is to demonstrate the integration of various tools and technologies to create a seamless data workflow. The centerpiece of all this is this GitHub Actions workflow file. Let’s break it down.

First, we name the workflow and define its trigger. We set a cron job so that it runs every two hours. We also want to be able to trigger the workflow manually (e.g. on the GitHub UI) to test changes immediately after modifying it, so we set the workflow_dispatch: null key-value pair.

name: Run Fitbit steps pipeline

on:
  schedule:
    - cron: '0 */2 * * *'
  workflow_dispatch: null

We need to provide the workflow with access to the same sensitive configuration settings we have stored locally (see the Authentication section). We’ll source these from GitHub secrets to pass them securely to the workflow.

env:
  GCLOUD_SERVICE_ACCOUNT_KEY_FILE: ${{ secrets.GCLOUD_SERVICE_ACCOUNT_KEY_FILE }}
  DESTINATION__BIGQUERY__LOCATION: ${{ secrets.DESTINATION__BIGQUERY__LOCATION }}
  FITBIT_ACCESS_TOKEN: ${{ secrets.FITBIT_ACCESS_TOKEN }}

You’ll need to create these secrets in your GitHub repository by going to Settings > Secrets and variables > Actions and clicking “New repository secret”.

Next, we’ll define the two workflow jobs: maybe_skip and run_pipeline.

The first, maybe_skip, uses the fkirc/skip-duplicate-actions@v5 action to check for duplicate workflow runs and skip the current one if an identical run is already in progress (concurrent_skipping: always); skipping after a recent successful duplicate is disabled (skip_after_successful_duplicate: 'false').

jobs:
  maybe_skip:
    runs-on: ubuntu-latest
    outputs:
      should_skip: ${{ steps.skip_check.outputs.should_skip }}
    steps:
      - id: skip_check
        uses: fkirc/skip-duplicate-actions@v5
        with:
          concurrent_skipping: always
          skip_after_successful_duplicate: 'false'
          do_not_skip: '[]'

Then, we define the run_pipeline job, which only runs if maybe_skip.outputs.should_skip is not 'true'. Its first step checks out the repository code.

run_pipeline:
  needs: maybe_skip
  if: needs.maybe_skip.outputs.should_skip != 'true'
  runs-on: ubuntu-latest
  steps:
    - name: Check out code
      uses: actions/checkout@v3

Then, we write the contents of the GCLOUD_SERVICE_ACCOUNT_KEY_FILE secret to a JSON file and export the file’s path as an environment variable.

- name: Create gcloud service account file and set its path to an environment variable
  run: |
    GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH=$(pwd)/gcloud_service_account_key_file.json
    echo "$GCLOUD_SERVICE_ACCOUNT_KEY_FILE" > $GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH
    echo "GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH=$GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH" >> $GITHUB_ENV

Then, we set up Python 3.10 and install the dependencies listed in requirements.txt. If a cached virtual environment or pip download cache is available from a previous run, it is used to speed up workflow execution.

- name: Setup Python
  uses: actions/setup-python@v4
  with:
    python-version: 3.10.x

- name: Restore virtual environment from cache
  uses: syphar/restore-virtualenv@v1
  id: cache-virtualenv
  with:
    requirement_files: requirements.txt

- name: Restore pip download cache
  uses: syphar/restore-pip-download-cache@v1
  if: steps.cache-virtualenv.outputs.cache-hit != 'true'

- name: Install python dependencies
  run: pip install -r requirements.txt
  if: steps.cache-virtualenv.outputs.cache-hit != 'true'

The next step runs the ingest.py script to extract data from the Fitbit API and load it into BigQuery.

- name: Extract data from Fitbit API and load to BigQuery
  run: |
    export GOOGLE_APPLICATION_CREDENTIALS=${{ env.GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH }}
    python 'ingest.py'

Finally, we compile and run the dbt project to transform the data:

- name: Compile dbt project
  run: dbt compile --profiles-dir . --target prod
  working-directory: transform

- name: Run dbt project
  run: dbt run --profiles-dir . --target prod
  working-directory: transform

Having reviewed what the pipeline consists of and how it runs, let’s now provide additional context on the individual steps.

Checking out the Fitbit Web API

For the purposes of this project, all we need is a simple endpoint providing the total step count on a daily basis. Below is an example request to the endpoint we’ll use, which lets us inspect the structure of its responses:

curl -sX GET "https://api.fitbit.com/1/user/-/activities/steps/date/2024-05-13/2024-05-14.json" \
  -H "accept: application/json" \
  -H "authorization: Bearer $FITBIT_ACCESS_TOKEN" | jq .
# {
#   "activities-steps": [
#     {
#       "dateTime": "2024-05-13",
#       "value": "13469"
#     },
#     {
#       "dateTime": "2024-05-14",
#       "value": "20657"
#     }
#   ]
# }

Note that for the above to work, I have FITBIT_ACCESS_TOKEN exported as an environment variable.

Extracting/loading data into BigQuery

It would be trivial to write a custom Python script to load data from such a simple data source. Nevertheless, having recently discovered dlt, an open-source Python library (which makes it easy to integrate into our workflow), I decided to give it a try.

Our Python script, ingest.py, requires two environment variables: FITBIT_ACCESS_TOKEN and GOOGLE_APPLICATION_CREDENTIALS (the path to the service account JSON file). As we saw above, when the script runs as part of the GitHub Actions workflow, these are sourced from GitHub repository secrets. When running locally, they are loaded as follows:

# Setup local environment variables if not running in GitHub Actions
if not os.getenv('GITHUB_ACTIONS') == 'true':
    load_dotenv()
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '../GCLOUD_SERVICE_ACCOUNT_KEY_FILE.json'

We’re then able to request the Fitbit data from the API using a standard approach:

def fetch_fitbit_data(start_date, end_date):
    """Fetch data from Fitbit API."""
    url = f"{FITBIT_API_URL}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}.json"
    headers = {
        'accept': 'application/json',
        'authorization': f'Bearer {os.getenv("FITBIT_ACCESS_TOKEN")}'
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()['activities-steps']

This returns a list of dictionaries that can be passed directly to dlt:

pipeline = dlt.pipeline(
    pipeline_name="steps_pipeline",
    destination="bigquery",
    dataset_name=DATASET_NAME,
)
load_info = pipeline.run(
    data,
    table_name=TABLE_NAME,
    write_disposition="merge",
    primary_key="date_time"
)

After running ingest.py, we’re able to query the data on BigQuery. One option is to use the bq command-line tool:

bq query --nouse_legacy_sql 'SELECT * FROM `dlt-bq-test.fitbit_ingest.steps` ORDER BY date_time DESC LIMIT 5'
# +------------+-------+--------------------+----------------+
# | date_time  | value |    _dlt_load_id    |    _dlt_id     |
# +------------+-------+--------------------+----------------+
# | 2024-05-15 | 3436  | 1715786904.9847844 | gSJ0BcXZ7fVspg |
# | 2024-05-14 | 20657 | 1715786904.9847844 | ssQh5QhKk/jzKA |
# | 2024-05-13 | 13469 | 1715732009.5069923 | 4rdFaaQmKOY2Rw |
# | 2024-05-12 | 12327 | 1715699062.0484285 | 95vXeGnqnXXDCA |
# | 2024-05-11 | 14377 | 1715699062.0484285 | 6NZfKPSyxBKTqQ |
# +------------+-------+--------------------+----------------+

Here, dlt-bq-test is the ID of my particular Google Cloud project.

Other things worth noting from ingest.py:

  • Data ingestion starts from START_DATE = "2024-01-02"
  • We only extract the full data history on the first run; subsequent runs extract data for the current and the previous day (see the sketch below)
  • Duplicates are prevented by the combination of write_disposition="merge" and primary_key="date_time"
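
To make the incremental extraction concrete, here’s a rough sketch of the date-window selection (the helper name and the first_run flag are hypothetical; the actual logic lives in ingest.py):

# Hypothetical sketch of the date-window selection described above; the
# first_run flag and helper name are illustrative, not the exact ingest.py code.
from datetime import date, timedelta

START_DATE = date(2024, 1, 2)  # first day of data to ingest

def get_extraction_window(first_run: bool) -> tuple[date, date]:
    """Full history on the first run; otherwise just yesterday and today."""
    today = date.today()
    if first_run:
        return START_DATE, today
    return today - timedelta(days=1), today

Combined with the merge write disposition above, re-fetching the last two days keeps recent step counts up to date without creating duplicate rows.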

Transforming data with dbt

We’re interested in daily step count data, and this is precisely what we get from the API and have available in our data warehouse as a result of the ingestion process, so there’s little we need to do as far as data transformations are concerned. Nevertheless, we include a transformation step to demonstrate how dbt fits into the pipeline.

Initializing a dbt project (dbt init) in the transform directory results in the standard “dbt project” structure you see in the repo. One of its directories, models, contains the dbt SQL file fct_steps.sql, which defines the single data model where the original date and step count columns are renamed and recast. After an initial run, subsequent runs are incremental, merging incoming step data into the existing table and only processing records with a date equal to or later than the most recent day already in the table.

bq query --nouse_legacy_sql 'SELECT * FROM `dlt-bq-test.fitbit_transform.fct_steps` ORDER BY day DESC LIMIT 5'
# +------------+-------+---------------------+
# |    day     | steps |   transformed_at    |
# +------------+-------+---------------------+
# | 2024-05-15 |  5918 | 2024-05-15 20:54:01 |
# | 2024-05-14 | 20657 | 2024-05-15 20:54:01 |
# | 2024-05-13 | 13469 | 2024-05-14 15:04:58 |
# | 2024-05-12 | 12327 | 2024-05-14 15:04:58 |
# | 2024-05-11 | 14377 | 2024-05-14 15:04:58 |
# +------------+-------+---------------------+

Visualizing data with Streamlit

Having our data ready, the goals now are:

  • Go from data to dashboard as quickly and effortlessly as possible
  • Keep the dashboards alive (refreshing at a similar frequency as the underlying data models)
  • Make the dashboards easily shareable
  • Build the dashboards in a programmatic way (no WYSIWYG tools)

With this in mind, I decided to use Streamlit, a library that enables building dashboards in the form of web apps using Python without requiring any frontend knowledge. Furthermore, their Community Cloud service makes it easy (and free) to deploy and share the resulting dashboards. Streamlit is one of several emerging BI-as-code tools1 which bring software engineering best practices (version control, CI/CD, dev/prod environments, etc.) to the construction of reports/dashboards.

Here’s the resulting script: visualize.py2. Assuming you followed the Authentication section and ran the ingestion and transformation steps above, you should be able to run it locally with:

streamlit run visualize.py
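
If you’d rather not read the full script, below is a stripped-down sketch of the kind of app it implements (the dataset, table, and column names match the fct_steps output shown earlier; everything else is an illustrative assumption rather than the actual visualize.py):

# Stripped-down illustration of a Streamlit app reading fct_steps from BigQuery.
# The dataset/table/column names match the dbt model above; the rest is an
# assumption about how such an app could be written.
import streamlit as st
from google.cloud import bigquery
from google.oauth2 import service_account

# Build the BigQuery client from .streamlit/secrets.toml (see the Streamlit
# section in Authentication).
credentials = service_account.Credentials.from_service_account_info(
    st.secrets["gcp_service_account"]
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

st.title("Fitbit steps")

# Pull the daily step counts produced by the dbt model and plot them.
query = f"""
    SELECT day, steps
    FROM `{credentials.project_id}.fitbit_transform.fct_steps`
    ORDER BY day
"""
df = client.query(query).to_dataframe()
st.line_chart(df, x="day", y="steps")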

To deploy it to the Streamlit Community Cloud, follow these docs. Note you’ll have to provide the contents of .streamlit/secrets.toml by going to “Advanced settings” in the “Deploy an app” page.

At the time of writing, here’s a preview of how the app looks:

[Figure: Fitbit steps Streamlit app]

This seems like a good place to end this post! While there’s a lot we can do to improve this project3, we’ve successfully achieved our primary goal: building a data pipeline from scratch that extracts, loads, transforms, and visualizes data. Plus, we’ve automated the process to ensure our dashboard is consistently refreshed on schedule.


  1. Others include Evidence and Rill.

  2. If you guessed an LLM built this script, you’re right. Thanks ChatGPT! I strongly encourage anyone to experiment with LLMs for these kinds of simple, well-defined tasks. 

  3. Containerize the entire pipeline using Docker, add data quality checks (e.g., dbt tests), expand data sources and process the additional data for better insights, set up monitoring/alerting, transition from cron jobs to dedicated orchestration tools like Airflow or Dagster (if and when the increase in complexity is warranted), etc. 

This post is licensed under CC BY 4.0 by the author.