AIニュース最前線
MarkTechPost · June 19, 2026, 11:44 · about a 13-minute read

Salesforce CodeGen Tutorial: Generating, Validating, and Reranking Python Functions with Unit Tests and Safety Checks

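#Salesforce CodeGen #LLM #code generation #unit testing #static analysis
TL;DR

This article details how to implement an advanced end-to-end workflow for generating Python functions from natural language with the Salesforce CodeGen model. The workflow covers syntax checking, static analysis, unit-test validation, and candidate reranking.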

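AI in-depth analysis · June 19, 2026, 12:02

Attention: 3 on a 5-point scale
Depth (weight 40%): 4
Relevance (weight 30%): 4
Practicality (weight 20%): 5
Novelty (weight 10%): 2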

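Key points

1. Implementing an advanced code-generation pipeline

Going beyond simple inference, the article shows how to build a multi-stage generation process that combines function extraction, syntax checking, and static safety checks.

2. Automated verification for quality assurance

Unit tests are run against the generated code, and candidates are reranked by the results (Best-of-N), an approach intended to raise the share of code that is actually usable.

3. Experiments across models and environments

Different Salesforce CodeGen versions (350M, 2B, 7B, and others) can be selected, and the experimental framework includes benchmark visualization and artifact export.

4. Building a practical code-generation tool

The article gives a concrete script that loads a model from Hugging Face and uses Python libraries (radon, ast, multiprocessing, and others) to generate code automatically and verify it for safety and correctness.

5. Environment setup and optimized model loading

The device (GPU/CPU) and data type (float16/float32) are switched automatically depending on whether CUDA is available, which keeps memory usage down.

6. Flexible model selection

An environment variable and a dictionary defined in the code make it easy to switch between several model sizes, from CodeGen 350M up to the 7B model.

7. Environment setup and model loading

The script installs the required libraries, checks the runtime, detects GPU availability, and loads the CodeGen model and tokenizer from Hugging Face to prepare the execution environment.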
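The model-selection idea in the key points can be sketched without loading any model. The snippet below is only an illustration: `CODEGEN_MODEL_ID` and the model IDs come from the tutorial code, while the helper name `resolve_model_id` and the idea of accepting a short alias are assumptions made for this example.

```python
import os

# Model IDs as listed in the tutorial's MODEL_OPTIONS dictionary.
MODEL_OPTIONS = {
    "easy_colab_default": "Salesforce/codegen-350M-mono",
    "larger_codegen1": "Salesforce/codegen-2B-mono",
    "codegen2_1b": "Salesforce/codegen2-1B_P",
    "codegen25_7b_mono": "Salesforce/codegen25-7b-mono_P",
}

def resolve_model_id(env=None):
    """Hypothetical helper: return (model_id, trust_remote_code)."""
    env = os.environ if env is None else env
    requested = env.get("CODEGEN_MODEL_ID", MODEL_OPTIONS["easy_colab_default"])
    # Assumption for this sketch: a short alias from MODEL_OPTIONS is also accepted.
    model_id = MODEL_OPTIONS.get(requested, requested)
    # Same rule as the tutorial: only CodeGen2 / CodeGen2.5 checkpoints
    # are loaded with trust_remote_code=True.
    trust = any(tag in model_id.lower() for tag in ("codegen2", "codegen25"))
    return model_id, trust

print(resolve_model_id({}))
print(resolve_model_id({"CODEGEN_MODEL_ID": "codegen2_1b"}))
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to try out without touching the real environment.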

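Impact analysis

This article shows an important step in moving AI code generation from the idea stage toward implementations that can be used directly in real development work. By presenting a workflow that automatically verifies the safety and correctness of generated code, it offers concrete guidance for making LLM-assisted software development more reliable, which gives it practical value for engineers and researchers.

Editor's comment

The biggest concerns in putting code-generation AI to practical use are hallucination (generation of incorrect code) and security risk. The approach of addressing both with automated testing and verification is highly instructive.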

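In this tutorial, we implement an end-to-end workflow for Salesforce CodeGen. We load a CodeGen model from Hugging Face, prepare it for code generation, and use it to generate Python functions from natural-language prompts. We then move beyond basic inference by adding function extraction, syntax checking, static safety checks, unit-test-based validation, best-of-N candidate reranking, multi-step program synthesis, prompt-style experimentation, benchmark visualization, and artifact export. Through this workflow, we learn how CodeGen can be used not only as a code completion model but also as part of a structured code-generation pipeline that evaluates, filters, and organizes generated solutions.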

Loading the Salesforce CodeGen Model from Hugging Face

import os, sys, subprocess, textwrap, json, re, time, math, ast, tempfile, multiprocessing as mp
from pathlib import Path

def sh(cmd):
    print(f"\n$ {cmd}")
    subprocess.run(cmd, shell=True, check=True)

sh(f"{sys.executable} -m pip install -q -U transformers accelerate safetensors einops datasets evaluate pandas matplotlib tqdm rich radon tiktoken")

import torch
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from rich import print
from rich.panel import Panel
from rich.syntax import Syntax
from transformers import AutoTokenizer, AutoModelForCausalLM, set_seed
from radon.complexity import cc_visit

OUT_DIR = Path("/content/codegen_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)
set_seed(42)

print(Panel.fit("Salesforce CodeGen Advanced Tutorial", style="bold green"))
print("\nRuntime information")
print("Python:", sys.version.split()[0])
print("Torch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
    print("CUDA memory GB:", round(torch.cuda.get_device_properties(0).total_memory / 1e9, 2))

MODEL_ID = os.environ.get("CODEGEN_MODEL_ID", "Salesforce/codegen-350M-mono")
MODEL_OPTIONS = {
    "easy_colab_default": "Salesforce/codegen-350M-mono",
    "larger_codegen1": "Salesforce/codegen-2B-mono",
    "codegen2_1b": "Salesforce/codegen2-1B_P",
    "codegen25_7b_mono": "Salesforce/codegen25-7b-mono_P",
}
print("\nSelected model:", MODEL_ID)
print("Available model examples:", MODEL_OPTIONS)

trust_remote_code = any(x in MODEL_ID.lower() for x in ["codegen2", "codegen25"])
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.float16 if torch.cuda.is_available() else torch.float32

print("\nLoading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_ID,
    trust_remote_code=trust_remote_code
)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

print("Loading model...")
load_kwargs = {
    "trust_remote_code": trust_remote_code,
    "low_cpu_mem_usage": True,
}
if torch.cuda.is_available():
    load_kwargs["torch_dtype"] = dtype
    load_kwargs["device_map"] = "auto"
else:
    load_kwargs["torch_dtype"] = torch.float32

model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **load_kwargs)
if not torch.cuda.is_available():
    model.to(device)
model.eval()

def count_parameters(model):
    return sum(p.numel() for p in model.parameters())

print(f"Loaded {MODEL_ID}")
print(f"Parameter count: {count_parameters(model)/1e6:.1f}M")

def generate_text(
    prompt,
    max_new_tokens=180,
    temperature=0.35,
    top_p=0.92,
    top_k=50,
    do_sample=True,
    num_return_sequences=1,
    repetition_penalty=1.05,
):
    inputs = tokenizer(prompt, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            num_return_sequences=num_return_sequences,
            repetition_penalty=repetition_penalty,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    decoded = tokenizer.batch_decode(outputs, skip_special_tokens=True)
    return decoded

def print_code(title, code):
    print(Panel.fit(title, style="bold cyan"))
    print(Syntax(code, "python", theme="monokai", line_numbers=True))

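We install all required libraries and prepare the environment for running Salesforce CodeGen. We check the runtime, detect GPU availability, select the CodeGen model, and load both the tokenizer and model from Hugging Face. We also define helper functions for text generation and for displaying formatted code so that the rest of the tutorial is easier to follow.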
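One detail of the `generate_text` helper is worth illustrating: it decodes the full output sequences, which for a causal language model include the prompt as well as the newly generated tokens. The tutorial relies on this, since the function signature sits in the prompt. If only the completion is wanted, a small helper along the following lines could separate the two. The name `split_completion` is hypothetical, and the decoded string is a hand-written stand-in rather than real model output.

```python
def split_completion(decoded, prompt):
    """Hypothetical helper: return only the text generated after the prompt."""
    if decoded.startswith(prompt):
        return decoded[len(prompt):]
    # Decoding does not always reproduce the prompt exactly; in that case
    # return the text unchanged rather than guess where the prompt ends.
    return decoded

prompt = "# Return the square of x.\ndef square(x):\n"
decoded = prompt + "    return x * x\n"  # stand-in for one generate_text() result
print(repr(split_completion(decoded, prompt)))
```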

Building Extraction, Safety, and Unit-Test Validation Utilities

def extract_function_source(full_text, function_name):
    text = full_text.replace("\r\n", "\n")
    # If the output contains a Markdown code fence (three backticks), keep only its body.
    fence = re.search(r"`{3}(?:python)?\n(.*?)`{3}", text, flags=re.S | re.I)
    if fence:
        text = fence.group(1)
    pattern = rf"^def\s+{re.escape(function_name)}\s*\("
    match = re.search(pattern, text, flags=re.M)
    if not match:
        return ""
    chunk = text[match.start():]
    lines = chunk.splitlines()
    collected = []
    for i, line in enumerate(lines):
        if i > 0:
            # Stop at the next top-level definition, main guard, or assignment.
            if line.startswith("def ") or line.startswith("class "):
                break
            if line.startswith("if __name__"):
                break
            if line and not line.startswith((" ", "\t", "#")) and re.match(r"^[A-Za-z_][A-Za-z0-9_]*\s*=", line):
                break
        collected.append(line)
    source = "\n".join(collected).rstrip()
    try:
        ast.parse(source)
        return source
    except SyntaxError:
        # Fall back to the longest prefix of the collected lines that still parses.
        fixed_lines = []
        for line in collected:
            fixed_lines.append(line)
            candidate = "\n".join(fixed_lines).rstrip()
            try:
                ast.parse(candidate)
                source = candidate
            except SyntaxError:
                pass
    return source if source.strip().startswith("def ") else ""

def syntax_ok(source):
    try:
        ast.parse(source)
        return True, ""
    except SyntaxError as e:
        return False, str(e)

FORBIDDEN_NAMES = {
    "eval", "exec", "compile", "open", "input", "__import__",
    "globals", "locals", "vars", "dir", "getattr", "setattr", "delattr",
    "help", "breakpoint", "exit", "quit"
}

FORBIDDEN_NODES = (
    ast.Import,
    ast.ImportFrom,
    ast.Global,
    ast.Nonlocal,
    ast.With,
    ast.AsyncWith,
    ast.AsyncFunctionDef,
    ast.ClassDef,
    ast.Delete,
    ast.Raise,
)

ALLOWED_BUILTINS = {
    "abs": abs,
    "all": all,
    "any": any,
    "bool": bool,
    "dict": dict,
    "enumerate": enumerate,
    "float": float,
    "int": int,
    "isinstance": isinstance,
    "len": len,
    "list": list,
    "map": map,
    "max": max,
    "min": min,
    "pow": pow,
    "range": range,
    "reversed": reversed,
    "round": round,
    "set": set,
    "sorted": sorted,
    "str": str,
    "sum": sum,
    "tuple": tuple,
    "zip": zip,
}

def static_safety_check(source):
    try:
        tree = ast.parse(source)
    except SyntaxError as e:
        return False, f"SyntaxError: {e}"
    for node in ast.walk(tree):
        if isinstance(node, FORBIDDEN_NODES):
            return False, f"Forbidden AST node: {type(node).__name__}"
        if isinstance(node, ast.Name):
            if node.id in FORBIDDEN_NAMES or node.id.startswith("__"):
                return False, f"Forbidden name: {node.id}"
        if isinstance(node, ast.Attribute):
            if node.attr.startswith("__"):
                return False, f"Forbidden attribute: {node.attr}"
        if isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name) and node.func.id in FORBIDDEN_NAMES:
                return False, f"Forbidden call: {node.func.id}"
    return True, "passed"

def _worker_run_tests(source, function_name, tests, queue):
    try:
        safe_globals = {"__builtins__": ALLOWED_BUILTINS}
        safe_locals = {}
        compiled = compile(source, "<generated_code>", "exec")
        exec(compiled, safe_globals, safe_locals)
        # Functions look names up in their globals, so copy the new definitions
        # there; otherwise a recursive candidate fails with NameError.
        safe_globals.update(safe_locals)
        fn = safe_locals.get(function_name) or safe_globals.get(function_name)
        if fn is None:
            queue.put({"ok": False, "error": f"{function_name} not found", "passed": 0, "total": len(tests)})
            return
        passed = 0
        details = []
        for test in tests:
            args = test.get("args", [])
            kwargs = test.get("kwargs", {})
            expected = test["expected"]
            result = fn(*args, **kwargs)
            ok = result == expected
            passed += int(ok)
            details.append({
                "args": args,
                "kwargs": kwargs,
                "expected": expected,
                "result": result,
                "ok": ok,
            })
        queue.put({"ok": passed == len(tests), "error": "", "passed": passed, "total": len(tests), "details": details})
    except Exception as e:
        queue.put({"ok": False, "error": repr(e), "passed": 0, "total": len(tests)})

def run_unit_tests_safely(source, function_name, tests, timeout_seconds=3):
    safe, reason = static_safety_check(source)
    if not safe:
        return {"ok": False, "error": reason, "passed": 0, "total": len(tests), "details": []}
    # The "fork" start method is available on Unix-like systems such as Colab, not on Windows.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    process = ctx.Process(target=_worker_run_tests, args=(source, function_name, tests, queue))
    process.start()
    process.join(timeout_seconds)
    if process.is_alive():
        process.terminate()
        process.join()
        return {"ok": False, "error": "timeout", "passed": 0, "total": len(tests), "details": []}
    if queue.empty():
        return {"ok": False, "error": "no result returned", "passed": 0, "total": len(tests), "details": []}
    return queue.get()

def code_complexity(source):
    try:
        blocks = cc_visit(source)
        if not blocks:
            return 1
        return max(block.complexity for block in blocks)
    except Exception:
        return None

def score_candidate(source, test_result):
    syntax_score = 1 if syntax_ok(source)[0] else 0
    safety_score = 1 if static_safety_check(source)[0] else 0
    passed = test_result.get("passed", 0)
    total = max(test_result.get("total", 1), 1)
    test_score = passed / total
    complexity = code_complexity(source)
    complexity_penalty = 0 if complexity is None else min(complexity / 20, 0.25)
    return syntax_score + safety_score + 3 * test_score - complexity_penalty

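We build the utility layer that extracts generated Python functions from raw model outputs. We add syntax validation, static safety checks, restricted execution, unit-test execution, and timeout handling to make generated code easier to evaluate. We also calculate code complexity and create a scoring function to rank generated candidates by correctness, safety, and simplicity.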
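To make the static safety check more concrete, here is a reduced, self-contained sketch of the same AST-walking idea applied to a few hand-written snippets. The shorter denylists and the names `check`, `BLOCKED_NAMES`, and `BLOCKED_NODES` are assumptions for this example; the tutorial's own lists are longer.

```python
import ast

# Reduced denylists for illustration; the tutorial's lists are longer.
BLOCKED_NAMES = {"eval", "exec", "open", "__import__"}
BLOCKED_NODES = (ast.Import, ast.ImportFrom)

def check(source):
    """Return (is_safe, reason) for a source string."""
    try:
        tree = ast.parse(source)
    except SyntaxError as err:
        return False, f"SyntaxError: {err.msg}"
    for node in ast.walk(tree):
        if isinstance(node, BLOCKED_NODES):
            return False, f"blocked node: {type(node).__name__}"
        if isinstance(node, ast.Name) and node.id in BLOCKED_NAMES:
            return False, f"blocked name: {node.id}"
        if isinstance(node, ast.Attribute) and node.attr.startswith("__"):
            return False, f"blocked attribute: {node.attr}"
    return True, "passed"

samples = {
    "plain": "def f(x):\n    return x + 1",
    "import": "def f(x):\n    import os\n    return os.getcwd()",
    "dunder": "def f(x):\n    return x.__class__",
    "open": "def f(p):\n    return open(p).read()",
}
for label, src in samples.items():
    print(label, check(src))
```

A denylist like this only rejects obvious patterns and is not a sandbox on its own, which is why the tutorial also restricts the available builtins and runs tests in a separate process with a timeout.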

Generating Code and Defining Benchmark Tasks

print("\n" + "=" * 90)
print("Demo 1: Basic natural-language-to-code completion")
print("=" * 90)

basic_prompt = """# Write a Python function that returns the area of a circle.
# The function should be named circle_area and should accept radius as input.
# Do not print anything. Return the numeric result.
def circle_area(radius):
"""

basic_output = generate_text(
    basic_prompt,
    max_new_tokens=120,
    temperature=0.25,
    do_sample=True,
    num_return_sequences=1,
)[0]

print_code("Raw CodeGen output", basic_output)
circle_source = extract_function_source(basic_output, "circle_area")
print_code("Extracted function", circle_source if circle_source else "# No function extracted")

circle_tests = [
    {"args": [1], "expected": math.pi},
    {"args": [2], "expected": 4 * math.pi},
]

if circle_source:
    print("Syntax:", syntax_ok(circle_source))
    print("Safety:", static_safety_check(circle_source))
    print("Complexity:", code_complexity(circle_source))

print("\n" + "=" * 90)
print("Demo 2: Best-of-N generation with test-based reranking")
print("=" * 90)

TASKS = [
    {
        "name": "factorial",
        "signature": "def factorial(n):",
        "instruction": "Return n factorial for a non-negative integer n. Use 1 for factorial(0).",
        "tests": [
            {"args": [0], "expected": 1},
            {"args": [1], "expected": 1},
            {"args": [5], "expected": 120},
            {"args": [7], "expected": 5040},
        ],
    },
    {
        "name": "is_palindrome",
        "signature": "def is_palindrome(text):",
        "instruction": "Return True if text is a palindrome after removing spaces and ignoring case, otherwise return False.",
        "tests": [
            {"args": ["Race car"], "expected": True},
            {"args": ["hello"], "expected": False},
            {"args": ["Never odd or even"], "expected": True},
        ],
    },
    {
        "name": "fibonacci",
        "signature": "def fibonacci(n):",
        "instruction": "Return the nth Fibonacci number where fibonacci(0)=0 and fibonacci(1)=1.",
        "tests": [
            {"args": [0], "expected": 0},
            {"args": [1], "expected": 1},
            {"args": [8], "expected": 21},
            {"args": [10], "expected": 55},
        ],
    },
    {
        "name": "dedupe_keep_order",
        "signature": "def dedupe_keep_order(items):",
        "instruction": "Return a list with duplicate values removed while preserving the first occurrence order.",
        "tests": [
            {"args": [[1, 2, 1, 3, 2]], "expected": [1, 2, 3]},
            {"args": [["a", "b", "a", "c"]], "expected": ["a", "b", "c"]},
            {"args": [[]], "expected": []},
        ],
    },
]

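We start with a simple natural-language-to-code generation example using a circle area function. We generate raw CodeGen output, extract the function, and inspect its syntax, safety, and complexity. We then define multiple programming tasks that later help us benchmark CodeGen across different function-generation problems.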
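The circle tests expect `math.pi` exactly, while the tutorial's test worker compares results with `==`. A generated function that uses a literal such as `3.14159265` would therefore fail an exact comparison even though it is numerically close. The sketch below shows one possible tolerant comparison built on `math.isclose`. The helper `results_match`, the tolerance value, and the stand-in `circle_area` are assumptions for illustration and are not part of the tutorial.

```python
import math

def results_match(result, expected, rel_tol=1e-6):
    """Hypothetical comparison: tolerant when a float is expected, exact otherwise."""
    if isinstance(result, (int, float)) and isinstance(expected, float):
        return math.isclose(result, expected, rel_tol=rel_tol)
    return result == expected

def circle_area(radius):
    # Hand-written stand-in for model output that uses a literal instead of math.pi.
    return 3.14159265 * radius ** 2

print(circle_area(1) == math.pi)               # exact equality
print(results_match(circle_area(1), math.pi))  # tolerant comparison
```

Non-numeric results such as lists and dictionaries still go through plain equality, so the other benchmark tasks would behave as before under this comparison.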

Best-of-N Candidate Generation and Test-Based Reranking


def build_prompt(task):
    examples = []
    for t in task["tests"][:2]:
        examples.append(f"# Example: {task['name']}(*{t['args']}) -> {repr(t['expected'])}")
    example_block = "\n".join(examples)
    return f'''# You are writing clean Python 3 code.
# Task: {task["instruction"]}
# Rules:
# - Do not import packages.
# - Do not print anything.
# - Return the answer from the function.
# - Keep the implementation compact and readable.
{example_block}
{task["signature"]}
'''

def generate_candidates_for_task(task, n=3, max_new_tokens=160):
    prompt = build_prompt(task)
    outputs = generate_text(
        prompt,
        max_new_tokens=max_new_tokens,
        temperature=0.45,
        top_p=0.92,
        do_sample=True,
        num_return_sequences=n,
        repetition_penalty=1.07,
    )
    candidates = []
    for i, out in enumerate(outputs):
        source = extract_function_source(out, task["name"])
        syntax_pass, syntax_error = syntax_ok(source) if source else (False, "no source extracted")
        test_result = run_unit_tests_safely(source, task["name"], task["tests"]) if source else {
            "ok": False,
            "error": "no source extracted",
            "passed": 0,
            "total": len(task["tests"]),
            "details": [],
        }
        candidates.append({
            "task": task["name"],
            "candidate_id": i,
            "prompt": prompt,
            "raw_output": out,
            "source": source,
            "syntax_ok": syntax_pass,
            "syntax_error": syntax_error,
            "safety": static_safety_check(source)[0] if source else False,
            "tests_passed": test_result.get("passed", 0),
            "tests_total": test_result.get("total", len(task["tests"])),
            "test_ok": test_result.get("ok", False),
            "test_error": test_result.get("error", ""),
            "complexity": code_complexity(source) if source else None,
            "score": score_candidate(source, test_result) if source else -999,
        })
    candidates = sorted(candidates, key=lambda x: x["score"], reverse=True)
    return candidates

all_candidates = []
best_solutions = {}
CANDIDATES_PER_TASK = 2

for task in tqdm(TASKS, desc="Generating and evaluating"):
    candidates = generate_candidates_for_task(task, n=CANDIDATES_PER_TASK)
    all_candidates.extend(candidates)
    best_solutions[task["name"]] = candidates[0]

results_df = pd.DataFrame([
    {
        "task": c["task"],
        "candidate_id": c["candidate_id"],
        "syntax_ok": c["syntax_ok"],
        "safety": c["safety"],
        "tests_passed": c["tests_passed"],
        "tests_total": c["tests_total"],
        "test_ok": c["test_ok"],
        "complexity": c["complexity"],
        "score": round(c["score"], 3),
        "test_error": c["test_error"],
    }
    for c in all_candidates
]).sort_values(["task", "score"], ascending=[True, False])

print("\nCandidate summary")
display(results_df)

for task_name, best in best_solutions.items():
    print_code(f"Best solution for {task_name}", best["source"] if best["source"] else "# No valid source")
    print({
        "task": task_name,
        "tests_passed": f'{best["tests_passed"]}/{best["tests_total"]}',
        "score": best["score"],
        "test_error": best["test_error"],
    })

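We create a structured prompt for each task and use CodeGen to generate multiple candidate solutions. We evaluate each candidate with unit tests, syntax checks, safety checks, complexity analysis, and the scoring system. We then summarize the results in a DataFrame and display the best generated solution for each task.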
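A short worked example may help show how the score separates candidates. The function below repeats the arithmetic of the tutorial's `score_candidate` (syntax point, safety point, three times the test pass rate, minus a complexity penalty capped at 0.25) but takes plain numbers as input. The three candidates and their values are hypothetical.

```python
def score(syntax_ok, safe, passed, total, complexity):
    """Same arithmetic as the tutorial's score_candidate, with plain inputs."""
    test_score = passed / max(total, 1)
    penalty = 0 if complexity is None else min(complexity / 20, 0.25)
    return int(syntax_ok) + int(safe) + 3 * test_score - penalty

# Hypothetical candidates for one task with four tests.
candidates = {
    "A: all tests, complexity 6": score(True, True, 4, 4, 6),
    "B: 3 of 4 tests, complexity 2": score(True, True, 3, 4, 2),
    "C: all tests, complexity 2": score(True, True, 4, 4, 2),
}
for label, value in sorted(candidates.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{value:.2f}  {label}")
```

With four tests, one passed test is worth 0.75 points while the complexity penalty never exceeds 0.25, so in this setting complexity only decides between candidates that pass the same number of tests.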

Multi-Turn Program Synthesis and Prompt-Style Experiments

print("\n" + "=" * 90)
print("Demo 3: Multi-turn program synthesis")
print("=" * 90)

multi_turn_prompts = [
    {
        "name": "normalize_words",
        "prompt": """# Step 1.
# Write a Python function normalize_words(text).
# It should lowercase text, remove punctuation characters .,!?:;, and split into words.
# Do not import packages.
def normalize_words(text):
""",
        "tests": [
            {"args": ["Hello, HELLO world!"], "expected": ["hello", "hello", "world"]},
            {"args": ["A test: yes."], "expected": ["a", "test", "yes"]},
        ],
    },
    {
        "name": "word_counts",
        "prompt": """# Step 2.
# Write a Python function word_counts(words).
# It receives a list of words and returns a dictionary mapping each word to its frequency.
# Do not import packages.
def word_counts(words):
""",
        "tests": [
            {"args": [["a", "b", "a"]], "expected": {"a": 2, "b": 1}},
            {"args": [[]], "expected": {}},
        ],
    },
    {
        "name": "top_word",
        "prompt": """# Step 3.
# Write a Python function top_word(counts).
# It receives a dictionary of word frequencies.
# Return the word with the highest count.
# If counts is empty, return None.
# If there is a tie, return the alphabetically smallest word.
# Do not import packages.
def top_word(counts):
""",
        "tests": [
            {"args": [{"a": 2, "b": 1}], "expected": "a"},
            {"args": [{"b": 2, "a": 2}], "expected": "a"},
            {"args": [{}], "expected": None},
        ],
    },
]

multi_turn_sources = []
for spec in multi_turn_prompts:
    out = generate_text(
        spec["prompt"],
        max_new_tokens=150,
        temperature=0.35,
        top_p=0.92,
        do_sample=True,
        num_return_sequences=1,
    )[0]
    src = extract_function_source(out, spec["name"])
    res = run_unit_tests_safely(src, spec["name"], spec["tests"]) if src else {"ok": False, "error": "no extraction"}
    multi_turn_sources.append(src)
    print_code(f"Generated {spec['name']}", src if src else "# No source extracted")
    print("Test result:", res)

pipeline_code = "\n\n".join([s for s in multi_turn_sources if s])
pipeline_code += """
def most_common_word(text):
    words = normalize_words(tex
原文を表示

In this tutorial, we implement an end-to-end workflow for Salesforce CodeGen. We load a CodeGen model from Hugging Face, prepare it for code generation, and use it to generate Python functions from natural-language prompts. We then move beyond basic inference by adding function extraction, syntax checking, static safety checks, unit-test-based validation, best-of-N candidate reranking, multi-step program synthesis, prompt-style experimentation, benchmark visualization, and artifact export. Through this workflow, we learn how CodeGen can be used not only as a code completion model but also as part of a structured code-generation pipeline that evaluates, filters, and organizes generated solutions.

Loading the Salesforce CodeGen Model from Hugging Face

Copy CodeCopiedUse a different Browser

import os, sys, subprocess, textwrap, json, re, time, math, ast, tempfile, multiprocessing as mp

from pathlib import Path

def sh(cmd):

print(f"\n$ {cmd}")

subprocess.run(cmd, shell=True, check=True)

sh(f"{sys.executable} -m pip install -q -U transformers accelerate safetensors einops datasets evaluate pandas matplotlib tqdm rich radon tiktoken")

import torch

import pandas as pd

import matplotlib.pyplot as plt

from tqdm.auto import tqdm

from rich import print

from rich.panel import Panel

from rich.syntax import Syntax

from transformers import AutoTokenizer, AutoModelForCausalLM, set_seed

from radon.complexity import cc_visit

OUT_DIR = Path("/content/codegen_advanced_tutorial")

OUT_DIR.mkdir(parents=True, exist_ok=True)

set_seed(42)

print(Panel.fit("Salesforce CodeGen Advanced Tutorial", style="bold green"))

print("\nRuntime information")

print("Python:", sys.version.split()[0])

print("Torch:", torch.__version__)

print("CUDA available:", torch.cuda.is_available())

if torch.cuda.is_available():

print("GPU:", torch.cuda.get_device_name(0))

print("CUDA memory GB:", round(torch.cuda.get_device_properties(0).total_memory / 1e9, 2))

MODEL_ID = os.environ.get("CODEGEN_MODEL_ID", "Salesforce/codegen-350M-mono")

MODEL_OPTIONS = {

"easy_colab_default": "Salesforce/codegen-350M-mono",

"larger_codegen1": "Salesforce/codegen-2B-mono",

"codegen2_1b": "Salesforce/codegen2-1B_P",

"codegen25_7b_mono": "Salesforce/codegen25-7b-mono_P",

}

print("\nSelected model:", MODEL_ID)

print("Available model examples:", MODEL_OPTIONS)

trust_remote_code = any(x in MODEL_ID.lower() for x in ["codegen2", "codegen25"])

device = "cuda" if torch.cuda.is_available() else "cpu"

dtype = torch.float16 if torch.cuda.is_available() else torch.float32

print("\nLoading tokenizer...")

tokenizer = AutoTokenizer.from_pretrained(

MODEL_ID,

trust_remote_code=trust_remote_code

)

if tokenizer.pad_token is None:

tokenizer.pad_token = tokenizer.eos_token

print("Loading model...")

load_kwargs = {

"trust_remote_code": trust_remote_code,

"low_cpu_mem_usage": True,

}

if torch.cuda.is_available():

load_kwargs["torch_dtype"] = dtype

load_kwargs["device_map"] = "auto"

else:

load_kwargs["torch_dtype"] = torch.float32

model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **load_kwargs)

if not torch.cuda.is_available():

model.to(device)

model.eval()

def count_parameters(model):

return sum(p.numel() for p in model.parameters())

print(f"Loaded {MODEL_ID}")

print(f"Parameter count: {count_parameters(model)/1e6:.1f}M")

def generate_text(

prompt,

max_new_tokens=180,

temperature=0.35,

top_p=0.92,

top_k=50,

do_sample=True,

num_return_sequences=1,

repetition_penalty=1.05,

):

inputs = tokenizer(prompt, return_tensors="pt")

inputs = {k: v.to(model.device) for k, v in inputs.items()}

with torch.no_grad():

outputs = model.generate(

**inputs,

max_new_tokens=max_new_tokens,

do_sample=do_sample,

temperature=temperature,

top_p=top_p,

top_k=top_k,

num_return_sequences=num_return_sequences,

repetition_penalty=repetition_penalty,

pad_token_id=tokenizer.eos_token_id,

eos_token_id=tokenizer.eos_token_id,

)

decoded = tokenizer.batch_decode(outputs, skip_special_tokens=True)

return decoded

def print_code(title, code):

print(Panel.fit(title, style="bold cyan"))

print(Syntax(code, "python", theme="monokai", line_numbers=True))

We install all required libraries and prepare the environment for running Salesforce CodeGen. We check the runtime, detect GPU availability, select the CodeGen model, and load both the tokenizer and model from Hugging Face. We also define helper functions for text generation and for displaying formatted code so that the rest of the tutorial is easier to follow.

Building Extraction, Safety, and Unit-Test Validation Utilities

Copy CodeCopiedUse a different Browser

def extract_function_source(full_text, function_name):

text = full_text.replace("\r\n", "\n")

fence = re.search(r"`(?:python)?\n(.*?)`", text, flags=re.S | re.I)

if fence:

text = fence.group(1)

pattern = rf"^def\s+{re.escape(function_name)}\s*\("

match = re.search(pattern, text, flags=re.M)

if not match:

return ""

chunk = text[match.start():]

lines = chunk.splitlines()

collected = []

for i, line in enumerate(lines):

if i > 0:

if line.startswith("def ") or line.startswith("class "):

break

if line.startswith("if __name__"):

break

if line and not line.startswith((" ", "\t", "#")) and re.match(r"^[A-Za-z_][A-Za-z0-9_]*\s*=", line):

break

collected.append(line)

source = "\n".join(collected).rstrip()

try:

ast.parse(source)

return source

except SyntaxError:

fixed_lines = []

for line in collected:

fixed_lines.append(line)

candidate = "\n".join(fixed_lines).rstrip()

try:

ast.parse(candidate)

source = candidate

except SyntaxError:

pass

return source if source.strip().startswith("def ") else ""

def syntax_ok(source):

try:

ast.parse(source)

return True, ""

except SyntaxError as e:

return False, str(e)

FORBIDDEN_NAMES = {

"eval", "exec", "compile", "open", "input", "__import__",

"globals", "locals", "vars", "dir", "getattr", "setattr", "delattr",

"help", "breakpoint", "exit", "quit"

}

FORBIDDEN_NODES = (

ast.Import,

ast.ImportFrom,

ast.Global,

ast.Nonlocal,

ast.With,

ast.AsyncWith,

ast.AsyncFunctionDef,

ast.ClassDef,

ast.Delete,

ast.Raise,

)

ALLOWED_BUILTINS = {

"abs": abs,

"all": all,

"any": any,

"bool": bool,

"dict": dict,

"enumerate": enumerate,

"float": float,

"int": int,

"isinstance": isinstance,

"len": len,

"list": list,

"map": map,

"max": max,

"min": min,

"pow": pow,

"range": range,

"reversed": reversed,

"round": round,

"set": set,

"sorted": sorted,

"str": str,

"sum": sum,

"tuple": tuple,

"zip": zip,

}

def static_safety_check(source):

try:

tree = ast.parse(source)

except SyntaxError as e:

return False, f"SyntaxError: {e}"

for node in ast.walk(tree):

if isinstance(node, FORBIDDEN_NODES):

return False, f"Forbidden AST node: {type(node).__name__}"

if isinstance(node, ast.Name):

if node.id in FORBIDDEN_NAMES or node.id.startswith("__"):

return False, f"Forbidden name: {node.id}"

if isinstance(node, ast.Attribute):

if node.attr.startswith("__"):

return False, f"Forbidden attribute: {node.attr}"

if isinstance(node, ast.Call):

if isinstance(node.func, ast.Name) and node.func.id in FORBIDDEN_NAMES:

return False, f"Forbidden call: {node.func.id}"

return True, "passed"

def _worker_run_tests(source, function_name, tests, queue):

try:

safe_globals = {"__builtins__": ALLOWED_BUILTINS}

safe_locals = {}

compiled = compile(source, "<generated_code>", "exec")

exec(compiled, safe_globals, safe_locals)

fn = safe_locals.get(function_name) or safe_globals.get(function_name)

if fn is None:

queue.put({"ok": False, "error": f"{function_name} not found", "passed": 0, "total": len(tests)})

return

passed = 0

details = []

for test in tests:

args = test.get("args", [])

kwargs = test.get("kwargs", {})

expected = test["expected"]

result = fn(*args, **kwargs)

ok = result == expected

passed += int(ok)

details.append({

"args": args,

"kwargs": kwargs,

"expected": expected,

"result": result,

"ok": ok,

})

queue.put({"ok": passed == len(tests), "error": "", "passed": passed, "total": len(tests), "details": details})

except Exception as e:

queue.put({"ok": False, "error": repr(e), "passed": 0, "total": len(tests)})

def run_unit_tests_safely(source, function_name, tests, timeout_seconds=3):

safe, reason = static_safety_check(source)

if not safe:

return {"ok": False, "error": reason, "passed": 0, "total": len(tests), "details": []}

ctx = mp.get_context("fork")

queue = ctx.Queue()

process = ctx.Process(target=_worker_run_tests, args=(source, function_name, tests, queue))

process.start()

process.join(timeout_seconds)

if process.is_alive():

process.terminate()

process.join()

return {"ok": False, "error": "timeout", "passed": 0, "total": len(tests), "details": []}

if queue.empty():

return {"ok": False, "error": "no result returned", "passed": 0, "total": len(tests), "details": []}

return queue.get()

def code_complexity(source):

try:

blocks = cc_visit(source)

if not blocks:

return 1

return max(block.complexity for block in blocks)

except Exception:

return None

def score_candidate(source, test_result):

syntax_score = 1 if syntax_ok(source)[0] else 0

safety_score = 1 if static_safety_check(source)[0] else 0

passed = test_result.get("passed", 0)

total = max(test_result.get("total", 1), 1)

test_score = passed / total

complexity = code_complexity(source)

complexity_penalty = 0 if complexity is None else min(complexity / 20, 0.25)

return syntax_score + safety_score + 3 * test_score - complexity_penalty

We build the utility layer that extracts generated Python functions from raw model outputs. We add syntax validation, static safety checks, restricted execution, unit-test execution, and timeout handling to make generated code easier to evaluate. We also calculate code complexity and create a scoring function to rank generated candidates by correctness, safety, and simplicity.

Copy CodeCopiedUse a different Browser

print("\n" + "=" * 90)

Generating Code and Defining Benchmark Tasks

Copy CodeCopiedUse a different Browser

print("Demo 1: Basic natural-language-to-code completion")

print("=" * 90)

basic_prompt = """# Write a Python function that returns the area of a circle.

The function should be named circle_area and should accept radius as input.

Do not print anything. Return the numeric result.

def circle_area(radius):

"""

basic_output = generate_text(

basic_prompt,

max_new_tokens=120,

temperature=0.25,

do_sample=True,

num_return_sequences=1,

)[0]

print_code("Raw CodeGen output", basic_output)

circle_source = extract_function_source(basic_output, "circle_area")

print_code("Extracted function", circle_source if circle_source else "# No function extracted")

circle_tests = [

{"args": [1], "expected": math.pi},

{"args": [2], "expected": 4 * math.pi},

]

if circle_source:

print("Syntax:", syntax_ok(circle_source))

print("Safety:", static_safety_check(circle_source))

print("Complexity:", code_complexity(circle_source))

print("\n" + "=" * 90)
print("Demo 2: Best-of-N generation with test-based reranking")
print("=" * 90)

TASKS = [
    {
        "name": "factorial",
        "signature": "def factorial(n):",
        "instruction": "Return n factorial for a non-negative integer n. Use 1 for factorial(0).",
        "tests": [
            {"args": [0], "expected": 1},
            {"args": [1], "expected": 1},
            {"args": [5], "expected": 120},
            {"args": [7], "expected": 5040},
        ],
    },
    {
        "name": "is_palindrome",
        "signature": "def is_palindrome(text):",
        "instruction": "Return True if text is a palindrome after removing spaces and ignoring case, otherwise return False.",
        "tests": [
            {"args": ["Race car"], "expected": True},
            {"args": ["hello"], "expected": False},
            {"args": ["Never odd or even"], "expected": True},
        ],
    },
    {
        "name": "fibonacci",
        "signature": "def fibonacci(n):",
        "instruction": "Return the nth Fibonacci number where fibonacci(0)=0 and fibonacci(1)=1.",
        "tests": [
            {"args": [0], "expected": 0},
            {"args": [1], "expected": 1},
            {"args": [8], "expected": 21},
            {"args": [10], "expected": 55},
        ],
    },
    {
        "name": "dedupe_keep_order",
        "signature": "def dedupe_keep_order(items):",
        "instruction": "Return a list with duplicate values removed while preserving the first occurrence order.",
        "tests": [
            {"args": [[1, 2, 1, 3, 2]], "expected": [1, 2, 3]},
            {"args": [["a", "b", "a", "c"]], "expected": ["a", "b", "c"]},
            {"args": [[]], "expected": []},
        ],
    },
]

We start with a simple natural-language-to-code generation example using a circle area function. We generate raw CodeGen output, extract the function, and inspect its syntax, safety, and complexity. We then define multiple programming tasks that later help us benchmark CodeGen across different function-generation problems.
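Each test case is a dictionary with positional `args` and an `expected` value. The sketch below shows one way such a case could be checked. `check_case` is a hypothetical helper, not the tutorial's `run_unit_tests_safely`, and comparing floats with `math.isclose` is an assumption that seems sensible for `circle_area`, whose expected values are multiples of `math.pi`.

```python
import math


def check_case(func, case, rel_tol=1e-9):
    """Call func with the case's positional args and compare to expected.

    Hypothetical helper: float expectations are compared with a tolerance,
    everything else with ==. An exception raised by func counts as a failure.
    """
    try:
        actual = func(*case["args"])
    except Exception:
        return False
    expected = case["expected"]
    if isinstance(expected, float) and isinstance(actual, (int, float)):
        return math.isclose(actual, expected, rel_tol=rel_tol)
    return actual == expected


def circle_area(radius):
    # Hand-written reference, standing in for a generated function.
    return math.pi * radius ** 2


circle_tests = [
    {"args": [1], "expected": math.pi},
    {"args": [2], "expected": 4 * math.pi},
]
results = [check_case(circle_area, case) for case in circle_tests]
print(results)
```

This sketch calls the function in the current process, which is only reasonable for trusted code; model-generated code should go through the restricted, time-limited runner instead.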

Best-of-N Candidate Generation and Test-Based Reranking


def build_prompt(task):
    # Show the model the first two test cases as worked examples.
    examples = []
    for t in task["tests"][:2]:
        examples.append(f"# Example: {task['name']}(*{t['args']}) -> {repr(t['expected'])}")
    example_block = "\n".join(examples)
    # The prompt is a comment block followed by the signature to complete.
    return f'''# You are writing clean Python 3 code.
# Task: {task["instruction"]}
# Rules:
# - Do not import packages.
# - Do not print anything.
# - Return the answer from the function.
# - Keep the implementation compact and readable.
{example_block}
{task["signature"]}
'''

def generate_candidates_for_task(task, n=3, max_new_tokens=160):
    prompt = build_prompt(task)
    outputs = generate_text(
        prompt,
        max_new_tokens=max_new_tokens,
        temperature=0.45,
        top_p=0.92,
        do_sample=True,
        num_return_sequences=n,
        repetition_penalty=1.07,
    )
    candidates = []
    for i, out in enumerate(outputs):
        source = extract_function_source(out, task["name"])
        syntax_pass, syntax_error = syntax_ok(source) if source else (False, "no source extracted")
        test_result = run_unit_tests_safely(source, task["name"], task["tests"]) if source else {
            "ok": False,
            "error": "no source extracted",
            "passed": 0,
            "total": len(task["tests"]),
            "details": [],
        }
        candidates.append({
            "task": task["name"],
            "candidate_id": i,
            "prompt": prompt,
            "raw_output": out,
            "source": source,
            "syntax_ok": syntax_pass,
            "syntax_error": syntax_error,
            "safety": static_safety_check(source)[0] if source else False,
            "tests_passed": test_result.get("passed", 0),
            "tests_total": test_result.get("total", len(task["tests"])),
            "test_ok": test_result.get("ok", False),
            "test_error": test_result.get("error", ""),
            "complexity": code_complexity(source) if source else None,
            "score": score_candidate(source, test_result) if source else -999,
        })
    candidates = sorted(candidates, key=lambda x: x["score"], reverse=True)
    return candidates

all_candidates = []
best_solutions = {}
CANDIDATES_PER_TASK = 2

for task in tqdm(TASKS, desc="Generating and evaluating"):
    candidates = generate_candidates_for_task(task, n=CANDIDATES_PER_TASK)
    all_candidates.extend(candidates)
    best_solutions[task["name"]] = candidates[0]

results_df = pd.DataFrame([
    {
        "task": c["task"],
        "candidate_id": c["candidate_id"],
        "syntax_ok": c["syntax_ok"],
        "safety": c["safety"],
        "tests_passed": c["tests_passed"],
        "tests_total": c["tests_total"],
        "test_ok": c["test_ok"],
        "complexity": c["complexity"],
        "score": round(c["score"], 3),
        "test_error": c["test_error"],
    }
    for c in all_candidates
]).sort_values(["task", "score"], ascending=[True, False])

print("\nCandidate summary")
display(results_df)

for task_name, best in best_solutions.items():
    print_code(f"Best solution for {task_name}", best["source"] if best["source"] else "# No valid source")
    print({
        "task": task_name,
        "tests_passed": f'{best["tests_passed"]}/{best["tests_total"]}',
        "score": best["score"],
        "test_error": best["test_error"],
    })

We build a structured prompt for each task and sample several candidate solutions from CodeGen (two per task in this run, set by CANDIDATES_PER_TASK). We evaluate each candidate with a syntax check, a static safety check, unit tests, and a complexity measure, then combine these into a single score. Finally, we summarize the results in a DataFrame and display the highest-scoring solution for each task.
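The Best-of-N idea can be illustrated without a model. The sketch below uses three hard-coded candidate strings in place of CodeGen samples, counts how many tests each one passes, and keeps the top-ranked one. The names `count_passed` and `best_of_n` are hypothetical, and `exec` runs in-process here purely for illustration; the tutorial's `run_unit_tests_safely` adds restricted execution and timeouts that this sketch omits.

```python
def count_passed(source, func_name, tests):
    """Return how many test cases the function defined in source passes."""
    namespace = {}
    try:
        exec(source, namespace)  # illustration only: no sandbox, no timeout
        func = namespace[func_name]
    except Exception:
        return 0
    passed = 0
    for case in tests:
        try:
            if func(*case["args"]) == case["expected"]:
                passed += 1
        except Exception:
            pass  # a candidate that raises simply fails that case
    return passed


def best_of_n(candidates, func_name, tests):
    """Rank candidates by tests passed; ties keep their generation order."""
    ranked = sorted(
        candidates,
        key=lambda src: count_passed(src, func_name, tests),
        reverse=True,
    )
    return ranked[0], ranked


tests = [
    {"args": [0], "expected": 1},
    {"args": [1], "expected": 1},
    {"args": [5], "expected": 120},
]
candidates = [
    "def factorial(n):\n    return n * factorial(n - 1)\n",  # no base case
    "def factorial(n):\n    return 1 if n < 2 else n * (n - 1)\n",  # wrong for n > 2
    "def factorial(n):\n    return 1 if n < 2 else n * factorial(n - 1)\n",
]
best, ranked = best_of_n(candidates, "factorial", tests)
print(best)
```

Ranking by tests passed alone is a simplification; the tutorial's score also folds in syntax, safety, and complexity terms.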

Multi-Turn Program Synthesis and Prompt-Style Experiments

print("\n" + "=" * 90)
print("Demo 3: Multi-turn program synthesis")
print("=" * 90)

# Each step asks for one small function; the generated pieces are later joined into a pipeline.
multi_turn_prompts = [
    {
        "name": "normalize_words",
        "prompt": """# Step 1.
# Write a Python function normalize_words(text).
# It should lowercase text, remove punctuation characters .,!?:;, and split into words.
# Do not import packages.
def normalize_words(text):
""",
        "tests": [
            {"args": ["Hello, HELLO world!"], "expected": ["hello", "hello", "world"]},
            {"args": ["A test: yes."], "expected": ["a", "test", "yes"]},
        ],
    },
    {
        "name": "word_counts",
        "prompt": """# Step 2.
# Write a Python function word_counts(words).
# It receives a list of words and returns a dictionary mapping each word to its frequency.
# Do not import packages.
def word_counts(words):
""",
        "tests": [
            {"args": [["a", "b", "a"]], "expected": {"a": 2, "b": 1}},
            {"args": [[]], "expected": {}},
        ],
    },
    {
        "name": "top_word",
        "prompt": """# Step 3.
# Write a Python function top_word(counts).
# It receives a dictionary of word frequencies.
# Return the word with the highest count.
# If counts is empty, return None.
# If there is a tie, return the alphabetically smallest word.
# Do not import packages.
def top_word(counts):
""",
        "tests": [
            {"args": [{"a": 2, "b": 1}], "expected": "a"},
            {"args": [{"b": 2, "a": 2}], "expected": "a"},
            {"args": [{}], "expected": None},
        ],
    },
]

multi_turn_sources = []

for spec in multi_turn_prompts:
    out = generate_text(
        spec["prompt"],
        max_new_tokens=150,
        temperature=0.35,
        top_p=0.92,
        do_sample=True,
        num_return_sequences=1,
    )[0]
    src = extract_function_source(out, spec["name"])
    res = run_unit_tests_safely(src, spec["name"], spec["tests"]) if src else {"ok": False, "error": "no extraction"}
    multi_turn_sources.append(src)
    print_code(f"Generated {spec['name']}", src if src else "# No source extracted")
    print("Test result:", res)

pipeline_code = "\n\n".join([s for s in multi_turn_sources if s])
pipeline_code += """
def most_common_word(text):
    words = normalize_words(tex
