PyTeal Examples¶
Here are some additional PyTeal example programs:
Signature Mode¶
Atomic Swap¶
Atomic Swap allows the transfer of Algos from a buyer to a seller in exchange for a good or service. This is done using a Hashed Time Locked Contract. In this scheme, the buyer funds a TEAL account with the sale price. The buyer also picks a secret value and encodes a secure hash of this value in the TEAL program. The TEAL program will transfer its balance to the seller if the seller is able to provide the secret value that corresponds to the hash in the program. When the seller renders the good or service to the buyer, the buyer discloses the secret to the seller. The seller can immediately verify the secret against the hash in the program and withdraw the payment.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
"""Atomic Swap"""
# Default seller address: htlc() uses it for tmpl_seller, the receiver of the hash-unlocked payment.
alice = Addr("6ZHGHH5Z5CTPCF5WCESXMGRSVK7QJETR63M3NY5FJCUYDHO57VTCMJOBGY")
# Default buyer address: htlc() uses it for tmpl_buyer, the receiver of the post-timeout refund.
bob = Addr("7Z5PWO2C6LFNQFGHWKSK5H47IQP5OJW2M3HA2QPXTY3WTNP5NU2MHBW27M")
# Value that htlc() compares against tmpl_hash_fn(Arg(0)); i.e. this is the expected
# hash output, not the preimage itself.
secret = Bytes("base32", "2323232323232323")
# Default for tmpl_timeout: htlc() compares Txn.first_valid() against this number.
timeout = 3000
def htlc(
    tmpl_seller=alice,
    tmpl_buyer=bob,
    tmpl_fee=1000,
    tmpl_secret=secret,
    tmpl_hash_fn=Sha256,
    tmpl_timeout=timeout,
):
    """Build the hashed-time-lock expression guarding the atomic swap account.

    The returned PyTeal expression approves a transaction only if all of the
    following hold:

    * the fee is strictly below ``tmpl_fee``;
    * it is a payment that neither closes the account nor rekeys it;
    * either the receiver is ``tmpl_seller`` and ``tmpl_hash_fn(Arg(0))``
      equals ``tmpl_secret``, or the receiver is ``tmpl_buyer`` and the
      transaction's first valid round is greater than ``tmpl_timeout``.

    ``tmpl_fee`` and ``tmpl_timeout`` are plain Python ints (wrapped in
    ``Int`` here); the remaining parameters are PyTeal expressions, except
    ``tmpl_hash_fn`` which is a callable producing one.
    """
    fee_is_bounded = Txn.fee() < Int(tmpl_fee)

    # Plain payment only: no close-out and no rekey of the contract account.
    is_safe_payment = And(
        Txn.type_enum() == TxnType.Payment,
        Txn.close_remainder_to() == Global.zero_address(),
        Txn.rekey_to() == Global.zero_address(),
    )

    # Path 1: the seller supplies a preimage whose hash matches.
    seller_reveals_secret = And(
        Txn.receiver() == tmpl_seller,
        tmpl_hash_fn(Arg(0)) == tmpl_secret,
    )

    # Path 2: the buyer takes the funds back once the timeout round has passed.
    buyer_reclaims = And(
        Txn.receiver() == tmpl_buyer,
        Txn.first_valid() > Int(tmpl_timeout),
    )

    payout_allowed = Or(seller_reveals_secret, buyer_reclaims)
    return And(fee_is_bounded, is_safe_payment, payout_allowed)
if __name__ == "__main__":
    # Print the TEAL (version 2, signature mode) source of the swap contract.
    teal_source = compileTeal(htlc(), mode=Mode.Signature, version=2)
    print(teal_source)
Split Payment¶
Split Payment splits payment between tmpl_rcv1
and tmpl_rcv2
on the ratio of
tmpl_ratn / tmpl_ratd
.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
"""Split Payment"""
# Default template values for split(); each one is the default of the
# same-named parameter.
# Fee ceiling: split() requires Txn.fee() to be strictly below this.
tmpl_fee = Int(1000)
# Required receivers of Gtxn[0] and Gtxn[1] respectively.
tmpl_rcv1 = Addr("6ZHGHH5Z5CTPCF5WCESXMGRSVK7QJETR63M3NY5FJCUYDHO57VTCMJOBGY")
tmpl_rcv2 = Addr("7Z5PWO2C6LFNQFGHWKSK5H47IQP5OJW2M3HA2QPXTY3WTNP5NU2MHBW27M")
# Address the remaining balance may be closed to in the close-out branch.
tmpl_own = Addr("5MK5NGBRT5RL6IGUSYDIX5P7TNNZKRVXKT6FGVI6UVK6IZAWTYQGE4RZIQ")
# Numerator / denominator of the share of the combined amount that Gtxn[0] must carry.
tmpl_ratn = Int(1)
tmpl_ratd = Int(3)
# Amount that Gtxn[0] is compared against (with ==) in split().
tmpl_min_pay = Int(1000)
# The close-out branch requires Txn.first_valid() to be greater than this round.
tmpl_timeout = Int(3000)
def split(
    tmpl_fee=tmpl_fee,
    tmpl_rcv1=tmpl_rcv1,
    tmpl_rcv2=tmpl_rcv2,
    tmpl_own=tmpl_own,
    tmpl_ratn=tmpl_ratn,
    tmpl_ratd=tmpl_ratd,
    tmpl_min_pay=tmpl_min_pay,
    tmpl_timeout=tmpl_timeout,
):
    """Build the split-payment contract expression.

    Every approved transaction must be a payment with a fee strictly below
    ``tmpl_fee`` and no rekey. Beyond that, the group size selects the branch:

    * group of two: both transactions share a sender, pay ``tmpl_rcv1`` and
      ``tmpl_rcv2`` respectively, do not close the account, and the first
      amount equals ``(sum of both amounts) * tmpl_ratn / tmpl_ratd`` as well
      as ``tmpl_min_pay``;
    * otherwise: a zero-amount close-out to ``tmpl_own`` whose first valid
      round is greater than ``tmpl_timeout``.

    All parameters are PyTeal expressions.
    """
    # Checks shared by both branches.
    common_checks = And(
        Txn.type_enum() == TxnType.Payment,
        Txn.fee() < tmpl_fee,
        Txn.rekey_to() == Global.zero_address(),
    )

    first_pay = Gtxn[0]
    second_pay = Gtxn[1]
    combined_amount = first_pay.amount() + second_pay.amount()
    first_share = (combined_amount * tmpl_ratn) / tmpl_ratd

    paired_transfer = And(
        first_pay.sender() == second_pay.sender(),
        Txn.close_remainder_to() == Global.zero_address(),
        first_pay.receiver() == tmpl_rcv1,
        second_pay.receiver() == tmpl_rcv2,
        first_pay.amount() == first_share,
        # NOTE(review): this is an exact-equality check against tmpl_min_pay,
        # not a lower bound -- confirm that is the intended rule.
        first_pay.amount() == tmpl_min_pay,
    )

    close_to_owner = And(
        Txn.close_remainder_to() == tmpl_own,
        Txn.receiver() == Global.zero_address(),
        Txn.amount() == Int(0),
        Txn.first_valid() > tmpl_timeout,
    )

    branch = If(Global.group_size() == Int(2), paired_transfer, close_to_owner)
    return And(common_checks, branch)
