File tree Expand file tree Collapse file tree 2 files changed +29
-0
lines changed
sdks/python/apache_beam/yaml Expand file tree Collapse file tree 2 files changed +29
-0
lines changed Original file line number Diff line number Diff line change @@ -224,6 +224,12 @@ def _validate_schema():
224224 return (
225225 schema_pb2 .Schema (fields = [schemas .schema_field ('payload' , bytes )]),
226226 lambda payload : beam .Row (payload = payload ))
227+ if format == 'STRING' :
228+ if schema :
229+ raise ValueError ('STRING format does not take a schema' )
230+ return (
231+ schema_pb2 .Schema (fields = [schemas .schema_field ('payload' , str )]),
232+ lambda payload : beam .Row (payload = payload .decode ('utf-8' )))
227233 elif format == 'JSON' :
228234 _validate_schema ()
229235 beam_schema = json_utils .json_schema_to_beam_schema (schema )
@@ -313,6 +319,7 @@ def read_from_pubsub(
313319
314320 - RAW: Produces records with a single `payload` field whose contents
315321 are the raw bytes of the pubsub message.
322+ - STRING: Like RAW, but the bytes are decoded as a UTF-8 string.
316323 - AVRO: Parses records with a given Avro schema.
317324 - JSON: Parses records with a given JSON schema.
318325
Original file line number Diff line number Diff line change @@ -101,6 +101,28 @@ def test_simple_read(self):
101101 result ,
102102 equal_to ([beam .Row (payload = b'msg1' ), beam .Row (payload = b'msg2' )]))
103103
104+ def test_simple_read_string (self ):
105+ with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
106+ pickle_library = 'cloudpickle' )) as p :
107+ with mock .patch ('apache_beam.io.ReadFromPubSub' ,
108+ FakeReadFromPubSub (
109+ topic = 'my_topic' ,
110+ messages = [PubsubMessage ('äter' .encode ('utf-8' ),
111+ {'attr' : 'value1' }),
112+ PubsubMessage ('köttbullar' .encode ('utf-8' ),
113+ {'attr' : 'value2' })])):
114+ result = p | YamlTransform (
115+ '''
116+ type: ReadFromPubSub
117+ config:
118+ topic: my_topic
119+ format: STRING
120+ ''' )
121+ assert_that (
122+ result ,
123+ equal_to ([beam .Row (payload = 'äter' ),
124+ beam .Row (payload = 'köttbullar' )]))
125+
104126 def test_read_with_attribute (self ):
105127 with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
106128 pickle_library = 'cloudpickle' )) as p :
You can’t perform that action at this time.
0 commit comments