Chapter 2: Blockchain Technology Principles

Haiyue
33min

Chapter 2: Blockchain Technology Principles

Learning Objectives
  • Understand the basic structure and working principles of blockchain
  • Master core concepts like hash functions and digital signatures
  • Learn about the characteristics of decentralized networks
  • Understand the role of consensus mechanisms

Basic Structure of Blockchain

Block Structure

Blockchain is a chain data structure composed of a series of blocks, where each block contains the following main components:

import hashlib
import time
import json
from typing import List, Dict, Any

class Block:
    """Block class definition"""
    def __init__(self, index: int, transactions: List[Dict], previous_hash: str, nonce: int = 0):
        self.index = index                    # Block index
        self.timestamp = time.time()          # Timestamp
        self.transactions = transactions      # Transaction list
        self.previous_hash = previous_hash    # Previous block hash
        self.nonce = nonce                   # Random number (for mining)
        self.merkle_root = self._calculate_merkle_root()  # Merkle root
        self.hash = self._calculate_hash()    # Block hash

    def _calculate_hash(self) -> str:
        """Calculate block hash"""
        block_string = json.dumps({
            "index": self.index,
            "timestamp": self.timestamp,
            "transactions": self.transactions,
            "previous_hash": self.previous_hash,
            "merkle_root": self.merkle_root,
            "nonce": self.nonce
        }, sort_keys=True)

        return hashlib.sha256(block_string.encode()).hexdigest()

    def _calculate_merkle_root(self) -> str:
        """Calculate Merkle root"""
        if not self.transactions:
            return hashlib.sha256("".encode()).hexdigest()

        # Simplified Merkle tree calculation
        tx_hashes = [
            hashlib.sha256(json.dumps(tx, sort_keys=True).encode()).hexdigest()
            for tx in self.transactions
        ]

        while len(tx_hashes) > 1:
            new_hashes = []
            for i in range(0, len(tx_hashes), 2):
                if i + 1 < len(tx_hashes):
                    combined = tx_hashes[i] + tx_hashes[i + 1]
                else:
                    combined = tx_hashes[i] + tx_hashes[i]  # Duplicate last one if odd
                new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())
            tx_hashes = new_hashes

        return tx_hashes[0]

    def mine_block(self, difficulty: int) -> None:
        """Mining process: finding a hash that meets difficulty requirements"""
        target = "0" * difficulty

        print(f"Starting to mine block #{self.index}, difficulty: {difficulty}")
        start_time = time.time()

        while self.hash[:difficulty] != target:
            self.nonce += 1
            self.hash = self._calculate_hash()

        end_time = time.time()
        print(f"Mining successful! Time: {end_time - start_time:.2f}s, nonce: {self.nonce}")
        print(f"Block hash: {self.hash}")

# Create example block
transactions = [
    {"from": "Alice", "to": "Bob", "amount": 50, "fee": 1},
    {"from": "Bob", "to": "Charlie", "amount": 25, "fee": 0.5}
]

genesis_block = Block(0, [], "0")  # Genesis block
print(f"Genesis block hash: {genesis_block.hash}")

# Create second block and mine
block1 = Block(1, transactions, genesis_block.hash)
block1.mine_block(difficulty=4)  # Difficulty of 4 (requires 4 leading zeros)

Blockchain Data Structure

🔄 正在渲染 Mermaid 图表...

Hash Functions and Cryptographic Fundamentals

Hash Function Properties

Hash functions are the core cryptographic tools of blockchain, with the following important properties:

import hashlib