if __name__ == "__main__":
    # Print the TEAL (version 2, signature mode) source of the split contract.
    teal_source = compileTeal(split(), mode=Mode.Signature, version=2)
    print(teal_source)
Periodic Payment¶
Periodic Payment allows some account to execute periodic withdrawal of funds. This PyTeal program
creates a contract account that allows tmpl_rcv
to withdraw tmpl_amt
every
tmpl_period
rounds for tmpl_dur
after every multiple of tmpl_period
.
After tmpl_timeout
, all remaining funds in the escrow are available to tmpl_rcv
.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
"""Periodic Payment"""
# Default template values for periodic_payment(); each one is the default of
# the same-named parameter.
# Fee ceiling: Txn.fee() must be strictly below this.
tmpl_fee = Int(1000)
# Txn.first_valid() must be a multiple of this number of rounds.
tmpl_period = Int(50)
# Txn.last_valid() must equal Txn.first_valid() plus this value.
tmpl_dur = Int(5000)
# Value Txn.lease() must equal.
tmpl_lease = Bytes("base64", "023sdDE2")
# Exact amount of each periodic withdrawal.
tmpl_amt = Int(2000)
# Receiver of the withdrawals and of the final close-out.
tmpl_rcv = Addr("6ZHGHH5Z5CTPCF5WCESXMGRSVK7QJETR63M3NY5FJCUYDHO57VTCMJOBGY")
# Round that Txn.first_valid() is compared against (with ==) in the close-out branch.
tmpl_timeout = Int(30000)
def periodic_payment(
    tmpl_fee=tmpl_fee,
    tmpl_period=tmpl_period,
    tmpl_dur=tmpl_dur,
    tmpl_lease=tmpl_lease,
    tmpl_amt=tmpl_amt,
    tmpl_rcv=tmpl_rcv,
    tmpl_timeout=tmpl_timeout,
):
    """Build the periodic-payment escrow expression.

    Every approved transaction must be a payment with a fee strictly below
    ``tmpl_fee``, a first valid round that is a multiple of ``tmpl_period``,
    a last valid round equal to ``tmpl_dur`` plus the first valid round, and
    a lease equal to ``tmpl_lease``. In addition it must be either

    * a withdrawal of exactly ``tmpl_amt`` to ``tmpl_rcv`` with no close-out
      and no rekey, or
    * a zero-amount close-out to ``tmpl_rcv`` (no rekey, zero-address
      receiver) whose first valid round equals ``tmpl_timeout``.

    All parameters are PyTeal expressions.
    """
    # Checks shared by the withdrawal and the close-out.
    common_checks = And(
        Txn.type_enum() == TxnType.Payment,
        Txn.fee() < tmpl_fee,
        Txn.first_valid() % tmpl_period == Int(0),
        Txn.last_valid() == tmpl_dur + Txn.first_valid(),
        Txn.lease() == tmpl_lease,
    )

    withdrawal = And(
        Txn.close_remainder_to() == Global.zero_address(),
        Txn.rekey_to() == Global.zero_address(),
        Txn.receiver() == tmpl_rcv,
        Txn.amount() == tmpl_amt,
    )

    final_close = And(
        Txn.close_remainder_to() == tmpl_rcv,
        Txn.rekey_to() == Global.zero_address(),
        Txn.receiver() == Global.zero_address(),
        # NOTE(review): equality, so the close-out is only accepted for a
        # transaction whose first valid round is exactly tmpl_timeout --
        # confirm this matches the intended "after tmpl_timeout" behaviour.
        Txn.first_valid() == tmpl_timeout,
        Txn.amount() == Int(0),
    )

    return And(common_checks, Or(withdrawal, final_close))
if __name__ == "__main__":
    # Print the TEAL (version 2, signature mode) source of the escrow contract.
    teal_source = compileTeal(periodic_payment(), mode=Mode.Signature, version=2)
    print(teal_source)
