четверг, 11 июня 2026 г.

Giga, TPL_Dataflow

https://giga.chat/link/gcsccMtaQt

.Net Core Tpl_Dataflow - краткий обзор возможностей

Ответ

Tpl.Dataflow (пространство имён System.Threading.Tasks.Dataflow) — это библиотека в .NET, предназначенная для упрощения разработки конвейерной обработки данных и многопоточных приложений. Она построена на основе Task Parallel Library (TPL) и позволяет структурировать сложные асинхронные потоки данных с помощью готовых блоков.

Вместо того чтобы вручную управлять потоками, очередями и синхронизацией, разработчик собирает логику приложения из стандартных «строительных блоков», соединяя их между собой.

Основные возможности

  1. Конвейерная обработка (Pipeline)Это главная концепция библиотеки. Вы можете выстроить цепочку операций, где данные последовательно проходят через несколько этапов обработки. Например: чтение файла → парсинг строк → фильтрация → сохранение в базу данных. Каждый этап может выполняться параллельно со следующим, что обеспечивает высокую пропускную способность.
  2. Стандартные блоки (Dataflow Blocks)Вся функциональность строится вокруг нескольких ключевых интерфейсов и классов:
    • ISourceBlock<TOutput> и ITargetBlock<TInput>: базовые интерфейсы для блоков-источников и блоков-потребителей данных.
    • IPropagatorBlock<TInput, TOutput>: объединяет оба интерфейса, представляя собой блок-трансформатор.

    Наиболее часто используемые реализации:

    • BufferBlock<T>: простой буфер, который хранит сообщения. Может быть как источником, так и целью.
    • TransformBlock<TInput, TOutput>: принимает элемент типа TInput, обрабатывает его (например, преобразует) и выдаёт результат типа TOutput.

      Пример: TransformBlock<string, int>(s => s.Length)

    • ActionBlock<T>: принимает элемент и выполняет над ним действие (обычно "пожар-и-забудь", fire-and-forget).

      Пример: ActionBlock<int>(i => Console.WriteLine(i))

    • BroadcastBlock<T>: получает сообщение и рассылает его копии всем подключенным потребителям.
    • JoinBlock<>: ожидает поступления сообщений во все свои целевые порты, а затем объединяет их в кортеж (Tuple) или заданный тип и передаёт дальше.
    • BatchBlock<T>: накапливает указанное количество сообщений и отправляет их далее единым массивом (List/Array). Полезен для пакетных операций, например, массовой вставки в БД.
  3. Управление параллелизмомБиблиотека предоставляет мощные инструменты для контроля степени параллелизма:
    • MaxDegreeOfParallelism: можно указать, сколько задач может одновременно обрабатывать один блок. Значение DataflowBlockOptions.Unbounded (по умолчанию) означает отсутствие ограничений.
    • BoundedCapacity: задаёт максимальную ёмкость блока (количество ожидающих сообщений). Если лимит достигнут, новые сообщения не принимаются до тех пор, пока в блоке не освободится место. Это позволяет реализовать обратное давление (backpressure) и защитить систему от перегрузки.
  4. Асинхронность по умолчаниюВсе методы отправки и получения данных являются асинхронными (возвращают Task), что идеально вписывается в современную модель асинхронного программирования в .NET (async/await) и не блокирует потоки.
  5. Связывание блоков (LinkTo)Блоки соединяются методом LinkTo. Можно создавать сложные топологии: ветвления, слияния потоков, циклы. При связывании можно использовать фильтры (predicate), чтобы передавать только определённые сообщения.
  6. Обработка ошибок и завершение работыУ каждого блока есть статус завершения (Completion — задача Task). Если в одном из блоков произошла ошибка, он помечается как завершённый с ошибкой. Эту задачу можно await'ить у последующих блоков в конвейере, и они автоматически распространят ошибку дальше по цепочке. Это значительно упрощает обработку исключений в сложных потоках данных.

Пример простого конвейера

Допустим, нужно прочитать строки из файла, посчитать длину каждой строки и вывести результат.

csharp
Копировать
// Настройка опций для управления параллелизмом
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

// Создаем блоки
var readFileBlock = new TransformBlock<string, string[]>(path =>
{
    return File.ReadAllLines(path);
}, executionOptions);

var computeLengthBlock = new TransformBlock<string[], int[]>(lines =>
{
    return lines.Select(line => line.Length).ToArray();
}, executionOptions);

var printResultBlock = new ActionBlock<int[]>(lengths =>
{
    foreach (var length in lengths)
        Console.WriteLine(length);
}, executionOptions);

// Соединяем блоки в конвейер
readFileBlock.LinkTo(computeLengthBlock);
computeLengthBlock.LinkTo(printResultBlock);

// Запускаем конвейер
readFileBlock.Post("myfile.txt");

// Сигнализируем об окончании ввода данных
readFileBlock.Complete();

// Ждем завершения всего конвейера
printResultBlock.Completion.Wait();

Итог: Tpl.Dataflow — это мощный инструмент для создания отказоустойчивых, масштабируемых и легко поддерживаемых систем обработки данных, которые эффективно используют ресурсы системы благодаря встроенному управлению параллелизмом и асинхронности.

Давайте начнем с простого. Если эта задача НЕ простая сообщи - мы упростим задание..NetCore, Worker проект, Backgroundservcie, очень простой EventВus на три канала,Три параллельных потока Tpl.Dataflow, исполнение Handlers EventBus в Параллельных потоках Tpl.Dataflow.У нас есть .NetCore, Worker проект, BAckgroundservice.В контексте BAckgroundservice находится шина EventВus с тремя events.Шина EventВus работает с тремя парами кеу, Handler.Обычно шины EventВus последоввательно обрабытывают приходящие Event. Мы тоже будем подавать на вход Events. EventBus будет обрабатывать эти events, но НЕ будут обрабатывать (запускать) HAndlers в контексе EventBus. Все эти найденные по ключу HAndlers будут перенаправляться в параллельные потоки Tpl.Dataflow и уже в контексте потока Tpl.Dataflow исполняться. На каждый Handlers необходимо создать один поток Tpl.Dataflow.Таким образом вместо последовательного исполнения Handlers в контекте EventBus, мы исполняем наши HAndlers в контексте потоков Tpl.Dataflow ПАРАЛЛЕЛЬНО. У нас три events в EventsBus, значит должно быть три параллельных потока Tpl.Dataflow, в которых исполняются ПАРАЛЛЕЛЬНО три наших Handlers.Надеюсь понятно написал.

Ответ

Да, задача абсолютно понятна. Это классический паттерн для использования Tpl.Dataflow: у вас есть источник событий (EventBus) и несколько независимых обработчиков, которые должны работать параллельно и не блокировать друг друга.

Вместо того чтобы вызывать handler.Handle(event) прямо в методе Publish (что привело бы к последовательному выполнению), мы будем отправлять задачи на обработку в специальные блоки ActionBlock. Каждый такой блок работает в своём собственном потоке из пула задач.

Вот пошаговый план реализации и готовый код.

Архитектура решения

  1. IEventHandler<TEvent>: Интерфейс для обработчиков. Он определяет метод Handle, который будет содержать бизнес-логику.
  2. EventBus: Центральный класс. Он хранит словарь, где ключом является тип события, а значением — список его обработчиков.
    • Метод Subscribe<TEvent> позволяет регистрировать обработчики.
    • Метод Publish<TEvent> принимает событие, находит все его подписчики и отправляет задачу на выполнение в соответствующий блок Tpl.Dataflow.
  3. Блоки Tpl.Dataflow (ActionBlock): В конструкторе EventBus создаются три блока (по одному на каждый тип события). Каждый блок настроен на параллельную обработку (MaxDegreeOfParallelism > 1), что позволяет ему выполнять несколько задач одновременно, если они поступают быстрее, чем обрабатываются.
  4. Worker (BackgroundService): В методе ExecuteAsync имитируется приход событий. В реальном приложении события могли бы поступать из очереди (RabbitMQ, Kafka), по HTTP или из других источников.

Реализация

Создайте новый .NET Worker проект (dotnet new worker) и замените содержимое файлов следующим кодом.

1. Интерфейс обработчика (IEventHandler.cs)

csharp
Копировать
// Определяем контракт для всех обработчиков событий
public interface IEventHandler<in TEvent>
{
    Task Handle(TEvent @event);
}

2. Классы событий (Events.cs)

Для примера создадим три пустых класса-события.

csharp
Копировать
public class FirstEvent { }
public class SecondEvent { }
public class ThirdEvent { }

3. Классы-обработчики (Handlers.cs)

Создадим по одному простому обработчику для каждого события. В реальном коде здесь будет ваша логика.

csharp
Копировать
public class FirstEventHandler : IEventHandler<FirstEvent>
{
    public async Task Handle(FirstEvent @event)
    {
        // Имитация долгой работы
        await Task.Delay(1000);
        Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] Обработан FirstEvent в потоке {Thread.CurrentThread.ManagedThreadId}");
    }
}

public class SecondEventHandler : IEventHandler<SecondEvent>
{
    public async Task Handle(SecondEvent @event)
    {
        await Task.Delay(1500);
        Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] Обработан SecondEvent в потоке {Thread.CurrentThread.ManagedThreadId}");
    }
}

public class ThirdEventHandler : IEventHandler<ThirdEvent>
{
    public async Task Handle(ThirdEvent @event)
    {
        await Task.Delay(2000);
        Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] Обработан ThirdEvent в потоке {Thread.CurrentThread.ManagedThreadId}");
    }
}

4. Шина событий (EventBus.cs)

Это ключевой компонент, который связывает события с блоками Tpl.Dataflow.

