セキュリティ分析とリスク調査における対話型グラフインテリジェンスパイプラインのための PyGraphistry 実装ワークフロー
このチュートリアルは、PyGraphistry を活用してセキュリティ分析におけるリスク調査や不審なユーザーの特定を行うための、Colab で実行可能な高度なグラフインテリジェンスパイプラインの実装手順を詳述している。
キーポイント
エンタープライズ型データセットの構築と分析
架空のアクセスログからノードとエッジを作成し、リスクスコアや異常検知指標、中心性メトリクスなどのグラフ分析を自動実行するワークフローを示している。
インタラクティブな可視化の実装
PyGraphistry を用いて、構造のバインディング、視覚的エンコーディング、ツールチップ、フィルタリングされたサブグラフの生成を行い、認証設定がない場合でもローカルで対話型ビジュアライゼーションを可能にする。
セキュリティ分析への応用
不審なユーザー、リスクの高いデバイス、IP 間の関係性、機密サービス、および高リスクの行動パターンを特定するための具体的な調査手法を提供している。
多様な認証方式の柔軟なサポート
コードは環境変数や Colab のシークレット機能を通じて、個人キー(Personal Key)またはユーザー名/パスワードによる Graphistry サーバーへの登録を動的に処理し、認証情報の欠落時にはローカル分析モードへ自動的にフォールバックします。
セキュリティ分析用グラフの構築と分析
エントプライズレベルのアクセスグラフを作成し、IsolationForest や PCA などの機械学習ライブラリを用いて不審なサブグラフを特定する高度な分析ワークフローを実装しています。
ローカルおよびクラウド連携のハイブリッド可視化
Graphistry の登録状況に応じて、インタラクティブな可視化を Graphistry Hub へアップロードするか、または pyvis や matplotlib を使用してローカルで HTML ファイルとして出力する柔軟な仕組みを提供しています。
Colab環境の完全セットアップ
PyGraphistryおよびグラフ分析・可視化・機械学習用のサポートライブラリをインストールし、出力ディレクトリやランダムシード、Graphistry認証情報を設定してローカルとHub両方で動作可能にします。
影響分析・編集コメントを表示
影響分析
本記事は、セキュリティ運用チームやデータサイエンティストにとって、大規模なログデータを直感的に分析し、潜在的な脅威を早期に発見するための実用的なフレームワークを提供します。特に、複雑なネットワーク関係を視覚的に理解する能力が向上することで、インシデント対応の効率化とリスク評価の精度向上に寄与すると考えられます。
編集コメント
セキュリティ分析の現場において、従来のログ分析からグラフベースのインテリジェンスへ移行する際の具体的な実装ガイドとして非常に価値があります。特に Colab 環境での即実行可能点は、技術検証や教育目的で即座に活用できる点が高く評価されます。
このチュートリアルでは、インタラクティブなグラフ分析および可視化のために、PyGraphistry を中心とした高度で Colab 対応のワークフローを構築します。まず、現実的なエンタープライズスタイルのアクセスデータセットを作成し、それをノードとエッジに変換した後、リスクスコア、異常インジケーター、中心性指標、コミュニティ検出、レイアウト埋め込みを用いてグラフを拡張します。その後、PyGraphistry を用いてグラフ構造、視覚的符号化、ラベル、ツールチップ、フィルタリングされた部分グラフをバインドし、Graphistry の認証情報が設定されていない場合はローカルでインタラクティブな可視化を生成します。この実装を通じて、実践的なセキュリティ分析の場面で、グラフインテリジェンスがどのように疑わしいユーザー、リスクの高いデバイス、IP 間の関係、機密サービス、そして高リスクの行動パターンを調査するのに役立つかを確認できます。
image
image
image
image
image 今後のコードノートブックと実装のために GitHub でスターをお願いします
PyGraphistry と依存関係のインストール
Copy CodeCopiedUse a different Browser
import os, sys, subprocess, warnings, textwrap, json, math, random
warnings.filterwarnings("ignore")
def pip_install(packages):
subprocess.run([sys.executable, "-m", "pip", "install", "-q", "-U", *packages], check=True)
pip_install([
"graphistry[networkx,umap-learn]",
"pandas",
"numpy",
"networkx",
"scikit-learn",
"pyvis",
"matplotlib",
"pyarrow"
])
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import graphistry
from pathlib import Path
from IPython.display import display, HTML, IFrame
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
from pyvis.network import Network
OUT_DIR = Path("/content/pygraphistry_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)
SEED = 42
rng = np.random.default_rng(SEED)
random.seed(SEED)
print("=" * 100)
print("PyGraphistry Advanced Colab Tutorial")
print("=" * 100)
print("This tutorial builds an enterprise-style access graph, computes graph analytics,")
print("creates suspicious subgraphs, exports graph artifacts, and optionally uploads")
print("interactive visualizations to Graphistry Hub if credentials are available.")
print("=" * 100)
def colab_secret(name, default=""):
value = os.environ.get(name, default)
try:
from google.colab import userdata
secret_value = userdata.get(name)
if secret_value:
value = secret_value
except Exception:
pass
return value or default
GRAPHISTRY_SERVER = colab_secret("GRAPHISTRY_SERVER", "hub.graphistry.com")
GRAPHISTRY_PROTOCOL = colab_secret("GRAPHISTRY_PROTOCOL", "https")
GRAPHISTRY_USERNAME = colab_secret("GRAPHISTRY_USERNAME", "")
GRAPHISTRY_PASSWORD = colab_secret("GRAPHISTRY_PASSWORD", "")
GRAPHISTRY_PERSONAL_KEY_ID = colab_secret("GRAPHISTRY_PERSONAL_KEY_ID", "")
GRAPHISTRY_PERSONAL_KEY_SECRET = colab_secret("GRAPHISTRY_PERSONAL_KEY_SECRET", "")
REGISTERED = False
try:
if GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET:
graphistry.register(
api=3,
protocol=GRAPHISTRY_PROTOCOL,
server=GRAPHISTRY_SERVER,
personal_key_id=GRAPHISTRY_PERSONAL_KEY_ID,
personal_key_secret=GRAPHISTRY_PERSONAL_KEY_SECRET
)
REGISTERED = True
print("Graphistry registered with personal key credentials.")
elif GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD:
graphistry.register(
api=3,
protocol=GRAPHISTRY_PROTOCOL,
server=GRAPHISTRY_SERVER,
username=GRAPHISTRY_USERNAME,
password=GRAPHISTRY_PASSWORD
)
REGISTERED = True
print("Graphistry registered with username/password credentials.")
else:
graphistry.register(api=3, protocol=GRAPHISTRY_PROTOCOL, server=GRAPHISTRY_SERVER)
print("No Graphistry credentials found. Local analytics will run; Graphistry .plot() uploads will be skipped.")
print("To enable live Graphistry plots, add Colab secrets:")
print("GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET")
print("or GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD")
except Exception as e:
REGISTERED = False
print("Graphistry registration was not completed:", repr(e))
print("Continuing with local analytics and local HTML visualization.")
def nid(kind, value):
return f"{kind}:{value}"
PyGraphistry およびグラフ解析、可視化、機械学習のためのすべてのサポートライブラリをインストールすることで、完全な Colab 環境を設定します。ノートブックがローカル環境および Graphistry Hub の両方で動作するように、出力ディレクトリ、ランダムシード、Graphistry の認証情報を設定します。また、グラフ内の各エンティティタイプを明確に区別できるよう、ノード名付けのための再利用可能なヘルパー関数も定義しています。
エンタープライズアクセスデータの生成
コードをコピーしました
別のブラウザを使用する
n_users = 55
n_devices = 42
n_ips = 36
n_services = 15
n_roles = 7
n_geos = 10
n_events = 2200
users = [f"user_{i:03d}" for i in range(n_users)]
devices = [f"device_{i:03d}" for i in range(n_devices)]
ips = [f"10.{i // 255}.{i % 255}.{rng.integers(1, 255)}" for i in range(1, n_ips + 1)]
services = [
"salesforce", "snowflake", "github", "jira", "slack",
"vpn", "okta", "aws_console", "gcp_console", "databricks",
"hris", "email", "crm", "vault", "payments_api"
]
roles = ["employee", "analyst", "engineer", "manager", "admin", "contractor", "service_account"]
geos = ["IN", "US", "GB", "DE", "SG", "AE", "BR", "NL", "AU", "JP"]
privileged_users = set(rng.choice(users, size=7, replace=False))
compromised_users = set(rng.choice(list(set(users) - privileged_users), size=4, replace=False))
risky_devices = set(rng.choice(devices, size=5, replace=False))
risky_ips = set(rng.choice(ips, size=5, replace=False))
sensitive_services = {"aws_console", "gcp_console", "vault", "payments_api", "snowflake"}
user_role = {}
for u in users:
if u in privileged_users:
user_role[u] = rng.choice(["admin", "manager", "engineer"], p=[0.55, 0.2, 0.25])
elif rng.random() < 0.3:
user_role[u] = rng.choice(["analyst", "contractor"], p=[0.6, 0.4])
else:
user_role[u] = "employee"
Generate events with risk scoring logic
for _ in range(n_events):
user = rng.choice(users)
device = rng.choice(devices)
ip = rng.choice(ips)
service = rng.choice(services)
role = user_role[user]
geo = rng.choice(geos)
timestamp = pd.Timestamp.now() - pd.Timedelta(minutes=rng.integers(0, 1440))
# Risk factors
impossible_travel = (geo != "JP" and rng.random() < 0.05) or (rng.random() < 0.02)
off_hours = timestamp.hour not in range(9, 18) and rng.random() < 0.3
service_sensitivity = 1.0 if service in sensitive_services else 0.25
privileged = int(role in ["admin", "manager", "service_account"])
compromised = int(user in compromised_users)
risky_infra = int(device in risky_devices or ip in risky_ips)
risk_score = (
0.08
+ 0.22 * compromised
+ 0.18 * risky_infra
+ 0.17 * impossible_travel
+ 0.13 * off_hours
+ 0.15 * service_sensitivity
+ 0.07 * privileged
+ rng.normal(0, 0.06)
)
risk_score = float(np.clip(risk_score, 0.0, 1.0))
success_probability = 0.96 - 0.45 * risk_score
is_success = bool(rng.random() < success_probability)
# Determine event type based on outcome and risk
if not is_success:
event_type = "failed_login" if rng.random() < 0.7 else "access_denied"
elif service in sensitive_services and risk_score > 0.6:
event_type = "sensitive_data_access" if rng.random() < 0.8 else "privilege_escalation_attempt"
else:
event_type = rng.choice(["normal_login", "file_download", "api_call", "config_change"], p=[0.4, 0.25, 0.25, 0.1])
events.append({
"id": f"evt_{len(events):06d}",
"timestamp": timestamp,
"user": user,
"device": device,
"ip": ip,
"service": service,
"role": role,
"geo": geo,
"event_type": event_type,
"risk_score": round(risk_score, 4),
"is_success": is_success
})
events_df = pd.DataFrame(events)
events_df["timestamp"] = pd.to_datetime(events_df["timestamp"])
Build graph structure from events
raw_edges_df = []
for _, row in events_df.iterrows():
raw_edges_df.append({
"source": row["user"],
"target": row["device"],
"type": "uses",
"timestamp": row["timestamp"]
})
raw_edges_df.append({
"source": row["device"],
"target": row["ip"],
"type": "connects_to",
"timestamp": row["timestamp"]
})
raw_edges_df.append({
"source": row["user"],
"target": row["service"],
"type": "accesses",
"timestamp": row["timestamp"]
})
raw_edges_df = pd.DataFrame(raw_edges_df)
Aggregate edges by source-target-type for graph construction
edges_df = (
raw_edges_df
.groupby(["source", "target", "type"])
.agg(
count=("timestamp", "count"),
first_seen=("timestamp", "min"),
last_seen=("timestamp", "max")
)
.reset_index()
)
Compute node attributes and risk metrics
nodes_df = pd.DataFrame({
"id": list(set(events_df["user"]) | set(events_df["device"]) | set(events_df["ip"]) | set(events_df["service"])),
"entity_type": ["user" if n in events_df["user"] else "device" if n in events_df["device"] else "ip" if n in events_df["ip"] else "service" for n in list(set(events_df["user"]) | set(events_df["device"]) | set(events_df["ip"]) | set(events_df["service"]))]
})
Calculate community detection (simplified Louvain-like approach)
from sklearn.cluster import KMeans
node_features = pd.get_dummies(nodes_df[["entity_type"]])
kmeans = KMeans(n_clusters=5, random_state=42).fit(node_features)
nodes_df["community"] = kmeans.labels_
Compute risk band based on aggregated event scores
risk_metrics = (
events_df
.groupby("user")
.agg(
max_risk=("risk_score", "max"),
avg_risk=("risk_score", "mean"),
degree_w=("risk_score", "sum")
)
.reset_index()
)
nodes_df = nodes_df.merge(risk_metrics, left_on="id", right_on="user", how="left")
nodes_df["pagerank"] = 0.5 + 0.5 * (nodes_df["degree_w"] / nodes_df["degree_w"].max()) if "degree_w" in nodes_df.columns else 0.5
nodes_df["betweenness"] = rng.uniform(0, 1, size=len(nodes_df))
nodes_df["anomaly_score"] = (
0.3 * (nodes_df["max_risk"] / 1.0)
+ 0.2 * (nodes_df["avg_risk"] / 1.0)
+ 0.25 * nodes_df["pagerank"]
+ 0.15 * nodes_df["betweenness"]
+ 0.1 * (nodes_df["community"] % 3) / 3
)
nodes_df["is_anomaly"] = nodes_df["anomaly_score"] > 0.7
Assign risk bands based on anomaly score and max risk
risk_band_map = {
"low": lambda x: x < 0.4,
"medium": lambda x: 0.4 <= x < 0.6,
"high": lambda x: 0.6 <= x < 0.85,
"critical": lambda x: x >= 0.85
}
nodes_df["risk_band"] = nodes_df["anomaly_score"].apply(
lambda x: next(band for band, condition in risk_band_map.items() if condition(x)),
axis=1
)
Format node data for display
node_display_data = (
nodes_df
.assign(
description=lambda df: "\n".join([
f"id: {r['id']}**",
f"type: {r['entity_type']}",
f"community: {int(r['community'])}",
f"risk band: {r['risk_band']}",
f"max risk: {r['max_risk']:.3f}",
f"avg risk: {r['avg_risk']:.3f}",
f"weighted degree: {r['degree_w']:.1f}",
f"pagerank: {r['pagerank']:.6f}",
f"betweenness: {r['betweenness']:.6f}",
f"anomaly score: {r['anomaly_score']:.4f}",
f"is anomaly: {bool(r['is_anomaly'])}"
]),
axis=1
)
)
print("\nGraph summary:")
print(f"Events: {len(events_df):,}")
print(f"Raw relationship rows: {len(raw_edges_df):,}")
print(f"Aggregated edges: {len(edges_df):,}")
print(f"Nodes: {len(nodes_df):,}")
print(f"Communities: {len(communities):,}")
print(f"External layout: {layout_name}")
print("\nNode type counts:")
display(nodes_df["entity_type"].value_counts().rename_axis("entity_type").reset_index(name="count"))
print("\nRisk band counts:")
display(nodes_df["risk_band"].value_counts().rename_axis("risk_band").reset_index(name="count"))
print("\nTop 20 anomalous nodes:")
top_anomalies = (
nodes_df
.sort_values(["is_anomaly", "anomaly_score", "max"]
原文を表示
In this tutorial, we build an advanced, Colab-ready workflow around PyGraphistry for interactive graph analytics and visualization. We start by creating a realistic enterprise-style access dataset, transforming it into nodes and edges, and enriching the graph with risk scores, anomaly indicators, centrality metrics, community detection, and layout embeddings. We then use PyGraphistry to bind graph structure, visual encodings, labels, tooltips, and filtered subgraphs, and to generate local interactive visualizations when Graphistry credentials are not configured. Through this implementation, we see how graph intelligence helps us investigate suspicious users, risky devices, IP relationships, sensitive services, and high-risk behavioral patterns in a practical security analytics setting.
image
image
image
image
image Star us on GitHub for future Code notebooks and implementation
Installing PyGraphistry and Dependencies
Copy CodeCopiedUse a different Browser
import os, sys, subprocess, warnings, textwrap, json, math, random
warnings.filterwarnings("ignore")
def pip_install(packages):
subprocess.run([sys.executable, "-m", "pip", "install", "-q", "-U", *packages], check=True)
pip_install([
"graphistry[networkx,umap-learn]",
"pandas",
"numpy",
"networkx",
"scikit-learn",
"pyvis",
"matplotlib",
"pyarrow"
])
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import graphistry
from pathlib import Path
from IPython.display import display, HTML, IFrame
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
from pyvis.network import Network
OUT_DIR = Path("/content/pygraphistry_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)
SEED = 42
rng = np.random.default_rng(SEED)
random.seed(SEED)
print("=" * 100)
print("PyGraphistry Advanced Colab Tutorial")
print("=" * 100)
print("This tutorial builds an enterprise-style access graph, computes graph analytics,")
print("creates suspicious subgraphs, exports graph artifacts, and optionally uploads")
print("interactive visualizations to Graphistry Hub if credentials are available.")
print("=" * 100)
def colab_secret(name, default=""):
value = os.environ.get(name, default)
try:
from google.colab import userdata
secret_value = userdata.get(name)
if secret_value:
value = secret_value
except Exception:
pass
return value or default
GRAPHISTRY_SERVER = colab_secret("GRAPHISTRY_SERVER", "hub.graphistry.com")
GRAPHISTRY_PROTOCOL = colab_secret("GRAPHISTRY_PROTOCOL", "https")
GRAPHISTRY_USERNAME = colab_secret("GRAPHISTRY_USERNAME", "")
GRAPHISTRY_PASSWORD = colab_secret("GRAPHISTRY_PASSWORD", "")
GRAPHISTRY_PERSONAL_KEY_ID = colab_secret("GRAPHISTRY_PERSONAL_KEY_ID", "")
GRAPHISTRY_PERSONAL_KEY_SECRET = colab_secret("GRAPHISTRY_PERSONAL_KEY_SECRET", "")
REGISTERED = False
try:
if GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET:
graphistry.register(
api=3,
protocol=GRAPHISTRY_PROTOCOL,
server=GRAPHISTRY_SERVER,
personal_key_id=GRAPHISTRY_PERSONAL_KEY_ID,
personal_key_secret=GRAPHISTRY_PERSONAL_KEY_SECRET
)
REGISTERED = True
print("Graphistry registered with personal key credentials.")
elif GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD:
graphistry.register(
api=3,
protocol=GRAPHISTRY_PROTOCOL,
server=GRAPHISTRY_SERVER,
username=GRAPHISTRY_USERNAME,
password=GRAPHISTRY_PASSWORD
)
REGISTERED = True
print("Graphistry registered with username/password credentials.")
else:
graphistry.register(api=3, protocol=GRAPHISTRY_PROTOCOL, server=GRAPHISTRY_SERVER)
print("No Graphistry credentials found. Local analytics will run; Graphistry .plot() uploads will be skipped.")
print("To enable live Graphistry plots, add Colab secrets:")
print("GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET")
print("or GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD")
except Exception as e:
REGISTERED = False
print("Graphistry registration was not completed:", repr(e))
print("Continuing with local analytics and local HTML visualization.")
def nid(kind, value):
return f"{kind}:{value}"
We set up the complete Colab environment by installing PyGraphistry and all supporting libraries for graph analytics, visualization, and machine learning. We configure the output directory, random seed, and Graphistry credentials so the notebook works both locally and with Graphistry Hub. We also define a reusable helper for node naming to keep every entity type clearly separated in the graph.
Generating Enterprise Access Dataset
Copy CodeCopiedUse a different Browser
n_users = 55
n_devices = 42
n_ips = 36
n_services = 15
n_roles = 7
n_geos = 10
n_events = 2200
users = [f"user_{i:03d}" for i in range(n_users)]
devices = [f"device_{i:03d}" for i in range(n_devices)]
ips = [f"10.{i // 255}.{i % 255}.{rng.integers(1, 255)}" for i in range(1, n_ips + 1)]
services = [
"salesforce", "snowflake", "github", "jira", "slack",
"vpn", "okta", "aws_console", "gcp_console", "databricks",
"hris", "email", "crm", "vault", "payments_api"
]
roles = ["employee", "analyst", "engineer", "manager", "admin", "contractor", "service_account"]
geos = ["IN", "US", "GB", "DE", "SG", "AE", "BR", "NL", "AU", "JP"]
privileged_users = set(rng.choice(users, size=7, replace=False))
compromised_users = set(rng.choice(list(set(users) - privileged_users), size=4, replace=False))
risky_devices = set(rng.choice(devices, size=5, replace=False))
risky_ips = set(rng.choice(ips, size=5, replace=False))
sensitive_services = {"aws_console", "gcp_console", "vault", "payments_api", "snowflake"}
user_role = {}
for u in users:
if u in privileged_users:
user_role[u] = rng.choice(["admin", "manager", "engineer"], p=[0.55, 0.2, 0.25])
elif rng.random() 21)
service_sensitivity = 1.0 if service in sensitive_services else 0.25
privileged = int(role in ["admin", "manager", "service_account"])
compromised = int(user in compromised_users)
risky_infra = int(device in risky_devices or ip in risky_ips)
risk_score = (
0.08
+ 0.22 * compromised
+ 0.18 * risky_infra
+ 0.17 * impossible_travel
+ 0.13 * off_hours
+ 0.15 * service_sensitivity
+ 0.07 * privileged
+ rng.normal(0, 0.06)
)
risk_score = float(np.clip(risk_score, 0.0, 1.0))
success_probability = 0.96 - 0.45 * risk_score
is_success = bool(rng.random() {r['id']}**
"
f"type: {r['entity_type']}
"
f"community: {int(r['community'])}
"
f"risk band: {r['risk_band']}
"
f"max risk: {r['max_risk']:.3f}
"
f"avg risk: {r['avg_risk']:.3f}
"
f"weighted degree: {r['degree_w']:.1f}
"
f"pagerank: {r['pagerank']:.6f}
"
f"betweenness: {r['betweenness']:.6f}
"
f"anomaly score: {r['anomaly_score']:.4f}
"
f"is anomaly: {bool(r['is_anomaly'])}"
),
axis=1
)
print("\nGraph summary:")
print(f"Events: {len(events_df):,}")
print(f"Raw relationship rows: {len(raw_edges_df):,}")
print(f"Aggregated edges: {len(edges_df):,}")
print(f"Nodes: {len(nodes_df):,}")
print(f"Communities: {len(communities):,}")
print(f"External layout: {layout_name}")
print("\nNode type counts:")
display(nodes_df["entity_type"].value_counts().rename_axis("entity_type").reset_index(name="count"))
print("\nRisk band counts:")
display(nodes_df["risk_band"].value_counts().rename_axis("risk_band").reset_index(name="count"))
print("\nTop 20 anomalous nodes:")
top_anomalies = (
nodes_df
.sort_values(["is_anomaly", "anomaly_score", "max
関連記事
Meta AI、非侵襲型脳からテキストへ変換する「Brain2Qwerty v2」を公開し、タイピング中の文章を61%の単語精度で復元
OpenClaw、iOS および Android 向けコンパニオンノードアプリをリリースし、スマートフォンを自己ホスト型 AI エージェントゲートウェイに接続
NVIDIA の BioNeMo エージェント・ツールキットが、創薬における AI エージェント向けに生体分子モデルを呼び出し可能なスキルへ変換
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み