Application Mode¶
Voting¶
Voting allows accounts to register and vote for arbitrary choices. Here a choice is any byte slice and anyone is allowed to register to vote.
This example has a configurable registration period defined by the global state RegBegin
and RegEnd
which restrict when accounts can register to vote. There is also a separate
configurable voting period defined by the global state VoteBegin
and VoteEnd
which restrict when voting can take place.
An account must register in order to vote. Accounts cannot vote more than once, and if an account opts out of the application before the voting period has concluded, their vote is discarded. The results are visible in the global state of the application, and the winner is the candidate with the highest number of votes.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
def approval_program():
    """Build the approval program of the voting application.

    Dispatch (first matching case wins):

    * creation: store the sender as ``Creator`` and the four round numbers
      passed as arguments under ``RegBegin``, ``RegEnd``, ``VoteBegin`` and
      ``VoteEnd``;
    * delete / update: allowed for the creator only;
    * close out: if the vote has not ended and the sender has voted, take
      their vote back out of the tally;
    * opt in: allowed only inside the registration window;
    * first argument ``"vote"``: inside the voting window, record the
      sender's choice (second argument) once and increment its tally.
    """

    def stored(name):
        # Fresh node reading the global-state entry called `name`.
        return App.globalGet(Bytes(name))

    def round_between(first_key, last_key):
        # Current round lies in the inclusive window stored under the two keys.
        return And(
            Global.round() >= stored(first_key),
            Global.round() <= stored(last_key),
        )

    handle_creation = Seq(
        [
            App.globalPut(Bytes("Creator"), Txn.sender()),
            Assert(Txn.application_args.length() == Int(4)),
            App.globalPut(Bytes("RegBegin"), Btoi(Txn.application_args[0])),
            App.globalPut(Bytes("RegEnd"), Btoi(Txn.application_args[1])),
            App.globalPut(Bytes("VoteBegin"), Btoi(Txn.application_args[2])),
            App.globalPut(Bytes("VoteEnd"), Btoi(Txn.application_args[3])),
            Return(Int(1)),
        ]
    )

    sender_is_creator = Txn.sender() == stored("Creator")

    # The sender's recorded vote, if any. This single object is shared by the
    # close-out and vote handlers, exactly as it must be loaded before use.
    sender_vote = App.localGetEx(Int(0), App.id(), Bytes("voted"))

    handle_closeout = Seq(
        [
            sender_vote,
            If(
                And(
                    Global.round() <= stored("VoteEnd"),
                    sender_vote.hasValue(),
                ),
                # Voting is still open: discard the departing account's vote.
                App.globalPut(
                    sender_vote.value(),
                    App.globalGet(sender_vote.value()) - Int(1),
                ),
            ),
            Return(Int(1)),
        ]
    )

    handle_register = Return(round_between("RegBegin", "RegEnd"))

    chosen = Txn.application_args[1]
    chosen_tally = App.globalGet(chosen)
    handle_vote = Seq(
        [
            Assert(round_between("VoteBegin", "VoteEnd")),
            sender_vote,
            # An account may vote only once.
            If(sender_vote.hasValue(), Return(Int(0))),
            App.globalPut(chosen, chosen_tally + Int(1)),
            App.localPut(Int(0), Bytes("voted"), chosen),
            Return(Int(1)),
        ]
    )

    completion = Txn.on_completion
    return Cond(
        [Txn.application_id() == Int(0), handle_creation],
        [completion() == OnComplete.DeleteApplication, Return(sender_is_creator)],
        [completion() == OnComplete.UpdateApplication, Return(sender_is_creator)],
        [completion() == OnComplete.CloseOut, handle_closeout],
        [completion() == OnComplete.OptIn, handle_register],
        [Txn.application_args[0] == Bytes("vote"), handle_vote],
    )
def clear_state_program():
    """Build the clear-state program of the voting application.

    If the current round is not past ``VoteEnd`` and the sender has a
    recorded vote, that choice's tally is decremented; the program then
    returns 1.
    """
    sender_vote = App.localGetEx(Int(0), App.id(), Bytes("voted"))

    voting_open_and_voted = And(
        Global.round() <= App.globalGet(Bytes("VoteEnd")),
        sender_vote.hasValue(),
    )
    retract_vote = App.globalPut(
        sender_vote.value(),
        App.globalGet(sender_vote.value()) - Int(1),
    )

    return Seq(
        [
            sender_vote,
            If(voting_open_and_voted, retract_vote),
            Return(Int(1)),
        ]
    )
if __name__ == "__main__":
    # Write the TEAL (version 2, application mode) source of both programs.
    for teal_path, build_program in (
        ("vote_approval.teal", approval_program),
        ("vote_clear_state.teal", clear_state_program),
    ):
        with open(teal_path, "w") as f:
            compiled = compileTeal(build_program(), mode=Mode.Application, version=2)
            f.write(compiled)
A reference script that deploys the voting application is below:
# based off https://github.com/algorand/docs/blob/cdf11d48a4b1168752e6ccaf77c8b9e8e599713a/examples/smart_contracts/v2/python/stateful_smart_contracts.py
import base64
import datetime
from algosdk.future import transaction
from algosdk import account, mnemonic
from algosdk.v2client import algod
from pyteal import compileTeal, Mode
from vote import approval_program, clear_state_program
# user declared account mnemonics
# Placeholders: main() converts both into signing keys with
# get_private_key_from_mnemonic(), so they must be replaced before running.
creator_mnemonic = "Your 25-word mnemonic goes here"
user_mnemonic = "A second distinct 25-word mnemonic goes here"
# user declared algod connection parameters. Node must have EnableDeveloperAPI set to true in its config
# (compile_program() relies on the node via client.compile()).
algod_address = "http://localhost:4001"
algod_token = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
# helper function to compile program source
def compile_program(client, source_code):
    """Compile TEAL source through the node and return the program bytes.

    ``client.compile(source_code)`` is expected to return a mapping whose
    ``"result"`` entry is the base64-encoded program; the decoded bytes are
    returned.
    """
    encoded_program = client.compile(source_code)["result"]
    return base64.b64decode(encoded_program)
# helper function that converts a mnemonic passphrase into a private signing key
def get_private_key_from_mnemonic(mn):
    """Return the private key that ``mnemonic.to_private_key`` derives from ``mn``."""
    return mnemonic.to_private_key(mn)
# helper function that waits for a given txid to be confirmed by the network
def wait_for_confirmation(client, txid):
    """Block until ``txid`` is confirmed and return its pending-transaction info.

    Polls ``client.pending_transaction_info(txid)`` once per round, using
    ``client.status_after_block`` to wait for the next round, until the
    response carries a positive ``"confirmed-round"``.

    Raises:
        RuntimeError: if the node reports a non-empty ``"pool-error"`` for
            the transaction, i.e. it was removed from the pool and will never
            confirm. Without this check the loop would never terminate.
    """
    last_round = client.status().get("last-round")
    txinfo = client.pending_transaction_info(txid)
    while True:
        confirmed_round = txinfo.get("confirmed-round")
        if confirmed_round and confirmed_round > 0:
            break
        pool_error = txinfo.get("pool-error")
        if pool_error:
            raise RuntimeError(
                "Transaction {} was rejected: {}".format(txid, pool_error)
            )
        print("Waiting for confirmation...")
        last_round += 1
        client.status_after_block(last_round)
        txinfo = client.pending_transaction_info(txid)
    print(
        "Transaction {} confirmed in round {}.".format(
            txid, txinfo.get("confirmed-round")
        )
    )
    return txinfo
def wait_for_round(client, round):
    """Block until the node's last round has reached ``round``.

    Starting from the ``"last-round"`` reported by ``client.status()``, waits
    for each following round with ``client.status_after_block`` and prints
    progress. Returns immediately (after the initial message) when the node
    is already at or past ``round``.
    """
    current = client.status().get("last-round")
    print(f"Waiting for round {round}")
    for last_round in range(current + 1, round + 1):
        client.status_after_block(last_round)
        print(f"Round {last_round}")
