CUP(Common Useful Python):百度のユーティリティツールキットで信頼性の高いPythonワークフローを構築
百度が公開した汎用 Python ツールキット「CUP」は、ログ管理、並行処理、設定管理など多様な機能を提供し、開発者のワークフロー効率化と信頼性向上に寄与する。
キーポイント
包括的な機能セットの提供
CUP はログ、デコレータ、ネスト設定、キャッシュ、ID 生成、スレッドプールなど、開発で頻出する 15 以上のサブシステムを統合したライブラリである。
実装と環境構築の簡素化
Colab 対応や自動ログローテーション機能により、複雑な設定なしで即座に使用可能であり、開発初期段階でのセットアップ時間を大幅に短縮する。
信頼性と保守性の向上
中断可能なスレッド、ファイルロック、リソース監視などの機能を通じて、長時間実行されるワークフローの安定性とエラーハンドリング能力を強化する。
高度なデコレータ機能
CUP はシングルトンパターン、実行時間計測(TraceUsedTime)、およびプラットフォーム依存制御(needlinux)を提供するデコレータを実装しており、コードの再利用性と可読性を向上させます。
構造化設定ファイルの柔軟な管理
Configure2Dict と Dict2Configure を用いて、ネストされた階層構造を持つ設定ファイルを辞書形式で読み書きでき、重複キー(例:@path)は自動的にリストとして処理されます。
設定のランタイム変更と永続化
メモリ上の設定オブジェクトを直接修正し、新しいファイルとして書き出すことで、設定の変更を即座に反映かつ保存する「ラウンドトリップ」処理が容易に行えます。
CUP デコレータの機能
CUP のデコレータは、シングルトンクラスの作成、実行時間の追跡、および Linux 固有関数の保護に利用されます。
影響分析・編集コメントを表示
影響分析
このツールキットは、Python を用いたインフラ管理やバックエンド開発において、標準ライブラリの不足を補完し、開発効率とシステムの堅牢性を同時に高める有望なソリューションです。特に大規模な自動化ワークフローやデータパイプラインの構築現場では、デバッグコストの削減と保守性の向上に寄与する可能性が高いですが、特定の AI モデル学習や推論といったコア AI 技術への直接的な革新性は限定的です。
編集コメント
AI モデル開発そのものではなく、それを支える Python 開発基盤の強化ツールとしての価値が明確な記事です。
このチュートリアルでは、より堅牢な Python ワークフローを構築するための実用的なユーティリティ・ツールキットとして、Baidu の Common Useful Python ライブラリである CUP を探ります。まず Colab で使いやすい環境でライブラリのセットアップを行い、その後、ログ機能、デコレータ、ネストされた設定、キャッシュ、ID 生成、スレッドプール、割り込み可能なスレッド、遅延実行、時間ユーティリティ、Linux リソース監視、ファイルロック、ネットワークヘルパー、オブジェクトストレージインターフェース、型マップ、組み込みテストアサーションなど、主要なサブシステムを順を追って解説していきます。進めるにつれて、単にランダムに関数を呼び出すだけでなく、監視、自動化、並行処理、設定管理、信頼性チェックといった実際の開発タスクにおいて各モジュールがどのように機能するかを観察します。
CUP のセットアップとログ機能
コードをコピーしました(コピー済み)
別のブラウザを使用してください
import os
import sys
import time
import threading
import tempfile
import datetime
import subprocess
def banner(title):
line = "=" * 70
print("\n" + line + "\n" + title + "\n" + line)
def skip(exc):
"""ノートブックを中止せずに、優雅にスキップされたセクションを報告する。"""
print(" [skipped — {}: {}]".format(type(exc).__name__, exc))
banner("0. SETUP (install + cup.platforms + cup.version)")
subprocess.run(
[sys.executable, "-m", "pip", "install", "-q", "cup", "pytz"],
check=False,
)
import cup
ver = getattr(cup, "__version__", None)
if ver is None:
try:
from cup import version as _v
ver = getattr(_v, "VERSION", None) or getattr(_v, "__version__", "unknown")
except Exception:
ver = "unknown"
print("CUP version :", ver)
print("Python :", sys.version.split()[0])
try:
from cup import platforms
print("is_linux :", platforms.is_linux())
print("is_mac :", platforms.is_mac())
print("is_windows :", platforms.is_windows())
print("is_py3 :", platforms.is_py3())
except Exception as e:
skip(e)
banner("1. LOGGING (cup.log)")
LOGFILE = os.path.join(tempfile.gettempdir(), "cup_tutorial.log")
try:
from cup import log
log.init_comlog(
"cup_tutorial",
log.INFO,
LOGFILE,
log.ROTATION,
10 * 1024 * 1024,
True,
False,
)
log.info("hello from cup.log — written to file AND stdout")
log.warning("a warning line")
log.info_if(2 > 1, "info_if(True) -> emitted")
log.info_if(1 > 2, "info_if(False) -> you will NOT see this")
log.setloglevel(log.DEBUG)
log.debug("debug visible after setloglevel(DEBUG)")
try:
with open(LOGFILE) as fh:
last = [ln for ln in fh.read().splitlines() if ln.strip()][-1]
parsed = log.parse(last)
print("parsed last log line ->")
for k in ("loglevel", "date", "time", "pid", "srcline", "msg"):
if isinstance(parsed, dict) and k in parsed:
print(" {:8}: {}".format(k, parsed[k]))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
CUP チュートリアルの環境を設定し、必要なパッケージを Python から直接インストールすることから始めます。ノートブックの可読性を保ちつつ、失敗したセクションを安全にスキップできるようにするヘルパー関数を定義します。その後、ライブラリの基盤を理解するために、CUP のバージョン詳細、プラットフォームチェック、構造化ログについて探ります。
デコレータとネストされた設定
コードをコピーしました
別のブラウザを使用してください
banner("2. DECORATORS (cup.decorators)")
try:
from cup import decorators
@decorators.Singleton
class AppConfig(object):
def __init__(self):
self.created_at = time.time()
a, b = AppConfig(), AppConfig()
print("Singleton: a is b ->", a is b, "(same created_at:",
a.created_at == b.created_at, ")")
@decorators.TraceUsedTime(
b_print_stdout=True,
enter_msg="event_id=0xABCDE enter",
leave_msg="event_id=0xABCDE leave",
)
def heavy_compute():
time.sleep(0.2)
return sum(range(200000))
print("heavy_compute() =", heavy_compute())
@decorators.needlinux
def linux_only():
return "this body is allowed to run on Linux"
print("needlinux ->", linux_only())
except Exception as e:
skip(e)
banner("3. RICH NESTED CONFIG (cup.util.conf)")
CONF_PATH = os.path.join(tempfile.gettempdir(), "cup_demo.conf")
CONF_TEXT = """\
---- global scalars (layer 0) ----
host: abc.com
port: 12345
debug: false
[monitor]
enabled: true
interval: 60
regex: sshd
[.thresholds]
cpu_max: 90
mem_max: 80
[..actions]
on_breach: alert
[storage]
@path: /data/disk1
@path: /data/disk2
@path: /data/disk3
"""
try:
from cup.util import conf
with open(CONF_PATH, "w") as fh:
fh.write(CONF_TEXT)
cfg = conf.Configure2Dict(CONF_PATH, separator=":").get_dict()
print("host :", cfg["host"])
print("port :", cfg["port"])
print("monitor.enabled :", cfg["monitor"]["enabled"])
print("monitor.regex :", cfg["monitor"]["regex"])
print("monitor.thresholds.cpu_max :", cfg["monitor"]["thresholds"]["cpu_max"])
print("monitor.thresholds.actions :",
cfg["monitor"]["thresholds"]["actions"]["on_breach"])
print("storage.path (repeated @ -> list):", list(cfg["storage"]["path"]))
cfg["port"] = "10085"
cfg["monitor"]["thresholds"]["actions"]["on_breach"] = "restart"
NEW_PATH = CONF_PATH + ".new"
conf.Dict2Configure(cfg, separator=":").write_conf(NEW_PATH)
re_read = conf.Configure2Dict(NEW_PATH, separator=":").get_dict()
print("round-trip port :", re_read["port"], "(was 12345)")
print("round-trip on_breach :",
re_read["monitor"]["thresholds"]["actions"]["on_breach"], "(was alert)")
except Exception as e:
skip(e)
次に、CUP デコレータについて見ていきましょう。これらは単一インスタンスクラスの作成や実行時間の追跡、Linux 固有関数の保護にどのように役立つかを確認します。その後、CUP の豊富な設定システムを活用し、セクション、子セクション、繰り返し値を持つネストされた設定ファイルをロードします。さらに、設定を更新してディスクへ書き戻すことで、読み取り - 変更 - 書き込みのフローが正しく機能していることを確認します。
キャッシュ、ID、スレッドプール
コードをコピー
コピー済み
別のブラウザを使用
banner("4. メモリ内 KV キャッシュ (cup.cache)")
try:
from cup import cache
kv = cache.KVCache(name="demo")
kv.set({"user:1": "alice", "user:2": "bob"}, expire_sec=2)
kv.set({"config:flag": "on"}, expire_sec=None)
print("set 後のサイズ :", kv.size())
print("user:1 の取得 :", kv.get("user:1"))
print("存在しないキーの取得 :", kv.get("nope"))
print("2 秒 TTL キーが期限切れになるまで 2.2 秒待機中...")
time.sleep(2.2)
print("user:1 の取得 (期限切れ):", kv.get("user:1"))
print("config:flag の取得 (永続):", kv.get("config:flag"))
reclaimed = kv.pop_n_expired(0)
print("pop_n_expired で回収された:", list(reclaimed.keys()) if reclaimed else [])
except Exception as e:
skip(e)
banner("5. 一意 ID 生成 (cup.services.generator)")
try:
from cup.services import generator
gman = generator.CGeneratorMan()
print("uniqname :", gman.get_uniqname())
print("next_uniq_num :", gman.get_next_uniq_num())
print("next_uniq_num (再取得) :", gman.get_next_uniq_num(), "(単調増加)")
if hasattr(gman, "get_uuid"):
try:
print("get_uuid :", gman.get_uuid())
except Exception as e:
skip(e)
if hasattr(gman, "get_random_str"):
try:
print("get_random_str(16) :", gman.get_random_str(16))
except Exception as e:
skip(e)
print("シングルトンチェック :", generator.CGeneratorMan() is gman)
try:
cyc = generator.CycleIDGenerator("127.0.0.1", 8080)
i1, i2 = cyc.next_id(), cyc.next_id()
print("CycleIDGenerator ID #1 :", i1)
print("CycleIDGenerator ID #2 :", i2, "(インクリメント済み)")
print("ID #1 の 16 進数表記 :", generator.CycleIDGenerator.id2_hexstring(i1))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("6. スレッドプール (cup.services.threadpool)")
try:
from cup.services import threadpool
pool = threadpool.ThreadPool(minthreads=2, maxthreads=4, name="demo-pool")
pool.start()
results, rlock = [], threading.Lock()
def square(n):
time.sleep(0.03)
with rlock:
results.append(n * n)
return n * n
for i in range(8):
pool.add_1job(square, i)
callback_log = []
def on_done(ok, result):
callback_log.append((ok, result))
pool.add_1job_with_callback(on_done, square, 100)
def will_fail():
raise RuntimeError("worker 内で boom")
pool.add_1job_with_callback(on_done, will_fail)
time.sleep(0.5)
print("ライブ統計 :", pool.get_stats())
pool.stop()
print("収集された平方数 :", sorted(results))
print("コールバック結果 :", callback_log)
except Exception as e:
skip(e)
CUP のメモリ内キャッシュを使用して、一時的および永続的な寿命を持つキー値ペアを保存します。その後、分散型識別子用の一意の名前、カウンター、UUID 形式の値、ランダム文字列、循環 ID を生成します。また、スレッドプールを作成し、ジョブを送信して結果を集約し、成功したタスクと失敗したタスクの両方におけるコールバック動作を観察します。
スレッド、スケジューリング、時間ユーティリティ
コードをコピーしました(別のブラウザを使用)
banner("7. インターラプチブルスレッド + RW ロック (cup.thread)")
try:
from cup import thread as cupthread
rw = cupthread.RWLock()
rw.acquire_readlock()
rw.acquire_readlock()
print("acquired 2 read locks concurrently")
rw.release_readlock()
rw.release_readlock()
rw.acquire_writelock()
print("acquired exclusive write lock")
rw.release_writelock()
stop_flag = {"running": True}
def busy_loop():
while stop_flag["running"]:
time.sleep(0.05)
t = cupthread.CupThread(target=busy_loop)
t.daemon = True
t.start()
print("worker python tid :", t.get_my_tid())
stop_flag["running"] = False
ok = t.terminate()
print("CupThread.terminate -> :", ok)
except Exception as e:
skip(e)
banner("8. DELAY / CRON EXECUTOR (cup.services.executor)")
try:
from cup.services import executor
URGENCY = getattr(executor, "URGENCY_NORMAL", 0)
svc = executor.ExecutionService(
delay_exe_thdnum=2, queue_exec_thdnum=2, name="exec-demo"
)
svc.start()
fired = []
def task(tag):
fired.append((tag, round(time.time(), 3)))
svc.delay_exec(0.3, task, URGENCY, "delayed-0.3s")
svc.queue_exec(task, URGENCY, "queued-now")
time.sleep(0.8)
svc.stop()
print("fired (tag, ts) :", fired)
try:
import pytz
import hashlib
tz = pytz.timezone("Asia/Shanghai")
timer = {"minute": [0, 30], "hour": None,
"weekday": None, "monthday": None, "month": None}
task_uuid = hashlib.md5(b"demo-cron").hexdigest()
ctask = executor.CronTask("demo-cron", tz, timer, task_uuid, task, "cron-arg")
print("cron next_schedtime :", ctask.next_schedtime())
print("cron 32-byte taskid :", ctask.taskid())
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("9. TIME HELPERS (cup.timeplus)")
try:
from cup import timeplus
import pytz
print("get_str_now (default) :", timeplus.get_str_now())
print("get_str_now (custom fmt):", timeplus.get_str_now("%Y/%m/%d %H:%M:%S"))
tp = timeplus.TimePlus(pytz.timezone("Asia/Shanghai"))
now_utc = datetime.datetime.utcnow()
print("naive utc now :", now_utc)
print("-> Shanghai local :", tp.utc2local(now_utc))
except Exception as e:
skip(e)
CUP のスレッドユーティリティを、リーダー・ライタロックと割り込み可能なスレッドを使用して探索します。その後、実行サービスを通じて遅延タスクおよびキューイングされたタスクを実行し、単純なスケジューリングの動作を理解します。また、cron スタイルのタスクを作成し、実際の時計が刻むのを待たずに、次にスケジュールされている実行時刻を検証します。
システムリソースとストレージ
コードをコピーしました
別のブラウザを使用してください
banner("10. SYSTEM RESOURCES (cup.res.linux)")
try:
from cup.res import linux as reslinux
try:
print("cpu cores :", reslinux.get_cpu_nums())
except Exception as e:
skip(e)
try:
cpu = reslinux.get_cpu_usage(intvl_in_sec=1)
print("cpu usage sample :", cpu, "| usr=", getattr(cpu, "usr", "?"))
except Exception as e:
skip(e)
try:
mem = reslinux.get_meminfo()
gb = 1024.0 ** 3
print("mem total / available : {:.2f} GB / {:.2f} GB".format(
mem.total / gb, mem.available / gb))
except Exception as e:
skip(e)
try:
print("kernel version :", reslinux.get_kernel_version())
except Exception as e:
skip(e)
try:
all_pids = reslinux.pids()
print("live process count :", len(all_pids))
except Exception as e:
skip(e)
try:
me = reslinux.Process(os.getpid())
name = me.get_process_name() if hasattr(me, "get_process_name") else "?"
status = me.get_process_status() if hasattr(me, "get_process_status") else "?"
print("this process name/status:", name, "/", status)
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("11. FILE UTILITIES (cup.exfile)")
try:
from cup import exfile
lock_path = os.path.join(tempfile.gettempdir(), "cup_demo.lock")
flock = exfile.LockFile(lock_path)
got = flock.lock(blocking=True)
print("acquired file lock :", got)
print("lock file path :", flock.filepath())
flock.unlock()
print("released file lock : True")
except Exception as e:
skip(e)
banner("12. NETWORKING HELPERS (cup.net)")
try:
from cup import net as cupnet
try:
print("local hostname :", cupnet.get_local_hostname())
except Exception as e:
skip(e)
try:
print("host ip :", cupnet.get_hostip())
except Exception as e:
skip(e)
try:
print("local port 9999 free? :", cupnet.localport_free(9999))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("13. UNIFIED OBJECT STORAGE (cup.storage.obj)")
try:
from cup.storage import obj as cupobj
print("available backends :",
[n for n in dir(cupobj) if n.endswith("ObjectSystem")])
los = None
for ctor in (lambda: cupobj.LocalObjectSystem(tempfile.gettempdir()),
lambda: cupobj.LocalObjectSystem({}),
lambda: cupobj.LocalObjectSystem()):
try:
los = ctor()
break
except Exception:
continue
if los is None:
print(" LocalObjectSystem ctor varies by version — see docs for "
"your release's exact signature.")
else:
print("LocalObjectSystem ready :", type(los).__name__,
"(put/get/head/delete available)")
except Exception as e:
skip(e)
Linux システムリソース、すなわち CPU コア数、CPU 使用率、メモリ、カーネルバージョン、プロセス数、および現在のプロセスステータスを確認します。その後、CUP のファイルユーティリティモジュールを使用して、安全な単一インスタンス実行のためにアドバザリーロック(注釈付きロック)を取得・解放します。また、ネットワークヘルパーをテストし、オブジェクトストレージインターフェースをプローブして、利用可能なストレージバックエンドを確認します。
Type Maps and Assertions
コードをコピーしました
別のブラウザを使用してください
banner("14. BIDIRECTIONAL TYPE MAPS (cup.flag)")
try:
from cup import flag
tman = flag.TypeMan()
tman.register_types({"LOGIN": 1, "LOGOUT": 2, "HEARTBEAT": 3, "DATA": 4})
print("LOGIN -> number :", tman.getnumber_bykey("LOGIN"))
print("number 2 -> key :", tman.getkey_bynumber(2))
print("all registered keys :", list(tman.get_key_list()))
except Exception as e:
skip(e)
banner("15. BUILT-IN TEST ASSERTIONS (cup.unittest)")
try:
from cup import unittest as cup_unittest
cup_unittest.assert_eq(2 + 2, 4)
cup_unittest.assert_ne(1, 2)
cup_unittest.assert_true(isinstance([], list))
cup_unittest.assert_gt(5, 3)
cup_unittest.assert_eq_one("b", ["a", "b", "c"])
def raises_value_error():
raise ValueError("expected")
cup_unittest.expect_raise(raises_value_error, ValueError)
print("all passing assertions : OK")
try:
cup_unittest.assert_eq(1, 2, "intentional failure demo")
except Exception as e:
print("caught intended failure :", type(e).__name__)
except Exception as e:
skip(e)
banner("DONE — you exercised 15 CUP subsystems")
print("""
What you just used, by module:
cup.log .................. structured, rotating, parseable logging
cup.decorators ........... Singleton, TraceUsedTime, platform guards
cup.util.conf ............ nested config with sections + @arrays (round-trip)
cup.cache ................ TTL KV cache (get() extends lifetime)
cup.services.generator ... host-unique names, counters, cycling 128-bit IDs
cup.services.threadpool .. pool with result/failure callbacks + live stats
cup.thread ............... interruptible CupThread + reader/writer RWLock
cup.services.executor .... delay_exec / queue_exec + crontab-style CronTask
cup.timeplus ............. now-formatting + safe tz<->UTC conversion
cup.res.linux ............ /proc cpu, memory, kernel, process introspection
cup.exfile ............... advisory single-instance file locking
cup.net .................. hostname / ip / free-port helpers
cup.storage.obj .......... one API over Local / FTP / S3 / AFS backends
cup.flag ................. bidirectional key<->number type tables
cup.unittest ............. assertion toolkit that plays well with try/except
Next steps: API reference at https://cup.iobusy.com • source at
https://github.com/baidu/CUP (browse src/cup/ and the cup_test/ suite).
"")
CUP のフラグユーティリティを使用して、メッセージ名と数値コードの間の双方向マッピングを作成します。その後、CUP の組み込みアサートヘルパーを実行して、等価性、不等価性、真偽チェック、比較演算、メンバーシップ、および予期される例外を検証します。最後に、使用したすべての CUP サブシステムを要約し、それぞれを実践的な開発ユースケースに結びつけてチュートリアルを終了します。
結論
結論として、CUP の広範な実践的なツアーを完了し、そのユーティリティがプロダクションスタイルの Python アプリケーションをどのようにサポートできるかを理解しました。CUP がログの構造化、ランタイム設定の管理、一時データのキャッシュ、一意の識別子の生成、スレッドの調整、システムリソースの検査、ロックによるファイル保護、およびアサートを通じた動作検証をどのように支援するかを確認しました。これは、信頼性、観測可能性、実用的なツールが重要となるスクリプト、バックエンドサービス、自動化ジョブ、およびシステムレベルの Python プロジェクトで CUP を使用する際に、コンパクトながら有用な基盤を提供します。
完全なコードはここでご覧ください。また、Twitter でフォローすることもできますし、15 万人以上の ML サブレッドに参加したり、ニュースレターを購読することを忘れないでください。待ってください!Telegram を使っていますか?今なら Telegram でも私たちに参加できます。
GitHub リポジトリや Hugging Face ページ、製品リリース、ウェビナーなどのプロモーションのためにパートナーシップをご希望ですか?私たちに連絡してください。
この投稿「CUP (Common Useful Python): Baidu のユーティリティツールキットによる信頼性の高い Python ワークフローの構築」は、MarkTechPost で最初に公開されました。
原文を表示
In this tutorial, we explore CUP, Baidu’s Common Useful Python library, as a practical utility toolkit for building stronger Python workflows. We begin by setting up the library in a Colab-friendly environment and then move through its major subsystems step by step, including logging, decorators, nested configuration, caching, ID generation, thread pools, interruptible threads, delayed execution, time utilities, Linux resource monitoring, file locking, networking helpers, object storage interfaces, type maps, and built-in testing assertions. As we progress, we do not just call functions at random; we observe how each module fits into real-world development tasks such as monitoring, automation, concurrency, configuration management, and reliability checks.
CUP Setup and Logging
Copy CodeCopiedUse a different Browser
import os
import sys
import time
import threading
import tempfile
import datetime
import subprocess
def banner(title):
line = "=" * 70
print("\n" + line + "\n" + title + "\n" + line)
def skip(exc):
"""Report a gracefully-skipped section without aborting the notebook."""
print(" [skipped — {}: {}]".format(type(exc).__name__, exc))
banner("0. SETUP (install + cup.platforms + cup.version)")
subprocess.run(
[sys.executable, "-m", "pip", "install", "-q", "cup", "pytz"],
check=False,
)
import cup
ver = getattr(cup, "__version__", None)
if ver is None:
try:
from cup import version as _v
ver = getattr(_v, "VERSION", None) or getattr(_v, "__version__", "unknown")
except Exception:
ver = "unknown"
print("CUP version :", ver)
print("Python :", sys.version.split()[0])
try:
from cup import platforms
print("is_linux :", platforms.is_linux())
print("is_mac :", platforms.is_mac())
print("is_windows :", platforms.is_windows())
print("is_py3 :", platforms.is_py3())
except Exception as e:
skip(e)
banner("1. LOGGING (cup.log)")
LOGFILE = os.path.join(tempfile.gettempdir(), "cup_tutorial.log")
try:
from cup import log
log.init_comlog(
"cup_tutorial",
log.INFO,
LOGFILE,
log.ROTATION,
10 * 1024 * 1024,
True,
False,
)
log.info("hello from cup.log — written to file AND stdout")
log.warning("a warning line")
log.info_if(2 > 1, "info_if(True) -> emitted")
log.info_if(1 > 2, "info_if(False) -> you will NOT see this")
log.setloglevel(log.DEBUG)
log.debug("debug visible after setloglevel(DEBUG)")
try:
with open(LOGFILE) as fh:
last = [ln for ln in fh.read().splitlines() if ln.strip()][-1]
parsed = log.parse(last)
print("parsed last log line ->")
for k in ("loglevel", "date", "time", "pid", "srcline", "msg"):
if isinstance(parsed, dict) and k in parsed:
print(" {:8}: {}".format(k, parsed[k]))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
We begin by setting up the CUP tutorial environment and installing the required packages directly from Python. We define helper functions that keep the notebook readable and allow failed sections to be skipped safely. We then explore CUP version details, platform checks, and structured logging to understand the library’s foundation.
Decorators and Nested Config
Copy CodeCopiedUse a different Browser
banner("2. DECORATORS (cup.decorators)")
try:
from cup import decorators
@decorators.Singleton
class AppConfig(object):
def __init__(self):
self.created_at = time.time()
a, b = AppConfig(), AppConfig()
print("Singleton: a is b ->", a is b, "(same created_at:",
a.created_at == b.created_at, ")")
@decorators.TraceUsedTime(
b_print_stdout=True,
enter_msg="event_id=0xABCDE enter",
leave_msg="event_id=0xABCDE leave",
)
def heavy_compute():
time.sleep(0.2)
return sum(range(200000))
print("heavy_compute() =", heavy_compute())
@decorators.needlinux
def linux_only():
return "this body is allowed to run on Linux"
print("needlinux ->", linux_only())
except Exception as e:
skip(e)
banner("3. RICH NESTED CONFIG (cup.util.conf)")
CONF_PATH = os.path.join(tempfile.gettempdir(), "cup_demo.conf")
CONF_TEXT = """\
---- global scalars (layer 0) ----
host: abc.com
port: 12345
debug: false
[monitor]
enabled: true
interval: 60
regex: sshd
[.thresholds]
cpu_max: 90
mem_max: 80
[..actions]
on_breach: alert
[storage]
@path: /data/disk1
@path: /data/disk2
@path: /data/disk3
"""
try:
from cup.util import conf
with open(CONF_PATH, "w") as fh:
fh.write(CONF_TEXT)
cfg = conf.Configure2Dict(CONF_PATH, separator=":").get_dict()
print("host :", cfg["host"])
print("port :", cfg["port"])
print("monitor.enabled :", cfg["monitor"]["enabled"])
print("monitor.regex :", cfg["monitor"]["regex"])
print("monitor.thresholds.cpu_max :", cfg["monitor"]["thresholds"]["cpu_max"])
print("monitor.thresholds.actions :",
cfg["monitor"]["thresholds"]["actions"]["on_breach"])
print("storage.path (repeated @ -> list):", list(cfg["storage"]["path"]))
cfg["port"] = "10085"
cfg["monitor"]["thresholds"]["actions"]["on_breach"] = "restart"
NEW_PATH = CONF_PATH + ".new"
conf.Dict2Configure(cfg, separator=":").write_conf(NEW_PATH)
re_read = conf.Configure2Dict(NEW_PATH, separator=":").get_dict()
print("round-trip port :", re_read["port"], "(was 12345)")
print("round-trip on_breach :",
re_read["monitor"]["thresholds"]["actions"]["on_breach"], "(was alert)")
except Exception as e:
skip(e)
We move on to CUP decorators and see how they help us create single-instance classes, track execution time, and protect Linux-only functions. We then work with CUP’s rich configuration system and load a nested configuration file with sections, child sections, and repeated values. We also update the configuration and write it back to disk to confirm that the read-modify-write flow works correctly.
Caching, IDs, Thread Pools
Copy CodeCopiedUse a different Browser
banner("4. IN-MEMORY KV CACHE (cup.cache)")
try:
from cup import cache
kv = cache.KVCache(name="demo")
kv.set({"user:1": "alice", "user:2": "bob"}, expire_sec=2)
kv.set({"config:flag": "on"}, expire_sec=None)
print("size after sets :", kv.size())
print("get user:1 :", kv.get("user:1"))
print("get missing key :", kv.get("nope"))
print("sleeping 2.2s to let the 2s-TTL keys expire ...")
time.sleep(2.2)
print("get user:1 (expired) :", kv.get("user:1"))
print("get config:flag (eternal):", kv.get("config:flag"))
reclaimed = kv.pop_n_expired(0)
print("pop_n_expired reclaimed :", list(reclaimed.keys()) if reclaimed else [])
except Exception as e:
skip(e)
banner("5. UNIQUE ID GENERATION (cup.services.generator)")
try:
from cup.services import generator
gman = generator.CGeneratorMan()
print("uniqname :", gman.get_uniqname())
print("next_uniq_num :", gman.get_next_uniq_num())
print("next_uniq_num (again) :", gman.get_next_uniq_num(), "(monotonic)")
if hasattr(gman, "get_uuid"):
try:
print("get_uuid :", gman.get_uuid())
except Exception as e:
skip(e)
if hasattr(gman, "get_random_str"):
try:
print("get_random_str(16) :", gman.get_random_str(16))
except Exception as e:
skip(e)
print("singleton check :", generator.CGeneratorMan() is gman)
try:
cyc = generator.CycleIDGenerator("127.0.0.1", 8080)
i1, i2 = cyc.next_id(), cyc.next_id()
print("CycleIDGenerator id #1 :", i1)
print("CycleIDGenerator id #2 :", i2, "(incremented)")
print("id #1 as hex :", generator.CycleIDGenerator.id2_hexstring(i1))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("6. THREAD POOL (cup.services.threadpool)")
try:
from cup.services import threadpool
pool = threadpool.ThreadPool(minthreads=2, maxthreads=4, name="demo-pool")
pool.start()
results, rlock = [], threading.Lock()
def square(n):
time.sleep(0.03)
with rlock:
results.append(n * n)
return n * n
for i in range(8):
pool.add_1job(square, i)
callback_log = []
def on_done(ok, result):
callback_log.append((ok, result))
pool.add_1job_with_callback(on_done, square, 100)
def will_fail():
raise RuntimeError("boom inside worker")
pool.add_1job_with_callback(on_done, will_fail)
time.sleep(0.5)
print("live stats :", pool.get_stats())
pool.stop()
print("squares collected :", sorted(results))
print("callback results :", callback_log)
except Exception as e:
skip(e)
We use CUP’s in-memory cache to store key-value pairs with temporary and permanent lifetimes. We then generate unique names, counters, UUID-style values, random strings, and cycling IDs for distributed-style identifiers. We also create a thread pool, submit jobs, collect results, and observe callback behavior for both successful and failed tasks.
Threads, Scheduling, Time Utilities
Copy CodeCopiedUse a different Browser
banner("7. INTERRUPTIBLE THREADS + RW LOCK (cup.thread)")
try:
from cup import thread as cupthread
rw = cupthread.RWLock()
rw.acquire_readlock()
rw.acquire_readlock()
print("acquired 2 read locks concurrently")
rw.release_readlock()
rw.release_readlock()
rw.acquire_writelock()
print("acquired exclusive write lock")
rw.release_writelock()
stop_flag = {"running": True}
def busy_loop():
while stop_flag["running"]:
time.sleep(0.05)
t = cupthread.CupThread(target=busy_loop)
t.daemon = True
t.start()
print("worker python tid :", t.get_my_tid())
stop_flag["running"] = False
ok = t.terminate()
print("CupThread.terminate -> :", ok)
except Exception as e:
skip(e)
banner("8. DELAY / CRON EXECUTOR (cup.services.executor)")
try:
from cup.services import executor
URGENCY = getattr(executor, "URGENCY_NORMAL", 0)
svc = executor.ExecutionService(
delay_exe_thdnum=2, queue_exec_thdnum=2, name="exec-demo"
)
svc.start()
fired = []
def task(tag):
fired.append((tag, round(time.time(), 3)))
svc.delay_exec(0.3, task, URGENCY, "delayed-0.3s")
svc.queue_exec(task, URGENCY, "queued-now")
time.sleep(0.8)
svc.stop()
print("fired (tag, ts) :", fired)
try:
import pytz
import hashlib
tz = pytz.timezone("Asia/Shanghai")
timer = {"minute": [0, 30], "hour": None,
"weekday": None, "monthday": None, "month": None}
task_uuid = hashlib.md5(b"demo-cron").hexdigest()
ctask = executor.CronTask("demo-cron", tz, timer, task_uuid, task, "cron-arg")
print("cron next_schedtime :", ctask.next_schedtime())
print("cron 32-byte taskid :", ctask.taskid())
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("9. TIME HELPERS (cup.timeplus)")
try:
from cup import timeplus
import pytz
print("get_str_now (default) :", timeplus.get_str_now())
print("get_str_now (custom fmt):", timeplus.get_str_now("%Y/%m/%d %H:%M:%S"))
tp = timeplus.TimePlus(pytz.timezone("Asia/Shanghai"))
now_utc = datetime.datetime.utcnow()
print("naive utc now :", now_utc)
print("-> Shanghai local :", tp.utc2local(now_utc))
except Exception as e:
skip(e)
We explore CUP’s thread utilities by using a reader-writer lock and an interruptible thread. We then run delayed and queued tasks through the execution service to understand simple scheduling behavior. We also create a cron-style task and inspect its next scheduled run time without waiting for the real clock to tick.
System Resources and Storage
Copy CodeCopiedUse a different Browser
banner("10. SYSTEM RESOURCES (cup.res.linux)")
try:
from cup.res import linux as reslinux
try:
print("cpu cores :", reslinux.get_cpu_nums())
except Exception as e:
skip(e)
try:
cpu = reslinux.get_cpu_usage(intvl_in_sec=1)
print("cpu usage sample :", cpu, "| usr=", getattr(cpu, "usr", "?"))
except Exception as e:
skip(e)
try:
mem = reslinux.get_meminfo()
gb = 1024.0 ** 3
print("mem total / available : {:.2f} GB / {:.2f} GB".format(
mem.total / gb, mem.available / gb))
except Exception as e:
skip(e)
try:
print("kernel version :", reslinux.get_kernel_version())
except Exception as e:
skip(e)
try:
all_pids = reslinux.pids()
print("live process count :", len(all_pids))
except Exception as e:
skip(e)
try:
me = reslinux.Process(os.getpid())
name = me.get_process_name() if hasattr(me, "get_process_name") else "?"
status = me.get_process_status() if hasattr(me, "get_process_status") else "?"
print("this process name/status:", name, "/", status)
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("11. FILE UTILITIES (cup.exfile)")
try:
from cup import exfile
lock_path = os.path.join(tempfile.gettempdir(), "cup_demo.lock")
flock = exfile.LockFile(lock_path)
got = flock.lock(blocking=True)
print("acquired file lock :", got)
print("lock file path :", flock.filepath())
flock.unlock()
print("released file lock : True")
except Exception as e:
skip(e)
banner("12. NETWORKING HELPERS (cup.net)")
try:
from cup import net as cupnet
try:
print("local hostname :", cupnet.get_local_hostname())
except Exception as e:
skip(e)
try:
print("host ip :", cupnet.get_hostip())
except Exception as e:
skip(e)
try:
print("local port 9999 free? :", cupnet.localport_free(9999))
except Exception as e:
skip(e)
except Exception as e:
skip(e)
banner("13. UNIFIED OBJECT STORAGE (cup.storage.obj)")
try:
from cup.storage import obj as cupobj
print("available backends :",
[n for n in dir(cupobj) if n.endswith("ObjectSystem")])
los = None
for ctor in (lambda: cupobj.LocalObjectSystem(tempfile.gettempdir()),
lambda: cupobj.LocalObjectSystem({}),
lambda: cupobj.LocalObjectSystem()):
try:
los = ctor()
break
except Exception:
continue
if los is None:
print(" LocalObjectSystem ctor varies by version — see docs for "
"your release's exact signature.")
else:
print("LocalObjectSystem ready :", type(los).__name__,
"(put/get/head/delete available)")
except Exception as e:
skip(e)
We inspect Linux system resources, including CPU cores, CPU usage, memory, kernel version, process count, and current process status. We then use CUP’s file utility module to acquire and release an advisory lock for safe single-instance execution. We also test networking helpers and probe the object storage interface to see which storage backends are available.
Type Maps and Assertions
Copy CodeCopiedUse a different Browser
banner("14. BIDIRECTIONAL TYPE MAPS (cup.flag)")
try:
from cup import flag
tman = flag.TypeMan()
tman.register_types({"LOGIN": 1, "LOGOUT": 2, "HEARTBEAT": 3, "DATA": 4})
print("LOGIN -> number :", tman.getnumber_bykey("LOGIN"))
print("number 2 -> key :", tman.getkey_bynumber(2))
print("all registered keys :", list(tman.get_key_list()))
except Exception as e:
skip(e)
banner("15. BUILT-IN TEST ASSERTIONS (cup.unittest)")
try:
from cup import unittest as cup_unittest
cup_unittest.assert_eq(2 + 2, 4)
cup_unittest.assert_ne(1, 2)
cup_unittest.assert_true(isinstance([], list))
cup_unittest.assert_gt(5, 3)
cup_unittest.assert_eq_one("b", ["a", "b", "c"])
def raises_value_error():
raise ValueError("expected")
cup_unittest.expect_raise(raises_value_error, ValueError)
print("all passing assertions : OK")
try:
cup_unittest.assert_eq(1, 2, "intentional failure demo")
except Exception as e:
print("caught intended failure :", type(e).__name__)
except Exception as e:
skip(e)
banner("DONE — you exercised 15 CUP subsystems")
print("""
What you just used, by module:
cup.log .................. structured, rotating, parseable logging
cup.decorators ........... Singleton, TraceUsedTime, platform guards
cup.util.conf ............ nested config with sections + @arrays (round-trip)
cup.cache ................ TTL KV cache (get() extends lifetime)
cup.services.generator ... host-unique names, counters, cycling 128-bit IDs
cup.services.threadpool .. pool with result/failure callbacks + live stats
cup.thread ............... interruptible CupThread + reader/writer RWLock
cup.services.executor .... delay_exec / queue_exec + crontab-style CronTask
cup.timeplus ............. now-formatting + safe tz<->UTC conversion
cup.res.linux ............ /proc cpu, memory, kernel, process introspection
cup.exfile ............... advisory single-instance file locking
cup.net .................. hostname / ip / free-port helpers
cup.storage.obj .......... one API over Local / FTP / S3 / AFS backends
cup.flag ................. bidirectional key<->number type tables
cup.unittest ............. assertion toolkit that plays well with try/except
Next steps: API reference at https://cup.iobusy.com • source at
https://github.com/baidu/CUP (browse src/cup/ and the cup_test/ suite).
""")
We use CUP’s flag utilities to create bidirectional mappings between message names and numeric codes. We then run CUP’s built-in assertion helpers to validate equality, inequality, truth checks, comparisons, membership, and expected exceptions. We finish the tutorial by summarizing every CUP subsystem we exercise and connecting each one to a practical development use case.
Conclusion
In conclusion, we completed a broad hands-on tour of CUP and understand how its utilities can support production-style Python applications. We saw how CUP helps us structure logs, manage runtime configuration, cache temporary data, generate unique identifiers, coordinate threads, inspect system resources, protect files with locks, and validate behavior through assertions. It sets us a compact but useful foundation for using CUP in scripts, backend services, automation jobs, and system-level Python projects where reliability, observability, and practical tooling matter.
Check out the FULL CODES here. Also, feel free to follow us on Twitter and don’t forget to join our 150k+ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us
The post CUP (Common Useful Python): Building Reliable Python Workflows with Baidu’s Utility Toolkit appeared first on MarkTechPost.
関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み