WhatsUpmessage on telegram
closeChat with us

A version of this page is available in your language.
Would you like to switch?

Индексация блокчейн-данных от RPC-запросов до стриминга

December 14, 2025
Блокчейн
Индексация блокчейн-данных от RPC-запросов до стриминга

Если вы работаете над приложениями, которые напрямую зависят от ончейн-данных, создаете торговый терминал, аналитику, DeFi-примитивы на быстрых сетях, или просто исследуете подходы к обработке и индексации блокчейн-данных, этот пост для вас.

Сегодня мы разберём, как устроены современные решения для индексации блокчейн данных: от базового взаимодействия с нодой до продвинутых инструментов вроде Firehose и Substreams. Поговорим про исторические данные, обработку chain reorg и возможности параллельной обработки, а в конце сравним всё это с Subgraph.

Зачем нужна индексация блокчейн данных

Любая система аналитики, мониторинга или автоматической реакции в блокчейне начинается с одной и той же задачи — извлечения данных из сети.

У нас есть блокчейн, есть ноды, которые обмениваются данными по P2P-протоколу, и есть внешние сервисы, которым нужно понимать, что происходит в сети и как на это реагировать.

Такими сервисами могут быть:

  • аналитические платформы, которые считывают все события и прогоняют их через ML-пайплайны;
  • индексеры, отслеживающие конкретные смарт-контракты и агрегирующие данные по ним;
  • event-driven системы, которые реагируют на определённые события в блокчейне и запускают бизнес-логику;
  • мониторинг и алерты для протоколов, DAO или инфраструктурных компонентов.

Во всех этих случаях базовая проблема одна:
блокчейн — это источник данных, из которого нужно уметь стабильно, масштабируемо и предсказуемо извлекать информацию.

Чтобы понять, как к этому пришли современные индексеры, имеет смысл начать с самого простого подхода и постепенно проследить эволюцию.

В качестве примера дальше будем использовать Ethereum, как наиболее показательный и распространённый кейс.

Базовый подход к индексации блокчейна через RPC и логи

Самая простая форма индексации данных в Ethereum строится вокруг ноды и её RPC-интерфейса.

Схема базового подхода к индексации блокчейна через RPC и логи

Ethereum-нода предоставляет JSON-RPC API, через которое внешний сервис может:

  • читать блоки,
  • получать транзакции,
  • и, что особенно важно, извлекать события (logs).

Что такое logs в Ethereum

Logs — это специальная структура данных, которую разработчики смарт-контрактов специально закладывают в контракт с помощью событий (events).

Фактически это:

  • канал коммуникации между смарт-контрактом и внешним миром;
  • данные, предназначенные не для on-chain логики, а для off-chain сервисов;
  • основной источник информации для индексеров и бэкендов.

Именно logs позволяют:

  • отслеживать вызовы контрактов,
  • реагировать на изменения состояния,
  • строить аналитику без необходимости парсить весь state.

Отправная точка eth_getLogs

Для работы с ивентами Ethereum предоставляет метод eth_getLogs.

Это один из самых популярных и базовых инструментов для построения индексеров, потому что:

  • он поддерживает фильтрацию по адресам контрактов;
  • позволяет фильтровать по сигнатурам событий (topics);
  • даёт возможность задавать диапазон блоков;
  • работает достаточно стабильно для простых сценариев.

В рамках своей задачи eth_getLogs справляется хорошо и долгое время был стандартом де-факто для большинства простых индексеров.

Однако по мере роста требований к данным, объёму и скорости обработки становится очевидно, что одного eth_getLogs недостаточно.

Именно с этого момента и начинается эволюция индексеров, от простых RPC-сканеров к полноценным системам обработки блокчейн данных.

Когда возможностей eth_getLogs не хватает для решения задач

На раннем этапе eth_getLogs выглядит универсальным решением, но довольно быстро появляются сценарии, в которых его возможностей не хватает.

