Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
a52b3553
Commit
a52b3553
authored
Feb 14, 2025
by
Neelay Shah
Committed by
GitHub
Feb 14, 2025
Browse files
fix: updating subscribe and publish to take same args for event topic
parent
45b3505c
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
30 additions
and
2 deletions
+30
-2
icp/python/src/triton_distributed/icp/nats_event_plane.py
icp/python/src/triton_distributed/icp/nats_event_plane.py
+4
-2
icp/tests/python/event_plane/test_publish_subscribe.py
icp/tests/python/event_plane/test_publish_subscribe.py
+26
-0
No files found.
icp/python/src/triton_distributed/icp/nats_event_plane.py
View file @
a52b3553
...
@@ -246,7 +246,7 @@ class NatsEventPlane:
...
@@ -246,7 +246,7 @@ class NatsEventPlane:
self
,
self
,
payload
:
bytes
|
Any
,
payload
:
bytes
|
Any
,
event_type
:
Optional
[
str
]
=
None
,
event_type
:
Optional
[
str
]
=
None
,
event_topic
:
Optional
[
EventTopic
]
=
None
,
event_topic
:
Optional
[
EventTopic
|
str
|
List
[
str
]
]
=
None
,
timestamp
:
Optional
[
datetime
.
datetime
]
=
datetime
.
datetime
.
now
(
datetime
.
UTC
),
timestamp
:
Optional
[
datetime
.
datetime
]
=
datetime
.
datetime
.
now
(
datetime
.
UTC
),
event_id
:
Optional
[
uuid
.
UUID
]
=
uuid
.
uuid4
(),
event_id
:
Optional
[
uuid
.
UUID
]
=
uuid
.
uuid4
(),
)
->
Event
:
)
->
Event
:
...
@@ -271,6 +271,9 @@ class NatsEventPlane:
...
@@ -271,6 +271,9 @@ class NatsEventPlane:
if
event_id
is
None
:
if
event_id
is
None
:
event_id
=
uuid
.
uuid4
()
event_id
=
uuid
.
uuid4
()
if
event_topic
is
not
None
and
not
isinstance
(
event_topic
,
EventTopic
):
event_topic
=
EventTopic
(
event_topic
)
event_metadata
=
EventMetadata
(
event_metadata
=
EventMetadata
(
event_id
=
event_id
,
event_id
=
event_id
,
event_topic
=
event_topic
,
event_topic
=
event_topic
,
...
@@ -290,7 +293,6 @@ class NatsEventPlane:
...
@@ -290,7 +293,6 @@ class NatsEventPlane:
subject
=
self
.
_compose_publish_subject
(
event_metadata
)
subject
=
self
.
_compose_publish_subject
(
event_metadata
)
await
self
.
_nc
.
publish
(
subject
,
message
)
await
self
.
_nc
.
publish
(
subject
,
message
)
event_with_metadata
=
OnDemandEvent
(
event_with_metadata
=
OnDemandEvent
(
payload
,
metadata_serialized
,
event_metadata
payload
,
metadata_serialized
,
event_metadata
)
)
...
...
icp/tests/python/event_plane/test_publish_subscribe.py
View file @
a52b3553
...
@@ -114,6 +114,32 @@ class TestEventPlaneFunctional:
...
@@ -114,6 +114,32 @@ class TestEventPlaneFunctional:
assert
len
(
received_events
)
==
1
assert
len
(
received_events
)
==
1
assert
received_events
[
0
].
event_id
==
event_metadata
.
event_id
assert
received_events
[
0
].
event_id
==
event_metadata
.
event_id
@
pytest
.
mark
.
asyncio
async
def
test_event_topic_list
(
self
,
nats_server
,
event_plane
):
print
(
f
"Print loop test:
{
id
(
asyncio
.
get_running_loop
())
}
"
)
received_events
:
List
[
Event
]
=
[]
event
=
b
"test_payload"
subscription
=
await
event_plane
.
subscribe
(
event_topic
=
"hello"
)
event_metadata
=
await
event_plane
.
publish
(
event
,
event_topic
=
[
"hello"
])
# Allow time for message to propagate
await
asyncio
.
sleep
(
2
)
async
for
x
in
subscription
:
print
(
x
.
timestamp
)
print
(
x
.
event_id
)
print
(
x
.
event_type
)
print
(
x
.
event_topic
)
print
(
x
.
payload
)
received_events
.
append
(
x
)
break
assert
len
(
received_events
)
==
1
assert
received_events
[
0
].
event_id
==
event_metadata
.
event_id
@
pytest
.
mark
.
asyncio
@
pytest
.
mark
.
asyncio
async
def
test_custom_type
(
self
,
nats_server
,
event_plane
):
async
def
test_custom_type
(
self
,
nats_server
,
event_plane
):
print
(
f
"Print loop test:
{
id
(
asyncio
.
get_running_loop
())
}
"
)
print
(
f
"Print loop test:
{
id
(
asyncio
.
get_running_loop
())
}
"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment