From API to BigQuery. A step by step ELT approach.

Santiago Ravotti
Published in 2Performant Tech
13 min read · May 30, 2023

Seamlessly Integrating APIs with BigQuery: Leveraging Google Cloud Functions for Automated Data Retrieval and Scheduling in a Practical Case.

Introduction

This document provides a step-by-step process for integrating data from the Bucharest Stock Exchange (BVB) API into BigQuery using an Extract, Load, Transform (ELT) approach. The goal is to automate data retrieval, processing, and loading into BigQuery, enabling easy analysis and visualization. The process involves building and testing the code in a Jupyter notebook before deploying it as a Google Cloud Function.

Step 1: Building the Code in Jupyter Notebook

1. Set up the Jupyter notebook environment.

There are several ways to access a Jupyter notebook. This project used Google Colaboratory because it is simple to set up and has a user-friendly interface.

2. Import the necessary Python libraries for making API requests, parsing data, and working with pandas DataFrames.

import requests
from xml.etree import ElementTree as ET
import pandas as pd

3. Define user credentials to authenticate the API requests.

user = "your_user"
password = "your_pass"
symbol = "your_symbol"
market = "XRS1"

4. Define API requests to fetch symbol-specific data (bid and ask prices)

request_xml = f"""<?xml version="1.0" encoding="utf-8"?>
<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope">
<soap12:Header>
<Header xmlns="www.bvb.ro">
<User>{user}</User>
<Password>{password}</Password>
</Header>
</soap12:Header>
<soap12:Body>
<Level1 xmlns="www.bvb.ro">
<symbol>{symbol}</symbol>
<market>{market}</market>
</Level1>
</soap12:Body>
</soap12:Envelope>"""

5. Send the request and parse the response using ElementTree.

headers = {
    "Content-Type": "application/soap+xml; charset=utf-8",
    "SOAPAction": "www.bvb.ro/Level1"
}

response = requests.post("https://ws.bvb.ro/BVBDelayedWS/Intraday.asmx", headers=headers, data=request_xml)
root = ET.fromstring(response.content)
namespaces = {
    'soap': 'http://www.w3.org/2003/05/soap-envelope',
    'bvb': 'http://www.bvb.ro/'
}
data = {}
for element in root.findall('.//bvb:*', namespaces=namespaces):
    data[element.tag.replace('{http://www.bvb.ro/}', '')] = element.text
data['Spread'] = float(data['BestAskPrice']) - float(data['BestBidPrice'])

6. Load the parsed symbol-specific data into a pandas DataFrame.

df = pd.DataFrame([data])
df.head()

The expected result:

7. Validate and test the code in the Jupyter notebook to ensure data retrieval and processing are successful.
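
One way to validate the parsing logic without calling the API is to run it against a canned response. The sketch below does that, under stated assumptions: the sample XML is invented for illustration (only the element names BestAskPrice and BestBidPrice and the namespace URI come from the code above), and parse_level1 is a hypothetical helper name, not part of the BVB API.

```python
from xml.etree import ElementTree as ET

BVB_NS = "http://www.bvb.ro/"

# Hypothetical sample response: the values and the Level1Response wrapper
# are made up; a real response will contain many more fields.
SAMPLE_RESPONSE = b"""<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
  <soap:Body>
    <Level1Response xmlns="http://www.bvb.ro/">
      <BestAskPrice>1.25</BestAskPrice>
      <BestBidPrice>1.20</BestBidPrice>
    </Level1Response>
  </soap:Body>
</soap:Envelope>"""


def parse_level1(xml_bytes):
    """Flatten every element in the BVB namespace into a {tag: text} dict."""
    root = ET.fromstring(xml_bytes)
    data = {}
    for element in root.findall(".//bvb:*", namespaces={"bvb": BVB_NS}):
        data[element.tag.replace("{" + BVB_NS + "}", "")] = element.text
    # Same derived metric as in the notebook code above
    data["Spread"] = float(data["BestAskPrice"]) - float(data["BestBidPrice"])
    return data


parsed = parse_level1(SAMPLE_RESPONSE)
print(parsed["BestAskPrice"], parsed["BestBidPrice"], round(parsed["Spread"], 4))
```

If the real response uses a different namespace URI, findall returns nothing and the Spread line raises a KeyError, which makes a mismatch easy to spot in the notebook.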

Step 2: Wrapping the Code as a Google Cloud Function

1. Refactor the code from the Jupyter notebook into a standalone and optimized Python script.

Refactoring the code from the Jupyter notebook into a standalone Python script involves restructuring it to remove any dependencies on the notebook environment, so that it can be executed independently. The script also goes beyond the notebook version: in addition to the symbol data, it parses market-wide statistics and the first five bid and ask values from the XML responses returned by the BVB API.

2. Create a Google Cloud Function that encapsulates the code logic.

Creating a Google Cloud Function involves encapsulating the code logic into a self-contained Python script that can be executed independently. This allows the code to be deployed and run in a serverless environment.
Logging is extremely useful for quickly and precisely identifying errors that come up in your function during the testing phase.

Note that to use the exact logic and parameters listed below, you must first define your tables in BigQuery as native tables.
We will name the file containing this Python logic “main.py”:

from functions_framework import http
import requests
from xml.etree import ElementTree as ET
from bs4 import BeautifulSoup  # used to parse the market statistics response
import pandas as pd
import pandas_gbq
import logging
import os
from google.cloud import bigquery  # needed for bigquery.Client() below
import google.cloud.logging

# Set up logging
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()

# Create BigQuery client
bigquery_client = bigquery.Client()

def get_bigquery_table_schema(dataset_table_name):
    # Retrieve the schema of a BigQuery table
    table = bigquery_client.get_table('performant-2.'+dataset_table_name)
    schema_list = []
    for field in table.schema:
        schema_list.append({'name': field.name, 'type': field.field_type})
    return schema_list

def get_data_from_bvb():
    logging.info('Fetching data from BVB.')

    # Defining user credentials
    user = "your_user"
    symbol = "your_symbol"
    market = "XRS1"
    password = os.environ["your_password_variable"]

    logging.info('Got password from Secret Manager.')
    logging.info('Preparing SOAP requests.')

    # Defining common request header and content type for the headers
    request_header = f"""<?xml version="1.0" encoding="utf-8"?>
<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope">
<soap12:Header>
<Header xmlns="www.bvb.ro">
<User>{user}</User>
<Password>{password}</Password>
</Header>
</soap12:Header>"""
    ContentType = "application/soap+xml; charset=utf-8"

    # Defining XML request for symbol data series
    request_xml = f"""{request_header}
<soap12:Body>
<Level1 xmlns="www.bvb.ro">
<symbol>{symbol}</symbol>
<market>{market}</market>
</Level1>
</soap12:Body>
</soap12:Envelope>"""

    headers = {
        "Content-Type": ContentType,
        "SOAPAction": "www.bvb.ro/Level1"
    }

    # Defining XML request for market exchange statistics
    request_xml_market = f"""{request_header}
<soap12:Body>
<ExchangeStatistics xmlns="www.bvb.ro" />
</soap12:Body>
</soap12:Envelope>"""

    headers_market = {
        "Content-Type": ContentType,
        "SOAPAction": "www.bvb.ro/ExchangeStatistics"
    }

    # Defining XML request for first 5 trades
    request_xml_5trades = f"""{request_header}
<soap12:Body>
<Level2 xmlns="www.bvb.ro">
<symbol>{symbol}</symbol>
<market>{market}</market>
</Level2>
</soap12:Body>
</soap12:Envelope>"""

    headers_5trades = {
        "Content-Type": ContentType,
        "SOAPAction": "www.bvb.ro/Level2"
    }

    # Sending SOAP requests
    logging.info('Sending SOAP requests.')
    url = "https://ws.bvb.ro/BVBDelayedWS/Intraday.asmx"

    response = requests.post(url, headers=headers, data=request_xml)
    response_market = requests.post(url, headers=headers_market, data=request_xml_market)
    response_5trades = requests.post(url, headers=headers_5trades, data=request_xml_5trades)

    logging.info('Received SOAP responses.')

    return response, response_market, response_5trades

# Parsing SOAP responses
def parse_xml_response(response, response_market, response_5trades):
    logging.info('Parsing XML response.')

    root = ET.fromstring(response.content)
    root_5trades = ET.fromstring(response_5trades.content)

    namespaces = {
        'soap': 'http://www.w3.org/2003/05/soap-envelope',
        'bvb': 'http://www.bvb.ro/'
    }

    # Standardize the XML parsing and dictionary creation process with parse_xml
    def parse_xml(root, namespaces):
        data = {}
        for element in root.findall('.//bvb:*', namespaces=namespaces):
            data[element.tag.replace('{http://www.bvb.ro/}', '')] = element.text
        return data

    parsed_data = parse_xml(root, namespaces)

    # Use BeautifulSoup to parse response_market and get XRS1 data
    soup = BeautifulSoup(response_market.content, 'lxml-xml')
    market_data = {}

    # Find the 'XRS1' Marketcode
    marketcode_tag = soup.find('Marketcode', string='XRS1')

    if marketcode_tag is not None:
        # Navigate to parent TypeExchangeStatistics element
        type_exchange_statistics_tag = marketcode_tag.find_parent('TypeExchangeStatistics')

        # Parse all children into the dictionary
        for tag in type_exchange_statistics_tag.find_all():
            market_data[tag.name] = tag.text

    first_trades_data = parse_xml(root_5trades, namespaces)

    logging.info('Parsed XML response.')

    # Adding logging to view data
    logging.debug(f"parsed_data: {parsed_data}")
    logging.debug(f"market_data: {market_data}")
    logging.debug(f"first_trades_data: {first_trades_data}")

    return parsed_data, market_data, first_trades_data

# Creates pandas dataframes and loads data to BigQuery
def load_data_to_gbq(parsed_data, market_data, first_trades_data):
    logging.info('Loading data to GBQ.')

    df_2P = pd.DataFrame([parsed_data])
    df_market = pd.DataFrame([market_data])
    df_5trades = pd.DataFrame([first_trades_data])

    logging.info(f"df_2P is {'not ' if not df_2P.empty else ''}empty")
    logging.info(f"df_market is {'not ' if not df_market.empty else ''}empty")
    logging.info(f"df_5trades is {'not ' if not df_5trades.empty else ''}empty")

    # Save parsed response into a BigQuery table with dynamic schema
    df_2P.to_gbq(destination_table='stock_market.bvb_2performant',
                 project_id='performant-2',
                 if_exists='append',
                 table_schema=get_bigquery_table_schema('stock_market.bvb_2performant'))
    logging.info('Loaded df_2P to BigQuery')

    # Save the market data dataframe to BigQuery
    df_market.to_gbq(destination_table='stock_market.bvb_full_market',
                     project_id='performant-2',
                     if_exists='append',
                     table_schema=get_bigquery_table_schema('stock_market.bvb_full_market'))
    logging.info('Loaded df_market to BigQuery')

    # Save the symbol first 5 trades dataframe to BigQuery
    df_5trades.to_gbq(destination_table='stock_market.bvb_daily_first_5_trades',
                      project_id='performant-2',
                      if_exists='append',
                      table_schema=get_bigquery_table_schema('stock_market.bvb_daily_first_5_trades'))
    logging.info('Loaded df_5trades to BigQuery')

    logging.info('Data successfully written to BigQuery!')

@http
def main(request):
    try:
        logging.info('Function execution started.')
        response, response_market, response_5trades = get_data_from_bvb()
    except Exception as e:
        logging.error('Error fetching data from BVB: %s', e)
        raise
    try:
        parsed_data, market_data, first_trades_data = parse_xml_response(response, response_market, response_5trades)
    except Exception as e:
        logging.error('Error parsing XML response: %s', e)
        raise
    try:
        load_data_to_gbq(parsed_data, market_data, first_trades_data)
    except Exception as e:
        logging.error('Error loading data to GBQ: %s', e)
        raise

    logging.info('Function execution completed.')
    return 'Data successfully written to BigQuery!', 200

If this function does not fit your use case, you can find more Python examples for Google Cloud Functions on GitHub: https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/functions.

3. Define the necessary dependencies and configurations for the Cloud Function.

Defining the necessary dependencies and configurations for the Cloud Function involves specifying the required libraries and packages that need to be installed and imported for the function to run successfully. These dependencies are listed in the requirements.txt file. Here’s a breakdown of the dependencies:

  1. functions-framework: This is a framework for writing lightweight, portable Python functions that can run in various Google Cloud environments including Google Cloud Functions, Google Cloud Run, and your local development machine. Its purpose is to help you develop, run, and test your functions locally before deploying them.
  2. requests: This is a popular Python library for making HTTP requests. It abstracts the complexities of making requests behind a beautiful, simple API, allowing you to send HTTP/1.1 requests.
  3. google-cloud-bigquery: This is a client library for Google BigQuery, a fully managed, serverless data warehouse that enables super-fast SQL queries using the processing power of Google’s infrastructure.
  4. pandas: This is a data analysis and manipulation tool. It’s particularly well-suited to working with structured (e.g. tabular, multidimensional, potentially heterogeneous) and time series data.
  5. pandas-gbq: This is a library to integrate pandas with Google BigQuery. It allows you to read and write pandas DataFrames directly from and to BigQuery.
  6. google-cloud-logging: This is a client library for Google Cloud Logging. It allows you to send logs from your applications to Google Cloud’s logging service and to access and analyze your log data.
  7. beautifulsoup4: This is a Python library for parsing HTML and XML documents. It’s often used for web scraping.
  8. lxml: This is a library for processing XML and HTML in Python. It can also be used for web scraping and can work in conjunction with beautifulsoup4 for more complex tasks.

By including these dependencies in the requirements.txt file, the Cloud Function will have the necessary tools and libraries to execute its code successfully.
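
Before testing locally, it can help to confirm that every package from the list is installed in the local environment. The following is a minimal sketch using only the standard library; missing_packages is a hypothetical helper name, and the distribution names are taken from the breakdown above.

```python
from importlib import metadata

# Distribution names as listed in the dependency breakdown above
REQUIRED = [
    "functions-framework", "requests", "google-cloud-bigquery", "pandas",
    "pandas-gbq", "google-cloud-logging", "beautifulsoup4", "lxml",
]


def missing_packages(names):
    """Return the distributions from `names` that are not installed locally."""
    missing = []
    for name in names:
        try:
            metadata.version(name)
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


print("Missing:", missing_packages(REQUIRED))
```

An empty list means the local environment covers the same dependencies as requirements.txt; it does not check that the versions match.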

4. Test the Cloud Function locally to ensure it functions correctly.

To ensure the Cloud Function functions correctly, it can be tested locally. This involves running the function on a local development environment and sending simulated HTTP requests to verify its behavior, data retrieval from the API, data processing, and successful loading into BigQuery.

The test command in Cloud Shell, sent as a GET request to the function's URL, looks like this:

curl -m 70 -X GET https://stock-market-extract-load-REST-OF-YOUR-URL \
-H "Authorization: bearer $(gcloud auth print-identity-token)" \
-H "ce-id: 1234567890" \
-H "ce-specversion: 1.0" \
-H "ce-type: google.cloud.pubsub.topic.v1.messagePublished" \
-H "ce-time: 2020-08-08T00:11:44.895529672Z" \
-H "ce-source: //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/parse_feeds_noon"

If all goes according to plan, the response should be ‘Data successfully written to BigQuery!’. If it is not, check the logs to pinpoint the issue that is preventing the function from running successfully.
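
The same authenticated call can also be made from Python. This is a rough sketch using only the standard library; build_invoke_request is a hypothetical helper, and the URL and token below are placeholders to be replaced with your function URL and a real identity token (for example the output of gcloud auth print-identity-token).

```python
import urllib.request


def build_invoke_request(url, identity_token):
    """Build (but do not send) an authenticated GET request for the function."""
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", f"Bearer {identity_token}")
    return request


# Placeholder values, not a real endpoint or token
req = build_invoke_request("https://example.com/your-function", "YOUR_TOKEN")
print(req.get_method(), req.full_url)

# To actually call the function (70-second timeout, as in the curl example):
# with urllib.request.urlopen(req, timeout=70) as resp:
#     print(resp.status, resp.read().decode())
```

The network call is left commented out so the snippet can be run safely before a real URL and token are filled in.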

Step 3: Deploying the Cloud Function

1. Set up any required environment variables, including API keys or secrets.

Setting up environment variables is an important step in configuring the Cloud Function. These variables include API keys or secrets that the function needs to access external resources or sensitive information securely. After deployment, we will edit the function to add the Eventarc trigger and other settings.

To set up environment variables, including API keys or secrets, for your Cloud Function, follow these steps:

  1. Go to the Cloud Functions page in the Google Cloud Console.
  2. Click on the name of your Cloud Function to access its details.
  3. Click on the “Edit” button to modify the function’s configuration.
  4. In the “Runtime, Build, Connections, and Security Settings” section, click on the “Security” tab.
  5. Under the “Reference a secret” option, select “Exposed as environment variable”.
  6. In the “Name” field, enter the name of the environment variable.
  7. From the “Version” drop-down, select the version of the secret to reference.
  8. Click “Done” to save the configuration.
  9. Click “Next” and then “Deploy” to apply the changes to your Cloud Function.

By setting up the environment variable in this way, the Cloud Function will have access to the secret value stored in Secret Manager, which can be retrieved programmatically at runtime using the specified environment variable name.
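
Inside the function, reading the secret then comes down to reading an environment variable. The sketch below wraps that lookup so a missing variable fails with a clear message. The name BVB_PASSWORD is hypothetical (use whatever you entered in the “Name” field), and the setdefault line exists only so the example runs locally.

```python
import os


def get_required_env(name):
    """Return the value of an environment variable or fail with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set or empty")
    return value


# Local stand-in for the value that Secret Manager would provide at runtime
os.environ.setdefault("BVB_PASSWORD", "dummy-value-for-local-testing")
password = get_required_env("BVB_PASSWORD")
```

Failing early this way keeps a misconfigured secret from surfacing later as an authentication error from the API.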

For more information on using secrets in Google Cloud, see https://cloud.google.com/functions/docs/configuring/secrets.

2. Deploy the Cloud Function to the Google Cloud platform.

After deployment, you should see a header with a green check mark, the name of your function, and the URL to invoke it:

3. Ensure the Cloud Function is accessible and ready for execution in the cloud environment.

To test the Cloud Function in the Google Cloud UI, follow these steps:

  1. Go to the Google Cloud Console (console.cloud.google.com) and navigate to the Cloud Functions section.
  2. Locate your Cloud Function from the list of functions.
  3. Click on the name of your function to access its details page.
  4. On the details page, click on the “Testing” tab.
  5. In the “Testing” tab, you can provide sample data or parameters for testing the function. This data should match the expected input format for your function.
  6. Once you have provided the testing data, click the “Test the function” button.
  7. The Cloud Function will be executed with the provided test data, and you can view the execution results in the UI.
  8. The results will include any logs or output generated by the function during its execution.

By using the testing functionality in the Google Cloud UI, you can verify that your Cloud Function is accessible, and its execution is functioning as expected. This allows you to catch any issues or errors before deploying the function in a production environment.

Step 4: Scheduling a Google Cloud Job

1. Use Google Cloud Scheduler to schedule a job at specific intervals.

Google Cloud Scheduler provides the capability to schedule jobs that run at precise times or intervals. You can configure the scheduler to execute tasks according to your desired schedule.

2. Configure the Cloud Scheduler job to trigger a Pub/Sub event.

In the Cloud Scheduler configuration, you specify that the scheduled job should trigger a Pub/Sub event when it runs. This event acts as a trigger for other processes or services.

3. Set the frequency of the job using the Unix-cron format.

The frequency of the job is defined using the Unix-cron format. This format allows you to specify the precise schedule at which the job should run. For example, using the expression “30 18 * * MON-FRI” will schedule the job to run at 6:30 PM from Monday to Friday.
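
To make the expression concrete, here is a small sketch that checks whether a given moment falls on that schedule. It hard-codes the single expression from the example rather than parsing cron syntax in general, and matches_schedule is a hypothetical helper name. Cloud Scheduler evaluates the expression in the time zone configured for the job, which this sketch ignores.

```python
from datetime import datetime


def matches_schedule(moment):
    """True if `moment` matches the cron expression 30 18 * * MON-FRI."""
    is_weekday = moment.weekday() < 5  # Monday is 0, Friday is 4
    return moment.minute == 30 and moment.hour == 18 and is_weekday


print(matches_schedule(datetime(2023, 5, 30, 18, 30)))  # a Tuesday
print(matches_schedule(datetime(2023, 6, 3, 18, 30)))   # a Saturday
```

The first call prints True and the second prints False, since the job is skipped on weekends.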

4. When the scheduled job runs, it sends a message to the Pub/Sub topic.

Once the scheduled job is triggered based on the specified schedule, it sends a message to the designated Pub/Sub topic. This message can be used to trigger other processes or services, such as invoking a Cloud Function or initiating data processing tasks.

By leveraging Google Cloud Scheduler, you can effectively schedule and automate tasks in your cloud environment, ensuring timely execution and coordination between different components of your system.

For more information on using Google Cloud Scheduler with Pub/Sub topics, see https://cloud.google.com/scheduler/docs/tut-pub-sub.

Step 5: Triggering the Cloud Function with Pub/Sub

1. Create a Pub/Sub topic that the Cloud Function will subscribe to.

In order for the Cloud Function to be triggered by events, you need to create a Pub/Sub topic. This topic acts as a communication channel where messages can be published and received. The Cloud Function will subscribe to this topic, waiting for incoming messages.

2. Configure the Cloud Scheduler job to send a message to the Pub/Sub topic.

To schedule the execution of the Cloud Function, you need to configure the Cloud Scheduler job. In the job configuration, you specify the frequency and timing for the job to run. As part of this configuration, you also set the target of the job to be the Pub/Sub topic that you created.

3. When the Pub/Sub message is received, it triggers the execution of the Cloud Function.

Once the Cloud Scheduler job runs at the specified intervals, it sends a message to the Pub/Sub topic. When the Pub/Sub topic receives this message, it triggers the execution of the Cloud Function that is subscribed to the topic. The Cloud Function then processes the message and performs the desired operations or tasks based on the received information.

For this, we have to create an Eventarc trigger. If you have not defined one yet, you can edit your function to add it.

Remember to give the service account the necessary permissions. At this point, the service account should have the following roles assigned:
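
The function in this article does not need the content of the message, only the fact that one arrived. If you do want to read it, Pub/Sub delivers the payload base64-encoded under message.data in the JSON body. The sketch below decodes it under that assumption; decode_pubsub_message is a hypothetical helper, the sample body is invented, and in an HTTP function the body would typically come from request.get_json(silent=True).

```python
import base64


def decode_pubsub_message(event_body):
    """Return the decoded text payload of a Pub/Sub event body (a dict)."""
    encoded = event_body.get("message", {}).get("data", "")
    return base64.b64decode(encoded).decode("utf-8")


# Hypothetical body, shaped like the JSON that Pub/Sub delivers
sample_body = {
    "message": {"data": base64.b64encode(b"run").decode("ascii")},
    "subscription": "projects/YOUR-PROJECT/subscriptions/your-subscription",
}
print(decode_pubsub_message(sample_body))
```

A body without a message field decodes to an empty string, so the helper is safe to call on manual test requests as well.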

For running a Google Cloud Function:

  • Cloud Functions Invoker (roles/cloudfunctions.invoker): Allows the service account to invoke (execute) the Cloud Function.

For using a Secret as an environment variable:

  • Secret Manager Secret Accessor (roles/secretmanager.secretAccessor): Grants read access to the specified Secret in Secret Manager. This permission allows the service account to access the secret value and set it as an environment variable.

For using a Pub/Sub topic:

  • Pub/Sub Publisher (roles/pubsub.publisher): Enables publishing messages to a Pub/Sub topic. This permission allows the service account to send messages to the specified topic.

For running a job in Google Cloud Scheduler:

  • Cloud Scheduler Job Runner (roles/cloudscheduler.jobRunner): Authorizes the service account to run the Cloud Scheduler job. This permission allows the service account to execute the scheduled job and trigger the associated actions, such as publishing messages to a Pub/Sub topic.

Finally, by creating a Pub/Sub topic, configuring the Cloud Scheduler job, and leveraging the Pub/Sub messaging system, you establish a pipeline where the Cloud Function is triggered by messages sent to the topic. This enables you to schedule and automate the execution of the Cloud Function based on specific events or intervals.

Step 6: Creating a Customized View in BigQuery

1. Write an SQL query to create a customized view in BigQuery. Use the query to exclude unnecessary data, reformat columns, and calculate additional metrics if required.

Starting from the three original tables populated by our Google Cloud Function, we can use SQL to avoid duplicates (mainly caused by extractions that run on local holidays) and produce a data set that is easier to work with.

2. Create the final view in BigQuery, tailored to specific requirements for the dashboard.

In this case, the final two views that will feed our report in the data visualization platform of our choice are the following:

YOUR-PROJECT-ID.stock_market.bvb_symbol_full_market, which combines market and symbol-specific data and also computes the weight of the symbol in the market:

WITH
distinct_full_market AS (
  SELECT
    Volume AS MarketVolume,
    Value AS MarketValue,
    Trades AS MarketTrades,
    DATE(LastSession) AS Date
  FROM `YOUR-PROJECT-ID.stock_market.bvb_full_market`
),

distinct_symbol AS (
  SELECT
    * EXCEPT(LastTradeTime, SymbolCode, MarketCode, Lowpricecurrent, Highpricecurrent, SymbolStatus, LastBestTime),
    DATE(LastTradeTime) AS Date,
    BestAskPrice - BestBidPrice AS DailySpread
  FROM `YOUR-PROJECT-ID.stock_market.bvb_symbol`
)

SELECT
  dfm.*,
  COALESCE((dps.Volume/dfm.MarketVolume), 0) AS VolumeShareSymbol,
  COALESCE((dps.Value/dfm.MarketValue), 0) AS ValueShareSymbol,
  COALESCE((dps.Trades/dfm.MarketTrades), 0) AS TradesShareSymbol,
  dps.* EXCEPT (Date)
FROM distinct_full_market dfm
LEFT JOIN distinct_symbol dps
  ON dfm.Date = dps.Date
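
The share columns are plain ratios with a fallback to 0 when the symbol has no row for that date. A minimal Python sketch of the same arithmetic, with made-up numbers and a hypothetical helper name (share), may make the COALESCE logic easier to follow:

```python
def share(symbol_total, market_total):
    """Symbol's fraction of the market total; 0 when either value is missing."""
    if symbol_total is None or market_total is None:
        return 0
    return symbol_total / market_total


# Invented figures: the symbol traded 2,500 of 1,000,000 units that day
volume_share = share(2_500, 1_000_000)
# No symbol row for the date (the LEFT JOIN produced NULLs)
missing_share = share(None, 1_000_000)
print(volume_share, missing_share)
```

As in the SQL, None stands in for NULL: a missing symbol row yields a share of 0 rather than an empty value.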

YOUR-PROJECT-ID.stock_market.first_5_orders_aggregated, which computes total volume and total value for the first five bid and the first five ask orders:

SELECT DISTINCT
DATE(LastTradeTime) AS Date,
(B1Vol + B2Vol + B3Vol + B4Vol + B5Vol) AS TotalBidVol,
(A1Vol + A2Vol + A3Vol + A4Vol + A5Vol) AS TotalAskVol,
(B1Vol*B1Price + B2Vol*B2Price + B3Vol*B3Price + B4Vol*B4Price + B5Vol*B5Price) AS TotalBidValue,
(A1Vol*A1Price + A2Vol*A2Price + A3Vol*A3Price + A4Vol*A4Price + A5Vol*A5Price) AS TotalAskValue,
B1Price,
B1Vol,
B2Price,
B2Vol,
B3Price,
B3Vol,
B4Price,
B4Vol,
B5Price,
B5Vol,
A1Price,
A1Vol,
A2Price,
A2Vol,
A3Price,
A3Vol,
A4Price,
A4Vol,
A5Price,
A5Vol
FROM `YOUR-PROJECT-ID.stock_market.bvb_daily_first_5_trades`
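
The totals in this view follow a simple pattern: sum the volumes, and sum price times volume across the five levels. A short worked example in Python, using invented bid levels and a hypothetical helper name (aggregate_levels), shows the arithmetic:

```python
def aggregate_levels(levels):
    """Total volume and total value over (price, volume) pairs."""
    total_volume = sum(volume for _, volume in levels)
    total_value = sum(price * volume for price, volume in levels)
    return total_volume, total_value


# Invented (price, volume) pairs standing in for B1..B5
bids = [(1.20, 100), (1.19, 200), (1.18, 300), (1.17, 400), (1.16, 500)]
total_bid_vol, total_bid_value = aggregate_levels(bids)
print(total_bid_vol, round(total_bid_value, 2))
```

Here the total bid volume is 1500 and the total bid value is 1760 (120 + 238 + 354 + 468 + 580), matching what TotalBidVol and TotalBidValue would return for such a row.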

Conclusion

By following this step-by-step process, data can be seamlessly retrieved from the BVB API, processed, and loaded into BigQuery. This enables efficient analysis and visualization of the data, ultimately driving valuable insights for decision-making.
