PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation

PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation


base_g = (
   graphistry
   .bind(source="src", destination="dst", node="id")
   .edges(edges_df)
   .nodes(nodes_df)
   .bind(
       edge="edge_id",
       edge_title="edge_title",
       edge_label="edge_label",
       edge_weight="event_count",
       edge_size="edge_size",
       point_title="point_title",
       point_label="label",
       point_color="node_color",
       point_size="node_size",
       point_x="x",
       point_y="y"
   )
   .settings(url_params={"play": 0, "info": "true"})
)
print("\nConstructed a PyGraphistry Plotter named base_g.")
print("It binds src/dst edges, node attributes, titles, labels, sizes, colors, and external x/y layout.")
try:
   dot_text = base_g.plot_static(engine="graphviz-dot", reuse_layout=True)
   dot_path = OUT_DIR / "graph_static.dot"
   with open(dot_path, "w") as f:
       f.write(dot_text if isinstance(dot_text, str) else str(dot_text))
   print("Saved DOT representation:", dot_path)
except Exception as e:
   print("Static DOT export skipped:", repr(e))
def show_pyvis(nodes, edges, output_path, height="780px"):
   nodes_small = nodes.copy()
   edges_small = edges.copy()
   max_nodes = 320
   if len(nodes_small) > max_nodes:
       keep = set(
           nodes_small
           .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
           .head(max_nodes)["id"]
       )
       nodes_small = nodes_small[nodes_small["id"].isin(keep)]
       edges_small = edges_small[edges_small["src"].isin(keep) & edges_small["dst"].isin(keep)]
   net = Network(
       height=height,
       width="100%",
       directed=True,
       notebook=True,
       cdn_resources="in_line"
   )
   net.barnes_hut(gravity=-25000, central_gravity=0.2, spring_length=160, spring_strength=0.04, damping=0.92)
   for row in nodes_small.itertuples(index=False):
       title = str(row.point_title).replace("
", "\n").replace("", "").replace("", "") net.add_node( row.id, label=str(row.label), title=title, group=str(row.entity_type), value=float(row.node_size) ) for row in edges_small.itertuples(index=False): title = str(row.edge_title).replace("
", "\n").replace("", "").replace("", "") net.add_edge( row.src, row.dst, title=title, label=str(row.relation) if row.max_risk >= 0.90 else "", value=float(max(1.0, row.edge_size)) ) net.write_html(str(output_path), notebook=False) display(HTML(filename=str(output_path))) print("Saved local interactive HTML:", output_path) local_full_html = OUT_DIR / "local_full_graph.html" show_pyvis(nodes_df, edges_df, local_full_html) seed_node = ( nodes_df .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False]) .iloc[0]["id"] ) ego = nx.ego_graph(G.to_undirected(), seed_node, radius=2) ego_nodes = set(ego.nodes()) ego_edges_df = edges_df[edges_df["src"].isin(ego_nodes) & edges_df["dst"].isin(ego_nodes)].copy() ego_nodes_df = nodes_df[nodes_df["id"].isin(ego_nodes)].copy() print("\nFocused investigation seed node:", seed_node) print(f"Ego subgraph nodes: {len(ego_nodes_df):,}") print(f"Ego subgraph edges: {len(ego_edges_df):,}") display( ego_nodes_df .sort_values(["is_anomaly", "anomaly_score", "max_risk"], ascending=[False, False, False]) [["id", "entity_type", "risk_band", "is_anomaly", "anomaly_score", "max_risk", "degree_w", "pagerank", "community"]] .head(30) ) ego_g = ( graphistry .bind(source="src", destination="dst", node="id") .edges(ego_edges_df) .nodes(ego_nodes_df) .bind( edge="edge_id", edge_title="edge_title", edge_label="edge_label", edge_weight="event_count", edge_size="edge_size", point_title="point_title", point_label="label", point_color="node_color", point_size="node_size", point_x="x", point_y="y" ) .settings(url_params={"play": 0, "info": "true"}) ) local_ego_html = OUT_DIR / "local_ego_investigation_graph.html" show_pyvis(ego_nodes_df, ego_edges_df, local_ego_html) risky_edges_df = edges_df[ (edges_df["max_risk"] >= 0.85) | (edges_df["failed_count"] >= edges_df["failed_count"].quantile(0.95)) | (edges_df["impossible_travel_count"] > 0) ].copy() risky_node_ids = set(risky_edges_df["src"]).union(set(risky_edges_df["dst"])) risky_nodes_df = nodes_df[nodes_df["id"].isin(risky_node_ids)].copy() risky_g = ( graphistry .bind(source="src", destination="dst", node="id") .edges(risky_edges_df) .nodes(risky_nodes_df) .bind( edge="edge_id", edge_title="edge_title", edge_label="edge_label", edge_weight="event_count", edge_size="edge_size", point_title="point_title", point_label="label", point_color="node_color", point_size="node_size", point_x="x", point_y="y" ) .settings(url_params={"play": 0, "info": "true"}) ) print("\nHigh-risk filtered graph:") print(f"Risky nodes: {len(risky_nodes_df):,}") print(f"Risky edges: {len(risky_edges_df):,}") local_risky_html = OUT_DIR / "local_high_risk_graph.html" show_pyvis(risky_nodes_df, risky_edges_df, local_risky_html)



Source link