Lewati ke isi

Contoh Kode

Copy-paste ready kode untuk komponen utama agent.

Semua kode di sini Python 3.12+. Adaptasi ke language lo sendiri kalo perlu.

requirements.txt

python-telegram-bot==21.0
openai>=1.50.0
python-dotenv>=1.0.0

Tambah sesuai kebutuhan:

# Untuk crypto
web3>=6.0.0
solana>=0.30.0

# Untuk GitHub
pygithub>=2.0.0

# Untuk Twitter
tweepy>=4.0.0

# Untuk image processing
pillow>=10.0.0

# Untuk async DB
asyncpg>=0.29.0

.env.example

# Telegram
TELEGRAM_BOT_TOKEN=
OWNER_TELEGRAM_ID=

# LLM (OpenAI / OpenRouter / proxy)
OPENAI_API_KEY=
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_MODEL=gpt-4o-mini

# Agent
AGENT_NAME=Kai
USER_NAME=Gutluc

# Optional
WEB_SEARCH_API_KEY=

.gitignore

# Secrets
.env
credentials/

# Data
data/
*.log
*.bak

# Python
__pycache__/
*.pyc
venv/
.venv/

# IDE
.vscode/
.idea/
.DS_Store

main.py (full)

Lihat Tutorial Setup dari Nol untuk versi lengkap (~250 lines).

Snippet: Build system prompt

from pathlib import Path
from datetime import datetime
import json

AGENT_DIR = Path.home() / "agent"
SOUL_FILE = AGENT_DIR / "SOUL.md"
MEMORY_FILE = AGENT_DIR / "data" / "memory.json"
CREDENTIALS_DIR = AGENT_DIR / "credentials"

def load_soul() -> str:
    if SOUL_FILE.exists():
        try:
            return SOUL_FILE.read_text(encoding="utf-8").strip()
        except Exception:
            return ""
    return ""

def load_memory() -> dict:
    if MEMORY_FILE.exists():
        try:
            return json.loads(MEMORY_FILE.read_text())
        except Exception:
            pass
    return {"notes": [], "user_name": ""}

def list_credentials() -> list:
    if not CREDENTIALS_DIR.exists():
        return []
    return sorted([
        f.name for f in CREDENTIALS_DIR.iterdir() 
        if f.is_file() and f.name.endswith(".env")
    ])

def build_system_prompt() -> str:
    soul = load_soul()
    memory = load_memory()
    creds = list_credentials()
    today = datetime.now().strftime("%A, %d %B %Y %H:%M")

    memory_text = ""
    if memory.get("notes"):
        memory_text = "\n\nYang lo ingat tentang user:\n" + \
            "\n".join(f"- {n}" for n in memory["notes"][-20:])

    creds_text = ""
    if creds:
        creds_text = "\n\nCredential files available:\n" + \
            "\n".join(f"- ~/agent/credentials/{c}" for c in creds)

    if soul:
        return f"{soul}\n\nHari ini: {today}{creds_text}{memory_text}"

    return f"Hari ini: {today}{creds_text}{memory_text}"

Snippet: Tool use parser

import re, json

def extract_tool_calls(text: str) -> list[dict]:
    """Extract <tool_use>{...}</tool_use> from text."""
    pattern = re.compile(r'<tool_use>(.*?)</tool_use>', re.DOTALL)
    results = []
    for m in pattern.finditer(text):
        try:
            obj = json.loads(m.group(1).strip())
            if "name" in obj and "arguments" in obj:
                results.append(obj)
        except json.JSONDecodeError:
            pass
    return results

def strip_tool_calls(text: str) -> str:
    """Remove tool_use blocks from text (for final answer to user)."""
    return re.sub(r'<tool_use>.*?</tool_use>', '', text, flags=re.DOTALL).strip()

Snippet: Secret redaction

import re

SECRET_PATTERNS = [
    (re.compile(r'ghp_[A-Za-z0-9]{36}'), '[REDACTED_GITHUB_PAT]'),
    (re.compile(r'github_pat_[A-Za-z0-9_]{82}'), '[REDACTED_GITHUB_FPAT]'),
    (re.compile(r'gho_[A-Za-z0-9]{36}'), '[REDACTED_GITHUB_OAUTH]'),
    (re.compile(r'\b0x[a-fA-F0-9]{64}\b'), '[REDACTED_PRIVATE_KEY]'),
    (re.compile(r'AKIA[0-9A-Z]{16}'), '[REDACTED_AWS_KEY]'),
    (re.compile(r'sk-[A-Za-z0-9]{40,}'), '[REDACTED_OPENAI_KEY]'),
    (re.compile(r'xox[bp]-[0-9-A-Za-z]+'), '[REDACTED_SLACK_TOKEN]'),
    (re.compile(
        r'-----BEGIN [A-Z ]+PRIVATE KEY-----.*?-----END [A-Z ]+PRIVATE KEY-----', 
        re.DOTALL
    ), '[REDACTED_PEM]'),
]

