Contoh Kode¶
Copy-paste ready kode untuk komponen utama agent.
Semua kode di sini Python 3.12+. Adaptasi ke language lo sendiri kalo perlu.
requirements.txt¶
Tambah sesuai kebutuhan:
# For crypto
web3>=6.0.0
solana>=0.30.0
# For GitHub
pygithub>=2.0.0
# For Twitter
tweepy>=4.0.0
# For image processing
pillow>=10.0.0
# For async DB
asyncpg>=0.29.0
.env.example¶
# Template: the real .env holds secrets and is listed in .gitignore.
# Telegram
TELEGRAM_BOT_TOKEN=
# Presumably the user ID that is_owner() compares against — confirm.
OWNER_TELEGRAM_ID=
# LLM (OpenAI / OpenRouter / proxy)
OPENAI_API_KEY=
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_MODEL=gpt-4o-mini
# Agent
AGENT_NAME=Kai
USER_NAME=Gutluc
# Optional
WEB_SEARCH_API_KEY=
.gitignore¶
# Secrets (the main .env plus the per-platform files under credentials/)
.env
credentials/
# Runtime data (data/ holds memory.json and audit.log), logs and backups
data/
*.log
*.bak
# Python
__pycache__/
*.pyc
venv/
.venv/
# IDE and OS clutter
.vscode/
.idea/
.DS_Store
main.py (full)¶
Lihat Tutorial Setup dari Nol untuk versi lengkap (~250 lines).
Snippet: Build system prompt¶
from pathlib import Path
from datetime import datetime
import json
# Root directory for all agent files: ~/agent.
AGENT_DIR = Path.home().joinpath("agent")
# Persona text that build_system_prompt() puts at the top of the prompt.
SOUL_FILE = AGENT_DIR.joinpath("SOUL.md")
# JSON file holding the long-term memory notes.
MEMORY_FILE = AGENT_DIR.joinpath("data", "memory.json")
# Directory of per-platform credential files (<platform>.env).
CREDENTIALS_DIR = AGENT_DIR.joinpath("credentials")
def load_soul() -> str:
    """Return the stripped UTF-8 text of SOUL_FILE, or "" when unavailable.

    A missing file and any error raised while reading it both yield an
    empty string, so callers never have to handle an exception.
    """
    if not SOUL_FILE.exists():
        return ""
    try:
        raw = SOUL_FILE.read_text(encoding="utf-8")
    except Exception:
        return ""
    return raw.strip()
