// Command main demonstrates a small priority multiplexer built from plain
// channels: values submitted at different priority levels are forwarded to a
// single output channel, lowest-index (highest-priority) source first.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// main wires up three buffered priority levels, starts the prioritizer and a
// deliberately slow consumer, submits nine values, then cancels and waits for
// both goroutines to finish.
func main() {
	const levels = 3
	const sourceDepth = 5

	sources := make([]chan int, levels)
	for i := range sources {
		sources[i] = make(chan int, sourceDepth)
	}
	out := make(chan int)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pc := New(ctx, sources, 10, out)

	var wg sync.WaitGroup

	// Prioritizer: the only sender on out, so it is the one that closes it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		pc.Prioritize()
	}()

	// Consumer: slow on purpose so that submissions pile up in the sources
	// and the priority ordering becomes visible in the output.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := range out {
			fmt.Println("i: ", i)
			time.Sleep(time.Second / 4)
		}
	}()

	for _, i := range []int{0, 2, 1, 0, 2, 1, 0, 2, 1} {
		fmt.Println("submitting ", i)
		pc.Submit(i, i)
	}

	// Give the consumer (a quarter second per value) time to drain the nine
	// submissions before shutting the prioritizer down.
	time.Sleep(time.Second * 3)
	cancel()
	wg.Wait()
}

// PriorityChannel forwards values from a set of source channels to a single
// output channel, always preferring the source with the lowest index.
//
// notify is used as an inverted counter: New fills it completely, Submit
// removes one token per submitted value, and Prioritize can only put a token
// back (and therefore only wakes up) while at least one token is missing.
type PriorityChannel struct {
	notify  chan struct{}
	sources []chan int
	out     chan int
	ctx     context.Context
}

// New returns a PriorityChannel that reads from sources (index 0 is checked
// first) and writes to out until ctx is cancelled.
//
// cap is the capacity of the internal notify channel, which New fills
// completely; Submit blocks once cap submitted values are waiting to be
// forwarded.
func New(ctx context.Context, sources []chan int, cap int, out chan int) PriorityChannel {
	pc := PriorityChannel{
		notify:  make(chan struct{}, cap),
		sources: sources,
		out:     out,
		ctx:     ctx,
	}
	// Start with notify full so that Prioritize blocks until the first Submit.
	for i := 0; i < cap; i++ {
		pc.notify <- struct{}{}
	}
	return pc
}

// Prioritize forwards submitted values to the output channel, one per wake-up,
// taking each value from the lowest-index source that has one ready. It
// returns when the context passed to New is cancelled.
//
// Cancellation is honoured both while waiting for a submission and while
// waiting for the consumer to accept a value, so a consumer that has stopped
// reading cannot keep Prioritize blocked forever. A value that was already
// taken from its source when cancellation is observed is dropped.
func (pc PriorityChannel) Prioritize() {
	for {
		// Block until Submit has freed a slot in notify, i.e. until there is
		// at least one value waiting in some source.
		select {
		case pc.notify <- struct{}{}:
		case <-pc.ctx.Done():
			return
		}

		v, ok := pc.next()
		if !ok {
			continue
		}

		select {
		case pc.out <- v:
		case <-pc.ctx.Done():
			return
		}
	}
}

// next returns the value from the first source, in index order, that has one
// ready without blocking. ok is false when no source has a value ready.
func (pc PriorityChannel) next() (v int, ok bool) {
	for _, src := range pc.sources {
		select {
		case v = <-src:
			return v, true
		default:
			// Nothing ready at this level; try the next one.
		}
	}
	return 0, false
}

// Submit enqueues i on the source channel for the given priority and then
// signals Prioritize by removing one token from notify.
//
// It panics if priority is not a valid index into the sources passed to New.
// It blocks while that source cannot accept the value, and while notify is
// empty (cap submissions already outstanding). Because Prioritize only
// receives from a source after Submit has signalled, the sources are expected
// to be buffered.
func (pc PriorityChannel) Submit(i, priority int) {
	if priority < 0 || priority >= len(pc.sources) {
		panic("invalid priority")
	}
	// Enqueue first, signal second: when Prioritize wakes up, the value is
	// already waiting in its source.
	pc.sources[priority] <- i
	<-pc.notify
}
Jason Mansfield is a software engineer, security enthusiast, and crazy thinker living in San Diego.
Thursday, February 27, 2020
Priority Channel in Go
I'm kind of impressed with this ugly monster:
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment
Note: Only a member of this blog may post a comment.