def redact_secrets(text: str) -> str:
    if not isinstance(text, str):
        return text
    for pattern, replacement in SECRET_PATTERNS:
        text = pattern.sub(replacement, text)
    return text

Snippet: Async shell exec

import asyncio

async def run_shell(cmd: str, timeout: int = 60) -> str:
    """Execute shell command, return output (capped 3000 chars)."""
    try:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), 
            timeout=timeout
        )
        out = stdout.decode(errors='replace') + stderr.decode(errors='replace')
        if len(out) > 3000:
            return out[:3000] + f"\n\n... (truncated, total {len(out)} chars)"
        return out or "(no output)"
    except asyncio.TimeoutError:
        return f"Command timeout after {timeout}s"

Snippet: Agentic loop

async def ask_ai_agentic(chat_id: int, user_text: str) -> str:
    history = load_history(chat_id)

    messages = [
        {"role": "system", "content": build_system_prompt()},
        *history[-30:],
        {"role": "user", "content": user_text}
    ]

    new_messages = [{"role": "user", "content": user_text}]

    for step in range(MAX_AGENTIC_STEPS):
        if chat_cancel.get(chat_id):
            chat_cancel[chat_id] = False
            return "Dibatalkan user."

        try:
            resp = client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.7,
            )
            answer = resp.choices[0].message.content
        except Exception as e:
            return f"LLM error: {e}"

        calls = extract_tool_calls(answer)

        if not calls:
            final = strip_tool_calls(answer)
            new_messages.append({"role": "assistant", "content": answer})
            save_history(chat_id, (history + new_messages)[-MAX_HISTORY:])
            return final

        new_messages.append({"role": "assistant", "content": answer})
        messages.append({"role": "assistant", "content": answer})

        for call in calls:
            cmd = call["arguments"].get("command", "")
            risk = call["arguments"].get("risk", "high")

            if risk == "high":
                pending_commands[chat_id] = cmd
                return (
                    f"Mau jalanin command high-risk:\n{cmd}\n\n"
                    f"Balas 'ya' untuk lanjut, atau apapun untuk batal."
                )

            result = await run_shell(cmd)
            tool_msg = (
                f"Tool execution selesai (step {step+1}/{MAX_AGENTIC_STEPS}). "
                f"Hasil:\n{result}"
            )
            messages.append({"role": "user", "content": tool_msg})
            new_messages.append({"role": "user", "content": tool_msg})

    save_history(chat_id, (history + new_messages)[-MAX_HISTORY:])
    return "Max step tercapai. Belum kelar."

Snippet: Telegram handler

from telegram import Update
from telegram.ext import (
    Application, CommandHandler, MessageHandler, filters, ContextTypes
)

