Python使用ecdh算法交换共享秘钥

Désiré / 2023-07-22 / 原文

dh_server.py:

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
import socket

def ecdh_generater(received_public_key):
    # Generate private key
    private_key = ec.generate_private_key(ec.SECP256R1())

    # Get public key
    public_key = private_key.public_key()

    # Deserialize public key received from other party
    received_public_key = serialization.load_pem_public_key(received_public_key)

    # Perform key exchange
    shared_key = private_key.exchange(ec.ECDH(), received_public_key)

    # Serialize public key to send to other party
    serialized_public_key = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )

    return serialized_public_key, private_key, shared_key

if __name__ == '__main__':
    # 创建 socket 对象
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connect_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 监听端口
    server_socket.bind(("127.0.0.1", 11221))
    server_socket.listen()

    while True:
        # 等待连接
        client_socket, addr = server_socket.accept()

        # 打印对端地址
        print(f"Connection from {addr[0]}:{addr[1]}")

        # 接收对端消息
        data = client_socket.recv(1024)

        # 生成对称秘钥
        serialized_public_key, _, shared_key = ecdh_generater(data)
        print(shared_key.hex())

        # 连接对端
        connect_socket.connect(("127.0.0.1", 11220))

        # 发送公钥
        connect_socket.sendall(serialized_public_key)
        
        # 关闭socket连接
        client_socket.close()
        break

dh_client.py:

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
import socket

def ecdh_generater():
    # Generate private key
    private_key = ec.generate_private_key(ec.SECP256R1())

    # Get public key
    public_key = private_key.public_key()

    # Serialize public key to send to other party
    serialized_public_key = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    return serialized_public_key, private_key

if __name__ == '__main__':
    # 创建 socket 对象
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connect_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 监听端口
    server_socket.bind(("127.0.0.1", 11220))
    server_socket.listen()

    # 连接服务端
    connect_socket.connect(("127.0.0.1", 11221))

    # 生成公私钥
    serialized_public_key, private_key = ecdh_generater()

    # 发送公钥
    connect_socket.sendall(serialized_public_key)

    while True:
        # 等待连接
        client_socket, addr = server_socket.accept()

        # 打印对端地址
        print(f"Connection from {addr[0]}:{addr[1]}")

        # 接收对端消息
        data = client_socket.recv(1024)
        
        # 公钥解码
        received_public_key = serialization.load_pem_public_key(data)

        # 共享秘钥
        shared_key = private_key.exchange(ec.ECDH(), received_public_key)
        print(shared_key.hex())

        # 关闭连接
        client_socket.close()
        break