Skip to content

Commit 850e4c6

Browse files
authored
Merge pull request #658 from lsieradzki/feature/#657
#657
2 parents a1b9e17 + d719b0e commit 850e4c6

File tree

4 files changed

+163
-0
lines changed

4 files changed

+163
-0
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
features:
2+
- gcp_pubsub_subscription - allows to create GCS subscription

plugins/modules/gcp_pubsub_subscription.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,65 @@
6363
}}"'
6464
required: true
6565
type: dict
66+
cloud_storage:
67+
description:
68+
- Cloud Storage Subscription configuration.
69+
required: false
70+
type: dict
71+
suboptions:
72+
bucket:
73+
description:
74+
- A reference to a Bucket resource.
75+
- This field represents a name of a Bucket resource in GCP where messages read from a Topic will be stored.
76+
required: true
77+
type: str
78+
file_prefix:
79+
description:
80+
- File object name prefix stored in a Bucket.
81+
required: false
82+
type: str
83+
file_suffix:
84+
description:
85+
- File object name suffix stored in a Bucket.
86+
required: false
87+
type: str
88+
file_datetime_format:
89+
description:
90+
- File object datetime format stored in a Bucket.
91+
required: false
92+
type: str
93+
max_duration:
94+
description:
95+
- Subscription writes a new output file if the specified value of max duration is exceeded. Min 60s, max 600s.
96+
required: true
97+
type: str
98+
max_bytes:
99+
description:
100+
- Cloud Storage Subscription writes a new output file if the specified value of max bytes is exceeded. Min 1000, max 10737418240.
101+
required: false
102+
type: int
103+
max_messages:
104+
description:
105+
- Cloud Storage Subscription writes a new output file if the specified number of messages is exceeded. Min 1000.
106+
required: false
107+
type: int
108+
output_format:
109+
description:
110+
- Specify the format of the output files that are to be stored in a Cloud Storage bucket as text or avro.
111+
required: true
112+
type: str
113+
write_metadata:
114+
description:
115+
- This option allows to store the message metadata along with the message, e.g. message headers.
116+
- This field is valid for avro message format only.
117+
required: false
118+
type: bool
119+
use_topic_schema:
120+
description:
121+
- When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.
122+
- This field is valid for avro message format only.
123+
required: false
124+
type: bool
66125
labels:
67126
description:
68127
- A set of key/value label pairs to assign to this Subscription.
@@ -576,6 +635,21 @@ def main():
576635
state=dict(default='present', choices=['present', 'absent'], type='str'),
577636
name=dict(required=True, type='str'),
578637
topic=dict(required=True, type='dict'),
638+
cloud_storage=dict(
639+
type='dict',
640+
options=dict(
641+
bucket=dict(required=True, type='str'),
642+
file_prefix=dict(type='str'),
643+
file_suffix=dict(type='str'),
644+
file_datetime_format=dict(type='str'),
645+
max_duration=dict(type='str'),
646+
max_bytes=dict(type='int'),
647+
max_messages=dict(type='int'),
648+
output_format=dict(type='str'),
649+
write_metadata=dict(type='bool'),
650+
use_topic_schema=dict(type='bool'),
651+
),
652+
),
579653
labels=dict(type='dict'),
580654
push_config=dict(
581655
type='dict',
@@ -642,6 +716,8 @@ def update(module, link, fetch):
642716

643717
def updateMask(request, response):
644718
update_mask = []
719+
if request.get('cloudStorageConfig') != response.get('cloudStorageConfig'):
720+
update_mask.append('cloudStorageConfig')
645721
if request.get('labels') != response.get('labels'):
646722
update_mask.append('labels')
647723
if request.get('pushConfig') != response.get('pushConfig'):
@@ -671,6 +747,7 @@ def resource_to_request(module):
671747
u'name': name_pattern(module.params.get('name'), module),
672748
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
673749
u'labels': module.params.get('labels'),
750+
u'cloudStorageConfig': SubscriptionCloudStorageConfig(module.params.get(u'cloud_storage', {}), module).to_request(),
674751
u'pushConfig': SubscriptionPushconfig(module.params.get('push_config', {}), module).to_request(),
675752
u'ackDeadlineSeconds': module.params.get('ack_deadline_seconds'),
676753
u'messageRetentionDuration': module.params.get('message_retention_duration'),
@@ -748,6 +825,7 @@ def response_to_hash(module, response):
748825
u'name': name_pattern(module.params.get('name'), module),
749826
u'topic': topic_pattern(replace_resource_dict(module.params.get(u'topic', {}), 'name'), module),
750827
u'labels': response.get(u'labels'),
828+
u'cloudStorageConfig': SubscriptionCloudStorageConfig(response.get(u'cloudStorageConfig', {}), module).from_response(),
751829
u'pushConfig': SubscriptionPushconfig(response.get(u'pushConfig', {}), module).from_response(),
752830
u'ackDeadlineSeconds': response.get(u'ackDeadlineSeconds'),
753831
u'messageRetentionDuration': response.get(u'messageRetentionDuration'),
@@ -881,5 +959,47 @@ def from_response(self):
881959
return remove_nones_from_dict({u'minimumBackoff': self.request.get(u'minimumBackoff'), u'maximumBackoff': self.request.get(u'maximumBackoff')})
882960

883961

962+
class SubscriptionCloudStorageConfig(object):
963+
def __init__(self, request, module):
964+
self.module = module
965+
if request:
966+
self.request = request
967+
else:
968+
self.request = {}
969+
970+
def to_request(self):
971+
return remove_nones_from_dict(
972+
{
973+
u'bucket': self.request.get('bucket'),
974+
u'filenamePrefix': self.request.get('file_prefix', {}),
975+
u'filenameSuffix': self.request.get('file_suffix', {}),
976+
u'filenameDatetimeFormat': self.request.get('file_datetime_format', {}),
977+
u'maxDuration': self.request.get('max_duration', {}),
978+
u'maxBytes': self.request.get('max_bytes', {}),
979+
u'maxMessages': self.request.get('max_messages', {}),
980+
u'avroConfig': {'writeMetadata': self.request.get('write_metadata', False),
981+
'useTopicSchema': self.request.get('use_topic_schema', False)}
982+
if self.request.get('output_format', {}) == 'avro'
983+
else {},
984+
}
985+
)
986+
987+
def from_response(self):
988+
storageConfig = {
989+
u'bucket': self.request.get('bucket', {}),
990+
u'filenamePrefix': self.request.get('filenamePrefix', {}),
991+
u'filenameSuffix': self.request.get('filenameSuffix', {}),
992+
u'filenameDatetimeFormat': self.request.get('filenameDatetimeFormat', {}),
993+
u'maxDuration': self.request.get('maxDuration', {}),
994+
u'maxBytes': self.request.get('maxBytes', {}),
995+
u'maxMessages': self.request.get('maxMessages', {}),
996+
u'avroConfig': {'writeMetadata': self.request.get('avroConfig', {}).get('writeMetadata', False),
997+
'useTopicSchema': self.request.get('avroConfig', {}).get('useTopicSchema', False)}
998+
if self.request.get('avroConfig', {})
999+
else {},
1000+
}
1001+
return remove_nones_from_dict(storageConfig) if self.request else storageConfig
1002+
1003+
8841004
if __name__ == '__main__':
8851005
main()

scripts/bootstrap-project.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ SERVICE_LIST=(
3232

3333
REQUIRED_ROLE_LIST=(
3434
"roles/storage.objectAdmin"
35+
"roles/storage.legacyBucketReader"
36+
"roles/storage.objectCreator"
3537
"roles/source.admin"
3638
)
3739

tests/integration/targets/gcp_pubsub_subscription/tasks/autogen.yml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@
2121
service_account_file: "{{ gcp_cred_file | default(omit) }}"
2222
state: present
2323
register: topic
24+
- name: Create a bucket
25+
google.cloud.gcp_storage_bucket:
26+
name: topic-subscription-bucket
27+
project: "{{ gcp_project }}"
28+
auth_kind: "{{ gcp_cred_kind }}"
29+
service_account_file: "{{ gcp_cred_file | default(omit) }}"
30+
state: present
2431
- name: Delete a subscription
2532
google.cloud.gcp_pubsub_subscription:
2633
name: "{{ resource_name }}"
@@ -88,6 +95,30 @@
8895
that:
8996
- result.changed == true
9097
#----------------------------------------------------------
98+
- name: Update cloudStorageConfig of a subscription that already exists
99+
google.cloud.gcp_pubsub_subscription:
100+
name: "{{ resource_name }}"
101+
topic: "{{ topic }}"
102+
ack_deadline_seconds: 300
103+
cloud_storage: {
104+
bucket: "topic-subscription-bucket",
105+
file_prefix: "test_",
106+
file_suffix: "_test",
107+
max_bytes: 10737418240,
108+
max_duration: "600s",
109+
output_format: "avro",
110+
write_metadata: true
111+
}
112+
project: "{{ gcp_project }}"
113+
auth_kind: "{{ gcp_cred_kind }}"
114+
service_account_file: "{{ gcp_cred_file | default(omit) }}"
115+
state: present
116+
register: result
117+
- name: Assert changed is true
118+
ansible.builtin.assert:
119+
that:
120+
- result.changed == true
121+
#----------------------------------------------------------
91122
- name: Delete a subscription
92123
google.cloud.gcp_pubsub_subscription:
93124
name: "{{ resource_name }}"
@@ -141,3 +172,11 @@
141172
state: absent
142173
register: topic
143174
ignore_errors: true
175+
176+
- name: Delete a bucket
177+
google.cloud.gcp_storage_bucket:
178+
name: topic-subscription-bucket
179+
project: "{{ gcp_project }}"
180+
auth_kind: "{{ gcp_cred_kind }}"
181+
service_account_file: "{{ gcp_cred_file | default(omit) }}"
182+
state: absent

0 commit comments

Comments
 (0)