# create new application
def create_app(
    client,
    private_key,
    approval_program,
    clear_program,
    global_schema,
    local_schema,
    app_args,
):
    """Create the application on chain and return its id.

    The account derived from ``private_key`` is the creator. The create
    transaction is a NoOp call with a flat fee of 1000, is signed with
    ``private_key``, sent, and waited on; the new id is read from the
    ``"application-index"`` field of the pending-transaction info.
    """
    # the creator is the sender of the create transaction
    creator_address = account.address_from_private_key(private_key)
    noop_on_complete = transaction.OnComplete.NoOpOC.real

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    create_txn = transaction.ApplicationCreateTxn(
        creator_address,
        suggested,
        noop_on_complete,
        approval_program,
        clear_program,
        global_schema,
        local_schema,
        app_args,
    )
    signed = create_txn.sign(private_key)
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)

    # the confirmed transaction reports the id assigned to the new application
    app_id = client.pending_transaction_info(tx_id)["application-index"]
    print("Created new app-id:", app_id)
    return app_id
# opt-in to application
def opt_in_app(client, private_key, index):
    """Opt the account derived from ``private_key`` in to application ``index``.

    Sends a flat-fee (1000) opt-in transaction, waits for confirmation and
    prints the application id reported by the confirmed transaction.
    """
    account_address = account.address_from_private_key(private_key)
    print("OptIn from account: ", account_address)

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    signed = transaction.ApplicationOptInTxn(account_address, suggested, index).sign(
        private_key
    )
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)

    confirmed = client.pending_transaction_info(tx_id)
    print("OptIn to app-id:", confirmed["txn"]["txn"]["apid"])
# call application
def call_app(client, private_key, index, app_args):
    """Send a NoOp call with ``app_args`` to application ``index``.

    The call is made from the account derived from ``private_key`` with a
    flat fee of 1000; the function returns once it is confirmed.
    """
    caller_address = account.address_from_private_key(private_key)
    print("Call from account:", caller_address)

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    call_txn = transaction.ApplicationNoOpTxn(caller_address, suggested, index, app_args)
    signed = call_txn.sign(private_key)
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)
def format_state(state):
    """Convert a raw key/value state list into a plain ``dict``.

    Each item of ``state`` has a base64-encoded ``"key"`` and a ``"value"``
    mapping. Values whose ``"type"`` is 1 are byte strings: the ``voted``
    entry is base64-decoded to text, every other byte value is kept in its
    base64 form. All other values are returned as their ``"uint"`` integer.

    Keys and the ``voted`` value are arbitrary byte slices chosen by the
    voters, so they are decoded with ``errors="backslashreplace"``: bytes
    that are not valid UTF-8 show up as ``\\xNN`` escapes instead of raising
    ``UnicodeDecodeError`` and aborting the whole read-out.
    """
    formatted = {}
    for item in state:
        value = item["value"]
        formatted_key = base64.b64decode(item["key"]).decode(
            "utf-8", errors="backslashreplace"
        )
        if value["type"] != 1:
            # integer
            formatted[formatted_key] = value["uint"]
        elif formatted_key == "voted":
            # byte string holding the chosen option: show it as text
            formatted[formatted_key] = base64.b64decode(value["bytes"]).decode(
                "utf-8", errors="backslashreplace"
            )
        else:
            # any other byte string stays base64-encoded
            formatted[formatted_key] = value["bytes"]
    return formatted
# read user local state
def read_local_state(client, addr, app_id):
    """Return the formatted local state ``addr`` holds for ``app_id``.

    Looks up ``app_id`` in the ``"apps-local-state"`` list of
    ``client.account_info(addr)`` and formats its ``"key-value"`` entries
    with ``format_state``. An empty dict is returned when the account has no
    local-state list at all, is not opted in to ``app_id``, or has no
    key/value entries for it.
    """
    results = client.account_info(addr)
    # The list may be absent for an account with no local state, so do not
    # index it directly.
    for local_state in results.get("apps-local-state", []):
        if local_state["id"] != app_id:
            continue
        entries = local_state.get("key-value")
        if not entries:
            return {}
        return format_state(entries)
    return {}
# read app global state
def read_global_state(client, addr, app_id):
    """Return the formatted global state of ``app_id`` created by ``addr``.

    Looks up ``app_id`` in the ``"created-apps"`` list of
    ``client.account_info(addr)`` and formats its ``"global-state"`` entries
    with ``format_state``. An empty dict is returned when the account has no
    created-apps list, did not create ``app_id``, or the application has no
    global-state entries -- mirroring how ``read_local_state`` treats missing
    data instead of raising ``KeyError``.
    """
    results = client.account_info(addr)
    for app in results.get("created-apps", []):
        if app["id"] != app_id:
            continue
        entries = app["params"].get("global-state")
        if not entries:
            return {}
        return format_state(entries)
    return {}
# delete application
def delete_app(client, private_key, index):
    """Delete application ``index`` using the account derived from ``private_key``.

    Sends a flat-fee (1000) delete transaction, waits for confirmation and
    prints the application id reported by the confirmed transaction.
    """
    sender_address = account.address_from_private_key(private_key)

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    signed = transaction.ApplicationDeleteTxn(sender_address, suggested, index).sign(
        private_key
    )
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)

    confirmed = client.pending_transaction_info(tx_id)
    print("Deleted app-id:", confirmed["txn"]["txn"]["apid"])
# close out from application
def close_out_app(client, private_key, index):
    """Close the account derived from ``private_key`` out of application ``index``.

    Sends a flat-fee (1000) close-out transaction, waits for confirmation and
    prints the application id reported by the confirmed transaction.
    """
    sender_address = account.address_from_private_key(private_key)

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    signed = transaction.ApplicationCloseOutTxn(sender_address, suggested, index).sign(
        private_key
    )
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)

    confirmed = client.pending_transaction_info(tx_id)
    print("Closed out from app-id: ", confirmed["txn"]["txn"]["apid"])
