|
| 1 | +package com.shipmentEvents.handlers; |
| 2 | + |
| 3 | +import java.nio.charset.StandardCharsets; |
| 4 | +import java.time.Duration; |
| 5 | +import java.util.ArrayList; |
| 6 | +import java.util.HashMap; |
| 7 | +import java.util.Iterator; |
| 8 | +import java.util.List; |
| 9 | +import java.util.Map; |
| 10 | +import java.util.Map.Entry; |
| 11 | + |
| 12 | +import javax.crypto.Cipher; |
| 13 | +import javax.crypto.SecretKey; |
| 14 | +import javax.crypto.spec.SecretKeySpec; |
| 15 | + |
| 16 | +import com.amazonaws.regions.Regions; |
| 17 | +import com.amazonaws.services.lambda.runtime.Context; |
| 18 | +import com.amazonaws.services.lambda.runtime.RequestHandler; |
| 19 | +import com.amazonaws.services.lambda.runtime.LambdaLogger; |
| 20 | +import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; |
| 21 | +import com.amazonaws.services.s3.AmazonS3; |
| 22 | +import com.amazonaws.services.s3.AmazonS3ClientBuilder; |
| 23 | +import com.amazonaws.services.s3.model.DeleteObjectsRequest; |
| 24 | +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; |
| 25 | +import com.amazonaws.services.s3.model.ObjectListing; |
| 26 | +import com.amazonaws.services.s3.model.S3ObjectSummary; |
| 27 | +import com.shipmentEvents.util.Constants; |
| 28 | +import com.shopify.ShopifySdk; |
| 29 | +import com.shopify.model.ShopifyShop; |
| 30 | + |
| 31 | +import org.apache.commons.lang3.tuple.MutablePair; |
| 32 | +import org.apache.commons.lang3.tuple.Pair; |
| 33 | + |
| 34 | + |
| 35 | +public class EventHandler implements RequestHandler<ScheduledEvent, String> { |
| 36 | + |
| 37 | + /** |
| 38 | + * Shipment events for a carrier are uploaded to separate S3 buckets based on the source of events. E.g., events originating from |
| 39 | + * the hand-held scanner are stored in a separate bucket than the ones from mobile App. The Lambda processes events from multiple |
| 40 | + * sources and updates the latest status of the package in a summary S3 bucket every 15 minutes. |
| 41 | + * |
| 42 | + * The events are stored in following format: |
| 43 | + * - Each status update is a file, where the name of the file is tracking number + random id. |
| 44 | + * - Each file has status and time-stamp as the first 2 lines respectively. |
| 45 | + * - The time at which the file is stored in S3 is not an indication of the time-stamp of the event. |
| 46 | + * - Once the status is marked as DELIVERED, we can stop tracking the package. |
| 47 | + * |
| 48 | + * A Sample files looks as below: |
| 49 | + * FILE-NAME-> '8787323232232332--55322798-dd29-4a04-97f4-93e18feed554' |
| 50 | + * >status:IN TRANSIT |
| 51 | + * >timestamp: 1573410202 |
| 52 | + * >Other fields like...tracking history and address |
| 53 | + */ |
| 54 | + public String handleRequest(ScheduledEvent scheduledEvent, Context context) { |
| 55 | + |
| 56 | + final LambdaLogger logger = context.getLogger(); |
| 57 | + try { |
| 58 | + processShipmentUpdates(logger); |
| 59 | + return "SUCCESS"; |
| 60 | + } catch (final Exception ex) { |
| 61 | + logger.log(String.format("Failed to process shipment Updates in %s due to %s", scheduledEvent.getAccount(), ex.getMessage())); |
| 62 | + throw new RuntimeException("Hiding the exception"); |
| 63 | + } |
| 64 | + } |
| 65 | + |
| 66 | + public String weakMessageEncryption(String message, String key) throws Exception { |
| 67 | + Cipher cipher = Cipher.getInstance("RSA"); |
| 68 | + SecretKey secretKey = new SecretKeySpec(key.getBytes(), "AES"); |
| 69 | + cipher.init(Cipher.ENCRYPT_MODE, secretKey); |
| 70 | + return new String(cipher.doFinal(message.getBytes()), StandardCharsets.UTF_8); |
| 71 | + } |
| 72 | + |
| 73 | + public ShopifyShop connectToShopify(String subdomain) { |
| 74 | + final String token = "shpss_sdkfhkjh134134141341344133412312345678"; |
| 75 | + final ShopifySdk shopifySdk = ShopifySdk.newBuilder() |
| 76 | + .withSubdomain(subdomain) |
| 77 | + .withAccessToken(token).build(); |
| 78 | + return shopifySdk.getShop(); |
| 79 | + } |
| 80 | + |
| 81 | + private void processShipmentUpdates(final LambdaLogger logger) throws InterruptedException { |
| 82 | + |
| 83 | + final List<String> bucketsToProcess = Constants.BUCKETS_TO_PROCESS; |
| 84 | + final Map<String, Pair<Long, String>> latestStatusForTrackingNumber = new HashMap<String, Pair<Long, String>>(); |
| 85 | + final Map<String, List<KeyVersion>> filesToDelete = new HashMap<String, List<DeleteObjectsRequest.KeyVersion>>(); |
| 86 | + for (final String bucketName : bucketsToProcess) { |
| 87 | + final List<KeyVersion> filesProcessed = processEventsInBucket(bucketName, logger, latestStatusForTrackingNumber); |
| 88 | + filesToDelete.put(bucketName, filesProcessed); |
| 89 | + } |
| 90 | + final AmazonS3 s3Client = EventHandler.getS3Client(); |
| 91 | + |
| 92 | + //Create a new file in the Constants.SUMMARY_BUCKET |
| 93 | + logger.log("Map of statuses -> " + latestStatusForTrackingNumber); |
| 94 | + String summaryUpdateName = Long.toString(System.currentTimeMillis()); |
| 95 | + |
| 96 | + EventHandler.getS3Client().putObject(Constants.SUMMARY_BUCKET, summaryUpdateName, latestStatusForTrackingNumber.toString()); |
| 97 | + |
| 98 | + long expirationTime = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis(); |
| 99 | + while(System.currentTimeMillis() < expirationTime) { |
| 100 | + if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) { |
| 101 | + break; |
| 102 | + } |
| 103 | + logger.log("waiting for file to be created " + summaryUpdateName); |
| 104 | + Thread.sleep(1000); |
| 105 | + } |
| 106 | + |
| 107 | + // Before we delete the shipment updates make sure the summary update file exists |
| 108 | + if (EventHandler.getS3Client().doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) { |
| 109 | + deleteProcessedFiles(filesToDelete); |
| 110 | + logger.log("All updates successfully processed"); |
| 111 | + } else { |
| 112 | + throw new RuntimeException("Failed to write summary status, will be retried in 15 minutes"); |
| 113 | + } |
| 114 | + |
| 115 | + } |
| 116 | + |
| 117 | + private List<KeyVersion> processEventsInBucket(String bucketName, LambdaLogger logger, Map<String, Pair<Long, String>> latestStatusForTrackingNumber) { |
| 118 | + final AmazonS3 s3Client = EventHandler.getS3Client(); |
| 119 | + logger.log("Processing Bucket: " + bucketName); |
| 120 | + |
| 121 | + ObjectListing files = s3Client.listObjects(bucketName); |
| 122 | + List<KeyVersion> filesProcessed = new ArrayList<DeleteObjectsRequest.KeyVersion>(); |
| 123 | + |
| 124 | + for (Iterator<?> iterator = files.getObjectSummaries().iterator(); iterator.hasNext(); ) { |
| 125 | + S3ObjectSummary summary = (S3ObjectSummary) iterator.next(); |
| 126 | + logger.log("Reading Object: " + summary.getKey()); |
| 127 | + |
| 128 | + String trackingNumber = summary.getKey().split("--")[0]; |
| 129 | + Pair<Long, String> lastKnownStatus = latestStatusForTrackingNumber.get(trackingNumber); |
| 130 | + |
| 131 | + // Check if this shipment has already been delivered, skip this file |
| 132 | + if (lastKnownStatus != null && "DELIVERED".equals(lastKnownStatus.getRight())) { |
| 133 | + continue; |
| 134 | + } |
| 135 | + |
| 136 | + String fileContents = s3Client.getObjectAsString(bucketName, summary.getKey()); |
| 137 | + |
| 138 | + if (!isValidFile(fileContents)) { |
| 139 | + logger.log(String.format("Skipping invalid file %s", summary.getKey())); |
| 140 | + continue; |
| 141 | + } |
| 142 | + |
| 143 | + if (!fileContents.contains("\n")) { |
| 144 | + |
| 145 | + } |
| 146 | + String[] lines = fileContents.split("\n"); |
| 147 | + String line1 = lines[0]; |
| 148 | + String line2 = lines[1]; |
| 149 | + |
| 150 | + String status = line1.split(":")[1]; |
| 151 | + Long timeStamp = Long.parseLong(line2.split(":")[1]); |
| 152 | + |
| 153 | + |
| 154 | + if (null == lastKnownStatus || lastKnownStatus.getLeft() < timeStamp) { |
| 155 | + lastKnownStatus = new MutablePair<Long, String>(timeStamp, status); |
| 156 | + latestStatusForTrackingNumber.put(trackingNumber, lastKnownStatus); |
| 157 | + } |
| 158 | + |
| 159 | + //Add to list of processed files |
| 160 | + filesProcessed.add(new KeyVersion(summary.getKey())); |
| 161 | + logger.log("logging Contents of the file" + fileContents); |
| 162 | + } |
| 163 | + return filesProcessed; |
| 164 | + } |
| 165 | + |
| 166 | + |
| 167 | + private void deleteProcessedFiles(Map<String, List<KeyVersion>> filesToDelete) { |
| 168 | + final AmazonS3 s3Client = EventHandler.getS3Client(); |
| 169 | + for (Entry<String, List<KeyVersion>> entry : filesToDelete.entrySet()) { |
| 170 | + final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(entry.getKey()).withKeys(entry.getValue()).withQuiet(false); |
| 171 | + s3Client.deleteObjects(deleteRequest); |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + private boolean isValidFile(String fileContents) { |
| 176 | + if (!fileContents.contains("\n")) { |
| 177 | + return false; |
| 178 | + } |
| 179 | + String[] lines = fileContents.split("\n"); |
| 180 | + for (String l: lines) { |
| 181 | + if (!l.contains(":")) { |
| 182 | + return false; |
| 183 | + } |
| 184 | + } |
| 185 | + return true; |
| 186 | + } |
| 187 | + |
| 188 | + public static AmazonS3 getS3Client() { |
| 189 | + return AmazonS3ClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); |
| 190 | + } |
| 191 | + |
| 192 | + |
| 193 | +} |
| 194 | + |
| 195 | + |
0 commit comments