# Source code for netbird.resources.cloud.event_streaming

"""
Event streaming resource handler for NetBird API.
"""

from typing import Any, Dict, List

from ...models.cloud.event_streaming import EventStreamingCreate, EventStreamingUpdate
from ..base import BaseResource


class EventStreamingResource(BaseResource):
    """Handler for NetBird event streaming integration API endpoints.

    Each method issues one HTTP call through ``self.client`` and, except for
    :meth:`delete`, passes the result through one of the ``_parse_*`` helpers
    (neither the client nor the helpers are defined in this class; presumably
    they come from ``BaseResource`` -- confirm there).

    NOTE(review): ``list`` and ``create`` use the ``integrations/event-streaming``
    path, whereas ``get``, ``update`` and ``delete`` use ``event-streaming/{id}``
    without the ``integrations/`` prefix. The paths are kept exactly as they
    were; confirm against the NetBird API reference whether the difference is
    intentional.
    """

    def list(self) -> List[Dict[str, Any]]:
        """List all event streaming integrations.

        Returns:
            The result of ``_parse_list_response`` applied to the response of
            ``GET integrations/event-streaming``.
        """
        data = self.client.get("integrations/event-streaming")
        return self._parse_list_response(data)

    def create(self, integration_data: EventStreamingCreate) -> Dict[str, Any]:
        """Create an event streaming integration.

        Args:
            integration_data: Model describing the integration to create.

        Returns:
            The result of ``_parse_response`` applied to the response of
            ``POST integrations/event-streaming``.
        """
        # exclude_unset=True: only fields the caller explicitly set are sent.
        payload = integration_data.model_dump(exclude_unset=True)
        data = self.client.post("integrations/event-streaming", data=payload)
        return self._parse_response(data)

    def get(self, integration_id: str) -> Dict[str, Any]:
        """Retrieve a specific event streaming integration.

        Args:
            integration_id: Identifier interpolated into the request path.

        Returns:
            The result of ``_parse_response`` applied to the response of
            ``GET event-streaming/{integration_id}``.
        """
        data = self.client.get(f"event-streaming/{integration_id}")
        return self._parse_response(data)

    def update(
        self, integration_id: str, integration_data: EventStreamingUpdate
    ) -> Dict[str, Any]:
        """Update an event streaming integration.

        Args:
            integration_id: Identifier interpolated into the request path.
            integration_data: Model carrying the fields to change.

        Returns:
            The result of ``_parse_response`` applied to the response of
            ``PUT event-streaming/{integration_id}``.
        """
        # exclude_unset=True: only fields the caller explicitly set are sent.
        payload = integration_data.model_dump(exclude_unset=True)
        data = self.client.put(f"event-streaming/{integration_id}", data=payload)
        return self._parse_response(data)

    def delete(self, integration_id: str) -> None:
        """Delete an event streaming integration.

        Args:
            integration_id: Identifier interpolated into the request path.

        Returns:
            None. Whatever ``self.client.delete`` returns is discarded.
        """
        self.client.delete(f"event-streaming/{integration_id}")