Ограниченность данных по событиям

Ethereum логи содержат только ту информацию, которую разработчики контракта выносят во внешний мир.

Как следствие:

  • в логах отсутствует большая часть метаданных транзакции;
  • из доступного — хэш транзакции и номер блока;
  • таких параметров, как timestamp, from, to, gas usage и других, в логах как правило нет.

Если индексеру нужен, например, timestamp события, приходится:

  1. сначала получить логи;
  2. затем для каждого лога отдельно запрашивать транзакцию или блок;
  3. извлекать нужные метаданные.

На практике это быстро приводит к медленному и плохо масштабируемому пайплайну, где один RPC-запрос порождает десятки или сотни дополнительных.

Именно в этот момент архитектура индексера обычно эволюционирует к более «толстой» модели загрузки данных.

Обработка блоков через eth_getBlockReceipts

Ethereum ноды предоставляют более насыщенный по данным метод — eth_getBlockReceipts.

Схема обработки блоков через eth_getBlockReceipts

Этот метод возвращает подробную информацию обо всех транзакциях в конкретном блоке, включая:

  • calldata — что именно пользователь пытался сделать;
  • receipts и logs — что в итоге произошло;
  • статус транзакций и технические параметры выполнения.

Фактически, это почти полная картина всего, что произошло в рамках одного блока.

По сравнению с eth_getLogs такой подход:

  • устраняет необходимость догружать метаданные для каждой транзакции;
  • упрощает пайплайн обработки данных;
  • делает индексацию более предсказуемой.

Однако у этой модели есть и обратная сторона.

Проблема over-fetching

В отличие от eth_getLogs, eth_getBlockReceipts не поддерживает фильтрацию.

Нельзя сказать ноде:

  • «отдай транзакции только по этому контракту»;
  • «верни только события с определенными сигнатурами»;
  • «проиндексируй только нужные мне изменения».

В результате индексер вынужден:

  • забирать все данные блока;
  • фильтровать и агрегировать их уже на своей стороне;
  • хранить и обрабатывать существенно больший объём информации, чем реально нужен.

Тем не менее, на практике этот подход всё равно часто оказывается лучше, чем каскадный RPC-пайплайн с догрузкой транзакций.

Но даже он не закрывает все потребности в кейсах

Почему данных из eth_getBlockReceipts может быть недостаточно

Хороший пример — отслеживание позиций в Uniswap V3.

Когда происходит swap:

  • лог сообщает, что своп состоялся;
  • но перерасчёт комиссий для провайдеров ликвидности требует обновления всех позиций, находящихся в активном ценовом диапазоне.

Проблема в том, что:

  • события отражают только то, что разработчики решили положить в лог;
  • изменения storage целиком туда не попадают, так как  это слишком дорого по газу;
  • И параметры которые нам нужны для пересчета комиссий, FeeGrowthGlobal0 и FeeGrowthGlobal1, в логах отсутствуют.

Даже если индексер загружает весь блок целиком, этих данных там всё равно нет.

В результате для каждого свопа приходится:

  • делать дополнительный запрос в ноду;
  • читать текущее состояние storage пула;
  • пересчитывать комиссии вручную.

Ограничения масштаба и производительности индексации через eth_getBlockReceipts

Для Ethereum с блоками раз в ~15 секунд такой подход еще может работать:

  • дополнительные RPC-запросы укладываются во временное окно;
  • нагрузка остается управляемой.

Но в более быстрых сетях:

  • в одном блоке могут быть десятки или сотни свопов;
  • количество запросов к ноде растет линейно;
  • latency и стоимость инфраструктуры резко увеличиваются.

В этот момент становится понятно, что индексация на уровне RPC — это тупик для сложных продуктовых сценариев.

Решение этой проблемы существует даже в рамках стандартного Ethereum-деплоя. О нем поговорим дальше.

Индексация данных через Debug API и Trace Block

