|
11 | 11 | "cell_type": "markdown", |
12 | 12 | "metadata": {}, |
13 | 13 | "source": [ |
14 | | - "The platform's Streaming API enables working with data in the platform as streams.\n" |
15 | | - "For more information, see the platform;s [streams](https://www.iguazio.com/docs/v3.0/data-layer/stream/) documentation." |
| 14 | + "The platform's Streaming API enables working with data in the platform as streams.\n", |
| 15 | + " For more information, see the platform's [streams](https://www.iguazio.com/docs/v3.0/data-layer/stream/) documentation." |
16 | 16 | ] |
17 | 17 | }, |
18 | 18 | { |
|
35 | 35 | "cell_type": "markdown", |
36 | 36 | "metadata": {}, |
37 | 37 | "source": [ |
38 | | - "Create a dataplane client" |
| 38 | + "Create a `dataplane` client:" |
39 | 39 | ] |
40 | 40 | }, |
41 | 41 | { |
|
51 | 51 | "cell_type": "markdown", |
52 | 52 | "metadata": {}, |
53 | 53 | "source": [ |
54 | | - "> **Note**: You can pass to the client the `endpoint` and `access_key` parameters explicitly. The following code is equivalent to the default values:\n", |
| 54 | + "> **Note**: You can pass to the client the `endpoint` and `access_key` parameters explicitly.\n", |
| 55 | + "> The following code is equivalent to the default values:\n", |
55 | 56 | ">\n", |
56 | | - ">``` python\n", |
57 | | - ">from os import getenv\n", |
58 | | - ">v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081',\n", |
59 | | - "> access_key=getenv('V3IO_ACCESS_KEY'))\n", |
60 | | - ">```\n", |
| 57 | + "> ``` python\n", |
| 58 | + "> from os import getenv\n", |
| 59 | + "> v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081',\n", |
| 60 | + "> access_key=getenv('V3IO_ACCESS_KEY'))\n", |
| 61 | + "> ```\n", |
61 | 62 | ">\n", |
62 | | - "> When calling externally, you can obtain the URL of your cluster by copying the API URL of the web-APIs service (webapi) from the **Services** dashboard page. You can select between two types of URLs:\n", |
63 | | - "- **HTTPS Direct** (recommended) — a URL of the format `https://<tenant IP>:<web-APIs port>`; for example, `https://default-tenant.app.mycluster.iguazio.com:8443`.\n", |
64 | | - "- **HTTPS** — a URL of the format `https://webapi.<tenant IP>`; for example, `https://webapi.default-tenant.app.mycluster.iguazio.com`.\n", |
| 63 | + "> When running the code remotely, you can obtain the URL of your cluster by copying the API URL of the web-APIs service (`webapi`) from the **Services** dashboard page. You can select between two types of URLs:\n", |
65 | 64 | ">\n", |
66 | | - "> For more information, see the [Data-Service Web-API General Structure](https://www.iguazio.com/docs/v3.0/data-layer/reference/web-apis/data-service-web-api-gen-struct/) documentation." |
| 65 | + "> - **HTTPS Direct** (recommended) — a URL of the format `https://<tenant IP>:<web-APIs port>`; for example, `https://default-tenant.app.mycluster.iguazio.com:8443`.\n", |
| 66 | + "> - **HTTPS** — a URL of the format `https://webapi.<tenant IP>`; for example, `https://webapi.default-tenant.app.mycluster.iguazio.com`.\n", |
| 67 | + ">\n", |
| 68 | + "> For more information see the [Data-Service Web-API General Structure](https://www.iguazio.com/docs/v3.0/data-layer/reference/web-apis/data-service-web-api-gen-struct/) documentation." |
67 | 69 | ] |
68 | 70 | }, |
69 | 71 | { |
70 | 72 | "cell_type": "markdown", |
71 | 73 | "metadata": {}, |
72 | 74 | "source": [ |
73 | | - "> **Number of maximum parallel connections**: Another noteworthy parameter is `max_connections`, which defines the number of maximum parallel connections when performing batch operations. If left unspecified, the default is 8 connections. For more information see the [Put Multiple Objects](#Put-Multiple-Objects) section in this notebook." |
| 75 | + "> **Number of maximum parallel connections**: Another noteworthy parameter is `max_connections`, which defines the number of maximum parallel connections when performing batch operations.\n", |
| 76 | + "> If left unspecified, the default is 8 connections.\n", |
| 77 | + "> For more information see the [Put Multiple Records](#Put-Multiple-Records) section in this tutorial." |
74 | 78 | ] |
75 | 79 | }, |
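| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "For illustration, a minimal sketch of creating a client with a custom connection count (the value shown is arbitrary):\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: allow up to 16 parallel connections for batch operations\n", |
| | + "v3io_client = v3io.dataplane.Client(max_connections=16)\n", |
| | + "```" |
| | + ] |
| | + }, |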
76 | 80 | { |
77 | 81 | "cell_type": "markdown", |
78 | 82 | "metadata": {}, |
79 | 83 | "source": [ |
80 | | - "### Set path" |
| 84 | + "### Set the Data Path" |
81 | 85 | ] |
82 | 86 | }, |
83 | 87 | { |
84 | 88 | "cell_type": "markdown", |
85 | 89 | "metadata": {}, |
86 | 90 | "source": [ |
87 | | - "All data in the platform is stored in user-defined data containers. In this case we use the predefined \"users\" container.\n" |
88 | | - "For more information, see the platform's [Data Containers](https://www.iguazio.com/docs/v3.0/data-layer/containers/) documentation." |
| 91 | + "All data in the platform is stored in user-defined data containers.\n", |
| 92 | + "This tutorial uses the predefined \"users\" container.\n", |
| 93 | + "For more information refer to the platform's [data-containers](https://www.iguazio.com/docs/v3.0/data-layer/containers/) documentation." |
89 | 94 | ] |
90 | 95 | }, |
91 | 96 | { |
|
101 | 106 | "cell_type": "markdown", |
102 | 107 | "metadata": {}, |
103 | 108 | "source": [ |
104 | | - "Data path where to store the kv table" |
| 109 | + "Set the data path for storing the stream:" |
105 | 110 | ] |
106 | 111 | }, |
107 | 112 | { |
|
165 | 170 | "cell_type": "markdown", |
166 | 171 | "metadata": {}, |
167 | 172 | "source": [ |
168 | | - "Adds records to a stream." |
| 173 | + "Use the `put` method to add records to a stream." |
169 | 174 | ] |
170 | 175 | }, |
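| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "As a minimal sketch of such a call (assuming the `CONTAINER` and `STREAM_PATH` variables that are set earlier in this notebook):\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: write a single record; each record is a dictionary with a 'data' key\n", |
| | + "v3io_client.put_records(container=CONTAINER, path=STREAM_PATH,\n", |
| | + "                        records=[{'data': 'some data'}])\n", |
| | + "```" |
| | + ] |
| | + }, |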
171 | 176 | { |
172 | 177 | "cell_type": "markdown", |
173 | 178 | "metadata": {}, |
174 | 179 | "source": [ |
175 | | - "We'll define a function that will convert our text to lowercase words" |
| 180 | + "The following example defines a function that converts text to lowercase words:" |
176 | 181 | ] |
177 | 182 | }, |
178 | 183 | { |
|
205 | 210 | " \n", |
206 | 211 | " return words\n", |
207 | 212 | "\n", |
208 | | - "text1 = \"WOLF, meeting with a Lamb astray from the fold, resolved not to lay violent hands on him, but to find some plea to justify to the Lamb the Wolf's right to eat him. He thus addressed him: “Sirrah, last year you grossly insulted me.” “Indeed,” bleated the Lamb in a mournful tone of voice, “I was not then born.” Then said the Wolf, “You feed in my pasture.” “No, good sir,” replied the Lamb, “I have not yet tasted grass.” Again said the Wolf, “You drink of my well.” “No,” exclaimed the Lamb, “I never yet drank water, for as yet my mother's milk is both food and drink to me.” Upon which the Wolf seized him and ate him up, saying, “Well! I won't remain supperless, even though you refute every one of my imputations.” The tyrant will always find a pretext for his tyranny.\"\n", |
| 213 | + "text1 = \"WOLF, meeting with a Lamb astray from the fold, resolved not to lay violent hands on him, but to find some plea to justify to the Lamb the Wolf’s right to eat him. He thus addressed him: “Sirrah, last year you grossly insulted me.” “Indeed,” bleated the Lamb in a mournful tone of voice, “I was not then born.” Then said the Wolf, “You feed in my pasture.” “No, good sir,” replied the Lamb, “I have not yet tasted grass.” Again said the Wolf, “You drink of my well.” “No,” exclaimed the Lamb, “I never yet drank water, for as yet my mother’s milk is both food and drink to me.” Upon which the Wolf seized him and ate him up, saying, “Well! I won’t remain supperless, even though you refute every one of my imputations.” The tyrant will always find a pretext for his tyranny.\"\n", |
209 | 214 | "words1 = text_to_words(text1)\n", |
210 | 215 | "len(words1)" |
211 | 216 | ] |
|
214 | 219 | "cell_type": "markdown", |
215 | 220 | "metadata": {}, |
216 | 221 | "source": [ |
217 | | - "Convert the list of words to a record. A record is a list of dictionaries, where for each dictionary the `data` key contains the record data.\n", |
218 | | - "We display the first 5 records:" |
| 222 | + "The following code converts the list of words to a record.\n", |
| 223 | + "A record is a list of dictionaries, where for each dictionary the `data` key contains the record data.\n", |
| 224 | + "The sample code displays the first 5 records:" |
219 | 225 | ] |
220 | 226 | }, |
221 | 227 | { |
|
260 | 266 | "cell_type": "markdown", |
261 | 267 | "metadata": {}, |
262 | 268 | "source": [ |
263 | | - "Write another set of records to the stream" |
| 269 | + "The following code writes another set of records to the stream:" |
264 | 270 | ] |
265 | 271 | }, |
266 | 272 | { |
|
286 | 292 | "cell_type": "markdown", |
287 | 293 | "metadata": {}, |
288 | 294 | "source": [ |
289 | | - "Multiple consumer instances can consume data from the same stream. A consumer retrieves records from a specific shard. It is recommended that you distribute the data consumption among several consumer instances (“workers”), assigning each instance one or more shards.\n", |
| 295 | + "Multiple consumer instances can consume data from the same stream.\n", |
| 296 | + "A consumer retrieves records from a specific shard.\n", |
| 297 | + "It's recommended that you distribute the data consumption among several consumer instances (\"workers\"), assigning each instance one or more shards.\n", |
290 | 298 | "\n", |
291 | | - "For each shard, the consumer should determine the location within the shard from which to begin consuming records. This can be the earliest ingested record, the end of the shard, the first ingested record starting at a specific time, or a specific record identified by its sequence number (a unique record identifier that is assigned by the platform based on the record's ingestion time). The consumer first uses a seek operation to obtain the desired consumption location, and then provides this location as the starting point for its record consumption. The consumption operation returns the location of the next record to consume within the shard, and this location should be used as the location for a subsequent consumption operation, allowing for sequential record consumption." |
| 299 | + "For each shard, the consumer should determine the location within the shard from which to begin consuming records.\n", |
| 300 | + "This can be the earliest ingested record, the end of the shard, the first ingested record starting at a specific time, or a specific record identified by its sequence number (a unique record identifier that is assigned by the platform based on the record’s ingestion time).\n", |
| 301 | + "The consumer first uses a seek operation to obtain the desired consumption location, and then provides this location as the starting point for its record consumption.\n", |
| 302 | + "The consumption operation returns the location of the next record to consume within the shard, and this location should be used as the location for a subsequent consumption operation, allowing for sequential record consumption." |
292 | 303 | ] |
293 | 304 | }, |
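| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "The following sketch condenses this flow for a single shard.\n", |
| | + "The `seek_shard` method name, the `response.output` fields, and the `<stream path>/<shard ID>` path convention are assumptions based on the v3io-py client and might differ between SDK versions:\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: consume a shard sequentially, starting from the earliest record\n", |
| | + "shard_path = path.join(STREAM_PATH, '0')\n", |
| | + "response = v3io_client.seek_shard(container=CONTAINER, path=shard_path,\n", |
| | + "                                  seek_type='EARLIEST')\n", |
| | + "location = response.output.location\n", |
| | + "while True:\n", |
| | + "    response = v3io_client.get_records(container=CONTAINER, path=shard_path,\n", |
| | + "                                       location=location, limit=10)\n", |
| | + "    if not response.output.records:\n", |
| | + "        break\n", |
| | + "    for record in response.output.records:\n", |
| | + "        print(record.data)\n", |
| | + "    # the returned location is the starting point of the next read\n", |
| | + "    location = response.output.next_location\n", |
| | + "```" |
| | + ] |
| | + }, |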
294 | 305 | { |
|
309 | 320 | "cell_type": "markdown", |
310 | 321 | "metadata": {}, |
311 | 322 | "source": [ |
312 | | - "We will read from the stream, by default `get_records` limits the number of records returned to 1,000. For the sake of this demonstration we will limit the number of returned records to 10 by setting the `limit` parameter." |
| 323 | + "Use the `get_records` method to read from the stream (retrieve records).\n", |
| 324 | + "By default `get_records` limits the number of records returned to 1,000.\n", |
| 325 | + "For the sake of this demonstration, the sample code limits the number of returned records to 10 by setting the `limit` parameter." |
313 | 326 | ] |
314 | 327 | }, |
315 | 328 | { |
|
371 | 384 | "cell_type": "markdown", |
372 | 385 | "metadata": {}, |
373 | 386 | "source": [ |
374 | | - "We can retrieve a stream's configuration, including the shard count and retention period." |
| 387 | + "Use the `describe` method to retrieve a stream's configuration, including the shard count and retention period." |
375 | 388 | ] |
376 | 389 | }, |
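| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "As a minimal sketch (the `describe_stream` method name and the response fields are assumptions based on the v3io-py client and might differ between SDK versions):\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: retrieve the stream configuration (names assumed, not verified)\n", |
| | + "response = v3io_client.describe_stream(container=CONTAINER, path=STREAM_PATH)\n", |
| | + "print(response.output.shard_count)\n", |
| | + "```" |
| | + ] |
| | + }, |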
377 | 390 | { |
|
422 | 435 | "cell_type": "markdown", |
423 | 436 | "metadata": {}, |
424 | 437 | "source": [ |
425 | | - "Updates a stream's configuration by increasing its shard count. The changes are applied immediately.\n", |
| 438 | + "Use the `update` method to updates a stream's configuration by increasing its shard count.\n", |
| 439 | + "The changes are applied immediately.\n", |
426 | 440 | "\n", |
427 | | - "> **Note**: You can increase the shard count at any time, but you cannot reduce it. From the platform's perspective, there is virtually no cost to creating even thousands of shards. However, if you increase a stream's shard count after its creation, new records with a previously used partition key will be assigned either to the same shard that was previously used for this partition key or to a new shard.\n" |
| 441 | + "> **Note**: You can increase the shard count at any time, but you cannot reduce it.\n", |
| 442 | + "> From the platform's perspective, there's virtually no cost to creating even thousands of shards.\n", |
| 443 | + "> However, if you increase a stream's shard count after its creation, new records with a previously used partition key will be assigned either to the same shard that was previously used for this partition key or to a new shard.\n", |
428 | 444 | "> For more information see the platform's [stream sharding and partitioning](https://www.iguazio.com/docs/v3.0/data-layer/stream/#stream-sharding-and-partitioning) documentation.\n", |
429 | 445 | "\n", |
430 | 446 | "> **Spark-Streaming Note**: To use the Spark Streaming API to consume records from new shards after a shard-count increase, you must first restart the consumer application." |
|
454 | 470 | "cell_type": "markdown", |
455 | 471 | "metadata": {}, |
456 | 472 | "source": [ |
457 | | - "Describe the stream again to see the updated shard count" |
| 473 | + "Describe the stream again to see the updated shard count:" |
458 | 474 | ] |
459 | 475 | }, |
460 | 476 | { |
|
487 | 503 | "cell_type": "markdown", |
488 | 504 | "metadata": {}, |
489 | 505 | "source": [ |
490 | | - "In the prior section when we get the records, we didn't get all words. The reason is that, by default, the platform assigns records to shards using a Round Robin algorithm, and our consumer code reads from a single shard. If you would like to ensure a consumer gets all the words, you can optionally assign a record to specific stream shard by specifying a related shard ID by setting the `shard_id` value, or associate the record with a specific partition key to ensure that similar records are assigned to the same shard (by setting the `partition_key` value).\n" |
491 | | - "For more information, see the platform's [stream sharding and partitioning](https://www.iguazio.com/docs/v3.0/data-layer/stream/#stream-sharding-and-partitioning) documentation." |
| 506 | + "In the previous section, when you retrieved the stream records you didn't get all words.\n", |
| 507 | + "The reason is that by default, the platform assigns records to shards using a Round Robin algorithm, and the sample consumer code reads from a single shard.\n", |
| 508 | + "If you would like to ensure that a consumer gets all the words, you can optionally assign a record to specific stream shard by specifying a related shard ID by setting the `shard_id` value, or associate the record with a specific partition key to ensure that similar records are assigned to the same shard (by setting the `partition_key` value).\n", |
| 509 | + "For more information see the platform's [stream sharding and partitioning](https://www.iguazio.com/docs/v3.0/data-layer/stream/#stream-sharding-and-partitioning) documentation." |
492 | 510 | ] |
493 | 511 | }, |
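| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "The following cells demonstrate the `shard_id` approach.\n", |
| | + "For comparison, a `partition_key` variant might look like this sketch, which reuses the `words1` list from earlier; routing by the word itself keeps identical words in the same shard:\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: route records by partition key instead of an explicit shard ID\n", |
| | + "records = [{'data': word, 'partition_key': word} for word in words1]\n", |
| | + "v3io_client.put_records(container=CONTAINER, path=STREAM_PATH, records=records)\n", |
| | + "```" |
| | + ] |
| | + }, |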
494 | 512 | { |
|
513 | 531 | "cell_type": "markdown", |
514 | 532 | "metadata": {}, |
515 | 533 | "source": [ |
516 | | - "Write the first text to shard 10" |
| 534 | + "Write the first text to shard 10:" |
517 | 535 | ] |
518 | 536 | }, |
519 | 537 | { |
|
531 | 549 | "cell_type": "markdown", |
532 | 550 | "metadata": {}, |
533 | 551 | "source": [ |
534 | | - "And write the second text to shard 11" |
| 552 | + "And write the second text to shard 11:" |
535 | 553 | ] |
536 | 554 | }, |
537 | 555 | { |
|
549 | 567 | "cell_type": "markdown", |
550 | 568 | "metadata": {}, |
551 | 569 | "source": [ |
552 | | - "Now, read from shard 10" |
| 570 | + "Now, read from shard 10:" |
553 | 571 | ] |
554 | 572 | }, |
555 | 573 | { |
|
611 | 629 | "cell_type": "markdown", |
612 | 630 | "metadata": {}, |
613 | 631 | "source": [ |
614 | | - "`put_records` accepts up to a maximum of 1,000 records. If the records limit is exceeded, the request fails. Therefore, we would need to call `put_records` multiple times. In this example, we'll send large number of events. We will create a generator that returns a list of events. Each event has the following record:\n", |
615 | | - "\n", |
| 632 | + "`put_records` accepts up to a maximum of 1,000 records.\n", |
| 633 | + "If the records limit is exceeded, the request fails.\n", |
| 634 | + "Therefore, you need to call `put_records` multiple times.\n", |
| 635 | + "The following example sends a large number of events.\n", |
| 636 | + "It creates a generator that returns a list of events.\n", |
| 637 | + "Each event has the following record:\n", |
616 | 638 | "``` python\n", |
617 | 639 | "'user': <user_id>\n", |
618 | 640 | "'time': <event_time>\n", |
619 | 641 | "'url': <url>\n", |
620 | 642 | "```\n", |
621 | | - "Each user will be selected at random, the URL will also be a random string and the generated events will have their event_time monotonically increasing" |
| 643 | + "Each user is selected at random.\n", |
| 644 | + "The URL is also be a random string, and the `event_time` of the generated events is monotonically increased." |
622 | 645 | ] |
623 | 646 | }, |
624 | 647 | { |
|
663 | 686 | "cell_type": "markdown", |
664 | 687 | "metadata": {}, |
665 | 688 | "source": [ |
666 | | - "Test the generator, print just a few events" |
| 689 | + "Test the generator by printing a few events:" |
667 | 690 | ] |
668 | 691 | }, |
669 | 692 | { |
|
695 | 718 | "cell_type": "markdown", |
696 | 719 | "metadata": {}, |
697 | 720 | "source": [ |
698 | | - "Seek the latest position in one of the shards, in order to later retrieve the new data" |
| 721 | + "Seek the latest position in one of the shards, in order to later retrieve the new data:" |
699 | 722 | ] |
700 | 723 | }, |
701 | 724 | { |
|
742 | 765 | "cell_type": "markdown", |
743 | 766 | "metadata": {}, |
744 | 767 | "source": [ |
745 | | - "The looped `put_records` interface above will send all `put records` requests to the data layer in parallel. When `wait` is called, it will block until either all responses arrive (in which case it will return a `Responses` object, containing the `responses` of each call) or an error occurs - in which case an exception is thrown. You can pass `raise_for_status` to `wait`, and it behaves as explained above.\n", |
| 768 | + "The looped `put_records` interface in the previous code block sends all `put_records` requests to the data layer in parallel.\n", |
| 769 | + "When `wait` is called, it blocks until either all responses arrive — in which case it returns a `Responses` object that contains the `responses` of each call — or an error occurs — in which case an exception is thrown.\n", |
| 770 | + "You can pass `raise_for_status` to `wait`, and it behaves as previously explained.\n", |
746 | 771 | "\n", |
747 | | - "> Note: The `batch` object is stateful, so you can only create one batch at a time. However, you can create multiple parallel batches yourself through the client's `create_batch()` interface" |
| 772 | + "> **Note:** The `batch` object is stateful, therefore you can only create one batch at a time.\n", |
| 773 | + "> However, you can create multiple parallel batches yourself through the client's `create_batch` interface." |
748 | 774 | ] |
749 | 775 | }, |
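| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "For example, a sketch of two independent batches running in parallel; this assumes that a batch object created with `create_batch` exposes the same `put_records` and `wait` interface as the default `batch`:\n", |
| | + "\n", |
| | + "``` python\n", |
| | + "# Sketch: each batch accumulates its own requests and is waited on separately\n", |
| | + "batch1 = v3io_client.create_batch()\n", |
| | + "batch2 = v3io_client.create_batch()\n", |
| | + "batch1.put_records(container=CONTAINER, path=STREAM_PATH, records=[{'data': 'a1'}])\n", |
| | + "batch2.put_records(container=CONTAINER, path=STREAM_PATH, records=[{'data': 'b1'}])\n", |
| | + "responses1 = batch1.wait()\n", |
| | + "responses2 = batch2.wait()\n", |
| | + "```" |
| | + ] |
| | + }, |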
750 | 776 | { |
751 | 777 | "cell_type": "markdown", |
752 | 778 | "metadata": {}, |
753 | 779 | "source": [ |
754 | | - "Test the received record from the previously obtained location" |
| 780 | + "Test the received record from the previously obtained location:" |
755 | 781 | ] |
756 | 782 | }, |
757 | 783 | { |
|
815 | 841 | "cell_type": "markdown", |
816 | 842 | "metadata": {}, |
817 | 843 | "source": [ |
818 | | - "Alternatively you can use the following commands:\n", |
819 | | - "``` python\n", |
| 844 | + "Alternatively, you can use the following command:" |
| 845 | + ] |
| 846 | + }, |
| 847 | + { |
| 848 | + "cell_type": "code", |
| 849 | + "execution_count": null, |
| 850 | + "metadata": {}, |
| 851 | + "outputs": [], |
| 852 | + "source": [ |
820 | 853 | "import shutil\n", |
821 | 854 | "V3IO_STREAM_PATH = path.join(sep, 'v3io', CONTAINER, STREAM_PATH)\n", |
822 | | - "shutil.rmtree(V3IO_STREAM_PATH)\n", |
823 | | - "```\n", |
824 | | - "\n", |
825 | | - "or\n", |
826 | | - "\n", |
827 | | - "```\n", |
828 | | - "!rm -r $V3IO_STREAM_PATH\n", |
829 | | - "```" |
| 855 | + "shutil.rmtree(V3IO_STREAM_PATH)" |
| 856 | + ] |
| 857 | + }, |
| 858 | + { |
| 859 | + "cell_type": "markdown", |
| 860 | + "metadata": {}, |
| 861 | + "source": [ |
| 862 | + "Or use this command:" |
| 863 | + ] |
| 864 | + }, |
| 865 | + { |
| 866 | + "cell_type": "code", |
| 867 | + "execution_count": null, |
| 868 | + "metadata": {}, |
| 869 | + "outputs": [], |
| 870 | + "source": [ |
| 871 | + "!rm -r $V3IO_STREAM_PATH" |
830 | 872 | ] |
831 | 873 | } |
832 | 874 | ], |
|