Apache Beam Runners
Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
Available Runners
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
Direct Runner
For local development and testing.
Java
java
PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); Pipeline p = Pipeline.create(options);
Python
python
options = PipelineOptions() options.view_as(StandardOptions).runner = 'DirectRunner' p = beam.Pipeline(options=options)
Command Line
bash
--runner=DirectRunner
Dataflow Runner
Prerequisites
- •GCP project with Dataflow API enabled
- •Service account with Dataflow Admin role
- •GCS bucket for staging
Java Usage
java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
Python Usage
python
options = PipelineOptions([
'--runner=DataflowRunner',
'--project=my-project',
'--region=us-central1',
'--temp_location=gs://my-bucket/temp'
])
Runner v2
bash
--experiments=use_runner_v2
Custom SDK Container
bash
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
Flink Runner
Embedded Mode
java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
Cluster Mode
java
options.setFlinkMaster("host:port");
Portable Mode (Python)
python
options = PipelineOptions([
'--runner=FlinkRunner',
'--flink_master=host:port',
'--environment_type=LOOPBACK' # or DOCKER, EXTERNAL
])
Spark Runner
Java
java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); # or spark://host:port
Python (Portable)
python
options = PipelineOptions([
'--runner=SparkRunner',
'--spark_master_url=local[*]'
])
Testing with Runners
ValidatesRunner Tests
Tests that validate runner correctness:
bash
# Direct Runner ./gradlew :runners:direct-java:validatesRunner # Flink Runner ./gradlew :runners:flink:1.18:validatesRunner # Spark Runner ./gradlew :runners:spark:3:validatesRunner # Dataflow Runner ./gradlew :runners:google-cloud-dataflow-java:validatesRunner
TestPipeline with Runners
java
@Rule public TestPipeline pipeline = TestPipeline.create(); // Set runner via system property -DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
Portable Runners
Concept
- •SDK-independent execution via Fn API
- •SDK runs in container, communicates via gRPC
Environment Types
- •
DOCKER- SDK in Docker container - •
LOOPBACK- SDK in same process (testing) - •
EXTERNAL- SDK at specified address - •
PROCESS- SDK in subprocess
Job Server
Start Flink job server:
bash
./gradlew :runners:flink:1.18:job-server:runShadow
Start Spark job server:
bash
./gradlew :runners:spark:3:job-server:runShadow
Runner-Specific Options
Dataflow
| Option | Description |
|---|---|
--project | GCP project |
--region | GCP region |
--tempLocation | GCS temp location |
--stagingLocation | GCS staging |
--numWorkers | Initial workers |
--maxNumWorkers | Max workers |
--workerMachineType | VM type |
Flink
| Option | Description |
|---|---|
--flinkMaster | Flink master address |
--parallelism | Default parallelism |
--checkpointingInterval | Checkpoint interval |
Spark
| Option | Description |
|---|---|
--sparkMaster | Spark master URL |
--sparkConf | Additional Spark config |
Building Runner Artifacts
Dataflow Worker Jar
bash
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
Flink Job Server
bash
./gradlew :runners:flink:1.18:job-server:shadowJar
Spark Job Server
bash
./gradlew :runners:spark:3:job-server:shadowJar
Debugging
Direct Runner
- •Enable logging:
-Dorg.slf4j.simpleLogger.defaultLogLevel=debug - •Use
--targetParallelism=1for deterministic execution
Dataflow
- •Check Dataflow UI: console.cloud.google.com/dataflow
- •Use
--experiments=upload_graphfor graph debugging - •Worker logs in Cloud Logging
Portable Runners
- •Enable debug logging on job server
- •Check SDK harness logs in worker containers