Giga, TPL_Dataflow
https://giga.chat/link/gcsccMtaQt
.Net Core Tpl_Dataflow - краткий обзор возможностей
Tpl.Dataflow (пространство имён
System.Threading.Tasks.Dataflow) — это библиотека в .NET, предназначенная для упрощения разработки конвейерной обработки данных и многопоточных приложений. Она построена на основе Task Parallel Library (TPL) и позволяет структурировать сложные асинхронные потоки данных с помощью готовых блоков.Вместо того чтобы вручную управлять потоками, очередями и синхронизацией, разработчик собирает логику приложения из стандартных «строительных блоков», соединяя их между собой.
Основные возможности
- Конвейерная обработка (Pipeline)Это главная концепция библиотеки. Вы можете выстроить цепочку операций, где данные последовательно проходят через несколько этапов обработки. Например: чтение файла → парсинг строк → фильтрация → сохранение в базу данных. Каждый этап может выполняться параллельно со следующим, что обеспечивает высокую пропускную способность.
- Стандартные блоки (Dataflow Blocks)Вся функциональность строится вокруг нескольких ключевых интерфейсов и классов:
- ISourceBlock<TOutput> и ITargetBlock<TInput>: базовые интерфейсы для блоков-источников и блоков-потребителей данных.
- IPropagatorBlock<TInput, TOutput>: объединяет оба интерфейса, представляя собой блок-трансформатор.
Наиболее часто используемые реализации:
- BufferBlock<T>: простой буфер, который хранит сообщения. Может быть как источником, так и целью.
- TransformBlock<TInput, TOutput>: принимает элемент типа
TInput, обрабатывает его (например, преобразует) и выдаёт результат типаTOutput.Пример: TransformBlock<string, int>(s => s.Length)
- ActionBlock<T>: принимает элемент и выполняет над ним действие (обычно "пожар-и-забудь", fire-and-forget).
Пример: ActionBlock<int>(i => Console.WriteLine(i))
- BroadcastBlock<T>: получает сообщение и рассылает его копии всем подключенным потребителям.
- JoinBlock<>: ожидает поступления сообщений во все свои целевые порты, а затем объединяет их в кортеж (
Tuple) или заданный тип и передаёт дальше.- BatchBlock<T>: накапливает указанное количество сообщений и отправляет их далее единым массивом (
List/Array). Полезен для пакетных операций, например, массовой вставки в БД.- Управление параллелизмомБиблиотека предоставляет мощные инструменты для контроля степени параллелизма:
- MaxDegreeOfParallelism: можно указать, сколько задач может одновременно обрабатывать один блок. Значение
DataflowBlockOptions.Unbounded(по умолчанию) означает отсутствие ограничений.- BoundedCapacity: задаёт максимальную ёмкость блока (количество ожидающих сообщений). Если лимит достигнут, новые сообщения не принимаются до тех пор, пока в блоке не освободится место. Это позволяет реализовать обратное давление (backpressure) и защитить систему от перегрузки.
- Асинхронность по умолчаниюВсе методы отправки и получения данных являются асинхронными (возвращают
Task), что идеально вписывается в современную модель асинхронного программирования в .NET (async/await) и не блокирует потоки.- Связывание блоков (LinkTo)Блоки соединяются методом
LinkTo. Можно создавать сложные топологии: ветвления, слияния потоков, циклы. При связывании можно использовать фильтры (predicate), чтобы передавать только определённые сообщения.- Обработка ошибок и завершение работыУ каждого блока есть статус завершения (
Completion— задачаTask). Если в одном из блоков произошла ошибка, он помечается как завершённый с ошибкой. Эту задачу можноawait'ить у последующих блоков в конвейере, и они автоматически распространят ошибку дальше по цепочке. Это значительно упрощает обработку исключений в сложных потоках данных.Пример простого конвейера
Допустим, нужно прочитать строки из файла, посчитать длину каждой строки и вывести результат.
Итог:
Tpl.Dataflow— это мощный инструмент для создания отказоустойчивых, масштабируемых и легко поддерживаемых систем обработки данных, которые эффективно используют ресурсы системы благодаря встроенному управлению параллелизмом и асинхронности.
Давайте начнем с простого. Если эта задача НЕ простая сообщи - мы упростим задание..NetCore, Worker проект, Backgroundservcie, очень простой EventВus на три канала,Три параллельных потока Tpl.Dataflow, исполнение Handlers EventBus в Параллельных потоках Tpl.Dataflow.У нас есть .NetCore, Worker проект, BAckgroundservice.В контексте BAckgroundservice находится шина EventВus с тремя events.Шина EventВus работает с тремя парами кеу, Handler.Обычно шины EventВus последоввательно обрабытывают приходящие Event. Мы тоже будем подавать на вход Events. EventBus будет обрабатывать эти events, но НЕ будут обрабатывать (запускать) HAndlers в контексе EventBus. Все эти найденные по ключу HAndlers будут перенаправляться в параллельные потоки Tpl.Dataflow и уже в контексте потока Tpl.Dataflow исполняться. На каждый Handlers необходимо создать один поток Tpl.Dataflow.Таким образом вместо последовательного исполнения Handlers в контекте EventBus, мы исполняем наши HAndlers в контексте потоков Tpl.Dataflow ПАРАЛЛЕЛЬНО. У нас три events в EventsBus, значит должно быть три параллельных потока Tpl.Dataflow, в которых исполняются ПАРАЛЛЕЛЬНО три наших Handlers.Надеюсь понятно написал.
Да, задача абсолютно понятна. Это классический паттерн для использования
Tpl.Dataflow: у вас есть источник событий (EventBus) и несколько независимых обработчиков, которые должны работать параллельно и не блокировать друг друга.Вместо того чтобы вызывать
handler.Handle(event)прямо в методеPublish(что привело бы к последовательному выполнению), мы будем отправлять задачи на обработку в специальные блокиActionBlock. Каждый такой блок работает в своём собственном потоке из пула задач.Вот пошаговый план реализации и готовый код.
Архитектура решения
IEventHandler<TEvent>: Интерфейс для обработчиков. Он определяет методHandle, который будет содержать бизнес-логику.EventBus: Центральный класс. Он хранит словарь, где ключом является тип события, а значением — список его обработчиков.
- Метод
Subscribe<TEvent>позволяет регистрировать обработчики.- Метод
Publish<TEvent>принимает событие, находит все его подписчики и отправляет задачу на выполнение в соответствующий блокTpl.Dataflow.- Блоки
Tpl.Dataflow(ActionBlock): В конструктореEventBusсоздаются три блока (по одному на каждый тип события). Каждый блок настроен на параллельную обработку (MaxDegreeOfParallelism > 1), что позволяет ему выполнять несколько задач одновременно, если они поступают быстрее, чем обрабатываются.Worker(BackgroundService): В методеExecuteAsyncимитируется приход событий. В реальном приложении события могли бы поступать из очереди (RabbitMQ, Kafka), по HTTP или из других источников.Реализация
Создайте новый .NET Worker проект (
dotnet new worker) и замените содержимое файлов следующим кодом.1. Интерфейс обработчика (IEventHandler.cs)
2. Классы событий (Events.cs)
Для примера создадим три пустых класса-события.
3. Классы-обработчики (Handlers.cs)
Создадим по одному простому обработчику для каждого события. В реальном коде здесь будет ваша логика.
4. Шина событий (EventBus.cs)
Это ключевой компонент, который связывает события с блоками
Tpl.Dataflow.5. Класс Worker (Worker.cs)
Здесь мы настраиваем зависимости и запускаем симуляцию событий.
Важное замечание по
Worker: В реальном сервисе вам не нужно будет вызыватьawait _eventBus.Publish(...)вручную. Вместо этого вы будете получать события из внешнего источника (например, из очереди сообщений) и передавать их в шину. Цикл сTask.Delayздесь нужен только для демонстрации работы.Как это работает?
- При запуске
Workerсоздает и регистрирует три обработчика. При регистрации каждого из них вEventBusсоздается соответствующий емуActionBlock.- В цикле
ExecuteAsyncмы "публикуем" события одно за другим.- Метод
Publishнаходит нужныйActionBlockи отправляет в него задачу на выполнение методаHandle.- Блок
ActionBlockзабирает эту задачу и исполняет её в потоке из пула задач. Поскольку у нас три разных блока для трех типов событий, их обработка происходит параллельно.- Если вы опубликуете два
FirstEventподряд очень быстро, вы увидите, что их обработка может происходить параллельно внутри одного блока, так как мы установилиMaxDegreeOfParallelism = Unbounded.- Главный поток
Workerне ждет завершения обработки событий (await actionBlock.SendAsync(...)не ждетawait handler.Handle(...)), а сразу продолжает работу. Это именно то поведение, которое вы хотели достичь.
Комментариев нет:
Отправить комментарий