
We have a pipeline that reads data from BigQuery and processes historical data for various calendar years. It fails with OutOfMemoryError when the input data is small (~500 MB).

On startup it reads from BigQuery at about 10,000 elements/sec; after a short time it slows down to hundreds of elements/sec and then hangs completely.

Observing 'Elements Added' on the next processing step (BQImportAndCompute), the value increases and then decreases again. That looks to me as if some already loaded data is dropped and then loaded again.

The Stackdriver Logging console contains errors with various stack traces that contain java.lang.OutOfMemoryError, for example:

Error reporting workitem progress update to Dataflow service:

    java.lang.OutOfMemoryError: Java heap space
    at com.google.cloud.dataflow.sdk.runners.worker.BigQueryAvroReader$BigQueryAvroFileIterator.getProgress(BigQueryAvroReader.java:145)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.setProgressFromIteratorConcurrent(ReadOperation.java:397)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.setProgressFromIterator(ReadOperation.java:389)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$1.run(ReadOperation.java:206)

I would suspect a problem with the topology of the pipeline, but the same pipeline

  1. works fine locally with DirectPipelineRunner
  2. works fine in the cloud with DataflowPipelineRunner on a larger dataset (5 GB, for another year)

I assume the problem is in how Dataflow parallelizes and distributes work in the pipeline. Is there any way to inspect or influence this?

  • Any details on what BQImportAndCompute is doing? Also, BQConcat looks like a Flatten, is that correct? Is there a specific job ID that we should look at to see what was going on? Commented Apr 22, 2016 at 17:01
  • BQImportAndCompute reads TableRows and converts them to another POJO. Yes, BQConcat is a Flatten. For example, job ID 2016-04-22_07_09_50-6916524826145032691. In fact, all the manually cancelled jobs then did not complete. evernote.com/l/ADtUXFwm92ROr5sTXqEUVrcfhUwQfrHctFM Commented Apr 22, 2016 at 21:22
  • Also, I found out later that after throwing the OutOfMemoryError, the JVM did not exit. I connected to a worker and killed the JVM process; more data was then processed until the next OOM. After a few repeats the pipeline failed because of too many retries. Commented Apr 22, 2016 at 21:36
  • And for comparison, the same pipeline (Java code): job 2016-04-22_05_01_130-3684926617487173331, input size 4.07 GB, completed; jobs 2016-04-21_08_51_03-9649478815118530360 and 2016-04-21_10_04_12-4127814452160753674, input size 339 MB, failed. Commented Apr 22, 2016 at 21:51

2 Answers


The problem here doesn't seem to be related to the size of the BigQuery table, but more likely to the number of BigQuery sources being used and the structure of the rest of the pipeline.

  1. Instead of reading from multiple BigQuery sources and flattening them, have you tried reading from a single query that pulls in all the information? Doing that in one step should simplify the pipeline and also let BigQuery execute more efficiently (one query against multiple tables vs. multiple queries against individual tables); see the first sketch after this list.

  2. Another possible problem is a high degree of fan-out within or after the BQImportAndCompute operation. Depending on the computation being done there, you may be able to reduce the fan-out using clever CombineFns or WindowFns; see the second sketch after this list. If you want help figuring out how to improve that path, please share more details about what happens after BQImportAndCompute.
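For illustration, here is a minimal sketch of the single-query approach, using the BigQueryIO from the Dataflow 1.x SDK that appears in the stack traces above. It assumes an existing Pipeline p; the project, dataset, and table names are hypothetical placeholders. In legacy BigQuery SQL, listing several tables in the FROM clause behaves like a UNION ALL:

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.dataflow.sdk.io.BigQueryIO;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // One query over all per-year tables replaces N sources + Flatten.
    // [myproject:dash.data_2015] etc. are hypothetical table names.
    PCollection<TableRow> rows = p.apply(
        BigQueryIO.Read.named("ReadAllYears")
            .fromQuery("SELECT * FROM [myproject:dash.data_2015], [myproject:dash.data_2016]"));

And a sketch of taming fan-out with an early combine, assuming (hypothetically) that the data expanded by BQImportAndCompute is keyed and eventually summed. Combining per key before a GroupByKey lets the runner pre-aggregate on each worker, shrinking what gets shuffled:

    import com.google.cloud.dataflow.sdk.transforms.Combine;
    import com.google.cloud.dataflow.sdk.transforms.Sum;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // keyedValues is a hypothetical PCollection<KV<String, Double>> produced
    // downstream of BQImportAndCompute; Sum.SumDoubleFn is a built-in CombineFn.
    PCollection<KV<String, Double>> totals = keyedValues.apply(
        Combine.<String, Double, Double>perKey(new Sum.SumDoubleFn()));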

  • I put the source code of the pipeline on GitHub (dash-pipeline). I'm going to try reducing the fan-out degree. Commented May 9, 2016 at 9:21

Have you tried debugging with Stackdriver?

https://cloud.google.com/blog/big-data/2016/04/debugging-data-transformations-using-cloud-dataflow-and-stackdriver-debugger
