Skip to content

Commit aa095a0

Browse files
committed
Pipeline samples for python dependency test
1 parent 94405cf commit aa095a0

File tree

20 files changed

+684
-0
lines changed

20 files changed

+684
-0
lines changed

python-dependency/.gitignore

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# IntelliJ IDEA
2+
.idea
3+
*.iml
4+
5+
# Python virtual environment and build
6+
env/
7+
downloads/
8+
__pycache__
9+
*.pyc
10+
*.egg-info
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import logging
2+
3+
from apache_beam import CombineGlobally
4+
from apache_beam import Create
5+
from apache_beam import ParDo
6+
from apache_beam import Pipeline
7+
from apache_beam.options.pipeline_options import PipelineOptions
8+
from apache_beam.options.pipeline_options import SetupOptions
9+
10+
11+
def main():
    """Build and run a pipeline that logs the sum of the integers start..end.

    Pipeline options are taken from the command line by ``PipelineOptions``.
    The pipeline is started with ``p.run()``; ``wait_until_finish()`` is not
    called on the result.
    """
    options = PipelineOptions()
    # Pickle the main session so that module-level names used inside the
    # lambda below (``logging``) can be resolved where the lambda executes.
    options.view_as(SetupOptions).save_main_session = True

    p = Pipeline(options=options)

    start = 1
    end = 100
    (p
     | 'From {} to {}'.format(start, end) >> Create(list(range(start, end + 1)))
     | 'Sum' >> CombineGlobally(sum)
     | 'Print' >> ParDo(
         # Report the real bounds rather than repeating them as literals, so
         # the message stays correct if ``start``/``end`` are edited.
         lambda total: logging.info(
             'Sum from %s to %s is %s', start, end, total)))

    p.run()


if __name__ == '__main__':
    # Make INFO-level records (including the pipeline's own output) visible.
    logging.getLogger().setLevel(logging.INFO)
    main()

python-dependency/example0/run.sh

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/bin/bash
#
# Creates a virtual environment, installs apache-beam[gcp], and submits the
# print_sum pipeline to Dataflow.

# Move to workdir
# "$0" is quoted so the script also works from a path containing spaces.
WORK_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$WORK_DIR" || exit 1

# Create venv
echo "## Create venv."
python -m venv env
source env/bin/activate

# Install apache-beam
echo "## Install apache-beam[gcp]."
pip install --upgrade pip
# Quoted so the shell never treats "[gcp]" as a glob pattern.
pip install 'apache-beam[gcp]==2.35.0'

# Run pipeline
echo "## Run pipeline."
PROJECT=$(gcloud config get-value project)
REGION=us-central1

# no_use_multiple_sdk_containers: to run single SDK container.
python -m print_sum --runner=DataflowRunner \
  --project="$PROJECT" \
  --region="$REGION" \
  --job_name=example0 \
  --experiments=use_runner_v2 \
  --experiments=no_use_multiple_sdk_containers
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
google-cloud-translate==3.6.1

python-dependency/example1/run.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/bin/bash
#
# Creates a virtual environment, installs apache-beam[gcp] plus
# google-cloud-translate, and submits the translate pipeline to Dataflow
# with the dependency declared through --requirements_file.

# Move to workdir
# "$0" is quoted so the script also works from a path containing spaces.
WORK_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$WORK_DIR" || exit 1

# Create venv
echo "## Create venv."
python -m venv env
source env/bin/activate

# Install apache-beam and dependency packages
echo "## Install apache-beam[gcp] and dependency packages."
pip install --upgrade pip
# Quoted so the shell never treats "[gcp]" as a glob pattern.
pip install 'apache-beam[gcp]==2.35.0' google-cloud-translate==3.6.1

# Run pipeline
echo "## Run pipeline with '--requirements_file'."
PROJECT=$(gcloud config get-value project)
REGION=us-central1

