База по шардированию базы
Возможность горизонтального масштабирования это одно из важнейших нефункциональных требований индустрии в последнее время. Рост бизнеса со стороны IT выглядит чаще всего как рост нагрузки и цены отказа системы. Нам всем хочется создавать такие приложения, которые будут одинаково быстро и стабильно работать как с сотней, так и с сотней тысяч клиентов. Для этого необходимо еще на стадии проектирования закладывать потенциал для масштабирования, одним из способов которого является шардирование.
Мы на пальцах рассмотрим что такое шардирование, как оно помогает в масштабировании и даже рассмотрим тот самый этап «роста».
О чём речь?
Шардинг (или шардирование) — это разделение хранилища на несколько независимых частей, шардов (от англ. shard — осколок). Не путайте шардирование с репликацией, в случае которой выделенные экземпляры базы данных являются не составными частями общего хранилища, а копиями друг друга.
Шардирование помогает оптимизировать хранение данных приложения за счёт их распределения между инсталляциями БД (которые находятся на разных железках), что улучшает отзывчивость сервиса, так как размер данных в целом на каждом инстансе станет меньше.
Шардирование — это разновидность партиционирования (от англ. partition — деление, раздел). Отличие в том, что партиционирование подразумевает разделение данных внутри одной БД, а шардирование распределяет их по разным экземплярам БД.
Способы шардирования
Осуществить шардирование можно несколькими способами:
- Средствами БД. Некоторые базы — MongoDB, Elasticsearch, ClickHouse и другие — умеют самостоятельно распределять данные между своими экземплярами, для этого достаточно настроить конфигурацию. На мой взгляд, это лучший вариант.
- Надстройками к БД. Самый спорный способ — применение надстроек, которые выполняют шардирование, например Vitess или Citus, поскольку при этом есть риск потери данных и производительности.
- Клиентскими средствами. В этом случае экземпляры БД даже не подозревают о существовании друг друга, шардированием управляет стороннее приложение — со всеми вытекающими рисками.
Методы работы в этих способах схожи: мы выбираем ключ для распределения данных (это может быть идентификатор, временная метка или хеш записи) и в соответствии с ним записываем информацию в нужный шард. Как правило, ключи стараются выбирать так, чтобы данные были равномерно распределены по шардам. Сделать это не сложно — достаточно ориентироваться на текущее содержимое БД.
Важно учитывать для чего вы делаете шардирование. В случае если требуется распределить нагрузку на запись, необходимо подобрать такой ключ, который обеспечит равномерное распределение запросов между инстансами. Нельзя забывать и о «горячих» данных, запросы к которым происходят чаще, из-за чего нагрузка на шарды оказывается неравномерной. Для этого можно добавить в приложение метрику, показывающую, сколько раз в какой шард будут попадать данные по конкретному ключу.
Пример шардирования
Давайте в качестве примера сделаем клиентское шардирование горячо любимой в Ozon PostgreSQL. Приложение будет на Go, а мигрировать будем с помощью Goose. Для начала нам надо добавить сами шарды, то есть развернуть еще одну инсталляцию БД. Отвлекаться на детальный разбор того, как правильно раскатывать PostgreSQL, мы не будем.
Добавим в наш Storage маппинг шардов:
// Обозначим количество шардов. const bucketQuantity = 2 const ( Shard1 ShardNum = iota Shard2 ) // Для лучшей семантики. type ShardNum int type shardMap map[ShardNum]*sqlx.DB type Storage struct { shardMap shardMap }
Напишем конструктор для Storage который возьмёт на себя все задачи по инициализации соединений с БД.
func initShardMap(ctx context.Context, dsns map[ShardNum]string) shardMap { m := make(shardMap, len(dsns)) for sh, dsn := range dsns { m[sh] = discoveryShard(ctx, dsn) } return m } func discoveryShard(ctx context.Context, dsn string) *sqlx.DB { db, err := sqlx.ConnectContext(ctx, "postgres", dsn) if err != nil { panic(err) } return db } func NewStorage(ctx context.Context, dsns map[ShardNum]string) *Storage { return &Storage{ shardMap: initShardMap(ctx, dsns), } }
Переходим к работе с данными. Реализуем методы для их записи в шарды и чтения оттуда. Начинается всё с определения того, в какой шард идти.
При условии равномерности распределения наших ID (представим, что это действительно так) нам хватит классического остатка от деления. Выглядеть это будет примерно так:
func (s *Storage) shardByItemID(itemID int64) ShardNum { return ShardNum(itemID % bucketQuantity) }
У нас есть вот такой незаурядный метод чтения из БД. Тут стоит обратить внимание на то, что мы выполняем запрос на инстансе БД из нашего маппинга, а получаем инстанс (*sqlx.DB) по идентификатору шарда из сигнатуры.
func (s *Storage) getItemsByID(ctx context.Context, shard *sqlx.DB, itemsIDs []int64) ([]models.Item, error) { items := make([]models.Item, 0) query, args, err := sq. Select(itemsTableFields...). From(itemsTable). Where(sq.Eq{itemIDField: itemsIDs}). PlaceholderFormat(sq.Dollar). ToSql() if err != nil { err = errors.Wrap(err, "[create query]") return items, err } err = shard.SelectContext(ctx, &items, query, args...) return items, err }
Сам идентификатор шарда мы получаем чуть выше, когда распределяем наши ItemIDs по кубышкам. Само распределение выглядит вот так:
func (s *Storage) sortItemsIDsByShard(itemIDs ...int64) map[ShardNum][]int64 { shardToItems := make(map[ShardNum][]int64) for _, id := range itemIDs { shardID := s.shardByItemID(id) if _, ok := shardToItems[shardID]; !ok { shardToItems[shardID] = make([]int64, 0) } shardToItems[shardID] = append(shardToItems[shardID], id) } return shardToItems }
Ну и инфраструктурная обёрточка — чтобы запросы выполнялись параллельно. Вот так будет выглядеть публичный метод получения Item. Кажется, что он довольно большой, но в действительности большую часть метода съедают раскручивания каналов.
func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) { shardToItems := s.sortItemsIDsByShard(itemIDs...) respChan := make(chan []models.Item, len(shardToItems)) errChan := make(chan error, len(shardToItems)) wg := &sync.WaitGroup{} for shardID, ids := range shardToItems { wg.Add(1) shard := s.shardMap[shardID] go s.asyncGetItemsByID(ctx, shard, ids, wg, respChan, errChan) } wg.Wait() close(respChan) close(errChan) result := make([]models.Item, 0) for items := range respChan { result = append(result, items...) } errs := make([]error, 0, len(errChan)) for e := range errChan { errs = append(errs, e) } err := multierr.Combine(errs...) return result, err }
Для того чтобы не терять смысл getItemsByID за нагромождением каналов и Wait-групп, мы просто обернем всё это в asyncGetItemsByID:
unc (s *Storage) asyncGetItemsByID( ctx context.Context, shard *sqlx.DB, itemsIDs []int64, wg *sync.WaitGroup, resp chan<- []models.Item, errs chan<- error, ) { defer wg.Done() items, err := s.getItemsByID(ctx, shard, itemsIDs) if err != nil { errs <- errors.Wrapf(err, "[getItemsByID] can't select from shard %d", shard) } resp <- items }
Всё то же самое мы проделываем для записи данных в шарды:
Ну и скриптик для миграции всего этого дела:
#!/usr/bin/env bash export MIGRATION_DIR=./migrations/ if [ "${STAGE}" = "production" ]; then if [ "$1" = "--dryrun" ]; then goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" status else goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" up fi elif [ "${STAGE}" = "staging" ]; then if [ "$1" = "--dryrun" ]; then goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status else goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up fi elif [ "${STAGE}" = "development" ]; then exit 0 fi
Очень удобно шардировать в приложении, где еще нет данных, а следовательно нет необходимости их перетаскивать. Но что делать, если мы шардим рабочее приложение? Тут, как говорится у нас на Руси, case-by-case.
- Приложение можно остановить? Прекрасно! Останавливаем разбор очередей, отключаем обработку запросов или вовсе останавливаем приложение, делаем резервную копию базы, если ещё не сделали, перетаскиваем данные в соответствии с выбранным ключом и снова вводим приложение в эксплуатацию, предварительно проведя регрессионное тестирование.
- Приложение должно отвечать на запросы? Тогда делаем временную надстройку внутри репозитория. Пишем данные всегда в новые шарды. Читаем сначала из нового шарда; если там нет нужных данных, то обращаемся к старым шардам. Если данные оказываются в старом шарде, то при желании можно их переносить в новый — и в конце концов данные перераспределятся между шардами. Только не забудьте добавить метрики, чтобы не пропустить этот знаменательный момент. И обязательно проверьте, все ли данные переехали или только те, к которым идут запросы. Да, и на первых порах это не то чтобы положительно скажется на отзывчивости приложения, будьте к этому готовы.
- Если приложение пишет данные в БД только по событиям из условной kafka, а синхронные запросы (REST/GRPC) только читающие (классическая ситуация для event sourcing), то мы отключаем чтение из kafka, выкатываем в prod инстанс версии приложения, которое уже живет с новой схемой шардов, но синхронные запросы шлем только на инстанс приложения предыдущей версии (оно же canary-deploy). Далее джоба внутри приложения последовательно читает данные по старому маппингу, и пишет в новый, после переноса можно сразу же и удалять данные в старой схеме.
Можно просто скопировать данные в другой шард, а потом просто удалить их из источника, но на практике я такого не встречал.
Для полноты картины разберём вариант решардинга в условиях, когда нам не хотелось бы останавливать сервис. Писать данные будем только в новый маппинг шардов, а вот читать их будем сразу из старого и нового.
Начинается всё с создания дублирующей схемы шардов внутри Storage. Внесём изменения в константы:
const legacyBucketQuantity = 2 const bucketQuantity = 3 const ( Shard1 ShardNum = iota Shard2 Shard3 )
Заведём внутри Storage shardMapLegacy, который содержит дорешардинговый маппинг:
type Storage struct { shardMapLegacy shardMap shardMap shardMap }
Ну и инициализация. В конструктор Storage теперь будем передавать также две схемы:
func NewStorage(ctx context.Context, dsns map[ShardNum]string, dsnsLegacy map[ShardNum]string) *Storage { return &Storage{ shardMap: initShardMap(ctx, dsns), shardMapLegacy: initShardMap(ctx, dsnsLegacy), } }
Заводим метод для получения shardID, чтобы после переноса данных его удалить:
func (s *Storage) legacyShardByItemID(itemID int64) ShardNum { return ShardNum(itemID % legacyBucketQuantity) }
Ну и ещё чуть-чуть дублирования кода. Речь о практически полной копии sortItemsIDsByShard; разница лишь в том, что для получения идентификатора шарда мы используем ранее модифицированную функцию.
func (s *Storage) sortItemsIDsByLegacyShard(itemIDs ...int64) map[ShardNum][]int64 { shardToItems := make(map[ShardNum][]int64) for _, id := range itemIDs { shardID := s.legacyShardByItemID(id) if _, ok := shardToItems[shardID]; !ok { shardToItems[shardID] = make([]int64, 0) } shardToItems[shardID] = append(shardToItems[shardID], id) } return shardToItems }
Метод добавления Items в изменении не нуждается, так как мы условились, что данные пишем всегда в свежие шарды, а вот GetItems надо подправить. Теперь он будет конкурентно выполнять запрос сразу в две схемы, а полученные данных мы будем склеивать, отдавая предпочтение данным с актуального маппинга шардов.
func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) { wg := &sync.WaitGroup{} resultLegacy := make([]models.Item, 0) resultActual := make([]models.Item, 0) var err error wg.Add(1) go func() { defer wg.Done() res, e := s.getItems(ctx, itemIDs...) err = multierr.Append(err, e) resultActual = res }() wg.Add(1) go func() { defer wg.Done() res, e := s.getItemsFromLegacyShardMap(ctx, itemIDs...) err = multierr.Append(err, e) resultLegacy = res }() wg.Wait() result := mergeItems(resultActual, resultLegacy) return result, err }
Склейка результата выглядит так:
func mergeItems(items, legacyItems []models.Item) []models.Item { itemsMap := make(map[models.Item]struct{}) for _, item := range legacyItems { itemsMap[item] = struct{}{} } for _, item := range items { itemsMap[item] = struct{}{} } mergedItems := make([]models.Item, 0, len(itemsMap)) for item, _ := range itemsMap { mergedItems = append(mergedItems, item) } return mergedItems }
Опционально можно добавить метрику для отслеживания частоты запросов по старому маппингу, которая будет сигнализировать нам о том, что данные перетащились и можно отключать дублирующий легаси-флоу.
При таком подходе остаётся только один вопрос: что делать с «мёртвыми» данными, которые лежат не в своих шардах после решардинга?
Вариант в лоб: предположим, что мы зарешардились с двух до четырёх шардов, идём — и на каждом легаси-шарде выполняем запрос на удаление записей, где полученный для ID ключ шардирования не соответствует текущему шарду:
DELETE FROM items WHERE ctid IN ( SELECT ctid FROM items WHERE id % 4 NOT IN (2, 2-4) );
З.Ы. Вариант SQL-запроса предполагает, что мы храним гошный UInt64 в постгревом BigInt. В этом случае положительные гошные числа могут превратиться в отрицательные постгревые, поэтому делаем NOT IN для ренджа.
Иногда встречаются системы, где данные имеют свойства «протухать». В таких системах самое логичное оставить данные после решардинга, и дождаться пока они «протухнут».
И пара слов об упячках, с которыми я сталкивался, — о партиционировании внутри одной БД и шардинге целыми партициями. Поначалу кажется, что это логично и даже элегантно. Ведь для решардинга достаточно просто перетащить целую партицию с одного шарда в другой. И это ФАТАЛЬНАЯ ОШИБКА. Со временем вы устанете от трёхэтажного мата негодования, вызванного пятиэтажными пакетными запросами, из-за которых горячие данные не будут нормально попадать в кеш. Такой способ работает лишь в том случае, если партиционирование выполняется по дате, но запросы, как правило, обращаются к свежим или старым данным, как, например, во многих OLAP-системах. В остальных случаях перспективнее держать данные в рамках одной партиции, а решардить их путём постепенного переноса, если, конечно, БД не предусматривает своих вариантов решения проблемы решардинга.
Вместо вывода
Рано или поздно вам придётся заняться решардингом. А потом ещё раз, и ещё, и ещё, особенно, если бизнес будет расти и данных будет становиться всё больше. Поэтому приберегите инструменты, которые вам помогли однажды, и ничего страшного, если запускать вы их будете раз в полгода.
Клиентское шардирование — надо.
Ключ шардирования выбираем с умом, предварительно медитируем над метриками, чтобы чётко видеть картину того, как данные пишутся, запрашиваются и хранятся.
Не так страшно шардирование, как решардинг. Важно заранее подумать о том, как вы будете решать вопросы консистентности при решардинге.