Retrieving Data From a JSON API With Python

For a long time, I had in mind to write a post on visualizing GPS trajectory data. To my surprise, it’s difficult to find a couple of data samples online that you can quickly use for some data analysis. At work, we have developed mobile SDKs that can automatically record driving data and I happened to have collected some data from myself. So I said to myself why not retrieve my data and post some of it here for anyone to use, and use the rest to write the long overdue post that I had in mind. So here’s a short write-up of how I pulled my data out. For obvious reasons I’m not going into the details of the systems I’m trying to connect to instead I’m hoping that you find the code educational and perhaps useful when you have similar use-cases.
Over the years I used several devices to collect data, that’s why first I had to obtain the list of my device identifiers. With that let’s jump into the code.
I start with some routine imports, additionally I used python-dotenv to load the credentials and database connection details from a .env file.

from dotenv import load_dotenv
import os
import json
from datetime import datetime
import pandas as pd

load_dotenv()
True

I retrieve the database connection from the environmental variables and query the database to obtain the list of trip unique identifiers.

conn_string = os.getenv("CONN_STRING")
device_ids = "XXX YYY ZZZ".split()
trip_identifiers = pd.read_sql_query("""
    SELECT uid
    FROM trip_summary
    WHERE driver_id IN %(device_ids)s
    """, conn_string, params={"device_ids": tuple(device_ids)})
trip_identifiers.head()

uid
0--uid-1--redacted--
1--uid-2--redacted--
2--uid-3--redacted--
3--uid-4--redacted--
4--uid-5--redacted--

Next we need to call an endpoint for each trip identifier and retrieve the data. The following approach is certaintly an overkill since I only have around a hundred trips, but I just wanted to use asyncio and aiohttp to make it work. In case you are curious the runtime for this block of code is under 3 seconds.

import aiohttp
import asyncio
from aiohttp import BasicAuth


auth = BasicAuth(os.getenv("USERNAME"), os.getenv("PASSWORD"))
headers = {'content-type': 'application/json'}
endpoint_url = os.getenv("ENDPOINT_URL")

async def get_trips_async():
    async with aiohttp.ClientSession(auth=auth, headers=headers) as session:
        tasks = [asyncio.ensure_future(fetch(session, endpoint_url, {"uid": uid, "renderer": "apiv2"})) for uid in trip_identifiers.uid.unique()]
        results = await asyncio.gather(*tasks)
        return results

async def fetch(session: aiohttp.ClientSession, url, params):
    async with session.get(url, params=params) as response:
        result =  await response.json()
        result = result.get("locations", None)
        if result:
            # Unfortunately the API returns the coordinates in as JSON disguised as a string!
            return json.loads(result[0])
        return []


#results = asyncio.run(get_trips_async())
# Only because Jupyter has already started the event loop
results = await get_trips_async()

After running the above code results variable holds a list with the data for each trip. Each trip is a list of dictionaries. Next I create a simple function that takes each trip, create a DataFrame, rename the columns, and flattens the metaData column. This column itself is a dictionary that needs to be flattened.
I apply the cleanup function to each trip, create a new column called trip_id, which is just a number that is incremented for each trip, this is useful since in the next step I concatenate the trips and I’d like to use this identifier to distinguish the trips.

def trip_cleanup(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={"latitude": "lat", "longitude": "lon", "recordDateUTCTimestamp": "timestamp", "speedKmh": "speed_kmh"})
    df = pd.concat([df.loc[:, df.columns != "metaData"], pd.json_normalize(df.metaData)], axis=1)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    return df

trips = [trip_cleanup(pd.DataFrame.from_records(result)) for result in results if result]

for trip_id, trip in enumerate(trips):
    trip["trip_id"] = trip_id

trips_df = pd.concat(trips)

Lastly, I convert the two mystery columns (lockstate and callstate) to category and save the resulting DataFrame as feather file.

trips_df["lockstate"] = trips_df.lockstate.astype("category")
trips_df["callstate"] = trips_df.callstate.astype("category")
filename = "trips-{}.feather".format(datetime.now().strftime("%Y-%m-%d-%H-%M"))
trips_df.reset_index().to_feather(filename)

Hope this been helpful.

Independent Researcher, CTO

My research interests include time series classification and privacy-preserving machine learning.