This project provides examples of Apache Beam multi-language pipelines:
- python/wordcount_external - A Python pipeline that runs the Word Count workflow using three external Java SchemaTransforms. This example demonstrates the updated ExternalTransformProvider API.
- python/addprefix - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
- python/javacount - A Python pipeline that counts words using the Java Count.perElement() transform.
- python/javadatagenerator - A Python pipeline that produces a set of strings generated from Java. This example demonstrates the JavaExternalTransform API.
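As a rough illustration of the result the word-counting examples above produce, the following plain-Python sketch counts how often each word occurs. It does not use Beam or Java; the function name `count_per_element`, the tokenization rule, and the sample text are all assumptions made for this sketch.

```python
import re
from collections import Counter


def count_per_element(lines):
    """Return a dict mapping each word to its number of occurrences.

    Plain-Python stand-in for a per-element count; this is not the Beam API.
    """
    words = []
    for line in lines:
        # Assumed tokenization: split on anything that is not a letter or apostrophe.
        words.extend(w for w in re.split(r"[^A-Za-z']+", line) if w)
    return dict(Counter(words))


if __name__ == "__main__":
    sample = ["to be or not to be", "that is the question"]
    for word, count in sorted(count_per_element(sample).items()):
        print(f"{word}: {count}")
```

In the actual pipelines the counting happens inside a Java transform that the Python pipeline invokes through an expansion service; the sketch only shows the shape of the output.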
- Download the latest 'beam-examples-multi-language' JAR. Starting with Apache Beam 2.36.0, you can find it in the Maven Central Repository.
- Run the following command, replacing <version> and <port> with valid values:
java -jar beam-examples-multi-language-<version>.jar <port> --javaClassLookupAllowlistFile='*'
- See the Python quickstart for more information.
- In a new shell, run a pipeline in the python directory using a Beam runner that supports multi-language pipelines. The Python files contain details about the actual commands to run.
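Because the pipeline is started from a second shell, it can help to confirm that the expansion service is already listening before you launch it. The following is a minimal sketch using only the Python standard library; `wait_for_port` is a hypothetical helper, and 12345 is a placeholder for the port you passed to the JAR. A successful TCP connection only shows that something is listening on the port, not that the service is fully initialized.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Nothing is accepting connections yet; retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # 12345 is a placeholder; use the <port> given to the expansion service.
    print(wait_for_port("localhost", 12345, timeout=1.0))
```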
This example performs image classification on handwritten digits from the MNIST database.
Please see here for context and information regarding the corresponding Python pipeline.
Please note that the Java pipeline is available in the Beam Java examples module.
- Obtain or generate a CSV input file that contains labels and pixels to feed into the model, and store it in GCS. An example input is available here.
- Create a model file that contains a pickled scikit-learn model trained on MNIST data, and store it in GCS. An example model file is available here. This model was generated by running the program given here on the example input dataset.
- Perform Beam runner specific setup according to instructions here.
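To make the input format more concrete, here is a minimal sketch of how a row of the CSV file could be split into a label and its pixel values. The column layout (label first, integer pixel values after it) and the helper names `parse_mnist_row` and `read_labeled_pixels` are assumptions for illustration; check the example input file for the actual layout, including whether it has a header row.

```python
import csv
import io


def parse_mnist_row(row):
    """Split one CSV row into (label, pixels), assuming the label comes first."""
    values = [int(v) for v in row]
    return values[0], values[1:]


def read_labeled_pixels(text, has_header=False):
    """Yield (label, pixels) pairs from CSV text."""
    reader = csv.reader(io.StringIO(text))
    if has_header:
        next(reader, None)  # skip the header row if the file has one
    for row in reader:
        if row:  # skip blank lines
            yield parse_mnist_row(row)


if __name__ == "__main__":
    # Tiny made-up sample with 4 pixels per row; real MNIST rows are much longer.
    sample = "5,0,0,255,0\n3,0,128,0,0\n"
    for label, pixels in read_labeled_pixels(sample):
        print(label, pixels)
```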
The following instructions are for running the pipeline with the Dataflow runner. For other portable runners, modify the instructions according to the guidelines here.
- Check out the Beam examples Maven archetype for the relevant Beam version.
export BEAM_VERSION=<Beam version>
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
- Run the pipeline.
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
- Inspect the output. Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit.
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
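Since each output line pairs the actual label with the predicted one, a quick accuracy figure can be computed from the output. The sketch below assumes exactly the "actual,predicted" line format described above; the function name `accuracy` and the sample lines are hypothetical.

```python
def accuracy(lines):
    """Return the fraction of 'actual,predicted' lines where both labels agree."""
    total = 0
    correct = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        actual, predicted = (part.strip() for part in line.split(",", 1))
        total += 1
        if actual == predicted:
            correct += 1
    if total == 0:
        raise ValueError("no output lines to score")
    return correct / total


if __name__ == "__main__":
    # Made-up sample lines; real lines come from the pipeline output files.
    sample = ["7,7", "2,2", "1,7", "0,0"]
    print(f"accuracy: {accuracy(sample):.2f}")
```

To score real output, the lines printed by the gsutil command can be passed to the function, for example by iterating over `sys.stdin` in a small script.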
- Activate a new virtual environment following these instructions.
- Install the Apache Beam package with GCP support and the scikit-learn package.
pip install 'apache-beam[gcp]'
pip install scikit-learn
- Start up the expansion service.
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
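If you do not have a particular port in mind for <PORT>, one option is to let the operating system pick a free one. The following sketch finds such a port and builds the same command line as above; `find_free_port` and `expansion_service_command` are hypothetical helpers, and only the module name and flags are taken from the command in this guide. Note that another process could take the port between finding it and starting the service.

```python
import shlex
import socket
import sys


def find_free_port():
    """Ask the OS for a currently unused TCP port on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("localhost", 0))  # port 0 lets the OS choose
        return sock.getsockname()[1]


def expansion_service_command(port, name_glob="*"):
    """Build the argv list for the expansion service command shown above."""
    return [
        sys.executable, "-m",
        "apache_beam.runners.portability.expansion_service_main",
        "-p", str(port),
        "--fully_qualified_name_glob", name_glob,
    ]


if __name__ == "__main__":
    port = find_free_port()
    # Print a shell-quoted command; running it requires apache-beam to be installed.
    print(shlex.join(expansion_service_command(port)))
```

Whatever port is used here must also be the value of EXPANSION_SERVICE_PORT when the Java pipeline is run.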
- Make sure that Docker is installed and available on your system.
- In a different shell, build and push Python and Java Docker containers.
export DOCKER_ROOT=<Docker root>
./gradlew :sdks:python:container:py39:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_python3.9_sdk:latest
./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME
docker push $DOCKER_ROOT/beam_java11_sdk:latest
- Run the pipeline using the following Gradle command (this guide assumes Dataflow runner). Note that we override both the Java and Python SDK harness containers here.
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
export EXPANSION_SERVICE_PORT=<PORT>
# This removes any existing output.
gsutil rm gs://$GCP_BUCKET/multi-language-beam/output*
./gradlew :examples:multi-language:sklearnMinstClassification --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output \
--sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \
--sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.9_sdk:latest \
--expansionService=localhost:$EXPANSION_SERVICE_PORT \
--region=${GCP_REGION}"
- Inspect the output. Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit.
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
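The --sdkHarnessContainerImageOverrides value in the Gradle command above pairs a regular expression with a replacement image, separated by a comma. The sketch below illustrates that idea in plain Python; it is not how the runner is implemented. The helper names, the sample image names, and the choice to match the regex against the whole image name are assumptions made for this sketch.

```python
import re


def parse_override(spec):
    """Split a 'regex,replacement' override into its two parts."""
    pattern, replacement = spec.split(",", 1)
    return pattern, replacement


def apply_overrides(image, specs):
    """Return the replacement of the first override whose regex matches the image.

    Assumption: the regex must match the entire image name.
    """
    for spec in specs:
        pattern, replacement = parse_override(spec)
        if re.fullmatch(pattern, image):
            return replacement
    return image  # no override applies


if __name__ == "__main__":
    # Hypothetical image names, used only to show the matching behavior.
    overrides = [".*python.*,example.io/beam_python3.9_sdk:latest"]
    for image in ["apache/beam_python3.9_sdk:2.50.0", "apache/beam_java11_sdk:2.50.0"]:
        print(image, "->", apply_overrides(image, overrides))
```

With the pattern .*python.* only the Python SDK harness image is replaced, which is why the Java container is set separately through --sdkContainerImage in the command.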
This example is covered in the Java multi-language pipelines quickstart. The pipeline source code is available at PythonDataframeWordCount.java.