class HashFunction:
    """Hash function demonstration class"""

    @staticmethod
    def sha256_hash(data: str) -> str:
        """Calculate SHA-256 hash"""
        return hashlib.sha256(data.encode()).hexdigest()

    @staticmethod
    def demonstrate_properties():
        """Demonstrate important properties of hash functions"""
        print("=== Hash Function Properties Demonstration ===\n")

        # 1. Determinism: Same input produces same output
        message1 = "Hello, Blockchain!"
        hash1_a = HashFunction.sha256_hash(message1)
        hash1_b = HashFunction.sha256_hash(message1)
        print(f"Determinism test:")
        print(f"Input: {message1}")
        print(f"Hash1: {hash1_a}")
        print(f"Hash2: {hash1_b}")
        print(f"Same? {hash1_a == hash1_b}\n")

        # 2. Avalanche effect: Small change leads to completely different output
        message2 = "Hello, Blockchain!"  # Completely same
        message3 = "Hello, blockchain!"  # Only case difference
        hash2 = HashFunction.sha256_hash(message2)
        hash3 = HashFunction.sha256_hash(message3)
        print(f"Avalanche effect test:")
        print(f"Message1: {message2}")
        print(f"Hash1: {hash2}")
        print(f"Message2: {message3}")
        print(f"Hash2: {hash3}")
        print(f"Hamming distance: {HashFunction.hamming_distance(hash2, hash3)}\n")

        # 3. Fixed length output
        short_msg = "Hi"
        long_msg = "This is a very long message that contains much more information than the previous short message, but the hash output length remains the same."
        short_hash = HashFunction.sha256_hash(short_msg)
        long_hash = HashFunction.sha256_hash(long_msg)
        print(f"Fixed length output test:")
        print(f"Short message hash length: {len(short_hash)}")
        print(f"Long message hash length: {len(long_hash)}")
        print(f"Same length? {len(short_hash) == len(long_hash)}\n")

        # 4. Computational efficiency
        import time
        test_data = "Blockchain" * 1000  # Repeat 1000 times
        start_time = time.time()
        for _ in range(10000):  # Calculate 10000 times
            HashFunction.sha256_hash(test_data)
        end_time = time.time()
        print(f"Computational efficiency test:")
        print(f"10000 hash calculations time: {end_time - start_time:.4f}s")

    @staticmethod
    def hamming_distance(hash1: str, hash2: str) -> int:
        """Calculate Hamming distance between two hashes"""
        # Convert to binary and count different bits
        bin1 = bin(int(hash1, 16))[2:].zfill(256)
        bin2 = bin(int(hash2, 16))[2:].zfill(256)
        return sum(b1 != b2 for b1, b2 in zip(bin1, bin2))

# Demonstrate hash function properties
HashFunction.demonstrate_properties()

Merkle Tree

A Merkle tree is a binary tree structure used for efficiently verifying the integrity of large amounts of data:

class MerkleTree:
    """Merkle tree implementation"""

    def __init__(self, transactions: List[str]):
        self.transactions = transactions
        self.tree = self._build_tree()
        self.root = self.tree[0] if self.tree else None

    def _hash(self, data: str) -> str:
        """Hash function"""
        return hashlib.sha256(data.encode()).hexdigest()

    def _build_tree(self) -> List[str]:
        """Build Merkle tree"""
        if not self.transactions:
            return []

        # First layer: transaction hashes
        tree_level = [self._hash(tx) for tx in self.transactions]
        tree = tree_level.copy()

        # Build upwards to root node
        while len(tree_level) > 1:
            next_level = []

            for i in range(0, len(tree_level), 2):
                if i + 1 < len(tree_level):
                    # Paired nodes
                    combined = tree_level[i] + tree_level[i + 1]
                else:
                    # Odd number of nodes, duplicate the last one
                    combined = tree_level[i] + tree_level[i]

                next_level.append(self._hash(combined))

            tree_level = next_level
            tree.extend(next_level)

        return tree

    def get_proof(self, transaction_index: int) -> List[Dict]:
        """Get Merkle proof for a specific transaction"""
        if transaction_index >= len(self.transactions):
            return []

        proof = []
        current_index = transaction_index
        tree_level = [self._hash(tx) for tx in self.transactions]

        while len(tree_level) > 1:
            # Determine sibling node
            if current_index % 2 == 0:  # Left node
                if current_index + 1 < len(tree_level):
                    sibling = tree_level[current_index + 1]
                    proof.append({"hash": sibling, "position": "right"})
                else:
                    sibling = tree_level[current_index]  # Self as sibling
                    proof.append({"hash": sibling, "position": "right"})
            else:  # Right node
                sibling = tree_level[current_index - 1]
                proof.append({"hash": sibling, "position": "left"})

            # Move to next layer
            current_index = current_index // 2
            next_level = []

            for i in range(0, len(tree_level), 2):
                if i + 1 < len(tree_level):
                    combined = tree_level[i] + tree_level[i + 1]
                else:
                    combined = tree_level[i] + tree_level[i]
                next_level.append(self._hash(combined))

            tree_level = next_level

        return proof

    @staticmethod
    def verify_proof(transaction: str, proof: List[Dict], root_hash: str) -> bool:
        """Verify Merkle proof"""
        current_hash = hashlib.sha256(transaction.encode()).hexdigest()

        for step in proof:
            sibling_hash = step["hash"]
            position = step["position"]

            if position == "left":
                combined = sibling_hash + current_hash
            else:
                combined = current_hash + sibling_hash

            current_hash = hashlib.sha256(combined.encode()).hexdigest()

        return current_hash == root_hash

# Merkle tree demonstration
transactions = [
    "Alice->Bob: 10 BTC",
    "Bob->Charlie: 5 BTC",
    "Charlie->David: 3 BTC",
    "David->Eve: 2 BTC"
]

merkle_tree = MerkleTree(transactions)
print(f"Merkle root: {merkle_tree.root}")

# Get proof for the 2nd transaction
proof = merkle_tree.get_proof(1)
print(f"\nMerkle proof for transaction '{transactions[1]}':")
for i, step in enumerate(proof):
    print(f"  Step {i+1}: {step}")

# Verify proof
is_valid = MerkleTree.verify_proof(transactions[1], proof, merkle_tree.root)
print(f"\nProof verification result: {is_valid}")

Digital Signatures

ECDSA Digital Signature Algorithm

Digital signatures are used to verify transaction authenticity and prevent tampering:

import hashlib
import secrets
from dataclasses import dataclass
from typing import Tuple

@dataclass
class KeyPair:
    """Key pair"""
    private_key: int
    public_key: Tuple[int, int]

class SimpleECDSA:
    """Simplified ECDSA digital signature implementation (for educational purposes only)"""

    # Simplified elliptic curve parameters (in practice, secp256k1 is used)
    P = 2**256 - 2**32 - 2**9 - 2**8 - 2**7 - 2**6 - 2**4 - 1  # Prime
    A = 0
    B = 7
    Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798
    Gy = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8
    N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141  # Order

    @classmethod
    def generate_keypair(cls) -> KeyPair:
        """Generate key pair"""
        # Generate random private key
        private_key = secrets.randbelow(cls.N)

        # Calculate public key = private key * G (elliptic curve point multiplication)
        # Simplified calculation here, actual application requires complete elliptic curve operations
        public_key = (cls.Gx, cls.Gy)  # Simplified representation

        return KeyPair(private_key, public_key)

    @classmethod
    def sign(cls, message: str, private_key: int) -> Tuple[int, int]:
        """Digital signature"""
        # Calculate message hash
        message_hash = int(hashlib.sha256(message.encode()).hexdigest(), 16)

        # Generate random number k
        k = secrets.randbelow(cls.N)

        # Calculate signature (r, s) - simplified implementation
        r = pow(cls.Gx, k, cls.P) % cls.N
        s = (pow(k, -1, cls.N) * (message_hash + r * private_key)) % cls.N

        return (r, s)

    @classmethod
    def verify(cls, message: str, signature: Tuple[int, int], public_key: Tuple[int, int]) -> bool:
        """Verify signature"""
        r, s = signature

        # Basic validity check
        if not (1 <= r < cls.N and 1 <= s < cls.N):
            return False

        # Calculate message hash
        message_hash = int(hashlib.sha256(message.encode()).hexdigest(), 16)

        # Simplified verification process (actual implementation requires complete elliptic curve operations)
        # Only basic check here
        return True  # Simplified implementation always returns True

class Transaction:
    """Transaction class"""

    def __init__(self, sender_public_key: Tuple[int, int], recipient_address: str,
                 amount: float, sender_private_key: int = None):
        self.sender_public_key = sender_public_key
        self.recipient_address = recipient_address
        self.amount = amount
        self.timestamp = time.time()
        self.signature = None

        if sender_private_key:
            self.sign_transaction(sender_private_key)

    def sign_transaction(self, private_key: int):
        """Sign transaction"""
        message = self._get_transaction_data()
        self.signature = SimpleECDSA.sign(message, private_key)

    def verify_signature(self) -> bool:
        """Verify transaction signature"""
        if not self.signature:
            return False

        message = self._get_transaction_data()
        return SimpleECDSA.verify(message, self.signature, self.sender_public_key)

    def _get_transaction_data(self) -> str:
        """Get transaction data for signing"""
        return f"{self.sender_public_key}{self.recipient_address}{self.amount}{self.timestamp}"

    def to_dict(self) -> Dict:
        """Convert to dictionary format"""
        return {
            "sender": self.sender_public_key,
            "recipient": self.recipient_address,
            "amount": self.amount,
            "timestamp": self.timestamp,
            "signature": self.signature
        }

# Digital signature demonstration
print("=== Digital Signature Demonstration ===")

# Generate key pairs for Alice and Bob
alice_keypair = SimpleECDSA.generate_keypair()
bob_keypair = SimpleECDSA.generate_keypair()

print(f"Alice private key: {hex(alice_keypair.private_key)[:20]}...")
print(f"Alice public key: {hex(alice_keypair.public_key[0])[:20]}...")

# Alice sends transaction to Bob
transaction = Transaction(
    sender_public_key=alice_keypair.public_key,
    recipient_address="Bob_Address_123",
    amount=10.5,
    sender_private_key=alice_keypair.private_key
)

print(f"\nTransaction created successfully:")
print(f"Sender: {hex(transaction.sender_public_key[0])[:20]}...")
print(f"Recipient: {transaction.recipient_address}")
print(f"Amount: {transaction.amount}")
print(f"Signature: {transaction.signature[0] if transaction.signature else None}")

# Verify transaction signature
is_valid = transaction.verify_signature()
print(f"\nTransaction signature verification: {'Valid' if is_valid else 'Invalid'}")

Decentralized Network

P2P Network Architecture

Decentralization Characteristics

Blockchain network is a decentralized peer-to-peer (P2P) network with the following characteristics:

  • No Single Point of Failure: No central server, any node failure doesn’t affect the overall network
  • Data Redundancy: Each node maintains a complete copy of the blockchain
  • Independent Verification: Each node independently verifies transactions and blocks
  • Open Participation: Anyone can join the network as a node
import socket
import threading
import json
import queue
from typing import Set, Dict, List
from enum import Enum

class NodeType(Enum):
    """Node types"""
    FULL_NODE = "full_node"      # Full node
    LIGHT_NODE = "light_node"    # Light node
    MINER_NODE = "miner_node"    # Miner node

