Source code for netbird.network_map

"""
NetBird Network Map Generator

This module provides functionality to generate enriched network maps from NetBird API
data. It enriches networks with detailed resource, policy, and router information and
derives connection topology data for visualization purposes.
"""

from typing import Any, Dict, List, Set, Tuple

from .client import APIClient
from .exceptions import NetBirdAPIError, NetBirdAuthenticationError


def generate_full_network_map(
    client: APIClient,
    include_routers: bool = True,
    include_policies: bool = True,
    include_resources: bool = True,
) -> List[Dict[str, Any]]:
    """
    Generate a comprehensive network map with enriched data from NetBird API.

    This function fetches all networks and enriches them with:
    - Detailed resource information (replacing resource IDs with full objects)
    - Complete policy data (replacing policy IDs with full policy objects)
    - Router information with enhanced metadata

    Every returned network always carries ``resources``, ``policies`` and
    ``routers`` keys. Each is an empty list when the corresponding
    ``include_*`` flag is False, when the network has nothing of that kind,
    or when fetching the details failed with a non-authentication error (a
    warning is printed in that case). A policy that cannot be fetched is
    represented by a ``{"id": ..., "error": ...}`` placeholder.

    Args:
        client: Authenticated NetBird API client
        include_routers: Whether to include router information (default: True)
        include_policies: Whether to include policy information (default: True)
        include_resources: Whether to include resource information
            (default: True)

    Returns:
        List of enriched network dictionaries containing full object data
        instead of just IDs.

    Raises:
        NetBirdAuthenticationError: If authentication fails, including while
            fetching per-network details
        NetBirdAPIError: If listing the networks fails or an unexpected error
            occurs

    Example:
        >>> from netbird import APIClient
        >>> from netbird.network_map import generate_full_network_map
        >>>
        >>> client = APIClient(host="api.netbird.io", api_token="your-token")
        >>> networks = generate_full_network_map(client)
        >>>
        >>> # Access enriched data
        >>> for network in networks:
        ...     print(f"Network: {network['name']}")
        ...     for resource in network.get('resources', []):
        ...         print(f"  Resource: {resource['name']} - {resource['address']}")
        ...     for policy in network.get('policies', []):
        ...         print(f"  Policy: {policy['name']}")
    """
    try:
        # List all networks
        networks = client.networks.list()
        if not networks:
            return []

        enriched_networks = []
        for network in networks:
            # Shallow copy so the objects returned by the client are not mutated.
            enriched_network = network.copy()

            # Label used in warnings and generated router names. Falling back
            # to the ID keeps a nameless network from raising KeyError inside
            # an exception handler, which would abort the whole map.
            network_label = network.get("name") or network.get("id") or "Unknown"

            # Enrich with detailed resource information
            if include_resources and network.get("resources"):
                try:
                    enriched_network["resources"] = client.networks.list_resources(
                        network["id"]
                    )
                except NetBirdAuthenticationError:
                    # Authentication problems are never "best effort": surface
                    # them as documented instead of degrading to a warning.
                    raise
                except Exception as e:
                    print(
                        f"Warning: Could not fetch resources for network "
                        f"{network_label}: {e}"
                    )
                    enriched_network["resources"] = []
            else:
                # Normalise disabled/missing/None to an empty list.
                enriched_network["resources"] = []

            # Enrich with full policy objects
            if include_policies and network.get("policies"):
                detailed_policies = []
                for policy_id in network["policies"]:
                    try:
                        detailed_policies.append(client.policies.get(policy_id))
                    except NetBirdAuthenticationError:
                        raise
                    except Exception as e:
                        print(f"Warning: Could not fetch policy {policy_id}: {e}")
                        # Keep a placeholder so the reference is not lost.
                        detailed_policies.append({"id": policy_id, "error": str(e)})
                enriched_network["policies"] = detailed_policies
            else:
                enriched_network["policies"] = []

            # Enrich with detailed router information
            if include_routers and network.get("routers"):
                try:
                    detailed_routers = client.networks.list_routers(network["id"])
                    enriched_network["routers"] = [
                        {
                            "name": f"{network_label}-router-{position}",
                            "enabled": router.get("enabled", True),
                            "masquerade": router.get("masquerade", False),
                            "metric": router.get("metric", 9999),
                            "peer": router.get("peer", ""),
                            "original_id": router.get("id", ""),
                        }
                        for position, router in enumerate(detailed_routers, start=1)
                    ]
                except NetBirdAuthenticationError:
                    raise
                except Exception as e:
                    print(
                        f"Warning: Could not fetch routers for network "
                        f"{network_label}: {e}"
                    )
                    enriched_network["routers"] = []
            else:
                enriched_network["routers"] = []

            enriched_networks.append(enriched_network)

        return enriched_networks

    except NetBirdAuthenticationError as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise NetBirdAuthenticationError(
            "Authentication failed. Please check your API token."
        ) from e
    except NetBirdAPIError as e:
        raise NetBirdAPIError(
            f"API Error: {e.message}", status_code=e.status_code
        ) from e
    except Exception as e:
        raise NetBirdAPIError(
            f"Unexpected error while generating network map: {e}"
        ) from e
def get_network_topology_data(
    client: APIClient, optimize_connections: bool = True
) -> Dict[str, Any]:
    """
    Generate network topology data optimized for visualization.

    This function creates a comprehensive data structure that includes:
    - All source groups from policies
    - Resource-to-group mappings
    - Connection mappings (both group-based and direct)
    - Optimized connection data to reduce visual clutter

    Args:
        client: Authenticated NetBird API client
        optimize_connections: Whether to optimize connections for
            visualization (default: True). When False, only the enriched
            networks are returned and every connection/mapping collection is
            left empty.

    Returns:
        Dictionary containing:
        - networks: List of enriched networks
        - all_source_groups: Set of all source group names
        - group_connections: Mapping of group-based connections
        - direct_connections: Mapping of direct resource connections
        - resource_mappings: Resource ID to node mappings
        - group_mappings: Group name to resource node mappings
        - resource_id_to_node: Alias of ``resource_mappings`` (same object)
        - group_name_to_nodes: Alias of ``group_mappings`` (same object)

    Example:
        >>> topology = get_network_topology_data(client)
        >>> print(f"Found {len(topology['all_source_groups'])} source groups")
        >>> print(f"Found {len(topology['group_connections'])} group connections")
    """
    # Get enriched network data
    networks = generate_full_network_map(client)

    if optimize_connections:
        # Use the same optimization logic from the unified diagram
        topology = _collect_optimized_connections(networks)
    else:
        # Return raw data without optimization
        topology = {
            "networks": networks,
            "all_source_groups": set(),
            "group_connections": {},
            "direct_connections": {},
            "resource_mappings": {},
            "group_mappings": {},
        }

    # The two branches historically used different key names for the mapping
    # tables. Expose both spellings so the documented keys are always present
    # and callers relying on the older names keep working.
    key_aliases = (
        ("resource_mappings", "resource_id_to_node"),
        ("group_mappings", "group_name_to_nodes"),
    )
    for documented_key, legacy_key in key_aliases:
        if documented_key not in topology and legacy_key in topology:
            topology[documented_key] = topology[legacy_key]
        elif legacy_key not in topology and documented_key in topology:
            topology[legacy_key] = topology[documented_key]

    return topology
def _collect_optimized_connections(networks: List[Dict[str, Any]]) -> Dict[str, Any]: """ Internal function to collect and optimize connections for visualization. This reduces visual clutter by merging duplicate connections and organizing them by source group and destination. """ group_connections: Dict[Tuple[str, str], List[str]] = ( {} ) # {(source, dest_group): [policy_names]} direct_connections: Dict[Tuple[str, str], List[str]] = ( {} ) # {(source, dest_node): [policy_names]} all_source_groups: Set[str] = set() resource_id_to_node: Dict[str, str] = {} group_name_to_nodes: Dict[str, List[str]] = {} # First pass: collect all source groups and build mappings for network_idx, network in enumerate(networks): resources = network.get("resources", []) # Build resource mappings for res_idx, resource in enumerate(resources): resource_id = resource.get("id", None) resource_groups = resource.get("groups", []) resource_node_name = f"res_{network_idx}_{res_idx}" # Map resource ID to node if resource_id: resource_id_to_node[resource_id] = resource_node_name # Map group names to nodes if resource_groups: for group in resource_groups: if isinstance(group, dict): group_name = group.get("name") or group.get("id") or "Unknown" else: group_name = str(group) if group_name not in group_name_to_nodes: group_name_to_nodes[group_name] = [] group_name_to_nodes[group_name].append(resource_node_name) # Collect source groups from policies policies = network.get("policies", []) for policy in policies: if isinstance(policy, dict): rules = policy.get("rules", []) for rule in rules: sources = rule.get("sources", []) or [] for source in sources: if isinstance(source, dict): source_name = ( source.get("name") or source.get("id") or "Unknown" ) all_source_groups.add(source_name) else: all_source_groups.add(str(source)) # Second pass: collect connections for network in networks: policies = network.get("policies", []) for policy in policies: if isinstance(policy, dict): rules = policy.get("rules", 
[]) for rule in rules: sources = rule.get("sources", []) or [] destinations = rule.get("destinations", []) or [] destination_resource = rule.get("destinationResource", {}) policy_name = policy.get("name", "Policy") # Get source group names source_names = [] for source in sources: if isinstance(source, dict): source_name = ( source.get("name") or source.get("id") or "Unknown" ) source_names.append(source_name) else: source_names.append(str(source)) # Collect group connections if destinations: for dest_group_obj in destinations: if isinstance(dest_group_obj, dict): dest_group_name = ( dest_group_obj.get("name") or dest_group_obj.get("id") or "Unknown" ) if dest_group_name in group_name_to_nodes: for source_name in source_names: key = (source_name, dest_group_name) if key not in group_connections: group_connections[key] = [] group_connections[key].append(policy_name) elif ( isinstance(dest_group_obj, str) and dest_group_obj in group_name_to_nodes ): for source_name in source_names: key = (source_name, dest_group_obj) if key not in group_connections: group_connections[key] = [] group_connections[key].append(policy_name) # Collect direct connections if isinstance(destination_resource, dict): dest_resource_id = destination_resource.get("id") if dest_resource_id and dest_resource_id in resource_id_to_node: dest_node = resource_id_to_node[dest_resource_id] for source_name in source_names: key = (source_name, dest_node) if key not in direct_connections: direct_connections[key] = [] direct_connections[key].append(policy_name) return { "networks": networks, "group_connections": group_connections, "direct_connections": direct_connections, "all_source_groups": all_source_groups, "resource_id_to_node": resource_id_to_node, "group_name_to_nodes": group_name_to_nodes, }