Learn to use the Dataflow service by running a word count example in Java.


In this tutorial, you'll learn the basics of the Dataflow service by running a simple example pipeline using Java.
Dataflow pipelines are either batch (processing bounded input like a file or database table) or streaming (processing unbounded input from a source like Pub/Sub). The example in this tutorial is a batch pipeline that counts words in a collection of Shakespeare's works.
Before you start, you'll need to check for prerequisites in your Google Cloud project and perform initial setup.

Project setup

Google Cloud organizes resources into projects. This allows you to collect all of the related resources for a single application in one place.
Begin by creating a new project or selecting an existing project for this tutorial.
For details, see Creating a project.

Set up Dataflow

To use Dataflow, enable the Dataflow APIs and open Cloud Shell.

Enable Cloud APIs

Dataflow processes data in many Google Cloud data stores and messaging services, including BigQuery, Cloud Storage, and Pub/Sub. To use these services, you must first enable their APIs.
Enable the required APIs in your project before you continue.

Open Cloud Shell

In this tutorial, you do much of your work in Cloud Shell, which is a built-in command-line tool for the Cloud Console.
Open Cloud Shell by clicking the Activate Cloud Shell button in the navigation bar in the upper-right corner of the console.

Install Dataflow samples

Dataflow runs jobs written using the Apache Beam SDK. To submit jobs to the Dataflow service using Java, your development environment requires Java, the Cloud SDK, the Apache Beam SDK for Java, and Apache Maven for managing SDK dependencies. This tutorial uses a Cloud Shell environment that has Java, the Cloud SDK, and Maven installed.
Alternatively, you can do this tutorial on your local machine.

Download the samples and the Apache Beam SDK for Java using the Maven command

To write a Dataflow job with Java, you first need to download the SDK from the Maven repository.
When you run this command in Cloud Shell, Maven creates a project structure and config file for downloading the appropriate version of the Apache Beam SDK:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DgroupId=com.example \
-DartifactId=dataflow-intro \
-Dversion="0.1" \
-DinteractiveMode=false
    archetypeArtifactId and archetypeGroupId define the example project structure.
    groupId is your organization's Java package name prefix; this tutorial uses com.example.
    artifactId sets the name of the created .jar file. Use the default value (dataflow-intro) for this tutorial.

Change directory

Change your working directory to dataflow-intro.
cd dataflow-intro
If you'd like to see the code for this example, you can find it in the src subdirectory in the dataflow-intro directory.

Set up a Cloud Storage bucket

Dataflow uses Cloud Storage buckets to store output data and cache your pipeline code.

Run gsutil mb

In Cloud Shell, use the command gsutil mb to create a Cloud Storage bucket.
gsutil mb gs://[PROJECT_ID]
[PROJECT_ID] is your Google Cloud project ID.
For more information about the gsutil tool, see the documentation.

Create and launch a pipeline

In Dataflow, data processing work is represented by a pipeline. A pipeline reads input data, performs transformations on that data, and then produces output data. A pipeline's transformations might include filtering, grouping, comparing, or joining data.
If you'd like to see the code for this example, you can find it in the src subdirectory in the dataflow-intro directory.
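To make the read, transform, and output stages concrete, here is a minimal sketch in plain Java. It is not the WordCount class that the archetype generates and it does not use the Apache Beam SDK. The class name WordCountSketch, the in-memory input, and the rule that a word is a run of letters are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration only; not the WordCount class generated by the archetype.
class WordCountSketch {

    // Transform step: split each line into words and count occurrences of each word.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // Assumption: a word is a maximal run of letters.
                .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    // Output step: format each entry as one "word: count" line of text.
    static List<String> format(Map<String, Long> counts) {
        return counts.entrySet().stream()
                .map(entry -> entry.getKey() + ": " + entry.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Input step: a tiny in-memory stand-in for the input text.
        List<String> lines = List.of("to be or not to be", "that is the question");
        format(countWords(lines)).forEach(System.out::println);
    }
}
```

A real pipeline expresses the same three stages as transforms that the service can distribute across workers, rather than as method calls in a single process.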

Launch your pipeline on the Dataflow service

Use Apache Maven's mvn exec command to launch your pipeline on the service. The running pipeline is referred to as a job.
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=[PROJECT_ID] \
--gcpTempLocation=gs://[PROJECT_ID]/tmp/ \
--output=gs://[PROJECT_ID]/output \
--runner=DataflowRunner \
--jobName=dataflow-intro"
    [PROJECT_ID] is your Google Cloud project ID.
    gcpTempLocation is the storage location that Dataflow uses for the binaries and other data needed to run your pipeline. This location can be shared across multiple jobs.
    output is the location used by the WordCount example to store the job results.
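The values in -Dexec.args reach the program as ordinary command-line arguments of the form --name=value. The sketch below shows one way such arguments could be read into a map using only the Java standard library. It is an illustration, not how the generated example works internally: the example relies on the Apache Beam SDK to parse its options. The class name ArgsSketch and the project ID my-project are hypothetical placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; the generated example relies on the Apache Beam SDK to parse options.
class ArgsSketch {

    // Parse arguments of the form --name=value into a map that preserves argument order.
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            // Keep only arguments that start with "--" and have a non-empty name before '='.
            if (arg.startsWith("--") && eq > 2) {
                options.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // my-project is a placeholder project ID, not a real project.
        String[] example = {
            "--project=my-project",
            "--gcpTempLocation=gs://my-project/tmp/",
            "--output=gs://my-project/output",
            "--runner=DataflowRunner",
            "--jobName=dataflow-intro"
        };
        parse(example).forEach((name, value) -> System.out.println(name + " -> " + value));
    }
}
```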

Your job is running

Congratulations! Your binary is now staged to the storage bucket that you created earlier, and Compute Engine instances are being created. Dataflow splits your input file so that multiple machines can process your data in parallel.
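The idea of splitting input so that pieces can be processed independently can be sketched on a single machine. The example below is only an analogy: the Dataflow service decides how real work is split and distributed, and this code does not use it. The class name SplitAndMergeSketch, the fixed chunk size, and whitespace tokenization are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-machine analogy; the Dataflow service manages real work splitting.
class SplitAndMergeSketch {

    // Split the input into consecutive chunks of at most chunkSize lines.
    static List<List<String>> split(List<String> lines, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += chunkSize) {
            chunks.add(lines.subList(i, Math.min(i + chunkSize, lines.size())));
        }
        return chunks;
    }

    // Count words in one chunk; each call is independent of the others.
    static Map<String, Long> countChunk(List<String> chunk) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : chunk) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // Process chunks in parallel, then merge the partial counts by summing per word.
    static Map<String, Long> countInParallel(List<String> lines, int chunkSize) {
        return split(lines, chunkSize).parallelStream()
                .map(SplitAndMergeSketch::countChunk)
                .reduce(new HashMap<>(), (a, b) -> {
                    // Copy before merging so that neither partial result is mutated.
                    Map<String, Long> merged = new HashMap<>(a);
                    b.forEach((word, n) -> merged.merge(word, n, Long::sum));
                    return merged;
                });
    }
}
```

Because each chunk is counted without reference to any other chunk, the partial results can be combined in any order and still produce the same totals.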
If you want to clean up the Maven project you generated, run this command in Cloud Shell to delete the directory:
cd .. && rm -R dataflow-intro

Monitor your job

In this section, you check the progress of your pipeline on the Dataflow page in the Cloud Console.

Go to the Dataflow page

Open the Navigation menu in the upper-left corner of the console, and then select Dataflow.

Select your job

Click the job name dataflow-intro to view the job details.

Explore pipeline details and metrics

Explore the pipeline on the left and the job information on the right. To see detailed job status, click Logs at the top of the page.
Click a step in the pipeline to view its metrics.
As your job finishes, you'll see the job status change, and the Compute Engine instances used by the job will stop automatically.
Note: When you see the message in Cloud Shell that the job is finished, you can close Cloud Shell.

View your output

Now that your job has run, you can explore the output files in Cloud Storage.

Go to the Cloud Storage page

Open the Navigation menu in the upper-left corner of the console, select Storage, and then click Browser.

Go to the storage bucket

In the list of buckets, select the bucket that you created earlier.
Dataflow saves the output in shards, so your bucket will contain several output files.
The tmp folder is for staging binaries needed by the workers and for temporary files needed by the job execution.
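If you download the output shards and want a single combined view, a small helper like the following could merge them. This is a sketch under an assumption: each line in a shard is formatted as "word: count". Check your own output files before relying on that format. The class name ShardMergeSketch is hypothetical, and each shard is passed in as a list of lines that you have already read.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; assumes each shard line looks like "word: count".
class ShardMergeSketch {

    // Combine the lines of all shards into one sorted map of word to count.
    static Map<String, Long> merge(List<List<String>> shards) {
        Map<String, Long> merged = new TreeMap<>();
        for (List<String> shard : shards) {
            for (String line : shard) {
                int sep = line.lastIndexOf(": ");
                if (sep < 0) {
                    continue; // Skip lines that do not match the assumed format.
                }
                String word = line.substring(0, sep);
                long count = Long.parseLong(line.substring(sep + 2).trim());
                merged.merge(word, count, Long::sum);
            }
        }
        return merged;
    }
}
```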

Clean up

To prevent being charged for Cloud Storage usage, delete the bucket you created.
    Click the Buckets link to go back to the bucket browser.
    Check the box next to the bucket that you created.
    Click the Delete button at the top of the Cloud Console, and confirm the deletion.


Here's what you can do next:
Set up your local environment: