J'ai une application simple, avec deux fonctions, l'une pour écouter le sujet et l'autre pour le point de terminaison Web. Je veux créer un flux d'événements côté serveur (SSE), c'est-à-dire du texte / flux d'événements, afin que du côté client, je puisse l'écouter en utilisant EventSource.
J'ai le code suivant pour l'instant, où chaque fonction fait son travail particulier:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
@app.agent(test_topic)
async def test_topic_agent(stream):
async for value in stream:
print(f"test_topic_agent RECEIVED -- {value!r}")
yield value
@app.page("/")
async def index(self, request):
return self.text("yey")
Maintenant, je veux dans l'index, quelque chose comme ce code, mais en utilisant faust:
import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime
async def hello(request):
loop = request.app.loop
async with sse_response(request) as resp:
while True:
data = 'Server Time : {}'.format(datetime.now())
print(data)
await resp.send(data)
await asyncio.sleep(1, loop=loop)
return resp
async def index(request):
d = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
return Response(text=d, content_type='text/html')
app = web.Application()
app.router.add_route('GET', '/hello', hello)
app.router.add_route('GET', '/', index)
web.run_app(app, host='127.0.0.1', port=8080)
J'ai essayé ceci:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
return self.text("yey")
Mais cela me donne l'erreur suivante:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
val = symbol_by_name(app, imp=imp)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
module = imp( # type: ignore
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
sys.exit(cli())
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
with self.make_context(prog_name, args, **extra) as ctx:
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
self._maybe_import_app()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
find_app(appstr)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
val = imp(app)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
J'ai essayé ceci:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
return self.text("yey")
Mais j'obtiens l'erreur suivante:
[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
return future.result()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
await agent.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
await self._on_start_supervisor()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
res = await self._start_one(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
return await self._start_task(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
actor = self(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
return self.actor_from_stream(stream,
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...
Pourrait-il y avoir un moyen pour cela? Merci beaucoup d'avance!
python
python-3.x
apache-kafka
aiohttp
faust
Maverick
la source
la source
'Agent' object has no attribute '__name__'
... Avez-vous essayé de supprimername="t"
?name="t"
doesnt help, même erreur. @ cricket_007Réponses:
L'ouvrier Faust exposera également un serveur Web sur chaque instance, qui s'exécute par défaut sur le port 6066.
Le serveur utilisera la bibliothèque de serveur HTTP aiohttp et vous pouvez profiter de cette chose et créer un flux d'événements côté serveur (SSE) comme dans votre exemple de code.
Vous pouvez créer un agent qui lira à partir du sujet Kafka
test
et mettra à jour une variablelast_message_from_topic
avec le dernier message du sujet, cette variable sera également visible à partir de vos pages Web.Dans la page d'index (
@app.page('/')
), l'interface EventSource est utilisée pour recevoir les événements envoyés par le serveur. Il se connecte au serveur via HTTP et reçoit des événements au format texte / flux d'événements depuis la page/hello
sans fermer la connexion.La page Web
/hello
envoie à chaque seconde un texte de message avec le dernier message du sujet Kafkatest
et l'heure actuelle du serveur.voici mon
my_worker.py
code de fichier :vous devez maintenant démarrer l'ouvrier Faust avec la commande suivante:
sur votre navigateur Web, vous pouvez accéder à
http://localhost:6066/
:voici le code pour envoyer des messages à Kafka sur le sujet
test
(à partir d'un autre fichier python):la source
La lecture de la documentation Web faust ne semble pas gérer SSE.
@app.agent
est rappelé lorsqu'un message kafka est consommé et@app.page
lorsqu'une demande http est traitée. Leur combinaison n'est probablement pas possible.Une autre approche à l'aide
faust.web
consiste à interroger à partir du javascript.Par exemple en utilisant:
Cette implémentation naïve stocke le message kafka
app.lastmsg
qui pourrait être obtenu par l'API/msg
.Pour utiliser SSE, vous pouvez l'utiliser
asyncio
pour le composant WebPart etfaust
pour le consommateur Kafka.la source