|
5 | 5 | "execution_count": 1, |
6 | 6 | "id": "initial_id", |
7 | 7 | "metadata": { |
| 8 | + "collapsed": true, |
8 | 9 | "ExecuteTime": { |
9 | | - "end_time": "2023-11-25T08:06:52.534235Z", |
10 | | - "start_time": "2023-11-25T08:06:52.514168Z" |
11 | | - }, |
12 | | - "collapsed": true |
| 10 | + "end_time": "2023-11-27T20:36:37.381119Z", |
| 11 | + "start_time": "2023-11-27T20:36:37.371708Z" |
| 12 | + } |
13 | 13 | }, |
14 | 14 | "outputs": [], |
15 | 15 | "source": [ |
16 | 16 | "from dotenv import load_dotenv\n", |
17 | 17 | "import os\n", |
18 | 18 | "\n", |
19 | | - "YourAPIKey = \"\"\n", |
| 19 | + "YourAPIKey = \"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\"\n", |
20 | 20 | "\n", |
21 | 21 | "load_dotenv()\n", |
22 | 22 | "\n", |
|
25 | 25 | }, |
26 | 26 | { |
27 | 27 | "cell_type": "code", |
28 | | - "execution_count": 2, |
| 28 | + "execution_count": 8, |
29 | 29 | "id": "640e19e3961c5559", |
30 | 30 | "metadata": { |
| 31 | + "collapsed": false, |
31 | 32 | "ExecuteTime": { |
32 | | - "end_time": "2023-11-25T08:06:53.629849Z", |
33 | | - "start_time": "2023-11-25T08:06:52.521264Z" |
34 | | - }, |
35 | | - "collapsed": false |
| 33 | + "end_time": "2023-11-27T20:37:28.740784Z", |
| 34 | + "start_time": "2023-11-27T20:37:28.697880Z" |
| 35 | + } |
36 | 36 | }, |
37 | 37 | "outputs": [], |
38 | 38 | "source": [ |
|
47 | 47 | }, |
48 | 48 | { |
49 | 49 | "cell_type": "code", |
50 | | - "execution_count": 3, |
| 50 | + "execution_count": 9, |
51 | 51 | "id": "33597feb02573078", |
52 | 52 | "metadata": { |
| 53 | + "collapsed": false, |
53 | 54 | "ExecuteTime": { |
54 | | - "end_time": "2023-11-25T08:06:53.663555Z", |
55 | | - "start_time": "2023-11-25T08:06:53.630098Z" |
56 | | - }, |
57 | | - "collapsed": false |
| 55 | + "end_time": "2023-11-27T20:37:28.924429Z", |
| 56 | + "start_time": "2023-11-27T20:37:28.892412Z" |
| 57 | + } |
58 | 58 | }, |
59 | 59 | "outputs": [], |
60 | 60 | "source": [ |
|
63 | 63 | }, |
64 | 64 | { |
65 | 65 | "cell_type": "code", |
66 | | - "execution_count": 4, |
| 66 | + "execution_count": 10, |
67 | 67 | "id": "ce645e118f29cf79", |
68 | 68 | "metadata": { |
| 69 | + "collapsed": false, |
69 | 70 | "ExecuteTime": { |
70 | | - "end_time": "2023-11-25T08:06:53.686887Z", |
71 | | - "start_time": "2023-11-25T08:06:53.664778Z" |
72 | | - }, |
73 | | - "collapsed": false |
| 71 | + "end_time": "2023-11-27T20:37:29.445589Z", |
| 72 | + "start_time": "2023-11-27T20:37:29.422117Z" |
| 73 | + } |
74 | 74 | }, |
75 | 75 | "outputs": [], |
76 | 76 | "source": [ |
77 | | - "root_dir = '/amos2023ws05-pipeline-config-chat-ai/src/RAG/pipelines'\n", |
| 77 | + "root_dir = '/Users/zainhazzouri/projects/amos2023ws05-pipeline-config-chat-ai/src/RAG/pipelines'\n", |
78 | 78 | "docs = []\n", |
79 | 79 | "\n", |
80 | 80 | "# Go through each folder\n", |
|
92 | 92 | }, |
93 | 93 | { |
94 | 94 | "cell_type": "code", |
95 | | - "execution_count": 5, |
| 95 | + "execution_count": 11, |
96 | 96 | "id": "c6e41366a23e6224", |
97 | 97 | "metadata": { |
| 98 | + "collapsed": false, |
98 | 99 | "ExecuteTime": { |
99 | | - "end_time": "2023-11-25T08:06:53.687897Z", |
100 | | - "start_time": "2023-11-25T08:06:53.685060Z" |
101 | | - }, |
102 | | - "collapsed": false |
| 100 | + "end_time": "2023-11-27T20:37:32.037810Z", |
| 101 | + "start_time": "2023-11-27T20:37:32.031816Z" |
| 102 | + } |
103 | 103 | }, |
104 | 104 | "outputs": [ |
105 | 105 | { |
106 | 106 | "name": "stdout", |
107 | 107 | "output_type": "stream", |
108 | 108 | "text": [ |
109 | | - "You have 190 documents\n", |
| 109 | + "You have 219 documents\n", |
110 | 110 | "\n", |
111 | 111 | "------ Start Document ------\n", |
112 | 112 | "# Copyright 2022 RTDIP\n", |
|
129 | 129 | }, |
130 | 130 | { |
131 | 131 | "cell_type": "code", |
132 | | - "execution_count": 6, |
| 132 | + "execution_count": 12, |
133 | 133 | "id": "e9847352294eee40", |
134 | 134 | "metadata": { |
| 135 | + "collapsed": false, |
135 | 136 | "ExecuteTime": { |
136 | | - "end_time": "2023-11-25T08:06:57.051920Z", |
137 | | - "start_time": "2023-11-25T08:06:53.687581Z" |
138 | | - }, |
139 | | - "collapsed": false |
| 137 | + "end_time": "2023-11-27T20:37:37.370376Z", |
| 138 | + "start_time": "2023-11-27T20:37:33.328494Z" |
| 139 | + } |
140 | 140 | }, |
141 | 141 | "outputs": [], |
142 | 142 | "source": [ |
|
145 | 145 | }, |
146 | 146 | { |
147 | 147 | "cell_type": "code", |
148 | | - "execution_count": 7, |
| 148 | + "execution_count": 13, |
149 | 149 | "id": "90fd0d8a51a5cf31", |
150 | 150 | "metadata": { |
| 151 | + "collapsed": false, |
151 | 152 | "ExecuteTime": { |
152 | | - "end_time": "2023-11-25T08:06:59.831522Z", |
153 | | - "start_time": "2023-11-25T08:06:59.822748Z" |
154 | | - }, |
155 | | - "collapsed": false |
| 153 | + "end_time": "2023-11-27T20:37:38.431113Z", |
| 154 | + "start_time": "2023-11-27T20:37:38.428419Z" |
| 155 | + } |
156 | 156 | }, |
157 | 157 | "outputs": [], |
158 | 158 | "source": [ |
|
162 | 162 | }, |
163 | 163 | { |
164 | 164 | "cell_type": "code", |
165 | | - "execution_count": 10, |
| 165 | + "execution_count": 14, |
166 | 166 | "id": "103f11e7d6f49f6e", |
167 | 167 | "metadata": { |
| 168 | + "collapsed": false, |
168 | 169 | "ExecuteTime": { |
169 | | - "end_time": "2023-11-25T08:11:12.280923Z", |
170 | | - "start_time": "2023-11-25T08:10:35.004427Z" |
171 | | - }, |
172 | | - "collapsed": false |
| 170 | + "end_time": "2023-11-27T20:38:16.799202Z", |
| 171 | + "start_time": "2023-11-27T20:37:39.004977Z" |
| 172 | + } |
173 | 173 | }, |
174 | 174 | "outputs": [], |
175 | 175 | "source": [ |
|
179 | 179 | }, |
180 | 180 | { |
181 | 181 | "cell_type": "code", |
182 | | - "execution_count": 11, |
| 182 | + "execution_count": 15, |
183 | 183 | "id": "7b73d941ef97f4bb", |
184 | 184 | "metadata": { |
| 185 | + "collapsed": false, |
185 | 186 | "ExecuteTime": { |
186 | | - "end_time": "2023-11-25T08:11:13.673641Z", |
187 | | - "start_time": "2023-11-25T08:11:13.668368Z" |
188 | | - }, |
189 | | - "collapsed": false |
| 187 | + "end_time": "2023-11-27T20:38:18.862176Z", |
| 188 | + "start_time": "2023-11-27T20:38:18.858254Z" |
| 189 | + } |
190 | 190 | }, |
191 | 191 | "outputs": [ |
192 | 192 | { |
193 | 193 | "name": "stdout", |
194 | 194 | "output_type": "stream", |
195 | 195 | "text": [ |
196 | | - "Sure! Here's an example of how you can use RTDIP components to read from an Eventhub using a connection string and consumer group, transform the data from binary to string, and then write it to a Delta table:\n", |
| 196 | + "Certainly! Here's the code snippet that reads from an Eventhub using a connection string and consumer group, applies the BinaryToStringTransformer and EdgeXOPCUAJsonToPCDMTransformer transformations, and writes the data to a Delta table:\n", |
197 | 197 | "\n", |
198 | 198 | "```python\n", |
199 | | - "from rtdip_sdk.pipelines.sources import SparkEventhubSource\n", |
200 | | - "from rtdip_sdk.pipelines.transforms import BinaryToStringTransformer, EdgeXTransformer\n", |
201 | | - "from rtdip_sdk.pipelines.destinations import DeltaDestination\n", |
| 199 | + "from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource\n", |
| 200 | + "from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer\n", |
| 201 | + "from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import EdgeXOPCUAJsonToPCDMTransformer\n", |
| 202 | + "from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination\n", |
202 | 203 | "from rtdip_sdk.pipelines.utilities import SparkSessionUtility\n", |
| 204 | + "import json\n", |
203 | 205 | "\n", |
204 | | - "# Not required if using Databricks\n", |
205 | | - "spark = SparkSessionUtility(config={}).execute()\n", |
206 | | - "\n", |
207 | | - "# Eventhub connection string and consumer group\n", |
208 | | - "connection_string = \"YOUR_EVENTHUB_CONNECTION_STRING\"\n", |
209 | | - "consumer_group = \"YOUR_CONSUMER_GROUP\"\n", |
210 | | - "\n", |
211 | | - "# Create the Eventhub source\n", |
212 | | - "eventhub_source = SparkEventhubSource(spark=spark, options={\"eventhubs.connectionString\": connection_string, \"eventhubs.consumerGroup\": consumer_group})\n", |
| 206 | + "def pipeline():\n", |
| 207 | + " spark = SparkSessionUtility(config={}).execute()\n", |
213 | 208 | "\n", |
214 | | - "# Read from Eventhub\n", |
215 | | - "data = eventhub_source.read_stream()\n", |
| 209 | + " ehConf = {\n", |
| 210 | + " \"eventhubs.connectionString\": \"{EventhubConnectionString}\",\n", |
| 211 | + " \"eventhubs.consumerGroup\": \"{EventhubConsumerGroup}\",\n", |
| 212 | + " \"eventhubs.startingPosition\": json.dumps(\n", |
| 213 | + " {\"offset\": \"0\", \"seqNo\": -1, \"enqueuedTime\": None, \"isInclusive\": True}\n", |
| 214 | + " ),\n", |
| 215 | + " }\n", |
216 | 216 | "\n", |
217 | | - "# Transform data from binary to string\n", |
218 | | - "binary_to_string_transformer = BinaryToStringTransformer()\n", |
219 | | - "transformed_data = binary_to_string_transformer.transform(data)\n", |
| 217 | + " source = SparkEventhubSource(spark, ehConf).read_batch()\n", |
| 218 | + " string_data = BinaryToStringTransformer(source, \"body\", \"body\").transform()\n", |
| 219 | + " PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data, \"body\").transform()\n", |
| 220 | + " SparkDeltaDestination(\n", |
| 221 | + " data=PCDM_data, options={}, destination=\"{path/to/table}\"\n", |
| 222 | + " ).write_batch()\n", |
220 | 223 | "\n", |
221 | | - "# Apply EdgeX transformation\n", |
222 | | - "edgex_transformer = EdgeXTransformer()\n", |
223 | | - "transformed_data = edgex_transformer.transform(transformed_data)\n", |
224 | | - "\n", |
225 | | - "# Write transformed data to Delta table\n", |
226 | | - "delta_destination = DeltaDestination(spark=spark, data=transformed_data, table_name=\"YOUR_DELTA_TABLE_NAME\")\n", |
227 | | - "delta_destination.write_stream()\n", |
| 224 | + "if __name__ == \"__main__\":\n", |
| 225 | + " pipeline()\n", |
228 | 226 | "```\n", |
229 | 227 | "\n", |
230 | | - "Make sure to replace `YOUR_EVENTHUB_CONNECTION_STRING`, `YOUR_CONSUMER_GROUP`, and `YOUR_DELTA_TABLE_NAME` with your actual values.\n", |
231 | | - "\n", |
232 | | - "This code will create a streaming pipeline that reads data from an Eventhub, transforms it using binary to string and EdgeX transformations, and then writes the transformed data to a Delta table.\n", |
233 | | - "\n", |
234 | | - "Note that you'll need to have the necessary dependencies installed and import the required modules for the components to work properly.\n" |
| 228 | + "Please replace `{EventhubConnectionString}`, `{EventhubConsumerGroup}`, and `{path/to/table}` with your specific values.\n" |
235 | 229 | ] |
236 | 230 | } |
237 | 231 | ], |
|
0 commit comments