Skip to content

Added NNG broker.#607

Open
s3rius wants to merge 1 commit intomasterfrom
feature/nng
Open

Added NNG broker.#607
s3rius wants to merge 1 commit intomasterfrom
feature/nng

Conversation

@s3rius
Copy link
Copy Markdown
Member

@s3rius s3rius commented Apr 11, 2026

Ref: #602.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 11, 2026

Codecov Report

❌ Patch coverage is 0% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 78.30%. Comparing base (02700d4) to head (2fd7a3c).

Files with missing lines Patch % Lines
taskiq/brokers/nng_broker.py 0.00% 22 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #607      +/-   ##
==========================================
- Coverage   79.00%   78.30%   -0.70%     
==========================================
  Files          69       70       +1     
  Lines        2463     2485      +22     
==========================================
  Hits         1946     1946              
- Misses        517      539      +22     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.


async def kick(self, message: BrokerMessage) -> None:
"""Send a message."""
await self.socket.ascend(message.message)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct pynng method for async sending is asend(). Currently, this will raise an AttributeError on the very first task dispatch.

:param addr: address which is used by both worker and client.
"""
super().__init__()
self.socket = pynng.Pair1(polyamorous=True)
Copy link
Copy Markdown

@alexted alexted Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation has a fundamental architectural flaw that limits its use in production environments.
The choice of pynng.Pair1(polyamorous=True) is problematic. The PAIR protocol is strictly designed for 1-to-1 communication. Even in polyamorous mode (which is deprecated in NNG), it doesn't provide the load balancing and reliability required for a distributed task queue. The -w 1 constraint mentioned in the docs is a symptom of this: it effectively prevents taskiq from scaling horizontally, which is one of the library's core strengths.

For a robust, full-featured broker that supports not just task distribution, but also feedback loops (task status, heartbeats, metrics, or results), the Router/Dealer patterns is the industry standard.

  • Router0 (Broker/Client side): Can talk to multiple workers asynchronously. It tracks which worker sent which message, allowing for complex routing and status tracking.
  • Dealer0 (Worker side): A fully asynchronous socket that can receive tasks and send back updates (e.g., "Task Started", "Progress: 50%", "Success") without blocking.

Here is how the core logic should look to support multiple workers and bi-directional communication:

Client Side (NNGBroker):

# Using Router0 to manage multiple workers
self.socket = pynng.Router0(listen=self.addr)

async def kick(self, message: TaskiqMessage) -> None:
    # Router handles distribution. 
    # Note: Using asend() instead of the 'ascend' typo.
    await self.socket.asend(message.message)

Worker Side:

# Using Dealer0 for async feedback
self.socket = pynng.Dealer0(dial=self.addr)

async def listen(self):
    while True:
        raw_msg = await self.socket.arecv()
        # Worker can immediately report status back through the same socket
        await self.socket.asend(b"ACK: Task Received") 
        yield raw_msg

Moving to Router/Dealer will make this broker a first-class citizen in the taskiq ecosystem, capable of handling high-load, production scenarios.

Copy link
Copy Markdown

@alexted alexted left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new NNGBroker is not exported in taskiq/__init__.py and taskiq/brokers/__init__.py. We need to export it to keep the public API consistent with ZeroMQBroker and others, so users can just do from taskiq import NNGBroker.

Copy link
Copy Markdown

@alexted alexted left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a core networking component, we strictly need at least basic integration tests (e.g., in tests/brokers/) to verify startup, kick, listen, and shutdown behaviors.

Comment on lines +31 to +34
if self.is_worker_process:
self.socket.listen(self.addr)
else:
self.socket.dial(self.addr, block=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NNG (like most brokers) can be susceptible to slow joiner syndrome during dial and listen. If we kick a task immediately after dial, the message might be dropped or cause an error before the connection is fully established. To make the initial transmission more robust, should we consider adding a brief timeout or a socket state check?
Just thinking out loud - pure theory.

implementation.
"""

def __init__(self, addr: str) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addr parameter is accepted but remains unvalidated. Given that Taskiq relies heavily on Pydantic, it would be fitting to add basic validation for the connection string (e.g., checking for ipc:// or tcp:// prefixes).

@@ -0,0 +1,48 @@
from collections.abc import AsyncGenerator

import pynng
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency is added as an extra in pyproject.toml, but the code is missing a guard import.

try:
    import pynng
except ImportError:
    raise ImportError("Install 'taskiq[nng]' to use NNGBroker.")

Without it the user will get a confusing ModuleNotFoundError only when they try to initialize the class.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants