Scheduling threads like Thomas Jefferson
This article is about how to distribute tasks across a pipeline of queues so as to minimize the total processing time, and about an unexpected connection between this scheduling problem and Thomas Jefferson's apportionment method.
Background and motivation
You know how assembly lines in manufacturing and instruction pipelines in processors create a form of parallelism without violating determinism (the order in which data enters the pipeline is preserved at the output)?
We’ll call this implicit parallelism through pipelining. Implicit, because simply by dividing the task into stages, we automatically get parallelism. When this method is implemented in software, it allows us to write fully sequential programs (for each stage) using parallel hardware efficiently.
How does it work? Each stage runs independently (on one processor/core) and the stages are connected via queues. When the first processor/core completes the first stage of the first element, it passes it to the second stage and continues to work on the first stage of the second element. As the queues between stages fill up, we achieve parallelism while maintaining determinism (outputs arrive in the same order as inputs).
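To make this concrete in code, here is a minimal sketch (not from the original implementation) of two stages connected by a queue in Haskell, using Control.Concurrent.Chan; the stage bodies (multiplying by 2 and adding 1) are just placeholders:

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM_)

-- Two stages connected by channels (queues). Each stage is a plain sequential
-- loop; parallelism comes from running the stages on different threads, and a
-- single consumer per queue keeps the output order equal to the input order.
main :: IO ()
main = do
  q1 <- newChan  -- input queue of stage 1
  q2 <- newChan  -- queue between stage 1 and stage 2
  _ <- forkIO $ replicateM_ 10 (readChan q1 >>= \x -> writeChan q2 (x * 2))  -- stage 1
  forM_ [1 .. 10 :: Int] (writeChan q1)                                      -- source
  replicateM_ 10 (readChan q2 >>= \x -> print (x + 1))                       -- stage 2, on the main thread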
Here is an example of a conveyor belt in a bottle factory:
The stages are connected by conveyor belts (queues) and work in parallel with each other: for example, after filling the first bottle we can start filling the second while the first is being capped, and so on.
If a stage runs slowly, its input queue can be split (sharded) by adding another worker to that stage: every other input element is sent to the new worker, which almost doubles the throughput of that stage.
In this article, I would like to discuss the following question: How best to allocate processors/cores between stages? For example, if we only have two processors/cores, but three stages, then it makes no sense to allocate one of them to the last stage until at least some of the elements have been processed in the second stage.
Initially I want to develop a library to test these concepts, but in the long term I would like to create a programming language that uses these ideas to see if we can create something that scales with the number of CPUs/cores available.
Inspiration and previous work
Although examples of assembly-line production existed before Henry Ford, it was with Ford that the technique became widespread. Wikipedia says:
The assembly line, driven by conveyor belts, reduced production time for a Model T to just 93 minutes by dividing the process into 45 steps. Producing cars quicker than paint of the day could dry, it had an immense influence on the world.
For comparison, before the introduction of the assembly line for car production it took about 12.5 hours.
Processors are another example of using pipelines to speed up instruction processing. The pipeline can look like this: fetching the instruction, fetching the operands, executing the instruction, and finally writing the results.
Given the tremendous success of this concept in both manufacturing and hardware, one might expect pipelining to have become popular in software as well. For some reason this has not happened yet, although the idea does have its supporters.
Jim Gray talked about pipeline parallelism and partition parallelism in software in his Turing Award interview. Dataflow programming languages, in particular Paul Morrison's flow-based programming, use this idea. The LMAX Disruptor pattern is also based on pipeline parallelism and supports what Jim calls partition parallelism. One of the sources the Disruptor cites is the paper SEDA: An Architecture for Well-Conditioned, Scalable Internet Services (2001), which is likewise about pipelines and dynamically allocating threads to stages. More recently, while studying Jim's work, I found that database engines also implement something similar to pipeline parallelism. An example of database engines using this technique is given in the paper Morsel-Driven Parallelism.
These examples of pipeline parallelism in software got me thinking about the concept. But it was only after reading the following statement by Martin Thompson, one of the developers of the LMAX Disruptor:
If there's one thing I'd say to the Erlang folks, it is you got the stuff right from a high level, but you need to invest in your messaging infrastructure so it's super fast, super efficient and obeys all the right properties to let this stuff work really well.
and after hearing the anecdote from Joe Armstrong's talk about an unmodified Erlang program running only 33 times faster on a 64-core machine, rather than the 64 times Ericsson's management expected, that I really began to think about how a programming language could be designed to make pipelining in software easier.
I started researching this topic in two previous articles, and I also wrote about elastically scaling a single stage up and down, but in this post we will take a more global approach.
The big picture
The system consists of three parts: the pipeline, the workers and the scheduler:
The scheduler monitors the pipeline, tracking the length of each stage's input queue and the stage's average service time. Using this data it calculates how to allocate the available workers.
The worker allocation algorithm works as follows:
- All possible distributions of the available workers over the stages are generated;
- Each configuration is scored with the formula: the sum over all stages s of l_s * t_s / (w_s + 1), where l_s is the length of the input queue at stage s, t_s is the average processing time at stage s, and w_s is the number of workers allocated to stage s;
- The configuration with the smallest score is selected, i.e. the one with the minimum total remaining processing time.
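For example, with two workers, suppose stage A has three items in its input queue and stage B has none, and take the average processing time of both stages to be one time unit. Allocating both workers to A scores 3 * 1 / (2 + 1) + 0 = 1, one worker per stage scores 3 * 1 / (1 + 1) + 0 = 1.5, and both workers on B scores 3 * 1 / (0 + 1) + 0 = 3, so the first configuration is chosen. This is exactly the first REPL example further below.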
The workers, usually one per available CPU/core, process a batch of input data at the stage the scheduler assigns to them and then report back to the scheduler; this repeats until the input stream is exhausted.
If we zoom in and take a closer look at the pipeline, we can see that it consists of a source, N stages, and an end node:
The source can be a file, a network socket, a list of items provided by the user, etc., from which the input for the first-stage queue is created. Input data can be length-prefixed bytes, raw bytes, newline-delimited bytes, etc. Similarly, the endpoint can be a file, stdout, or socket. Between the source and the final node, basic processing takes place at various stages.
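One way to picture these endpoints in code (a sketch of my own; these types are not from the post's implementation):

import Data.ByteString (ByteString)

-- Hypothetical endpoints and framing of the pipeline's input and output.
data Source  = FromFile FilePath | FromSocket String | FromList [ByteString]
data Sink    = ToFile FilePath | ToStdout | ToSocket String
data Framing = LengthPrefixed | Raw | NewlineDelimited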
Implementation prototype
I hope it’s clear from the picture above that most of the code will be “binding” (connecting components using queues). The most interesting part is how the scheduler determines which task to assign to the handler after it has finished the previous one.
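The snippets below are fragments of a larger module; something like the following imports (plus a StageId alias, which is my assumption based on the string identifiers used in the REPL examples) is needed to make them compile:

import Control.Exception (assert)
import Data.List (foldl', genericLength, sortBy, subsequences)
import Data.Map (Map)
import qualified Data.Map as Map
import qualified Data.Map as M    -- alias used in the REPL examples
import Data.Ord (comparing)
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Set as S    -- alias used in the REPL examples
import Data.Word (Word64)

type StageId = String  -- assumption: stages are identified by plain strings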
Let's start with how to represent the configuration of workers across the pipeline. Each stage has a name or identifier, so a configuration can be represented as a mapping from stage identifiers to the number of workers assigned to that stage.
newtype Config = Config (Map StageId NumOfWorkers)
  deriving Show

type NumOfWorkers = Int
The initial configuration assigns zero workers to every stage:
initConfig :: [StageId] -> Config
initConfig stageIds =
  Config (Map.fromList (zip stageIds (replicate (length stageIds) 0)))
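For example:

>>> initConfig ["A", "B"]
Config (fromList [("A",0),("B",0)])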
The implementation of worker allocation starts by generating all possible configurations, discarding those that assign workers to completed stages (a stage is "completed" when no more data will arrive at it), scoring the remaining configurations and picking the one with the lowest score.
allocateWorkers :: Int -> Map StageId QueueStats -> Set StageId -> Maybe Config
allocateWorkers cpus qstats done = case result of
  []                -> Nothing
  (cfg, _score) : _ -> Just cfg
  where
    result = sortBy (comparing snd)
      [ (cfg, sum (Map.elems (scores qstats cfg)))
      | cfg <- possibleConfigs cpus (Map.keys qstats)
      , not (allocatesDoneStages cfg done)
      ]
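The QueueStats type is not shown in the snippets; judging from how it is used (it is constructed as QueueStats 3 [] in the REPL examples below, and scores reads a queue length and a list of service times from it), a minimal definition could look like this (the record layout and deriving are my assumption, the field names come from the scores function below):

data QueueStats = QueueStats
  { queueLength       :: Int       -- number of items waiting in the stage's input queue
  , serviceTimesPicos :: [Word64]  -- observed processing times (in picoseconds) for this stage
  }
  deriving Show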
All possible configurations are generated as follows:
possibleConfigs :: Int -> [StageId] -> [Config]
possibleConfigs cpus stages =
  map (Config . Map.fromList . zip stages) $
    filter ((== cpus) . sum)
      [ foldl' (\ih i -> update i succ ih) (replicate (length stages) 0) slot
      | slot <- combinations (concatMap (replicate cpus) [0 .. length stages - 1]) cpus
      ]

combinations :: [a] -> Int -> [[a]]
combinations xs n = filter ((== n) . length) (subsequences xs)
-- update i f xs = xs[i] := f (xs[i])
update :: Int -> (a -> a) -> [a] -> [a]
update i f = go [] i
  where
    go acc _ []       = reverse acc
    go acc 0 (x : xs) = reverse acc ++ f x : xs
    go acc n (x : xs) = go (x : acc) (n - 1) xs
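A quick check in the REPL:

>>> update 1 succ [0, 0, 0]
[0,1,0]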
The scoring is done as follows:
scores :: Map StageId QueueStats -> Config -> Map StageId Double
scores qss (Config cfg) = joinMapsWith score qss cfg
  where
    score :: QueueStats -> Int -> Double
    score qs workers =
      (fromIntegral (queueLength qs) * fromIntegral avgServiceTimePicos)
        /
      (fromIntegral workers + 1)
      where
        avgServiceTimePicos :: Word64
        avgServiceTimePicos
          | len == 0  = 1 -- XXX: What's the right value here?
          | otherwise = sum (serviceTimesPicos qs) `div` len
          where
            len :: Word64
            len = genericLength (serviceTimesPicos qs)
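Plugging in the numbers from the first REPL example below (and assuming the QueueStats sketch above), the scores for the configuration that gives both workers to stage A come out as:

>>> scores (M.fromList [("A", QueueStats 3 []), ("B", QueueStats 0 [])])
           (Config (M.fromList [("A", 2), ("B", 0)]))
fromList [("A",1.0),("B",0.0)]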
A small helper function for joining maps:
joinMapsWith :: Ord k => (a -> b -> c) -> Map k a -> Map k b -> Map k c
joinMapsWith f m1 m2 = assert (Map.keys m1 == Map.keys m2) $
  Map.fromList
    [ (k, f x (m2 Map.! k))
    | (k, x) <- Map.toList m1
    ]
The last thing we need to do is identify the configurations that assign workers to already completed stages:
allocatesDoneStages :: Config -> Set StageId -> Bool
allocatesDoneStages (Config cfg) done =
  any (\(stageId, numWorkers) -> stageId `Set.member` done && numWorkers > 0)
      (Map.toList cfg)
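For instance, a configuration that assigns a worker to an already completed stage A is flagged:

>>> allocatesDoneStages (Config (M.fromList [("A", 1), ("B", 1)])) (S.fromList ["A"])
True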
Running the prototype
Let's wrap up with a couple of examples in the REPL. Suppose we have two workers and two stages (A and B). Stage A has three items in its input queue (and that is all the input it will ever get), and no stage has been completed yet (that's the last argument, S.empty):
>>> allocateWorkers 2
      (M.fromList [ ("A", QueueStats 3 [])
                  , ("B", QueueStats 0 []) ])
      S.empty
(The QueueStats constructor takes the length of the input queue as its first argument and the list of processing times as its second argument.)
If we run the code above, we get:
Just (Config (fromList [("A",2),("B",0)]))
This means that both workers should be allocated to stage A. Suppose we make this allocation, and after one time unit both workers have finished. There is now one item left in stage A's input queue, while the second stage (B) has two items in its input queue. Since both workers are free again, we rerun the allocation function:
>>> allocateWorkers 2
      (M.fromList [ ("A", QueueStats 1 [1,1])
                  , ("B", QueueStats 2 []) ])
      S.empty
Just (Config (fromList [("A",1),("B",1)]))
This means we should allocate one worker to each stage. If we again imagine that we did this and both finished after one time unit, we end up in a situation where all three items have passed through the first stage (A), so we can mark it as completed, while the second stage (B) has two items in its input queue:
>>> allocateWorkers 2
      (M.fromList [ ("A", QueueStats 0 [1,1,1])
                  , ("B", QueueStats 2 [1]) ])
      (S.fromList ["A"])
Just (Config (fromList [("A",0),("B",2)]))
In this case both workers get allocated to the second stage. After they have finished working on these items, the second stage will also have processed everything, and we are done:
>>> allocateWorkers 2
      (M.fromList [ ("A", QueueStats 0 [1,1,1])
                  , ("B", QueueStats 0 [1,1,1]) ])
      (S.fromList ["A", "B"])
Nothing
An unexpected connection with Thomas Jefferson
When I came up with the scheduling idea described above, I discussed it with my friend Daniel Gustafsson, who immediately said: "It's a bit like the Jefferson method" (used for allocating seats in parliaments).
Here’s how it works:
After all the votes have been counted, successive quotients are calculated for each party. The party with the largest quotient wins one seat, and its quotient is recalculated. This is repeated until the required number of seats is filled. The formula for the quotient is

    quot = V / (s + 1)

where:
V is the total number of votes the party received, and
s is the number of seats the party has won so far, initially 0 for all parties."
The analogy is as follows:
- parties: the stages of the pipeline;
- seats won by a party: the workers allocated to a stage;
- votes: the "score" of a stage (input queue length multiplied by average processing time);
- rounds: the total number of workers.
Let's redo the second allocation from the example above, where stage A and stage B had queue lengths of 1 and 2 respectively, using Jefferson's method:
- In the first round party/stage A gets 1 vote and party/stage B gets 2 votes, so the quotients are 1 / (0 + 1) = 1 and 2 / (0 + 1) = 2 respectively, which means stage B wins the round and gets one seat (worker).
- In the second round the quotients are 1 / (0 + 1) = 1 for A and 2 / (1 + 1) = 1 for B (note that s = 1 for B, because it already won a seat in the previous round). This is a tie, in which case I suppose we can arbitrarily pick the party/stage that comes first, so that our example matches the implementation*.
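To make the analogy concrete, here is a small sketch of the Jefferson method (my own code, not part of the prototype); the function name and the first-party tie-breaking rule are assumptions:

-- Allocate `seats` seats to parties given their vote counts, using the
-- Jefferson/D'Hondt quotient V / (s + 1). Ties go to the party listed first,
-- matching the tie-breaking assumed in the example above.
jefferson :: Int -> [(String, Double)] -> [(String, Int)]
jefferson seats votes = foldl award [ (party, 0) | (party, _) <- votes ] [1 .. seats]
  where
    award alloc _round = map bump alloc
      where
        quotients = [ (party, v / fromIntegral (s + 1))
                    | ((party, v), (_, s)) <- zip votes alloc ]
        best      = maximum (map snd quotients)
        winner    = fst (head (filter ((== best) . snd) quotients))
        bump (party, s)
          | party == winner = (party, s + 1)
          | otherwise       = (party, s)

Running it on the example gives the same allocation as allocateWorkers did:

>>> jefferson 2 [("A", 1), ("B", 2)]
[("A",1),("B",1)]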
Daniel also explained that although Jefferson proposed this method, it is not actually used in the US, whereas most countries in Europe, including the EU Parliament, do use it.
Conclusion and future work
We have looked at a strategy for elastically scaling the number of processors/cores allocated to the individual stages of a pipeline. The ability to do this is useful in the following cases:
- When the load on the system changes and one stage suddenly becomes slower than another, elasticity lets us redistribute the cores and maintain throughput.
- When the load decreases, we can scale a stage down and use the cores in other parts of the system.
We have also seen how Thomas Jefferson's method for apportioning seats in parliaments can be applied to the same problem. This unexpected connection makes me wonder where else this algorithm might show up.
We are still a long way from a parallel programming language runtime built on these ideas. In particular, the current implementation uses plain concurrent queues to connect the stages, which means that scaling a stage up does not preserve the determinism of the output. This can be solved with Disruptors, as I described in my old article. I have collected many other outstanding tasks in a separate file. If you are interested in any of them, please don't hesitate to get in touch with me.