Ethereum-ноды предоставляют расширенный функционал через Debug API, однако его использование требует отдельной проверки.

Перед тем как делать индексер на основе debug-методов, важно учитывать:

  • не все RPC-провайдеры поддерживают Debug API;
  • даже если поддерживают, это может быть ограничено конкретными сетями;
  • в managed-инфраструктуре (Infura, Alchemy, QuickNode и т.д.) такие методы часто выключены или имеют квоты.

Тем не менее, если Debug API доступен, он открывает принципиально новые возможности.

Полный контекст исполнения транзакций через Trace Block

Индексация блокчейн данных через Debug API и Trace Block

Ключевой метод — trace_block.

В отличие от стандартных RPC-вызовов, trace возвращает расширенную информацию об исполнении транзакций, включая:

  • calldata — что пользователь пытался сделать;
  • logs — какие события были сгенерированы;
  • изменения storage в процессе выполнения транзакции;
  • полное дерево вызовов (transaction execution tree).

Именно наличие state diffs делает trace критически важным для сложных кейсов.

Решение проблемы с изменениями состояний

Вернемся к примеру с Uniswap V3.

Для корректного пересчета комиссий по позициям необходимо знать:

  • как изменились FeeGrowthGlobal0 и FeeGrowthGlobal1;
  • в какой момент времени это произошло;
  • в рамках какой конкретной транзакции.

С использованием trace:

  • мы берём транзакцию со swap;
  • смотрим изменения storage внутри исполнения транзакции;
  • считаем комиссии оффчейн, без дополнительных запросов к ноде;
  • получаем корректный state на момент завершения транзакции, а не на конец блока.

Это устраняет необходимость делать отдельные eth_call или eth_getStorageAt для каждого события и сильно упрощает пайплайн.

Дерево вызовов транзакции

Ещё одно важное отличие trace от стандартных receipts — наличие полного дерева вызовов.

Дерево вызовов транзакции в Индексация блокчейн данных через Debug API

Например:

  • пользователь вызывает multicall-контракт;
  • multicall обращается к роутеру Uniswap;
  • роутер взаимодействует с несколькими пулами;
  • каждый вызов фиксируется в execution tree.

Trace позволяет видеть всю эту структуру как дерево, а не как плоский набор логов.

Это критично для:

  • анализа сложных транзакций;
  • понимания реального execution flow;
  • построения более точной аналитики и мониторинга.

Ограничения индексации onchain данных через поллинг ноды

Даже с использованием trace, мы все еще работаем в рамках pull-модели, напрямую общаясь с нодой. И здесь так же прослеживаются фундаментальные ограничения.

Проблема 1. Отсутствие полноценной push-модели

Идеальная модель для индексера – это подписка на поток данных, начиная с нужного блока или timestamp. Ethereum-ноды такого интерфейса не предоставляют. Да, существуют WebSocket endpoints, но у них есть жесткие ограничения:

  • подписка начинается только с момента подключения;
  • невозможно запросить стрим с определённого блока;
  • при любом реконнекте события теряются.

Для систем, которые индексируют блокчейн целиком, отслеживают платежи или считают финансовые показатели потеря событий недопустима. В результате WebSocket-подписки используются редко, а основная логика всё равно строится вокруг поллинга ноды и повторных запросов.

Проблема 2. Chain reorganization (reorg)
В polling-модели нода не уведомляет клиента о форке, индексер просто запрашивает данные по номеру блока, факт реорга становится известен постфактум, если становится известен вообще.

Это оставляет два варианта:

  1. Работать только с финализированными блоками: просто и надежно, но появляется задержка между событием и его обработкой.
  2. Обрабатывать reorg на клиенте: хранить историю блоков, уметь откатывать состояние, повторно применять транзакции; фактически реализовывать логику консенсуса на стороне индексера. На практике этот путь слишком сложен и почти все продакшн-системы выбирают первый вариант.

Реализация стриминга блокчейн-данных на примере FireHose от The Graph