def load_memory() -> dict:
    """Load the agent's long-term memory from MEMORY_FILE.

    Returns:
        The parsed JSON object. A fresh ``{"notes": [], "user_name": ""}``
        is returned when the file is missing, unreadable, not valid JSON,
        or does not contain a JSON object at the top level.
    """
    try:
        # Explicit UTF-8 keeps the result independent of the locale encoding.
        data = json.loads(MEMORY_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing or unreadable file. ValueError: malformed JSON
        # (JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
        # RecursionError: pathologically nested JSON.
        data = None
    # Callers use dict methods such as .get() on the result, so a top-level
    # list, string or number is treated like a corrupt file.
    if isinstance(data, dict):
        return data
    return {"notes": [], "user_name": ""}
def list_credentials() -> list:
    """Return the sorted names of the ``*.env`` files in CREDENTIALS_DIR.

    Only regular files count; an absent directory yields an empty list.
    """
    if not CREDENTIALS_DIR.exists():
        return []
    names = []
    for entry in CREDENTIALS_DIR.iterdir():
        if entry.is_file() and entry.name.endswith(".env"):
            names.append(entry.name)
    names.sort()
    return names
def build_system_prompt() -> str:
    """Assemble the system prompt sent with every LLM request.

    The prompt consists of the SOUL.md text (when present), the current
    local date and time, the list of available credential files, and the
    20 most recent memory notes, in that order.
    """
    persona = load_soul()
    notes = load_memory().get("notes")
    cred_files = list_credentials()
    now_text = datetime.now().strftime("%A, %d %B %Y %H:%M")

    sections = [f"Hari ini: {now_text}"]
    if cred_files:
        cred_lines = "\n".join(f"- ~/agent/credentials/{c}" for c in cred_files)
        sections.append("\n\nCredential files available:\n" + cred_lines)
    if notes:
        note_lines = "\n".join(f"- {n}" for n in notes[-20:])
        sections.append("\n\nYang lo ingat tentang user:\n" + note_lines)
    if persona:
        # The persona text always leads the prompt.
        sections.insert(0, f"{persona}\n\n")
    return "".join(sections)
Snippet: Tool use parser¶
import re, json
def extract_tool_calls(text: str) -> list[dict]:
    """Extract every well-formed ``<tool_use>{...}</tool_use>`` call from text.

    Args:
        text: Raw model output that may embed zero or more tool-use blocks.

    Returns:
        The decoded JSON objects, in order of appearance, that have both a
        ``"name"`` and an ``"arguments"`` key. Blocks that are not valid
        JSON, or whose payload is not a JSON object, are skipped.
    """
    pattern = re.compile(r'<tool_use>(.*?)</tool_use>', re.DOTALL)
    results = []
    for m in pattern.finditer(text):
        try:
            obj = json.loads(m.group(1).strip())
        except json.JSONDecodeError:
            continue
        # Valid JSON is not necessarily an object: a bare number would make
        # the `in` test raise TypeError, and a string or list could pass it
        # and hand callers something that is not a dict.
        if isinstance(obj, dict) and "name" in obj and "arguments" in obj:
            results.append(obj)
    return results
def strip_tool_calls(text: str) -> str:
    """Return text with every ``<tool_use>...</tool_use>`` block deleted.

    Used to produce the final, user-facing answer; leading and trailing
    whitespace is trimmed from the result.
    """
    tool_block = re.compile(r'<tool_use>.*?</tool_use>', re.DOTALL)
    cleaned = tool_block.sub('', text)
    return cleaned.strip()
Snippet: Secret redaction¶
import re
# (compiled pattern, replacement label) pairs; they are applied in this order.
SECRET_PATTERNS = [
    # GitHub tokens, by prefix.
    (re.compile(r'ghp_[A-Za-z0-9]{36}'), '[REDACTED_GITHUB_PAT]'),
    (re.compile(r'github_pat_[A-Za-z0-9_]{82}'), '[REDACTED_GITHUB_FPAT]'),
    (re.compile(r'gho_[A-Za-z0-9]{36}'), '[REDACTED_GITHUB_OAUTH]'),
    # 0x-prefixed 64-hex-digit value (the shape of an EVM private key).
    (re.compile(r'\b0x[a-fA-F0-9]{64}\b'), '[REDACTED_PRIVATE_KEY]'),
    (re.compile(r'AKIA[0-9A-Z]{16}'), '[REDACTED_AWS_KEY]'),
    # "-" and "_" are allowed after the prefix so that keys with an inner
    # prefix (sk-proj-..., sk-or-v1-...) are caught as well. The leading \b
    # keeps ordinary hyphenated words such as "risk-..." from matching.
    (re.compile(r'\bsk-[A-Za-z0-9_-]{40,}'), '[REDACTED_OPENAI_KEY]'),
    # Same character set as before; the literal "-" is moved to the end of
    # the class so it cannot be misread as a range.
    (re.compile(r'xox[bp]-[0-9A-Za-z-]+'), '[REDACTED_SLACK_TOKEN]'),
    # Whole PEM private-key block, including the BEGIN/END lines.
    (re.compile(
        r'-----BEGIN [A-Z ]+PRIVATE KEY-----.*?-----END [A-Z ]+PRIVATE KEY-----',
        re.DOTALL
    ), '[REDACTED_PEM]'),
]
def redact_secrets(text: str) -> str:
    """Replace every match of each SECRET_PATTERNS entry with its label.

    Patterns are applied one after another in list order. Non-string input
    is handed back unchanged.
    """
    if isinstance(text, str):
        for regex, label in SECRET_PATTERNS:
            text = regex.sub(label, text)
    return text
Snippet: Async shell exec¶
import asyncio
async def run_shell(cmd: str, timeout: int = 60) -> str:
    """Execute a shell command and return its combined output.

    Args:
        cmd: Command line handed to the system shell.
        timeout: Seconds to wait for the command to finish.

    Returns:
        Decoded stdout followed by decoded stderr (undecodable bytes are
        replaced), cut to 3000 characters with a note giving the full
        length; ``"(no output)"`` when both streams are empty; or a timeout
        message when the command did not finish in time.
    """
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); the child itself keeps
        # running unless it is killed explicitly.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited between the timeout and the kill
        # Reap the child so it does not linger as a zombie, but bound the
        # wait so a stubborn process tree cannot stall the caller.
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except asyncio.TimeoutError:
            pass
        return f"Command timeout after {timeout}s"

    out = stdout.decode(errors='replace') + stderr.decode(errors='replace')
    if len(out) > 3000:
        return out[:3000] + f"\n\n... (truncated, total {len(out)} chars)"
    return out or "(no output)"
