Opened 4 months ago
Last modified 2 months ago
#36916 assigned New feature
Add support for streaming with TaskGroups
| Reported by: | Thomas Grainger | Owned by: | Carlton Gibson |
|---|---|---|---|
| Component: | HTTP handling | Version: | dev |
| Severity: | Normal | Keywords: | structured-concurrency, taskgroups |
| Cc: | Thomas Grainger | Triage Stage: | Accepted |
| Has patch: | no | Needs documentation: | no |
| Needs tests: | no | Patch needs improvement: | no |
| Easy pickings: | no | UI/UX: | no |
Description (last modified by )
https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
https://github.com/django/new-features/issues/117
Feature Description
see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4
I’d like to be able to write code that combines multiple streams of data:
async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!

    return StreamingHttpResponse(gen())
Problem
However, this doesn’t work: the yield sits inside an async generator that is not a context manager, and calling aclosing() on that async generator is not sufficient to allow a TaskGroup to cancel itself and catch the cancel error.
from typing import Protocol

from useful_types import SupportsAnext


class AsyncIteratorBytesResource(Protocol):
    """
    all the machinery needed to safely run an AsyncGenerator[bytes]

    (for django-stubs) this allows AsyncGenerator[bytes] but is less strict
    so would also allow an anyio MemoryObjectReceiveStream[bytes]
    """

    # __aiter__ must be a plain method that returns the async iterator.
    def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())
Implementation Suggestions
Change History (10)
comment:1 by , 4 months ago
| Description: | modified (diff) |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
comment:2 by , 4 months ago
| Owner: | removed |
|---|---|
| Status: | assigned → new |
comment:3 by , 4 months ago
| Description: | modified (diff) |
|---|---|
comment:4 by , 4 months ago
| Needs documentation: | set |
|---|---|
comment:5 by , 4 months ago
| Easy pickings: | unset |
|---|---|
| Owner: | set to |
| Status: | new → assigned |
| Summary: | add support for streaming with TaskGroups → Add support for streaming with TaskGroups |
| Triage Stage: | Unreviewed → Accepted |
| Type: | Uncategorized → New feature |
comment:6 by , 3 months ago
| Patch needs improvement: | set |
|---|---|
General approach looks good. I left some comments, and there’s some discussion about a possible alternative approach (but I’m not sure if that’s viable or not, so we should see if that appears.)
comment:7 by , 2 months ago
| Has patch: | unset |
|---|---|
| Patch needs improvement: | unset |
comment:8 by , 2 months ago
How about this approach? Working from my phone so posting as a patch rather than pushing directly.
Summary of changes (from Claude)
django/http/response.py
- `_set_streaming_content` detects async context managers via `hasattr(__aenter__, __aexit__)`, sets `self.is_acmgr = True` and stores the value in `self.__acmgr`
- `streaming_content` property: `is_acmgr` is an early return at the top (unnested from `is_async`). Returns an `@asynccontextmanager` when `is_acmgr` is True; callers that check `is_acmgr` use `async with response.streaming_content as agen`. Both acmgr and regular async paths apply `make_bytes` and `aclose` their underlying iterator in a `finally` block
- `StreamingHttpResponse` gains `__aenter__`/`__aexit__`: for acmgr responses, `__aenter__` calls into `self.streaming_content` (no duplication of `awrapper` logic needed), stores the context and the yielded generator, then `__aexit__` closes the generator and exits the context in the right order so `TaskGroup` cleans up correctly
- `__iter__`, `__aiter__`, and `getvalue` all raise `IsAcmgrException` when `is_acmgr` is True
- `IsAcmgrException` is exported from `django.http`
django/core/handlers/asgi.py
- `send_response` simplified to `async with response as content`; works for both regular streaming and acmgr responses. `aclosing` no longer needed
django/middleware/gzip.py
- Adds `is_acmgr` branch: captures `response.streaming_content` (an acmgr) and wraps it in a new `@asynccontextmanager` that feeds the yielded generator into `acompress_sequence`
django/utils/text.py
- `acompress_sequence` wraps its entire body in `try`/`finally` to `aclose` its operand on exit if the method is present
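The detection and enter/exit ordering described in this summary might look roughly like the following. This is a toy sketch, not the patch: `MiniStreamingResponse` is a hypothetical class rather than Django's `StreamingHttpResponse`, the local `IsAcmgrException` is only a stand-in for the exception the summary names, and the byte coercion is a simplification of `make_bytes`.

```python
import asyncio
import contextlib


class IsAcmgrException(Exception):
    """Local stand-in for the exception named in the summary."""


class MiniStreamingResponse:
    """Toy response object; not Django's StreamingHttpResponse."""

    def __init__(self, content):
        # Duck-typed detection of an async context manager.
        self.is_acmgr = hasattr(content, "__aenter__") and hasattr(
            content, "__aexit__"
        )
        self._content = content
        self._body = None

    def _as_bytes(self, source):
        async def wrapper():
            try:
                async for part in source:
                    yield part if isinstance(part, bytes) else str(part).encode()
            finally:
                # Close the underlying iterator when it supports it.
                if hasattr(source, "aclose"):
                    await source.aclose()

        return wrapper()

    def __aiter__(self):
        if self.is_acmgr:
            raise IsAcmgrException("use 'async with response' for acmgr content")
        return self._as_bytes(self._content)

    async def __aenter__(self):
        if self.is_acmgr:
            source = await self._content.__aenter__()
            self._body = self._as_bytes(source)
        else:
            self._body = self.__aiter__()
        return self._body

    async def __aexit__(self, exc_type, exc, tb):
        # Close the byte stream first, then leave the wrapped context manager.
        await self._body.aclose()
        if self.is_acmgr:
            return await self._content.__aexit__(exc_type, exc, tb)
        return False


@contextlib.asynccontextmanager
async def source():
    async def chunks():
        yield "a"
        yield b"b"

    yield chunks()


async def main():
    async with MiniStreamingResponse(source()) as body:
        return [part async for part in body]


result = asyncio.run(main())
```

Closing the stream before exiting the wrapped context manager mirrors the ordering the summary says is needed for a `TaskGroup` inside that context manager to clean up.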
-
django/core/handlers/asgi.py
diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
index 9555860..0f55717 100644
--- a/django/core/handlers/asgi.py
+++ b/django/core/handlers/asgi.py
@@ -4,7 +4,7 @@ import sys
 import tempfile
 import traceback
 from collections import defaultdict
-from contextlib import aclosing, closing
+from contextlib import closing

 from asgiref.sync import ThreadSensitiveContext, sync_to_async

@@ -315,11 +315,7 @@ class ASGIHandler(base.BaseHandler):
         )
         # Streaming responses need to be pinned to their iterator.
         if response.streaming:
-            # - Consume via `__aiter__` and not `streaming_content` directly,
-            #   to allow mapping of a sync iterator.
-            # - Use aclosing() when consuming aiter. See
-            #   https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
-            async with aclosing(aiter(response)) as content:
+            async with response as content:
                 async for part in content:
                     for chunk, _ in self.chunk_bytes(part):
                         await send(
-
django/http/__init__.py
diff --git a/django/http/__init__.py b/django/http/__init__.py
index 628564e..2df8fa6 100644
--- a/django/http/__init__.py
+++ b/django/http/__init__.py
@@ -8,6 +8,7 @@ from django.http.request import (
 )
 from django.http.response import (
     BadHeaderError,
+[new line 11: content not shown]
     FileResponse,
     Http404,
     HttpResponse,
@@ -47,6 +48,7 @@ __all__ = [
     "HttpResponseServerError",
     "Http404",
     "BadHeaderError",
+[new line 51: content not shown]
     "JsonResponse",
     "FileResponse",
 ]
-
django/http/response.py
diff --git a/django/http/response.py b/django/http/response.py
index 9bf0b14..e3976da 100644
--- a/django/http/response.py
+++ b/django/http/response.py
@@ -7,6 +7,7 @@ import re
 import sys
 import time
 import warnings
+[new line 10: content not shown]
 from email.header import Header
 from http.client import responses
 from urllib.parse import urlsplit
@@ -104,6 +105,10 @@ class BadHeaderError(ValueError):
     pass


+[new lines 108-111: content not shown]
 class HttpResponseBase:
     """
     An HTTP response base class with dictionary-accessed headers.
@@ -479,14 +484,36 @@ class StreamingHttpResponse(HttpResponseBase):

     @property
     def streaming_content(self):
+[new lines 487-504: content not shown]
         if self.is_async:
             # pull to lexical scope to capture fixed reference in case
             # streaming_content is set again later.
             _iterator = self._iterator

             async def awrapper():
-                async for part in _iterator:
-                    yield self.make_bytes(part)
+                try:
+                    async for part in _iterator:
+                        yield self.make_bytes(part)
+                finally:
+                    if hasattr(_iterator, "aclose"):
+                        await _iterator.aclose()

             return awrapper()
         else:
@@ -498,6 +525,12 @@ class StreamingHttpResponse(HttpResponseBase):

     def _set_streaming_content(self, value):
         # Ensure we can never iterate on "value" more than once.
+[new lines 528-533: content not shown]
         try:
             self._iterator = iter(value)
             self.is_async = False
@@ -507,7 +540,28 @@ class StreamingHttpResponse(HttpResponseBase):
         if hasattr(value, "close"):
             self._resource_closers.append(value.close)

+[new lines 543-558: content not shown]
     def __iter__(self):
+[new lines 560-564: content not shown]
         try:
             return iter(self.streaming_content)
         except TypeError:
@@ -528,6 +582,11 @@ class StreamingHttpResponse(HttpResponseBase):
             return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))

     async def __aiter__(self):
+[new lines 585-589: content not shown]
         try:
             async for part in self.streaming_content:
                 yield part
@@ -544,6 +603,11 @@ class StreamingHttpResponse(HttpResponseBase):
                 yield part

     def getvalue(self):
+[new lines 606-610: content not shown]
         return b"".join(self.streaming_content)


-
django/middleware/gzip.py
diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
index eb151d7..78b5739 100644
--- a/django/middleware/gzip.py
+++ b/django/middleware/gzip.py
@@ -1,3 +1,5 @@
+[new lines 1-2: content not shown]
 from django.utils.cache import patch_vary_headers
 from django.utils.deprecation import MiddlewareMixin
 from django.utils.regex_helper import _lazy_re_compile
@@ -31,7 +33,20 @@ class GZipMiddleware(MiddlewareMixin):
             return response

         if response.streaming:
-            if response.is_async:
+            if response.is_acmgr:
+                original_acmgr = response.streaming_content
+                max_random_bytes = self.max_random_bytes
+
+                @asynccontextmanager
+                async def compressed_acmgr():
+                    async with original_acmgr as agen:
+                        yield acompress_sequence(
+                            agen,
+                            max_random_bytes=max_random_bytes,
+                        )
+
+                response.streaming_content = compressed_acmgr()
+            elif response.is_async:
                 response.streaming_content = acompress_sequence(
                     response.streaming_content,
                     max_random_bytes=self.max_random_bytes,
-
django/utils/text.py
diff --git a/django/utils/text.py b/django/utils/text.py
index d1306f9..55bd6f5 100644
--- a/django/utils/text.py
+++ b/django/utils/text.py
@@ -390,17 +390,21 @@ def compress_sequence(sequence, *, max_random_bytes=None):


 async def acompress_sequence(sequence, *, max_random_bytes=None):
-    buf = StreamingBuffer()
-    filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
-    with GzipFile(
-        filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
-    ) as zfile:
-        # Output headers...
+    try:
+        buf = StreamingBuffer()
+        filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
+        with GzipFile(
+            filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
+        ) as zfile:
+            # Output headers...
+            yield buf.read()
+            async for item in sequence:
+                zfile.write(item)
+                zfile.flush()
+                data = buf.read()
+                if data:
+                    yield data
         yield buf.read()
-        async for item in sequence:
-            zfile.write(item)
-            zfile.flush()
-            data = buf.read()
-            if data:
-                yield data
-    yield buf.read()
+    finally:
+        if hasattr(sequence, "aclose"):
+            await sequence.aclose()
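The gzip and `acompress_sequence` changes in this patch combine two ideas: a compressing generator that closes its source in `finally`, and a new async context manager wrapped around the original one. A minimal standalone sketch of that combination follows. It is an assumption-based illustration, not the patch itself: it uses `zlib.compressobj` in place of Django's `GzipFile`/`StreamingBuffer` pair, and `agzip`, `compressed`, and `source` are hypothetical names.

```python
import asyncio
import contextlib
import gzip
import zlib


async def agzip(chunks):
    """Gzip an async stream of bytes, closing the source on exit."""
    compressor = zlib.compressobj(wbits=31)  # 16 + 15 selects the gzip container
    try:
        async for chunk in chunks:
            data = compressor.compress(chunk)
            if data:
                yield data
        yield compressor.flush()
    finally:
        # Close the operand if it offers aclose(), as in the patched helper.
        if hasattr(chunks, "aclose"):
            await chunks.aclose()


@contextlib.asynccontextmanager
async def compressed(acmgr):
    """Wrap an async context manager so the stream it yields is gzipped."""
    async with acmgr as chunks:
        yield agzip(chunks)


@contextlib.asynccontextmanager
async def source():
    async def chunks():
        yield b"hello "
        yield b"world"

    yield chunks()


async def main():
    async with compressed(source()) as body:
        return b"".join([part async for part in body])


payload = asyncio.run(main())
```

Because the wrapper enters the original context manager inside its own `async with`, any task group held by the original stays open exactly as long as the compressed stream is being consumed.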
comment:9 by , 2 months ago
Attaching updated patch based on the approach discussed in PR #19364.
Summary of changes
django/http/response.py:
- `_set_streaming_content` detects async context managers via `hasattr(__aenter__, __aexit__)`, sets `is_acmgr = True` and stores the value
- `streaming_content` property returns an `@asynccontextmanager` wrapping `make_bytes` when `is_acmgr` is True
- `StreamingHttpResponse` gains `__aenter__`/`__aexit__`: for acmgr responses, enters the streaming_content CM; for regular responses, returns `aiter(self)`. `__aexit__` only handles acmgr CM cleanup; `aclosing` in the ASGI handler handles iterator close
- `__iter__`, `__aiter__`, and `getvalue` raise `IsAcmgrException` when `is_acmgr` is True
django/core/handlers/asgi.py:
- Single code path: `async with response as agen, aclosing(agen) as content:`; works for both regular streaming and acmgr responses
django/middleware/gzip.py:
- Adds `is_acmgr` branch that wraps the acmgr in a new `@asynccontextmanager` feeding into `acompress_sequence`
django/utils/text.py:
- `acompress_sequence` wraps its body in `try`/`finally` to `aclose` its operand
Tests
- 13 new tests in `tests/httpwrappers/tests.py` covering: basic acmgr streaming, make_bytes coercion, `IsAcmgrException` guards on `__iter__`/`__aiter__`/`getvalue`, `__aexit__` on error and break, non-acmgr `__aenter__` fallback, reassignment, single-producer TaskGroup+Queue, and multi-producer fan-in (news-and-weather pattern)
- 1 new test in `tests/middleware/tests.py` for gzip compression of acmgr streaming responses
Docs
- New "Streaming with TaskGroup" section in `docs/ref/request-response.txt` with full `news_and_weather` example, key points re PEP 789, and anyio note
- Release note in `docs/releases/6.1.txt`
Patch
-
django/core/handlers/asgi.py
diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
index 7ee5208..bcdafdc 100644
--- a/django/core/handlers/asgi.py
+++ b/django/core/handlers/asgi.py
@@ -318,11 +318,9 @@ class ASGIHandler(base.BaseHandler):
         )
         # Streaming responses need to be pinned to their iterator.
         if response.streaming:
-            # - Consume via `__aiter__` and not `streaming_content` directly,
-            #   to allow mapping of a sync iterator.
-            # - Use aclosing() when consuming aiter. See
-            #   https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
-            async with aclosing(aiter(response)) as content:
+            # Use aclosing() when consuming aiter. See
+            # https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
+            async with response as agen, aclosing(agen) as content:
                 async for part in content:
                     for chunk, _ in self.chunk_bytes(part):
                         await send(
-
django/http/__init__.py
diff --git a/django/http/__init__.py b/django/http/__init__.py
index 628564e..2df8fa6 100644
--- a/django/http/__init__.py
+++ b/django/http/__init__.py
@@ -8,6 +8,7 @@ from django.http.request import (
 )
 from django.http.response import (
     BadHeaderError,
+[new line 11: content not shown]
     FileResponse,
     Http404,
     HttpResponse,
@@ -47,6 +48,7 @@ __all__ = [
     "HttpResponseServerError",
     "Http404",
     "BadHeaderError",
+[new line 51: content not shown]
     "JsonResponse",
     "FileResponse",
 ]
-
django/http/response.py
diff --git a/django/http/response.py b/django/http/response.py
index 9bf0b14..c1c0c20 100644
--- a/django/http/response.py
+++ b/django/http/response.py
@@ -7,6 +7,7 @@ import re
 import sys
 import time
 import warnings
+[new line 10: content not shown]
 from email.header import Header
 from http.client import responses
 from urllib.parse import urlsplit
@@ -104,6 +105,10 @@ class BadHeaderError(ValueError):
     pass


+[new lines 108-111: content not shown]
 class HttpResponseBase:
     """
     An HTTP response base class with dictionary-accessed headers.
@@ -479,14 +484,36 @@ class StreamingHttpResponse(HttpResponseBase):

     @property
     def streaming_content(self):
+[new lines 487-504: content not shown]
         if self.is_async:
             # pull to lexical scope to capture fixed reference in case
             # streaming_content is set again later.
             _iterator = self._iterator

             async def awrapper():
-                async for part in _iterator:
-                    yield self.make_bytes(part)
+                try:
+                    async for part in _iterator:
+                        yield self.make_bytes(part)
+                finally:
+                    if hasattr(_iterator, "aclose"):
+                        await _iterator.aclose()

             return awrapper()
         else:
@@ -498,6 +525,12 @@ class StreamingHttpResponse(HttpResponseBase):

     def _set_streaming_content(self, value):
         # Ensure we can never iterate on "value" more than once.
+[new lines 528-533: content not shown]
         try:
             self._iterator = iter(value)
             self.is_async = False
@@ -507,7 +540,22 @@ class StreamingHttpResponse(HttpResponseBase):
         if hasattr(value, "close"):
             self._resource_closers.append(value.close)

+[new lines 543-552: content not shown]
     def __iter__(self):
+[new lines 554-558: content not shown]
         try:
             return iter(self.streaming_content)
         except TypeError:
@@ -528,6 +576,11 @@ class StreamingHttpResponse(HttpResponseBase):
             return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))

     async def __aiter__(self):
+[new lines 579-583: content not shown]
         try:
             async for part in self.streaming_content:
                 yield part
@@ -544,6 +597,11 @@ class StreamingHttpResponse(HttpResponseBase):
                 yield part

     def getvalue(self):
+[new lines 600-604: content not shown]
         return b"".join(self.streaming_content)


-
django/middleware/gzip.py
diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
index eb151d7..78b5739 100644
--- a/django/middleware/gzip.py
+++ b/django/middleware/gzip.py
@@ -1,3 +1,5 @@
+[new lines 1-2: content not shown]
 from django.utils.cache import patch_vary_headers
 from django.utils.deprecation import MiddlewareMixin
 from django.utils.regex_helper import _lazy_re_compile
@@ -31,7 +33,20 @@ class GZipMiddleware(MiddlewareMixin):
             return response

         if response.streaming:
-            if response.is_async:
+            if response.is_acmgr:
+                original_acmgr = response.streaming_content
+                max_random_bytes = self.max_random_bytes
+
+                @asynccontextmanager
+                async def compressed_acmgr():
+                    async with original_acmgr as agen:
+                        yield acompress_sequence(
+                            agen,
+                            max_random_bytes=max_random_bytes,
+                        )
+
+                response.streaming_content = compressed_acmgr()
+            elif response.is_async:
                 response.streaming_content = acompress_sequence(
                     response.streaming_content,
                     max_random_bytes=self.max_random_bytes,
-
django/utils/text.py
diff --git a/django/utils/text.py b/django/utils/text.py
index d1306f9..55bd6f5 100644
--- a/django/utils/text.py
+++ b/django/utils/text.py
@@ -390,20 +390,24 @@ def compress_sequence(sequence, *, max_random_bytes=None):


 async def acompress_sequence(sequence, *, max_random_bytes=None):
-    buf = StreamingBuffer()
-    filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
-    with GzipFile(
-        filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
-    ) as zfile:
-        # Output headers...
+    try:
+        buf = StreamingBuffer()
+        filename = _get_random_filename(max_random_bytes) if max_random_bytes else None
+        with GzipFile(
+            filename=filename, mode="wb", compresslevel=6, fileobj=buf, mtime=0
+        ) as zfile:
+            # Output headers...
+            yield buf.read()
+            async for item in sequence:
+                zfile.write(item)
+                zfile.flush()
+                data = buf.read()
+                if data:
+                    yield data
         yield buf.read()
-        async for item in sequence:
-            zfile.write(item)
-            zfile.flush()
-            data = buf.read()
-            if data:
-                yield data
-    yield buf.read()
+    finally:
+        if hasattr(sequence, "aclose"):
+            await sequence.aclose()


 # Expression to match some_token and some_token="with spaces" (and similarly
-
docs/ref/request-response.txt
diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt
index ba60415..945683a 100644
--- a/docs/ref/request-response.txt
+++ b/docs/ref/request-response.txt
@@ -1424,6 +1424,103 @@ is streaming. If you perform long-running operations in your view before
 returning the ``StreamingHttpResponse`` object, then you may also want to
 :ref:`handle disconnections in the view <async-handling-disconnect>` itself.

+[new lines 1427-1523: content not shown]
 ``FileResponse`` objects
 ========================

-
docs/releases/6.1.txt
diff --git a/docs/releases/6.1.txt b/docs/releases/6.1.txt
index 00eaf53..790782c 100644
--- a/docs/releases/6.1.txt
+++ b/docs/releases/6.1.txt
@@ -323,6 +323,11 @@ Pagination
 Requests and Responses
 ~~~~~~~~~~~~~~~~~~~~~~

+[new lines 326-330: content not shown]
 * :attr:`HttpRequest.multipart_parser_class <django.http.HttpRequest.multipart_parser_class>`
   can now be customized to use a different multipart parser class.

-
tests/asgi/urls.py
diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py
index 0311cf3..b7fa206 100644
--- a/tests/asgi/urls.py
+++ b/tests/asgi/urls.py
@@ -1,4 +1,5 @@
 import asyncio
+[new line 2: content not shown]
 import threading
 import time

-
tests/httpwrappers/tests.py
diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py
index 3e8364e..a2add61 100644
--- a/tests/httpwrappers/tests.py
+++ b/tests/httpwrappers/tests.py
@@ -1,7 +1,10 @@
+[new lines 1-2: content not shown]
 import copy
 import json
 import os
 import pickle
+[new line 7: content not shown]
 import unittest
 import uuid

@@ -16,6 +19,7 @@ from django.http import (
     HttpResponseNotModified,
     HttpResponsePermanentRedirect,
     HttpResponseRedirect,
+[new line 22: content not shown]
     JsonResponse,
     QueryDict,
     SimpleCookie,
@@ -808,6 +812,250 @@ class StreamingHttpResponseTests(SimpleTestCase):
         with self.assertWarnsMessage(Warning, msg):
             self.assertEqual(b"hello", await anext(aiter(r)))

+[new lines 815-1058: content not shown]
     def test_text_attribute_error(self):
         r = StreamingHttpResponse(iter(["hello", "world"]))
         msg = "This %s instance has no `text` attribute." % r.__class__.__name__
-
tests/middleware/tests.py
diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py
index a61c4b1..4192ada 100644
--- a/tests/middleware/tests.py
+++ b/tests/middleware/tests.py
@@ -1,3 +1,4 @@
+[new line 1: content not shown]
 import gzip
 import random
 import re
@@ -936,6 +937,37 @@ class GZipMiddlewareTest(SimpleTestCase):
         self.assertEqual(r.get("Content-Encoding"), "gzip")
         self.assertFalse(r.has_header("Content-Length"))

+[new lines 940-970: content not shown]
     def test_compress_streaming_response_unicode(self):
         """
         Compression is performed on responses with streaming Unicode content.
comment:10 by , 2 months ago
| Needs documentation: | unset |
|---|---|
| Owner: | changed from to |
Thanks. Reception on the new-features repo issue seems positive, so though it hasn't moved through any swimlanes there yet, I will speculatively move this one to Accepted assuming it likely will.