От этих двух болей, которые мы описали выше появляется следующая реализация — продукт, который эти боли решает: FireHose от The Graph. Давайте разбираться, как устроен этот сервис.

Первое, что нам нужно — нода

Мы не можем просто поднять ноду,  в этом нет смысла, потому что мы перенесем всю polling-модель о которой говорили ранее, и опять всё будет супер медленно. Поэтому нам нужно решать проблему со стримингом.

Реализация стрима блокчейн данных через кастомизацию ноды

Идея такая: мы форкаем ноду и добавляем патч со стримингом, который будем читать своим сервисом. То есть, как только новый блок добавляется в ноду, мы сразу стримим его в pipe, из которого потом наш сервис его прочитает.

  • В случае Ethereum — это кастомный форк, потому что нет официальных гайдов о том, как патчить ноду.
  • В случае Solana — есть Geyser plugin, который помогает модифицировать ноду. Он предоставляет интерфейс с функциями, позволяющими реагировать на события: например, появился новый блок → кладём его в pipe → наш сервис читает этот pipe и обрабатывает данные.

Решаем проблему с историческим стримингом

Обычная имплементация нод не рассчитана на исторический стриминг, то есть на возможность стримить данные с какого-то блока в прошлом.

Обычные ноды ориентированы на максимально эффективное хранилище и используют файловую систему. А это значит, что мы не можем бесконечно стримить нагрузку в storage.

  • Если у нас есть стрим в памяти с текущей даты, и пользователь подписывается на уже существующие данные, то отправка по клиентам идёт просто.
  • Другое дело, когда нужно постоянно ходить в storage за историей, это создает неконтролируемую нагрузку на файловую систему, и нода просто не рассчитана на такой сценарий.

Собственно, поэтому стандартная нода не поддерживает исторический стриминг.

Но мы делаем сервис, который умеет именно это — читать данные с любого момента в прошлом и стримить их клиентам без перегрузки ноды.

Второе, что нам нужно — S3

Мы будем использовать flat files, так как это минимальный юнит, который есть у ноды, и он максимально эффективен для хранения данных.

Использование S3 для хранения исторических блокчейн данных

Во-вторых, S3 — это cloud-native решение. Это значит, что нам как разработчикам не нужно думать о менеджменте инфраструктуры и скейлинге:

  • это serverless-модель, где мы платим только за фактические запросы;
  • Нет зависимости от вендора — у всех провайдеров есть S3-хранилища, зачастую с одинаковым интерфейсом;
  • если понадобится сменить провайдера (например, будет выгоднее хранить данные у другого), переезд проходит безболезненно.

Использование S3 позволяет построить масштабируемый и надежный слой хранения, который легко интегрируется с нашей системой стриминга.

Последнее, что нам нужно имплементировать — API

Нода обычно работает по JSON-RPC — это стандартный HTTP, где данные стримятся как plain text. То есть в том виде, в котором пришли, они и передаются дальше.

Использование gRPC для стриминга real time и исторических блокчейн данных

Мы можем взять что-то более эффективное, например gRPC.

Плюсы gRPC:

  1. Бинарный протокол — данные эффективно упаковываются перед стримингом, что уменьшает нагрузку на сеть и ускоряет передачу.
  2. Language-agnostic — мы описываем схему в едином формате и можем на любом языке или фреймворке сгенерировать клиентский код. Это значит, что не нужно писать библиотеку каждый раз, когда хочется подключить новый язык.

Флоу работы сервиса Firehose

Итак, как будет проходить работа нашего сервиса Firehose:

Схема работы сервиса стриминга onchain данных - Firehose
  1. Поднимаем ноду и модифицируем её.
  2. Стримим данные из ноды в pipe.
  3. Кладём данные в bucket (S3).
  4. Реализуем стриминговый интерфейс для пользователей, через который они будут получать данные.

В этом интерфейсе важно реализовать Joined Block Source — механизм, который автоматически переключается между источниками блоков:

  • Допустим, пользователь хочет стримить блоки с часа назад.
  • Сначала мы понимаем, что это историческая дата, и начинаем читать данные из исторического хранилища (S3).
  • Как только пользователь догоняет текущий block head, интерфейс автоматически переключается на реальный стриминг через pipe из модифицированной ноды.

Возможности для пользователя

  • Cursor — можно указать, с какого момента стримить данные.
  • Chain Agnostic — интерфейс работает с любой сетью. Разница лишь в модификации ноды для реального времени, остальное (история через bucket и API) стандартизировано.
  • Reorgs notification — пользователи получают уведомления о форках.
  • Совмещенные источники данных — пользователю не нужно думать о разделении исторических и реального времени данных.
  • Сложная логика reorg на сервере — ранее мы бы реализовывали её на клиенте. В Firehose вся обработка уже встроена, и пользователю остается только реагировать на уведомления.

Обеспечение доступности данных

Если мы говорим о полной доступности данных, то здесь тоже есть над чем подумать. Схема работы Firehose в этом случае будет выглядеть немного сложнее.

Итак, что можно сделать для повышения availability

  1. Поднять две ноды.
  2. Оставить одну ноду основной, а вторую использовать как запасной источник данных через RPC-провайдера.

Второй источник данных может работает по polling модели, медленнее, но позволит поддерживать непрерывность потока данных в случае проблем с основной нодой.

Архитектура сервиса стриминга блокчейн данных - Firehose

Архитектура reader-компонентов

  • Минимум два reader-сервиса забирают блоки из разных источников и складывают их в bucket.
  • Каждый reader реализует gRPC интерфейс, который стримит бинарные данные с блоками.
  • Push-модель сохраняется: данные сразу стримятся пользователю.

Что делает компонент Firehose

  • Подписывается на «live» источники блоков, чтобы получать актуальную информацию быстрее всех.
  • Выполняет merge и дедупликацию данных — если один блок приходит от обеих нод, компонент выбирает, отуда данные пришли первыми.
  • Если основная нода отваливается, вторая продолжает стримить данные, пусть и медленнее, но пользователи всё равно получают поток без пропусков данных.

Дедупликация на уровне bucket

  • Оба reader-а складывают блоки в bucket.
  • Чтобы избежать повторов данных, нужна дедупликация уже на уровне bucket, так как иначе один и тот же блок может попасть в стрим дважды.
  • Firehose занимается этим автоматически, чтобы пользователи получали чистый и последовательный поток данных.

Добавляем сервис Merger

Чтобы оптимизировать работу с историческими данными и дедупликацию, в архитектуру Firehose добавляется отдельный сервис — Merger.

Merger сервис в архитектуре сервиса стриминга блокчейн данных

Основные задачи Merger

  • Достает блоки из One Blocks bucket
  • Для финализированных блоков можно оптимизировать хранение.
  • Merger делает дедупликацию и создаёт bundles по 100 блоков в Merged Blocks Bucket.
  • Все блоки, которые форкнулись, складываются в отдельное хранилище — Forked Blocks Bucket.

Теперь Firehose взаимодействует с тремя хранилищами:

  1. One Blocks Bucket — сырые данные с ноды.
  2. Merged Blocks Bucket — дедуплицированные и объединённые блоки по 100 штук.
  3. Forked Blocks Bucket — блоки, которые были форкнуты.

Если пользователь запрашивает большой диапазон исторических данных, блоки уже не будут выдаваться поштучно, а пакетами по 100 штук, это ускорит обработку и уменьшит нагрузку на систему.

Подытожим

  • Мы избавились от проблем с polling моделью напрямую из нод.
  • Сделали сервис более надёжным, масштабируемым и удобным для пользователей.
  • Firehose теперь обеспечивает стриминг исторических и real time данных, с дедупликацией и обработкой форков на сервере.