Snippet: Agentic loop¶
async def ask_ai_agentic(chat_id: int, user_text: str) -> str:
    """Run the tool-using conversation loop for one user message.

    Each step sends the conversation to the LLM. A reply without tool calls
    is the final answer: it is saved to the chat history and returned with
    any tool_use markup stripped. Otherwise each requested command is run
    and its output is fed back as a user message for the next step.

    Args:
        chat_id: Chat whose history, cancel flag and pending command are used.
        user_text: The new message from the user.

    Returns:
        The final answer, or a status text when the run was cancelled, the
        LLM call failed, a high-risk command needs confirmation, or
        MAX_AGENTIC_STEPS was reached.
    """
    history = load_history(chat_id)
    messages = [
        {"role": "system", "content": build_system_prompt()},
        *history[-30:],
        {"role": "user", "content": user_text}
    ]
    # Only the messages added during this run; merged into history on save.
    new_messages = [{"role": "user", "content": user_text}]

    for step in range(MAX_AGENTIC_STEPS):
        # Honour a cancel request at the start of every iteration.
        if chat_cancel.get(chat_id):
            chat_cancel[chat_id] = False
            return "Dibatalkan user."

        try:
            # The client call is used synchronously here; running it in a
            # worker thread keeps the event loop free for other handlers
            # (status queries, /cancel) while the request is in flight.
            resp = await asyncio.to_thread(
                client.chat.completions.create,
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.7,
            )
            # content may be None; normalise so the parsers always get a str.
            answer = resp.choices[0].message.content or ""
        except Exception as e:
            return f"LLM error: {e}"

        calls = extract_tool_calls(answer)
        if not calls:
            final = strip_tool_calls(answer)
            new_messages.append({"role": "assistant", "content": answer})
            save_history(chat_id, (history + new_messages)[-MAX_HISTORY:])
            return final

        new_messages.append({"role": "assistant", "content": answer})
        messages.append({"role": "assistant", "content": answer})

        for call in calls:
            cmd = call["arguments"].get("command", "")
            # A call that does not declare its risk is treated as high-risk.
            risk = call["arguments"].get("risk", "high")
            if risk == "high":
                # Park the command; the message handler runs it only after
                # the user confirms.
                pending_commands[chat_id] = cmd
                return (
                    f"Mau jalanin command high-risk:\n{cmd}\n\n"
                    f"Balas 'ya' untuk lanjut, atau apapun untuk batal."
                )
            result = await run_shell(cmd)
            tool_msg = (
                f"Tool execution selesai (step {step+1}/{MAX_AGENTIC_STEPS}). "
                f"Hasil:\n{result}"
            )
            messages.append({"role": "user", "content": tool_msg})
            new_messages.append({"role": "user", "content": tool_msg})

    save_history(chat_id, (history + new_messages)[-MAX_HISTORY:])
    return "Max step tercapai. Belum kelar."
