作成しようと思ってすっかり忘れていました。2022年に入ってからGA4のBigQueryエクスポートが遅れることが頻発し、ひどい時は2日程度遅れる事もあったので、スケジュールされたクエリが全く機能しない、ということがありました。
そこで、GA4からのエクスポートが完了したらすぐにその後に必要な処理を実行させるための仕組みをCloud Functionなどを使って作成しました。
今回私が使っているGA4がストリーミングでエクスポートしているタイプなのでバッチの場合と多少異なる可能性がありますがご容赦ください。
作業手順
- Cloud FunctionsでトリガーするCloud Pub/Subのトピックを作成
- GAログが格納されているプロジェクトのCloudLoggingでのログルータにて対象のGAログがBigQueryに格納された時に上記1で作成したCloud Pub/Subトピックに送るシンクを作成
- CloudFunctionsで上記Cloud Pub/Subトピックをトリガーとしてスケジュールされたクエリを呼び出す関数を作成
Cloud FunctionsでトリガーするCloud Pub/Subのトピックを作成
GCPでGA4からBigQueryにエクスポートされるテーブルが入っているプロジェクトを選択し、「ロギング」→「ログエクスプローラ」を選択します。
続いて、左側にある期間を「直近の1日」に変更し、右側の「クエリを表示」をONにします。これで新たに表示された空白部分(下の画像の緑枠」にクエリを入力してから右上の「クエリ実行」ボタンを押します。
protoPayload.authenticationInfo.principalEmail="[email protected]"
protoPayload.methodName="jobservice.jobcompleted"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="analytics_プロパティID"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.projectId="プロジェクトID
"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"events"
NOT protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"events_intraday"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.writeDisposition="WRITE_TRUNCATE"
下記のように結果が表示されればOKです。
Cloud Pub/Subトピックに送るシンクを作成
右下の「操作」→「シンクの作成」を選択します。名前は任意です。
- 名前:GA4_analytics_1234567_WRITE_TRUNCATE (名前は任意 ※1234567はプロパティID)
- シンクサービスの選択:Cloud Pub/Sub トピック
- Cloud Pub/Subトピック:トピックを作成する→Topic_GA4_analytics_1234567 (名前は任意)
- 「シンクを作成」をクリック
CloudFunctionsで上記Cloud Pub/Subトピックをトリガーとした関数を作成
トピックを作成したら、ログが吐き出されるたび(=GA4のエクスポートが完了)に呼び出す処理をCloud Function上で実装します。まずCloud Functionsに移動し、「関数の作成」をクリック
下記のように入力して、「作成」ボタンを押します。
- Environment:1st gen
- Function name:任意(ga4_run_scheduled_queriesとしておきます)
- Region:asia_northeast1(BigQueryと同じ)
- Trigger type:Cloud Pub/Sub
- Topic:先ほど作成したトピック(topic_ga4_analytics_1234567)
続いてコード入力では、requirements.txtとmain.pyの2つを作成します。
requirements.txt
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery==1.0.0
pandas>=1.0.0
db-dtypes>=1.0.0
main.py
main.pyには以下を入力します。前提として、
- 用途はGA4からエクスポートされたテーブルをセッション毎のレコードを作成して別テーブルに追加格納(テーブル1とします)
- ログからこの関数が実行されるたびにGA4の前日テーブルが作成されているか?を確認
- GA4の前日テーブルが作成されていたら、テーブル1にすでに追加済みか?を確認
- まだ未格納だった場合は追加格納
import time
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud import bigquery_datatransfer_v1
from google.cloud import bigquery
from email.header import Header
from email.mime.text import MIMEText
import smtplib
import datetime
import pandas as pd
TO_EMAIL = '[email protected]'
SENDER_EMAIL = '[email protected]' # gmail account
SENDER_PASSWORD = '##########' # required 2 step verification and set up password for app https://support.google.com/accounts/answer/185833
SENDER_DOMAIN = 'smtp.gmail.com'
SENDER_PORT = 465
TITLE = "GA4_run_query"
def runQuery (projectid, transferid):
try:
client = bigquery_datatransfer_v1.DataTransferServiceClient()
parent = client.project_transfer_config_path(projectid, transferid)
start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)
# sendEmail("Success", transferid, "Info")
except Exception as e:
print(e)
# sendEmail("Error",repr(e), "Error")
#メール送信※今回は省略
def sendEmail(sub, msg, level):
try:
except Exception as e:
print(e)
#前日分のテーブルがエクスポートされているかチェック
def checkGA4Table():
try:
client = bigquery.Client(project="プロジェクトID")
query = """
SELECT COUNT(1)
FROM `プロジェクトID_プロパティID.events_*`
WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d',DATE_SUB(DATE(CURRENT_TIMESTAMP, "Asia/Tokyo"),INTERVAL 1 DAY)) #前日分のテーブルを探す
"""
df = client.query(query).to_dataframe()
if df.empty:
return 0
else:
return df.iloc[0][0]
except Exception as e:
print(e)
# sendEmail("checkGA4Table",repr(e), "Error")
return None
#毎日GA4のテーブルからセッション単位の行動履歴を蓄積しているがこのテーブルがすでに前日分のデータが入っているかを確認
def checkSessionTable():
try:
client = bigquery.Client(project="プロジェクトID")
query = """
SELECT COUNT(1) AS cnt
FROM `プロジェクトID.データセットID.T_store_session` #セッションを蓄積しているテーブル
WHERE dateNZT = DATE_SUB(DATE(CURRENT_TIMESTAMP, ""Asia/Tokyo"),INTERVAL 1 DAY)
HAVING COUNT(1) > 10 #すでに前日分のデータが入っているか確認
ORDER BY 1 DESC LIMIT 1
"""
df = client.query(query).to_dataframe() # Make an API request
return df.empty
except Exception as e:
print(e)
# sendEmail("checkSessionTable",repr(e), "Error")
def main(data, context):
ga4 = checkGA4Table() #GA4から前日分がエクスポートされたかを確認
print("ga4 = {0}".format(ga4))
if not ga4:
sendEmail('main','ga4 is empty','INFO')
elif ga4 == None:
sendEmail('main','ga4 is None','INFO')
elif ga4 == 0:
sendEmail('main','ga4 == 0','INFO')
elif ga4 > 0:
str = 'ga4 is not empty! '
# sendEmail('main','ga4 is not empty! Run scheduled query','INFO')
flg = checkSessionTable() #前日分のデータがすでにセッション蓄積しているテーブルに格納済みかを確認
print("checkSessionTable = {0}".format(flg))
if flg:
runQuery(プロエクトID,スケジュールされたクエリのID) #スケジュールされたクエリを実行
# sendEmail('main',str + 'Session Table is empty! Run scheduled query','INFO')
else:
# sendEmail('main',str + 'Session Table else','INFO')
else:
# sendEmail('main','ga4 else','INFO')
sendEmail関数はsendgrid APIやGmailアカウントを使用して、メールで通知させることが可能です。(今回は省略)
記入を完了して、デプロイボタンを押し、問題なければ今後はGA4からエクスポートされるたびに実行されます。
コメント