PitchHut logo
High-performance decentralized analytics engine for big data.
Pitch

KineQuet, the Kinetic Parquet analytics engine, leverages pure Python and C++ to deliver high-performance decentralized big data processing. It eliminates JVM overhead and features atomic concurrency control using SQLite, making it a robust alternative to traditional architectures.

Description

KineQuet (Kinetic Parquet)

High-Performance Decentralized Big Data Engine
KineQuet is a high-performance analytical engine developed in pure Python, designed with a strong emphasis on mechanical sympathy. This engine replaces the significant overhead of the Java Virtual Machine (JVM) seen in solutions like Spark, allowing for native execution in C++ alongside decentralized coordination.

Instead of relying on a fragile Master Node, KineQuet employs SQLite in WAL mode for atomic concurrency control and Jump Consistent Hash (JCH) for O(1) partition routing. Resource-intensive processing bypasses the Python Global Interpreter Lock (GIL) through Zero-Copy reading utilizing Apache Arrow and vectorized analytical execution with DuckDB.


Architectural Principles

  1. Zero Master Node: Eliminates any single point of logical failure in network orchestration.
  2. Mechanical Sympathy: Data is never iterated over in pure Python; heavy lifting is done in C++.
  3. State Isolation: Workers operate autonomously ("ephemeral kinetic cells"). If one fails, another assumes control of its bucket.
  4. Non-Blocking I/O: Network latency (via Kafka/AWS SQS) is isolated from the CPU loop through native ThreadPools.

System Topology (C4 Container)

The diagram below illustrates the physical and logical flow of data through the KineQuet engine.

graph TD  
    subgraph Data Lake  
        P1[(Giant Parquet Table)]  
    end  
    subgraph KineQuet Control Plane  
        C[SQLite Coordinator <br/> WAL Mode]  
        JCH((Jump Consistent Hash))  
    end  
    subgraph Worker Pool [Independent Distributed Nodes]  
        W1[Worker 1 <br/> Buckets 0-33]  
        W2[Worker 2 <br/> Buckets 34-66]  
        W3[Worker 3 <br/> Buckets 67-100]  
    end  
    subgraph Execution Engine [C++ Zero-Copy]  
        PA(PyArrow Reader <br/> Row Group Extraction)  
        DDB(DuckDB SQL <br/> Vectorized Execution)  
    end  
    subgraph Egress & Materialization  
        S3[(Stage Datalake <br/> Event Sourcing)]  
        K{Kafka / AWS SQS}  
    end  
    P1 -. Metadata .-> C  
    C -- Numeric Slicing --> JCH  
    JCH -- Defines Routing --> C  
    W1 <== Atomic Lock Limit ==> C  
    W2 <== Atomic Lock Limit ==> C  
    W3 <== Atomic Lock Limit ==> C  
    W1 --> PA  
    PA -- In-Memory Table --> DDB  
    DDB -- Immutable Writer --> S3  
    DDB -- Asynchronous ThreadPool --> K  
    classDef db fill:#2c3e50,stroke:#34495e,stroke-width:2px,color:#fff;  
    classDef worker fill:#d35400,stroke:#e67e22,stroke-width:2px,color:#fff;  
    classDef core fill:#2980b9,stroke:#3498db,stroke-width:2px,color:#fff;  
    class P1,S3,C db;  
    class W1,W2,W3 worker;  
    class PA,DDB core;  

This diagram details the chronological order of operations, from planning (Lazy Scan) to discard for the network. Notice how the asynchronous network does not block the Worker from fetching a new Shard.

