Skip to content

introducing SubscriberBase and SubscriptionBase#157

Merged
lehecka merged 6 commits into
rsocket:masterfrom
lehecka:SubscriberBase
Oct 26, 2016
Merged

introducing SubscriberBase and SubscriptionBase#157
lehecka merged 6 commits into
rsocket:masterfrom
lehecka:SubscriberBase

Conversation

@lehecka

@lehecka lehecka commented Oct 26, 2016

Copy link
Copy Markdown
Contributor

SubscriberBase and SubscriptionBase are new base classes designed for users implementing their own Subscriber/Subscription classes. The base classes allow to specify Executor in the constructor.

  • All calls to onSubscribe/onNext/onComplete/onError and request/cancel are synchronized via provided or default executor. The Executor makes sure there will be only one of the call on the stack. This is keeping our current design which is very useful for correctness and safety.
  • If users provides an EventBase as an Executor, all above calls will be automatically marshaled onto the provided EventBase. (please remove all runInEventBaseThread wrappers from your code and create Subscribers with the appropriate Executor!)

Please see how the new classes are used in the test and tcp duplex connection

@jspahrsummers jspahrsummers left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good in general—thanks for cleaning this up!

I think the various classes could use more documentation about their motivation and intended usage, though. Right now, they seem to take for granted that you will want them… but the docs should answer, "Why?" 😃

: public std::enable_shared_from_this<EnableSharedFromThisVirtualBase> {};

template <typename T>
class EnableSharedFromThisBase

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this class necessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want user to be able to implement a class which inherits both from SubscriberBase and SubscriptionBase. The class needs to inherit std::enable_shared_from_this only once in order to work properly. Unfortunately std::enable_shared_from_this can't be inherited virtually because of some implementation details of that class.
EnableSharedFromThis is essentially a workaround which will enable you to inherit std::enable_shared_from_this only once in your class hierarchy.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document that?

Comment thread src/Executor.cpp