python -m translate --runner=DataflowRunner \
  --project="$PROJECT" \
  --region="$REGION" \
  --job_name=example1 \
  --experiments=use_runner_v2 \
  --experiments=no_use_multiple_sdk_containers \
  --requirements_file=requirements.txt
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
3+
from apache_beam import Create
4+
from apache_beam import DoFn
5+
from apache_beam import Map
6+
from apache_beam import ParDo
7+
from apache_beam import Pipeline
8+
from apache_beam.options.pipeline_options import GoogleCloudOptions
9+
from apache_beam.options.pipeline_options import PipelineOptions
10+
from apache_beam.options.pipeline_options import SetupOptions
11+
from google.cloud import translate
12+
13+
14+
class Translate(DoFn):
    """DoFn that translates each input text via the Cloud Translation client.

    For every element it yields a ``(text, translated_text)`` tuple, where
    ``translated_text`` is the first translation in the response, or ``None``
    when the response carries no translations.
    """

    def __init__(self, project, source_language_code, target_language_code):
        """Store the request parameters; the client itself is built later.

        Args:
            project: Project id used to build the request's ``parent`` path.
            source_language_code: Language code of the input texts.
            target_language_code: Language code to translate into.
        """
        self._project = project
        self._source_code = source_language_code
        self._target_code = target_language_code
        # Populated in setup(); presumably deferred so the client is created
        # where the DoFn runs instead of being pickled with it.
        self._client = None

    def setup(self):
        """Create the translation service client."""
        self._client = translate.TranslationServiceClient()

    def process(self, text):
        """Translate ``text`` and yield ``(text, translated_text)``."""
        # See https://cloud.google.com/translate/docs/advanced/quickstart
        parent = f'projects/{self._project}/locations/global'
        response = self._client.translate_text({
            'parent': parent,
            'contents': [text],
            'mime_type': 'text/plain',
            'source_language_code': self._source_code,
            'target_language_code': self._target_code,
        })

        # Only the first translation is used; None when there is none.
        first = next(iter(response.translations), None)
        translated_text = None if first is None else first.translated_text

        yield text, translated_text
42+
43+
44+
def main():
    """Build and run a pipeline that translates a few fixed English phrases.

    Pipeline options are taken from the command line by ``PipelineOptions``.
    The pipeline is started with ``p.run()``; ``wait_until_finish()`` is not
    called on the result.

    Raises:
        ValueError: If no ``project`` is present in the pipeline options.
    """
    options = PipelineOptions()
    # Pickle the main session so module-level names used by the transforms
    # below (``logging``, ``translate``) can be resolved where they execute.
    options.view_as(SetupOptions).save_main_session = True

    project = options.view_as(GoogleCloudOptions).project
    # Raise explicitly rather than assert: assertions are stripped under
    # ``python -O``, which would let a missing project slip through.
    if project is None:
        raise ValueError('"project" is not specified.')

    source_code = 'en-US'
    target_code = 'ja'
    texts = ['Hello', 'Thank you', 'Goodbye']

    p = Pipeline(options=options)
    (p
     | 'Texts' >> Create(texts)
     | 'Translate' >> ParDo(Translate(project, source_code, target_code))
     | 'Print' >> Map(lambda pair: logging.info('%s -> %s', pair[0], pair[1])))

    p.run()


if __name__ == '__main__':
    # Make INFO-level records (including the pipeline's own output) visible.
    logging.getLogger().setLevel(logging.INFO)
    main()

python-dependency/example2/run.sh

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/bin/bash
#
# Creates a virtual environment, installs apache-beam[gcp] plus
# google-cloud-translate, downloads the google-cloud-translate source
# package, and submits the translate pipeline to Dataflow with that package
# passed through --extra_packages.

# Move to workdir
# "$0" is quoted so the script also works from a path containing spaces.
WORK_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$WORK_DIR" || exit 1

# Create venv
echo "## Create venv."
python -m venv env
source env/bin/activate

# Install apache-beam and dependency packages
echo "## Install apache-beam[gcp] and dependency packages."
pip install --upgrade pip
# Quoted so the shell never treats "[gcp]" as a glob pattern.
pip install 'apache-beam[gcp]==2.35.0' google-cloud-translate==3.6.1

