Chapter 7: DeFi and Smart Contract Applications
Haiyue
24min
Chapter 7: DeFi and Smart Contract Applications
Learning Objectives
- Understand the concept of Decentralized Finance (DeFi)
- Learn about liquidity mining, staking, and other mechanisms
- Study basic applications of smart contracts
- Explore NFTs and other innovative applications
DeFi Ecosystem Overview
Decentralized Finance (DeFi) is a blockchain-based financial service that implements decentralized versions of traditional financial functions through smart contracts.
import time
import math
from typing import Dict, List
from dataclasses import dataclass
@dataclass
class LiquidityPool:
"""Liquidity Pool"""
token_a: str
token_b: str
reserve_a: float
reserve_b: float
total_shares: float
fee_rate: float = 0.003 # 0.3%
def get_price(self, token: str) -> float:
"""Get token price"""
if token == self.token_a:
return self.reserve_b / self.reserve_a
elif token == self.token_b:
return self.reserve_a / self.reserve_b
return 0.0
def add_liquidity(self, amount_a: float, amount_b: float) -> float:
"""Add liquidity"""
if self.total_shares == 0:
# Initial liquidity addition
shares = math.sqrt(amount_a * amount_b)
else:
# Proportional liquidity addition
shares_a = amount_a * self.total_shares / self.reserve_a
shares_b = amount_b * self.total_shares / self.reserve_b
shares = min(shares_a, shares_b)
self.reserve_a += amount_a
self.reserve_b += amount_b
self.total_shares += shares
return shares
def remove_liquidity(self, shares: float) -> tuple:
"""Remove liquidity"""
if shares > self.total_shares:
raise ValueError("Insufficient shares")
amount_a = shares * self.reserve_a / self.total_shares
amount_b = shares * self.reserve_b / self.total_shares
self.reserve_a -= amount_a
self.reserve_b -= amount_b
self.total_shares -= shares
return amount_a, amount_b
def swap(self, token_in: str, amount_in: float) -> float:
"""Swap tokens"""
if token_in == self.token_a:
reserve_in = self.reserve_a
reserve_out = self.reserve_b
elif token_in == self.token_b:
reserve_in = self.reserve_b
reserve_out = self.reserve_a
else:
raise ValueError("Invalid input token")
# AMM formula: x * y = k
amount_in_with_fee = amount_in * (1 - self.fee_rate)
amount_out = (amount_in_with_fee * reserve_out) / (reserve_in + amount_in_with_fee)
# Update reserves
if token_in == self.token_a:
self.reserve_a += amount_in
self.reserve_b -= amount_out
else:
self.reserve_b += amount_in
self.reserve_a -= amount_out
return amount_out
class YieldFarming:
"""Yield Farming"""
def __init__(self, reward_token: str, reward_per_block: float):
self.reward_token = reward_token
self.reward_per_block = reward_per_block
self.total_staked = 0.0
self.last_reward_block = 0
self.acc_reward_per_share = 0.0
self.user_stakes: Dict[str, float] = {}
self.user_debt: Dict[str, float] = {}
def stake(self, user: str, amount: float):
"""Stake LP tokens"""
self._update_pool()
if user in self.user_stakes:
# Calculate pending rewards
pending = (self.user_stakes[user] * self.acc_reward_per_share / 1e12) - self.user_debt[user]
if pending > 0:
print(f"User {user} received reward: {pending:.6f} {self.reward_token}")
self.user_stakes[user] = self.user_stakes.get(user, 0) + amount
self.total_staked += amount
self.user_debt[user] = self.user_stakes[user] * self.acc_reward_per_share / 1e12
print(f"User {user} staked {amount} LP tokens")
def unstake(self, user: str, amount: float):
"""Unstake"""
if user not in self.user_stakes or self.user_stakes[user] < amount:
raise ValueError("Insufficient staked amount")
self._update_pool()
# Calculate and distribute rewards
pending = (self.user_stakes[user] * self.acc_reward_per_share / 1e12) - self.user_debt[user]
if pending > 0:
print(f"User {user} received reward: {pending:.6f} {self.reward_token}")
self.user_stakes[user] -= amount
self.total_staked -= amount
self.user_debt[user] = self.user_stakes[user] * self.acc_reward_per_share / 1e12
print(f"User {user} unstaked {amount} LP tokens")
def _update_pool(self):
"""Update reward pool"""
current_block = int(time.time() // 10) # Simulated block number
if current_block <= self.last_reward_block:
return
if self.total_staked > 0:
blocks = current_block - self.last_reward_block
reward = blocks * self.reward_per_block
self.acc_reward_per_share += reward * 1e12 / self.total_staked
self.last_reward_block = current_block
def get_pending_reward(self, user: str) -> float:
"""View pending rewards"""
if user not in self.user_stakes:
return 0.0
temp_acc = self.acc_reward_per_share
current_block = int(time.time() // 10)
if current_block > self.last_reward_block and self.total_staked > 0:
blocks = current_block - self.last_reward_block
reward = blocks * self.reward_per_block
temp_acc += reward * 1e12 / self.total_staked
return (self.user_stakes[user] * temp_acc / 1e12) - self.user_debt.get(user, 0)
# DeFi Demo
print("=== DeFi Ecosystem Demo ===")
# Create ETH/USDT liquidity pool
pool = LiquidityPool("ETH", "USDT", 0, 0, 0)
# Initialize liquidity
initial_eth = 100
initial_usdt = 300000 # 1 ETH = 3000 USDT
shares = pool.add_liquidity(initial_eth, initial_usdt)
print(f"Initial liquidity pool:")
print(f"ETH reserve: {pool.reserve_a}")
print(f"USDT reserve: {pool.reserve_b}")
print(f"ETH price: ${pool.get_price('ETH'):,.2f}")
print(f"LP tokens: {shares}")
# Swap operation
print(f"\n=== Token Swap ===")
usdt_out = pool.swap("ETH", 1)
print(f"1 ETH swapped for: {usdt_out:.2f} USDT")
print(f"New ETH price: ${pool.get_price('ETH'):,.2f}")
eth_out = pool.swap("USDT", 3000)
print(f"3000 USDT swapped for: {eth_out:.6f} ETH")
print(f"New ETH price: ${pool.get_price('ETH'):,.2f}")
# Yield farming
print(f"\n=== Yield Farming ===")
farm = YieldFarming("FARM", 10.0) # 10 FARM tokens per block reward
# Users stake LP tokens
farm.stake("alice", 50)
farm.stake("bob", 30)
time.sleep(1) # Simulate time passage
# View rewards
alice_reward = farm.get_pending_reward("alice")
bob_reward = farm.get_pending_reward("bob")
print(f"Alice's pending reward: {alice_reward:.6f} FARM")
print(f"Bob's pending reward: {bob_reward:.6f} FARM")
# Partial unstaking
farm.unstake("alice", 20)
Lending Protocol
class LendingProtocol:
"""Lending Protocol"""
def __init__(self):
self.markets: Dict[str, Dict] = {}
self.user_supplies: Dict[str, Dict[str, float]] = {}
self.user_borrows: Dict[str, Dict[str, float]] = {}
self.interest_rates: Dict[str, float] = {}
def create_market(self, token: str, supply_rate: float, borrow_rate: float,
collateral_factor: float):
"""Create lending market"""
self.markets[token] = {
"total_supply": 0.0,
"total_borrow": 0.0,
"supply_rate": supply_rate,
"borrow_rate": borrow_rate,
"collateral_factor": collateral_factor, # Collateral ratio
"price": 1.0 # USD price
}
def supply(self, user: str, token: str, amount: float):
"""Deposit assets"""
if token not in self.markets:
raise ValueError("Market does not exist")
if user not in self.user_supplies:
self.user_supplies[user] = {}
self.user_supplies[user][token] = self.user_supplies[user].get(token, 0) + amount
self.markets[token]["total_supply"] += amount
print(f"{user} deposited {amount} {token}")
def borrow(self, user: str, token: str, amount: float):
"""Borrow assets"""
if token not in self.markets:
raise ValueError("Market does not exist")
# Check borrowing capacity
borrow_limit = self.get_borrow_limit(user)
current_borrows = self.get_total_borrows(user)
if current_borrows + amount > borrow_limit:
raise ValueError("Exceeds borrow limit")
if user not in self.user_borrows:
self.user_borrows[user] = {}
self.user_borrows[user][token] = self.user_borrows[user].get(token, 0) + amount
self.markets[token]["total_borrow"] += amount
print(f"{user} borrowed {amount} {token}")
def repay(self, user: str, token: str, amount: float):
"""Repay loan"""
if user not in self.user_borrows or token not in self.user_borrows[user]:
raise ValueError("No borrow record")
current_borrow = self.user_borrows[user][token]
repay_amount = min(amount, current_borrow)
self.user_borrows[user][token] -= repay_amount
self.markets[token]["total_borrow"] -= repay_amount
print(f"{user} repaid {repay_amount} {token}")
def get_borrow_limit(self, user: str) -> float:
"""Calculate borrow limit"""
total_collateral_value = 0.0
if user in self.user_supplies:
for token, amount in self.user_supplies[user].items():
if token in self.markets:
token_value = amount * self.markets[token]["price"]
collateral_value = token_value * self.markets[token]["collateral_factor"]
total_collateral_value += collateral_value
return total_collateral_value
def get_total_borrows(self, user: str) -> float:
"""Calculate total borrow value"""
total_borrow_value = 0.0
if user in self.user_borrows:
for token, amount in self.user_borrows[user].items():
if token in self.markets:
token_value = amount * self.markets[token]["price"]
total_borrow_value += token_value
return total_borrow_value
def liquidate(self, borrower: str, liquidator: str, token: str, amount: float):
"""Liquidate"""
# Check if liquidation is needed
borrow_limit = self.get_borrow_limit(borrower)
total_borrows = self.get_total_borrows(borrower)
if total_borrows <= borrow_limit:
raise ValueError("Account is healthy, liquidation not required")
# Execute liquidation logic
liquidation_bonus = 0.05 # 5% liquidation bonus
collateral_value = amount * (1 + liquidation_bonus)
print(f"Liquidation executed: {liquidator} liquidated {borrower}'s {amount} {token}")
print(f"Liquidation bonus: {liquidation_bonus:.1%}")
# Lending protocol demo
print(f"\n=== Lending Protocol Demo ===")
lending = LendingProtocol()
# Create markets
lending.create_market("ETH", 0.02, 0.05, 0.75) # 75% collateral ratio
lending.create_market("USDT", 0.01, 0.03, 0.9) # 90% collateral ratio
# Set prices
lending.markets["ETH"]["price"] = 3000
lending.markets["USDT"]["price"] = 1
# Alice deposits ETH as collateral
lending.supply("alice", "ETH", 2.0)
# Calculate Alice's borrowing capacity
alice_limit = lending.get_borrow_limit("alice")
print(f"Alice's borrow limit: ${alice_limit:,.2f}")
# Alice borrows USDT
try:
lending.borrow("alice", "USDT", 4000)
print("Borrowing successful")
except ValueError as e:
print(f"Borrowing failed: {e}")
# Alice borrows less USDT
lending.borrow("alice", "USDT", 3000)
# View Alice's account status
alice_borrows = lending.get_total_borrows("alice")
print(f"Alice's total borrow value: ${alice_borrows:,.2f}")
print(f"Health ratio: {(alice_limit / alice_borrows):.2f}")
NFT Ecosystem
import json
from typing import Optional
class NFTContract:
"""NFT Contract"""
def __init__(self, name: str, symbol: str):
self.name = name
self.symbol = symbol
self.tokens: Dict[int, Dict] = {}
self.owners: Dict[int, str] = {}
self.approvals: Dict[int, str] = {}
self.total_supply = 0
def mint(self, to: str, token_id: int, metadata: Dict) -> bool:
"""Mint NFT"""
if token_id in self.tokens:
raise ValueError("Token ID already exists")
self.tokens[token_id] = {
"metadata": metadata,
"minted_at": time.time(),
"creator": to
}
self.owners[token_id] = to
self.total_supply += 1
print(f"NFT #{token_id} minted to {to}")
return True
def transfer(self, from_addr: str, to: str, token_id: int) -> bool:
"""Transfer NFT"""
if token_id not in self.owners:
raise ValueError("Token does not exist")
if self.owners[token_id] != from_addr:
raise ValueError("Unauthorized transfer")
self.owners[token_id] = to
# Clear approval
self.approvals.pop(token_id, None)
print(f"NFT #{token_id} transferred from {from_addr} to {to}")
return True
def approve(self, owner: str, spender: str, token_id: int):
"""Approve NFT"""
if self.owners[token_id] != owner:
raise ValueError("Only owner can approve")
self.approvals[token_id] = spender
print(f"NFT #{token_id} approved to {spender}")
def get_metadata(self, token_id: int) -> Optional[Dict]:
"""Get metadata"""
return self.tokens.get(token_id, {}).get("metadata")
def owner_of(self, token_id: int) -> str:
"""Get owner"""
return self.owners.get(token_id, "")
class NFTMarketplace:
"""NFT Marketplace"""
def __init__(self):
self.listings: Dict[str, Dict] = {} # key: contract_token
self.offers: Dict[str, List[Dict]] = {}
self.sales_history: List[Dict] = []
def list_nft(self, contract: NFTContract, token_id: int,
seller: str, price: float, currency: str = "ETH"):
"""List NFT"""
if contract.owner_of(token_id) != seller:
raise ValueError("Only owner can list")
listing_key = f"{contract.name}_{token_id}"
self.listings[listing_key] = {
"contract": contract,
"token_id": token_id,
"seller": seller,
"price": price,
"currency": currency,
"listed_at": time.time(),
"active": True
}
print(f"NFT #{token_id} listed for {price} {currency}")
def buy_nft(self, contract_name: str, token_id: int, buyer: str) -> bool:
"""Buy NFT"""
listing_key = f"{contract_name}_{token_id}"
if listing_key not in self.listings:
raise ValueError("NFT not listed")
listing = self.listings[listing_key]
if not listing["active"]:
raise ValueError("NFT already delisted")
# Execute transfer
contract = listing["contract"]
seller = listing["seller"]
price = listing["price"]
contract.transfer(seller, buyer, token_id)
# Record sale
sale = {
"contract_name": contract_name,
"token_id": token_id,
"seller": seller,
"buyer": buyer,
"price": price,
"currency": listing["currency"],
"timestamp": time.time()
}
self.sales_history.append(sale)
listing["active"] = False
print(f"NFT #{token_id} sold: {seller} -> {buyer}, price: {price} {listing['currency']}")
return True
def make_offer(self, contract_name: str, token_id: int,
bidder: str, amount: float, currency: str = "ETH"):
"""Make offer"""
offer_key = f"{contract_name}_{token_id}"
if offer_key not in self.offers:
self.offers[offer_key] = []
offer = {
"bidder": bidder,
"amount": amount,
"currency": currency,
"timestamp": time.time(),
"active": True
}
self.offers[offer_key].append(offer)
print(f"Offer: {bidder} offered {amount} {currency} for NFT #{token_id}")
def get_floor_price(self, contract_name: str) -> float:
"""Get floor price"""
prices = []
for listing_key, listing in self.listings.items():
if (listing["active"] and
listing_key.startswith(contract_name) and
listing["currency"] == "ETH"):
prices.append(listing["price"])
return min(prices) if prices else 0.0
def get_collection_stats(self, contract_name: str) -> Dict:
"""Get collection statistics"""
# Aggregate sales history
sales = [s for s in self.sales_history
if s["contract_name"] == contract_name and s["currency"] == "ETH"]
if not sales:
return {"floor_price": 0, "volume": 0, "sales": 0}
total_volume = sum(s["price"] for s in sales)
avg_price = total_volume / len(sales)
floor_price = self.get_floor_price(contract_name)
return {
"floor_price": floor_price,
"volume": total_volume,
"average_price": avg_price,
"sales_count": len(sales),
"unique_holders": len(set(s["buyer"] for s in sales))
}
# NFT demo
print(f"\n=== NFT Ecosystem Demo ===")
# Create NFT collection
crypto_art = NFTContract("CryptoArt", "CA")
marketplace = NFTMarketplace()
# Artists mint NFTs
artists = ["artist1", "artist2", "artist3"]
artworks = [
{"name": "Digital Dreams #1", "description": "Abstract digital art", "rarity": "Rare"},
{"name": "Cyber Punk #42", "description": "Futuristic cityscape", "rarity": "Epic"},
{"name": "Nature's Code", "description": "Generative nature art", "rarity": "Common"}
]
for i, (artist, metadata) in enumerate(zip(artists, artworks)):
crypto_art.mint(artist, i+1, metadata)
# List NFTs
marketplace.list_nft(crypto_art, 1, "artist1", 2.5, "ETH")
marketplace.list_nft(crypto_art, 2, "artist2", 5.0, "ETH")
marketplace.list_nft(crypto_art, 3, "artist3", 1.2, "ETH")
# Buyers make offers
marketplace.make_offer("CryptoArt", 1, "collector1", 2.0, "ETH")
marketplace.make_offer("CryptoArt", 1, "collector2", 2.3, "ETH")
# Buy NFTs
marketplace.buy_nft("CryptoArt", 3, "collector1")
marketplace.buy_nft("CryptoArt", 1, "collector2")
# View collection stats
stats = marketplace.get_collection_stats("CryptoArt")
print(f"\nCryptoArt Collection Stats:")
print(f"Floor price: {stats['floor_price']:.2f} ETH")
print(f"Total volume: {stats['volume']:.2f} ETH")
print(f"Average price: {stats['average_price']:.2f} ETH")
print(f"Sales count: {stats['sales_count']}")
Cross-chain Bridge
class CrossChainBridge:
"""Cross-chain Bridge"""
def __init__(self):
self.chains = {
"ethereum": {"native_token": "ETH", "bridge_fee": 0.01},
"bsc": {"native_token": "BNB", "bridge_fee": 0.001},
"polygon": {"native_token": "MATIC", "bridge_fee": 0.0001}
}
self.locked_tokens: Dict[str, Dict] = {}
self.bridge_transactions = []
def lock_and_mint(self, from_chain: str, to_chain: str, token: str,
amount: float, user: str) -> str:
"""Lock and mint"""
if from_chain not in self.chains or to_chain not in self.chains:
raise ValueError("Unsupported chain")
bridge_fee = self.chains[from_chain]["bridge_fee"]
# Lock tokens on source chain
lock_key = f"{from_chain}_{token}"
if lock_key not in self.locked_tokens:
self.locked_tokens[lock_key] = 0.0
self.locked_tokens[lock_key] += amount
# Generate cross-chain transaction
tx_id = f"bridge_{len(self.bridge_transactions)}"
bridge_tx = {
"tx_id": tx_id,
"from_chain": from_chain,
"to_chain": to_chain,
"token": token,
"amount": amount,
"user": user,
"fee": bridge_fee,
"status": "pending",
"timestamp": time.time()
}
self.bridge_transactions.append(bridge_tx)
print(f"Cross-chain bridge: {amount} {token} from {from_chain} to {to_chain}")
print(f"Bridge fee: {bridge_fee} {self.chains[from_chain]['native_token']}")
print(f"Transaction ID: {tx_id}")
# Simulate confirmation process
time.sleep(0.1)
bridge_tx["status"] = "confirmed"
return tx_id
def burn_and_unlock(self, tx_id: str) -> bool:
"""Burn and unlock"""
# Find transaction
bridge_tx = None
for tx in self.bridge_transactions:
if tx["tx_id"] == tx_id:
bridge_tx = tx
break
if not bridge_tx or bridge_tx["status"] != "confirmed":
raise ValueError("Invalid bridge transaction")
# Burn tokens on target chain, unlock on source chain
from_chain = bridge_tx["from_chain"]
token = bridge_tx["token"]
amount = bridge_tx["amount"]
lock_key = f"{from_chain}_{token}"
if lock_key in self.locked_tokens:
self.locked_tokens[lock_key] -= amount
bridge_tx["status"] = "completed"
print(f"Cross-chain unlock completed: {tx_id}")
return True
def get_bridge_stats(self) -> Dict:
"""Get bridge statistics"""
total_volume = {}
total_fees = {}
for tx in self.bridge_transactions:
token = tx["token"]
amount = tx["amount"]
fee = tx["fee"]
total_volume[token] = total_volume.get(token, 0) + amount
total_fees[token] = total_fees.get(token, 0) + fee
return {
"total_transactions": len(self.bridge_transactions),
"total_volume": total_volume,
"total_fees": total_fees,
"locked_tokens": self.locked_tokens
}
# Cross-chain bridge demo
print(f"\n=== Cross-chain Bridge Demo ===")
bridge = CrossChainBridge()
# Cross-chain transfer USDT
tx1 = bridge.lock_and_mint("ethereum", "bsc", "USDT", 1000, "user1")
tx2 = bridge.lock_and_mint("bsc", "polygon", "USDT", 500, "user2")
# View bridge statistics
stats = bridge.get_bridge_stats()
print(f"\nBridge Statistics:")
print(f"Total transactions: {stats['total_transactions']}")
print(f"Total volume: {stats['total_volume']}")
print(f"Locked tokens: {stats['locked_tokens']}")
# Unlock operation
bridge.burn_and_unlock(tx1)
Chapter Summary
This chapter provides an in-depth study of DeFi and smart contract applications:
-
DeFi Core Mechanisms:
- Automated Market Makers (AMM)
- Yield farming rewards
- Decentralized exchange protocols
-
Lending Protocols:
- Collateralized lending mechanisms
- Interest rate calculation models
- Liquidation protection mechanisms
-
NFT Ecosystem:
- NFT minting and transfer
- Decentralized marketplaces
- Collection statistics analysis
-
Cross-chain Technology:
- Lock-and-mint mechanism
- Bridge protocol design
- Multi-chain asset management
These innovative applications demonstrate the enormous potential of blockchain technology in the financial sector, providing decentralized alternatives to traditional financial services.