async def message_handler(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    if not is_owner(update.effective_user.id):
        await update.message.reply_text("Gue cuma respond ke owner.")
        return

    chat_id = update.effective_chat.id
    text = update.message.text

    # Konfirmasi pending command
    if chat_id in pending_commands:
        if text.lower().strip() in ("ya", "y", "yes", "ok", "go"):
            cmd = pending_commands.pop(chat_id)
            await ctx.bot.send_chat_action(chat_id, "typing")
            result = await run_shell(cmd)
            await update.message.reply_text(f"Done.\n\n{result[:3000]}")
        else:
            pending_commands.pop(chat_id)
            await update.message.reply_text("Dibatalin.")
        return

    # Lock per chat
    lock = chat_locks.setdefault(chat_id, asyncio.Lock())
    async with lock:
        chat_busy[chat_id] = {
            "desc": text[:40], 
            "start_time": asyncio.get_event_loop().time()
        }
        try:
            await ctx.bot.send_chat_action(chat_id, "typing")
            response = await ask_ai_agentic(chat_id, text)

            # Split kalo response > 4000 chars (Telegram limit)
            for chunk in [response[i:i+4000] for i in range(0, len(response), 4000)]:
                await update.message.reply_text(chunk)
        except Exception as e:
            log.exception("Handler error")
            await update.message.reply_text(f"Error: {e}")
        finally:
            chat_busy.pop(chat_id, None)

Snippet: Status query fast path

STATUS_KEYWORDS = ["gimana", "udah", "udh", "udh blm", "status", "bisa", 
                   "sampai mana", "kelar belum", "selesai?"]

def is_status_query(text: str) -> bool:
    text_lower = text.lower().strip()
    if len(text_lower) > 30:
        return False
    return any(kw in text_lower for kw in STATUS_KEYWORDS)

async def message_handler(update, ctx):
    chat_id = update.effective_chat.id
    text = update.message.text

    # Fast path: kalo busy + user nanya status
    if chat_busy.get(chat_id) and is_status_query(text):
        busy = chat_busy[chat_id]
        elapsed = int(asyncio.get_event_loop().time() - busy["start_time"])
        await update.message.reply_text(
            f"Lagi proses: {busy['desc']} ({elapsed}s). "
            f"Tunggu beres, atau /cancel buat batalin."
        )
        return  # ga lock, ga queue

    # ... normal flow

Snippet: Credential loader

def load_credential_file(platform: str) -> dict:
    """Load credentials/<platform>.env into dict."""
    path = CREDENTIALS_DIR / f"{platform}.env"
    if not path.exists():
        return {}

    creds = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=" in line:
            k, v = line.split("=", 1)
            creds[k.strip()] = v.strip()
    return creds

def has_credential(platform: str) -> bool:
    """Check if credential file exists (without loading)."""
    return (CREDENTIALS_DIR / f"{platform}.env").exists()

Snippet: Run command with credentials

import os
import subprocess

def run_with_creds(cmd: str, platform: str, timeout: int = 60) -> str:
    """Run shell command with credential env vars injected."""
    env = os.environ.copy()
    env.update(load_credential_file(platform))

    try:
        result = subprocess.run(
            cmd, shell=True, env=env,
            capture_output=True, text=True, timeout=timeout
        )
        return result.stdout + result.stderr
    except subprocess.TimeoutExpired:
        return f"Timeout after {timeout}s"
    except Exception as e:
        return f"Error: {e}"

Snippet: Wallet integration

from web3 import Web3

def get_wallet():
    creds = load_credential_file("wallet")
    private_key = creds.get("EVM_PRIVATE_KEY")
    if not private_key:
        raise ValueError("EVM_PRIVATE_KEY not found in wallet.env")
    return private_key

def check_balance_eth(address: str, rpc_url: str = "https://eth.llamarpc.com") -> str:
    w3 = Web3(Web3.HTTPProvider(rpc_url))
    balance_wei = w3.eth.get_balance(address)
    balance_eth = w3.from_wei(balance_wei, 'ether')
    return f"{balance_eth} ETH"

# Transfer (HIGH RISK - butuh konfirmasi user)
def transfer_eth(to_address: str, amount_eth: float, rpc_url: str) -> str:
    pk = get_wallet()
    w3 = Web3(Web3.HTTPProvider(rpc_url))
    account = w3.eth.account.from_key(pk)

    tx = {
        "nonce": w3.eth.get_transaction_count(account.address),
        "to": to_address,
        "value": w3.to_wei(amount_eth, 'ether'),
        "gas": 21000,
        "gasPrice": w3.eth.gas_price,
        "chainId": w3.eth.chain_id,
    }

    signed = account.sign_transaction(tx)
    tx_hash = w3.eth.send_raw_transaction(signed.rawTransaction)
    return tx_hash.hex()

Snippet: GitHub integration

import subprocess

def github_command(cmd: str) -> str:
    """Run gh CLI command with GitHub credentials."""
    creds = load_credential_file("github")
    env = os.environ.copy()
    env["GH_TOKEN"] = creds.get("GH_TOKEN", "")

    result = subprocess.run(
        f"gh {cmd}", shell=True, env=env,
        capture_output=True, text=True
    )
    return result.stdout + result.stderr

# Usage:
output = github_command("repo list --limit 10")

Snippet: Twitter posting

import requests

def post_tweet(text: str) -> str:
    creds = load_credential_file("twitter")

    headers = {
        "Authorization": f"Bearer {creds['TWITTER_BEARER_TOKEN']}",
        "Content-Type": "application/json"
    }

    response = requests.post(
        "https://api.twitter.com/2/tweets",
        json={"text": text},
        headers=headers
    )

    if response.status_code == 201:
        data = response.json()
        tweet_id = data["data"]["id"]
        return f"Posted: https://twitter.com/i/status/{tweet_id}"
    else:
        return f"Failed: {response.status_code} {response.text}"

Snippet: Memory management commands

async def remember_cmd(update, ctx):
    if not is_owner(update.effective_user.id):
        return
    text = " ".join(ctx.args)
    if not text:
        await update.message.reply_text("Format: /remember <text>")
        return
    if is_suspicious_note(text):
        await update.message.reply_text(
            "Note ini keliatan kayak prompt injection. Ga gue save."
        )
        return
    add_memory_note(text)
    await update.message.reply_text(f"OK, gue inget: {text}")

async def memory_cmd(update, ctx):
    if not is_owner(update.effective_user.id):
        return
    m = load_memory()
    if not m.get("notes"):
        await update.message.reply_text("Memory kosong.")
        return
    text = "Memory notes:\n" + "\n".join(
        f"{i+1}. {n}" for i, n in enumerate(m["notes"])
    )
    await update.message.reply_text(text[:4000])

async def forget_cmd(update, ctx):
    if not is_owner(update.effective_user.id):
        return
    substring = " ".join(ctx.args)
    if not substring:
        await update.message.reply_text("Format: /forget <substring>")
        return
    m = load_memory()
    before = len(m.get("notes", []))
    m["notes"] = [
        n for n in m.get("notes", [])
        if substring.lower() not in n.lower()
    ]
    save_memory(m)
    removed = before - len(m["notes"])
    await update.message.reply_text(f"Removed {removed} note(s).")

def is_suspicious_note(text: str) -> bool:
    suspicious = [
        r'absolute mode', r'eliminate emojis', r'you are now',
        r'ignore previous', r'reply in the language', r'as an AI'
    ]
    text_lower = text.lower()
    for p in suspicious:
        if re.search(p, text_lower):
            return True
    return len(text) > 500

Snippet: SOUL.md reload command

async def soul_cmd(update, ctx):
    """Show SOUL.md preview + credential status."""
    if not is_owner(update.effective_user.id):
        return

    soul = load_soul()
    creds = list_credentials()

    if not soul:
        await update.message.reply_text(
            "SOUL.md ga ada. Buat file di ~/agent/SOUL.md"
        )
        return

    preview = soul[:1500] + ("\n..." if len(soul) > 1500 else "")
    creds_text = ""
    if creds:
        creds_text = "\n\nCredentials available:\n" + \
            "\n".join(f"- {c}" for c in creds)

    await update.message.reply_text(
        f"SOUL.md ({len(soul)} chars):\n\n{preview}{creds_text}"
    )

async def creds_cmd(update, ctx):
    """List credential files (names only)."""
    if not is_owner(update.effective_user.id):
        return

    creds = list_credentials()
    if not creds:
        await update.message.reply_text(
            "Belum ada credential. Siapin di ~/agent/credentials/"
        )
        return

    text = "Credential files:\n" + "\n".join(f"- {c}" for c in creds)
    await update.message.reply_text(text)

Snippet: Reset history

async def reset_cmd(update, ctx):
    """Archive + reset history for current chat."""
    if not is_owner(update.effective_user.id):
        return
    chat_id = update.effective_chat.id
    f = HISTORY_DIR / f"{chat_id}.json"

    if f.exists():
        archive_dir = HISTORY_DIR / "archive"
        archive_dir.mkdir(exist_ok=True)
        ts = datetime.now().strftime('%Y%m%d-%H%M%S')
        (archive_dir / f"{chat_id}-{ts}.json").write_text(f.read_text())
        f.write_text("[]")

    await update.message.reply_text("Chat history di-reset. Mulai dari nol.")

Snippet: Cancel mid-loop

chat_cancel = {}  # chat_id -> bool

async def cancel_cmd(update, ctx):
    if not is_owner(update.effective_user.id):
        return
    chat_id = update.effective_chat.id
    chat_cancel[chat_id] = True
    await update.message.reply_text(
        "Cancel flag set. Loop bakal abort di iterasi berikut."
    )

# Di agentic loop, check tiap iter:
async def ask_ai_agentic(chat_id, user_text):
    for step in range(MAX_AGENTIC_STEPS):
        if chat_cancel.get(chat_id):
            chat_cancel[chat_id] = False
            return "Dibatalkan user."
        # ... rest

Snippet: Audit log

from datetime import datetime

AUDIT_LOG = AGENT_DIR / "data" / "audit.log"

def audit_log(chat_id: int, action: str, risk: str, command: str, result_preview: str = ""):
    """Log all medium/high risk actions."""
    if risk not in ("medium", "high"):
        return  # skip low

    AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(AUDIT_LOG, "a") as f:
        ts = datetime.now().isoformat()
        f.write(
            f"{ts} | chat:{chat_id} | risk:{risk} | "
            f"cmd:{command[:200]} | result:{result_preview[:100]}\n"
        )

Full main.py structure

main.py
├── Imports
├── Config (env vars, paths)
├── State (pending_commands, chat_busy, chat_locks)
├── Helpers
│   ├── redact_secrets
│   ├── load_memory / save_memory / add_memory_note
│   ├── load_soul
│   ├── list_credentials
│   ├── build_system_prompt
│   ├── load_history / save_history
│   ├── extract_tool_calls / strip_tool_calls
│   └── run_shell
├── Agentic loop
│   └── ask_ai_agentic
├── Telegram handlers
│   ├── start_cmd
│   ├── soul_cmd
│   ├── memory_cmd / remember_cmd / forget_cmd
│   ├── reset_cmd
│   ├── cancel_cmd
│   └── message_handler
└── main()
    ├── Application.builder
    ├── add_handler for each command
    ├── add_handler for messages
    └── run_polling

Full implementation di Tutorial Setup dari Nol.