Small improvement to Events processor interface
In this commit, we make sure that there are no left over channels after phase.Run is complete Change-Id: Ia9b7a37f19aad892e77fdca5ad02efe8322e612b
This commit is contained in:
parent
1918421ae8
commit
c89cd5fba0
@ -27,6 +27,7 @@ import (
|
|||||||
// EventProcessor use to process event channels produced by executors
|
// EventProcessor use to process event channels produced by executors
|
||||||
type EventProcessor interface {
|
type EventProcessor interface {
|
||||||
Process(<-chan Event) error
|
Process(<-chan Event) error
|
||||||
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultProcessor is implementation of EventProcessor
|
// DefaultProcessor is implementation of EventProcessor
|
||||||
@ -72,6 +73,11 @@ func (p *DefaultProcessor) Process(ch <-chan Event) error {
|
|||||||
return checkErrors(p.errors)
|
return checkErrors(p.errors)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close cleans up the auxiliary channels used to process events
|
||||||
|
func (p *DefaultProcessor) Close() {
|
||||||
|
close(p.applierChan)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *DefaultProcessor) processApplierEvent(e applyevent.Event) {
|
func (p *DefaultProcessor) processApplierEvent(e applyevent.Event) {
|
||||||
if e.Type == applyevent.ErrorType {
|
if e.Type == applyevent.ErrorType {
|
||||||
log.Printf("Received error when applying errors to kubernetes %v", e.ErrorEvent.Err)
|
log.Printf("Received error when applying errors to kubernetes %v", e.ErrorEvent.Err)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
func TestDefaultProcessor(t *testing.T) {
|
func TestDefaultProcessor(t *testing.T) {
|
||||||
proc := events.NewDefaultProcessor(utils.Streams())
|
proc := events.NewDefaultProcessor(utils.Streams())
|
||||||
|
defer proc.Close()
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
events []events.Event
|
events []events.Event
|
||||||
|
@ -115,6 +115,7 @@ func (p *phase) Executor() (ifc.Executor, error) {
|
|||||||
|
|
||||||
// Run runs the phase via executor
|
// Run runs the phase via executor
|
||||||
func (p *phase) Run(ro ifc.RunOptions) error {
|
func (p *phase) Run(ro ifc.RunOptions) error {
|
||||||
|
defer p.processor.Close()
|
||||||
executor, err := p.Executor()
|
executor, err := p.Executor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user