namespace reactivesocket {

// just instantiating of the template here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want the template to be instantiated in every compilation unit. It is unnecessary and just adds more work for compiler and linker. That's why I declared the template to be extern. Now Executor.cpp is the only place which will instantiate the template (it could be any cpp file) so the code is generated only once.

Comment thread src/Executor.cpp

void ExecutorBase::start() {
if (pendingSignals_) {
auto movedSignals = folly::makeMoveWrapper(std::move(pendingSignals_));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we didn't need MoveWrapper anymore?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super clear about this. I don't think Yuri make the call yet as to whether we can use c++14 on all platforms and so we can move this code to c++14 yet.

bool startExecutor = true);

/// We start in a queueing mode, where it merely queues signal
/// deliveries until ::start is invoked.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to queue initially?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want this base class to be used for the implementation reactive socket automata where we need the functionality. We want to make some calls to the object right after the construction which can eventually call users callback, but we don't want to process user signals until we say start (the instance is fully initialized and ready).
I should write more about this in the documentation.
Good thing is that adds no overhead if not used.

template <typename F>
void runInExecutor(F&& func) {
if (pendingSignals_) {
pendingSignals_->emplace_back(func);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both uses of func in here should be std::forward<F>(func).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch.


private:
using PendingSignals = std::vector<std::function<void()>>;
std::unique_ptr<PendingSignals> pendingSignals_;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this allocated on the heap? The vector can be moved into the executor lambda regardless.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because we are differentiating a case when pendingSignals_ is null or pendingSignals_ is empty. Please, see method start().
If we wanted to save the allocation, we would add a boolean value signaling whether we are still just accumulating user signals or executing them right away.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really need to differentiate those cases. Enqueuing an empty work item on the Executor isn't a big enough deal to complicate this code, IMO.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to leave this comment unanswered...
We do need the differentiation to know whether the incoming signal should be queued or it should be processed.
If pendingSignals_ is null, we know to process the signal. if it is not null, we queue it.
If pendingSignals_.empty() == true, it means its the first signal to queue.

Comment thread src/SubscriberBase.h
class SubscriberBaseT : public Subscriber<T>,
public EnableSharedFromThisBase<SubscriberBaseT<T>>,
public virtual ExecutorBase {
virtual void onSubscribeImpl(std::shared_ptr<Subscription> subscription) = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is default visibility protected? I didn't realize that—maybe worth adding it explicitly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default visibility in class is always private.

Comment thread src/SubscriberBase.h
void onSubscribe(std::shared_ptr<Subscription> subscription) override final {
runInExecutor(std::bind(
&SubscriberBaseT::onSubscribeImpl,
this->shared_from_this(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this-> change the semantics?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this looks strange. But without this-> the code would not compile. The reason why is that the name resolution in the templated code is excluding dependent base (base class dependent on a template argument, in our case it is EnableSharedFromThisBase<SubscriberBaseT>) from lookup:
https://gcc.gnu.org/wiki/VerboseDiagnostics#dependent_base

Comment thread src/SubscriberBase.h
using ExecutorBase::ExecutorBase;

void onSubscribe(std::shared_ptr<Subscription> subscription) override final {
runInExecutor(std::bind(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's much reason to prefer std::bind over a lambda, especially because lambdas are usually easier to understand.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a personal preference. std::bind does exactly the same with fever lines of code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides being more common and easier to understand, lambdas are also more optimizable, since they're effectively like just creating a class with an operator() that does the work you put inside. std::bind is crazy template magic, as I understand it (classic Boost).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I will change this to lambdas. It is also much easier to put a breakpoint into lambda to see it was executed.

Comment thread src/SubscriptionBase.h

void request(size_t n) override final {
runInExecutor(
std::bind(&SubscriptionBase::requestImpl, shared_from_this(), n));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These uses of shared_from_this() could lead to some very surprising behavior if a derived class wants to call one of these methods during construction.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the constructor the internal weak_ptr is not set yet. It is set by the shared_ptr afterwards.
So the behavior that you will always get is exception bad_weak_ptr. Unfortunately that's what we have to live with ;(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's ok as long as it throws an error.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only C++17 guarantees that an error is thrown. An implementation is allowed to do anything it wants before then—and that could very well happen on iOS or Android.

@martinsix martinsix left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this! Finally no more mixins.

Comment thread src/SubscriptionBase.h

void request(size_t n) override final {
runInExecutor(
std::bind(&SubscriptionBase::requestImpl, shared_from_this(), n));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's ok as long as it throws an error.

@lehecka lehecka merged commit e075610 into rsocket:master Oct 26, 2016
@jspahrsummers

Copy link
Copy Markdown
Contributor

I requested changes in the hopes that the documentation could be updated before merging—sorry for not making that clear. 😕

dgrnbrg-meta added a commit to dgrnbrg-meta/rsocket-cpp that referenced this pull request Mar 11, 2022
Summary:
X-link: facebookincubator/katran#157

X-link: facebook/watchman#1011

X-link: facebook/wangle#205

X-link: facebook/proxygen#401

X-link: facebook/openr#129

X-link: facebookarchive/fbzmq#35

X-link: facebook/fb303#26

X-link: facebookarchive/bistro#59

X-link: facebook/folly#1734

X-link: facebook/fboss#113

Adds an environment variable to getdeps to provide `hg` info to avoid calling `hg` directly.

When using `getdeps` inside a containerized environment (which we need to build Research Super Cluster tooling with the correct linker attributes), `getdeps` fails because of unregistered mercurial extensions in the `hgrc`.

This allows `getdeps` to be useable in an environment where the mercurial extensions used in a project aren't installed/available.

Reviewed By: vivekspai

Differential Revision: D34732506

fbshipit-source-id: a4408af450c35b42b0dac95127ba46d305202419
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants