Chapter 7: DeFi and Smart Contract Applications

Haiyue
24min

Chapter 7: DeFi and Smart Contract Applications

Learning Objectives
  • Understand the concept of Decentralized Finance (DeFi)
  • Learn about liquidity mining, staking, and other mechanisms
  • Study basic applications of smart contracts
  • Explore NFTs and other innovative applications

DeFi Ecosystem Overview

Decentralized Finance (DeFi) is a blockchain-based financial service that implements decentralized versions of traditional financial functions through smart contracts.

import time
import math
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class LiquidityPool:
    """Liquidity Pool"""
    token_a: str
    token_b: str
    reserve_a: float
    reserve_b: float
    total_shares: float
    fee_rate: float = 0.003  # 0.3%

    def get_price(self, token: str) -> float:
        """Get token price"""
        if token == self.token_a:
            return self.reserve_b / self.reserve_a
        elif token == self.token_b:
            return self.reserve_a / self.reserve_b
        return 0.0

    def add_liquidity(self, amount_a: float, amount_b: float) -> float:
        """Add liquidity"""
        if self.total_shares == 0:
            # Initial liquidity addition
            shares = math.sqrt(amount_a * amount_b)
        else:
            # Proportional liquidity addition
            shares_a = amount_a * self.total_shares / self.reserve_a
            shares_b = amount_b * self.total_shares / self.reserve_b
            shares = min(shares_a, shares_b)

        self.reserve_a += amount_a
        self.reserve_b += amount_b
        self.total_shares += shares

        return shares

    def remove_liquidity(self, shares: float) -> tuple:
        """Remove liquidity"""
        if shares > self.total_shares:
            raise ValueError("Insufficient shares")

        amount_a = shares * self.reserve_a / self.total_shares
        amount_b = shares * self.reserve_b / self.total_shares

        self.reserve_a -= amount_a
        self.reserve_b -= amount_b
        self.total_shares -= shares

        return amount_a, amount_b

    def swap(self, token_in: str, amount_in: float) -> float:
        """Swap tokens"""
        if token_in == self.token_a:
            reserve_in = self.reserve_a
            reserve_out = self.reserve_b
        elif token_in == self.token_b:
            reserve_in = self.reserve_b
            reserve_out = self.reserve_a
        else:
            raise ValueError("Invalid input token")

        # AMM formula: x * y = k
        amount_in_with_fee = amount_in * (1 - self.fee_rate)
        amount_out = (amount_in_with_fee * reserve_out) / (reserve_in + amount_in_with_fee)

        # Update reserves
        if token_in == self.token_a:
            self.reserve_a += amount_in
            self.reserve_b -= amount_out
        else:
            self.reserve_b += amount_in
            self.reserve_a -= amount_out

        return amount_out