sequenceDiagram  
    autonumber  
    actor Dev as Data Engineer  
    participant Core as KineQuet Session  
    participant DB as SQLite (WAL)  
    participant W as Worker Node  
    participant Arrow as PyArrow (C++)  
    participant SQL as DuckDB  
    participant Net as Async Publisher  
    Dev->>Core: scan("s3://bucket/data.parquet")  
    Core->>Arrow: Extract Metadata (num_row_groups)  
    Arrow-->>Core: Returns metadata  
    Core->>DB: register_parquet_file() (Maps via JCH)  
    loop Worker Continuous Loop  
        W->>DB: claim_shard_from_bucket(my_buckets)  
        DB-->>W: Acquired Lock (shard_id, rg_index)  
        W->>Arrow: read_row_group(rg_index)  
        Note right of Arrow: Zero-Copy Reading (Bypasses GIL)  
        Arrow-->>W: Arrow Table  
        W->>SQL: execute_query("SELECT ... FROM shard_data")  
        Note right of SQL: C++ Vectorized Execution  
        SQL-->>W: Filtered/Aggregated Table  
        W->>Net: dispatch(result)  
        Note right of Net: Releases GIL on Secondary Thread  
        W->>DB: mark_completed(shard_id)  
        Net-->>AWS/Kafka: HTTP/TCP I/O (In Background)  
    end  

State Machine (Shard Cycle)

To guarantee reliability (fault-tolerance) and adhere to the Exactly-Once or At-Least-Once principles, each extracted Row Group has a strict lifecycle governed by SQLite's ACID compliance.

stateDiagram-v2  
    [*] --> PENDING: Ingestion (scan)  
    PENDING --> PROCESSING: Worker acquires Exclusive Lock  
    state PROCESSING {  
        [*] --> PyArrow_Read  
        PyArrow_Read --> DuckDB_Execute  
        DuckDB_Execute --> [*]  
    }  
    PROCESSING --> COMPLETED: Success in SQL and Dispatch  
    PROCESSING --> FAILED: OOM, Corruption, or I/O Failure  
    FAILED --> PENDING: Coordinator applies Retry policy  
    COMPLETED --> [*]: Archived / Purged  

Key Components

  • core.py: The framework facade. It manages user sessions and lazy evaluation planning.
  • coordinator.py: The decentralized brain. Manages BEGIN IMMEDIATE transactions in SQLite to prevent read contention.
  • hashing.py: Implementation of Jump Consistent Hash. Guarantees that workers only read their own fragments, transforming concurrent I/O into isolated queues O(1).
  • worker.py: The kinetic execution agent. Operates in a blind repetition loop consuming its assigned buckets.
  • reader.py & sql.py: Bridges to the C++ infrastructure. Bypass Python for large-scale processing using pyarrow and duckdb.
  • publishers.py: Specialized thread pool for handling network bottlenecks (confluent-kafka and boto3).
  • writer.py: Data materialization via Append-Only (Event Sourcing) pattern for intermediate stages.

Backpressure and Out-of-Memory (OOM) Prevention

To shield the cluster from OOM failures caused by network bottlenecks (such as AWS SQS rate limiting) and to decouple ingestion from asynchronous publishing, KineQuet implements a mathematically rigorous BackpressureController. This acts as a guardian for AsyncPublishManager, blocking or regulating the ingestion loop (Worker.step()) through four advanced disciplines:

  1. Queue Theory (M/M/1 Proxy): The publication thread pool is modeled as an M/M/1 proxy. The controller actively monitors the exact number of bytes in the dispatch queue (in_flight_bytes). If the allocated volume exceeds a pre-configured limit (max_memory_bytes, default 50MB), the producer is immediately blocked (check_and_wait). Once sub-threads finish uploads and release memory, the queue resumes flow.

  2. PID Control (Proportional-Integral-Derivative): Instead of blind semaphore locks (which cause violent "on/off" effects), the barrier applies a PID Loop on the error margin (current_memory - memory_limit). The more exceeded the memory limit (Proportional and Integral Errors), the stronger the "brake" (delay) imposed on the Worker. The Derivative factor helps smooth transitions, avoiding overcompensation and maintaining ingestion flow perfectly aligned with the network throughput.

  3. Game Theory (Increasing Logarithmic Cost): To prevent aggressive threads from monopolizing CPU time in the Kernel while attempting to inject data, the principle of Increasing Marginal Cost is utilized. The time penalty imposed upon blocking grows logarithmically (math.log1p(pid_output)). This creates a natural "Nash Equilibrium": trying to flood the queue becomes exponentially more expensive in wait time, forcing Workers to self-regulate to the exact outflow speed of SQS/Kafka.

  4. Number Theory (Jitter Anti-Collision): If dozens of concurrent Workers hit the memory barrier at once, they would pause and attempt to start again at the same instant, causing a stampede effect (Thundering Herd). To disperse this event, the engine applies a deterministic Jitter in wait times, iterating sequentially over an array of prime numbers ([2, 3, 5, 7, 11...]). The use of modular arithmetic with primes perfectly spreads the wake-up cycles in micro-fractional milliseconds that are mathematically non-congruent, evenly distributing the recovery load.


