Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ REGION=asia-northeast1 \
PROJECT_ID=your-project-id \
TRIGGER_BUCKET=your-bucket \
SERVICE_ACCOUNT_EMAIL=etl-runner@your-project-id.iam.gserviceaccount.com \
BQ_DATASET_ID=airbnb_management \
BQ_TABLE_ID=earnings_cleaned \
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/... \
./deploy.sh
```

Expand All @@ -94,7 +97,15 @@ SERVICE_ACCOUNT_EMAIL=etl-runner@your-project-id.iam.gserviceaccount.com \
2. Upload the CSV to your designated GCS bucket.
3. Eventarc sends the object-finalized event to Cloud Run.
4. The service cleans, stages, and merges the data into BigQuery.
5. Analyze your data in BigQuery, Google Sheets, or Looker Studio.
5. Receive a notification in Slack (if configured).
6. Analyze your data in BigQuery, Google Sheets, or Looker Studio.

### Slack Notifications

When `SLACK_WEBHOOK_URL` is provided, the pipeline sends a rich attachment message to your channel:

* **Success**: Shows the filename, import mode (Full Import for new tables vs. Merge Import for existing tables), and the count of inserted and updated rows.
* **Failure**: Sends an alert with the error message and filename to help you troubleshoot quickly (e.g., schema mismatches or permission issues).

## Notes

Expand Down
3 changes: 2 additions & 1 deletion cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ steps:
- --service-account=${_SERVICE_ACCOUNT_EMAIL}
- --source=.
- --no-allow-unauthenticated
- --update-env-vars=GCP_PROJECT_ID=${PROJECT_ID},BQ_DATASET_ID=${_BQ_DATASET_ID},BQ_TABLE_ID=${_BQ_TABLE_ID}
- --update-env-vars=GCP_PROJECT_ID=${PROJECT_ID},BQ_DATASET_ID=${_BQ_DATASET_ID},BQ_TABLE_ID=${_BQ_TABLE_ID},SLACK_WEBHOOK_URL=${_SLACK_WEBHOOK_URL}
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: bash
args:
Expand Down Expand Up @@ -44,3 +44,4 @@ substitutions:
_TRIGGER_NAME: "airbnb-payouts-importer-trigger"
_TRIGGER_BUCKET: "REQUIRED_SET_IN_TRIGGER_UI"
_SERVICE_ACCOUNT_EMAIL: "REQUIRED_SET_IN_TRIGGER_UI"
_SLACK_WEBHOOK_URL: ""
3 changes: 2 additions & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PROJECT_ID="${PROJECT_ID:-YOUR_PROJECT_ID}"
SERVICE_ACCOUNT_EMAIL="${SERVICE_ACCOUNT_EMAIL:-YOUR_SERVICE_ACCOUNT_EMAIL}"
BQ_DATASET_ID="${BQ_DATASET_ID:-airbnb_management}"
BQ_TABLE_ID="${BQ_TABLE_ID:-earnings_cleaned}"
SLACK_WEBHOOK_URL="${SLACK_WEBHOOK_URL:-}"

echo "Deploying Cloud Run service ${SERVICE_NAME} to ${REGION}..."

Expand All @@ -20,7 +21,7 @@ gcloud run deploy "${SERVICE_NAME}" \
--memory 512Mi \
--service-account "${SERVICE_ACCOUNT_EMAIL}" \
--no-allow-unauthenticated \
--set-env-vars "GCP_PROJECT_ID=${PROJECT_ID},BQ_DATASET_ID=${BQ_DATASET_ID},BQ_TABLE_ID=${BQ_TABLE_ID}"
--set-env-vars "GCP_PROJECT_ID=${PROJECT_ID},BQ_DATASET_ID=${BQ_DATASET_ID},BQ_TABLE_ID=${BQ_TABLE_ID},SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}"

echo "Creating or updating Eventarc trigger ${TRIGGER_NAME}..."

Expand Down
12 changes: 11 additions & 1 deletion lib/airbnb_payous/bigquery_gateway.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,21 @@ def load_and_merge!(rows:)
load_job.wait_until_done!
raise load_job.error if load_job.failed?

@logger.info("Loaded #{rows.length} rows to staging table.")
total_rows = load_job.output_rows
@logger.info("Loaded #{total_rows} rows to staging table.")

target_table = dataset.table(table_id)
result = { inserted_count: 0, updated_count: 0 }

if target_table.nil?
@logger.info("Target table #{qualified_table_name(table_id)} not found. Creating it for the first time.")
copy_job = @bigquery.copy_job qualified_table_name(staging_table_id), qualified_table_name(table_id), write: "truncate"
copy_job.wait_until_done!
raise copy_job.error if copy_job.failed?
@logger.info("Target table created successfully.")

result[:mode] = :create_table
result[:inserted_count] = total_rows
else
merge_sql = build_merge_query(rows.first.keys)
query_job = @bigquery.query_job(merge_sql)
Expand All @@ -83,7 +88,12 @@ def load_and_merge!(rows:)
inserted_count = query_job.respond_to?(:dml_stats) && query_job.dml_stats ? query_job.dml_stats.inserted_row_count.to_i : 0
updated_count = query_job.respond_to?(:dml_stats) && query_job.dml_stats ? query_job.dml_stats.updated_row_count.to_i : 0
@logger.info("MERGE operation completed. Rows inserted: #{inserted_count}, Rows updated: #{updated_count}.")

result[:mode] = :merge
result[:inserted_count] = inserted_count
result[:updated_count] = updated_count
end
result
ensure
temp_file&.close!
delete_staging_table(dataset)
Expand Down
13 changes: 12 additions & 1 deletion lib/airbnb_payous/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

require_relative "bigquery_gateway"
require_relative "csv_transformer"
require_relative "slack_notifier"

module AirbnbPayous
class Processor
def initialize(
transformer: CsvTransformer.new,
gateway: nil,
notifier: SlackNotifier.new,
logger: Logger.new($stdout)
)
@transformer = transformer
@gateway = gateway || default_gateway
@notifier = notifier
@logger = logger
end

Expand All @@ -29,11 +32,19 @@ def call(event_payload)

csv_content = @gateway.download(bucket_name:, file_name:)
rows = @transformer.call(csv_content)
@gateway.load_and_merge!(rows:)
result = @gateway.load_and_merge!(rows:)

@notifier.notify_success(
file_name: file_name,
mode: result[:mode],
inserted_count: result[:inserted_count],
updated_count: result[:updated_count]
)

nil
rescue StandardError => e
@logger.error("Failed to process Airbnb CSV: #{e.message}")
@notifier.notify_failure(file_name: file_name || "unknown", error_message: e.message)
raise e
end

Expand Down
84 changes: 84 additions & 0 deletions lib/airbnb_payous/slack_notifier.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require "json"
require "logger"
require "net/http"
require "uri"

module AirbnbPayous
  # Posts import results to Slack through an incoming-webhook URL.
  #
  # Delivery is best-effort: every transport or configuration problem is
  # logged and swallowed so that a Slack outage can never fail (or mask the
  # real error of) the import that triggered the notification.
  class SlackNotifier
    SUCCESS_COLOR = "#36a64f"
    FAILURE_COLOR = "#ff0000"

    # Upper bounds (seconds) on connecting to and waiting for the webhook.
    # Without them Net::HTTP falls back to 60s per phase, which would stall
    # the caller while the endpoint is unresponsive.
    OPEN_TIMEOUT_SECONDS = 5
    READ_TIMEOUT_SECONDS = 10

    # @param webhook_url [String, nil] webhook URL; nil/blank disables the notifier
    # @param logger [Logger] receives delivery errors
    def initialize(webhook_url: ENV["SLACK_WEBHOOK_URL"], logger: Logger.new($stdout))
      # Normalise once so nil, "" and whitespace-only values all mean "disabled".
      @webhook_url = webhook_url.to_s.strip
      @logger = logger
    end

    # @return [Boolean] true when a webhook URL has been configured
    def enabled?
      !@webhook_url.empty?
    end

    # Reports a successful import.
    #
    # @param file_name [String] name of the imported file
    # @param mode [Symbol] :create_table for a first-time import; any other
    #   value is reported as a merge import
    # @param inserted_count [Integer] number of inserted rows
    # @param updated_count [Integer] number of updated rows
    # @return [Boolean, nil] nil when disabled, true when Slack accepted the
    #   message, false when delivery failed (the failure is logged)
    def notify_success(file_name:, mode:, inserted_count:, updated_count:)
      return unless enabled?

      mode_text = mode == :create_table ? "フルインポート(新規作成)" : "マージインポート(更新)"

      payload = build_payload(
        fallback: "Airbnb Payouts Import Successful: #{file_name}",
        color: SUCCESS_COLOR,
        title: "✅ Airbnb Payouts データ登録完了",
        fields: [
          field("ファイル名", file_name),
          field("インポートモード", mode_text, short: true),
          field("新規挿入件数", "#{inserted_count} 件", short: true),
          field("更新件数", "#{updated_count} 件", short: true)
        ]
      )

      send_notification(payload)
    end

    # Reports a failed import.
    #
    # @param file_name [String] name of the file being processed
    # @param error_message [String] description of the failure
    # @return [Boolean, nil] nil when disabled, true when Slack accepted the
    #   message, false when delivery failed (the failure is logged)
    def notify_failure(file_name:, error_message:)
      return unless enabled?

      payload = build_payload(
        fallback: "Airbnb Payouts Import Failed: #{file_name}",
        color: FAILURE_COLOR,
        title: "❌ Airbnb Payouts データ登録失敗",
        text: "ファイルの処理中にエラーが発生しました。",
        fields: [
          field("ファイル名", file_name),
          field("エラー内容", error_message)
        ]
      )

      send_notification(payload)
    end

    private

    # Builds one entry of an attachment's "fields" list.
    def field(title, value, short: false)
      { title: title, value: value, short: short }
    end

    # Wraps the given parts into a single-attachment webhook payload,
    # timestamped with the current time.
    def build_payload(fallback:, color:, title:, fields:, text: nil)
      attachment = { fallback: fallback, color: color, title: title }
      attachment[:text] = text if text
      attachment[:fields] = fields
      attachment[:ts] = Time.now.to_i
      { attachments: [attachment] }
    end

    # POSTs the payload as JSON to the webhook.
    #
    # @return [Boolean] true on a 2xx response, false otherwise
    def send_notification(payload)
      uri = URI.parse(@webhook_url)
      unless uri.is_a?(URI::HTTP) # URI::HTTPS is a subclass, so https passes too
        @logger.error("Invalid Slack webhook URL: expected an http(s) URL.")
        return false
      end

      http = Net::HTTP.new(uri.host, uri.port)
      http.use_ssl = (uri.scheme == "https")
      http.open_timeout = OPEN_TIMEOUT_SECONDS
      http.read_timeout = READ_TIMEOUT_SECONDS

      # request_uri keeps the query string and yields "/" for an empty path,
      # whereas uri.path would drop the former and raise on the latter.
      request = Net::HTTP::Post.new(uri.request_uri, { "Content-Type" => "application/json" })
      request.body = JSON.generate(payload)

      response = http.request(request)
      return true if response.is_a?(Net::HTTPSuccess)

      @logger.error("Failed to send Slack notification: #{response.code} #{response.body}")
      false
    rescue URI::InvalidURIError
      # The exception message echoes the URL, which is a secret; keep it out of the logs.
      @logger.error("Invalid Slack webhook URL: could not be parsed.")
      false
    rescue StandardError => e
      @logger.error("Error sending Slack notification: #{e.message}")
      false
    end
  end
end
38 changes: 29 additions & 9 deletions test/bigquery_gateway_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ def numeric(name, mode:) = @fields << [:numeric, name, mode]
class FakeLoadJob
attr_reader :waited

def initialize
def initialize(output_rows: 1)
@waited = false
@output_rows = output_rows
end

def wait_until_done!
Expand All @@ -74,21 +75,31 @@ def wait_until_done!
def failed?
false
end

def output_rows
@output_rows
end

def error; nil; end
end

class FakeCopyJob < FakeLoadJob; end
# Test double standing in for the job returned by the fake BigQuery client's
# copy_job call. Everything except #error comes from FakeLoadJob; #error
# always returns nil.
class FakeCopyJob < FakeLoadJob
def error; nil; end
end

class FakeQueryJob < FakeLoadJob
attr_reader :sql

def initialize
super
@dml_stats = Struct.new(:inserted_row_count, :updated_row_count).new(1, 0)
def initialize(inserted: 1, updated: 0)
super()
@dml_stats = Struct.new(:inserted_row_count, :updated_row_count).new(inserted, updated)
end

def dml_stats
@dml_stats
end

def error; nil; end
end

class FakeTable
Expand All @@ -109,9 +120,10 @@ def delete
class FakeDataset
attr_reader :load_job_calls, :table_requests, :loaded_json, :schema_fields

def initialize(target_table:, staging_table:)
def initialize(target_table:, staging_table:, output_rows: 1)
@target_table = target_table
@staging_table = staging_table
@output_rows = output_rows
@load_job_calls = []
@table_requests = []
@loaded_json = []
Expand All @@ -128,7 +140,7 @@ def load_job(table_id, path, **kwargs)
end
yield updater
@schema_fields = schema.fields
FakeLoadJob.new
FakeLoadJob.new(output_rows: @output_rows)
end

def table(name)
Expand Down Expand Up @@ -214,7 +226,11 @@ def test_copies_the_staging_table_on_the_first_run
storage: @storage
)

gateway.load_and_merge!(rows: @rows)
result = gateway.load_and_merge!(rows: @rows)

assert_equal :create_table, result[:mode]
assert_equal @rows.length, result[:inserted_count]
assert_equal 0, result[:updated_count]

assert_equal 1, bigquery.copy_job_calls.length
assert_equal "project.dataset.table_staging", bigquery.copy_job_calls.first[:source]
Expand All @@ -224,7 +240,11 @@ def test_copies_the_staging_table_on_the_first_run
end

def test_merges_into_an_existing_target_table
@gateway.load_and_merge!(rows: @rows)
result = @gateway.load_and_merge!(rows: @rows)

assert_equal :merge, result[:mode]
assert_equal 1, result[:inserted_count]
assert_equal 0, result[:updated_count]

assert_equal 1, @bigquery.query_job_calls.length
assert_includes @bigquery.query_job_calls.first, "MERGE `project.dataset.table`"
Expand Down
Loading
Loading