# Download google-cloud-translate source package
echo "## Download google-cloud-translate source package."
pip download --no-binary :all: --no-deps --dest downloads google-cloud-translate==3.6.1
# Use a glob instead of parsing `ls` output, and pass exactly one path even
# if downloads/ contains leftovers from a previous run.
SOURCE_PACKAGES=(downloads/*)
SOURCE_PACKAGE="${SOURCE_PACKAGES[0]}"
if [[ ! -f "$SOURCE_PACKAGE" ]]; then
  echo "## No source package found in downloads/." >&2
  exit 1
fi

# Run pipeline
echo "## Run pipeline with '--extra_packages'."
PROJECT=$(gcloud config get-value project)
REGION=us-central1

python -m translate --runner=DataflowRunner \
  --project="$PROJECT" \
  --region="$REGION" \
  --job_name=example2 \
  --experiments=use_runner_v2 \
  --experiments=no_use_multiple_sdk_containers \
  --extra_packages="$SOURCE_PACKAGE"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
3+
from apache_beam import Create
4+
from apache_beam import DoFn
5+
from apache_beam import Map
6+
from apache_beam import ParDo
7+
from apache_beam import Pipeline
8+
from apache_beam.options.pipeline_options import GoogleCloudOptions
9+
from apache_beam.options.pipeline_options import PipelineOptions
10+
from apache_beam.options.pipeline_options import SetupOptions
11+
from google.cloud import translate
12+
13+
14+
class Translate(DoFn):
    """DoFn that translates each input text via the Cloud Translation client.

    For every element it yields a ``(text, translated_text)`` tuple, where
    ``translated_text`` is the first translation in the response, or ``None``
    when the response carries no translations.
    """

    def __init__(self, project, source_language_code, target_language_code):
        """Store the request parameters; the client itself is built later.

        Args:
            project: Project id used to build the request's ``parent`` path.
            source_language_code: Language code of the input texts.
            target_language_code: Language code to translate into.
        """
        self._project = project
        self._source_code = source_language_code
        self._target_code = target_language_code
        # Populated in setup(); presumably deferred so the client is created
        # where the DoFn runs instead of being pickled with it.
        self._client = None

    def setup(self):
        """Create the translation service client."""
        self._client = translate.TranslationServiceClient()

    def process(self, text):
        """Translate ``text`` and yield ``(text, translated_text)``."""
        # See https://cloud.google.com/translate/docs/advanced/quickstart
        parent = f'projects/{self._project}/locations/global'
        response = self._client.translate_text({
            'parent': parent,
            'contents': [text],
            'mime_type': 'text/plain',
            'source_language_code': self._source_code,
            'target_language_code': self._target_code,
        })

        # Only the first translation is used; None when there is none.
        first = next(iter(response.translations), None)
        translated_text = None if first is None else first.translated_text

        yield text, translated_text
42+
43+
44+
def main():
    """Build and run a pipeline that translates a few fixed English phrases.

    Pipeline options are taken from the command line by ``PipelineOptions``.
    The pipeline is started with ``p.run()``; ``wait_until_finish()`` is not
    called on the result.

    Raises:
        ValueError: If no ``project`` is present in the pipeline options.
    """
    options = PipelineOptions()
    # Pickle the main session so module-level names used by the transforms
    # below (``logging``, ``translate``) can be resolved where they execute.
    options.view_as(SetupOptions).save_main_session = True

    project = options.view_as(GoogleCloudOptions).project
    # Raise explicitly rather than assert: assertions are stripped under
    # ``python -O``, which would let a missing project slip through.
    if project is None:
        raise ValueError('"project" is not specified.')

    source_code = 'en-US'
    target_code = 'ja'
    texts = ['Hello', 'Thank you', 'Goodbye']

    p = Pipeline(options=options)
    (p
     | 'Texts' >> Create(texts)
     | 'Translate' >> ParDo(Translate(project, source_code, target_code))
     | 'Print' >> Map(lambda pair: logging.info('%s -> %s', pair[0], pair[1])))

    p.run()


if __name__ == '__main__':
    # Make INFO-level records (including the pipeline's own output) visible.
    logging.getLogger().setLevel(logging.INFO)
    main()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lxml==4.7.1

python-dependency/example3/run.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/bin/bash
#
# Creates a virtual environment, installs apache-beam[gcp] plus lxml, and
# submits the pipeline module to Dataflow with the dependency declared
# through --requirements_file.

# Move to workdir
# "$0" is quoted so the script also works from a path containing spaces.
WORK_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$WORK_DIR" || exit 1

# Create venv
echo "## Create venv."
python -m venv env
source env/bin/activate

# Install apache-beam and dependency packages
echo "## Install apache-beam[gcp] and dependency packages."
pip install --upgrade pip
# Quoted so the shell never treats "[gcp]" as a glob pattern.
pip install 'apache-beam[gcp]==2.35.0' lxml==4.7.1

# Run pipeline
echo "## Run pipeline with '--requirements_file'."
PROJECT=$(gcloud config get-value project)
REGION=us-central1

python -m translate --runner=DataflowRunner \
  --project="$PROJECT" \
  --region="$REGION" \
  --job_name=example3 \
  --experiments=use_runner_v2 \
  --experiments=no_use_multiple_sdk_containers \
  --requirements_file=requirements.txt

0 commit comments

Comments
 (0)