Arcjet is the runtime security platform that ships with your AI code. Stop bots and automated attacks from burning your AI budget, leaking data, or misusing tools with Arcjet's AI security building blocks.
This is the Python SDK for Arcjet.
- Get your API key: sign up at app.arcjet.com.
- Install the SDK:

    pip install arcjet
    # or with uv
    uv add arcjet

- Set your environment variable:

    # .env or .env.local
    ARCJET_KEY=ajkey_yourkey

- Protect a route: see the AI protection example or individual feature examples below.
Join our Discord server or reach out for support.
- Documentation — full reference and guides
- Examples — FastAPI and Flask example apps, including LangChain integration
- Blueprints — recipes for common security patterns
Note: Examples below use FastAPI (async). For Flask and other sync frameworks, use
arcjet_sync instead of arcjet. The API is identical; see Async vs. sync client.
Protect an AI chat endpoint with prompt injection detection, token budget rate limiting, and bot protection:
# main.py
import os

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from arcjet import (
    arcjet,  # async client; use arcjet_sync for Flask and other sync frameworks
    detect_bot,
    detect_prompt_injection,
    detect_sensitive_info,
    shield,
    token_bucket,
    Mode,
    SensitiveInfoEntityType,
)

app = FastAPI()

arcjet_key = os.getenv("ARCJET_KEY")
if not arcjet_key:
    raise RuntimeError(
        "ARCJET_KEY is required. Get one at https://app.arcjet.com"
    )

# Create a single Arcjet instance and reuse it across requests.
# Use arcjet_sync instead if you are using Flask or another sync framework.
aj = arcjet(
    key=arcjet_key,
    rules=[
        # Detect and block prompt injection attacks in user messages
        detect_prompt_injection(mode=Mode.LIVE),
        # Block sensitive data (e.g. credit cards, PII) from reaching your LLM
        detect_sensitive_info(
            mode=Mode.LIVE,
            deny=[
                SensitiveInfoEntityType.CREDIT_CARD_NUMBER,
                SensitiveInfoEntityType.EMAIL,
                SensitiveInfoEntityType.PHONE_NUMBER,
            ],
        ),
        # Rate limit by token budget: refill 100 tokens every 60 seconds
        token_bucket(
            characteristics=["userId"],
            mode=Mode.LIVE,
            refill_rate=100,
            interval=60,
            capacity=1000,
        ),
        # Block automated clients and scrapers from your AI endpoints
        detect_bot(
            mode=Mode.LIVE,
            allow=[],  # empty = block all bots
        ),
        # Protect against common web attacks (SQLi, XSS, etc.)
        shield(mode=Mode.LIVE),
    ],
)


class ChatRequest(BaseModel):
    message: str


@app.post("/chat")
async def chat(request: Request, body: ChatRequest):
    user_id = "user_123"  # replace with real user ID from session
    decision = await aj.protect(
        request,
        requested=5,  # tokens consumed per request
        characteristics={"userId": user_id},
        detect_prompt_injection_message=body.message,  # scan for prompt injection
        sensitive_info_value=body.message,  # scan for PII
    )
    if decision.is_denied():
        status = 429 if decision.reason_v2.type == "RATE_LIMIT" else 403
        return JSONResponse({"error": "Denied"}, status_code=status)
    # Safe to pass body.message to your LLM
    return {"reply": "..."}

- 🔒 Prompt Injection Detection — detect and block prompt injection attacks before they reach your LLM.
- 🤖 Bot Protection — stop scrapers, credential stuffers, and AI crawlers from abusing your endpoints.
- 🛑 Rate Limiting — token bucket, fixed window, and sliding window algorithms; model AI token budgets per user.
- 🕵️ Sensitive Information Detection — block PII, credit cards, and custom patterns from entering your AI pipeline.
- 🛡️ Shield WAF — protect against SQL injection, XSS, and other common web attacks.
- 📧 Email Validation — block disposable, invalid, and undeliverable addresses at signup.
- 📝 Signup Form Protection — combines bot protection, email validation, and rate limiting to protect your signup forms.
- 🎯 Request Filters — expression-based rules on IP, path, headers, and custom fields.
- 🌐 IP Analysis — geolocation, ASN, VPN, proxy, Tor, and hosting detection included with every request.
| If your app has... | Recommended features |
|---|---|
| LLM / AI chat endpoints | Prompt injection + sensitive info + token bucket rate limit + bot protection + shield |
| Public API | Rate limiting + bot protection + shield |
| Signup / login forms | Email validation + bot protection + rate limiting (or signup protection) |
| Internal / admin routes | Shield + request filters (country, VPN/proxy blocking) |
| Any web application | Shield + bot protection (good baseline for all apps) |
All features can be combined in a single Arcjet instance. Rules are evaluated
together — if any rule denies the request, decision.is_denied() returns
True. Use Mode.DRY_RUN on individual rules to test them before enforcing.
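
The combination rule described above can be modeled in a few lines of plain Python. This is a local sketch only, not the SDK's implementation: `RuleResult` and `combine` are hypothetical names, the `Mode` enum here is a stand-in for `arcjet.Mode`, and it assumes a DRY_RUN rule records its verdict without contributing to the final denial.

```python
from dataclasses import dataclass
from enum import Enum


class Mode(Enum):
    """Stand-in for arcjet.Mode, for illustration only."""
    LIVE = "LIVE"
    DRY_RUN = "DRY_RUN"


@dataclass
class RuleResult:
    """Hypothetical record of one rule's verdict."""
    name: str
    mode: Mode
    would_deny: bool


def combine(results: list) -> bool:
    """Return True if the request is denied: any LIVE rule that denies wins."""
    return any(r.would_deny and r.mode is Mode.LIVE for r in results)


results = [
    RuleResult("shield", Mode.LIVE, would_deny=False),
    RuleResult("detect_bot", Mode.DRY_RUN, would_deny=True),
]
denied = combine(results)  # the only denying rule is in DRY_RUN, so not denied
```

Switching the bot rule to `Mode.LIVE` in this model flips the outcome, which mirrors the workflow of testing a rule in dry-run before enforcing it.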
# With a uv project
uv add arcjet
# With an existing pip managed project
uv pip install arcjet

Or with pip:

pip install arcjet

Detect and block prompt injection attacks (attempts by users to hijack your LLM's behavior through crafted input) before they reach your model.
# FastAPI (async)
from arcjet import arcjet, detect_prompt_injection, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        detect_prompt_injection(mode=Mode.LIVE),
    ],
)

@app.post("/chat")
async def chat(request: Request, body: ChatRequest):
    decision = await aj.protect(
        request,
        detect_prompt_injection_message=body.message,
    )
    if decision.is_denied():
        return JSONResponse({"error": "Prompt injection detected"}, status_code=403)
    # safe to pass body.message to your LLM

# Flask (sync); assumes a Flask app plus `from flask import jsonify, request`
from arcjet import arcjet_sync, detect_prompt_injection, Mode

aj = arcjet_sync(
    key=arcjet_key,
    rules=[
        detect_prompt_injection(mode=Mode.LIVE),
    ],
)

@app.route("/chat", methods=["POST"])
def chat():
    body = request.get_json()
    decision = aj.protect(request, detect_prompt_injection_message=body["message"])
    if decision.is_denied():
        return jsonify(error="Prompt injection detected"), 403
    # safe to pass body["message"] to your LLM

You can tune the detection sensitivity with the threshold parameter (0.0–1.0,
default 0.5). Higher values require stronger signals to trigger a denial,
reducing false positives but potentially missing subtle attacks:

detect_prompt_injection(mode=Mode.LIVE, threshold=0.8)

See the Prompt Injection docs for more details.
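
To illustrate the trade-off the threshold controls, here is a toy model in plain Python. Everything in it is an assumption made for illustration: `would_deny` is a hypothetical function, the scores are made up, and the SDK does not necessarily compare a single score against the threshold in exactly this way.

```python
def would_deny(score: float, threshold: float = 0.5) -> bool:
    """Deny when a (hypothetical) detection score reaches the threshold."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    return score >= threshold


# Made-up sample: (score, whether the message really was an attack)
samples = [(0.95, True), (0.70, True), (0.60, False), (0.20, False)]

for threshold in (0.5, 0.8):
    false_positives = sum(
        would_deny(score, threshold) and not attack for score, attack in samples
    )
    missed = sum(
        attack and not would_deny(score, threshold) for score, attack in samples
    )
    print(f"threshold={threshold}: false positives={false_positives}, missed={missed}")
```

On this sample, raising the threshold from 0.5 to 0.8 removes the one false positive but lets the weaker of the two attacks through. Running a rule in Mode.DRY_RUN first is one way to gather real data before picking a value.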
Manage traffic from automated clients. Block scrapers, credential stuffers, and AI crawlers, while allowing legitimate bots like search engines and monitors.
# FastAPI (async)
from arcjet import arcjet, detect_bot, Mode, BotCategory

aj = arcjet(
    key=arcjet_key,
    rules=[
        detect_bot(
            mode=Mode.LIVE,
            allow=[
                BotCategory.SEARCH_ENGINE,  # Google, Bing, etc.
                # BotCategory.MONITOR,  # Uptime monitoring
                # BotCategory.PREVIEW,  # Link previews (Slack, Discord)
                # "OPENAI_CRAWLER_SEARCH",  # Allow OpenAI crawler
            ],
        ),
    ],
)

@app.get("/")
async def index(request: Request):
    decision = await aj.protect(request)
    if decision.is_denied():
        return JSONResponse({"error": "Bot detected"}, status_code=403)
    return {"message": "Hello world"}

# Flask (sync); assumes a Flask app plus `from flask import jsonify, request`
from arcjet import arcjet_sync, detect_bot, is_spoofed_bot, Mode, BotCategory

aj = arcjet_sync(
    key=arcjet_key,
    rules=[
        detect_bot(mode=Mode.LIVE, allow=[BotCategory.SEARCH_ENGINE]),
    ],
)

@app.route("/")
def index():
    decision = aj.protect(request)
    if decision.is_denied():
        return jsonify(error="Bot detected"), 403
    if any(is_spoofed_bot(r) for r in decision.results):
        return jsonify(error="Spoofed bot"), 403
    return jsonify(message="Hello world")

Configure rules using categories or specific bot identifiers:

detect_bot(
    mode=Mode.LIVE,
    allow=[
        BotCategory.SEARCH_ENGINE,
        "OPENAI_CRAWLER_SEARCH",
    ],
)

Available categories: ACADEMIC, ADVERTISING, AI, AMAZON,
ARCHIVE, BOTNET, FEEDFETCHER, GOOGLE, META, MICROSOFT,
MONITOR, OPTIMIZER, PREVIEW, PROGRAMMATIC, SEARCH_ENGINE,
SLACK, SOCIAL, TOOL, UNKNOWN, VERCEL, YAHOO. Use
BotCategory.<NAME> in Python or pass the string directly. You can also
allow or deny specific bots by name.
If you specify an allow list, all other bots are denied. An empty allow list blocks all bots. The reverse applies for deny lists.
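
The allow/deny semantics above can be written as a small truth function. This is a sketch in plain Python, not SDK code: `is_bot_denied` is a hypothetical helper, it matches exact bot names only (no category expansion), and it assumes a rule is configured with exactly one of the two lists.

```python
from typing import Optional


def is_bot_denied(
    bot: str,
    allow: Optional[list] = None,
    deny: Optional[list] = None,
) -> bool:
    """Model of allow-list vs deny-list behavior for a detected bot name."""
    if (allow is None) == (deny is None):
        # Assumption: a rule uses either an allow list or a deny list, not both.
        raise ValueError("configure exactly one of allow or deny")
    if allow is not None:
        # Allow list: anything not listed is denied, so [] blocks every bot.
        return bot not in allow
    # Deny list: only listed bots are denied, so [] lets every bot through.
    return bot in deny


print(is_bot_denied("CURL", allow=[]))                       # True
print(is_bot_denied("CURL", allow=["CURL"]))                 # False
print(is_bot_denied("CURL", deny=[]))                        # False
print(is_bot_denied("CURL", deny=["CURL"]))                  # True
```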
Bots claiming to be well-known crawlers (e.g. Googlebot) are verified against
their known IP ranges. Use is_spoofed_bot() to check:
from arcjet import is_spoofed_bot

# inside your request handler, after calling protect()
if any(is_spoofed_bot(r) for r in decision.results):
    return jsonify(error="Spoofed bot"), 403

See the Bot Protection docs for more details.
Limit request rates per IP, user, or any custom characteristic. Arcjet supports
token bucket, fixed window, and sliding window algorithms. Token buckets are
ideal for controlling AI token budgets — set capacity to the max tokens a user
can spend, refill_rate to how many tokens are restored per interval, and
deduct tokens per request via requested in protect(). The interval accepts
seconds as a number. Use characteristics to track limits per user instead of
per IP.
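
To make the interplay of capacity, refill_rate, interval, and requested concrete, here is a minimal local simulation. It is a sketch, not Arcjet's implementation: the `TokenBucket` class is hypothetical, and it assumes that a new bucket starts full and that tokens are restored in whole-interval steps.

```python
class TokenBucket:
    """Toy token bucket using the same parameter names as the rule config."""

    def __init__(self, capacity: int, refill_rate: int, interval: float) -> None:
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.interval = interval
        self.tokens = capacity  # assumption: a new bucket starts full
        self.last_refill = 0.0

    def take(self, requested: int, now: float) -> bool:
        """Try to deduct `requested` tokens at time `now` (in seconds)."""
        periods = int((now - self.last_refill) // self.interval)
        if periods > 0:
            # Add refill_rate tokens per elapsed interval, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + periods * self.refill_rate)
            self.last_refill += periods * self.interval
        if requested > self.tokens:
            return False
        self.tokens -= requested
        return True


bucket = TokenBucket(capacity=1000, refill_rate=100, interval=60)
allowed = sum(bucket.take(5, now=0.0) for _ in range(250))
print(allowed)  # 200: a full bucket of 1000 covers 200 requests of 5 tokens
```

In this model, once the bucket is empty, the next 60-second refill restores 100 tokens, which is enough for 20 more requests at 5 tokens each.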
Rate limits track by IP address by default. To track per user, declare the key
name in characteristics on the rule, then pass the actual value in
protect():
# Token bucket
from arcjet import arcjet, token_bucket, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        token_bucket(
            characteristics=["userId"],  # or ["ip.src"] for IP-based
            mode=Mode.LIVE,
            refill_rate=100,  # tokens added per interval
            interval=60,  # interval in seconds
            capacity=1000,  # maximum tokens per bucket
        ),
    ],
)

@app.post("/chat")
async def chat(request: Request):
    decision = await aj.protect(
        request,
        requested=5,  # tokens consumed by this request
        characteristics={"userId": "user_123"},
    )
    if decision.is_denied():
        return JSONResponse({"error": "Rate limited"}, status_code=429)

# Fixed window
from arcjet import arcjet, fixed_window, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        fixed_window(mode=Mode.LIVE, window=60, max=100),
    ],
)

# Sliding window
from arcjet import arcjet, sliding_window, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        sliding_window(mode=Mode.LIVE, interval=60, max=100),
    ],
)

See the Rate Limiting docs for more details.
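
For comparison with the token bucket, a fixed window limiter can be modeled as a counter per key per time slot. The sketch below is a local illustration under stated assumptions, not how Arcjet implements it: `FixedWindow` is a hypothetical class, `window` is taken to be seconds, and `max_requests` plays the role of the rule's `max` argument.

```python
from collections import defaultdict


class FixedWindow:
    """Toy fixed-window counter: at most max_requests per key per window."""

    def __init__(self, window: int, max_requests: int) -> None:
        self.window = window
        self.max_requests = max_requests
        self.counts = defaultdict(int)  # (key, window index) -> request count

    def allow(self, key: str, now: float) -> bool:
        """Count one request for `key` at time `now` (in seconds)."""
        slot = (key, int(now // self.window))
        if self.counts[slot] >= self.max_requests:
            return False
        self.counts[slot] += 1
        return True


limiter = FixedWindow(window=60, max_requests=100)
passed = sum(limiter.allow("203.0.113.7", now=10.0) for _ in range(120))
print(passed)  # 100: the remaining 20 requests in this window are rejected
```

The counter resets at each window boundary, so a client can send a burst at the end of one window and another at the start of the next. A sliding window smooths out that boundary effect, at the cost of tracking more state.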
Detect and block PII in request content before it reaches your LLM or data
store. Built-in entity types: EMAIL, PHONE_NUMBER, IP_ADDRESS,
CREDIT_CARD_NUMBER. You can also provide a custom detect callback for
additional patterns.
from arcjet import arcjet, detect_sensitive_info, SensitiveInfoEntityType, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        detect_sensitive_info(
            mode=Mode.LIVE,
            deny=[
                SensitiveInfoEntityType.EMAIL,
                SensitiveInfoEntityType.CREDIT_CARD_NUMBER,
            ],
        ),
    ],
)

# Pass the content to scan with each protect() call
decision = await aj.protect(request, sensitive_info_value="User input to scan")

You can supplement built-in detectors with a custom detect callback:

def my_detect(tokens: list[str]) -> list[str | None]:
    return ["CUSTOM_PII" if "secret" in t.lower() else None for t in tokens]

rules = [
    detect_sensitive_info(
        mode=Mode.LIVE,
        deny=["CUSTOM_PII"],
        detect=my_detect,
    ),
]

See the Sensitive Information docs for more details.
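
A custom detect callback of the shape shown above can also use a regular expression. The sketch below is illustrative only: the `EMP-` identifier format and the `EMPLOYEE_ID` label are hypothetical, and it assumes the callback receives one string per token and returns one label (or None) per token, as in the example above.

```python
import re
from typing import Optional

# Hypothetical internal ID format: "EMP-" followed by six digits.
EMPLOYEE_ID = re.compile(r"^EMP-\d{6}$")


def detect_employee_id(tokens: list) -> list:
    """Return a label for each token that matches the pattern, else None."""
    labels: list = []
    for token in tokens:
        label: Optional[str] = "EMPLOYEE_ID" if EMPLOYEE_ID.match(token) else None
        labels.append(label)
    return labels


labels = detect_employee_id("my badge is EMP-123456".split())
print(labels)  # [None, None, None, 'EMPLOYEE_ID']
```

To use a callback like this, you would list its label in deny and pass the function as detect, in the same way as my_detect above.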
Protect against common web attacks including SQL injection, XSS, path traversal, and other OWASP Top 10 threats. No additional configuration needed — Shield analyzes request patterns automatically.
from arcjet import arcjet, shield, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        shield(mode=Mode.LIVE),
    ],
)

See the Shield docs for more details.
Prevent users from signing up with disposable, invalid, or undeliverable email
addresses. Deny types: DISPOSABLE, FREE, INVALID, NO_MX_RECORDS,
NO_GRAVATAR.
from arcjet import arcjet, validate_email, EmailType, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        validate_email(
            mode=Mode.LIVE,
            deny=[
                EmailType.DISPOSABLE,
                EmailType.INVALID,
                EmailType.NO_MX_RECORDS,
            ],
        ),
    ],
)

# Pass the email with each protect() call
decision = await aj.protect(request, email="user@example.com")

See the Email Validation docs for more details.
Filter requests using expression-based rules against request properties (IP address, headers, path, HTTP method, and custom local fields).
Restrict access to specific countries — useful for licensing, compliance, or
regional rollouts. The allow list denies all countries not listed:
from arcjet import arcjet, filter_request, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        # Allow only US traffic; all other countries are denied
        filter_request(
            mode=Mode.LIVE,
            allow=['ip.src.country eq "US"'],
        ),
    ],
)

@app.get("/")
async def index(request: Request):
    decision = await aj.protect(request)
    if decision.is_denied():
        return JSONResponse({"error": "Access restricted in your region"}, status_code=403)

To restrict to a specific state or province, combine country and region:

filter_request(
    mode=Mode.LIVE,
    # Allow only California; useful for state-level compliance, e.g. CCPA testing
    allow=['ip.src.country eq "US" and ip.src.region eq "California"'],
)

Prevent anonymized traffic from accessing sensitive endpoints. This is useful for fraud prevention, enforcing geo-restrictions, and reducing abuse:

from arcjet import arcjet, filter_request, Mode

aj = arcjet(
    key=arcjet_key,
    rules=[
        filter_request(
            mode=Mode.LIVE,
            deny=[
                "ip.src.vpn",  # VPN services
                "ip.src.proxy",  # Open proxies
                "ip.src.tor",  # Tor exit nodes
            ],
        ),
    ],
)

For cases where you want to allow some anonymized traffic (e.g. Apple Private
Relay) but still log or handle it differently, use decision.ip helpers after
calling protect():

# inside your request handler
decision = await aj.protect(request)
if decision.ip.is_vpn() or decision.ip.is_tor():
    return JSONResponse({"error": "VPN traffic not allowed"}, status_code=403)
ip = decision.ip_details
if ip and ip.is_relay:
    # Privacy relay (e.g. Apple Private Relay): lower risk than a VPN
    pass  # allow through with custom handling

Pass arbitrary values from your application for use in filter expressions:

decision = await aj.protect(
    request,
    filter_local={"userId": current_user.id, "plan": current_user.plan},
)

These are then available as local.userId and local.plan in expressions:

filter_request(
    mode=Mode.LIVE,
    deny=['local.plan eq "free" and ip.src.country ne "US"'],
)

See the Request Filters docs, IP Geolocation blueprint, and VPN/Proxy Detection blueprint for more details.
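
When the list of permitted countries comes from configuration, the expression strings shown above can be generated rather than hand-written. This is a small sketch: `country_allow_list` is a hypothetical helper, and it assumes that multiple entries in an allow list act as alternatives (a request matching any one of them is allowed), which you should confirm against the Request Filters docs.

```python
def country_allow_list(countries: list) -> list:
    """Build one 'ip.src.country eq "XX"' expression per country code."""
    expressions = []
    for code in countries:
        code = code.strip().upper()
        # Reject anything that is not a two-letter code to avoid malformed expressions.
        if len(code) != 2 or not code.isalpha():
            raise ValueError(f"expected a two-letter country code, got {code!r}")
        expressions.append(f'ip.src.country eq "{code}"')
    return expressions


expressions = country_allow_list(["us", "CA"])
print(expressions)  # ['ip.src.country eq "US"', 'ip.src.country eq "CA"']
```

The resulting list could then be passed as the allow argument of filter_request. Validating the codes up front also keeps untrusted configuration values from injecting arbitrary text into an expression.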
Arcjet returns IP metadata with every decision — no extra API calls needed.
# inside your request handler, after decision = await aj.protect(request)

# High-level helpers
if decision.ip.is_hosting():
    # likely a cloud/hosting provider, often suspicious for bots
    return JSONResponse({"error": "Hosting IP blocked"}, status_code=403)
if decision.ip.is_vpn() or decision.ip.is_proxy() or decision.ip.is_tor():
    # apply your policy for anonymized traffic
    pass

# Typed field access
ip = decision.ip_details
if ip:
    print(ip.city, ip.country_name)  # geolocation
    print(ip.asn, ip.asn_name)  # ASN / network
    print(ip.is_vpn, ip.is_hosting)  # reputation

Available fields include geolocation (latitude, longitude, city,
region, country, continent), network (asn, asn_name, asn_domain,
asn_type, asn_country), and reputation (is_vpn, is_proxy, is_tor,
is_hosting, is_relay).
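
One way to use the reputation fields is to map them to a small set of application-level actions in a single place. The sketch below uses a stand-in dataclass so it runs on its own: `IpInfo`, `classify`, and the action names are all hypothetical, and the tiers are an example policy, not a recommendation from Arcjet.

```python
from dataclasses import dataclass


@dataclass
class IpInfo:
    """Stand-in mirroring the reputation fields listed above."""
    is_vpn: bool = False
    is_proxy: bool = False
    is_tor: bool = False
    is_hosting: bool = False
    is_relay: bool = False


def classify(ip: IpInfo) -> str:
    """Return an example action for a request based on IP reputation."""
    if ip.is_tor or ip.is_proxy:
        return "block"
    if ip.is_vpn or ip.is_hosting:
        return "challenge"
    if ip.is_relay:
        # Privacy relays are treated as lower risk than VPNs in this example.
        return "allow_with_logging"
    return "allow"


print(classify(IpInfo(is_relay=True)))             # allow_with_logging
print(classify(IpInfo(is_vpn=True, is_tor=True)))  # block
```

Checks are ordered from most to least restrictive, so an IP with several flags set gets the strictest matching action.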
Arcjet works with any Python code, including LangChain agents and chains. In this example, we protect a LangChain agent's chat endpoint with Arcjet to prevent prompt injection, block bots, prevent sensitive data leakage, and enforce token budgets before invoking the agent.
# FastAPI (async); `chain` is your LangChain runnable, defined elsewhere
from arcjet import (
    arcjet,
    detect_bot,
    detect_prompt_injection,
    detect_sensitive_info,
    token_bucket,
    Mode,
    SensitiveInfoEntityType,
)

aj = arcjet(
    key=arcjet_key,
    rules=[
        detect_prompt_injection(mode=Mode.LIVE),
        detect_sensitive_info(
            mode=Mode.LIVE,
            deny=[
                SensitiveInfoEntityType.EMAIL,
                SensitiveInfoEntityType.CREDIT_CARD_NUMBER,
                SensitiveInfoEntityType.PHONE_NUMBER,
            ],
        ),
        detect_bot(mode=Mode.LIVE, allow=["CURL"]),
        token_bucket(
            characteristics=["userId"],
            mode=Mode.LIVE,
            refill_rate=5,
            interval=10,
            capacity=10,
        ),
    ],
)

@app.post("/chat")
async def chat(request: Request, body: ChatRequest):
    decision = await aj.protect(
        request,
        requested=5,
        characteristics={"userId": "user_123"},
        detect_prompt_injection_message=body.message,  # scan for prompt injection
        sensitive_info_value=body.message,  # scan for PII before sending to LLM
    )
    if decision.is_denied():
        status = 429 if decision.reason_v2.type == "RATE_LIMIT" else 403
        return JSONResponse({"error": "Denied"}, status_code=status)
    reply = await chain.ainvoke({"message": body.message})
    return {"reply": reply}

# Flask (sync); assumes a Flask app plus `from flask import jsonify, request`
from arcjet import (
    arcjet_sync,
    detect_bot,
    detect_prompt_injection,
    detect_sensitive_info,
    token_bucket,
    Mode,
    SensitiveInfoEntityType,
)

aj = arcjet_sync(
    key=arcjet_key,
    rules=[
        detect_prompt_injection(mode=Mode.LIVE),
        detect_sensitive_info(
            mode=Mode.LIVE,
            deny=[
                SensitiveInfoEntityType.EMAIL,
                SensitiveInfoEntityType.CREDIT_CARD_NUMBER,
                SensitiveInfoEntityType.PHONE_NUMBER,
            ],
        ),
        detect_bot(mode=Mode.LIVE, allow=["CURL"]),
        token_bucket(
            characteristics=["userId"],
            mode=Mode.LIVE,
            refill_rate=5,
            interval=10,
            capacity=10,
        ),
    ],
)

@app.post("/chat")
def chat():
    body = request.get_json()
    message = body.get("message", "") if body else ""
    decision = aj.protect(
        request,
        requested=5,
        characteristics={"userId": "user_123"},
        detect_prompt_injection_message=message,  # scan for prompt injection
        sensitive_info_value=message,  # scan for PII before sending to LLM
    )
    if decision.is_denied():
        status = 429 if decision.reason_v2.type == "RATE_LIMIT" else 403
        return jsonify(error="Denied"), status
    reply = chain.invoke({"message": message})
    return jsonify(reply=reply)

Create one Arcjet client at startup and reuse it across all requests:
# Good: one instance, created once at startup
aj = arcjet(key=arcjet_key, rules=[...])

# Bad: a new instance per request wastes resources
@app.get("/")
async def index(request: Request):
    aj = arcjet(key=arcjet_key, rules=[...])  # don't do this

Use Mode.DRY_RUN to test rules without blocking traffic. Decisions are logged
but requests are allowed through:

aj = arcjet(
    key=arcjet_key,
    rules=[
        detect_bot(mode=Mode.DRY_RUN, allow=[]),
        token_bucket(mode=Mode.DRY_RUN, refill_rate=5, interval=10, capacity=10),
    ],
)

When running behind a load balancer or reverse proxy, configure trusted IPs so
Arcjet resolves the real client IP from X-Forwarded-For:

aj = arcjet(
    key=arcjet_key,
    rules=[...],
    proxies=["10.0.0.0/8", "192.168.0.1"],
)

Use arcjet (async) with FastAPI and other async frameworks. Use arcjet_sync
with Flask and other sync frameworks:

from arcjet import arcjet, arcjet_sync

# Async: for FastAPI, Starlette, etc.
aj_async = arcjet(key=arcjet_key, rules=[...])
decision = await aj_async.protect(request)

# Sync: for Flask, Django, etc.
aj_sync = arcjet_sync(key=arcjet_key, rules=[...])
decision = aj_sync.protect(request)

All protect() parameters are optional keyword arguments passed alongside the request:
| Parameter | Type | Used by |
|---|---|---|
| requested | int | Token bucket rate limit |
| characteristics | dict[str, Any] | Rate limiting (pass values for keys declared in rule config) |
| detect_prompt_injection_message | str | Prompt injection detection |
| sensitive_info_value | str | Sensitive info detection |
| email | str | Email validation |
| filter_local | dict[str, str] | Request filters (local.* fields) |
| ip_src | str | Manual IP override (advanced) |
decision = await aj.protect(request)

# Top-level checks
decision.is_denied()   # True if any rule denied the request
decision.is_allowed()  # True if all rules allowed the request
decision.is_error()    # True if Arcjet encountered an error (fails open)

# reason_v2.type values: "BOT", "RATE_LIMIT", "SHIELD", "EMAIL", "ERROR", "FILTER"
if decision.reason_v2.type == "RATE_LIMIT":
    print(decision.reason_v2.remaining)  # tokens/requests remaining
elif decision.reason_v2.type == "BOT":
    print(decision.reason_v2.denied)  # list of denied bot names
    print(decision.reason_v2.spoofed)  # list of spoofed bot names

# Per-rule results (for granular handling)
for result in decision.results:
    print(result.reason_v2.type, result.is_denied())

Arcjet is designed to fail open: if the service is unavailable, requests are allowed through. Check for errors explicitly if your use case requires it:

# inside your request handler
decision = await aj.protect(request)
if decision.is_error():
    # Arcjet service error: fail open or apply fallback policy
    pass
elif decision.is_denied():
    return JSONResponse({"error": "Denied"}, status_code=403)

This repository follows the Arcjet Support Policy.
This repository follows the Arcjet Security Policy.
Packages maintained in this repository are compatible with Python 3.10 and above.
Licensed under the Apache License, Version 2.0.