Snippet: Telegram handler¶
from telegram import Update
from telegram.ext import (
Application, CommandHandler, MessageHandler, filters, ContextTypes
)
async def message_handler(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle a text message from the owner.

    A message that arrives while a high-risk command awaits confirmation is
    treated as the yes/no answer. Any other message is passed to the
    agentic loop under a per-chat lock, and the reply is sent back in
    pieces of at most 4000 characters.
    """
    if not is_owner(update.effective_user.id):
        await update.message.reply_text("Gue cuma respond ke owner.")
        return

    chat_id = update.effective_chat.id
    text = update.message.text

    # A pending high-risk command turns this message into its confirmation.
    if chat_id in pending_commands:
        confirmed = text.lower().strip() in ("ya", "y", "yes", "ok", "go")
        cmd = pending_commands.pop(chat_id)
        if not confirmed:
            await update.message.reply_text("Dibatalin.")
            return
        await ctx.bot.send_chat_action(chat_id, "typing")
        result = await run_shell(cmd)
        await update.message.reply_text(f"Done.\n\n{result[:3000]}")
        return

    # One lock per chat, created on first use.
    lock = chat_locks.setdefault(chat_id, asyncio.Lock())
    async with lock:
        # Published so the status fast path can report what is running.
        chat_busy[chat_id] = {
            "desc": text[:40],
            "start_time": asyncio.get_event_loop().time(),
        }
        try:
            await ctx.bot.send_chat_action(chat_id, "typing")
            response = await ask_ai_agentic(chat_id, text)
            # Telegram limits message length, so long replies go out in
            # consecutive 4000-character slices.
            offset = 0
            while offset < len(response):
                await update.message.reply_text(response[offset:offset + 4000])
                offset += 4000
        except Exception as e:
            log.exception("Handler error")
            await update.message.reply_text(f"Error: {e}")
        finally:
            chat_busy.pop(chat_id, None)
Snippet: Status query fast path¶
# Substrings that mark a short message as a "how is it going?" question.
STATUS_KEYWORDS = [
    "gimana", "udah", "udh", "udh blm", "status", "bisa",
    "sampai mana", "kelar belum", "selesai?",
]


def is_status_query(text: str) -> bool:
    """Tell whether text looks like a short progress question.

    The text is lower-cased and trimmed; anything longer than 30 characters
    is rejected, otherwise one STATUS_KEYWORDS substring is enough.
    """
    normalized = text.lower().strip()
    if len(normalized) > 30:
        return False
    for keyword in STATUS_KEYWORDS:
        if keyword in normalized:
            return True
    return False
async def message_handler(update, ctx):
    """Answer status questions immediately while the chat is busy.

    Excerpt: only the fast path is shown; the normal message flow follows.
    """
    chat_id = update.effective_chat.id
    text = update.message.text

    # Fast path: a run is in progress and the user is only asking about it.
    busy = chat_busy.get(chat_id)
    if busy and is_status_query(text):
        elapsed = int(asyncio.get_event_loop().time() - busy["start_time"])
        await update.message.reply_text(
            f"Lagi proses: {busy['desc']} ({elapsed}s). "
            f"Tunggu beres, atau /cancel buat batalin."
        )
        return  # answered without taking the chat lock or queueing

    # ... normal flow
Snippet: Credential loader¶
def load_credential_file(platform: str) -> dict:
    """Load ``credentials/<platform>.env`` into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are whitespace-trimmed, and one pair of matching single
    or double quotes around a value is removed.

    Args:
        platform: Base name of the credential file, without ``.env``.

    Returns:
        Mapping of variable name to value; empty when the file is missing.
    """
    path = CREDENTIALS_DIR / f"{platform}.env"
    if not path.exists():
        return {}
    creds = {}
    # Explicit UTF-8 so parsing does not depend on the locale encoding.
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        value = value.strip()
        # KEY="value" is common .env syntax; keeping the quotes would make
        # them part of the secret.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        creds[key.strip()] = value
    return creds
def has_credential(platform: str) -> bool:
    """Report whether ``credentials/<platform>.env`` exists; nothing is read."""
    candidate = CREDENTIALS_DIR.joinpath(f"{platform}.env")
    return candidate.exists()
Snippet: Run command with credentials¶
import os
import subprocess
def run_with_creds(cmd: str, platform: str, timeout: int = 60) -> str:
    """Run a shell command with a platform's credentials in its environment.

    The child inherits the current environment, overlaid with the variables
    from ``credentials/<platform>.env``. Returns stdout followed by stderr,
    or a short message on timeout or any other failure.
    """
    child_env = {**os.environ, **load_credential_file(platform)}
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            env=child_env,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Timeout after {timeout}s"
    except Exception as e:
        return f"Error: {e}"
    return completed.stdout + completed.stderr
Snippet: Wallet integration¶
from web3 import Web3
def get_wallet():
    """Return the EVM private key stored in the wallet credential file.

    Raises:
        ValueError: if EVM_PRIVATE_KEY is missing or empty.
    """
    key = load_credential_file("wallet").get("EVM_PRIVATE_KEY")
    if key:
        return key
    raise ValueError("EVM_PRIVATE_KEY not found in wallet.env")
def check_balance_eth(address: str, rpc_url: str = "https://eth.llamarpc.com") -> str:
    """Return the balance of address, formatted as ``"<amount> ETH"``.

    The balance is fetched in wei from the node at rpc_url and converted to
    ether before formatting.
    """
    provider = Web3.HTTPProvider(rpc_url)
    node = Web3(provider)
    wei = node.eth.get_balance(address)
    return f"{node.from_wei(wei, 'ether')} ETH"
# Transfer (HIGH RISK - requires user confirmation)
def transfer_eth(to_address: str, amount_eth: float, rpc_url: str) -> str:
    """Sign and broadcast a plain ETH transfer from the agent's wallet.

    Args:
        to_address: Recipient address.
        amount_eth: Amount to send, in ether.
        rpc_url: HTTP endpoint of the node used to read chain state and
            broadcast the transaction.

    Returns:
        The transaction hash as a hex string.

    Raises:
        ValueError: if the wallet private key is not configured.
    """
    pk = get_wallet()
    w3 = Web3(Web3.HTTPProvider(rpc_url))
    account = w3.eth.account.from_key(pk)
    tx = {
        "nonce": w3.eth.get_transaction_count(account.address),
        "to": to_address,
        "value": w3.to_wei(amount_eth, 'ether'),
        "gas": 21000,  # fixed gas limit for a plain value transfer
        "gasPrice": w3.eth.gas_price,
        "chainId": w3.eth.chain_id,
    }
    signed = account.sign_transaction(tx)
    # The signed payload attribute was renamed from rawTransaction to
    # raw_transaction in newer eth-account releases (the ones web3.py 7
    # uses), so take whichever one this installation provides.
    raw_tx = getattr(signed, "raw_transaction", None)
    if raw_tx is None:
        raw_tx = signed.rawTransaction
    tx_hash = w3.eth.send_raw_transaction(raw_tx)
    return tx_hash.hex()
Snippet: GitHub integration¶
import subprocess
def github_command(cmd: str, timeout: int = 60) -> str:
    """Run a ``gh`` CLI command authenticated with the stored GitHub token.

    Args:
        cmd: Arguments placed after ``gh``, e.g. ``"repo list --limit 10"``.
            The resulting line is run through the shell, so it must come
            from a trusted source.
        timeout: Seconds to wait before giving up.

    Returns:
        stdout followed by stderr, or a timeout message.
    """
    env = os.environ.copy()
    token = load_credential_file("github").get("GH_TOKEN")
    # Only override GH_TOKEN when the credential file provides one; writing
    # an empty string would wipe a token inherited from the environment.
    if token:
        env["GH_TOKEN"] = token
    try:
        result = subprocess.run(
            f"gh {cmd}", shell=True, env=env,
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # Same message as run_with_creds, so callers see one format.
        return f"Timeout after {timeout}s"
    return result.stdout + result.stderr


# Usage:
output = github_command("repo list --limit 10")
Snippet: Twitter posting¶
import requests
def post_tweet(text: str, timeout: float = 30) -> str:
    """Post a tweet through the Twitter API v2 ``POST /2/tweets`` endpoint.

    Args:
        text: Body of the tweet.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        ``"Posted: <url>"`` on HTTP 201, otherwise ``"Failed: ..."`` with
        the status code and response body, or the network error.

    Raises:
        KeyError: if twitter.env has no TWITTER_BEARER_TOKEN entry.
    """
    creds = load_credential_file("twitter")
    # NOTE(review): this endpoint presumably needs a user-context token; an
    # app-only bearer token may be rejected — confirm against the API docs.
    headers = {
        "Authorization": f"Bearer {creds['TWITTER_BEARER_TOKEN']}",
        "Content-Type": "application/json"
    }
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block the caller indefinitely.
        response = requests.post(
            "https://api.twitter.com/2/tweets",
            json={"text": text},
            headers=headers,
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Report network failures the same way as HTTP failures.
        return f"Failed: {e}"
    if response.status_code == 201:
        data = response.json()
        tweet_id = data["data"]["id"]
        return f"Posted: https://twitter.com/i/status/{tweet_id}"
    return f"Failed: {response.status_code} {response.text}"
Snippet: Memory management commands¶
async def remember_cmd(update, ctx):
    """Handle /remember <text>: store a new memory note.

    Empty input gets a usage hint, and notes flagged by is_suspicious_note
    are refused. Messages from anyone but the owner are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    note = " ".join(ctx.args)
    if not note:
        await update.message.reply_text("Format: /remember <text>")
    elif is_suspicious_note(note):
        await update.message.reply_text(
            "Note ini keliatan kayak prompt injection. Ga gue save."
        )
    else:
        add_memory_note(note)
        await update.message.reply_text(f"OK, gue inget: {note}")
async def memory_cmd(update, ctx):
    """Handle /memory: list all stored notes, numbered from 1.

    The reply is cut to 4000 characters. Messages from anyone but the owner
    are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    notes = load_memory().get("notes")
    if not notes:
        await update.message.reply_text("Memory kosong.")
        return
    lines = [f"{idx}. {note}" for idx, note in enumerate(notes, start=1)]
    listing = "Memory notes:\n" + "\n".join(lines)
    await update.message.reply_text(listing[:4000])
async def forget_cmd(update, ctx):
    """Handle /forget <substring>: delete every note containing the substring.

    Matching is case-insensitive; the reply states how many notes were
    removed. Messages from anyone but the owner are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    needle = " ".join(ctx.args)
    if not needle:
        await update.message.reply_text("Format: /forget <substring>")
        return
    memory = load_memory()
    old_notes = memory.get("notes", [])
    needle_lower = needle.lower()
    kept = [note for note in old_notes if needle_lower not in note.lower()]
    memory["notes"] = kept
    save_memory(memory)
    removed = len(old_notes) - len(kept)
    await update.message.reply_text(f"Removed {removed} note(s).")
def is_suspicious_note(text: str) -> bool:
    """Heuristically flag a memory note that looks like a prompt injection.

    A note is suspicious when it contains one of a few known injection
    phrases (matched case-insensitively) or is longer than 500 characters.
    """
    suspicious = [
        r'absolute mode', r'eliminate emojis', r'you are now',
        r'ignore previous', r'reply in the language',
        # Word boundaries keep this from firing inside ordinary text such
        # as "was an airline".
        r'\bas an AI\b',
    ]
    # IGNORECASE instead of lower-casing the text: a pattern that contains
    # upper-case letters ("as an AI") can never match lower-cased text.
    if any(re.search(p, text, re.IGNORECASE) for p in suspicious):
        return True
    return len(text) > 500
Snippet: SOUL.md preview & credential list commands¶
async def soul_cmd(update, ctx):
    """Show a preview of SOUL.md and the names of available credential files.

    The preview is limited to the first 1500 characters. Messages from
    anyone but the owner are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    soul = load_soul()
    creds = list_credentials()
    if not soul:
        await update.message.reply_text(
            "SOUL.md ga ada. Buat file di ~/agent/SOUL.md"
        )
        return

    preview = soul[:1500]
    if len(soul) > 1500:
        preview += "\n..."  # mark that the text was cut
    parts = [f"SOUL.md ({len(soul)} chars):\n\n", preview]
    if creds:
        parts.append("\n\nCredentials available:\n")
        parts.append("\n".join(f"- {c}" for c in creds))
    await update.message.reply_text("".join(parts))
async def creds_cmd(update, ctx):
    """List the credential files by name; their contents are never shown.

    Messages from anyone but the owner are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    names = list_credentials()
    if names:
        bullet_list = "\n".join(f"- {name}" for name in names)
        reply = "Credential files:\n" + bullet_list
    else:
        reply = "Belum ada credential. Siapin di ~/agent/credentials/"
    await update.message.reply_text(reply)
Snippet: Reset history¶
async def reset_cmd(update, ctx):
    """Archive the current chat's history, then reset it to empty.

    A non-empty history file is first copied byte-for-byte to
    ``archive/<chat_id>-<timestamp>.json`` under HISTORY_DIR. Messages from
    anyone but the owner are ignored.
    """
    if not is_owner(update.effective_user.id):
        return
    chat_id = update.effective_chat.id
    f = HISTORY_DIR / f"{chat_id}.json"
    if f.exists():
        raw = f.read_bytes()
        # An already-empty history is not archived: a second /reset within
        # the same second would otherwise reuse the timestamped name and
        # overwrite the real archive with "[]".
        if raw.strip() not in (b"", b"[]"):
            archive_dir = HISTORY_DIR / "archive"
            archive_dir.mkdir(exist_ok=True)
            ts = datetime.now().strftime('%Y%m%d-%H%M%S')
            # Bytes in, bytes out: the copy is exact whatever the encoding.
            (archive_dir / f"{chat_id}-{ts}.json").write_bytes(raw)
        f.write_text("[]")
    await update.message.reply_text("Chat history di-reset. Mulai dari nol.")
Snippet: Cancel mid-loop¶
# Per-chat cancel request flags: chat_id -> bool.
chat_cancel = {}


async def cancel_cmd(update, ctx):
    """Handle /cancel: raise the cancel flag for the current chat.

    This only sets the flag; the agentic loop is what notices it and stops.
    Messages from anyone but the owner are ignored.
    """
    if is_owner(update.effective_user.id):
        chat_cancel[update.effective_chat.id] = True
        await update.message.reply_text(
            "Cancel flag set. Loop bakal abort di iterasi berikut."
        )
# In the agentic loop, check the flag at the start of every iteration:
async def ask_ai_agentic(chat_id, user_text):
    """Excerpt of the agentic loop showing only the cancellation check."""
    for step in range(MAX_AGENTIC_STEPS):
        cancel_requested = chat_cancel.get(chat_id)
        if cancel_requested:
            # Clear the flag so it does not cancel the next run as well.
            chat_cancel[chat_id] = False
            return "Dibatalkan user."
        # ... rest
Snippet: Audit log¶
from datetime import datetime
# Log file that audit_log() appends to, kept under the agent's data directory.
AUDIT_LOG = AGENT_DIR.joinpath("data", "audit.log")
def audit_log(chat_id: int, action: str, risk: str, command: str, result_preview: str = ""):
    """Append one line to AUDIT_LOG for a medium- or high-risk action.

    Args:
        chat_id: Chat in which the action was requested.
        action: Short label for the kind of action.
        risk: Risk level; only "medium" and "high" are recorded.
        command: Command that was run (first 200 characters are kept).
        result_preview: Start of the command's result (first 100 characters
            are kept).
    """
    if risk not in ("medium", "high"):
        return  # low-risk actions are not audited
    AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)

    def one_line(value: str, limit: int) -> str:
        # Escape line breaks so multi-line input cannot split one entry
        # across several log lines.
        return value[:limit].replace("\r", "\\r").replace("\n", "\\n")

    ts = datetime.now().isoformat()
    # Explicit UTF-8 so non-ASCII commands are written the same everywhere.
    with open(AUDIT_LOG, "a", encoding="utf-8") as f:
        f.write(
            f"{ts} | chat:{chat_id} | action:{action} | risk:{risk} | "
            f"cmd:{one_line(command, 200)} | "
            f"result:{one_line(result_preview, 100)}\n"
        )
Full main.py structure¶
main.py
├── Imports
├── Config (env vars, paths)
├── State (pending_commands, chat_busy, chat_locks, chat_cancel)
├── Helpers
│ ├── redact_secrets
│ ├── load_memory / save_memory / add_memory_note
│ ├── load_soul
│ ├── list_credentials
│ ├── build_system_prompt
│ ├── load_history / save_history
│ ├── extract_tool_calls / strip_tool_calls
│ └── run_shell
├── Agentic loop
│ └── ask_ai_agentic
├── Telegram handlers
│ ├── start_cmd
│ ├── soul_cmd
│ ├── memory_cmd / remember_cmd / forget_cmd
│ ├── reset_cmd
│ ├── cancel_cmd
│ └── message_handler
└── main()
├── Application.builder
├── add_handler for each command
├── add_handler for messages
└── run_polling
Full implementation di Tutorial Setup dari Nol.