Connector unsubscribe bound pubs#895
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes RedisConnector pub/sub callback unregistration for bound methods by comparing callbacks by equality instead of object identity.
Changes:
- Updates
_filter_topics_cbto remove callbacks using!=, allowing equivalent bound method objects to match during unregister.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
f29dc4d to
44ef315
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
bec_lib/bec_lib/redis_connector.py:1031
- Adding
**kwargstounregistermakes kwargs look like a subscription selector, but the stream unregister paths still pass onlycbinto_unregister_stream. As a result, unregistering a stream subscription with non-matching kwargs still removes it, unlike pub/sub subscriptions.
def unregister(self, topics=None, patterns=None, cb=None, **kwargs):
bec_lib/bec_lib/redis_connector.py:1055
- This all-subscriptions path treats every
_topics_cbkey as a normal topic, but pattern registrations are stored in the same map. Callingunregister(cb=...)for a pattern subscription removes it from_topics_cband then usesunsubscribeinstead ofpunsubscribe, leaving the Redis pattern subscription active.
self.unregister(topics=topics, cb=cb, **kwargs)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| def _matches_subscription( | ||
| item: tuple[louie.saferef.BoundMethodWeakref, dict], cb: Callable | None, kwargs: dict | ||
| ) -> bool: | ||
| cb_ref, item_kwargs = item | ||
| if cb is not None and cb_ref() != cb: |
| return unsubscribe_list | ||
|
|
||
| def unregister(self, topics=None, patterns=None, cb=None): | ||
| def unregister(self, topics=None, patterns=None, cb=None, **kwargs): |
d-perl
left a comment
There was a problem hiding this comment.
Why does this add kwargs? Rather I think we should simplify and match the unsubscription only on the endpoint+callback. Is there any important example where we actually make use of a duplicate subscription with different kwargs? Otherwise I think the simplification far outweighs the advantage of this unused extra complexity, both in the connector and at the site of use. It is much more practical not to need to remember the kwargs to unsubscribe.
|
@d-perl if we allow kwargs as input, we should also allow them to scope the unsubscription. Right now, they are used to e.g. scope the subscription to a scan_id, passed in as kwarg. If we don't support unsubscribing using a kwarg, I cannot distinguish these callbacks anymore and all of them would trigger indefinitely. |
|
@wakonig yeah, I understand, my question is more is this actually used in the codebase anywhere? I don't really see how it's useful to have the same callback trigger multiple times with different kwargs. When I removed this possibility for streams, there were no occasions where it was used. It also seems quite inefficient. Imagine you were watching a hundred IDs. Rather than maintaining a separate subscription for each ID, wouldn't it be better to track watched IDs in the callback owner and just have one callback run per message? Alternatively, if a different object is subscribed for each scan ID, then the endpoint and callback function are enough to differentiate them and the kwargs aren't needed. |
But these are not separate subscriptions to redis. The only difference is whether you first loop over the callbacks or over IDs. I doubt you would see any difference in performance.
True, but that's often quite difficult to achieve, given that you want to treat them likely the same, just with additional metadata. Imagine the following (taken from my attempt to unify the scan progress yesterday ;)): You have a subscription to a device progress, which is highly bespoke to a single scan id. There is no guarantee that you will receive all elements of the device progress before the scan is closed as it solely depends on redis and OS scheduling. So you have to prepare for multiple subscriptions, yet different scan ids. If you simply try to unsubscribe all callbacks, you would potentially kick out valid subscriptions from a currently running scan. In general, I don't have a problem with removing that option, but then we should not be allowed to put in kwargs in the first place. Not being able to unsubscribe and essentially having multiple callbacks without the ability to remove the ones that are not needed anymore sounds more like a bug to me. |
|
@wyzula-jan @d-perl Given that this topic may require some more discussion, I would suggest that we extract the "bug fix" out of the PR and create a new one. It is sufficiently urgent to not leave this pending for too long |
|
@wakonig nah, I'm okay with this solution as long as you assure me there's an actual use case and it's not just complicated dead code :) |
hotfix which should be rollout ASAP #896 |
Description
Redis connector was not unregistering callback of bound methods → fix: Instead of identity check,
!=Additional Comments
Cherry pick first commit from #893 do the same work + tests + kwargs forwarding to unsubscribe