Author: Fu Dian
Python custom function is one of the most important functions in PyFlink Table API. It allows users to use custom functions developed in Python language in PyFlink Table API, which greatly broadens the scope of use of Python Table API.
At present, the functions of Python custom functions are very complete, supporting various types of custom functions, such as UDF (scalar function), UDTF (table function), UDAF (aggregate function), UDTAF (table aggregate function, 1.13 support), Panda UDF, Pandas UDAF, etc. Next, we will introduce in detail how to use Python custom functions in PyFlink Table API jobs.
Python custom function basics
According to the number of rows of input/output data, in Flink Table API & SQL, custom functions can be divided into the following categories:
|Custom function||Single Row Input||Multiple Row Input|
|Single Row Output||ScalarFunction||AggregateFunction|
|Multiple Row Output||TableFunction||TableAggregateFunction|
PyFlink provides support for the above four types of custom functions. Next, let's take a look at how to use each type of custom function.
Python UDF, or Python ScalarFunction, produces only one output data for each input data. For example, the following example shows a variety of ways to define a Python UDF named "sub_string":
from pyflink.table.udf import udf, FunctionContext, ScalarFunction from pyflink.table import DataTypes method one: @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end] Way two: sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING()) Way three: class SubString(object): def __call__(self, s: str, begin: int, end: int): return s[begin:end] sub_string = udf(SubString(), result_type=DataTypes.STRING()) Way four: def sub_string(s: str, begin: int, end: int): return s[begin:end] sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING()) Way Five: class SubString(ScalarFunction): def open(self, function_context: FunctionContext): pass def eval(self, s: str, begin: int, end: int): return s[begin:end] sub_string = udf(SubString(), result_type=DataTypes.STRING()) Copy code
- Need to declare that this is a scalar function through a decorator named "udf";
- Need to declare the result type of the scalar function through the result_type parameter in the decorator;
- In the fifth method above, defining Python UDF by inheriting ScalarFunction has the following uses:
- The base class UserDefinedFunction of ScalarFunction defines an open method, which is only executed once when the job is initialized, so you can use this method to do some initialization work, such as loading machine learning models, connecting to external services, etc.
- In addition, you can also register and use metrics through the function_context parameter in the open method.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a','b']) table.select(sub_string(table.a, 1, 3)) Copy code
Python UDTF, or Python TableFunction, for each piece of input data, Python UDTF can generate zero, one, or multiple pieces of output data. In addition, one piece of output data can contain multiple columns. For example, the following example defines a Python UDF named split, and uses the specified string as the separator to split the input string into two strings:
from pyflink.table.udf import udtf from pyflink.table import DataTypes @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()]) def split(s: str, sep: str): splits = s.split(sep) yield splits, splits Copy code
- Need to declare that this is a table function through a decorator named "udtf";
- You need to declare the result type of the table function through the result_types parameter in the decorator. Since each output of the table function can contain multiple columns, result_types needs to specify the types of all output columns;
- The definition of Python UDTF also supports multiple definition methods listed in the Python UDF chapter, and only one of them is shown here.
After defining the Python UDTF, you can directly use it in the Python Table API:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a','b']) table.join_lateral(split(table.a,'|').alias("c1, c2")) table.left_outer_join_lateral(split(table.a,'|').alias("c1, c2")) Copy code
Python UDAF, namely Python AggregateFunction. Python UDAF is used to perform aggregation operations on a set of data, such as multiple pieces of data under the same window, or multiple pieces of data under the same key. For the same set of input data, Python AggregateFunction produces a piece of output data. For example, the following example defines a Python UDAF named weighted_avg:
from pyflink.common import Row from pyflink.table import AggregateFunction, DataTypes from pyflink.table.udf import udaf class WeightedAvg(AggregateFunction): def create_accumulator(self): # Row(sum, count) return Row(0, 0) def get_value(self, accumulator: Row) -> float: if accumulator == 0: return 0 else: return accumulator/accumulator def accumulate(self, accumulator: Row, value, weight): accumulator += value * weight accumulator += weight def retract(self, accumulator: Row, value, weight): accumulator -= value * weight accumulator -= weight weighted_avg = udaf(f=WeightedAvg(), result_type=DataTypes.DOUBLE(), accumulator_type=DataTypes.ROW([ DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])) Copy code
- Need to declare that this is an aggregate function through a decorator named "udaf",
- You need to declare the result type and accumulator type of the aggregate function through the result_type and accumulator_type parameters in the decorator respectively;
- The three methods of create_accumulator, get_value and accumulate must be defined. The retract method can be defined as required. For details, please refer to the official Flink document ; it should be noted that since the three methods of create_accumulator, get_value and accumulate must be defined, Python UDAF can only be defined by inheriting AggregateFunction (Pandas UDAF does not have this limitation).
After defining the Python UDAF, you can use it in the Python Table API like this:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")], ["value", "count", "name"]) t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")) Copy code
Python UDTAF, namely Python TableAggregateFunction. Python UDTAF is used to perform aggregation operations on a set of data, such as multiple pieces of data under the same window, or multiple pieces of data under the same key. Unlike Python UDAF, for the same set of input data, Python UDTAF can generate 0, 1, or even multiple output data.
The following example defines a Python UDTAF named Top2:
from pyflink.common import Row from pyflink.table import DataTypes from pyflink.table.udf import udtaf, TableAggregateFunction class Top2(TableAggregateFunction): def create_accumulator(self): # Store the current two largest values return [None, None] def accumulate(self, accumulator, input_row): if input_row is not None: # The new input value is the largest if accumulator is None or input_row> accumulator: accumulator = accumulator accumulator = input_row # The new input value is the second largest elif accumulator is None or input_row> accumulator: accumulator = input_row def emit_value(self, accumulator): yield Row(accumulator) if accumulator is not None: yield Row(accumulator) top2 = udtaf(f=Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT())) Copy code
- Python UDTAF function is a new function supported after Flink 1.13;
- The three methods create_accumulator, accumulate and emit_value must be defined. In addition, TableAggregateFunction supports retract, merge and other methods. You can choose whether to define them according to your needs. For details, please refer to Flink official documentation .
After defining the Python UDTAF, you can use it in the Python Table API like this:
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t = t_env.from_elements([(1,'Hi','Hello'), (3,'Hi','hi'), (5,'Hi2','hi'), (2,'Hi','Hello'), (7,'Hi','Hello')], ['a','b','c']) t_env.execute_sql(""" CREATE TABLE my_sink ( word VARCHAR, `sum` BIGINT ) WITH ( 'connector' ='print' ) """) result = t.group_by(tb).flat_aggregate(top2).select("b, a").execute_insert("my_sink") # 1) Wait for the completion of the job execution, for local execution, otherwise the job may not be completed yet, and the script has exited, which will cause the minicluster to exit prematurely # 2) When the job is submitted to the remote cluster through the detach mode, such as YARN/Standalone/K8s, etc., this method needs to be removed result.wait() Copy code
When the above program is executed, you can see output similar to the following:
11> +I[Hi, 7] 10> +I[Hi2, 5] 11> +I[Hi, 3] Copy code
- Python UDTAF can only be used in Table API, not in SQL statements;
- The result of flat_aggregate contains the original grouping column and the output of UDTAF (top 2), so the column "b" can be accessed in select.
Advanced Python custom functions
Use Python custom functions in pure SQL jobs
The CREATE FUNCTION statement in Flink SQL supports the registration of Python custom functions, so users can use Python custom functions in pure SQL jobs in addition to using Python custom functions in PyFlink Table API jobs.
CREATE TEMPORARY FUNCTION sub_string AS'test_udf.sub_string' LANGUAGE PYTHON CREATE TABLE source ( a VARCHAR ) WITH ( 'connector' ='datagen' ); CREATE TABLE sink ( a VARCHAR ) WITH ( 'connector' ='print' ); INSERT INTO sink SELECT sub_string(a, 1, 3) FROM source; Copy code
Use Python custom functions in Java jobs
Users can register Python custom functions through DDL, which means that users can also use Python custom functions in Java Table API jobs, such as:
TableEnvironment tEnv = TableEnvironment.create( EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS'test_udf.sub_string' LANGUAGE PYTHON"); tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a")); tEnv.executeSql("SELECT sub_string(a) FROM source").collect(); Copy code
Detailed examples can be found in PyFlink Playground .
An important use of this feature is to mix Java operators with Python operators. Users can use the Java language to develop most of the job logic. When some parts of the job logic must be written in the Python language, you can use the above method to call a custom function written in the Python language.
If it is a DataStream job, you can first convert the DataStream to a Table, and then use the above method to call a custom function written in Python.
Accessing third-party Python libraries in Python custom functions is a very common requirement. In addition, in machine learning prediction scenarios, users may also need to load a machine learning model in Python custom functions. When we execute PyFlink jobs in local mode, we can install third-party Python libraries in the local Python environment, or download machine learning models locally; however, when we submit PyFlink jobs to remote execution, this may also occur some problems:
- How third-party Python libraries are accessed by Python custom functions. Different jobs have different requirements for the version of the Python library. Pre-installing third-party Python libraries into the cluster's Python environment is only suitable for installing some public dependencies, and cannot solve the individualized needs of Python dependencies for different jobs;
- How the machine learning model or data file is distributed to the cluster nodes and finally accessed by the Python custom function.
In addition, dependencies may also include JAR packages, etc. PyFlink provides a variety of solutions for various dependencies:
|Dependent type||solution||Use description||Example (flink run)|
|flink run parameters||Configuration item||API|
|Job entry file||-py/--python||no||no||The entry file of the specified job, it can only be a .py file||-py file:///path/to/table_api_demo.py|
|Entry module||-pym/--pyModule||no||no||The entry module of the specified job, the function is similar to --python, can be used when the Python file of the job is a zip package, etc., when it cannot be specified by --python, it is more general than --python||-pym table_api_demo-pyfs file:///path/to/table_api_demo.py|
|Python third-party library files||-pyfs/--pyFiles||python.files||add_python_file||Specify one or more Python files (.py/.zip/.whl, etc., separated by commas), these Python files will be placed in the PYTHONPATH of the Python process when the job is executed, and can be directly accessed in the Python custom function||-pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip|
|Archive file||-pyarch/--pyArchives||python.archives||add_python_archive||Specify one or more archive files (separated by commas). These archive files will be decompressed when the job is executed and placed in the working directory of the Python process, which can be accessed through relative paths||-pyarchfile:///path/to/venv.zip|
|Python interpreter path||-pyexec/--pyExecutable||python.executable||set_python_executable||Specify the path of the Python interpreter used when the job is executed||-pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3|
|requirements file||-pyreq/--pyRequirements||python.requirements||set_python_requirements||Specify the requirements file. The Python third-party library dependencies of the job are defined in the requirements file. When the job is executed, the relevant dependencies will be installed through pip according to the content of the requirements||-pyreq requirements.txt|
|JAR package||no||pipeline.classpaths, pipeline.jars||There is no special API, it can be set through the set_string method of configuration||Specify the JAR package that the job depends on, usually used to specify the connector JAR package||no|
- It should be noted that the file where the implementation of Python UDF is located also needs to be uploaded as a dependent file when the job is executed;
- You can use the "archive file" and "Python interpreter path" together to specify that the job is executed using the uploaded Python virtual environment, such as:
table_env.add_python_archive("/path/to/py_env.zip") # Specify to use the python included in the py_env.zip package to execute user-defined functions, which must be specified by a relative path table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python") Copy code
- It is recommended that users use conda to build a Python virtual environment. The Python virtual environment built by conda contains most of the underlying libraries needed to execute Python, which can greatly avoid when the local environment is different from the cluster environment. When the cluster is executed, there is a lack of various underlying dependent libraries. For how to use conda to build a Python virtual environment, you can refer to the introduction in the "Using Python Three-Party Package" chapter in the Alibaba Cloud VVP document 
- Some Python third-party libraries need to be installed before they can be used, that is, they are not "downloaded and can be directly referenced in PYTHONPATH". For this type of Python third-party library, there are two solutions:
- Install it in the Python virtual environment, and specify the built Python virtual environment to run the job;
- Find a machine (or docker) that is the same as the cluster environment, install the required Python third-party library, and then package the installation files. Compared with the Python virtual environment, this method has relatively small packaging files. For details, please refer to the introduction in the chapter "Using a Custom Python Virtual Environment" in the Alibaba Cloud VVP document .
PyFlink supports users to debug Python custom functions through remote debugging. For specific methods, please refer to the introduction in the "Remote Debugging" chapter in the article "How to develop PyFlink API jobs from 0 to 1" .
In addition, users can also print logs in Python custom functions through logging. It should be noted that the log output needs to be viewed in the log file of the TaskManager, not the current console. For specific usage, please refer to the introduction of "Custom Log" in "How to Develop PyFlink API Jobs from 0 to 1" . It should be noted that when running a job in local mode, the TM log is located in the PyFlink installation directory, for example:
>>> import pyflink
The performance of Python custom functions largely depends on the implementation of Python custom functions. If you encounter performance problems, you first need to find ways to optimize the implementation of Python custom functions as much as possible.
In addition, the performance of Python custom functions is also affected by the values of the following parameters.
|python.fn-execution.bundle.size||The execution of Python custom functions is asynchronous. During the execution of the job, the Java operator asynchronously sends data to the Python process for processing. Before the Java operator sends the data to the Python process, it will cache the data first, and then send it to the Python process after reaching a certain threshold. The python.fn-execution.bundle.size parameter can be used to control the maximum number of data that can be cached, and the default value is 100000.|
|python.fn-execution.bundle.time||Used to control the maximum buffering time of data. When the number of cached data items reaches the threshold defined by python.fn-execution.bundle.size or the cache time reaches the threshold defined by python.fn-execution.bundle.time, the calculation of the cached data will be triggered. The default value is 1000 and the unit is milliseconds.|
|python.fn-execution.arrow.batch.size||Used to control the maximum number of data that can be accommodated in an arrow batch when Pandas UDF is used. The default value is 10000. Note that the value of the python.fn-execution.arrow.batch.size parameter cannot be greater than the value of the python.fn-execution.bundle.size parameter.|
- During checkpoint, the calculation of cached data will be triggered. Therefore, when the value of the above parameter configuration is too large, it may cause too much data to be processed during checkpoint, resulting in too long checkpoint time, and even checkpoint failure. When encountering the problem that the checkpoint time of the job is relatively long, you can try to reduce the value of the above parameters.
1) The actual return value type of the Python custom function is inconsistent with the type declared in result_type. This problem will cause the Java operator to report an error when deserializing the execution result of the Python custom function. The error stack is similar:
Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0] at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0] Copy code
2) An object that cannot be serialized by cloudpickle is instantiated in the init method of the Python custom function .
When submitting a job, PyFlink will serialize Python custom functions through cloudpickle. If the Python custom function contains objects that cannot be serialized by cloudpickle, you will encounter a similar error: TypeError: can't pickle xxx, you can use this Variables are initialized in the open method.
3) Load a very large data file in the init method of the Python custom function .
When submitting the job, PyFlink will serialize the Python custom function through cloudpickle. If a very large data file is loaded in the init method, the entire data file will be serialized and used as part of the Python custom function implementation. The data file is very large, which may cause the job execution to fail. The operation of the load data file can be executed in the open method.
4) The client-side Python environment is inconsistent with the cluster-side Python environment, such as inconsistent Python versions, inconsistent PyFlink versions (large versions need to be consistent, for example, both are 1.12.x), etc.
In this article, we mainly introduce the definition and usage of various Python custom functions, as well as information on Python dependency management, Python custom function debugging and tuning, etc., hoping to help users understand Python custom functions. Next, we will continue to launch the PyFlink series of articles to help PyFlink users understand various functions, application scenarios, best practices, etc. in PyFlink.
In addition, the Alibaba Cloud real-time computing ecological team recruits outstanding big data talents (including internship + social recruitment) for a long time. Our work includes:
Real-time machine learning: Support real-time feature engineering and AI engine in machine learning scenarios, build real-time machine learning standards based on Apache Flink and its ecology, and promote the full real-timeization of scenarios such as search, recommendation, advertising, and risk control;
Big data + AI integration: including programming language integration (PyFlink related work), execution engine integration (TF on Flink), workflow and management integration (Flink AI Flow).
If you are interested in open source, big data or AI, please send your resume to: email@example.com