Проблема overfetching и Substreams

Даже в Firehose остаётся одна проблема — Избыточный стрим данных

  • Мы получаем все блоки, но приложениям часто нужны только специфические данные, которых нет в стандартных фильтрах.
  • Стандартные фильтры не закрывают все кейсы и возвращают либо слишком мало, либо слишком много данных.
  • Использовать стандартные методы ноды снова бессмысленно, они не дают полного контроля над выборкой данных.

Кастомная фильтрация данных

Самый эффективный подход — позволить разработчикам писать свои фильтры и получать только те данные, которые нужны для конкретного приложения.

Substreams

Substreams — это движок, который выполняет пользовательский код на сервере для каждого блока. Код пишется как функция, которая на вход принимает блок, а на выход — отфильтрованные данные. Функция компилируется в WebAssembly.

Схема работы модуля фильтрации блокчейн данных

Пример работы:

  • Разработчик пишет функцию, которая принимает блок и извлекает необходимые события, например, Raydium Events
  • Код загружается на сервер, компилируется и выполняется для каждого блока.
  • Firehose стримит отфильтрованные данные, подходящие для конкретного приложения, без необходимости перенастраивать стандартные фильтры.

Архитектурные изменения Firehose с добавлением Substreams

С появлением Substreams архитектура Firehose стала более гибкой. Substreams добавляется как отдельный сервис, аналогичный Firehose, и позволяет разработчикам определять какие данные они хотят получать.

Пользователи загружают функции, которые выполняются для каждого блока. Эти функции могут фильтровать данные по конкретным контрактам, событиям или любым другим критериям. В результате разработчики получают только то, что им нужно, вместо полного потока блокчейн-данных за всё время.

Давайте посмотрим как изменился наш workflow.

Архитектура сервиса индексации onchain данных с модулем для кастомной фильтрации

Relayer и дедупликация

Появление Substreams сделало необходимым вынести логику дедупликации из Firehose в отдельный сервис — Relayer. Ранее Firehose был единственным потребителем данных и сам занимался удалением дубликатов.

С новой схемой Relayer объединяет данные от Firehose и Substreams, обеспечивая:

  • получение блока от ноды, которая первой его застримила;
  • обработку исторических данных из bucket-хранилищ;
  • очистку от дубликатов до того, как данные попадут к пользователям.

Масштабирование Substreams

Теперь давайте разберём, как Substreams масштабируется.

Масштабирование Substreams - сервис для фильтрации блокчейн данных в стриме

Сервис разделён на два уровня: Front Tier и Worker Tier.

Когда пользователь отправляет запрос на обработку диапазона блоков, например с 10000 по 14999 (5000 блоков), Front Tier принимает этот запрос и распределяет задачи между воркерами.

Каждый воркер может одновременно обрабатывать несколько ренжей (до 16). Через прокси запросы распределяются между воркерами пакетами по примерно 1000 блоков. Как только воркер заканчивает работу, он складывает данные в отдельный bucket с кешем. Об этом bucket мы поговорим позже, когда будем рассматривать пакетирование данных.

То есть мы собираем данные в bucket, а обратно в Front Tier стримим не сами данные, а прогресс. Это нужно, чтобы понимать, когда ренж завершён и можно начать стримить данные дальше, или чтобы знать, если где-то в функции пользователя произошёл revert, потому что не факт, что пользователь написал функцию правильно.

Front Tier ждёт, пока обработается первый ренж из запрошенных, стримит его обратно пользователю, затем проверяет следующий. Если ренж готов — отправляет его, если нет — ждёт завершения обработки. Так процесс идёт до последнего ренжа, сохраняя порядок блоков.

Как устроены функции в Substreams

Теперь давайте поговорим, как устроены функции, которые можно загружать в Substreams, и как их можно оптимизировать с помощью кеша.

Самое простое, что приходит на ум, — использовать кеш. Рассмотрим пример:

