pyspark.sql.protobuf.functions.to_protobuf
pyspark.sql.protobuf.functions.to_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column
- Converts a column into binary of Protobuf format. The Protobuf definition is provided in one of these ways:
- Protobuf descriptor file: for example, a descriptor file created with
  protoc --include_imports --descriptor_set_out=abc.desc abc.proto
- Protobuf descriptor as binary: rather than a file path as in the previous option, the binary content of the file can be provided directly. This allows flexibility in how the descriptor set is created and fetched.
- Jar containing Protobuf Java class: the jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars.
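As a rough sketch of the "descriptor as binary" option, the descriptor set written by protoc can be read into memory and passed through binaryDescriptorSet. The helper names load_descriptor_set and to_protobuf_from_bytes are hypothetical and not part of PySpark, and the sketch assumes the Protobuf module is available to the Spark session.

```python
from pathlib import Path


def load_descriptor_set(desc_path: str) -> bytes:
    """Return the raw bytes of a descriptor set file written by protoc."""
    return Path(desc_path).read_bytes()


def to_protobuf_from_bytes(df, column_name: str, message_name: str, desc_path: str):
    """Hypothetical wrapper: serialize `column_name` using in-memory descriptor bytes."""
    # Imported lazily so the file helper above works without a Spark installation.
    from pyspark.sql.protobuf.functions import to_protobuf

    descriptor_bytes = load_descriptor_set(desc_path)
    return df.select(
        to_protobuf(
            df[column_name], message_name, binaryDescriptorSet=descriptor_bytes
        ).alias(column_name)
    )
```

Reading the bytes separately means the descriptor could just as well come from somewhere other than a local file, which is the flexibility the second option describes.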
- New in version 3.4.0.
- Changed in version 3.5.0: Supports binaryDescriptorSet arg to pass binary descriptor directly. Supports Spark Connect.
- Parameters
- data: Column or str
- the data column. 
- messageName: str
- the Protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set. E.g. com.example.protos.ExampleEvent.
- descFilePath: str, optional
- the Protobuf descriptor file. 
- options: dict, optional
- binaryDescriptorSet: bytes, optional
- The Protobuf FileDescriptorSet serialized as binary. 
 
- Notes
- Protobuf functionality is provided as a pluggable external module.
- Examples
>>> import tempfile
>>> from pyspark.sql.protobuf.functions import to_protobuf
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...     '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...     '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...     '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...     '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...     '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(  # With file name for descriptor
...             to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
...         proto_df.show(truncate=False)
...         proto_df_2 = df.select(  # With binary for descriptor
...             to_protobuf(df.value, message_name,
...                         binaryDescriptorSet=bytearray.fromhex(desc_hex))
...             .alias("suite"))
...         proto_df_2.show(truncate=False)
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+

>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> proto_df = df.select(to_protobuf(df.value, message_class_name).alias("suite"))
>>> proto_df.show(truncate=False)
+----------------------------+
|suite                       |
+----------------------------+
|[08 FA EA B0 9B 06 10 E4 0F]|
+----------------------------+
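
The byte strings in the example output follow the standard Protobuf wire format, so they can be checked by hand. The following is a minimal sketch of such a check in plain Python, not part of PySpark: the names read_varint and decode_fields are hypothetical, and the decoder only handles the two wire types that appear here (varint and length-delimited) for non-negative values.

```python
def read_varint(buf: bytes, pos: int):
    """Decode a base-128 varint starting at `pos`; return (value, next_pos)."""
    value = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if not byte & 0x80:              # high bit clear marks the last byte
            return value, pos
        shift += 7


def decode_fields(buf: bytes):
    """Return [(field_number, value)] for varint and length-delimited fields only."""
    fields = []
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:    # varint (integer types, bool)
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited (string, bytes, nested message)
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} is not handled in this sketch")
        fields.append((field_number, value))
    return fields


# Bytes copied from the two example outputs.
simple_message = bytes.fromhex("08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06")
timestamp = bytes.fromhex("08 FA EA B0 9B 06 10 E4 0F")

print(decode_fields(simple_message))  # [(1, 2), (2, b'Alice'), (3, 13093020)]
print(decode_fields(timestamp))       # [(1, 1668035962), (2, 2020)]
```

The decoded field numbers and values line up with the input structs: age, name and score for SimpleMessage, and seconds and nanos for Timestamp.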