-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBatchLayerProcessing.txt
108 lines (72 loc) · 6.57 KB
/
BatchLayerProcessing.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
Solution to the batch layer problem with steps
1. Used sqoop to ingest card_member and member_score tables from AWS RDS to Hadoop
# Import the card_member and member_score tables from the RDS MySQL database into HDFS.
# Each table lands in its own sub-directory of the --warehouse-dir path.
# -P prompts for the password on the console so the secret is not left in shell history
# or visible in process listings (for unattended runs, use --password-file instead).
sqoop import \
  --connect jdbc:mysql://<amazon rds>:3306/cred_financials_data \
  --username <username> \
  -P \
  --table card_member \
  --warehouse-dir /user/ec2-user/CCFraudDetection/input/awsrdstables
sqoop import \
  --connect jdbc:mysql://<amazon rds>:3306/cred_financials_data \
  --username <username> \
  -P \
  --table member_score \
  --warehouse-dir /user/ec2-user/CCFraudDetection/input/awsrdstables
2. Copied provided card_transactions.csv to local file system and then to hdfs
# Create the HDFS input directory and upload the transactions CSV into it.
# -p creates any missing parent directories and does not fail when the path already
# exists (the Sqoop import in step 1 writes under the same input/ directory).
hadoop fs -mkdir -p /user/ec2-user/CCFraudDetection/input
hadoop fs -put card_transactions.csv /user/ec2-user/CCFraudDetection/input/
3. Used Hive, the data warehousing tool in Hadoop. Created a master table for card_transactions.csv, then created the HBase table “card_transactions_master” and a Hive-HBase integrated table “card_transactions_hbase_master” to store the card transaction history in HBase, the NoSQL database of Hadoop
-- Raw card transaction history, read from the uploaded comma-delimited CSV.
-- The first line of the file is skipped as a header row.
-- The table gets a directory of its own: the other Hive tables in this walkthrough are
-- stored in sub-directories of .../input/hive/, so using that parent directory as this
-- table's location would mix their files into this table's storage and expose them to
-- the LOAD ... OVERWRITE below.
CREATE EXTERNAL TABLE IF NOT EXISTS card_transactions_history_data (
    card_id        BIGINT,
    member_id      BIGINT,
    amount         DOUBLE,
    postcode       INT,
    pos_id         BIGINT,
    transaction_dt STRING,
    status         STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_transactions_history_data/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Move the CSV from the HDFS input directory into the table, replacing any previous load.
LOAD DATA INPATH '/user/ec2-user/CCFraudDetection/input/card_transactions.csv'
OVERWRITE INTO TABLE card_transactions_history_data;
# HBase shell: create the transaction master table with two column families.
create 'card_transactions_master', {NAME => 'cardDetail'}, {NAME => 'transactionDetail'}
-- Hive view over the HBase table card_transactions_master.
-- rowid maps to the HBase row key; card_id and member_id are stored in the cardDetail
-- column family and the remaining columns in transactionDetail.
-- The mapping string is written without spaces or line breaks: whitespace between
-- entries is taken as part of the HBase column name.
CREATE EXTERNAL TABLE IF NOT EXISTS card_transactions_hbase_master (
    rowid          STRING,
    card_id        BIGINT,
    member_id      BIGINT,
    amount         DOUBLE,
    postcode       INT,
    pos_id         BIGINT,
    transaction_dt STRING,
    status         STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cardDetail:card_id,cardDetail:member_id,transactionDetail:amount,transactionDetail:postcode,transactionDetail:pos_id,transactionDetail:transaction_dt,transactionDetail:status")
TBLPROPERTIES ("hbase.table.name"="card_transactions_master");
-- Copy the transaction history into the HBase-backed master table.
-- Each row key is a freshly generated random UUID with the dashes stripped out.
-- NOTE(review): because the keys are random, re-running this statement presumably adds
-- a second copy of every transaction to HBase instead of replacing the first — confirm.
INSERT OVERWRITE TABLE card_transactions_hbase_master
SELECT
    regexp_replace(reflect('java.util.UUID','randomUUID'), '-', '') AS rowid,
    card_id,
    member_id,
    amount,
    postcode,
    pos_id,
    transaction_dt,
    status
FROM card_transactions_history_data;
4. Created lookup table using hive-hbase integrated table
# HBase shell: create the lookup table with a single column family.
create 'card_transaction_lookup', {NAME => 'cf'}
-- Hive view over the HBase table card_transaction_lookup.
-- card_id maps to the HBase row key; the other columns are stored in column family cf.
-- The mapping string is written without spaces or line breaks: whitespace between
-- entries is taken as part of the HBase column name.
CREATE EXTERNAL TABLE IF NOT EXISTS card_transactions_lookup (
    card_id        BIGINT,
    ucl            DOUBLE,
    postcode       INT,
    transaction_dt STRING,
    score          INT
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:ucl,cf:postcode,cf:transaction_dt,cf:score")
TBLPROPERTIES ("hbase.table.name"="card_transaction_lookup");
5. Used Hive to compute the batch layer outputs, i.e., the UCL, credit score, postcode and transaction_dt for each card_id
Created tables for card_member and member_score in hive
-- Hive table for the card_member data: comma-delimited text files kept in the
-- table's own directory.
CREATE EXTERNAL TABLE IF NOT EXISTS card_member (
    card_id           BIGINT,
    member_id         BIGINT,
    member_joining_dt STRING,
    card_purchase_dt  STRING,
    country           STRING,
    city              STRING
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_member/';
-- Hive table for the member_score data: comma-delimited text files kept in the
-- table's own directory.
CREATE EXTERNAL TABLE IF NOT EXISTS member_score (
    member_id BIGINT,
    score     INT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/member_score/';
Creating staging tables to hold intermediate outputs
-- Staging table: one score value per card_id, stored as Snappy-compressed ORC.
CREATE EXTERNAL TABLE IF NOT EXISTS card_score (
    card_id BIGINT,
    score   INT
)
STORED AS ORC
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_score/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Staging table: the most recent transactions kept per card, stored as
-- Snappy-compressed ORC.
CREATE EXTERNAL TABLE IF NOT EXISTS card_last_ten_transactions (
    card_id        BIGINT,
    amount         DOUBLE,
    postcode       INT,
    transaction_dt STRING,
    status         STRING
)
STORED AS ORC
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_last_ten_transactions/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Staging table: the upper control limit (ucl) computed for each card_id, stored as
-- Snappy-compressed ORC.
CREATE EXTERNAL TABLE IF NOT EXISTS card_ucl (
    card_id BIGINT,
    ucl     DOUBLE
)
STORED AS ORC
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_ucl/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Staging table: postcode and transaction timestamp of the latest transaction per
-- card_id, stored as Snappy-compressed ORC.
CREATE EXTERNAL TABLE IF NOT EXISTS card_zipcode (
    card_id        BIGINT,
    postcode       INT,
    transaction_dt STRING
)
STORED AS ORC
LOCATION '/user/ec2-user/CCFraudDetection/input/hive/card_zipcode/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
6. Load data from previously ingested card_member and member_score tables into respective hive tables
-- Move the Sqoop output part files into the matching Hive tables, replacing whatever
-- each table held before.
LOAD DATA
    INPATH '/user/ec2-user/CCFraudDetection/input/awsrdstables/card_member/part*'
    OVERWRITE INTO TABLE card_member;

LOAD DATA
    INPATH '/user/ec2-user/CCFraudDetection/input/awsrdstables/member_score/part*'
    OVERWRITE INTO TABLE member_score;
7. Calculate credit score for each card_id and store in staging table card_score
-- Attach each member's score to the cards that member holds.
-- Cards whose member_id has no row in member_score are dropped by the inner join.
INSERT OVERWRITE TABLE card_score
SELECT
    cm.card_id,
    ms.score
FROM card_member AS cm
INNER JOIN member_score AS ms
    ON cm.member_id = ms.member_id;
8. Retrieved last ten transactions for each card_id which are Genuine and store in staging table card_last_ten_transactions
-- Keep, for every card_id, its ten most recent transactions whose status is 'GENUINE'.
-- Rows are ranked per card by parsed timestamp, newest first.
-- The hour field uses HH (24-hour clock, 0-23). The 12-hour field hh reads an hour of
-- 12 as 0 when no AM/PM marker is present, which mis-orders 24-hour timestamps.
-- NOTE(review): confirm that 'yyyy-MM-dd HH:mm:ss' matches the layout of transaction_dt
-- in card_transactions.csv.
INSERT OVERWRITE TABLE card_last_ten_transactions
SELECT
    card_id,
    amount,
    postcode,
    transaction_dt,
    status
FROM (
    SELECT
        card_id,
        member_id,
        amount,
        postcode,
        pos_id,
        transaction_dt,
        status,
        ROW_NUMBER() OVER (
            PARTITION BY card_id
            ORDER BY unix_timestamp(transaction_dt, 'yyyy-MM-dd HH:mm:ss') DESC
        ) AS rn
    FROM card_transactions_hbase_master
    WHERE status = 'GENUINE'
) ranked
WHERE ranked.rn <= 10;
9. Calculated ucl value and store in staging table card_ucl
-- Upper control limit per card: the average amount of the staged transactions plus
-- three times their standard deviation.
INSERT OVERWRITE TABLE card_ucl
SELECT
    card_id,
    (AVG(amount) + 3 * STDDEV(amount)) AS ucl
FROM card_last_ten_transactions
GROUP BY card_id;
10. Retrieved postcode and transaction date of last genuine transaction and store in staging table card_zipcode
-- Keep, for every card_id, the postcode and timestamp of its most recent transaction
-- among the rows staged in card_last_ten_transactions.
-- The hour field uses HH (24-hour clock, 0-23). The 12-hour field hh reads an hour of
-- 12 as 0 when no AM/PM marker is present, which mis-orders 24-hour timestamps.
-- NOTE(review): confirm that 'yyyy-MM-dd HH:mm:ss' matches the layout of transaction_dt
-- in the source data.
INSERT OVERWRITE TABLE card_zipcode
SELECT
    card_id,
    postcode,
    transaction_dt
FROM (
    SELECT
        card_id,
        postcode,
        transaction_dt,
        ROW_NUMBER() OVER (
            PARTITION BY card_id
            ORDER BY unix_timestamp(transaction_dt, 'yyyy-MM-dd HH:mm:ss') DESC
        ) AS rn
    FROM card_last_ten_transactions
) latest
WHERE latest.rn = 1;
11. Performed a join on card_score, card_ucl and card_zipcode and inserted the result into the final Hive-HBase integrated lookup table card_transactions_lookup
-- Assemble the lookup row for each card: ucl from card_ucl, postcode and timestamp
-- from card_zipcode, score from card_score. A card must appear in all three staging
-- tables to be written.
INSERT OVERWRITE TABLE card_transactions_lookup
SELECT
    cs.card_id,
    cu.ucl,
    cz.postcode,
    cz.transaction_dt,
    cs.score
FROM card_score AS cs
INNER JOIN card_ucl AS cu
    ON cs.card_id = cu.card_id
INNER JOIN card_zipcode AS cz
    ON cs.card_id = cz.card_id;