# clear application
def clear_app(client, private_key, index):
    """Clear the local state of application ``index`` for the signing account.

    Sends a flat-fee (1000) clear-state transaction from the account derived
    from ``private_key``, waits for confirmation and prints the application
    id reported by the confirmed transaction.
    """
    sender_address = account.address_from_private_key(private_key)

    suggested = client.suggested_params()
    # comment out the next two (2) lines to use suggested fees
    suggested.flat_fee = True
    suggested.fee = 1000

    signed = transaction.ApplicationClearStateTxn(sender_address, suggested, index).sign(
        private_key
    )
    tx_id = signed.transaction.get_txid()

    client.send_transactions([signed])
    wait_for_confirmation(client, tx_id)

    confirmed = client.pending_transaction_info(tx_id)
    print("Cleared app-id:", confirmed["txn"]["txn"]["apid"])
# convert 64 bit integer i to byte string
def intToBytes(i):
    """Return ``i`` as an 8-byte big-endian byte string."""
    return i.to_bytes(length=8, byteorder="big")
def main():
    """Deploy the voting application and walk through one complete vote.

    Steps, in order: compile both programs through the node, create the
    application with registration and voting windows derived from the current
    round, opt the user in once registration opens, cast a vote for
    ``choiceA`` once voting opens, wait for voting to end, print the winner,
    then delete the application and clear the user's local state.
    """
    # initialize an algodClient
    algod_client = algod.AlgodClient(algod_token, algod_address)

    # derive the signing keys of both accounts
    creator_private_key = get_private_key_from_mnemonic(creator_mnemonic)
    user_private_key = get_private_key_from_mnemonic(user_mnemonic)

    # declare application state storage (immutable)
    # global: 4 ints for setup + 20 for choices (use a larger number for more
    # choices), plus 1 byte slice; local: 1 byte slice
    global_uint_count = 24
    global_byte_count = 1
    local_uint_count = 0
    local_byte_count = 1
    global_schema = transaction.StateSchema(global_uint_count, global_byte_count)
    local_schema = transaction.StateSchema(local_uint_count, local_byte_count)

    # approval program: PyTeal AST -> TEAL assembly -> binary
    approval_teal = compileTeal(approval_program(), mode=Mode.Application, version=2)
    approval_binary = compile_program(algod_client, approval_teal)

    # clear state program: PyTeal AST -> TEAL assembly -> binary
    clear_state_teal = compileTeal(
        clear_state_program(), mode=Mode.Application, version=2
    )
    clear_state_binary = compile_program(algod_client, clear_state_teal)

    # configure registration and voting period relative to the current round
    current_round = algod_client.status()["last-round"]
    reg_begin = current_round + 10
    reg_end = reg_begin + 10
    vote_begin = reg_end + 1
    vote_end = vote_begin + 10
    print(f"Registration rounds: {reg_begin} to {reg_end}")
    print(f"Vote rounds: {vote_begin} to {vote_end}")

    # the four round numbers are passed as 8-byte big-endian app args
    app_args = [
        intToBytes(boundary)
        for boundary in (reg_begin, reg_end, vote_begin, vote_end)
    ]

    # create new application
    app_id = create_app(
        algod_client,
        creator_private_key,
        approval_binary,
        clear_state_binary,
        global_schema,
        local_schema,
        app_args,
    )

    # read global state of application
    print(
        "Global state:",
        read_global_state(
            algod_client, account.address_from_private_key(creator_private_key), app_id
        ),
    )

    # wait for registration period to start, then opt-in to application
    wait_for_round(algod_client, reg_begin)
    opt_in_app(algod_client, user_private_key, app_id)

    # wait for voting period to start, then vote for "choiceA"
    wait_for_round(algod_client, vote_begin)
    call_app(algod_client, user_private_key, app_id, [b"vote", b"choiceA"])

    # read local state of application from user account
    print(
        "Local state:",
        read_local_state(
            algod_client, account.address_from_private_key(user_private_key), app_id
        ),
    )

    # wait for voting period to end
    wait_for_round(algod_client, vote_end)

    # read global state of application
    global_state = read_global_state(
        algod_client, account.address_from_private_key(creator_private_key), app_id
    )
    print("Global state:", global_state)

    # every integer entry that is not a setup key is a tally; the first
    # choice with the strictly highest positive tally wins
    setup_keys = ("RegBegin", "RegEnd", "VoteBegin", "VoteEnd", "Creator")
    max_votes = 0
    max_votes_choice = None
    for key, value in global_state.items():
        if key in setup_keys or not isinstance(value, int):
            continue
        if value > max_votes:
            max_votes, max_votes_choice = value, key
    print("The winner is:", max_votes_choice)

    # delete application
    delete_app(algod_client, creator_private_key, app_id)

    # clear application from user account
    clear_app(algod_client, user_private_key, app_id)
# Run the deployment walkthrough only when executed as a script.
if __name__ == "__main__":
    main()
Example output for deployment would be:
Registration rounds: 592 to 602
Vote rounds: 603 to 613
Waiting for confirmation...
Transaction KXJHR6J4QSCAHO36L77DPJ53CLZBCCSPSBAOGTGQDRA7WECDXUEA confirmed in round 584.
Created new app-id: 29
Global state: {'RegEnd': 602, 'VoteBegin': 603, 'VoteEnd': 613, 'Creator': '49y8gDrKSnM77cgRyFzYdlkw18SDVNKhhOiS6NVVH8U=', 'RegBegin': 592}
Waiting for round 592
Round 585
Round 586
Round 587
Round 588
Round 589
Round 590
Round 591
Round 592
OptIn from account: FVQEFNOSD25TDBTTTIU2I5KW5DHR6PADYMZESTOCQ2O3ME4OWXEI7OHVRY
Waiting for confirmation...
Transaction YWXOAREFSUYID6QLWQHANTXK3NR2XOVTIQYKMD27F3VXJKP7CMYQ confirmed in round 595.
OptIn to app-id: 29
Waiting for round 603
Round 596
Round 597
Round 598
Round 599
Round 600
Round 601
Round 602
Round 603
Call from account: FVQEFNOSD25TDBTTTIU2I5KW5DHR6PADYMZESTOCQ2O3ME4OWXEI7OHVRY
Waiting for confirmation...
Transaction WNV4DTPEMVGUXNRZHMWNSCUU7AQJOCFTBKJT6NV2KN6THT4QGKNQ confirmed in round 606.
Local state: {'voted': 'choiceA'}
Waiting for round 613
Round 607
Round 608
Round 609
Round 610
Round 611
Round 612
Round 613
Global state: {'RegBegin': 592, 'RegEnd': 602, 'VoteBegin': 603, 'VoteEnd': 613, 'choiceA': 1, 'Creator': '49y8gDrKSnM77cgRyFzYdlkw18SDVNKhhOiS6NVVH8U='}
The winner is: choiceA
Waiting for confirmation...
Transaction 535KBWJ7RQX4ISV763IUUICQWI6VERYBJ7J6X7HPMAMFNKJPSNPQ confirmed in round 616.
Deleted app-id: 29
Waiting for confirmation...
Transaction Z56HDAJYARUC4PWGWQLCBA6TZYQOOLNOXY5XRM3IYUEEUCT5DRMA confirmed in round 618.
Cleared app-id: 29
Asset¶
Asset is an implementation of a custom asset type using smart contracts. While Algorand has ASAs, in some blockchains the only way to create a custom asset is through smart contracts.
At creation, the creator specifies the total supply of the asset. Initially this supply is placed in a reserve and the creator is made an admin. Any admin can move funds from the reserve into the balance of any account that has opted into the application using the mint argument. Additionally, any admin can move funds from any account’s balance into the reserve using the burn argument.
Accounts are free to transfer funds in their balance to any other account that has opted into the application. When an account opts out of the application, their balance is added to the reserve.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
def approval_program():
    """Build the approval program of the custom-asset application.

    Dispatch (first matching case wins):

    * creation: store the supply (first argument) as ``total supply`` and
      ``reserve``, make the sender an admin with a zero balance;
    * delete / update: allowed for admins only;
    * close out: return the sender's balance to the reserve;
    * opt in: give the sender a zero balance;
    * ``"set admin"``: an admin sets the admin flag of ``Txn.accounts[1]``;
    * ``"mint"``: an admin moves funds from the reserve to
      ``Txn.accounts[1]``;
    * ``"transfer"``: the sender moves funds from its own balance to
      ``Txn.accounts[1]``.
    """

    def arg_count_is(expected):
        # Fresh node checking the number of application arguments.
        return Txn.application_args.length() == Int(expected)

    def reserve():
        # Fresh node reading the global reserve.
        return App.globalGet(Bytes("reserve"))

    def balance_of(account_index):
        # Fresh node reading the balance of the sender (0) or Txn.accounts[1] (1).
        return App.localGet(Int(account_index), Bytes("balance"))

    def set_balance(account_index, amount):
        return App.localPut(Int(account_index), Bytes("balance"), amount)

    handle_creation = Seq(
        [
            Assert(arg_count_is(1)),
            App.globalPut(Bytes("total supply"), Btoi(Txn.application_args[0])),
            App.globalPut(Bytes("reserve"), Btoi(Txn.application_args[0])),
            App.localPut(Int(0), Bytes("admin"), Int(1)),
            set_balance(0, Int(0)),
            Return(Int(1)),
        ]
    )

    sender_is_admin = App.localGet(Int(0), Bytes("admin"))

    handle_closeout = Seq(
        [
            App.globalPut(Bytes("reserve"), reserve() + balance_of(0)),
            Return(Int(1)),
        ]
    )

    handle_opt_in = Seq([set_balance(0, Int(0)), Return(Int(1))])

    # configure the admin status of the account Txn.accounts[1]
    # sender must be admin
    requested_admin_status = Btoi(Txn.application_args[1])
    handle_set_admin = Seq(
        [
            Assert(And(sender_is_admin, arg_count_is(2))),
            App.localPut(Int(1), Bytes("admin"), requested_admin_status),
            Return(Int(1)),
        ]
    )
    # NOTE: The admin check above deliberately happens BEFORE the write. If
    # the handler instead were:
    #   Seq([
    #       Assert(Txn.application_args.length() == Int(2)),
    #       App.localPut(Int(1), Bytes("admin"), new_admin_status),
    #       Return(is_admin)
    #   ])
    # it would be vulnerable to the following attack: a sender passes in their
    # own address as Txn.accounts[1], so the App.localPut line changes the
    # sender's own admin status, meaning the final Return(is_admin) can return
    # anything the sender wants. This allows anyone to become an admin!

    # move assets from the reserve to Txn.accounts[1]
    # sender must be admin
    minted = Btoi(Txn.application_args[1])
    handle_mint = Seq(
        [
            Assert(arg_count_is(2)),
            Assert(minted <= reserve()),
            App.globalPut(Bytes("reserve"), reserve() - minted),
            set_balance(1, balance_of(1) + minted),
            Return(sender_is_admin),
        ]
    )

    # transfer assets from the sender to Txn.accounts[1]
    moved = Btoi(Txn.application_args[1])
    handle_transfer = Seq(
        [
            Assert(arg_count_is(2)),
            Assert(moved <= balance_of(0)),
            set_balance(0, balance_of(0) - moved),
            set_balance(1, balance_of(1) + moved),
            Return(Int(1)),
        ]
    )

    completion = Txn.on_completion
    return Cond(
        [Txn.application_id() == Int(0), handle_creation],
        [completion() == OnComplete.DeleteApplication, Return(sender_is_admin)],
        [completion() == OnComplete.UpdateApplication, Return(sender_is_admin)],
        [completion() == OnComplete.CloseOut, handle_closeout],
        [completion() == OnComplete.OptIn, handle_opt_in],
        [Txn.application_args[0] == Bytes("set admin"), handle_set_admin],
        [Txn.application_args[0] == Bytes("mint"), handle_mint],
        [Txn.application_args[0] == Bytes("transfer"), handle_transfer],
    )
def clear_state_program():
    """Build the clear-state program of the custom-asset application.

    Adds the sender's local ``balance`` to the global ``reserve`` and
    returns 1.
    """
    current_reserve = App.globalGet(Bytes("reserve"))
    sender_balance = App.localGet(Int(0), Bytes("balance"))
    refund_to_reserve = App.globalPut(
        Bytes("reserve"), current_reserve + sender_balance
    )
    return Seq([refund_to_reserve, Return(Int(1))])
if __name__ == "__main__":
    # Write the TEAL (version 2, application mode) source of both programs.
    for teal_path, build_program in (
        ("asset_approval.teal", approval_program),
        ("asset_clear_state.teal", clear_state_program),
    ):
        with open(teal_path, "w") as f:
            compiled = compileTeal(build_program(), mode=Mode.Application, version=2)
            f.write(compiled)
