Skip to content

[exec] Fixed multiple event inputs#187

Open
gfalcone wants to merge 1 commit intospotify:developfrom
gfalcone:fix_multiple_event_inputs
Open

[exec] Fixed multiple event inputs#187
gfalcone wants to merge 1 commit intospotify:developfrom
gfalcone:fix_multiple_event_inputs

Conversation

@gfalcone
Copy link
Contributor

@gfalcone gfalcone commented Apr 3, 2021

Hello !

I have an error when I use multiple event inputs like this in my Klio job :

job_config:
  allow_non_klio_messages: True
  events:
    inputs:
      - type: pubsub
        topic: projects/my-project/topics/oplog-high-latency
        subscription: projects/my-project/subscriptions/frames-input
      - type: pubsub
        topic: projects/my-project/topics/oplog-high-latency
        subscription: projects/my-project/subscriptions/frames-input

Here is the error I get when I do a klio job run:

Traceback (most recent call last):
  File "/usr/local/bin/klioexec", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/klio_core/utils.py", line 238, in wrapper
    func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/klio_exec/cli.py", line 95, in run_pipeline
    klio_pipeline.run()
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 556, in run
    self._setup_pipeline(pipeline)
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 529, in _setup_pipeline
    pipeline
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 488, in _generate_pcoll_per_input
    | "Merge multi-input pass-thrus" >> beam.Flatten()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 1036, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 553, in __ror__
    'as there are no deferred inputs.' % self.label)
ValueError: "Flatten" requires a pipeline to be specified as there are no deferred inputs.

I have successfully ran my jobs with this fix.

Thank you in advance for taking a look into it :)

Checklist for PR author(s)

  • Format the pull request title like [cli] Fixes bugs in 'klio job fake-cmd'.
  • Changes are covered by unit tests (no major decrease in code coverage %) and/or integration tests.
  • Document any relevant additions/changes in the appropriate spot in docs/src.
  • For any change that affects users, update the package's changelog in docs/src/reference/<package>/changelog.rst.
@gfalcone gfalcone force-pushed the fix_multiple_event_inputs branch 2 times, most recently from 7c2d898 to 25ff120 Compare April 3, 2021 11:51
@fallonchen
Copy link
Contributor

Hi @gfalcone ! Thanks for taking the time to make this PR, and sorry for the long delay in replying!

While the proposed change makes sense to me, I am having trouble reproducing this error - can you tell me how you are using the inputs in your run.py, as well as: version of klio, Python and Beam you are using?

@gfalcone
Copy link
Contributor Author

Hello @fallonchen no problem :).

It happened on Python 3.6 and Apache Beam 2.27 (which is not the version used at the moment by Klio if I am not wrong).

If I recall, I was following the example on this page : https://docs.klio.io/en/stable/userguide/pipeline/io.html#step-2-using-multiple-inputs

@gfalcone
Copy link
Contributor Author

Hello @fallonchen, do you have any news concerning this PR ? Thank you :)

@econchick
Copy link
Contributor

Hey @gfalcone - sorry for the silence!

I'm going to close then reopen this PR to re-trigger the GitHub actions. If this still passes, I'll go ahead and merge, and it should be available in either the 21.11.0 or 21.12.0 release (we may skip 21.11.0).

@econchick econchick closed this Nov 19, 2021
@econchick econchick reopened this Nov 19, 2021
@econchick
Copy link
Contributor

fixed a separate bug that had the build broken, so let's try this re-triggering workflows again 😅

@econchick econchick closed this Nov 19, 2021
@econchick econchick reopened this Nov 19, 2021
@econchick
Copy link
Contributor

uff, okay - hey @gfalcone could I ask you to rebase this PR off of latest develop branch and push? if that's too much, I can close this PR and create a fresh one with your change in it (I'd otherwise rebase & push to this PR myself but I don't have access to your fork). I'll close if I don't hear from you after a week or so.

@gfalcone gfalcone force-pushed the fix_multiple_event_inputs branch from 25ff120 to 0a6bebd Compare January 3, 2022 16:21
@gfalcone
Copy link
Contributor Author

gfalcone commented Jan 3, 2022

Hello @econchick , first of all sorry for the late reply :/. I rebased my branch with develop, it should be good now. Thank you for your help on this :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants