30
30
import base64
31
31
import bz2
32
32
import io
33
+ import sys
33
34
import threading
34
35
import zlib
35
36
40
41
except (ImportError , ModuleNotFoundError ):
41
42
pass
42
43
44
+
45
+ def _get_proto_enum_descriptor_class ():
46
+ try :
47
+ from google .protobuf .internal import api_implementation
48
+ except ImportError :
49
+ return None
50
+
51
+ implementation_type = api_implementation .Type ()
52
+
53
+ if implementation_type == 'upb' :
54
+ try :
55
+ from google ._upb ._message import EnumDescriptor
56
+ return EnumDescriptor
57
+ except ImportError :
58
+ pass
59
+ elif implementation_type == 'cpp' :
60
+ try :
61
+ from google .protobuf .pyext ._message import EnumDescriptor
62
+ return EnumDescriptor
63
+ except ImportError :
64
+ pass
65
+ elif implementation_type == 'python' :
66
+ try :
67
+ from google .protobuf .internal .python_message import EnumDescriptor
68
+ return EnumDescriptor
69
+ except ImportError :
70
+ pass
71
+
72
+ return None
73
+
74
+
75
+ EnumDescriptor = _get_proto_enum_descriptor_class ()
76
+
43
77
# Pickling, especially unpickling, causes broken module imports on Python 3
44
78
# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
45
79
_pickle_lock = threading .RLock ()
46
80
RLOCK_TYPE = type (_pickle_lock )
81
+ LOCK_TYPE = type (threading .Lock ())
82
+
83
+
84
+ def _reconstruct_enum_descriptor (full_name ):
85
+ for _ , module in sys .modules .items ():
86
+ if not hasattr (module , 'DESCRIPTOR' ):
87
+ continue
88
+
89
+ for _ , attr_value in vars (module ).items ():
90
+ if not hasattr (attr_value , 'DESCRIPTOR' ):
91
+ continue
92
+
93
+ if hasattr (attr_value .DESCRIPTOR , 'enum_types_by_name' ):
94
+ for (_ , enum_desc ) in attr_value .DESCRIPTOR .enum_types_by_name .items ():
95
+ if enum_desc .full_name == full_name :
96
+ return enum_desc
97
+ raise ImportError (f'Could not find enum descriptor: { full_name } ' )
98
+
99
+
100
+ def _pickle_enum_descriptor (obj ):
101
+ full_name = obj .full_name
102
+ return _reconstruct_enum_descriptor , (full_name , )
47
103
48
104
49
105
def dumps (o , enable_trace = True , use_zlib = False ) -> bytes :
@@ -59,6 +115,12 @@ def dumps(o, enable_trace=True, use_zlib=False) -> bytes:
59
115
pickler .dispatch_table [RLOCK_TYPE ] = _pickle_rlock
60
116
except NameError :
61
117
pass
118
+ try :
119
+ pickler .dispatch_table [LOCK_TYPE ] = _lock_reducer
120
+ except NameError :
121
+ pass
122
+ if EnumDescriptor is not None :
123
+ pickler .dispatch_table [EnumDescriptor ] = _pickle_enum_descriptor
62
124
pickler .dump (o )
63
125
s = file .getvalue ()
64
126
@@ -106,6 +168,10 @@ def _pickle_rlock(obj):
106
168
return RLOCK_TYPE , tuple ([])
107
169
108
170
171
+ def _lock_reducer (obj ):
172
+ return threading .Lock , tuple ([])
173
+
174
+
109
175
def dump_session (file_path ):
110
176
# It is possible to dump session with cloudpickle. However, since references
111
177
# are saved it should not be necessary. See https://s.apache.org/beam-picklers
0 commit comments