Security Token¶
Security Token is an extension of the Asset example with more features and restrictions. There are two types of admins, contract admins and transfer admins.
Contract admins can delete the smart contract if the entire supply is in the reserve. They can promote accounts to transfer or contract admins. They can also mint and burn funds.
Transfer admins can impose maximum balance limitations on accounts, temporarily lock accounts, assign accounts to transfer groups, and impose transfer restrictions between transfer groups.
Both contract and transfer admins can pause trading of funds and freeze individual accounts.
Accounts can only transfer funds if trading is not paused, both the sender and receiver accounts are not frozen or temporarily locked, transfer group restrictions are not in place between them, and the receiver’s account does not have a maximum balance restriction that would be invalidated.
# This example is provided for informational purposes only and has not been audited for security.
from pyteal import *
def approval_program():
    """Build the approval program of the security-token application.

    Dispatch (first matching case wins):

    * creation: store the supply (first argument) as ``total supply`` and
      ``reserve``, clear ``paused``, and make the sender both a contract
      admin and a transfer admin with a zero balance;
    * delete: contract admin only, and only while the whole supply is in the
      reserve; update: contract admin only;
    * close out: return the sender's balance to the reserve;
    * opt in: give the sender a zero balance;
    * ``"pause"`` / ``"freeze"``: any admin;
    * ``"set admin"`` / ``"mint"`` / ``"burn"``: contract admin;
    * ``"max balance"`` / ``"lock until"`` / ``"transfer group"``: transfer
      admin;
    * ``"transfer"``: any account, subject to the pause, freeze, lock,
      transfer-group and max-balance restrictions.

    Handlers that act on another account address it as ``Txn.accounts[1]``.
    """

    def arg_count_is(expected):
        # Fresh node checking the number of application arguments.
        return Txn.application_args.length() == Int(expected)

    def one_extra_account():
        # Fresh node requiring exactly one entry in Txn.accounts.
        return Txn.accounts.length() == Int(1)

    def reserve():
        # Fresh node reading the global reserve.
        return App.globalGet(Bytes("reserve"))

    def set_reserve(amount):
        return App.globalPut(Bytes("reserve"), amount)

    def balance_of(account_index):
        # Fresh node reading the balance of the sender (0) or Txn.accounts[1] (1).
        return App.localGet(Int(account_index), Bytes("balance"))

    def set_balance(account_index, amount):
        return App.localPut(Int(account_index), Bytes("balance"), amount)

    handle_creation = Seq(
        [
            Assert(arg_count_is(1)),
            App.globalPut(Bytes("total supply"), Btoi(Txn.application_args[0])),
            App.globalPut(Bytes("reserve"), Btoi(Txn.application_args[0])),
            App.globalPut(Bytes("paused"), Int(0)),
            App.localPut(Int(0), Bytes("contract admin"), Int(1)),
            App.localPut(Int(0), Bytes("transfer admin"), Int(1)),
            set_balance(0, Int(0)),
            Return(Int(1)),
        ]
    )

    is_contract_admin = App.localGet(Int(0), Bytes("contract admin"))
    is_transfer_admin = App.localGet(Int(0), Bytes("transfer admin"))
    is_any_admin = Or(is_contract_admin, is_transfer_admin)

    # deletion additionally requires the entire supply to be in the reserve
    can_delete = And(
        is_contract_admin,
        App.globalGet(Bytes("total supply")) == reserve(),
    )

    handle_closeout = Seq(
        [
            set_reserve(reserve() + balance_of(0)),
            Return(Int(1)),
        ]
    )

    handle_opt_in = Seq([set_balance(0, Int(0)), Return(Int(1))])

    # pause all transfers
    # sender must be any admin
    requested_pause = Btoi(Txn.application_args[1])
    handle_pause = Seq(
        [
            Assert(arg_count_is(2)),
            App.globalPut(Bytes("paused"), requested_pause),
            Return(is_any_admin),
        ]
    )

    # configure the admin status of the account Txn.accounts[1]
    # sender must be contract admin
    admin_kind = Txn.application_args[1]
    admin_flag = Btoi(Txn.application_args[2])
    admin_kind_is_known = Or(
        admin_kind == Bytes("contract admin"),
        admin_kind == Bytes("transfer admin"),
    )
    handle_set_admin = Seq(
        [
            Assert(
                And(
                    is_contract_admin,
                    arg_count_is(3),
                    admin_kind_is_known,
                    one_extra_account(),
                )
            ),
            App.localPut(Int(1), admin_kind, admin_flag),
            Return(Int(1)),
        ]
    )
    # NOTE: The admin check above deliberately happens BEFORE the write. If
    # the handler instead were:
    #   Seq([
    #       Assert(And(
    #           Txn.application_args.length() == Int(3),
    #           Or(new_admin_type == Bytes("contract admin"), new_admin_type == Bytes("transfer admin")),
    #           Txn.accounts.length() == Int(1)
    #       )),
    #       App.localPut(Int(1), new_admin_type, new_admin_status),
    #       Return(is_contract_admin)
    #   ])
    # it would be vulnerable to the following attack: a sender passes in their
    # own address as Txn.accounts[1], so the App.localPut line changes the
    # sender's own admin status, meaning the final Return(is_contract_admin)
    # can return anything the sender wants. This allows anyone to become an
    # admin!

    # freeze Txn.accounts[1]
    # sender must be any admin
    requested_freeze = Btoi(Txn.application_args[1])
    handle_freeze = Seq(
        [
            Assert(And(arg_count_is(2), one_extra_account())),
            App.localPut(Int(1), Bytes("frozen"), requested_freeze),
            Return(is_any_admin),
        ]
    )

    # modify the max balance of Txn.accounts[1]
    # if the value is 0, will delete the existing max balance limitation on the account
    # sender must be transfer admin
    requested_max_balance = Btoi(Txn.application_args[1])
    handle_max_balance = Seq(
        [
            Assert(And(arg_count_is(2), one_extra_account())),
            If(
                requested_max_balance == Int(0),
                App.localDel(Int(1), Bytes("max balance")),
                App.localPut(Int(1), Bytes("max balance"), requested_max_balance),
            ),
            Return(is_transfer_admin),
        ]
    )

    # lock Txn.accounts[1] until a UNIX timestamp; 0 removes the lock
    # sender must be transfer admin
    requested_lock_until = Btoi(Txn.application_args[1])
    handle_lock_until = Seq(
        [
            Assert(And(arg_count_is(2), one_extra_account())),
            If(
                requested_lock_until == Int(0),
                App.localDel(Int(1), Bytes("lock until")),
                App.localPut(Int(1), Bytes("lock until"), requested_lock_until),
            ),
            Return(is_transfer_admin),
        ]
    )

    # "transfer group" / "set": assign Txn.accounts[1] to the group in args[2]
    assign_transfer_group = Seq(
        [
            Assert(And(arg_count_is(3), one_extra_account())),
            App.localPut(
                Int(1), Bytes("transfer group"), Btoi(Txn.application_args[2])
            ),
        ]
    )

    def getRuleKey(sendGroup, receiveGroup):
        # Global-state key of the rule from sendGroup to receiveGroup.
        return Concat(Bytes("rule"), Itob(sendGroup), Itob(receiveGroup))

    # "transfer group" / "lock": block transfers from group args[2] to group
    # args[3] until the timestamp in args[4]; 0 removes the rule
    rule_key = getRuleKey(
        Btoi(Txn.application_args[2]), Btoi(Txn.application_args[3])
    )
    rule_locked_until = Btoi(Txn.application_args[4])
    lock_between_groups = Seq(
        [
            Assert(arg_count_is(5)),
            If(
                rule_locked_until == Int(0),
                App.globalDel(rule_key),
                App.globalPut(rule_key, rule_locked_until),
            ),
        ]
    )

    # sender must be transfer admin
    handle_transfer_group = Seq(
        [
            Assert(Txn.application_args.length() > Int(2)),
            Cond(
                [Txn.application_args[1] == Bytes("set"), assign_transfer_group],
                [Txn.application_args[1] == Bytes("lock"), lock_between_groups],
            ),
            Return(is_transfer_admin),
        ]
    )

    # move assets from the reserve to Txn.accounts[1]
    # sender must be contract admin
    minted = Btoi(Txn.application_args[1])
    handle_mint = Seq(
        [
            Assert(
                And(
                    arg_count_is(2),
                    one_extra_account(),
                    minted <= reserve(),
                )
            ),
            set_reserve(reserve() - minted),
            set_balance(1, balance_of(1) + minted),
            Return(is_contract_admin),
        ]
    )

    # move assets from Txn.accounts[1] to the reserve
    # sender must be contract admin
    burned = Btoi(Txn.application_args[1])
    handle_burn = Seq(
        [
            Assert(
                And(
                    arg_count_is(2),
                    one_extra_account(),
                    burned <= balance_of(1),
                )
            ),
            set_reserve(reserve() + burned),
            set_balance(1, balance_of(1) - burned),
            Return(is_contract_admin),
        ]
    )

    # transfer assets from the sender to Txn.accounts[1]
    moved = Btoi(Txn.application_args[1])
    receiver_max_balance = App.localGetEx(Int(1), App.id(), Bytes("max balance"))

    # any one of these conditions rejects the transfer
    trading_paused = App.globalGet(Bytes("paused"))
    sender_frozen = App.localGet(Int(0), Bytes("frozen"))
    receiver_frozen = App.localGet(Int(1), Bytes("frozen"))
    sender_locked = (
        App.localGet(Int(0), Bytes("lock until")) >= Global.latest_timestamp()
    )
    receiver_locked = (
        App.localGet(Int(1), Bytes("lock until")) >= Global.latest_timestamp()
    )
    groups_locked = (
        App.globalGet(
            getRuleKey(
                App.localGet(Int(0), Bytes("transfer group")),
                App.localGet(Int(1), Bytes("transfer group")),
            )
        )
        >= Global.latest_timestamp()
    )
    receiver_over_limit = And(
        receiver_max_balance.hasValue(),
        receiver_max_balance.value() < balance_of(1) + moved,
    )

    handle_transfer = Seq(
        [
            Assert(
                And(
                    arg_count_is(2),
                    one_extra_account(),
                    moved <= balance_of(0),
                )
            ),
            receiver_max_balance,
            If(
                Or(
                    trading_paused,
                    sender_frozen,
                    receiver_frozen,
                    sender_locked,
                    receiver_locked,
                    groups_locked,
                    receiver_over_limit,
                ),
                Return(Int(0)),
            ),
            set_balance(0, balance_of(0) - moved),
            set_balance(1, balance_of(1) + moved),
            Return(Int(1)),
        ]
    )

    completion = Txn.on_completion
    return Cond(
        [Txn.application_id() == Int(0), handle_creation],
        [completion() == OnComplete.DeleteApplication, Return(can_delete)],
        [
            completion() == OnComplete.UpdateApplication,
            Return(is_contract_admin),
        ],
        [completion() == OnComplete.CloseOut, handle_closeout],
        [completion() == OnComplete.OptIn, handle_opt_in],
        [Txn.application_args[0] == Bytes("pause"), handle_pause],
        [Txn.application_args[0] == Bytes("set admin"), handle_set_admin],
        [Txn.application_args[0] == Bytes("freeze"), handle_freeze],
        [Txn.application_args[0] == Bytes("max balance"), handle_max_balance],
        [Txn.application_args[0] == Bytes("lock until"), handle_lock_until],
        [Txn.application_args[0] == Bytes("transfer group"), handle_transfer_group],
        [Txn.application_args[0] == Bytes("mint"), handle_mint],
        [Txn.application_args[0] == Bytes("burn"), handle_burn],
        [Txn.application_args[0] == Bytes("transfer"), handle_transfer],
    )
def clear_state_program():
    """Build the clear-state program of the security-token application.

    Adds the sender's local ``balance`` to the global ``reserve`` and
    returns 1.
    """
    current_reserve = App.globalGet(Bytes("reserve"))
    sender_balance = App.localGet(Int(0), Bytes("balance"))
    refund_to_reserve = App.globalPut(
        Bytes("reserve"), current_reserve + sender_balance
    )
    return Seq([refund_to_reserve, Return(Int(1))])
if __name__ == "__main__":
    # Write the TEAL (version 2, application mode) source of both programs.
    for teal_path, build_program in (
        ("security_token_approval.teal", approval_program),
        ("security_token_clear_state.teal", clear_state_program),
    ):
        with open(teal_path, "w") as f:
            compiled = compileTeal(build_program(), mode=Mode.Application, version=2)
            f.write(compiled)