class YieldFarming:
    """Yield Farming"""

    def __init__(self, reward_token: str, reward_per_block: float):
        self.reward_token = reward_token
        self.reward_per_block = reward_per_block
        self.total_staked = 0.0
        self.last_reward_block = 0
        self.acc_reward_per_share = 0.0
        self.user_stakes: Dict[str, float] = {}
        self.user_debt: Dict[str, float] = {}

    def stake(self, user: str, amount: float):
        """Stake LP tokens"""
        self._update_pool()

        if user in self.user_stakes:
            # Calculate pending rewards
            pending = (self.user_stakes[user] * self.acc_reward_per_share / 1e12) - self.user_debt[user]
            if pending > 0:
                print(f"User {user} received reward: {pending:.6f} {self.reward_token}")

        self.user_stakes[user] = self.user_stakes.get(user, 0) + amount
        self.total_staked += amount
        self.user_debt[user] = self.user_stakes[user] * self.acc_reward_per_share / 1e12

        print(f"User {user} staked {amount} LP tokens")

    def unstake(self, user: str, amount: float):
        """Unstake"""
        if user not in self.user_stakes or self.user_stakes[user] < amount:
            raise ValueError("Insufficient staked amount")

        self._update_pool()

        # Calculate and distribute rewards
        pending = (self.user_stakes[user] * self.acc_reward_per_share / 1e12) - self.user_debt[user]
        if pending > 0:
            print(f"User {user} received reward: {pending:.6f} {self.reward_token}")

        self.user_stakes[user] -= amount
        self.total_staked -= amount
        self.user_debt[user] = self.user_stakes[user] * self.acc_reward_per_share / 1e12

        print(f"User {user} unstaked {amount} LP tokens")

    def _update_pool(self):
        """Update reward pool"""
        current_block = int(time.time() // 10)  # Simulated block number
        if current_block <= self.last_reward_block:
            return

        if self.total_staked > 0:
            blocks = current_block - self.last_reward_block
            reward = blocks * self.reward_per_block
            self.acc_reward_per_share += reward * 1e12 / self.total_staked

        self.last_reward_block = current_block

    def get_pending_reward(self, user: str) -> float:
        """View pending rewards"""
        if user not in self.user_stakes:
            return 0.0

        temp_acc = self.acc_reward_per_share
        current_block = int(time.time() // 10)

        if current_block > self.last_reward_block and self.total_staked > 0:
            blocks = current_block - self.last_reward_block
            reward = blocks * self.reward_per_block
            temp_acc += reward * 1e12 / self.total_staked

        return (self.user_stakes[user] * temp_acc / 1e12) - self.user_debt.get(user, 0)

# DeFi Demo
print("=== DeFi Ecosystem Demo ===")

# Create ETH/USDT liquidity pool
pool = LiquidityPool("ETH", "USDT", 0, 0, 0)

# Initialize liquidity
initial_eth = 100
initial_usdt = 300000  # 1 ETH = 3000 USDT
shares = pool.add_liquidity(initial_eth, initial_usdt)

print(f"Initial liquidity pool:")
print(f"ETH reserve: {pool.reserve_a}")
print(f"USDT reserve: {pool.reserve_b}")
print(f"ETH price: ${pool.get_price('ETH'):,.2f}")
print(f"LP tokens: {shares}")

# Swap operation
print(f"\n=== Token Swap ===")
usdt_out = pool.swap("ETH", 1)
print(f"1 ETH swapped for: {usdt_out:.2f} USDT")
print(f"New ETH price: ${pool.get_price('ETH'):,.2f}")

eth_out = pool.swap("USDT", 3000)
print(f"3000 USDT swapped for: {eth_out:.6f} ETH")
print(f"New ETH price: ${pool.get_price('ETH'):,.2f}")

# Yield farming
print(f"\n=== Yield Farming ===")
farm = YieldFarming("FARM", 10.0)  # 10 FARM tokens per block reward

# Users stake LP tokens
farm.stake("alice", 50)
farm.stake("bob", 30)

time.sleep(1)  # Simulate time passage

# View rewards
alice_reward = farm.get_pending_reward("alice")
bob_reward = farm.get_pending_reward("bob")

print(f"Alice's pending reward: {alice_reward:.6f} FARM")
print(f"Bob's pending reward: {bob_reward:.6f} FARM")

# Partial unstaking
farm.unstake("alice", 20)

Lending Protocol

class LendingProtocol:
    """Lending Protocol"""

    def __init__(self):
        self.markets: Dict[str, Dict] = {}
        self.user_supplies: Dict[str, Dict[str, float]] = {}
        self.user_borrows: Dict[str, Dict[str, float]] = {}
        self.interest_rates: Dict[str, float] = {}

    def create_market(self, token: str, supply_rate: float, borrow_rate: float,
                     collateral_factor: float):
        """Create lending market"""
        self.markets[token] = {
            "total_supply": 0.0,
            "total_borrow": 0.0,
            "supply_rate": supply_rate,
            "borrow_rate": borrow_rate,
            "collateral_factor": collateral_factor,  # Collateral ratio
            "price": 1.0  # USD price
        }

    def supply(self, user: str, token: str, amount: float):
        """Deposit assets"""
        if token not in self.markets:
            raise ValueError("Market does not exist")

        if user not in self.user_supplies:
            self.user_supplies[user] = {}

        self.user_supplies[user][token] = self.user_supplies[user].get(token, 0) + amount
        self.markets[token]["total_supply"] += amount

        print(f"{user} deposited {amount} {token}")

    def borrow(self, user: str, token: str, amount: float):
        """Borrow assets"""
        if token not in self.markets:
            raise ValueError("Market does not exist")

        # Check borrowing capacity
        borrow_limit = self.get_borrow_limit(user)
        current_borrows = self.get_total_borrows(user)

        if current_borrows + amount > borrow_limit:
            raise ValueError("Exceeds borrow limit")

        if user not in self.user_borrows:
            self.user_borrows[user] = {}

        self.user_borrows[user][token] = self.user_borrows[user].get(token, 0) + amount
        self.markets[token]["total_borrow"] += amount

        print(f"{user} borrowed {amount} {token}")

    def repay(self, user: str, token: str, amount: float):
        """Repay loan"""
        if user not in self.user_borrows or token not in self.user_borrows[user]:
            raise ValueError("No borrow record")

        current_borrow = self.user_borrows[user][token]
        repay_amount = min(amount, current_borrow)

        self.user_borrows[user][token] -= repay_amount
        self.markets[token]["total_borrow"] -= repay_amount

        print(f"{user} repaid {repay_amount} {token}")

    def get_borrow_limit(self, user: str) -> float:
        """Calculate borrow limit"""
        total_collateral_value = 0.0

        if user in self.user_supplies:
            for token, amount in self.user_supplies[user].items():
                if token in self.markets:
                    token_value = amount * self.markets[token]["price"]
                    collateral_value = token_value * self.markets[token]["collateral_factor"]
                    total_collateral_value += collateral_value

        return total_collateral_value

    def get_total_borrows(self, user: str) -> float:
        """Calculate total borrow value"""
        total_borrow_value = 0.0

        if user in self.user_borrows:
            for token, amount in self.user_borrows[user].items():
                if token in self.markets:
                    token_value = amount * self.markets[token]["price"]
                    total_borrow_value += token_value

        return total_borrow_value

    def liquidate(self, borrower: str, liquidator: str, token: str, amount: float):
        """Liquidate"""
        # Check if liquidation is needed
        borrow_limit = self.get_borrow_limit(borrower)
        total_borrows = self.get_total_borrows(borrower)

        if total_borrows <= borrow_limit:
            raise ValueError("Account is healthy, liquidation not required")

        # Execute liquidation logic
        liquidation_bonus = 0.05  # 5% liquidation bonus
        collateral_value = amount * (1 + liquidation_bonus)

        print(f"Liquidation executed: {liquidator} liquidated {borrower}'s {amount} {token}")
        print(f"Liquidation bonus: {liquidation_bonus:.1%}")

# Lending protocol demo
print(f"\n=== Lending Protocol Demo ===")

lending = LendingProtocol()

# Create markets
lending.create_market("ETH", 0.02, 0.05, 0.75)  # 75% collateral ratio
lending.create_market("USDT", 0.01, 0.03, 0.9)   # 90% collateral ratio

# Set prices
lending.markets["ETH"]["price"] = 3000
lending.markets["USDT"]["price"] = 1

# Alice deposits ETH as collateral
lending.supply("alice", "ETH", 2.0)

# Calculate Alice's borrowing capacity
alice_limit = lending.get_borrow_limit("alice")
print(f"Alice's borrow limit: ${alice_limit:,.2f}")

# Alice borrows USDT
try:
    lending.borrow("alice", "USDT", 4000)
    print("Borrowing successful")
except ValueError as e:
    print(f"Borrowing failed: {e}")

# Alice borrows less USDT
lending.borrow("alice", "USDT", 3000)

# View Alice's account status
alice_borrows = lending.get_total_borrows("alice")
print(f"Alice's total borrow value: ${alice_borrows:,.2f}")
print(f"Health ratio: {(alice_limit / alice_borrows):.2f}")

NFT Ecosystem

import json
from typing import Optional

class NFTContract:
    """NFT Contract"""

    def __init__(self, name: str, symbol: str):
        self.name = name
        self.symbol = symbol
        self.tokens: Dict[int, Dict] = {}
        self.owners: Dict[int, str] = {}
        self.approvals: Dict[int, str] = {}
        self.total_supply = 0

    def mint(self, to: str, token_id: int, metadata: Dict) -> bool:
        """Mint NFT"""
        if token_id in self.tokens:
            raise ValueError("Token ID already exists")

        self.tokens[token_id] = {
            "metadata": metadata,
            "minted_at": time.time(),
            "creator": to
        }
        self.owners[token_id] = to
        self.total_supply += 1

        print(f"NFT #{token_id} minted to {to}")
        return True

    def transfer(self, from_addr: str, to: str, token_id: int) -> bool:
        """Transfer NFT"""
        if token_id not in self.owners:
            raise ValueError("Token does not exist")

        if self.owners[token_id] != from_addr:
            raise ValueError("Unauthorized transfer")

        self.owners[token_id] = to
        # Clear approval
        self.approvals.pop(token_id, None)

        print(f"NFT #{token_id} transferred from {from_addr} to {to}")
        return True

    def approve(self, owner: str, spender: str, token_id: int):
        """Approve NFT"""
        if self.owners[token_id] != owner:
            raise ValueError("Only owner can approve")

        self.approvals[token_id] = spender
        print(f"NFT #{token_id} approved to {spender}")

    def get_metadata(self, token_id: int) -> Optional[Dict]:
        """Get metadata"""
        return self.tokens.get(token_id, {}).get("metadata")

    def owner_of(self, token_id: int) -> str:
        """Get owner"""
        return self.owners.get(token_id, "")

class NFTMarketplace:
    """NFT Marketplace"""

    def __init__(self):
        self.listings: Dict[str, Dict] = {}  # key: contract_token
        self.offers: Dict[str, List[Dict]] = {}
        self.sales_history: List[Dict] = []

    def list_nft(self, contract: NFTContract, token_id: int,
                seller: str, price: float, currency: str = "ETH"):
        """List NFT"""
        if contract.owner_of(token_id) != seller:
            raise ValueError("Only owner can list")

        listing_key = f"{contract.name}_{token_id}"
        self.listings[listing_key] = {
            "contract": contract,
            "token_id": token_id,
            "seller": seller,
            "price": price,
            "currency": currency,
            "listed_at": time.time(),
            "active": True
        }

        print(f"NFT #{token_id} listed for {price} {currency}")

    def buy_nft(self, contract_name: str, token_id: int, buyer: str) -> bool:
        """Buy NFT"""
        listing_key = f"{contract_name}_{token_id}"

        if listing_key not in self.listings:
            raise ValueError("NFT not listed")

        listing = self.listings[listing_key]

        if not listing["active"]:
            raise ValueError("NFT already delisted")

        # Execute transfer
        contract = listing["contract"]
        seller = listing["seller"]
        price = listing["price"]

        contract.transfer(seller, buyer, token_id)

        # Record sale
        sale = {
            "contract_name": contract_name,
            "token_id": token_id,
            "seller": seller,
            "buyer": buyer,
            "price": price,
            "currency": listing["currency"],
            "timestamp": time.time()
        }

        self.sales_history.append(sale)
        listing["active"] = False

        print(f"NFT #{token_id} sold: {seller} -> {buyer}, price: {price} {listing['currency']}")
        return True

    def make_offer(self, contract_name: str, token_id: int,
                  bidder: str, amount: float, currency: str = "ETH"):
        """Make offer"""
        offer_key = f"{contract_name}_{token_id}"

        if offer_key not in self.offers:
            self.offers[offer_key] = []

        offer = {
            "bidder": bidder,
            "amount": amount,
            "currency": currency,
            "timestamp": time.time(),
            "active": True
        }

        self.offers[offer_key].append(offer)
        print(f"Offer: {bidder} offered {amount} {currency} for NFT #{token_id}")

    def get_floor_price(self, contract_name: str) -> float:
        """Get floor price"""
        prices = []

        for listing_key, listing in self.listings.items():
            if (listing["active"] and
                listing_key.startswith(contract_name) and
                listing["currency"] == "ETH"):
                prices.append(listing["price"])

        return min(prices) if prices else 0.0

    def get_collection_stats(self, contract_name: str) -> Dict:
        """Get collection statistics"""
        # Aggregate sales history
        sales = [s for s in self.sales_history
                if s["contract_name"] == contract_name and s["currency"] == "ETH"]

        if not sales:
            return {"floor_price": 0, "volume": 0, "sales": 0}

        total_volume = sum(s["price"] for s in sales)
        avg_price = total_volume / len(sales)
        floor_price = self.get_floor_price(contract_name)

        return {
            "floor_price": floor_price,
            "volume": total_volume,
            "average_price": avg_price,
            "sales_count": len(sales),
            "unique_holders": len(set(s["buyer"] for s in sales))
        }

# NFT demo
print(f"\n=== NFT Ecosystem Demo ===")

# Create NFT collection
crypto_art = NFTContract("CryptoArt", "CA")
marketplace = NFTMarketplace()

# Artists mint NFTs
artists = ["artist1", "artist2", "artist3"]
artworks = [
    {"name": "Digital Dreams #1", "description": "Abstract digital art", "rarity": "Rare"},
    {"name": "Cyber Punk #42", "description": "Futuristic cityscape", "rarity": "Epic"},
    {"name": "Nature's Code", "description": "Generative nature art", "rarity": "Common"}
]

for i, (artist, metadata) in enumerate(zip(artists, artworks)):
    crypto_art.mint(artist, i+1, metadata)

# List NFTs
marketplace.list_nft(crypto_art, 1, "artist1", 2.5, "ETH")
marketplace.list_nft(crypto_art, 2, "artist2", 5.0, "ETH")
marketplace.list_nft(crypto_art, 3, "artist3", 1.2, "ETH")

# Buyers make offers
marketplace.make_offer("CryptoArt", 1, "collector1", 2.0, "ETH")
marketplace.make_offer("CryptoArt", 1, "collector2", 2.3, "ETH")

# Buy NFTs
marketplace.buy_nft("CryptoArt", 3, "collector1")
marketplace.buy_nft("CryptoArt", 1, "collector2")

# View collection stats
stats = marketplace.get_collection_stats("CryptoArt")
print(f"\nCryptoArt Collection Stats:")
print(f"Floor price: {stats['floor_price']:.2f} ETH")
print(f"Total volume: {stats['volume']:.2f} ETH")
print(f"Average price: {stats['average_price']:.2f} ETH")
print(f"Sales count: {stats['sales_count']}")

Cross-chain Bridge

class CrossChainBridge:
    """Cross-chain Bridge"""

    def __init__(self):
        self.chains = {
            "ethereum": {"native_token": "ETH", "bridge_fee": 0.01},
            "bsc": {"native_token": "BNB", "bridge_fee": 0.001},
            "polygon": {"native_token": "MATIC", "bridge_fee": 0.0001}
        }
        self.locked_tokens: Dict[str, Dict] = {}
        self.bridge_transactions = []

    def lock_and_mint(self, from_chain: str, to_chain: str, token: str,
                     amount: float, user: str) -> str:
        """Lock and mint"""
        if from_chain not in self.chains or to_chain not in self.chains:
            raise ValueError("Unsupported chain")

        bridge_fee = self.chains[from_chain]["bridge_fee"]

        # Lock tokens on source chain
        lock_key = f"{from_chain}_{token}"
        if lock_key not in self.locked_tokens:
            self.locked_tokens[lock_key] = 0.0

        self.locked_tokens[lock_key] += amount

        # Generate cross-chain transaction
        tx_id = f"bridge_{len(self.bridge_transactions)}"
        bridge_tx = {
            "tx_id": tx_id,
            "from_chain": from_chain,
            "to_chain": to_chain,
            "token": token,
            "amount": amount,
            "user": user,
            "fee": bridge_fee,
            "status": "pending",
            "timestamp": time.time()
        }

        self.bridge_transactions.append(bridge_tx)

        print(f"Cross-chain bridge: {amount} {token} from {from_chain} to {to_chain}")
        print(f"Bridge fee: {bridge_fee} {self.chains[from_chain]['native_token']}")
        print(f"Transaction ID: {tx_id}")

        # Simulate confirmation process
        time.sleep(0.1)
        bridge_tx["status"] = "confirmed"

        return tx_id

    def burn_and_unlock(self, tx_id: str) -> bool:
        """Burn and unlock"""
        # Find transaction
        bridge_tx = None
        for tx in self.bridge_transactions:
            if tx["tx_id"] == tx_id:
                bridge_tx = tx
                break

        if not bridge_tx or bridge_tx["status"] != "confirmed":
            raise ValueError("Invalid bridge transaction")

        # Burn tokens on target chain, unlock on source chain
        from_chain = bridge_tx["from_chain"]
        token = bridge_tx["token"]
        amount = bridge_tx["amount"]

        lock_key = f"{from_chain}_{token}"
        if lock_key in self.locked_tokens:
            self.locked_tokens[lock_key] -= amount

        bridge_tx["status"] = "completed"
        print(f"Cross-chain unlock completed: {tx_id}")

        return True

    def get_bridge_stats(self) -> Dict:
        """Get bridge statistics"""
        total_volume = {}
        total_fees = {}

        for tx in self.bridge_transactions:
            token = tx["token"]
            amount = tx["amount"]
            fee = tx["fee"]

            total_volume[token] = total_volume.get(token, 0) + amount
            total_fees[token] = total_fees.get(token, 0) + fee

        return {
            "total_transactions": len(self.bridge_transactions),
            "total_volume": total_volume,
            "total_fees": total_fees,
            "locked_tokens": self.locked_tokens
        }

# Cross-chain bridge demo
print(f"\n=== Cross-chain Bridge Demo ===")

bridge = CrossChainBridge()

# Cross-chain transfer USDT
tx1 = bridge.lock_and_mint("ethereum", "bsc", "USDT", 1000, "user1")
tx2 = bridge.lock_and_mint("bsc", "polygon", "USDT", 500, "user2")

# View bridge statistics
stats = bridge.get_bridge_stats()
print(f"\nBridge Statistics:")
print(f"Total transactions: {stats['total_transactions']}")
print(f"Total volume: {stats['total_volume']}")
print(f"Locked tokens: {stats['locked_tokens']}")

# Unlock operation
bridge.burn_and_unlock(tx1)

Chapter Summary

This chapter provides an in-depth study of DeFi and smart contract applications:

  1. DeFi Core Mechanisms:

    • Automated Market Makers (AMM)
    • Yield farming rewards
    • Decentralized exchange protocols
  2. Lending Protocols:

    • Collateralized lending mechanisms
    • Interest rate calculation models
    • Liquidation protection mechanisms
  3. NFT Ecosystem:

    • NFT minting and transfer
    • Decentralized marketplaces
    • Collection statistics analysis
  4. Cross-chain Technology:

    • Lock-and-mint mechanism
    • Bridge protocol design
    • Multi-chain asset management

These innovative applications demonstrate the enormous potential of blockchain technology in the financial sector, providing decentralized alternatives to traditional financial services.