Оптимизация кастомных фильтров данных с помощью кеш модулей

Есть модуль, который кто-то уже написал до нас. Он принимает на вход блоки из Merged Blocks Bucket и извлекает из них все события Uniswap V3 в рамках одного блока. Модуль ничего с этими данными не делает, просто отбирает нужные события. На выходе мы получаем меньше информации, чем было в исходном блоке — только события Uniswap V3.

Далее наш сервис сохраняет эти данные в Substreams Store Bucket.

Теперь в нашей функции можно указать, что входные данные — это не сами блоки, а output этого модуля (Uniswap V3 events), и подключить к запросу соответствующий модуль. Сервер понимает, что данные уже доступны в кеше, и не тратит ресурсы на повторную обработку блока. Он просто берёт отфильтрованные данные из Substreams Store.

Если учесть биллинг, где пользователь платит за объем данных, которые он получает, такой подход упрощает работу разработчика и снижает стоимость обработки, так как можно использовать уже подготовленные данные из кеша вместо повторного вычисления.

Index-модули в Substreams

Следующий уровень оптимизации — Index-модули.

Отличие Index-модулей от обычных модулей в том, что у них стандартизированный формат вывода. На выходе для каждого блока формируется массив ключей, с помощью которых можно быстро понять, есть ли интересующая информация в блоке.

Index-модули в Substreams

Например, индекс-модуль принимает на вход raw blocks и создаёт для каждого блока индекс с такими данными, как:

  • с какими контрактами взаимодействовали пользователи;
  • какие топики логов присутствуют в блоке.

Далее можно написать модуль, например Filtered Transactions, который использует этот индекс. В манифесте указываем, что модуль использует индекс и передаем его бинарный файл. Также нужно описать фильтр, по которому работает индекс. Например, мы хотим получить только транзакции по Radiym.

Сервер достаёт индексы из кеша и определяет, в каких блоках есть транзакции с Radiym. На вход модуля Filtered Transactions попадают только эти блоки, что значительно экономит ресурсы.

Интересный момент: если кто-то уже обрабатывал эти блоки и создавал фильтр по Radiym, то, скорее всего, данные уже есть в кешe Substreams. В этом случае пользователь может пропустить этап прогона по индексу и сразу получить результат из Filtered Transactions module, который уже содержит отфильтрованные данные.

Как стримить onchain данные напрямую в БД?

На этом этапе нужна система, которая всё, что мы собрали, корректно уложит в базу данных.

Для этого используется SQL Sink, разработанный The Graph, исходный код доступен в открытом доступе и который можно поднять у себя. Сервис подключается к Substreams server и потребляет данные.

Архитектура сервиса интеграции стримов блокчейн данных в базу данных

SQL Sink ожидает, что модули Substreams будут отправлять данные в определённом формате, чтобы можно было однозначно определить, как их разместить в базе данных. Формат описывает, что делать с блокчейн-данными: insert, upsert, update или delete по конкретным primary keys с соответствующими данными.

При этом остаётся параллельная обработка данных на уровне Substreams module. На выходе получаем массив инструкций, который сообщает, какие операции нужно выполнить в базе. Пользователю остается реализовать только модули, которые реализуют этот формат.

Обработка инструкций и Chain Reorganization

Дальше происходит следующее:

  1. Исполнение команд — insert, upsert, update, delete. Здесь ничего сложного: данные раскидываются по таблицам так, как это указано в вашем модуле.
  2. Обработка chain reorgs. Для каждой выполненной операции создается отдельная таблица — History table, где логируются все действия: insert, upsert, update, delete.

Это необходимо для того, чтобы в случае chain реорга и получения сигнала о последнем валидном блоке, система могла корректно откатить изменения и синхронизироваться с новой цепочкой.

На схеме ниже показан кейс того, как это будет обрабатываться.

как обрабатывать реорги в базе данных

Обработка chain reorg и расширение SQL Sink