Use Case: Serverless Architecture (AWS Lambda)

KineQuet is designed to excel in ephemeral computing paradigms (Serverless), such as AWS Lambda, Google Cloud Functions, or Azure Functions.
The diagram below demonstrates the behavior of the architecture when the Worker pool is replaced by independent serverless function invocations, operating in a decentralized manner.

graph TD  
    subgraph Serverless Cloud [Ephemeral Cloud Environment]  
        subgraph Shared Storage [Shared Storage]  
            EFS[(AWS EFS / NFS)]  
            DB[(SQLite WAL <br/> coord.db)]  
            EFS --- DB  
        end  
        subgraph Serverless Compute [Lambda Pool]  
            L0[Lambda 0]  
            L1[Lambda 1]  
            LN[Lambda N]  
        end  
        S3[(AWS S3 <br/> Data Lake)]  
        SQS{AWS SQS <br/> Output Queue}  
        L0 -. Atomic Lock .-> DB  
        L1 -. Atomic Lock .-> DB  
        LN -. Atomic Lock .-> DB  
        S3 -- Byte-Range GET <br/> (Only Fragments) --> L0  
        S3 -- Byte-Range GET <br/> (Only Fragments) --> L1  
        S3 -- Byte-Range GET <br/> (Only Fragments) --> LN  
        L0 -- Asynchronous Publishing <br/> with Backpressure --> SQS  
        L1 -- Asynchronous Publishing <br/> with Backpressure --> SQS  
        LN -- Asynchronous Publishing <br/> with Backpressure --> SQS  
    end  
    classDef aws fill:#FF9900,stroke:#232F3E,stroke-width:2px,color:#fff,font-weight:bold;  
    classDef lambda fill:#D35400,stroke:#E67E22,stroke-width:2px,color:#fff;  
    classDef storage fill:#2C3E50,stroke:#34495E,stroke-width:2px,color:#fff;  
    class S3,SQS aws;  
    class L0,L1,LN lambda;  
    class EFS,DB storage;  

Advantages of KineQuet in Serverless:

  1. No "Cold Start" (Zero-JVM): Running a PySpark/JVM cluster inside Lambdas often leads to frequent timeouts due to the weight of the virtual machine. KineQuet operates in pure Python calling C++ compiled binaries (PyArrow and DuckDB), reducing cold start times to mere milliseconds.
  2. S3 Byte-Range = I/O Magic: Lambdas have minuscule disk resources (/tmp). KineQuet does not require downloading the entire file. By leveraging Byte-Range HTTP calls, the function pulls the designated Row Group directly into memory, minimizing bandwidth costs and abstracting the need for disk storage.
  3. Decentralized O(1) Jump Consistent Hash: In a Serverless setup, functions do not communicate with each other. How to divide tasks? By passing the Lambda index (0 to N) as an environment variable, JCH mathematically dictates which buckets and shards it should process. This results in 100% decentralized orchestration.
  4. Financial Protection (Backpressure): Thousands of Lambdas writing to an SQS/API Gateway could trigger rate limiting (HTTP 429), leading to data loss and inflated costs due to retries. The Backpressure Controller acts as a natural aerodynamic brake, balancing ingestion throughput to match the cloud's exact consumption limit.
  5. Shared Coordination via NFS (EFS): The SQLiteCoordinator is stored in a shared elastic volume (like AWS EFS mounted on Lambda). With the natural concurrency of the database in WAL mode, dozens or hundreds of Lambdas can perform atomic locks directly in the file system, eliminating the need and cost of a dedicated relational database (RDS/Aurora).

KineQuet represents a significant advancement in decentralized big data analytics, ensuring high performance while overcoming the limitations imposed by traditional frameworks.

0 comments

No comments yet.

Sign in to be the first to comment.