StreamingLayerProcessingAndFinalLogic.txt
Below is the step-by-step approach used to solve the streaming problem:
• Created a ccfrauddetectionsystem Java project with a main driver class CCFraudDetectionSystem that accepts command line arguments for broker, topic, groupId, zipCodePosIdCsvPath and hBaseMaster. If the number of command line arguments given is not 5, it throws an error without processing.
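A minimal sketch of this argument validation (the variable names and the exception type are assumptions, chosen to match the snippets below):
public static void main(String[] args) throws Exception {
    // Exactly 5 arguments are required: broker, topic, groupId, zipCodePosIdCsvPath, hBaseMaster
    if (args.length != 5) {
        throw new IllegalArgumentException(
                "Usage: CCFraudDetectionSystem <broker> <topic> <groupId> <zipCodePosIdCsvPath> <hBaseMaster>");
    }
    String broker = args[0];
    String topic = args[1];
    String groupId = args[2];
    String zipCodePosIdCsvPath = args[3];
    String hBaseMaster = args[4];
    // ... SparkConf, JavaStreamingContext and Kafka params are built as shown in the next steps
}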
• Created a Spark configuration and JavaStreamingContext with a 1-second batch interval
SparkConf conf = new SparkConf().setAppName("CCFraudDetectionSystem");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(1));
• Created a kafkaParams HashMap with consumer configurations based on the command line arguments provided
Set<String> topicSet = new HashSet<String>(Arrays.asList(topic));
Map<String,Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
• Using the Kafka/Spark Streaming integration, subscribed to the Kafka topic with the above params, thereby obtaining a direct input DStream
JavaInputDStream<ConsumerRecord<String, JsonNode>> messages = KafkaUtils.createDirectStream(context, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicSet, kafkaParams));
• Performed a map transformation on the messages DStream, mapping each JSON record into a TransactionData object using ObjectMapper. (TransactionData is a class with getter and setter properties for the fields cardId, memberId, amount, posId, postcode and transaction_dt; a sketch of this POJO follows the code below.)
JavaDStream<TransactionData> transactionData = messages.map(new Function<ConsumerRecord<String, JsonNode>, TransactionData>() {
    private static final long serialVersionUID = 1L;
    @Override
    public TransactionData call(ConsumerRecord<String, JsonNode> record) throws JsonProcessingException {
        // deserialize the Kafka JSON payload into a TransactionData bean
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.treeToValue(record.value(), TransactionData.class);
    }
});
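For reference, a minimal sketch of what the TransactionData POJO could look like. The field types, the @JsonProperty mapping for transaction_dt and the toString() format are assumptions; the class must be Serializable to be used inside Spark transformations.
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonProperty;

public class TransactionData implements Serializable {
    private static final long serialVersionUID = 1L;
    private String cardId;
    private String memberId;
    private double amount;
    private String posId;
    private String postcode;
    @JsonProperty("transaction_dt")          // ASSUMPTION: JSON field name differs from the bean property
    private String transactionDate;          // format "dd-MM-yyyy HH:mm:ss", per the parsing code below

    public String getCardId() { return cardId; }
    public void setCardId(String cardId) { this.cardId = cardId; }
    public String getMemberId() { return memberId; }
    public void setMemberId(String memberId) { this.memberId = memberId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public String getPosId() { return posId; }
    public void setPosId(String posId) { this.posId = posId; }
    public String getPostcode() { return postcode; }
    public void setPostcode(String postcode) { this.postcode = postcode; }
    public String getTransactionDate() { return transactionDate; }
    public void setTransactionDate(String transactionDate) { this.transactionDate = transactionDate; }

    @Override
    public String toString() {
        return cardId + "," + memberId + "," + amount + "," + posId + "," + postcode + "," + transactionDate;
    }
}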
• Performed foreachRDD on the transactionData DStream; for each RDD, performed a mapToPair transformation that returns each TransactionData together with its status, i.e., whether it is genuine or fraud. The mapToPair transformation carries out the operations below to make that determination (a skeleton tying steps a-f together is sketched after step f).
a. Called HbaseClient.getTransactionLookupRecord() to retrieve the lookup record corresponding to the card_id from the lookup table in HBase, i.e., card_transaction_lookup. This record has UCL, postcode, transaction_dt and score (a hedged sketch of HbaseClient also appears after step f).
TransactionLookupRecord record = HbaseClient.getTransactionLookupRecord(transactionData, hBaseMaster);
b. Used the DistanceUtility class to get the distance between zip codes by loading the zipCodePosId CSV provided and passing the current transaction postcode and the lookup record's (i.e., the last transaction's) postcode (a haversine-based sketch of this utility appears after step f).
double zipcodeDistance = DistanceUtility.GetInstance(zipCodePosIdCsvPath).getDistanceViaZipCode(transactionData.getPostcode().toString(), record.getPostcode().toString());
c. Calculated the speed in km per second using the formula zipcodeDistance / (currentTransactionDateTime - lastTransactionDateTime).
SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
Date previousTransactionDate = format.parse(record.getTransactionDate());
Date currentTransactionDate = format.parse(transactionData.getTransactionDate());
long diffSeconds = (currentTransactionDate.getTime() - previousTransactionDate.getTime()) / 1000; // difference in seconds
double speed = zipcodeDistance/diffSeconds;
d. Determined whether the current transaction is genuine or fraud based on the following rule:
if score < 200, or amount > UCL, or speed > 0.25 km/sec (based on the assumption that a card holder can travel at most 1 km per 4 seconds), then the transaction is marked as FRAUD; otherwise GENUINE.
String status = "GENUINE";
if (record.getScore() < 200 || transactionData.getAmount() > record.getUcl() || speed > 0.25) {
    status = "FRAUD";
}
e. Called HbaseClient.InsertTransactionIntoDB() with the status (GENUINE or FRAUD). This method builds the current transaction row and inserts it into the HBase card_transaction_master table; additionally, if the current transaction is GENUINE, it updates postcode and transaction_dt in the card_transaction_lookup table for the corresponding card_id.
HbaseClient.InsertTransactionIntoDB(transactionData, hBaseMaster, status);
f. Performed a collect() operation on the transactionDataWithStatus paired RDD and printed each transaction with its corresponding status.
transactionDataWithStatus.collect().forEach(t -> {
    System.out.println(t._1.toString() + " - " + t._2);
});
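For reference, here is a minimal skeleton of how steps a-f fit inside foreachRDD/mapToPair. This is a sketch only: the original presumably uses anonymous inner classes as in the map snippet above, whereas lambdas are used here for brevity, and imports of scala.Tuple2 and org.apache.spark.api.java.JavaPairRDD are assumed.
transactionData.foreachRDD(rdd -> {
    // pair each transaction with its GENUINE/FRAUD status (steps a-e)
    JavaPairRDD<TransactionData, String> transactionDataWithStatus = rdd.mapToPair(txn -> {
        // a. fetch the last-known lookup record for this card from HBase
        TransactionLookupRecord record = HbaseClient.getTransactionLookupRecord(txn, hBaseMaster);
        // b. distance between the last and current transaction postcodes
        double zipcodeDistance = DistanceUtility.GetInstance(zipCodePosIdCsvPath)
                .getDistanceViaZipCode(txn.getPostcode().toString(), record.getPostcode().toString());
        // c. implied travel speed in km/sec
        SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        long diffSeconds = (format.parse(txn.getTransactionDate()).getTime()
                - format.parse(record.getTransactionDate()).getTime()) / 1000;
        double speed = zipcodeDistance / diffSeconds;
        // d. rule-based classification
        String status = "GENUINE";
        if (record.getScore() < 200 || txn.getAmount() > record.getUcl() || speed > 0.25) {
            status = "FRAUD";
        }
        // e. persist the transaction (and refresh the lookup table if GENUINE)
        HbaseClient.InsertTransactionIntoDB(txn, hBaseMaster, status);
        return new Tuple2<>(txn, status);
    });
    // f. bring results to the driver and print them
    transactionDataWithStatus.collect().forEach(t -> System.out.println(t._1.toString() + " - " + t._2));
});
After wiring the DStream operations, the job would be started with the standard context.start(); context.awaitTermination(); boilerplate (not shown in this document, but required for any Spark Streaming application).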
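The HbaseClient implementation is not shown in this document either. Below is a hedged sketch of the two methods referenced in steps a and e, using the standard HBase 1.x client API. The row-key layouts, the column family/qualifier names ("cf", "UCL", "score", etc.), the ZooKeeper setting and the TransactionLookupRecord setters are all assumptions; a production version would also cache the Connection rather than open one per record.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseClient {
    private static Connection getConnection(String hBaseMaster) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", hBaseMaster); // ASSUMPTION: master host doubles as ZK quorum
        return ConnectionFactory.createConnection(conf);
    }

    public static TransactionLookupRecord getTransactionLookupRecord(TransactionData txn, String hBaseMaster) throws IOException {
        try (Connection conn = getConnection(hBaseMaster);
             Table table = conn.getTable(TableName.valueOf("card_transaction_lookup"))) {
            // ASSUMPTION: row key is card_id; column names are illustrative
            Result r = table.get(new Get(Bytes.toBytes(txn.getCardId())));
            TransactionLookupRecord rec = new TransactionLookupRecord();
            rec.setUcl(Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("UCL")))));
            rec.setScore(Integer.parseInt(Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("score")))));
            rec.setPostcode(Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("postcode"))));
            rec.setTransactionDate(Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("transaction_dt"))));
            return rec;
        }
    }

    public static void InsertTransactionIntoDB(TransactionData txn, String hBaseMaster, String status) throws IOException {
        try (Connection conn = getConnection(hBaseMaster)) {
            // build the master-table row; ASSUMPTION: row key = card_id + transaction_dt
            Put master = new Put(Bytes.toBytes(txn.getCardId() + "_" + txn.getTransactionDate()));
            master.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("amount"), Bytes.toBytes(String.valueOf(txn.getAmount())));
            master.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("status"), Bytes.toBytes(status));
            try (Table masterTable = conn.getTable(TableName.valueOf("card_transaction_master"))) {
                masterTable.put(master);
            }
            // for GENUINE transactions, refresh the lookup table's postcode and transaction_dt
            if ("GENUINE".equals(status)) {
                Put lookup = new Put(Bytes.toBytes(txn.getCardId()));
                lookup.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("postcode"), Bytes.toBytes(txn.getPostcode()));
                lookup.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("transaction_dt"), Bytes.toBytes(txn.getTransactionDate()));
                try (Table lookupTable = conn.getTable(TableName.valueOf("card_transaction_lookup"))) {
                    lookupTable.put(lookup);
                }
            }
        }
    }
}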
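Likewise, a minimal sketch of DistanceUtility (step b), assuming the CSV maps each zip code to a latitude/longitude pair and that distance is computed with the haversine great-circle formula. The CSV column layout and everything except the GetInstance/getDistanceViaZipCode signatures are assumptions.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class DistanceUtility {
    private static DistanceUtility instance;
    private final Map<String, double[]> zipToLatLon = new HashMap<>(); // zip -> {lat, lon}

    private DistanceUtility(String csvPath) throws IOException {
        // ASSUMPTION: rows look like "zipcode,lat,lon,..."; adjust indices (and skip a header row) to match the real file
        for (String line : Files.readAllLines(Paths.get(csvPath))) {
            String[] cols = line.split(",");
            zipToLatLon.put(cols[0], new double[] {
                    Double.parseDouble(cols[1]), Double.parseDouble(cols[2]) });
        }
    }

    // singleton so the CSV is loaded only once per JVM
    public static synchronized DistanceUtility GetInstance(String csvPath) throws IOException {
        if (instance == null) instance = new DistanceUtility(csvPath);
        return instance;
    }

    // great-circle (haversine) distance in km between the two zip codes
    public double getDistanceViaZipCode(String zip1, String zip2) {
        double[] a = zipToLatLon.get(zip1), b = zipToLatLon.get(zip2);
        double lat1 = Math.toRadians(a[0]), lat2 = Math.toRadians(b[0]);
        double dLat = lat2 - lat1, dLon = Math.toRadians(b[1] - a[1]);
        double h = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(lat1) * Math.cos(lat2) * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * 6371 * Math.asin(Math.sqrt(h)); // Earth radius ~6371 km
    }
}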
To run the jar file on an EC2 instance:
• Copied zipCodePosId.csv to the /home/ec2-user/ folder and added the necessary permissions to the ec2-user folder as below
chmod o+x /home/ec2-user/
• Ran the spark2-submit command below (note that each Spark property is passed with its own --conf flag)
spark2-submit --class com.pack.CCFraudDetectionSystem --master yarn --deploy-mode client --name CCFraudDetectionApp --conf spark.app.id=CCFraudDetectionApp --conf spark.driver.memory=12g --conf spark.executor.memory=12g --conf spark.executor.instances=2 ccfrauddetectionsystem-0.0.1-SNAPSHOT-jar-with-dependencies.jar <broker> <topic> <groupid> <zipcodecsvpath> <hbasemaster> &> output.txt