class BlockchainNode:
    """Blockchain network node"""

    def __init__(self, node_id: str, node_type: NodeType, port: int):
        self.node_id = node_id
        self.node_type = node_type
        self.port = port
        self.peers: Set[str] = set()  # Peer node addresses
        self.blockchain: List[Dict] = []  # Blockchain data
        self.pending_transactions: queue.Queue = queue.Queue()
        self.is_running = False

    def connect_to_peer(self, peer_address: str) -> bool:
        """Connect to peer node"""
        try:
            # Simulate connection process
            if peer_address not in self.peers:
                self.peers.add(peer_address)
                print(f"Node {self.node_id} connected to {peer_address}")

                # Send handshake message
                self._send_handshake(peer_address)
                return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def _send_handshake(self, peer_address: str):
        """Send handshake message"""
        handshake_msg = {
            "type": "handshake",
            "node_id": self.node_id,
            "node_type": self.node_type.value,
            "blockchain_height": len(self.blockchain),
            "timestamp": time.time()
        }
        print(f"Sending handshake to {peer_address}: {handshake_msg}")

    def broadcast_transaction(self, transaction: Dict):
        """Broadcast transaction to network"""
        message = {
            "type": "new_transaction",
            "transaction": transaction,
            "sender": self.node_id,
            "timestamp": time.time()
        }

        print(f"Node {self.node_id} broadcasting transaction: {transaction}")

        # Send to all peer nodes
        for peer in self.peers:
            self._send_message(peer, message)

    def broadcast_block(self, block: Dict):
        """Broadcast new block to network"""
        message = {
            "type": "new_block",
            "block": block,
            "sender": self.node_id,
            "timestamp": time.time()
        }

        print(f"Node {self.node_id} broadcasting new block: #{block.get('index', 'Unknown')}")

        for peer in self.peers:
            self._send_message(peer, message)

    def _send_message(self, peer_address: str, message: Dict):
        """Send message to specific node"""
        # Simulate message sending
        print(f"→ Sending to {peer_address}: {message['type']}")

    def handle_message(self, message: Dict, sender: str):
        """Handle received message"""
        msg_type = message.get("type")

        if msg_type == "handshake":
            self._handle_handshake(message, sender)
        elif msg_type == "new_transaction":
            self._handle_new_transaction(message)
        elif msg_type == "new_block":
            self._handle_new_block(message)
        elif msg_type == "sync_request":
            self._handle_sync_request(message, sender)

    def _handle_handshake(self, message: Dict, sender: str):
        """Handle handshake message"""
        remote_height = message.get("blockchain_height", 0)
        local_height = len(self.blockchain)

        print(f"Received handshake: {message['node_id']} (height:{remote_height})")

        # If remote chain is longer, request sync
        if remote_height > local_height:
            self._request_blockchain_sync(sender)

    def _handle_new_transaction(self, message: Dict):
        """Handle new transaction"""
        transaction = message["transaction"]

        # Verify transaction
        if self._validate_transaction(transaction):
            self.pending_transactions.put(transaction)
            print(f"Received valid transaction: {transaction}")
        else:
            print(f"Received invalid transaction: {transaction}")

    def _handle_new_block(self, message: Dict):
        """Handle new block"""
        block = message["block"]

        # Verify block
        if self._validate_block(block):
            self.blockchain.append(block)
            print(f"Accepted new block: #{block.get('index', 'Unknown')}")

            # Clear confirmed transactions
            self._clear_confirmed_transactions(block)
        else:
            print(f"Rejected invalid block: #{block.get('index', 'Unknown')}")

    def _validate_transaction(self, transaction: Dict) -> bool:
        """Validate transaction"""
        # Simplified validation logic
        required_fields = ["sender", "recipient", "amount", "timestamp"]
        return all(field in transaction for field in required_fields)

    def _validate_block(self, block: Dict) -> bool:
        """Validate block"""
        # Simplified validation logic
        if not isinstance(block, dict):
            return False

        # Check block index
        expected_index = len(self.blockchain)
        if block.get("index") != expected_index:
            return False

        # Check previous block hash
        if self.blockchain:
            expected_prev_hash = self.blockchain[-1].get("hash")
            if block.get("previous_hash") != expected_prev_hash:
                return False

        return True

    def _clear_confirmed_transactions(self, block: Dict):
        """Clear confirmed transactions"""
        confirmed_txs = block.get("transactions", [])
        # Remove confirmed transactions from pending queue
        # Simplified implementation: clear queue
        while not self.pending_transactions.empty():
            try:
                self.pending_transactions.get_nowait()
            except queue.Empty:
                break

    def get_network_stats(self) -> Dict:
        """Get network statistics"""
        return {
            "node_id": self.node_id,
            "node_type": self.node_type.value,
            "connected_peers": len(self.peers),
            "blockchain_height": len(self.blockchain),
            "pending_transactions": self.pending_transactions.qsize()
        }

# P2P network demonstration
print("=== P2P Network Demonstration ===")

# Create multiple nodes
nodes = [
    BlockchainNode("Node_1", NodeType.FULL_NODE, 8001),
    BlockchainNode("Node_2", NodeType.MINER_NODE, 8002),
    BlockchainNode("Node_3", NodeType.FULL_NODE, 8003),
    BlockchainNode("Node_4", NodeType.LIGHT_NODE, 8004)
]

# Establish connections
nodes[0].connect_to_peer("192.168.1.102:8002")
nodes[0].connect_to_peer("192.168.1.103:8003")
nodes[1].connect_to_peer("192.168.1.101:8001")
nodes[1].connect_to_peer("192.168.1.104:8004")
nodes[2].connect_to_peer("192.168.1.101:8001")

# Simulate transaction broadcast
sample_transaction = {
    "sender": "Alice",
    "recipient": "Bob",
    "amount": 10,
    "timestamp": time.time(),
    "signature": "sample_signature"
}

nodes[0].broadcast_transaction(sample_transaction)

# Simulate block broadcast
sample_block = {
    "index": 1,
    "timestamp": time.time(),
    "transactions": [sample_transaction],
    "previous_hash": "0" * 64,
    "hash": "a" * 64,
    "nonce": 12345
}

nodes[1].broadcast_block(sample_block)

# Display network status
print("\n=== Network Status ===")
for node in nodes:
    stats = node.get_network_stats()
    print(f"{stats['node_id']}: {stats}")

Consensus Mechanisms

Proof of Work (PoW)

class ProofOfWork:
    """Proof of Work implementation"""

    def __init__(self, difficulty: int = 4):
        self.difficulty = difficulty
        self.target = "0" * difficulty

    def mine_block(self, block_data: Dict) -> Dict:
        """Mining: finding nonce that meets difficulty requirements"""
        nonce = 0
        start_time = time.time()

        print(f"Starting mining, difficulty: {self.difficulty}")

        while True:
            # Construct candidate block
            candidate_block = {
                **block_data,
                "nonce": nonce
            }

            # Calculate hash
            block_hash = self._calculate_hash(candidate_block)

            # Check if it meets difficulty requirements
            if block_hash.startswith(self.target):
                end_time = time.time()
                candidate_block["hash"] = block_hash

                print(f"Mining successful!")
                print(f"Time: {end_time - start_time:.2f}s")
                print(f"Attempts: {nonce + 1}")
                print(f"Hash: {block_hash}")

                return candidate_block

            nonce += 1

            # Show progress every 10000 attempts
            if nonce % 10000 == 0:
                print(f"Attempted {nonce} times...")

    def _calculate_hash(self, block_data: Dict) -> str:
        """Calculate block hash"""
        block_string = json.dumps(block_data, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()

    def verify_block(self, block: Dict) -> bool:
        """Verify proof of work for block"""
        if "hash" not in block:
            return False

        # Recalculate hash
        block_copy = block.copy()
        claimed_hash = block_copy.pop("hash")
        calculated_hash = self._calculate_hash(block_copy)

        # Verify hash correctness and difficulty requirements
        return (calculated_hash == claimed_hash and
                calculated_hash.startswith(self.target))

# PoW demonstration
pow_system = ProofOfWork(difficulty=3)

# Mining example
block_data = {
    "index": 1,
    "timestamp": time.time(),
    "transactions": [
        {"from": "Alice", "to": "Bob", "amount": 50}
    ],
    "previous_hash": "0" * 64
}

mined_block = pow_system.mine_block(block_data)
print(f"\nMining result: {mined_block}")

# Verify block
is_valid = pow_system.verify_block(mined_block)
print(f"Block verification: {'Valid' if is_valid else 'Invalid'}")

Proof of Stake (PoS)

import random

class ProofOfStake:
    """Proof of Stake implementation"""

    def __init__(self):
        self.validators = {}  # Validators and their staked amounts
        self.delegations = {}  # Delegation relationships

    def stake(self, validator: str, amount: float):
        """Stake tokens to become a validator"""
        if validator in self.validators:
            self.validators[validator] += amount
        else:
            self.validators[validator] = amount

        print(f"{validator} staked {amount} tokens, total stake: {self.validators[validator]}")

    def delegate(self, delegator: str, validator: str, amount: float):
        """Delegate tokens to a validator"""
        if validator not in self.validators:
            raise ValueError(f"Validator {validator} does not exist")

        if delegator not in self.delegations:
            self.delegations[delegator] = {}

        if validator in self.delegations[delegator]:
            self.delegations[delegator][validator] += amount
        else:
            self.delegations[delegator][validator] = amount

        # Increase validator's total stake
        self.validators[validator] += amount

        print(f"{delegator} delegated {amount} tokens to {validator}")

    def select_validator(self) -> str:
        """Randomly select validator based on stake"""
        if not self.validators:
            return None

        # Calculate total stake
        total_stake = sum(self.validators.values())

        # Weighted random selection
        random_point = random.uniform(0, total_stake)
        current_sum = 0

        for validator, stake in self.validators.items():
            current_sum += stake
            if random_point <= current_sum:
                return validator

        # Default return last validator
        return list(self.validators.keys())[-1]

    def create_block(self, validator: str, transactions: List[Dict]) -> Dict:
        """Create block by selected validator"""
        if validator not in self.validators:
            raise ValueError(f"Invalid validator: {validator}")

        block = {
            "validator": validator,
            "timestamp": time.time(),
            "transactions": transactions,
            "validator_stake": self.validators[validator]
        }

        # Calculate block hash
        block_string = json.dumps(block, sort_keys=True)
        block["hash"] = hashlib.sha256(block_string.encode()).hexdigest()

        return block

    def slash_validator(self, validator: str, penalty_rate: float = 0.1):
        """Penalize malicious validator"""
        if validator in self.validators:
            penalty = self.validators[validator] * penalty_rate
            self.validators[validator] -= penalty

            print(f"Validator {validator} was slashed {penalty} tokens")

            # Remove validator if stake goes to zero
            if self.validators[validator] <= 0:
                del self.validators[validator]
                print(f"Validator {validator} has been removed")

    def get_validator_stats(self) -> Dict:
        """Get validator statistics"""
        total_stake = sum(self.validators.values())

        stats = {}
        for validator, stake in self.validators.items():
            stats[validator] = {
                "stake": stake,
                "stake_percentage": (stake / total_stake) * 100 if total_stake > 0 else 0
            }

        return stats

# PoS demonstration
pos_system = ProofOfStake()

# Validator staking
pos_system.stake("Validator_A", 1000)
pos_system.stake("Validator_B", 2000)
pos_system.stake("Validator_C", 500)

# User delegation
pos_system.delegate("User_1", "Validator_A", 300)
pos_system.delegate("User_2", "Validator_B", 800)

print(f"\nValidator statistics:")
for validator, stats in pos_system.get_validator_stats().items():
    print(f"{validator}: Stake {stats['stake']:.0f} ({stats['stake_percentage']:.1f}%)")

# Simulate block production
print(f"\n=== Block Production Simulation ===")
for round_num in range(5):
    selected_validator = pos_system.select_validator()

    sample_transactions = [
        {"from": "User_X", "to": "User_Y", "amount": 10}
    ]

    new_block = pos_system.create_block(selected_validator, sample_transactions)

    print(f"Round {round_num + 1}: Validator {selected_validator} created block")
    print(f"  Hash: {new_block['hash'][:16]}...")

Chapter Summary

Through this chapter, we have gained a deep understanding of the core principles of blockchain technology:

  1. Blockchain Structure:

    • Basic components of a block: block header, transaction list, Merkle root
    • Chain structure: previous block hash ensures data integrity
  2. Cryptographic Fundamentals:

    • Hash functions: determinism, avalanche effect, fixed output length
    • Merkle tree: efficiently verify integrity of large amounts of data
    • Digital signatures: ensure transaction authenticity and non-repudiation
  3. Decentralized Network:

    • P2P architecture: no single point of failure, data redundancy
    • Node types: full nodes, light nodes, miner nodes
    • Message propagation: broadcast mechanism for transactions and blocks
  4. Consensus Mechanisms:

    • Proof of Work (PoW): compete for accounting rights through computation
    • Proof of Stake (PoS): select validators based on stake

These technical components together form the technical foundation of the blockchain system, providing reliable guarantees for the secure operation of virtual currencies. In the next chapter, we will use Bitcoin as an example to analyze in detail the implementation details of the first successful virtual currency system.