csharp
Копировать
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public class EventBus
{
    // Словарь для хранения подписок: Тип события -> Список его обработчиков
    private readonly ConcurrentDictionary<Type, List<object>> _handlers = new();

    // Словарь для хранения блоков обработки данных: Тип события -> ActionBlock
    private readonly ConcurrentDictionary<Type, ActionBlock<Func<Task>>> _actionBlocks = new();

    // Настройки для блоков Dataflow: разрешаем параллельное выполнение задач внутри одного блока
    private readonly ExecutionDataflowBlockOptions _blockOptions = new()
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded // Можно установить конкретное число, например, 4
    };

    public EventBus()
    {
        // При создании EventBus блоки еще не созданы.
        // Они будут созданы динамически при первой подписке на событие.
    }

    // Подписка обработчика на событие
    public void Subscribe<TEvent>(IEventHandler<TEvent> handler)
    {
        var eventType = typeof(TEvent);

        // Добавляем обработчик в словарь подписчиков
        _handlers.AddOrUpdate(
            eventType,
            _ => new List<object> { handler },
            (_, list) => { list.Add(handler); return list; });

        // Создаем ActionBlock для этого типа события, если он еще не создан.
        // ActionBlock принимает на вход функцию (Func<Task>), которую он асинхронно исполняет.
        _actionBlocks.GetOrAdd(eventType, _ => new ActionBlock<Func<Task>>(async action =>
        {
            await action(); // Исполняем переданную функцию (вызов Handle)
        }, _blockOptions));
    }

    // Публикация события
    public async Task Publish<TEvent>(TEvent @event)
    {
        var eventType = typeof(TEvent);

        if (_handlers.TryGetValue(eventType, out var handlersList))
        {
            if (_actionBlocks.TryGetValue(eventType, out var actionBlock))
            {
                // Для каждого подписчика создаем задачу и отправляем ее в ActionBlock.
                // Важно: мы НЕ ждем выполнения здесь. Мы просто отправляем "работу" в очередь блока.
                foreach (var handler in handlersList)
                {
                    var concreteHandler = (IEventHandler<TEvent>)handler;
                    await actionBlock.SendAsync(() => concreteHandler.Handle(@event));
                }
            }
        }
    }
}

5. Класс Worker (Worker.cs)

Здесь мы настраиваем зависимости и запускаем симуляцию событий.

csharp
Копировать
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly EventBus _eventBus;

    public Worker(ILogger<Worker> logger, EventBus eventBus)
    {
        _logger = logger;
        _eventBus = eventBus;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Регистрируем наши обработчики в шине событий.
        // Это можно сделать через DI, но для простоты делаем здесь.
        _eventBus.Subscribe(new FirstEventHandler());
        _eventBus.Subscribe(new SecondEventHandler());
        _eventBus.Subscribe(new ThirdEventHandler());

        _logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);

        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

            // --- Симуляция прихода событий ---
            // Публикуем события с небольшой задержкой между ними,
            // чтобы увидеть параллельное выполнение в консоли.
            _logger.LogInformation("Публикуем FirstEvent");
            await _eventBus.Publish(new FirstEvent());

            await Task.Delay(200, stoppingToken); // Задержка

            _logger.LogInformation("Публикуем SecondEvent");
            await _eventBus.Publish(new SecondEvent());

            await Task.Delay(200, stoppingToken); // Задержка

            _logger.LogInformation("Публикуем ThirdEvent");
            await _eventBus.Publish(new ThirdEvent());

            // Ждем завершения всех блоков обработки перед следующей итерацией,
            // чтобы вывод в консоль был более читаемым.
            await Task.Delay(3000, stoppingToken); // Ждем дольше самого долгого события (2000мс + запас)
            
            _logger.LogInformation("--- Цикл завершен ---");
            await Task.Delay(5000, stoppingToken); // Пауза перед следующим циклом симуляции
        }
    }
}

Важное замечание по Worker: В реальном сервисе вам не нужно будет вызывать await _eventBus.Publish(...) вручную. Вместо этого вы будете получать события из внешнего источника (например, из очереди сообщений) и передавать их в шину. Цикл с Task.Delay здесь нужен только для демонстрации работы.

Как это работает?

  1. При запуске Worker создает и регистрирует три обработчика. При регистрации каждого из них в EventBus создается соответствующий ему ActionBlock.
  2. В цикле ExecuteAsync мы "публикуем" события одно за другим.
  3. Метод Publish находит нужный ActionBlock и отправляет в него задачу на выполнение метода Handle.
  4. Блок ActionBlock забирает эту задачу и исполняет её в потоке из пула задач. Поскольку у нас три разных блока для трех типов событий, их обработка происходит параллельно.
  5. Если вы опубликуете два FirstEvent подряд очень быстро, вы увидите, что их обработка может происходить параллельно внутри одного блока, так как мы установили MaxDegreeOfParallelism = Unbounded.
  6. Главный поток Worker не ждет завершения обработки событий (await actionBlock.SendAsync(...) не ждет await handler.Handle(...)), а сразу продолжает работу. Это именно то поведение, которое вы хотели достичь.




Комментариев нет:

Отправить комментарий