В случае реорга сервис обращается к History table, получает все операции в диапазоне «невалидных» блоков и откатывает их.

Суть в том, что реализация SQL Sink достаточно гибкая. Текущая версия ограничена базовыми операциями (insert, upsert, update, delete), но при необходимости её можно форкнуть и добавить новые операции. Например, в решении для Blum возникла необходимость добавить операции типа increment, которые не укладывались в стандартный набор операций. Подробнее об этом можно почитать в нашем кейс-стади.

Итак, что можно сделать: форкнуть SQL Sink, написать модули, которые помимо базовых операций возвращают дополнительные, такие как increment, и реализовать handler, который трансформирует новую операцию в SQL-запрос.

Ещё один уровень гибкости: не обязательно использовать SQL Sink. Пользователь может написать свой sink и сам решать, как обрабатывать и куда укладывать данные. Сервис предоставляет стримы и параллельную обработку данных, а дальше пользователь выбирает: SQL — есть готовый SQL Sink, нужна нестандартная операция — форкаем и расширяем SQL Sink, нужен другой тип хранения — можно написать sink с нуля.

Сравнение с Subgraph

Наконец, давайте разберём, как всё это укладывается в сравнении с Subgraph.

Архитектура сабграфа для индексации блокчейн данных

Subgraph — это готовая «коробка», в которую пользователи один раз загружают свой код, скомпилированный в WebAssembly. В этом коде пользователь описывает все сценарии работы: как реагировать на событие, как обрабатывать транзакцию и так далее.

При этом Subgraph не имеет собственного хранилища с блоками. Каждый раз, когда нужно обработать какой-то диапазон блоков, Subgraph обращается к ноде.

Плюсы такого подхода:

  • Не нужно хостить всю сложную архитектуру, о которой мы говорили выше.
  • Можно быстро поднять систему у себя.

Минусы:

  • Нет параллелизации данных — все блоки нужно синхронизировать с нуля.
  • Подходит для медленных сетей типа Ethereum, но неэффективен для быстрых сетей, таких как Solana.

Заключение

Мы прошли путь от классического поллинга нод до реализации Firehose с Substreams и SQL Sink. Такой подход позволяет:

  • получать поток блокчейн-данных с возможностью подписки на нужные диапазоны;
  • уменьшить избыточную загрузку данных и дать разработчикам возможность фильтровать их под свои задачи;
  • параллельно обрабатывать данные и корректно укладывать их в базу с учётом chain reorg.

Использование Firehose и Substreams даёт системное решение проблем, с которыми сталкиваются классические индексеры, и делает работу с данными надёжной и предсказуемой, даже для быстрых сетей вроде Solana.

О Rock’n’Block

Rock’n’Block — это студия разработки Web3-продуктов, создающая production-grade инфраструктуру блокчейн-сервисов и децентрализованные приложения. Мы помогаем командам быстрее запускать высокопроизводительные продукты, строить UX для миллионов пользователей и выделяться на конкурентных рынках.

Чем занимаемся

  • Создаём кастомные Layer-1 сети, DeFi-протоколы и децентрализованные биржи.
  • Разрабатываем perpetual DEX с ордербуками, funding rates, ликвидациями и интеграцией ораклов.
  • Создаём решения для стриминга блокчейн-данных, обрабатывающие исторические и real-time данные.
  • Внедряем независимые платёжные решения и финансовые инструменты.

Наша команда имеет почти десятилетний опыт блокчейн разработки, поддерживая продукты для более чем 71 млн пользователей DeFi, с суммарной капитализацией систем $2,5 млрд.

Следите за нами в X, чтобы не пропустить новые глубокие обзоры по блокчейн-разработке. Подписаться


Have an Idea?
Let's chat!

Get free consultation

message on whatsupmessage on telegramcall button
This site is protected by reCAPTCHA and the Privacy Policyand Terms of Service apply.
Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.

Let’s Talk

Awards