Pipelines as UDFs

Hussain Sultan

June 9, 2025

TL;DR

Often, real-world data workflows don't fit neatly into SQL or standard relational operations. You need custom transformations, complex aggregations, multi-step logic, and arbitrary processing—but current tools force you to fragment these workflows across teams and systems. Team A builds processed tables, Team B builds models, Team C runs inference—each step opaque to the others, making end-to-end reasoning difficult.

In this article, we'll present a solution that simplifies data pipeline use cases without building pipelines at all. Instead, we use the open source Xorq framework to define a composite data engine that leverages four UDF expression types, letting you model any real-world data pattern as a composable expression: row-wise transformations (scalar UDFs), aggregations (UDAFs), expression-derived scalar functions (ExprScalarUDFs), and new user-defined exchange functions (UDXFs), which accept and return schemas of any shape and can subsume any other UDF type. Instead of breaking workflows apart, you can express complete business logic as a single, coherent expression.

Introduction

Xorq (https://github.com/xorq-labs/xorq) is a Python framework for building composite data engines. Powered by Ibis, Apache DataFusion, and Apache Arrow, it provides a declarative syntax for defining custom query engines capable of applying any processing to any data, regardless of its source. The approach greatly simplifies most data science and AI data engineering use cases.

The Xorq framework supports four UDF types that can be used to model any real-world data workflow as pure expressions. Whether you're doing custom transformations, complex aggregations, or chaining multi-step business logic, these UDF primitives let you express complete workflows without fragmenting across tools or teams. Every expression carries an explicit schema—the contract that lets the optimizer push work to the best engine and makes the entire workflow transparent to any team member.

If you caught Sev Leonard's PyCon US talk (https://www.youtube.com/watch?v=fCkEo-1MG1A), you already know the punchline: well-defined schemas turn messy data plumbing into a composable, predictable system. Xorq bakes that principle in from the first line of code.

Our Dataset: Lending Club Loans

Let's illustrate the power of composable data processing by walking through an ML training and inference scenario that leverages multiple UDF types, including a new type, the User-Defined Exchange Function (UDXF). Throughout the example, we'll work with a Lending Club dataset of loan applications, with features like annual income, FICO scores, and loan amount:

import pandas as pd
import xorq as xo
import xorq.expr.datatypes as dt

loans = xo.deferred_read_parquet(
    xo.connect(),
    xo.config.options.pins.get_path("lending-club"),
)
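
Because the read is deferred, nothing executes yet; the expression simply carries its schema. Here's a quick sketch of how you might inspect it, assuming the Ibis-style schema() and execute() methods that Xorq expressions expose:

# Inspect the schema contract carried by the deferred expression
print(loans.schema())

# Materialize a small sample to eyeball the data
loans.head(5).execute()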


Let's see how different UDF types help us analyze and model this data.

Scalar User-Defined Functions (UDF)

Scalar UDFs are the simplest type of user-defined function—they operate on individual rows, taking one or more column values as input and returning a single value. Think of them as the equivalent of applying a Python function to each row of a DataFrame.

When to use Scalar UDFs:

  • Row-wise transformations and calculations
  • Data cleaning and validation
  • Feature engineering that doesn't require aggregation
  • Custom business logic that operates on individual records

In this example, we use Xorq's declarative data processing syntax to define a UDF expression that calculates risk scores:

from xorq.expr.udf import make_pandas_udf


def calculate_risk_score(df: pd.DataFrame) -> pd.Series:
    """Calculate a numerical risk score for each row."""
    base_score = 100.0
    
    # Start with base score for all rows
    scores = pd.Series([base_score] * len(df), index=df.index)
    
    # Apply income factors
    scores.loc[df['annual_inc'] < 30000] *= 1.8
    scores.loc[(df['annual_inc'] >= 30000) & (df['annual_inc'] < 50000)] *= 1.3
    
    # Apply credit score factors  
    scores.loc[df['fico_range_high'] < 650] *= 1.6
    scores.loc[(df['fico_range_high'] >= 650) & (df['fico_range_high'] < 700)] *= 1.2
    
    return scores
    
risk_score_udf = make_pandas_udf(
    calculate_risk_score,
    schema=xo.schema({"annual_inc": "float64", "fico_range_high": "int64"}),
    return_type=dt.float64,
    name="risk_score"
) 

enriched_loans = (
    loans
    .mutate(credit_score=risk_score_udf.on_expr)
)
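
Since calculate_risk_score is plain pandas, you can sanity-check the logic on a tiny in-memory frame before wiring it into the expression. A minimal sketch with made-up example values:

sample = pd.DataFrame({
    "annual_inc": [25_000.0, 45_000.0, 90_000.0],
    "fico_range_high": [640, 680, 720],
})
# Expect higher scores (more risk) for the low-income, low-FICO rows
print(calculate_risk_score(sample))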


User-Defined Aggregate Functions (UDAF) and Window Functions (UDWF)

UDAFs take multiple rows as input and produce a single output value, similar to SQL's SUM or AVG functions, but with arbitrary Python logic (think of pandas' group-by apply). They maintain state across all rows in a group and can implement sophisticated algorithms while still fitting naturally into declarative query plans.

UDAFs are perfect for reductions such as training an ML model, where an N-row x M-col group produces a 1-row x 1-col output.

When to use UDAFs:

  • Custom statistical calculations
  • ML training
  • Complex operations on groups

Here we build an expression that uses a UDAF to train an XGBoost model:

import pickle

import numpy as np
import toolz
import xgboost as xgb

from xorq.expr import udf
from xorq.common.utils.toolz_utils import curry

features = ("emp_length", "dti", "annual_inc", "loan_amnt", "fico_range_high", "cr_age_days")
target = "event_occurred"

@curry
def train_xgboost_model(df, features=features, target=target, seed=0):
    """Train an XGBoost model on loan data."""
        
    param = {"max_depth": 4, "eta": 0.1, "objective": "binary:logistic", "seed": seed}
    num_round = 50
    
    X = df[list(features)].fillna(0)
    y = df[target]
    
    dtrain = xgb.DMatrix(X, y)
    bst = xgb.train(param, dtrain, num_boost_round=num_round)
    return bst


model_udaf = udf.agg.pandas_df(
    fn=toolz.compose(pickle.dumps, train_xgboost_model),
    schema=loans[list(features) + [target]].schema(),
    return_type=dt.binary,
    name="model",
)

model_expr = (
    enriched_loans
    .agg([
        model_udaf.on_expr(enriched_loans).name("model")
    ])
)
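
The aggregation yields a single row whose model column holds the pickled booster. A hedged sketch of how you might pull it back out for inspection, assuming execute() returns a pandas DataFrame as with Ibis expressions:

# Materialize the one-row aggregate and unpickle the trained booster
trained = pickle.loads(model_expr.execute()["model"].iloc[0])
print(trained.num_boosted_rounds())  # standard xgboost Booster API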

Note: Xorq also supports User-Defined Window Functions (UDWFs), which work like UDAFs but maintain state across window frames (see the example at https://github.com/xorq-labs/xorq/blob/main/examples/python_udwf.py).

Expression Scalar User-Defined Functions (ExprScalarUDF)

Here's where things get really interesting. Expression Scalar UDFs (ExprScalarUDF) allow you to define UDFs that are derived from other expressions. This means you can declaratively express a UDF that depends on the result of a UDAF: a single expression can contain the train/test split, model training and inference.

This composability is what makes composite query engines created with Xorq truly powerful: instead of breaking your logic into separate steps with intermediate storage, you can express complex workflows as single, coherent expressions.

When to use ExprScalar UDFs:

  • Passing computed aggregates to row-wise functions
  • Building complex multi-stage transformations
  • Composing with reusable components that depend on computed values

Here's a practical example that applies the XGBoost model trained above to generate predictions:

from xorq.expr.udf import make_pandas_expr_udf

@curry
def predict_xgboost_model(model, df, features=features):
    return model.predict(xgb.DMatrix(df[list(features)]))

# Step 1: Load data and create train/test splits
t = xo.deferred_read_parquet(
    xo.connect(), xo.config.options.pins.get_path("lending-club")
)
(train, test) = xo.train_test_splits(
    t,
    unique_key="rownum",
    test_sizes=0.7,
    random_seed=42,
)

# Step 2: ExprScalar UDF that takes the trained model as a computed parameter
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(train),  # This is the magic!
    fn=predict_xgboost_model,
    schema=t[features].schema(),
    return_type=dt.dtype("float32"),
    name="predicted",
)

# Step 3: Apply predictions to test data using the trained model
expr = test.mutate(predict_expr_udf.on_expr(test).name("predicted"))

# Execute the entire pipeline
result = expr.execute()
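
Because result is an ordinary pandas DataFrame containing both the target column and the predicted probabilities, you can score the model with standard tooling. A minimal sketch, assuming scikit-learn is installed:

from sklearn.metrics import roc_auc_score

# event_occurred is the binary target; predicted is the model's probability
auc = roc_auc_score(result["event_occurred"], result["predicted"])
print(f"test AUC: {auc:.3f}")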

User Defined Exchange Functions (UDXF)

Finally, we propose a fourth UDF type, made possible by the Xorq framework: UDXFs can accept a schema of any shape as input and return a schema of any shape as output. Xorq leverages an Apache Arrow Flight server to exchange data with clients. UDXFs are the declarative escape hatch needed to compose relational pipelines with real-world applications: the only things we need to know are the schemas (the shape of the data) going in and coming out. UDXFs are also special in that they execute in a lightweight Flight server that doesn't have to be in-process, which makes them portable.

import pandas as pd
import xorq as xo
from random import Random

def do_risk_scenarios(df: pd.DataFrame):
    # this could be an API call or something else impure
    return pd.DataFrame(
        (
            {
                "loan_id": tpl.rownum, 
                "scenario_id": scenario_id, 
                "scenario_type": scenario_type,
                "adjusted_score": tpl.credit_score * (factor + Random(tpl.rownum + scenario_id).uniform(-0.1, 0.1))
            }
            for tpl in df.itertuples()
            for (scenario_id, scenario_type, factor) in [
                (0, "base", 1.0),
                (1, "stress", 1.5), 
                (2, "optimistic", 0.8)
            ]
        )
    )

schema_in = xo.schema({"rownum": "int64", "credit_score": "float64"})
schema_out = xo.schema({
    "loan_id": "int64", 
    "scenario_id": "int64", 
    "scenario_type": "string", 
    "adjusted_score": "float64"
})

risk_scenarios_udxf = xo.expr.relations.flight_udxf(
    process_df=do_risk_scenarios,
    maybe_schema_in=schema_in.to_pyarrow(),
    maybe_schema_out=schema_out.to_pyarrow(),
    name="risk_scenarios",
)

# Apply to our enriched loans data
risk_scenarios = (
    enriched_loans
    .select(["rownum", "credit_score"])
    .pipe(risk_scenarios_udxf)
)

risk_scenarios.execute()
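
Because do_risk_scenarios is plain pandas, you can exercise it locally before standing up the Flight exchange. A quick sketch with made-up rows:

sample = pd.DataFrame({"rownum": [1, 2], "credit_score": [150.0, 288.0]})
# Each input row fans out into three scenario rows (base, stress, optimistic)
print(do_risk_scenarios(sample))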


Comparison with User-Defined Table Functions (UDTFs):

UDXFs resemble UDTFs with stateful processing, where the output can depend on the complete input batch rather than individual rows. They differ, however, in that a UDXF also describes a Flight service and is therefore portable; in that sense, it is closer to external functions.

Choosing the Right UDF Type

| Feature | Scalar UDF | UDAF | UDWF | UDXF |
|---|---|---|---|---|
| Processing Pattern | Row by row | Aggregation | Window | Arbitrary transformation |
| State Management | Stateless | Aggregation state | Window state | Arbitrary state |
| Schema | 1 row x m cols in, 1 row x 1 col out | n rows x m cols in, 1 row x 1 col out | n rows x m cols in, 1 row x 1 col out (but once for each row in) | n rows x m cols in, y rows x z cols out |
| Execution Context | In-process | In-process | In-process | Arrow Flight service (potentially remote) |
| Best for | Inference / Functions | ML Training | Time-series analysis | Compute offloading, complex logic, arbitrary output shapes |

Here's a quick decision tree to help you choose the right UDF type for your use case:

  1. Start with Scalar UDFs for simple row-wise transformations. They're the easiest to write, test, and reason about.
  2. Move to UDAFs when you need to aggregate data with custom logic that can't be expressed in standard SQL, and to ExprScalarUDFs when you need to express training and inference in the same expression.
  3. Use UDWFs for time-series analysis, running calculations, or any operation that needs context from neighboring rows.
  4. Write PyArrow UDFs when performance is critical and you can use PyArrow's compute kernels.
  5. Reach for UDXFs as an escape hatch for:
    1. Complex Logic Encapsulation: when you have sophisticated algorithms that would be difficult to express in relational Intermediate Representation (IR) but need to be portable.
    2. Legacy Code Integration: when you need to wrap existing code without refactoring it to fit Xorq's patterns.
    3. Offloading Compute: you can call out to other compute platforms, either as part of the Xorq expression or arbitrarily inside a UDF, e.g. using Modal to run a function externally.

The key distinction is that UDXFs are generic functions that can implement the behavior of any other function type, with the added flexibility of arbitrary data transformations and potential execution outside Xorq.
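
For example, the scalar risk score from earlier could just as well be exposed as a UDXF. A hedged sketch that reuses calculate_risk_score and the flight_udxf constructor shown above (the wrapper risk_score_df and the name risk_score_exchange are ours, for illustration):

def risk_score_df(df: pd.DataFrame) -> pd.DataFrame:
    # Wrap the row-wise scoring function so the exchange returns a DataFrame
    return df.assign(risk_score=calculate_risk_score(df))

risk_score_udxf = xo.expr.relations.flight_udxf(
    process_df=risk_score_df,
    maybe_schema_in=xo.schema(
        {"annual_inc": "float64", "fico_range_high": "int64"}
    ).to_pyarrow(),
    maybe_schema_out=xo.schema(
        {"annual_inc": "float64", "fico_range_high": "int64", "risk_score": "float64"}
    ).to_pyarrow(),
    name="risk_score_exchange",
)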

Conclusion

  • Composability is king. Each function type slots naturally into a bigger declarative plan.
  • Stay declarative. Even complex ML workflows can live inside a single expression graph.
  • Keep the escape hatch handy. UDXFs let you wrap legacy code or off‑load compute without abandoning Xorq’s lineage & caching.

The Lending Club example shows how you can start with basic row-wise operations and progressively build more sophisticated analytics—all while maintaining the declarative nature that makes pipelines easy to reason about, test, and optimize.

Free Xorq Training

Spend 30 minutes with Xorq engineering to get on the fast path to better ML engineering.
