diff --git a/README.md b/README.md
index 4aa6eeb3a..8eaa16567 100644
--- a/README.md
+++ b/README.md
@@ -14,22 +14,23 @@ cleansing, transformation, and filtering using a set of data manipulation instru
 (directives). These instructions are either generated using an interactive visual
 tool or are manually created.
 
- * Data Prep defines few concepts that might be useful if you are just getting started with it. Learn about them [here](wrangler-docs/concepts.md)
- * The Data Prep Transform is [separately documented](wrangler-transform/wrangler-docs/data-prep-transform.md).
- * [Data Prep Cheatsheet](wrangler-docs/cheatsheet.md)
+- Data Prep defines a few concepts that are useful to know if you are just getting started. Learn about them [here](wrangler-docs/concepts.md).
+- The Data Prep Transform is [separately documented](wrangler-transform/wrangler-docs/data-prep-transform.md).
+- [Data Prep Cheatsheet](wrangler-docs/cheatsheet.md)
 
 ## New Features
 
 More [here](wrangler-docs/upcoming-features.md) on upcoming features.
 
- * **User Defined Directives, also known as UDD**, allow you to create custom functions to transform records within CDAP DataPrep or a.k.a Wrangler. CDAP comes with a comprehensive library of functions. There are however some omissions, and some specific cases for which UDDs are the solution. Additional information on how you can build your custom directives [here](wrangler-docs/custom-directive.md).
- * Migrating directives from version 1.0 to version 2.0 [here](wrangler-docs/directive-migration.md)
- * Information about Grammar [here](wrangler-docs/grammar/grammar-info.md)
- * Various `TokenType` supported by system [here](../api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java)
- * Custom Directive Implementation Internals [here](wrangler-docs/udd-internal.md)
+- **User Defined Directives (UDDs)** allow you to create custom functions to transform records within CDAP DataPrep, also known as Wrangler. CDAP comes with a comprehensive library of functions; there are, however, some omissions and some specific cases for which UDDs are the solution. Additional information on how to build your own directives is available [here](wrangler-docs/custom-directive.md).
- * A new capability that allows CDAP Administrators to **restrict the directives** that are accessible to their users.
-More information on configuring can be found [here](wrangler-docs/exclusion-and-aliasing.md)
+  - Migrating directives from version 1.0 to version 2.0 [here](wrangler-docs/directive-migration.md)
+  - Information about the grammar [here](wrangler-docs/grammar/grammar-info.md)
+  - The various `TokenType` values supported by the system [here](../api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java)
+  - Custom directive implementation internals [here](wrangler-docs/udd-internal.md)
+
+- A new capability that allows CDAP Administrators to **restrict the directives** that are accessible to their users.
+ More information on configuring can be found [here](wrangler-docs/exclusion-and-aliasing.md) ## Demo Videos and Recipes @@ -37,144 +38,265 @@ Videos and Screencasts are best way to learn, so we have compiled simple, short ### Videos - * [SCREENCAST] [Creating Lookup Dataset and Joining](https://www.youtube.com/watch?v=Nc1b0rsELHQ) - * [SCREENCAST] [Restricted Directives](https://www.youtube.com/watch?v=71EcMQU714U) - * [SCREENCAST] [Parse Excel files in CDAP](https://www.youtube.com/watch?v=su5L1noGlEk) - * [SCREENCAST] [Parse File As AVRO File](https://www.youtube.com/watch?v=tmwAw4dKUNc) - * [SCREENCAST] [Parsing Binary Coded AVRO Messages](https://www.youtube.com/watch?v=Ix_lPo-PDJY) - * [SCREENCAST] [Parsing Binary Coded AVRO Messages & Protobuf messages using schema registry](https://www.youtube.com/watch?v=LVLIdWnUX1k) - * [SCREENCAST] [Quantize a column - Digitize](https://www.youtube.com/watch?v=VczkYX5SRtY) - * [SCREENCAST] [Data Cleansing capability with send-to-error directive](https://www.youtube.com/watch?v=aZd5H8hIjDc) - * [SCREENCAST] [Building Data Prep from the GitHub source](https://youtu.be/pGGjKU04Y38) - * [VOICE-OVER] [End-to-End Demo Video](https://youtu.be/AnhF0qRmn24) - * [SCREENCAST] [Ingesting into Kudu](https://www.youtube.com/watch?v=KBW7a38vlUM) - * [SCREENCAST] [Realtime HL7 CCDA XML from Kafka into Time Parititioned Parquet](https://youtu.be/0fqNmnOnD-0) - * [SCREENCAST] [Parsing JSON file](https://youtu.be/vwnctcGDflE) - * [SCREENCAST] [Flattening arrays](https://youtu.be/SemHxgBYIsY) - * [SCREENCAST] [Data cleansing with send-to-error directive](https://www.youtube.com/watch?v=aZd5H8hIjDc) - * [SCREENCAST] [Publishing to Kafka](https://www.youtube.com/watch?v=xdc8pvvlI48) - * [SCREENCAST] [Fixed length to JSON](https://www.youtube.com/watch?v=3AXu4m1swuM) +- [SCREENCAST] [Creating Lookup Dataset and Joining](https://www.youtube.com/watch?v=Nc1b0rsELHQ) +- [SCREENCAST] [Restricted Directives](https://www.youtube.com/watch?v=71EcMQU714U) +- [SCREENCAST] [Parse Excel files in CDAP](https://www.youtube.com/watch?v=su5L1noGlEk) +- [SCREENCAST] [Parse File As AVRO File](https://www.youtube.com/watch?v=tmwAw4dKUNc) +- [SCREENCAST] [Parsing Binary Coded AVRO Messages](https://www.youtube.com/watch?v=Ix_lPo-PDJY) +- [SCREENCAST] [Parsing Binary Coded AVRO Messages & Protobuf messages using schema registry](https://www.youtube.com/watch?v=LVLIdWnUX1k) +- [SCREENCAST] [Quantize a column - Digitize](https://www.youtube.com/watch?v=VczkYX5SRtY) +- [SCREENCAST] [Data Cleansing capability with send-to-error directive](https://www.youtube.com/watch?v=aZd5H8hIjDc) +- [SCREENCAST] [Building Data Prep from the GitHub source](https://youtu.be/pGGjKU04Y38) +- [VOICE-OVER] [End-to-End Demo Video](https://youtu.be/AnhF0qRmn24) +- [SCREENCAST] [Ingesting into Kudu](https://www.youtube.com/watch?v=KBW7a38vlUM) +- [SCREENCAST] [Realtime HL7 CCDA XML from Kafka into Time Parititioned Parquet](https://youtu.be/0fqNmnOnD-0) +- [SCREENCAST] [Parsing JSON file](https://youtu.be/vwnctcGDflE) +- [SCREENCAST] [Flattening arrays](https://youtu.be/SemHxgBYIsY) +- [SCREENCAST] [Data cleansing with send-to-error directive](https://www.youtube.com/watch?v=aZd5H8hIjDc) +- [SCREENCAST] [Publishing to Kafka](https://www.youtube.com/watch?v=xdc8pvvlI48) +- [SCREENCAST] [Fixed length to JSON](https://www.youtube.com/watch?v=3AXu4m1swuM) ### Recipes - * [Parsing Apache Log Files](wrangler-demos/parsing-apache-log-files.md) - * [Parsing CSV Files and Extracting Column 
Values](wrangler-demos/parsing-csv-extracting-column-values.md) - * [Parsing HL7 CCDA XML Files](wrangler-demos/parsing-hl7-ccda-xml-files.md) +- [Parsing Apache Log Files](wrangler-demos/parsing-apache-log-files.md) +- [Parsing CSV Files and Extracting Column Values](wrangler-demos/parsing-csv-extracting-column-values.md) +- [Parsing HL7 CCDA XML Files](wrangler-demos/parsing-hl7-ccda-xml-files.md) ## Available Directives These directives are currently available: -| Directive | Description | -| ---------------------------------------------------------------------- | ---------------------------------------------------------------- | -| **Parsers** | | -| [JSON Path](wrangler-docs/directives/json-path.md) | Uses a DSL (a JSON path expression) for parsing JSON records | -| [Parse as AVRO](wrangler-docs/directives/parse-as-avro.md) | Parsing an AVRO encoded message - either as binary or json | -| [Parse as AVRO File](wrangler-docs/directives/parse-as-avro-file.md) | Parsing an AVRO data file | -| [Parse as CSV](wrangler-docs/directives/parse-as-csv.md) | Parsing an input record as comma-separated values | -| [Parse as Date](wrangler-docs/directives/parse-as-date.md) | Parsing dates using natural language processing | -| [Parse as Excel](wrangler-docs/directives/parse-as-excel.md) | Parsing excel file. | -| [Parse as Fixed Length](wrangler-docs/directives/parse-as-fixed-length.md) | Parses as a fixed length record with specified widths | -| [Parse as HL7](wrangler-docs/directives/parse-as-hl7.md) | Parsing Health Level 7 Version 2 (HL7 V2) messages | -| [Parse as JSON](wrangler-docs/directives/parse-as-json.md) | Parsing a JSON object | -| [Parse as Log](wrangler-docs/directives/parse-as-log.md) | Parses access log files as from Apache HTTPD and nginx servers | -| [Parse as Protobuf](wrangler-docs/directives/parse-as-log.md) | Parses an Protobuf encoded in-memory message using descriptor | -| [Parse as Simple Date](wrangler-docs/directives/parse-as-simple-date.md) | Parses date strings | -| [Parse XML To JSON](wrangler-docs/directives/parse-xml-to-json.md) | Parses an XML document into a JSON structure | -| [Parse as Currency](wrangler-docs/directives/parse-as-currency.md) | Parses a string representation of currency into a number. | -| [Parse as Datetime](wrangler-docs/directives/parse-as-datetime.md) | Parses strings with datetime values to CDAP datetime type | -| **Output Formatters** | | -| [Write as CSV](wrangler-docs/directives/write-as-csv.md) | Converts a record into CSV format | -| [Write as JSON](wrangler-docs/directives/write-as-json-map.md) | Converts the record into a JSON map | -| [Write JSON Object](wrangler-docs/directives/write-as-json-object.md) | Composes a JSON object based on the fields specified. | -| [Format as Currency](wrangler-docs/directives/format-as-currency.md) | Formats a number as currency as specified by locale. 
| -| **Transformations** | | -| [Changing Case](wrangler-docs/directives/changing-case.md) | Changes the case of column values | -| [Cut Character](wrangler-docs/directives/cut-character.md) | Selects parts of a string value | -| [Set Column](wrangler-docs/directives/set-column.md) | Sets the column value to the result of an expression execution | -| [Find and Replace](wrangler-docs/directives/find-and-replace.md) | Transforms string column values using a "sed"-like expression | -| [Index Split](wrangler-docs/directives/index-split.md) | (_Deprecated_) | -| [Invoke HTTP](wrangler-docs/directives/invoke-http.md) | Invokes an HTTP Service (_Experimental_, potentially slow) | -| [Quantization](wrangler-docs/directives/quantize.md) | Quantizes a column based on specified ranges | -| [Regex Group Extractor](wrangler-docs/directives/extract-regex-groups.md) | Extracts the data from a regex group into its own column | -| [Setting Character Set](wrangler-docs/directives/set-charset.md) | Sets the encoding and then converts the data to a UTF-8 String | -| [Setting Record Delimiter](wrangler-docs/directives/set-record-delim.md) | Sets the record delimiter | -| [Split by Separator](wrangler-docs/directives/split-by-separator.md) | Splits a column based on a separator into two columns | -| [Split Email Address](wrangler-docs/directives/split-email.md) | Splits an email ID into an account and its domain | -| [Split URL](wrangler-docs/directives/split-url.md) | Splits a URL into its constituents | -| [Text Distance (Fuzzy String Match)](wrangler-docs/directives/text-distance.md) | Measures the difference between two sequences of characters | -| [Text Metric (Fuzzy String Match)](wrangler-docs/directives/text-metric.md) | Measures the difference between two sequences of characters | -| [URL Decode](wrangler-docs/directives/url-decode.md) | Decodes from the `application/x-www-form-urlencoded` MIME format | -| [URL Encode](wrangler-docs/directives/url-encode.md) | Encodes to the `application/x-www-form-urlencoded` MIME format | -| [Trim](wrangler-docs/directives/trim.md) | Functions for trimming white spaces around string data | -| **Encoders and Decoders** | | -| [Decode](wrangler-docs/directives/decode.md) | Decodes a column value as one of `base32`, `base64`, or `hex` | -| [Encode](wrangler-docs/directives/encode.md) | Encodes a column value as one of `base32`, `base64`, or `hex` | -| **Unique ID** | | -| [UUID Generation](wrangler-docs/directives/generate-uuid.md) | Generates a universally unique identifier (UUID) .Recommended to use with Wrangler version 4.4.0 and above due to an important bug fix [CDAP-17732](https://cdap.atlassian.net/browse/CDAP-17732) | -| **Date Transformations** | | -| [Diff Date](wrangler-docs/directives/diff-date.md) | Calculates the difference between two dates | -| [Format Date](wrangler-docs/directives/format-date.md) | Custom patterns for date-time formatting | -| [Format Unix Timestamp](wrangler-docs/directives/format-unix-timestamp.md) | Formats a UNIX timestamp as a date | -| **DateTime Transformations** | | -| [Current DateTime](wrangler-docs/directives/current-datetime.md) | Generates the current datetime using the given zone or UTC by default| -| [Datetime To Timestamp](wrangler-docs/directives/datetime-to-timestamp.md) | Converts a datetime value to timestamp with the given zone | -| [Format Datetime](wrangler-docs/directives/format-datetime.md) | Formats a datetime value to custom date time pattern strings | -| [Timestamp To 
Datetime](wrangler-docs/directives/timestamp-to-datetime.md) | Converts a timestamp value to datetime | -| **Lookups** | | -| [Catalog Lookup](wrangler-docs/directives/catalog-lookup.md) | Static catalog lookup of ICD-9, ICD-10-2016, ICD-10-2017 codes | -| [Table Lookup](wrangler-docs/directives/table-lookup.md) | Performs lookups into Table datasets | -| **Hashing & Masking** | | -| [Message Digest or Hash](wrangler-docs/directives/hash.md) | Generates a message digest | -| [Mask Number](wrangler-docs/directives/mask-number.md) | Applies substitution masking on the column values | -| [Mask Shuffle](wrangler-docs/directives/mask-shuffle.md) | Applies shuffle masking on the column values | -| **Row Operations** | | -| [Filter Row if Matched](wrangler-docs/directives/filter-row-if-matched.md) | Filters rows that match a pattern for a column | -| [Filter Row if True](wrangler-docs/directives/filter-row-if-true.md) | Filters rows if the condition is true. | -| [Filter Row Empty of Null](wrangler-docs/directives/filter-empty-or-null.md) | Filters rows that are empty of null. | -| [Flatten](wrangler-docs/directives/flatten.md) | Separates the elements in a repeated field | -| [Fail on condition](wrangler-docs/directives/fail.md) | Fails processing when the condition is evaluated to true. | -| [Send to Error](wrangler-docs/directives/send-to-error.md) | Filtering of records to an error collector | -| [Send to Error And Continue](wrangler-docs/directives/send-to-error-and-continue.md) | Filtering of records to an error collector and continues processing | -| [Split to Rows](wrangler-docs/directives/split-to-rows.md) | Splits based on a separator into multiple records | -| **Column Operations** | | -| [Change Column Case](wrangler-docs/directives/change-column-case.md) | Changes column names to either lowercase or uppercase | -| [Changing Case](wrangler-docs/directives/changing-case.md) | Change the case of column values | -| [Cleanse Column Names](wrangler-docs/directives/cleanse-column-names.md) | Sanatizes column names, following specific rules | -| [Columns Replace](wrangler-docs/directives/columns-replace.md) | Alters column names in bulk | -| [Copy](wrangler-docs/directives/copy.md) | Copies values from a source column into a destination column | -| [Drop Column](wrangler-docs/directives/drop.md) | Drops a column in a record | -| [Fill Null or Empty Columns](wrangler-docs/directives/fill-null-or-empty.md) | Fills column value with a fixed value if null or empty | -| [Keep Columns](wrangler-docs/directives/keep.md) | Keeps specified columns from the record | -| [Merge Columns](wrangler-docs/directives/merge.md) | Merges two columns by inserting a third column | -| [Rename Column](wrangler-docs/directives/rename.md) | Renames an existing column in the record | -| [Set Column Header](wrangler-docs/directives/set-headers.md) | Sets the names of columns, in the order they are specified | -| [Split to Columns](wrangler-docs/directives/split-to-columns.md) | Splits a column based on a separator into multiple columns | -| [Swap Columns](wrangler-docs/directives/swap.md) | Swaps column names of two columns | -| [Set Column Data Type](wrangler-docs/directives/set-type.md) | Convert data type of a column | -| **NLP** | | -| [Stemming Tokenized Words](wrangler-docs/directives/stemming.md) | Applies the Porter stemmer algorithm for English words | -| **Transient Aggregators & Setters** | | -| [Increment Variable](wrangler-docs/directives/increment-variable.md) | Increments a transient variable with a 
record of processing. | -| [Set Variable](wrangler-docs/directives/set-variable.md) | Sets a transient variable with a record of processing. | -| **Functions** | | -| [Data Quality](wrangler-docs/functions/dq-functions.md) | Data quality check functions. Checks for date, time, etc. | -| [Date Manipulations](wrangler-docs/functions/date-functions.md) | Functions that can manipulate date | -| [DDL](wrangler-docs/functions/ddl-functions.md) | Functions that can manipulate definition of data | -| [JSON](wrangler-docs/functions/json-functions.md) | Functions that can be useful in transforming your data | -| [Types](wrangler-docs/functions/type-functions.md) | Functions for detecting the type of data | +| Directive | Description | +| ------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **Parsers** | | +| [JSON Path](wrangler-docs/directives/json-path.md) | Uses a DSL (a JSON path expression) for parsing JSON records | +| [Parse as AVRO](wrangler-docs/directives/parse-as-avro.md) | Parsing an AVRO encoded message - either as binary or json | +| [Parse as AVRO File](wrangler-docs/directives/parse-as-avro-file.md) | Parsing an AVRO data file | +| [Parse as CSV](wrangler-docs/directives/parse-as-csv.md) | Parsing an input record as comma-separated values | +| [Parse as Date](wrangler-docs/directives/parse-as-date.md) | Parsing dates using natural language processing | +| [Parse as Excel](wrangler-docs/directives/parse-as-excel.md) | Parsing excel file. | +| [Parse as Fixed Length](wrangler-docs/directives/parse-as-fixed-length.md) | Parses as a fixed length record with specified widths | +| [Parse as HL7](wrangler-docs/directives/parse-as-hl7.md) | Parsing Health Level 7 Version 2 (HL7 V2) messages | +| [Parse as JSON](wrangler-docs/directives/parse-as-json.md) | Parsing a JSON object | +| [Parse as Log](wrangler-docs/directives/parse-as-log.md) | Parses access log files as from Apache HTTPD and nginx servers | +| [Parse as Protobuf](wrangler-docs/directives/parse-as-log.md) | Parses an Protobuf encoded in-memory message using descriptor | +| [Parse as Simple Date](wrangler-docs/directives/parse-as-simple-date.md) | Parses date strings | +| [Parse XML To JSON](wrangler-docs/directives/parse-xml-to-json.md) | Parses an XML document into a JSON structure | +| [Parse as Currency](wrangler-docs/directives/parse-as-currency.md) | Parses a string representation of currency into a number. | +| [Parse as Datetime](wrangler-docs/directives/parse-as-datetime.md) | Parses strings with datetime values to CDAP datetime type | +| **Output Formatters** | | +| [Write as CSV](wrangler-docs/directives/write-as-csv.md) | Converts a record into CSV format | +| [Write as JSON](wrangler-docs/directives/write-as-json-map.md) | Converts the record into a JSON map | +| [Write JSON Object](wrangler-docs/directives/write-as-json-object.md) | Composes a JSON object based on the fields specified. | +| [Format as Currency](wrangler-docs/directives/format-as-currency.md) | Formats a number as currency as specified by locale. 
| +| **Transformations** | | +| [Changing Case](wrangler-docs/directives/changing-case.md) | Changes the case of column values | +| [Cut Character](wrangler-docs/directives/cut-character.md) | Selects parts of a string value | +| [Set Column](wrangler-docs/directives/set-column.md) | Sets the column value to the result of an expression execution | +| [Find and Replace](wrangler-docs/directives/find-and-replace.md) | Transforms string column values using a "sed"-like expression | +| [Index Split](wrangler-docs/directives/index-split.md) | (_Deprecated_) | +| [Invoke HTTP](wrangler-docs/directives/invoke-http.md) | Invokes an HTTP Service (_Experimental_, potentially slow) | +| [Quantization](wrangler-docs/directives/quantize.md) | Quantizes a column based on specified ranges | +| [Regex Group Extractor](wrangler-docs/directives/extract-regex-groups.md) | Extracts the data from a regex group into its own column | +| [Setting Character Set](wrangler-docs/directives/set-charset.md) | Sets the encoding and then converts the data to a UTF-8 String | +| [Setting Record Delimiter](wrangler-docs/directives/set-record-delim.md) | Sets the record delimiter | +| [Split by Separator](wrangler-docs/directives/split-by-separator.md) | Splits a column based on a separator into two columns | +| [Split Email Address](wrangler-docs/directives/split-email.md) | Splits an email ID into an account and its domain | +| [Split URL](wrangler-docs/directives/split-url.md) | Splits a URL into its constituents | +| [Text Distance (Fuzzy String Match)](wrangler-docs/directives/text-distance.md) | Measures the difference between two sequences of characters | +| [Text Metric (Fuzzy String Match)](wrangler-docs/directives/text-metric.md) | Measures the difference between two sequences of characters | +| [URL Decode](wrangler-docs/directives/url-decode.md) | Decodes from the `application/x-www-form-urlencoded` MIME format | +| [URL Encode](wrangler-docs/directives/url-encode.md) | Encodes to the `application/x-www-form-urlencoded` MIME format | +| [Trim](wrangler-docs/directives/trim.md) | Functions for trimming white spaces around string data | +| **Encoders and Decoders** | | +| [Decode](wrangler-docs/directives/decode.md) | Decodes a column value as one of `base32`, `base64`, or `hex` | +| [Encode](wrangler-docs/directives/encode.md) | Encodes a column value as one of `base32`, `base64`, or `hex` | +| **Unique ID** | | +| [UUID Generation](wrangler-docs/directives/generate-uuid.md) | Generates a universally unique identifier (UUID) .Recommended to use with Wrangler version 4.4.0 and above due to an important bug fix [CDAP-17732](https://cdap.atlassian.net/browse/CDAP-17732) | +| **Date Transformations** | | +| [Diff Date](wrangler-docs/directives/diff-date.md) | Calculates the difference between two dates | +| [Format Date](wrangler-docs/directives/format-date.md) | Custom patterns for date-time formatting | +| [Format Unix Timestamp](wrangler-docs/directives/format-unix-timestamp.md) | Formats a UNIX timestamp as a date | +| **DateTime Transformations** | | +| [Current DateTime](wrangler-docs/directives/current-datetime.md) | Generates the current datetime using the given zone or UTC by default | +| [Datetime To Timestamp](wrangler-docs/directives/datetime-to-timestamp.md) | Converts a datetime value to timestamp with the given zone | +| [Format Datetime](wrangler-docs/directives/format-datetime.md) | Formats a datetime value to custom date time pattern strings | +| [Timestamp To 
Datetime](wrangler-docs/directives/timestamp-to-datetime.md) | Converts a timestamp value to datetime | +| **Lookups** | | +| [Catalog Lookup](wrangler-docs/directives/catalog-lookup.md) | Static catalog lookup of ICD-9, ICD-10-2016, ICD-10-2017 codes | +| [Table Lookup](wrangler-docs/directives/table-lookup.md) | Performs lookups into Table datasets | +| **Hashing & Masking** | | +| [Message Digest or Hash](wrangler-docs/directives/hash.md) | Generates a message digest | +| [Mask Number](wrangler-docs/directives/mask-number.md) | Applies substitution masking on the column values | +| [Mask Shuffle](wrangler-docs/directives/mask-shuffle.md) | Applies shuffle masking on the column values | +| **Row Operations** | | +| [Filter Row if Matched](wrangler-docs/directives/filter-row-if-matched.md) | Filters rows that match a pattern for a column | +| [Filter Row if True](wrangler-docs/directives/filter-row-if-true.md) | Filters rows if the condition is true. | +| [Filter Row Empty of Null](wrangler-docs/directives/filter-empty-or-null.md) | Filters rows that are empty of null. | +| [Flatten](wrangler-docs/directives/flatten.md) | Separates the elements in a repeated field | +| [Fail on condition](wrangler-docs/directives/fail.md) | Fails processing when the condition is evaluated to true. | +| [Send to Error](wrangler-docs/directives/send-to-error.md) | Filtering of records to an error collector | +| [Send to Error And Continue](wrangler-docs/directives/send-to-error-and-continue.md) | Filtering of records to an error collector and continues processing | +| [Split to Rows](wrangler-docs/directives/split-to-rows.md) | Splits based on a separator into multiple records | +| **Column Operations** | | +| [Change Column Case](wrangler-docs/directives/change-column-case.md) | Changes column names to either lowercase or uppercase | +| [Changing Case](wrangler-docs/directives/changing-case.md) | Change the case of column values | +| [Cleanse Column Names](wrangler-docs/directives/cleanse-column-names.md) | Sanatizes column names, following specific rules | +| [Columns Replace](wrangler-docs/directives/columns-replace.md) | Alters column names in bulk | +| [Copy](wrangler-docs/directives/copy.md) | Copies values from a source column into a destination column | +| [Drop Column](wrangler-docs/directives/drop.md) | Drops a column in a record | +| [Fill Null or Empty Columns](wrangler-docs/directives/fill-null-or-empty.md) | Fills column value with a fixed value if null or empty | +| [Keep Columns](wrangler-docs/directives/keep.md) | Keeps specified columns from the record | +| [Merge Columns](wrangler-docs/directives/merge.md) | Merges two columns by inserting a third column | +| [Rename Column](wrangler-docs/directives/rename.md) | Renames an existing column in the record | +| [Set Column Header](wrangler-docs/directives/set-headers.md) | Sets the names of columns, in the order they are specified | +| [Split to Columns](wrangler-docs/directives/split-to-columns.md) | Splits a column based on a separator into multiple columns | +| [Swap Columns](wrangler-docs/directives/swap.md) | Swaps column names of two columns | +| [Set Column Data Type](wrangler-docs/directives/set-type.md) | Convert data type of a column | +| **NLP** | | +| [Stemming Tokenized Words](wrangler-docs/directives/stemming.md) | Applies the Porter stemmer algorithm for English words | +| **Transient Aggregators & Setters** | | +| [Increment Variable](wrangler-docs/directives/increment-variable.md) | Increments a transient variable with a 
record of processing. |
+| [Set Variable](wrangler-docs/directives/set-variable.md)                             | Sets a transient variable with a record of processing.                                                   |
+| **Functions**                                                                        |                                                                                                           |
+| [Data Quality](wrangler-docs/functions/dq-functions.md)                              | Data quality check functions. Checks for date, time, etc.                                                |
+| [Date Manipulations](wrangler-docs/functions/date-functions.md)                      | Functions that can manipulate dates                                                                      |
+| [DDL](wrangler-docs/functions/ddl-functions.md)                                      | Functions that can manipulate the definition of data                                                     |
+| [JSON](wrangler-docs/functions/json-functions.md)                                    | Functions that can be useful in transforming your data                                                   |
+| [Types](wrangler-docs/functions/type-functions.md)                                   | Functions for detecting the type of data                                                                 |
+
+## ByteSize and TimeDuration Parsers
+
+The Data Prep system includes specialized parsers for handling byte sizes and time durations:
+
+### ByteSize Parser
+
+The ByteSize parser handles string representations of byte sizes with units. Values are whole numbers followed by a unit, and units are interpreted as binary multiples (1KB = 1,024 bytes). It supports the following formats:
+
+- Bytes (B): "100B"
+- Kilobytes (KB/K): "10KB" or "10K"
+- Megabytes (MB/M): "5MB" or "5M"
+- Gigabytes (GB/G): "2GB" or "2G"
+- Terabytes (TB/T): "1TB" or "1T"
+- Petabytes (PB/P): "2PB" or "2P"
+
+Example usage:
+
+```java
+ByteSize size = new ByteSize("10MB");
+long bytes = size.getBytes(); // Returns size in bytes
+double kb = size.getKilobytes(); // Returns size in kilobytes
+double mb = size.getMegabytes(); // Returns size in megabytes
+double gb = size.getGigabytes(); // Returns size in gigabytes
+```
+
+### TimeDuration Parser
+
+The TimeDuration parser handles string representations of time durations with units. A value is a single whole number followed by one unit; compound values such as "1h30m" are not supported. The following units are recognized:
+
+- Seconds (s): "30s"
+- Minutes (m): "5m"
+- Hours (h): "2h"
+- Days (d): "1d"
+- Weeks (w): "1w"
+- Months (mo): "1mo" (approximated as 30 days)
+- Years (y): "1y" (approximated as 365 days)
+
+Example usage:
+
+```java
+TimeDuration duration = new TimeDuration("90m");
+long ms = duration.getMilliseconds(); // Returns duration in milliseconds
+double sec = duration.getSeconds(); // Returns duration in seconds
+double min = duration.getMinutes(); // Returns duration in minutes
+double hours = duration.getHours(); // Returns duration in hours
+double days = duration.getDays(); // Returns duration in days
+```
+
+## Aggregate Stats Directive
+
+The `aggregate-stats` directive performs statistical aggregations on byte sizes and time durations. It supports several aggregation types and can output results in different units.
+
+### Syntax
+
+```
+aggregate-stats :size_column :time_column output_size_column output_time_column [aggregation_type]
+```
+
+Where:
+
+- `:size_column` - Column containing byte size values
+- `:time_column` - Column containing time duration values
+- `output_size_column` - Name of the output column for the size aggregation
+- `output_time_column` - Name of the output column for the time aggregation
+- `aggregation_type` - (Optional) Type of aggregation: "total", "average", "median", "p95", or "p99"
+
+### Examples
+
+1. Total aggregation (default):
+
+```
+aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec
+```
+
+2. Average aggregation:
+
+```
+aggregate-stats :data_transfer_size :response_time avg_size_mb avg_time_sec average
+```
+
+3. Median aggregation:
+
+```
+aggregate-stats :data_transfer_size :response_time median_size_kb median_time_ms median
+```
+
+4. Percentile aggregation:
+
+```
+aggregate-stats :data_transfer_size :response_time p95_size_mb p95_time_sec p95
+```
+
+5. 
Multiple unit outputs: + +``` +aggregate-stats :data_transfer_size :response_time total_size_gb total_time_min +``` + +### Supported Output Units + +For byte sizes: + +- Bytes (B) +- Kilobytes (KB) +- Megabytes (MB) +- Gigabytes (GB) + +For time durations: + +- Milliseconds (ms) +- Seconds (sec) +- Minutes (min) +- Hours (hr) +- Days (day) + +### Notes + +- The directive automatically handles unit conversions +- Empty datasets will result in zero values +- The output maintains precision for floating-point calculations +- Percentile calculations (p95, p99) are useful for identifying outliers in the data ## Performance Initial performance tests show that with a set of directives of high complexity for -transforming data, *DataPrep* is able to process at about ~106K records per second. The -rates below are specified as *records/second*. - -| Directive Complexity | Column Count | Records | Size | Mean Rate | -| -------------------- | :----------: | ---------: | -------------: | --------: | -| High (167 Directives) | 426 | 127,946,398 | 82,677,845,324 | 106,367.27 | -| High (167 Directives) | 426 | 511,785,592 | 330,711,381,296 | 105,768.93 | +transforming data, _DataPrep_ is able to process at about ~106K records per second. The +rates below are specified as _records/second_. +| Directive Complexity | Column Count | Records | Size | Mean Rate | +| --------------------- | :----------: | ----------: | --------------: | ---------: | +| High (167 Directives) | 426 | 127,946,398 | 82,677,845,324 | 106,367.27 | +| High (167 Directives) | 426 | 511,785,592 | 330,711,381,296 | 105,768.93 | ## Contact @@ -182,9 +304,9 @@ rates below are specified as *records/second*. CDAP User Group and Development Discussions: -* [cdap-user@googlegroups.com](https://groups.google.com/d/forum/cdap-user) +- [cdap-user@googlegroups.com](https://groups.google.com/d/forum/cdap-user) -The *cdap-user* mailing list is primarily for users using the product to develop +The _cdap-user_ mailing list is primarily for users using the product to develop applications or building plugins for appplications. You can expect questions from users, release announcements, and any other discussions that we think will be helpful to the users. @@ -197,7 +319,6 @@ CDAP IRC Channel: [#cdap on irc.freenode.net](http://webchat.freenode.net?channe CDAP Users on Slack: [cdap-users team](https://cdap-users.herokuapp.com) - ## License and Trademarks Copyright © 2016-2019 Cask Data, Inc. @@ -215,4 +336,4 @@ and limitations under the License. Cask is a trademark of Cask Data, Inc. All rights reserved. Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with -permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. +permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. \ No newline at end of file diff --git a/wrangler-api/pom.xml b/wrangler-api/pom.xml index e97464a64..b4f5aa36a 100644 --- a/wrangler-api/pom.xml +++ b/wrangler-api/pom.xml @@ -41,4 +41,4 @@ - + \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 000000000..8c816b4d3 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,147 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.wrangler.api.parser; + + import com.google.gson.JsonElement; + import com.google.gson.JsonObject; + import io.cdap.wrangler.api.annotations.PublicEvolving; + + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + /** + * Token class representing byte size values with units (e.g., "10KB", "5MB"). + * Parses and stores byte sizes, providing methods to retrieve the value in + * bytes. + */ + @PublicEvolving + public class ByteSize implements Token { + private static final Pattern BYTE_SIZE_PATTERN = Pattern.compile("(\\d+)\\s*([kKmMgGtTpP]?[bB]?)"); + private final String value; + private final long bytes; + + /** + * Constructs a ByteSize token from a string representation. + * Accepts formats like "10KB", "5MB", "1GB", etc. + * + * @param value String representation of a byte size with unit + * @throws IllegalArgumentException if the string cannot be parsed as a byte + * size + */ + public ByteSize(String value) { + this.value = value; + this.bytes = parseBytes(value); + } + + /** + * Parses a string representation of byte size into its byte value. + * + * @param sizeStr String representation of a byte size (e.g., "10KB") + * @return The size in bytes + * @throws IllegalArgumentException if the string cannot be parsed + */ + private long parseBytes(String sizeStr) { + Matcher matcher = BYTE_SIZE_PATTERN.matcher(sizeStr); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid byte size format: " + sizeStr); + } + + long size = Long.parseLong(matcher.group(1)); + String unit = matcher.group(2).toUpperCase(); + + switch (unit) { + case "B": + case "": + return size; + case "KB": + case "K": + return size * 1024; + case "MB": + case "M": + return size * 1024 * 1024; + case "GB": + case "G": + return size * 1024 * 1024 * 1024; + case "TB": + case "T": + return size * 1024 * 1024 * 1024 * 1024; + case "PB": + case "P": + return size * 1024 * 1024 * 1024 * 1024 * 1024; + default: + throw new IllegalArgumentException("Unsupported byte size unit: " + unit); + } + } + + /** + * Returns the original string representation of the byte size. + */ + @Override + public String value() { + return value; + } + + /** + * Returns the size in bytes. + * + * @return The size in bytes + */ + public long getBytes() { + return bytes; + } + + /** + * Returns the size in kilobytes. + * + * @return The size in kilobytes + */ + public double getKilobytes() { + return bytes / 1024.0; + } + + /** + * Returns the size in megabytes. + * + * @return The size in megabytes + */ + public double getMegabytes() { + return bytes / (1024.0 * 1024.0); + } + + /** + * Returns the size in gigabytes. 
+ * + * @return The size in gigabytes + */ + public double getGigabytes() { + return bytes / (1024.0 * 1024.0 * 1024.0); + } + + @Override + public TokenType type() { + return TokenType.BYTE_SIZE; + } + + @Override + public JsonElement toJson() { + JsonObject object = new JsonObject(); + object.addProperty("type", TokenType.BYTE_SIZE.name()); + object.addProperty("value", value); + object.addProperty("bytes", bytes); + return object; + } + } \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 000000000..d7607bbab --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,156 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.wrangler.api.parser; + + import com.google.gson.JsonElement; + import com.google.gson.JsonObject; + import io.cdap.wrangler.api.annotations.PublicEvolving; + + import java.util.concurrent.TimeUnit; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + /** + * Token class representing time duration values with units (e.g., "5s", "10m", + * "2h"). + * Parses and stores time durations, providing methods to retrieve the value in + * various time units. + */ + @PublicEvolving + public class TimeDuration implements Token { + private static final Pattern TIME_PATTERN = Pattern.compile("(\\d+)\\s*([smhdwy]|mo)"); + private final String value; + private final long milliseconds; + + /** + * Constructs a TimeDuration token from a string representation. + * Accepts formats like "5s" (seconds), "10m" (minutes), "2h" (hours), etc. + * + * @param value String representation of a time duration with unit + * @throws IllegalArgumentException if the string cannot be parsed as a time + * duration + */ + public TimeDuration(String value) { + this.value = value; + this.milliseconds = parseMilliseconds(value); + } + + /** + * Parses a string representation of time duration into milliseconds. 
+ * + * @param durationStr String representation of a time duration (e.g., "5s") + * @return The duration in milliseconds + * @throws IllegalArgumentException if the string cannot be parsed + */ + private long parseMilliseconds(String durationStr) { + Matcher matcher = TIME_PATTERN.matcher(durationStr); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time duration format: " + durationStr); + } + + long amount = Long.parseLong(matcher.group(1)); + String unit = matcher.group(2).toLowerCase(); + + switch (unit) { + case "s": + return TimeUnit.SECONDS.toMillis(amount); + case "m": + return TimeUnit.MINUTES.toMillis(amount); + case "h": + return TimeUnit.HOURS.toMillis(amount); + case "d": + return TimeUnit.DAYS.toMillis(amount); + case "w": + return TimeUnit.DAYS.toMillis(amount * 7); + case "mo": + // Approximate a month as 30 days + return TimeUnit.DAYS.toMillis(amount * 30); + case "y": + // Approximate a year as 365 days + return TimeUnit.DAYS.toMillis(amount * 365); + default: + throw new IllegalArgumentException("Unsupported time unit: " + unit); + } + } + + /** + * Returns the original string representation of the time duration. + */ + @Override + public String value() { + return value; + } + + /** + * Returns the duration in milliseconds. + * + * @return The duration in milliseconds + */ + public long getMilliseconds() { + return milliseconds; + } + + /** + * Returns the duration in seconds. + * + * @return The duration in seconds + */ + public double getSeconds() { + return milliseconds / 1000.0; + } + + /** + * Returns the duration in minutes. + * + * @return The duration in minutes + */ + public double getMinutes() { + return milliseconds / (1000.0 * 60); + } + + /** + * Returns the duration in hours. + * + * @return The duration in hours + */ + public double getHours() { + return milliseconds / (1000.0 * 60 * 60); + } + + /** + * Returns the duration in days. + * + * @return The duration in days + */ + public double getDays() { + return milliseconds / (1000.0 * 60 * 60 * 24); + } + + @Override + public TokenType type() { + return TokenType.TIME_DURATION; + } + + @Override + public JsonElement toJson() { + JsonObject object = new JsonObject(); + object.addProperty("type", TokenType.TIME_DURATION.name()); + object.addProperty("value", value); + object.addProperty("milliseconds", milliseconds); + return object; + } + } \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/Token.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/Token.java index bc596f4df..683fb26ae 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/Token.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/Token.java @@ -32,6 +32,7 @@ * of this interface.

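+ * <p>For example, {@link ByteSize} and {@link TimeDuration} are implementations of
+ * this interface that wrap byte-size and time-duration token values.</p>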
*/ @PublicEvolving + public interface Token extends Serializable { /** * Returns the {@code value} of the object wrapped by the @@ -57,3 +58,4 @@ public interface Token extends Serializable { */ JsonElement toJson(); } +// [blank line here] \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java index 8c93b0e6a..22fcf2dd4 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java @@ -14,143 +14,178 @@ * the License. */ -package io.cdap.wrangler.api.parser; - -import io.cdap.wrangler.api.annotations.PublicEvolving; - -import java.io.Serializable; - -/** - * The TokenType class provides the enumerated types for different types of - * tokens that are supported by the grammar. - * - * Each of the enumerated types specified in this class also has associated - * object representing it. e.g. {@code DIRECTIVE_NAME} is represented by the - * object {@code DirectiveName}. - * - * @see Bool - * @see BoolList - * @see ColumnName - * @see ColumnNameList - * @see DirectiveName - * @see Numeric - * @see NumericList - * @see Properties - * @see Ranges - * @see Expression - * @see Text - * @see TextList - */ -@PublicEvolving -public enum TokenType implements Serializable { - /** - * Represents the enumerated type for the object {@code DirectiveName} type. - * This type is associated with the token that is recognized as a directive - * name within the recipe. - */ - DIRECTIVE_NAME, - - /** - * Represents the enumerated type for the object of {@code ColumnName} type. - * This type is associated with token that represents the column as defined - * by the grammar as :. - */ - COLUMN_NAME, - - /** - * Represents the enumerated type for the object of {@code Text} type. - * This type is associated with the token that is either enclosed within a single quote(') - * or a double quote (") as string. - */ - TEXT, - - /** - * Represents the enumerated type for the object of {@code Numeric} type. - * This type is associated with the token that is either a integer or real number. - */ - NUMERIC, - - /** - * Represents the enumerated type for the object of {@code Bool} type. - * This type is associated with the token that either represents string 'true' or 'false'. - */ - BOOLEAN, - - /** - * Represents the enumerated type for the object of type {@code BoolList} type. - * This type is associated with the rule that is a collection of {@code Boolean} values - * separated by comman(,). E.g. - * - * ColumnName[,ColumnName]* - * - */ - COLUMN_NAME_LIST, - - /** - * Represents the enumerated type for the object of type {@code TextList} type. - * This type is associated with the comma separated text represented were each text - * is enclosed within a single quote (') or double quote (") and each text is separated - * by comma (,). E.g. - * - * Text[,Text]* - * - */ - TEXT_LIST, - - /** - * Represents the enumerated type for the object of type {@code NumericList} type. - * This type is associated with the collection of {@code Numeric} values separated by - * comma(,). E.g. - * - * Numeric[,Numeric]* - * - * - */ - NUMERIC_LIST, - - /** - * Represents the enumerated type for the object of type {@code BoolList} type. - * This type is associated with the collection of {@code Bool} values separated by - * comma(,). E.g. 
- * - * Boolean[,Boolean]* - * - */ - BOOLEAN_LIST, - - /** - * Represents the enumerated type for the object of type {@code Expression} type. - * This type is associated with code block that either represents a condition or - * an expression. E.g. - * - * exp:{ } - * - */ - EXPRESSION, - - /** - * Represents the enumerated type for the object of type {@code Properties} type. - * This type is associated with a collection of key and value pairs all separated - * by a comma(,). E.g. - * - * prop:{ =[,=]*} - * - */ - PROPERTIES, - - /** - * Represents the enumerated type for the object of type {@code Ranges} types. - * This type is associated with a collection of range represented in the form shown - * below - * - * :=value[,:=value]* - * - */ - RANGES, - - /** - * Represents the enumerated type for the object of type {@code String} with restrictions - * on characters that can be present in a string. - */ - IDENTIFIER -} + package io.cdap.wrangler.api.parser; + + import io.cdap.wrangler.api.annotations.PublicEvolving; + + import java.io.Serializable; + + /** + * The TokenType class provides the enumerated types for different types of + * tokens that are supported by the grammar. + * + * Each of the enumerated types specified in this class also has associated + * object representing it. e.g. {@code DIRECTIVE_NAME} is represented by the + * object {@code DirectiveName}. + * + * @see Bool + * @see BoolList + * @see ColumnName + * @see ColumnNameList + * @see DirectiveName + * @see Numeric + * @see NumericList + * @see Properties + * @see Ranges + * @see Expression + * @see Text + * @see TextList + */ + @PublicEvolving + public enum TokenType implements Serializable { + /** + * Represents the enumerated type for the object {@code DirectiveName} type. + * This type is associated with the token that is recognized as a directive + * name within the recipe. + */ + DIRECTIVE_NAME, + + /** + * Represents the enumerated type for the object of {@code ColumnName} type. + * This type is associated with token that represents the column as defined + * by the grammar as :. + */ + COLUMN_NAME, + + /** + * Represents the enumerated type for the object of {@code Text} type. + * This type is associated with the token that is either enclosed within a + * single quote(') + * or a double quote (") as string. + */ + TEXT, + + /** + * Represents the enumerated type for the object of {@code Numeric} type. + * This type is associated with the token that is either a integer or real + * number. + */ + NUMERIC, + + /** + * Represents the enumerated type for the object of {@code Bool} type. + * This type is associated with the token that either represents string 'true' + * or 'false'. + */ + BOOLEAN, + + /** + * Represents the enumerated type for the object of type {@code BoolList} type. + * This type is associated with the rule that is a collection of {@code Boolean} + * values + * separated by comman(,). E.g. + * + * ColumnName[,ColumnName]* + * + */ + COLUMN_NAME_LIST, + + /** + * Represents the enumerated type for the object of type {@code TextList} type. + * This type is associated with the comma separated text represented were each + * text + * is enclosed within a single quote (') or double quote (") and each text is + * separated + * by comma (,). E.g. + * + * Text[,Text]* + * + */ + TEXT_LIST, + + /** + * Represents the enumerated type for the object of type {@code NumericList} + * type. + * This type is associated with the collection of {@code Numeric} values + * separated by + * comma(,). E.g. 
+ * + * Numeric[,Numeric]* + * + * + */ + NUMERIC_LIST, + + /** + * Represents the enumerated type for the object of type {@code BoolList} type. + * This type is associated with the collection of {@code Bool} values separated + * by + * comma(,). E.g. + * + * Boolean[,Boolean]* + * + */ + BOOLEAN_LIST, + + /** + * Represents the enumerated type for the object of type {@code Expression} + * type. + * This type is associated with code block that either represents a condition or + * an expression. E.g. + * + * exp:{ } + * + */ + EXPRESSION, + + /** + * Represents the enumerated type for the object of type {@code Properties} + * type. + * This type is associated with a collection of key and value pairs all + * separated + * by a comma(,). E.g. + * + * prop:{ =[,=]*} + * + */ + PROPERTIES, + + /** + * Represents the enumerated type for the object of type {@code Ranges} types. + * This type is associated with a collection of range represented in the form + * shown + * below + * + * :=value[,:=value]* + * + */ + RANGES, + + /** + * Represents the enumerated type for the object of type {@code String} with + * restrictions + * on characters that can be present in a string. + */ + IDENTIFIER, + + /** + * Represents the enumerated type for the object of type {@code ByteSize} type. + * This type is associated with token that represents a byte size value with a + * unit, + * like "10KB", "5MB", "2GB", etc. + * + * @see ByteSize + */ + BYTE_SIZE, + + /** + * Represents the enumerated type for the object of type {@code TimeDuration} + * type. + * This type is associated with token that represents a time duration value with + * a unit, + * like "5s" (seconds), "10m" (minutes), "2h" (hours), etc. + * + * @see TimeDuration + */ + TIME_DURATION + } \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java index 78800b7d1..10a498278 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java @@ -14,231 +14,250 @@ * the License. */ -package io.cdap.wrangler.api.parser; + package io.cdap.wrangler.api.parser; -import io.cdap.wrangler.api.Optional; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * This class {@link UsageDefinition} provides a way for users to registers the argument for UDDs. - * - * {@link UsageDefinition} is a collection of {@link TokenDefinition} and the name of the directive - * itself. Each token specification has an associated ordinal that can be used to position the argument - * within the directive. - * - * Following is a example of how this class can be used. - * - * UsageDefinition.Builder builder = UsageDefinition.builder(); - * builder.add("col1", TypeToken.COLUMN_NAME); // By default, this field is required. - * builder.add("col2", TypeToken.COLUMN_NAME, false); // This is a optional field. - * builder.add("expression", TypeToken.EXPRESSION); - * UsageDefinition definition = builder.build(); - * - * - * NOTE: No constraints checks are included in this implementation. 
- * - * @see TokenDefinition - */ -public final class UsageDefinition implements Serializable { - // transient so it doesn't show up when serialized using gson in service endpoint responses - private final transient int optionalCnt; - private final String directive; - private final List tokens; - - private UsageDefinition(String directive, int optionalCnt, List tokens) { - this.directive = directive; - this.tokens = tokens; - this.optionalCnt = optionalCnt; - } - - /** - * Returns the name of the directive for which the this UsageDefinition - * object is created. - * - * @return name of the directive. - */ - public String getDirectiveName() { - return directive; - } - - /** - * This method returns the list of TokenDefinition that should be - * used for parsing the directive into Arguments. - * - * @return List of TokenDefinition. - */ - public List getTokens() { - return tokens; - } - - /** - * Returns the count of TokenDefinition that have been specified - * as optional in the UsageDefinition. - * - * @return number of tokens in the usage that are optional. - */ - public int getOptionalTokensCount() { - return optionalCnt; - } - - /** - * This method converts the UsageDefinition into a usage string - * for this directive. It inspects all the tokens to generate a standard syntax - * for the usage of the directive. - * - * @return a usage representation of this object. - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(directive).append(" "); - - int count = tokens.size(); - for (TokenDefinition token : tokens) { - if (token.optional()) { - sb.append(" ["); - } - - if (token.label() != null) { - sb.append(token.label()); - } else { - if (token.type().equals(TokenType.DIRECTIVE_NAME)) { - sb.append(token.name()); - } else if (token.type().equals(TokenType.COLUMN_NAME)) { - sb.append(":").append(token.name()); - } else if (token.type().equals(TokenType.COLUMN_NAME_LIST)) { - sb.append(":").append(token.name()).append(" [,:").append(token.name()).append(" ]*"); - } else if (token.type().equals(TokenType.BOOLEAN)) { - sb.append(token.name()).append(" (true/false)"); - } else if (token.type().equals(TokenType.TEXT)) { - sb.append("'").append(token.name()).append("'"); - } else if (token.type().equals(TokenType.IDENTIFIER) || token.type().equals(TokenType.NUMERIC)) { - sb.append(token.name()); - } else if (token.type().equals(TokenType.BOOLEAN_LIST) || token.type().equals(TokenType.NUMERIC_LIST) - || token.type().equals(TokenType.TEXT_LIST)) { - sb.append(token.name()).append("[,").append(token.name()).append(" ...]*"); - } else if (token.type().equals(TokenType.EXPRESSION)) { - sb.append("exp:{<").append(token.name()).append(">}"); - } else if (token.type().equals(TokenType.PROPERTIES)) { - sb.append("prop:{key:value,[key:value]*"); - } else if (token.type().equals(TokenType.RANGES)) { - sb.append("start:end=[bool|text|numeric][,start:end=[bool|text|numeric]*"); - } - } - - count--; - - if (token.optional()) { - sb.append("]"); - } else { - if (count > 0) { - sb.append(" "); - } - } - } - return sb.toString(); - } - - /** - * This is a static method for creating a builder for the UsageDefinition - * object. In order to create a UsageDefinition, a builder has to created. - * - *

This builder is provided as user API for constructing the usage specification - * for a directive.

- * - * @param directive name of the directive for which the builder is created for. - * @return A UsageDefinition.Builder object that can be used to construct - * UsageDefinition object. - */ - public static UsageDefinition.Builder builder(String directive) { - return new UsageDefinition.Builder(directive); - } - - /** - * This inner builder class provides a way to create UsageDefinition - * object. It exposes different methods that allow users to configure the TokenDefinition - * for each token used within the usage of a directive. - */ - public static final class Builder { - private final String directive; - private final List tokens; - private int currentOrdinal; - private int optionalCnt; - - public Builder(String directive) { - this.directive = directive; - this.currentOrdinal = 0; - this.tokens = new ArrayList<>(); - this.optionalCnt = 0; - } - - /** - * This method provides a way to set the name and the type of token, while - * defaulting the label to 'null' and setting the optional to FALSE. - * - * @param name of the token in the definition of a directive. - * @param type of the token to be extracted. - */ - public void define(String name, TokenType type) { - TokenDefinition spec = new TokenDefinition(name, type, null, currentOrdinal, Optional.FALSE); - currentOrdinal++; - tokens.add(spec); - } - - /** - * Allows users to define a token with a name, type of the token and additional optional - * for the label that is used during creation of the usage for the directive. - * - * @param name of the token in the definition of a directive. - * @param type of the token to be extracted. - * @param label label that modifies the usage for this field. - */ - public void define(String name, TokenType type, String label) { - TokenDefinition spec = new TokenDefinition(name, type, label, currentOrdinal, Optional.FALSE); - currentOrdinal++; - tokens.add(spec); - } - - /** - * Method allows users to specify a field as optional in combination to the - * name of the token and the type of token. - * - * @param name of the token in the definition of a directive. - * @param type of the token to be extracted. - * @param optional Optional#TRUE if token is optional, else Optional#FALSE. - */ - public void define(String name, TokenType type, boolean optional) { - TokenDefinition spec = new TokenDefinition(name, type, null, currentOrdinal, optional); - optionalCnt = optional ? optionalCnt + 1 : optionalCnt; - currentOrdinal++; - tokens.add(spec); - } - - /** - * Method allows users to specify a field as optional in combination to the - * name of the token, the type of token and also the ability to specify a label - * for the usage. - * - * @param name of the token in the definition of a directive. - * @param type of the token to be extracted. - * @param label label that modifies the usage for this field. - * @param optional Optional#TRUE if token is optional, else Optional#FALSE. - */ - public void define(String name, TokenType type, String label, boolean optional) { - TokenDefinition spec = new TokenDefinition(name, type, label, currentOrdinal, optional); - optionalCnt = optional ? optionalCnt + 1 : optionalCnt; - currentOrdinal++; - tokens.add(spec); - } - - /** - * @return a instance of UsageDefinition object. 
- */ - public UsageDefinition build() { - return new UsageDefinition(directive, optionalCnt, tokens); - } - } -} + import io.cdap.wrangler.api.Optional; + + import java.io.Serializable; + import java.util.ArrayList; + import java.util.List; + + /** + * This class {@link UsageDefinition} provides a way for users to registers the + * argument for UDDs. + * + * {@link UsageDefinition} is a collection of {@link TokenDefinition} and the + * name of the directive + * itself. Each token specification has an associated ordinal that can be used + * to position the argument + * within the directive. + * + * Following is a example of how this class can be used. + * + * UsageDefinition.Builder builder = UsageDefinition.builder(); + * builder.add("col1", TypeToken.COLUMN_NAME); // By default, this field is required. + * builder.add("col2", TypeToken.COLUMN_NAME, false); // This is a optional field. + * builder.add("expression", TypeToken.EXPRESSION); + * UsageDefinition definition = builder.build(); + * + * + * NOTE: No constraints checks are included in this implementation. + * + * @see TokenDefinition + */ + public final class UsageDefinition implements Serializable { + // transient so it doesn't show up when serialized using gson in service + // endpoint responses + private final transient int optionalCnt; + private final String directive; + private final List tokens; + + private UsageDefinition(String directive, int optionalCnt, List tokens) { + this.directive = directive; + this.tokens = tokens; + this.optionalCnt = optionalCnt; + } + + /** + * Returns the name of the directive for which the this + * UsageDefinition + * object is created. + * + * @return name of the directive. + */ + public String getDirectiveName() { + return directive; + } + + /** + * This method returns the list of TokenDefinition that should be + * used for parsing the directive into Arguments. + * + * @return List of TokenDefinition. + */ + public List getTokens() { + return tokens; + } + + /** + * Returns the count of TokenDefinition that have been specified + * as optional in the UsageDefinition. + * + * @return number of tokens in the usage that are optional. + */ + public int getOptionalTokensCount() { + return optionalCnt; + } + + /** + * This method converts the UsageDefinition into a usage string + * for this directive. It inspects all the tokens to generate a standard syntax + * for the usage of the directive. + * + * @return a usage representation of this object. 
+ */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(directive).append(" "); + + int count = tokens.size(); + for (TokenDefinition token : tokens) { + if (token.optional()) { + sb.append(" ["); + } + + if (token.label() != null) { + sb.append(token.label()); + } else { + if (token.type().equals(TokenType.DIRECTIVE_NAME)) { + sb.append(token.name()); + } else if (token.type().equals(TokenType.COLUMN_NAME)) { + sb.append(":").append(token.name()); + } else if (token.type().equals(TokenType.COLUMN_NAME_LIST)) { + sb.append(":").append(token.name()).append(" [,:").append(token.name()).append(" ]*"); + } else if (token.type().equals(TokenType.BOOLEAN)) { + sb.append(token.name()).append(" (true/false)"); + } else if (token.type().equals(TokenType.TEXT)) { + sb.append("'").append(token.name()).append("'"); + } else if (token.type().equals(TokenType.IDENTIFIER) || token.type().equals(TokenType.NUMERIC)) { + sb.append(token.name()); + } else if (token.type().equals(TokenType.BOOLEAN_LIST) || token.type().equals(TokenType.NUMERIC_LIST) + || token.type().equals(TokenType.TEXT_LIST)) { + sb.append(token.name()).append("[,").append(token.name()).append(" ...]*"); + } else if (token.type().equals(TokenType.EXPRESSION)) { + sb.append("exp:{<").append(token.name()).append(">}"); + } else if (token.type().equals(TokenType.PROPERTIES)) { + sb.append("prop:{key:value,[key:value]*"); + } else if (token.type().equals(TokenType.RANGES)) { + sb.append("start:end=[bool|text|numeric][,start:end=[bool|text|numeric]*"); + } else if (token.type().equals(TokenType.BYTE_SIZE)) { + sb.append(token.name()).append(" (e.g., 10KB, 5MB)"); + } else if (token.type().equals(TokenType.TIME_DURATION)) { + sb.append(token.name()).append(" (e.g., 10s, 5m, 2h)"); + } + } + + count--; + + if (token.optional()) { + sb.append("]"); + } else { + if (count > 0) { + sb.append(" "); + } + } + } + return sb.toString(); + } + + /** + * This is a static method for creating a builder for the + * UsageDefinition + * object. In order to create a UsageDefinition, a builder has to + * created. + * + *

+ * This builder is provided as a user API for constructing the usage specification + * for a directive. + *

+ * + * @param directive name of the directive for which the builder is created for. + * @return A UsageDefinition.Builder object that can be used to + * construct + * UsageDefinition object. + */ + public static UsageDefinition.Builder builder(String directive) { + return new UsageDefinition.Builder(directive); + } + + /** + * This inner builder class provides a way to create + * UsageDefinition + * object. It exposes different methods that allow users to configure the + * TokenDefinition + * for each token used within the usage of a directive. + */ + public static final class Builder { + private final String directive; + private final List tokens; + private int currentOrdinal; + private int optionalCnt; + + public Builder(String directive) { + this.directive = directive; + this.currentOrdinal = 0; + this.tokens = new ArrayList<>(); + this.optionalCnt = 0; + } + + /** + * This method provides a way to set the name and the type of token, while + * defaulting the label to 'null' and setting the optional to FALSE. + * + * @param name of the token in the definition of a directive. + * @param type of the token to be extracted. + */ + public void define(String name, TokenType type) { + TokenDefinition spec = new TokenDefinition(name, type, null, currentOrdinal, Optional.FALSE); + currentOrdinal++; + tokens.add(spec); + } + + /** + * Allows users to define a token with a name, type of the token and additional + * optional + * for the label that is used during creation of the usage for the directive. + * + * @param name of the token in the definition of a directive. + * @param type of the token to be extracted. + * @param label label that modifies the usage for this field. + */ + public void define(String name, TokenType type, String label) { + TokenDefinition spec = new TokenDefinition(name, type, label, currentOrdinal, Optional.FALSE); + currentOrdinal++; + tokens.add(spec); + } + + /** + * Method allows users to specify a field as optional in combination to the + * name of the token and the type of token. + * + * @param name of the token in the definition of a directive. + * @param type of the token to be extracted. + * @param optional Optional#TRUE if token is optional, else + * Optional#FALSE. + */ + public void define(String name, TokenType type, boolean optional) { + TokenDefinition spec = new TokenDefinition(name, type, null, currentOrdinal, optional); + optionalCnt = optional ? optionalCnt + 1 : optionalCnt; + currentOrdinal++; + tokens.add(spec); + } + + /** + * Method allows users to specify a field as optional in combination to the + * name of the token, the type of token and also the ability to specify a label + * for the usage. + * + * @param name of the token in the definition of a directive. + * @param type of the token to be extracted. + * @param label label that modifies the usage for this field. + * @param optional Optional#TRUE if token is optional, else + * Optional#FALSE. + */ + public void define(String name, TokenType type, String label, boolean optional) { + TokenDefinition spec = new TokenDefinition(name, type, label, currentOrdinal, optional); + optionalCnt = optional ? optionalCnt + 1 : optionalCnt; + currentOrdinal++; + tokens.add(spec); + } + + /** + * @return a instance of UsageDefinition object. 
+ */ + public UsageDefinition build() { + return new UsageDefinition(directive, optionalCnt, tokens); + } + } + } \ No newline at end of file diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml index e2dcb3c2b..cd0857474 100644 --- a/wrangler-core/pom.xml +++ b/wrangler-core/pom.xml @@ -361,4 +361,4 @@ - + \ No newline at end of file diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..ac25f71bf 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -64,6 +64,8 @@ directive | stringList | numberRanges | properties + | byteSize + | timeDuration )*? ; @@ -140,7 +142,7 @@ numberRange ; value - : String | Number | Column | Bool + : String | Number | Column | Bool | BYTE_SIZE | TIME_DURATION ; ecommand @@ -195,6 +197,13 @@ identifierList : Identifier (',' Identifier)* ; +byteSize + : BYTE_SIZE + ; + +timeDuration + : TIME_DURATION + ; /* * Following are the Lexer Rules used for tokenizing the recipe. @@ -303,6 +312,33 @@ Space : [ \t\r\n\u000C]+ -> skip ; +BYTE_SIZE + : Int BYTE_UNIT + ; + +TIME_DURATION + : Int TIME_UNIT + ; + +fragment BYTE_UNIT + : [kK][bB] // kilobyte + | [mM][bB] // megabyte + | [gG][bB] // gigabyte + | [tT][bB] // terabyte + | [pP][bB] // petabyte + | [bB] // byte + ; + +fragment TIME_UNIT + : [sS] // seconds + | [mM] // minutes + | [hH] // hours + | [dD] // days + | [wW] // weeks + | [mM][oO] // months + | [yY] // years + ; + fragment Int : '-'? [1-9] Digit* [L]* | '0' @@ -310,4 +346,4 @@ fragment Int fragment Digit : [0-9] - ; + ; \ No newline at end of file diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/SizeTimeAggregator.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/SizeTimeAggregator.java new file mode 100644 index 000000000..feb7a278d --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/SizeTimeAggregator.java @@ -0,0 +1,298 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + + package io.cdap.directives.aggregates; + + import io.cdap.cdap.api.annotation.Description; + import io.cdap.cdap.api.annotation.Name; + import io.cdap.cdap.api.annotation.Plugin; + import io.cdap.wrangler.api.Arguments; + import io.cdap.wrangler.api.Directive; + import io.cdap.wrangler.api.DirectiveExecutionException; + import io.cdap.wrangler.api.DirectiveParseException; + import io.cdap.wrangler.api.ExecutorContext; + import io.cdap.wrangler.api.Optional; + import io.cdap.wrangler.api.Row; + import io.cdap.wrangler.api.TransientStore; + import io.cdap.wrangler.api.TransientVariableScope; + import io.cdap.wrangler.api.annotations.Categories; + import io.cdap.wrangler.api.lineage.Lineage; + import io.cdap.wrangler.api.lineage.Mutation; + import io.cdap.wrangler.api.parser.ByteSize; + import io.cdap.wrangler.api.parser.ColumnName; + import io.cdap.wrangler.api.parser.Text; + import io.cdap.wrangler.api.parser.TimeDuration; + import io.cdap.wrangler.api.parser.TokenType; + import io.cdap.wrangler.api.parser.UsageDefinition; + + import java.util.ArrayList; + import java.util.List; + + /** + * A directive for aggregating byte sizes and time durations across multiple + * rows. + * + * This directive processes ByteSize and TimeDuration tokens, accumulates them, + * and produces summary statistics such as total size and total/average time. + */ + @Plugin(type = Directive.TYPE) + @Name(SizeTimeAggregator.NAME) + @Categories(categories = { "aggregator", "statistics" }) + @Description("Aggregates byte sizes and time durations across rows, calculating totals and averages.") + public class SizeTimeAggregator implements Directive, Lineage { + public static final String NAME = "aggregate-size-time"; + + // Store keys for the transient store + private static final String TOTAL_SIZE_KEY = "aggregate_total_size_bytes"; + private static final String TOTAL_TIME_KEY = "aggregate_total_time_ms"; + private static final String COUNT_KEY = "aggregate_count"; + + // Source column names + private String sizeColumnName; + private String timeColumnName; + + // Target column names + private String targetSizeColumnName; + private String targetTimeColumnName; + + // Unit settings for output (optional) + private String sizeUnit; // Default: bytes, Options: KB, MB, GB + private String timeUnit; // Default: ms, Options: s, m, h + private boolean useAverage; // Default: false (use total) + + @Override + public UsageDefinition define() { + UsageDefinition.Builder builder = UsageDefinition.builder(NAME); + builder.define("size-column", TokenType.COLUMN_NAME); + builder.define("time-column", TokenType.COLUMN_NAME); + builder.define("target-size-column", TokenType.COLUMN_NAME); + builder.define("target-time-column", TokenType.COLUMN_NAME); + builder.define("size-unit", TokenType.TEXT, Optional.TRUE); + builder.define("time-unit", TokenType.TEXT, Optional.TRUE); + builder.define("aggregate-type", TokenType.TEXT, Optional.TRUE); + return builder.build(); + } + + @Override + public void initialize(Arguments args) throws DirectiveParseException { + this.sizeColumnName = ((ColumnName) args.value("size-column")).value(); + this.timeColumnName = ((ColumnName) args.value("time-column")).value(); + this.targetSizeColumnName = ((ColumnName) args.value("target-size-column")).value(); + this.targetTimeColumnName = ((ColumnName) args.value("target-time-column")).value(); + + // Parse optional arguments with default values + this.sizeUnit = args.contains("size-unit") ? 
((Text) args.value("size-unit")).value().toUpperCase() : "BYTES"; + this.timeUnit = args.contains("time-unit") ? ((Text) args.value("time-unit")).value().toLowerCase() : "ms"; + + // Determine aggregation type: total or average + String aggregateType = args.contains("aggregate-type") + ? ((Text) args.value("aggregate-type")).value().toLowerCase() + : "total"; + this.useAverage = "average".equals(aggregateType) || "avg".equals(aggregateType); + + // Validate size unit + if (!("BYTES".equals(sizeUnit) || "KB".equals(sizeUnit) || + "MB".equals(sizeUnit) || "GB".equals(sizeUnit))) { + throw new DirectiveParseException( + NAME, String.format("Invalid size unit '%s'. Supported units are BYTES, KB, MB, GB", sizeUnit)); + } + + // Validate time unit + if (!("ms".equals(timeUnit) || "s".equals(timeUnit) || + "m".equals(timeUnit) || "h".equals(timeUnit))) { + throw new DirectiveParseException( + NAME, String.format("Invalid time unit '%s'. Supported units are ms, s, m, h", timeUnit)); + } + } + + @Override + public void destroy() { + // no-op + } + + @Override + public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { + // Get the transient store for aggregation + TransientStore store = context.getTransientStore(); + + // Initialize counters if they don't exist + initializeCounters(store); + + // Process each row to accumulate values + for (Row row : rows) { + int sizeIdx = row.find(sizeColumnName); + int timeIdx = row.find(timeColumnName); + + // Skip row if either column is not found + if (sizeIdx == -1 || timeIdx == -1) { + continue; + } + + // Get values from columns + Object sizeObj = row.getValue(sizeIdx); + Object timeObj = row.getValue(timeIdx); + + // Process byte size if it's a valid type + if (sizeObj != null) { + long sizeBytes = 0; + if (sizeObj instanceof ByteSize) { + sizeBytes = ((ByteSize) sizeObj).getBytes(); + } else if (sizeObj instanceof String) { + try { + ByteSize byteSize = new ByteSize((String) sizeObj); + sizeBytes = byteSize.getBytes(); + } catch (IllegalArgumentException e) { + // Skip invalid format + continue; + } + } + + // Add size to total + if (sizeBytes > 0) { + long currentTotal = store.get(TOTAL_SIZE_KEY); + store.set(TransientVariableScope.GLOBAL, TOTAL_SIZE_KEY, currentTotal + sizeBytes); + } + } + + // Process time duration if it's a valid type + if (timeObj != null) { + long timeMs = 0; + if (timeObj instanceof TimeDuration) { + timeMs = ((TimeDuration) timeObj).getMilliseconds(); + } else if (timeObj instanceof String) { + try { + TimeDuration timeDuration = new TimeDuration((String) timeObj); + timeMs = timeDuration.getMilliseconds(); + } catch (IllegalArgumentException e) { + // Skip invalid format + continue; + } + } + + // Add time to total + if (timeMs > 0) { + long currentTotal = store.get(TOTAL_TIME_KEY); + store.set(TransientVariableScope.GLOBAL, TOTAL_TIME_KEY, currentTotal + timeMs); + } + } + + // Increment count + long currentCount = store.get(COUNT_KEY); + store.set(TransientVariableScope.GLOBAL, COUNT_KEY, currentCount + 1); + } + + // Return unchanged rows during normal processing + return rows; + } + + /** + * Initialize the counters in the transient store if they don't exist + */ + private void initializeCounters(TransientStore store) { + if (store.get(TOTAL_SIZE_KEY) == null) { + store.set(TransientVariableScope.GLOBAL, TOTAL_SIZE_KEY, 0L); + } + if (store.get(TOTAL_TIME_KEY) == null) { + store.set(TransientVariableScope.GLOBAL, TOTAL_TIME_KEY, 0L); + } + if (store.get(COUNT_KEY) == null) { + 
store.set(TransientVariableScope.GLOBAL, COUNT_KEY, 0L); + } + } + + /** + * Finalize the aggregation, creating a summary row with the aggregated values + * + * This should be called after all data has been processed + */ + public Row getAggregationResult(ExecutorContext context) { + TransientStore store = context.getTransientStore(); + long totalSizeBytes = store.get(TOTAL_SIZE_KEY); + long totalTimeMs = store.get(TOTAL_TIME_KEY); + long count = store.get(COUNT_KEY); + + // Create a new result row + Row result = new Row(); + + // Calculate size based on unit + double sizeValue; + switch (sizeUnit) { + case "KB": + sizeValue = totalSizeBytes / 1024.0; + break; + case "MB": + sizeValue = totalSizeBytes / (1024.0 * 1024.0); + break; + case "GB": + sizeValue = totalSizeBytes / (1024.0 * 1024.0 * 1024.0); + break; + case "BYTES": + default: + sizeValue = totalSizeBytes; + break; + } + + // Calculate time based on unit and aggregation type + double timeValue; + double timeInSelectedUnit; + + // Convert to selected time unit + switch (timeUnit) { + case "s": + timeInSelectedUnit = totalTimeMs / 1000.0; + break; + case "m": + timeInSelectedUnit = totalTimeMs / (1000.0 * 60); + break; + case "h": + timeInSelectedUnit = totalTimeMs / (1000.0 * 60 * 60); + break; + case "ms": + default: + timeInSelectedUnit = totalTimeMs; + break; + } + + // Apply aggregation type + if (useAverage && count > 0) { + timeValue = timeInSelectedUnit / count; + } else { + timeValue = timeInSelectedUnit; + } + + // Add values to the result row + result.add(targetSizeColumnName, sizeValue); + result.add(targetTimeColumnName, timeValue); + + // Reset counters for next use + store.set(TransientVariableScope.GLOBAL, TOTAL_SIZE_KEY, 0L); + store.set(TransientVariableScope.GLOBAL, TOTAL_TIME_KEY, 0L); + store.set(TransientVariableScope.GLOBAL, COUNT_KEY, 0L); + + return result; + } + + @Override + public Mutation lineage() { + return Mutation.builder() + .readable("Aggregated byte size from column '%s' and time duration from column '%s' " + + "into columns '%s' and '%s'", + sizeColumnName, timeColumnName, targetSizeColumnName, targetTimeColumnName) + .relation(sizeColumnName, targetSizeColumnName) + .relation(timeColumnName, targetTimeColumnName) + .build(); + } + } \ No newline at end of file diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java index ac35e7a5e..7ae7f0abe 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java @@ -14,316 +14,376 @@ * the License. 
*/ -package io.cdap.wrangler.parser; + package io.cdap.wrangler.parser; -import io.cdap.wrangler.api.LazyNumber; -import io.cdap.wrangler.api.RecipeSymbol; -import io.cdap.wrangler.api.SourceInfo; -import io.cdap.wrangler.api.Triplet; -import io.cdap.wrangler.api.parser.Bool; -import io.cdap.wrangler.api.parser.BoolList; -import io.cdap.wrangler.api.parser.ColumnName; -import io.cdap.wrangler.api.parser.ColumnNameList; -import io.cdap.wrangler.api.parser.DirectiveName; -import io.cdap.wrangler.api.parser.Expression; -import io.cdap.wrangler.api.parser.Identifier; -import io.cdap.wrangler.api.parser.Numeric; -import io.cdap.wrangler.api.parser.NumericList; -import io.cdap.wrangler.api.parser.Properties; -import io.cdap.wrangler.api.parser.Ranges; -import io.cdap.wrangler.api.parser.Text; -import io.cdap.wrangler.api.parser.TextList; -import io.cdap.wrangler.api.parser.Token; -import org.antlr.v4.runtime.ParserRuleContext; -import org.antlr.v4.runtime.misc.Interval; -import org.antlr.v4.runtime.tree.ParseTree; -import org.antlr.v4.runtime.tree.TerminalNode; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This class RecipeVisitor implements the visitor pattern - * used during traversal of the AST tree. The ParserTree#Walker - * invokes appropriate methods as call backs with information about the node. - * - *

In order to understand what's being invoked, please look at the grammar file - * Directive.g4

. - * - *

This class exposes a getTokenGroups method for retrieving the - * RecipeSymbol after visiting. The RecipeSymbol represents - * all the TokenGroup for all directives in a recipe. Each directive - * will create a TokenGroup

- * - *

As the ParseTree is walking through the call graph, it generates - * one TokenGroup for each directive in the recipe. Each TokenGroup - * contains parsed Tokens for that directive along with more information like - * SourceInfo. A collection of TokenGroup consistutes a RecipeSymbol - * that is returned by this function.

- */ -public final class RecipeVisitor extends DirectivesBaseVisitor { - private RecipeSymbol.Builder builder = new RecipeSymbol.Builder(); - - /** - * Returns a RecipeSymbol for the recipe being parsed. This - * object has all the tokens that were successfully parsed along with source - * information for each directive in the recipe. - * - * @return An compiled object after parsing the recipe. - */ - public RecipeSymbol getCompiledUnit() { - return builder.build(); - } - - /** - * A Recipe is made up of Directives and Directives is made up of each individual - * Directive. This method is invoked on every visit to a new directive in the recipe. - */ - @Override - public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx) { - builder.createTokenGroup(getOriginalSource(ctx)); - return super.visitDirective(ctx); - } - - /** - * A Directive can include identifiers, this method extracts that token that is being - * identified as token of type Identifier. - */ - @Override - public RecipeSymbol.Builder visitIdentifier(DirectivesParser.IdentifierContext ctx) { - builder.addToken(new Identifier(ctx.Identifier().getText())); - return super.visitIdentifier(ctx); - } - - /** - * A Directive can include properties (which are a collection of key and value pairs), - * this method extracts that token that is being identified as token of type Properties. - */ - @Override - public RecipeSymbol.Builder visitPropertyList(DirectivesParser.PropertyListContext ctx) { - Map props = new HashMap<>(); - List properties = ctx.property(); - for (DirectivesParser.PropertyContext property : properties) { - String identifier = property.Identifier().getText(); - Token token; - if (property.number() != null) { - token = new Numeric(new LazyNumber(property.number().getText())); - } else if (property.bool() != null) { - token = new Bool(Boolean.valueOf(property.bool().getText())); - } else { - String text = property.text().getText(); - token = new Text(text.substring(1, text.length() - 1)); - } - props.put(identifier, token); - } - builder.addToken(new Properties(props)); - return builder; - } - - /** - * A Pragma is an instruction to the compiler to dynamically load the directives being specified - * from the DirectiveRegistry. These do not affect the data flow. - * - *

E.g. #pragma load-directives test1, test2, test3; will collect the tokens - * test1, test2 and test3 as dynamically loadable directives.

- */ - @Override - public RecipeSymbol.Builder visitPragmaLoadDirective(DirectivesParser.PragmaLoadDirectiveContext ctx) { - List identifiers = ctx.identifierList().Identifier(); - for (TerminalNode identifier : identifiers) { - builder.addLoadableDirective(identifier.getText()); - } - return builder; - } - - /** - * A Pragma version is a informational directive to notify compiler about the grammar that is should - * be using to parse the directives below. - */ - @Override - public RecipeSymbol.Builder visitPragmaVersion(DirectivesParser.PragmaVersionContext ctx) { - builder.addVersion(ctx.Number().getText()); - return builder; - } - - /** - * A Directive can include number ranges like start:end=value[,start:end=value]*. This - * visitor method allows you to collect all the number ranges and create a token type - * Ranges. - */ - @Override - public RecipeSymbol.Builder visitNumberRanges(DirectivesParser.NumberRangesContext ctx) { - List> output = new ArrayList<>(); - List ranges = ctx.numberRange(); - for (DirectivesParser.NumberRangeContext range : ranges) { - List numbers = range.Number(); - String text = range.value().getText(); - if (text.startsWith("'") && text.endsWith("'")) { - text = text.substring(1, text.length() - 1); - } - Triplet val = - new Triplet<>(new Numeric(new LazyNumber(numbers.get(0).getText())), - new Numeric(new LazyNumber(numbers.get(1).getText())), - text - ); - output.add(val); - } - builder.addToken(new Ranges(output)); - return builder; - } - - /** - * This visitor method extracts the custom directive name specified. The custom - * directives are specified with a bang (!) at the start. - */ - @Override - public RecipeSymbol.Builder visitEcommand(DirectivesParser.EcommandContext ctx) { - builder.addToken(new DirectiveName(ctx.Identifier().getText())); - return builder; - } - - /** - * A Directive can consist of column specifiers. These are columns that the directive - * would operate on. When a token of type column is visited, it would generate a token - * type of type ColumnName. - */ - @Override - public RecipeSymbol.Builder visitColumn(DirectivesParser.ColumnContext ctx) { - builder.addToken(new ColumnName(ctx.Column().getText().substring(1))); - return builder; - } - - /** - * A Directive can consist of text field. These type of fields are enclosed within - * a single-quote or a double-quote. This visitor method extracts the string value - * within the quotes and creates a token type Text. - */ - @Override - public RecipeSymbol.Builder visitText(DirectivesParser.TextContext ctx) { - String value = ctx.String().getText(); - builder.addToken(new Text(value.substring(1, value.length() - 1))); - return builder; - } - - /** - * A Directive can consist of numeric field. This visitor method extracts the - * numeric value Numeric. - */ - @Override - public RecipeSymbol.Builder visitNumber(DirectivesParser.NumberContext ctx) { - LazyNumber number = new LazyNumber(ctx.Number().getText()); - builder.addToken(new Numeric(number)); - return builder; - } - - /** - * A Directive can consist of Bool field. The Bool field is represented as - * either true or false. This visitor method extract the bool value into a - * token type Bool. - */ - @Override - public RecipeSymbol.Builder visitBool(DirectivesParser.BoolContext ctx) { - builder.addToken(new Bool(Boolean.valueOf(ctx.Bool().getText()))); - return builder; - } - - /** - * A Directive can include a expression or a condition to be evaluated. 
When - * such a token type is found, the visitor extracts the expression and generates - * a token type Expression to be added to the TokenGroup - */ - @Override - public RecipeSymbol.Builder visitCondition(DirectivesParser.ConditionContext ctx) { - int childCount = ctx.getChildCount(); - StringBuilder sb = new StringBuilder(); - for (int i = 1; i < childCount - 1; ++i) { - ParseTree child = ctx.getChild(i); - sb.append(child.getText()).append(" "); - } - builder.addToken(new Expression(sb.toString())); - return builder; - } - - /** - * A Directive has name and in the parsing context it's called a command. - * This visitor methods extracts the command and creates a toke type DirectiveName - */ - @Override - public RecipeSymbol.Builder visitCommand(DirectivesParser.CommandContext ctx) { - builder.addToken(new DirectiveName(ctx.Identifier().getText())); - return builder; - } - - /** - * This visitor methods extracts the list of columns specified. It creates a token - * type ColumnNameList to be added to TokenGroup. - */ - @Override - public RecipeSymbol.Builder visitColList(DirectivesParser.ColListContext ctx) { - List columns = ctx.Column(); - List names = new ArrayList<>(); - for (TerminalNode column : columns) { - names.add(column.getText().substring(1)); - } - builder.addToken(new ColumnNameList(names)); - return builder; - } - - /** - * This visitor methods extracts the list of numeric specified. It creates a token - * type NumericList to be added to TokenGroup. - */ - @Override - public RecipeSymbol.Builder visitNumberList(DirectivesParser.NumberListContext ctx) { - List numbers = ctx.Number(); - List numerics = new ArrayList<>(); - for (TerminalNode number : numbers) { - numerics.add(new LazyNumber(number.getText())); - } - builder.addToken(new NumericList(numerics)); - return builder; - } - - /** - * This visitor methods extracts the list of booleans specified. It creates a token - * type BoolList to be added to TokenGroup. - */ - @Override - public RecipeSymbol.Builder visitBoolList(DirectivesParser.BoolListContext ctx) { - List bools = ctx.Bool(); - List booleans = new ArrayList<>(); - for (TerminalNode bool : bools) { - booleans.add(Boolean.parseBoolean(bool.getText())); - } - builder.addToken(new BoolList(booleans)); - return builder; - } - - /** - * This visitor methods extracts the list of strings specified. It creates a token - * type StringList to be added to TokenGroup. 
- */ - @Override - public RecipeSymbol.Builder visitStringList(DirectivesParser.StringListContext ctx) { - List strings = ctx.String(); - List strs = new ArrayList<>(); - for (TerminalNode string : strings) { - String text = string.getText(); - strs.add(text.substring(1, text.length() - 1)); - } - builder.addToken(new TextList(strs)); - return builder; - } - - private SourceInfo getOriginalSource(ParserRuleContext ctx) { - int a = ctx.getStart().getStartIndex(); - int b = ctx.getStop().getStopIndex(); - Interval interval = new Interval(a, b); - String text = ctx.start.getInputStream().getText(interval); - int lineno = ctx.getStart().getLine(); - int column = ctx.getStart().getCharPositionInLine(); - return new SourceInfo(lineno, column, text); - } -} + import io.cdap.wrangler.api.LazyNumber; + import io.cdap.wrangler.api.RecipeSymbol; + import io.cdap.wrangler.api.SourceInfo; + import io.cdap.wrangler.api.Triplet; + import io.cdap.wrangler.api.parser.Bool; + import io.cdap.wrangler.api.parser.BoolList; + import io.cdap.wrangler.api.parser.ByteSize; + import io.cdap.wrangler.api.parser.ColumnName; + import io.cdap.wrangler.api.parser.ColumnNameList; + import io.cdap.wrangler.api.parser.DirectiveName; + import io.cdap.wrangler.api.parser.Expression; + import io.cdap.wrangler.api.parser.Identifier; + import io.cdap.wrangler.api.parser.Numeric; + import io.cdap.wrangler.api.parser.NumericList; + import io.cdap.wrangler.api.parser.Properties; + import io.cdap.wrangler.api.parser.Ranges; + import io.cdap.wrangler.api.parser.Text; + import io.cdap.wrangler.api.parser.TextList; + import io.cdap.wrangler.api.parser.TimeDuration; + import io.cdap.wrangler.api.parser.Token; + import org.antlr.v4.runtime.ParserRuleContext; + import org.antlr.v4.runtime.misc.Interval; + import org.antlr.v4.runtime.tree.ParseTree; + import org.antlr.v4.runtime.tree.TerminalNode; + + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + /** + * This class RecipeVisitor implements the visitor pattern + * used during traversal of the AST tree. The ParserTree#Walker + * invokes appropriate methods as call backs with information about the node. + * + *

+ * In order to understand what's being invoked, please look at the grammar file + * Directives.g4 + *

+ * . + * + *

+ * This class exposes a getCompiledUnit method for retrieving the + * RecipeSymbol after visiting. The RecipeSymbol + * represents + * all the TokenGroup for all directives in a recipe. Each + * directive + * will create a TokenGroup + *

+ * + *

+ * As the ParseTree is walking through the call graph, it generates + * one TokenGroup for each directive in the recipe. Each + * TokenGroup + * contains parsed Tokens for that directive along with more + * information like + * SourceInfo. A collection of TokenGroup constitutes + * a RecipeSymbol + * that is returned by this function. + *

+ */ + public final class RecipeVisitor extends DirectivesBaseVisitor { + private RecipeSymbol.Builder builder = new RecipeSymbol.Builder(); + + /** + * Returns a RecipeSymbol for the recipe being parsed. This + * object has all the tokens that were successfully parsed along with source + * information for each directive in the recipe. + * + * @return An compiled object after parsing the recipe. + */ + public RecipeSymbol getCompiledUnit() { + return builder.build(); + } + + /** + * A Recipe is made up of Directives and Directives is made up of each + * individual + * Directive. This method is invoked on every visit to a new directive in the + * recipe. + */ + @Override + public RecipeSymbol.Builder visitDirective(DirectivesParser.DirectiveContext ctx) { + builder.createTokenGroup(getOriginalSource(ctx)); + return super.visitDirective(ctx); + } + + /** + * A Directive can include identifiers, this method extracts that token that is + * being + * identified as token of type Identifier. + */ + @Override + public RecipeSymbol.Builder visitIdentifier(DirectivesParser.IdentifierContext ctx) { + builder.addToken(new Identifier(ctx.Identifier().getText())); + return super.visitIdentifier(ctx); + } + + /** + * A Directive can include properties (which are a collection of key and value + * pairs), + * this method extracts that token that is being identified as token of type + * Properties. + */ + @Override + public RecipeSymbol.Builder visitPropertyList(DirectivesParser.PropertyListContext ctx) { + Map props = new HashMap<>(); + List properties = ctx.property(); + for (DirectivesParser.PropertyContext property : properties) { + String identifier = property.Identifier().getText(); + Token token; + if (property.number() != null) { + token = new Numeric(new LazyNumber(property.number().getText())); + } else if (property.bool() != null) { + token = new Bool(Boolean.valueOf(property.bool().getText())); + } else { + String text = property.text().getText(); + token = new Text(text.substring(1, text.length() - 1)); + } + props.put(identifier, token); + } + builder.addToken(new Properties(props)); + return builder; + } + + /** + * A Pragma is an instruction to the compiler to dynamically load the directives + * being specified + * from the DirectiveRegistry. These do not affect the data flow. + * + *

+ * E.g. #pragma load-directives test1, test2, test3; will collect + * the tokens + * test1, test2 and test3 as dynamically loadable directives. + *

+ */ + @Override + public RecipeSymbol.Builder visitPragmaLoadDirective(DirectivesParser.PragmaLoadDirectiveContext ctx) { + List identifiers = ctx.identifierList().Identifier(); + for (TerminalNode identifier : identifiers) { + builder.addLoadableDirective(identifier.getText()); + } + return builder; + } + + /** + * A Pragma version is a informational directive to notify compiler about the + * grammar that is should + * be using to parse the directives below. + */ + @Override + public RecipeSymbol.Builder visitPragmaVersion(DirectivesParser.PragmaVersionContext ctx) { + builder.addVersion(ctx.Number().getText()); + return builder; + } + + /** + * A Directive can include number ranges like + * start:end=value[,start:end=value]*. This + * visitor method allows you to collect all the number ranges and create a token + * type + * Ranges. + */ + @Override + public RecipeSymbol.Builder visitNumberRanges(DirectivesParser.NumberRangesContext ctx) { + List> output = new ArrayList<>(); + List ranges = ctx.numberRange(); + for (DirectivesParser.NumberRangeContext range : ranges) { + List numbers = range.Number(); + String text = range.value().getText(); + if (text.startsWith("'") && text.endsWith("'")) { + text = text.substring(1, text.length() - 1); + } + Triplet val = new Triplet<>(new Numeric(new LazyNumber(numbers.get(0).getText())), + new Numeric(new LazyNumber(numbers.get(1).getText())), + text); + output.add(val); + } + builder.addToken(new Ranges(output)); + return builder; + } + + /** + * This visitor method extracts the custom directive name specified. The custom + * directives are specified with a bang (!) at the start. + */ + @Override + public RecipeSymbol.Builder visitEcommand(DirectivesParser.EcommandContext ctx) { + builder.addToken(new DirectiveName(ctx.Identifier().getText())); + return builder; + } + + /** + * A Directive can consist of column specifiers. These are columns that the + * directive + * would operate on. When a token of type column is visited, it would generate a + * token + * type of type ColumnName. + */ + @Override + public RecipeSymbol.Builder visitColumn(DirectivesParser.ColumnContext ctx) { + builder.addToken(new ColumnName(ctx.Column().getText().substring(1))); + return builder; + } + + /** + * A Directive can consist of text field. These type of fields are enclosed + * within + * a single-quote or a double-quote. This visitor method extracts the string + * value + * within the quotes and creates a token type Text. + */ + @Override + public RecipeSymbol.Builder visitText(DirectivesParser.TextContext ctx) { + String value = ctx.String().getText(); + builder.addToken(new Text(value.substring(1, value.length() - 1))); + return builder; + } + + /** + * A Directive can consist of numeric field. This visitor method extracts the + * numeric value Numeric. + */ + @Override + public RecipeSymbol.Builder visitNumber(DirectivesParser.NumberContext ctx) { + LazyNumber number = new LazyNumber(ctx.Number().getText()); + builder.addToken(new Numeric(number)); + return builder; + } + + /** + * A Directive can consist of Bool field. The Bool field is represented as + * either true or false. This visitor method extract the bool value into a + * token type Bool. + */ + @Override + public RecipeSymbol.Builder visitBool(DirectivesParser.BoolContext ctx) { + builder.addToken(new Bool(Boolean.valueOf(ctx.Bool().getText()))); + return builder; + } + + /** + * A Directive can consist of a ByteSize field. 
The ByteSize field is + * represented as + * a number followed by a byte unit (e.g., "10KB", "5MB"). This visitor method + * extracts + * the byte size value into a token type ByteSize. + */ + @Override + public RecipeSymbol.Builder visitByteSize(DirectivesParser.ByteSizeContext ctx) { + builder.addToken(new ByteSize(ctx.BYTE_SIZE().getText())); + return builder; + } + + /** + * A Directive can consist of a TimeDuration field. The TimeDuration field is + * represented as + * a number followed by a time unit (e.g., "5s", "10m", "2h"). This visitor + * method extracts + * the time duration value into a token type TimeDuration. + */ + @Override + public RecipeSymbol.Builder visitTimeDuration(DirectivesParser.TimeDurationContext ctx) { + builder.addToken(new TimeDuration(ctx.TIME_DURATION().getText())); + return builder; + } + + /** + * A Directive can include a expression or a condition to be evaluated. When + * such a token type is found, the visitor extracts the expression and generates + * a token type Expression to be added to the + * TokenGroup + */ + @Override + public RecipeSymbol.Builder visitCondition(DirectivesParser.ConditionContext ctx) { + int childCount = ctx.getChildCount(); + StringBuilder sb = new StringBuilder(); + for (int i = 1; i < childCount - 1; ++i) { + ParseTree child = ctx.getChild(i); + sb.append(child.getText()).append(" "); + } + builder.addToken(new Expression(sb.toString())); + return builder; + } + + /** + * A Directive has name and in the parsing context it's called a command. + * This visitor methods extracts the command and creates a toke type + * DirectiveName + */ + @Override + public RecipeSymbol.Builder visitCommand(DirectivesParser.CommandContext ctx) { + builder.addToken(new DirectiveName(ctx.Identifier().getText())); + return builder; + } + + /** + * This visitor methods extracts the list of columns specified. It creates a + * token + * type ColumnNameList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitColList(DirectivesParser.ColListContext ctx) { + List columns = ctx.Column(); + List names = new ArrayList<>(); + for (TerminalNode column : columns) { + names.add(column.getText().substring(1)); + } + builder.addToken(new ColumnNameList(names)); + return builder; + } + + /** + * This visitor methods extracts the list of numeric specified. It creates a + * token + * type NumericList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitNumberList(DirectivesParser.NumberListContext ctx) { + List numbers = ctx.Number(); + List numerics = new ArrayList<>(); + for (TerminalNode number : numbers) { + numerics.add(new LazyNumber(number.getText())); + } + builder.addToken(new NumericList(numerics)); + return builder; + } + + /** + * This visitor methods extracts the list of booleans specified. It creates a + * token + * type BoolList to be added to TokenGroup. + */ + @Override + public RecipeSymbol.Builder visitBoolList(DirectivesParser.BoolListContext ctx) { + List bools = ctx.Bool(); + List booleans = new ArrayList<>(); + for (TerminalNode bool : bools) { + booleans.add(Boolean.parseBoolean(bool.getText())); + } + builder.addToken(new BoolList(booleans)); + return builder; + } + + /** + * This visitor methods extracts the list of strings specified. It creates a + * token + * type StringList to be added to TokenGroup. 
+ */ + @Override + public RecipeSymbol.Builder visitStringList(DirectivesParser.StringListContext ctx) { + List strings = ctx.String(); + List strs = new ArrayList<>(); + for (TerminalNode string : strings) { + String text = string.getText(); + strs.add(text.substring(1, text.length() - 1)); + } + builder.addToken(new TextList(strs)); + return builder; + } + + private SourceInfo getOriginalSource(ParserRuleContext ctx) { + int a = ctx.getStart().getStartIndex(); + int b = ctx.getStop().getStopIndex(); + Interval interval = new Interval(a, b); + String text = ctx.start.getInputStream().getText(interval); + int lineno = ctx.getStart().getLine(); + int column = ctx.getStart().getCharPositionInLine(); + return new SourceInfo(lineno, column, text); + } + } \ No newline at end of file diff --git a/wrangler-core/src/test/java/io/cdap/directives/aggregates/SizeTimeAggregatorTest.java b/wrangler-core/src/test/java/io/cdap/directives/aggregates/SizeTimeAggregatorTest.java new file mode 100644 index 000000000..8a7cf33a2 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/directives/aggregates/SizeTimeAggregatorTest.java @@ -0,0 +1,299 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.directives.aggregates; + + import io.cdap.wrangler.TestingPipelineContext; + import io.cdap.wrangler.api.DirectiveParseException; + import io.cdap.wrangler.api.ExecutorContext; + import io.cdap.wrangler.api.Row; + import io.cdap.wrangler.api.parser.ColumnName; + import io.cdap.wrangler.api.parser.Text; + import io.cdap.wrangler.api.parser.TokenType; + import io.cdap.wrangler.api.parser.UsageDefinition; + import io.cdap.wrangler.parser.MapArguments; + import org.junit.Assert; + import org.junit.Test; + + import java.util.Arrays; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + /** + * Tests for {@link SizeTimeAggregator}. 
+ */ + public class SizeTimeAggregatorTest { + + @Test + public void testUsageDefinition() { + SizeTimeAggregator directive = new SizeTimeAggregator(); + UsageDefinition definition = directive.define(); + Assert.assertNotNull(definition); + Assert.assertEquals(7, definition.getTokens().size()); + Assert.assertEquals(TokenType.COLUMN_NAME, definition.getTokens().get(0).type()); + Assert.assertEquals(TokenType.COLUMN_NAME, definition.getTokens().get(1).type()); + Assert.assertEquals(TokenType.COLUMN_NAME, definition.getTokens().get(2).type()); + Assert.assertEquals(TokenType.COLUMN_NAME, definition.getTokens().get(3).type()); + Assert.assertEquals(TokenType.TEXT, definition.getTokens().get(4).type()); + Assert.assertEquals(TokenType.TEXT, definition.getTokens().get(5).type()); + Assert.assertEquals(TokenType.TEXT, definition.getTokens().get(6).type()); + } + + @Test + public void testByteSizeAggregation() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("total_size")); + args.put("target-time-column", new ColumnName("total_time")); + + // Initialize the directive manually + directive.initialize(new DirectiveArgumentsTest(args)); + + // Create some test data with various byte sizes + List rows = Arrays.asList( + new Row("size", "1KB").add("time", "5s"), + new Row("size", "500B").add("time", "10s"), + new Row("size", "2MB").add("time", "15s")); + + // Create a test execution context + ExecutorContext context = new TestingPipelineContext(); + + // Execute the directive + directive.execute(rows, context); + + // Get the aggregation result + Row result = directive.getAggregationResult(context); + + // The total should be 1KB + 500B + 2MB = approximately 2MB + 1KB + 500B + // 1KB = 1024 bytes, 2MB = 2097152 bytes, 500B = 500 bytes + // Total = 2098676 bytes + Assert.assertEquals(2098676.0, ((Number) result.getValue("total_size")).doubleValue(), 0.0001); + + // The time total should be 5s + 10s + 15s = 30s = 30000ms + Assert.assertEquals(30000.0, ((Number) result.getValue("total_time")).doubleValue(), 0.0001); + } + + @Test + public void testByteSizeAggregationWithUnits() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments with specific units + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("total_size")); + args.put("target-time-column", new ColumnName("total_time")); + args.put("size-unit", new Text("MB")); + args.put("time-unit", new Text("s")); + + // Initialize the directive manually + directive.initialize(new DirectiveArgumentsTest(args)); + + // Create some test data + List rows = Arrays.asList( + new Row("size", "1KB").add("time", "5s"), + new Row("size", "500B").add("time", "10s"), + new Row("size", "2MB").add("time", "15s")); + + // Create a test execution context + ExecutorContext context = new TestingPipelineContext(); + + // Execute the directive + directive.execute(rows, context); + + // Get the aggregation result + Row result = directive.getAggregationResult(context); + + // Total is 2098676 bytes = 2.00151 MB (approximately) + double expectedMB = 2098676.0 / (1024.0 * 1024.0); + Assert.assertEquals(expectedMB, ((Number) result.getValue("total_size")).doubleValue(), 0.0001); + 
+ // The time total in seconds should be 30s + Assert.assertEquals(30.0, ((Number) result.getValue("total_time")).doubleValue(), 0.0001); + } + + @Test + public void testAverageAggregation() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments for average aggregation + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("avg_size")); + args.put("target-time-column", new ColumnName("avg_time")); + args.put("aggregate-type", new Text("average")); + args.put("size-unit", new Text("KB")); + args.put("time-unit", new Text("s")); + + // Initialize the directive manually + directive.initialize(new DirectiveArgumentsTest(args)); + + // Create some test data + List rows = Arrays.asList( + new Row("size", "1KB").add("time", "5s"), + new Row("size", "500B").add("time", "10s"), + new Row("size", "2MB").add("time", "15s")); + + // Create a test execution context + ExecutorContext context = new TestingPipelineContext(); + + // Execute the directive + directive.execute(rows, context); + + // Get the aggregation result + Row result = directive.getAggregationResult(context); + + // Total is 2098676 bytes = 2050.46 KB, average is 2050.46 / 3 = 683.49 KB + double expectedKB = 2098676.0 / 1024.0 / 3.0; + Assert.assertEquals(expectedKB, ((Number) result.getValue("avg_size")).doubleValue(), 0.01); + + // The average time in seconds should be 30s / 3 = 10s + Assert.assertEquals(10.0, ((Number) result.getValue("avg_time")).doubleValue(), 0.0001); + } + + @Test(expected = DirectiveParseException.class) + public void testInvalidSizeUnit() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments with an invalid size unit + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("total_size")); + args.put("target-time-column", new ColumnName("total_time")); + args.put("size-unit", new Text("XB")); // Invalid unit + + // Initialize the directive manually - should throw an exception + directive.initialize(new DirectiveArgumentsTest(args)); + } + + @Test(expected = DirectiveParseException.class) + public void testInvalidTimeUnit() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments with an invalid time unit + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("total_size")); + args.put("target-time-column", new ColumnName("total_time")); + args.put("time-unit", new Text("x")); // Invalid unit + + // Initialize the directive manually - should throw an exception + directive.initialize(new DirectiveArgumentsTest(args)); + } + + @Test + public void testWithMixedDataTypes() throws Exception { + SizeTimeAggregator directive = new SizeTimeAggregator(); + + // Set up arguments + Map args = new HashMap<>(); + args.put("size-column", new ColumnName("size")); + args.put("time-column", new ColumnName("time")); + args.put("target-size-column", new ColumnName("total_size")); + args.put("target-time-column", new ColumnName("total_time")); + + // Initialize the directive manually + directive.initialize(new DirectiveArgumentsTest(args)); + + // Create some test data with various formats and some invalid entries + List rows = 
Arrays.asList( + new Row("size", "1KB").add("time", "5s"), + new Row("size", "not-a-size").add("time", "10s"), // Invalid size + new Row("size", "2MB").add("time", "not-a-time"), // Invalid time + new Row("size", "3MB").add("time", "15s"), + new Row("other", "value") // Missing columns + ); + + // Create a test execution context + ExecutorContext context = new TestingPipelineContext(); + + // Execute the directive + directive.execute(rows, context); + + // Get the aggregation result - only valid entries should be counted + Row result = directive.getAggregationResult(context); + + // The total should be 1KB + 3MB = approximately 3MB + 1KB + // 1KB = 1024 bytes, 3MB = 3145728 bytes + // Total = 3146752 bytes + Assert.assertEquals(3146752.0, ((Number) result.getValue("total_size")).doubleValue(), 0.0001); + + // The time total should be 5s + 15s = 20s = 20000ms + Assert.assertEquals(20000.0, ((Number) result.getValue("total_time")).doubleValue(), 0.0001); + } + + /** + * Simple implementation of Arguments for testing. + */ + private static class DirectiveArgumentsTest implements io.cdap.wrangler.api.Arguments { + private final Map tokens; + + public DirectiveArgumentsTest(Map args) { + this.tokens = args; + } + + @Override + public T value(String name) { + return (T) tokens.get(name); + } + + @Override + public int size() { + return tokens.size(); + } + + @Override + public boolean contains(String name) { + return tokens.containsKey(name); + } + + @Override + public io.cdap.wrangler.api.parser.TokenType type(String name) { + if (tokens.get(name) instanceof io.cdap.wrangler.api.parser.Token) { + return ((io.cdap.wrangler.api.parser.Token) tokens.get(name)).type(); + } + return null; + } + + @Override + public int line() { + return 0; + } + + @Override + public int column() { + return 0; + } + + @Override + public String source() { + return "Test source"; + } + + @Override + public com.google.gson.JsonElement toJson() { + return new com.google.gson.JsonObject(); + } + } + } \ No newline at end of file
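
Note: the grammar, visitor, and aggregator changes above all rely on `ByteSize` and `TimeDuration` token classes in `io.cdap.wrangler.api.parser` that expose `getBytes()` and `getMilliseconds()`; those classes are not shown in this patch. The sketch below is a minimal, hypothetical illustration of the unit parsing such tokens would need to perform — the class name, accepted unit set, and regular expressions are assumptions for illustration, not the actual API.

```java
// Minimal, hypothetical sketch of the unit parsing that ByteSize/TimeDuration
// tokens are expected to provide (via getBytes()/getMilliseconds()).
// This is NOT the io.cdap.wrangler.api.parser implementation.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class UnitParsingSketch {

  // Parses values such as "500B", "1KB" or "2MB" into a byte count (1 KB = 1024 B).
  static long parseBytes(String value) {
    Matcher m = Pattern.compile("(?i)^\\s*(\\d+)\\s*(B|KB|MB|GB|TB|PB)\\s*$").matcher(value);
    if (!m.matches()) {
      throw new IllegalArgumentException("Invalid byte size: " + value);
    }
    long number = Long.parseLong(m.group(1));
    switch (m.group(2).toUpperCase()) {
      case "B":  return number;
      case "KB": return number * 1024L;
      case "MB": return number * 1024L * 1024L;
      case "GB": return number * 1024L * 1024L * 1024L;
      case "TB": return number * 1024L * 1024L * 1024L * 1024L;
      default:   return number * 1024L * 1024L * 1024L * 1024L * 1024L; // PB
    }
  }

  // Parses values such as "5s", "10m" or "2h" into milliseconds.
  // The accepted unit set here (ms/s/m/h/d) is an assumption.
  static long parseMillis(String value) {
    Matcher m = Pattern.compile("(?i)^\\s*(\\d+)\\s*(ms|s|m|h|d)\\s*$").matcher(value);
    if (!m.matches()) {
      throw new IllegalArgumentException("Invalid time duration: " + value);
    }
    long number = Long.parseLong(m.group(1));
    switch (m.group(2).toLowerCase()) {
      case "ms": return number;
      case "s":  return number * 1000L;
      case "m":  return number * 60L * 1000L;
      case "h":  return number * 60L * 60L * 1000L;
      default:   return number * 24L * 60L * 60L * 1000L; // d
    }
  }

  public static void main(String[] args) {
    // Mirrors the totals asserted in SizeTimeAggregatorTest:
    // 1KB + 500B + 2MB = 2098676 bytes; 5s + 10s + 15s = 30000 ms.
    long totalBytes = parseBytes("1KB") + parseBytes("500B") + parseBytes("2MB");
    long totalMs = parseMillis("5s") + parseMillis("10s") + parseMillis("15s");
    System.out.println(totalBytes + " bytes, " + totalMs + " ms");
  }
}
```

Assuming the real tokens behave this way, a recipe could invoke the new directive roughly as `aggregate-size-time :size :time :total_size :total_time 'MB' 's' 'total'`, matching the argument order declared in its `define()` method.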