-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.go
126 lines (108 loc) · 3.6 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*******************************************************************************
*
* Copyright 2016 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You should have received a copy of the License along with this
* program. If not, you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/
package main
import (
"context"
"fmt"
"net/http"
"os"
"sync"
"time"
"github.com/sapcc/go-api-declarations/bininfo"
"github.com/sapcc/go-bits/httpext"
"github.com/sapcc/go-bits/logg"
"github.com/sapcc/go-bits/must"
"github.com/sapcc/go-bits/osext"
"go.uber.org/automaxprocs/maxprocs"
"github.com/sapcc/swift-http-import/pkg/actors"
"github.com/sapcc/swift-http-import/pkg/objects"
)
// main is the entry point of swift-http-import. It applies process-wide
// settings (debug logging, GOMAXPROCS, the default HTTP transport), evaluates
// the command line, loads the configuration file named there, and then runs
// the pipeline while a Report actor consumes the events it produces. The
// process terminates with the exit code stored in the Report actor.
func main() {
	// captured before anything else; handed to the Report actor as StartTime
	launchedAt := time.Now()

	logg.ShowDebug = osext.GetenvBool("DEBUG")
	restoreMaxprocs := must.Return(maxprocs.Set(maxprocs.Logger(logg.Debug)))
	defer restoreMaxprocs()

	// setup a context that shuts down all pipeline actors when an interrupt signal is received
	ctx := httpext.ContextWithSIGINT(context.Background(), 1*time.Second)

	// adjust http.DefaultTransport in place
	transport := httpext.WrapTransport(&http.DefaultTransport)
	// for debugging with mitmproxy etc. (DO NOT SET IN PRODUCTION)
	transport.SetInsecureSkipVerify(osext.GetenvBool("INSECURE"))
	transport.SetOverrideUserAgent(bininfo.Component(), bininfo.VersionOr("dev"))

	// read arguments: exactly one is expected, either "--version" or the
	// path to the configuration file
	switch {
	case len(os.Args) != 2:
		fmt.Fprintln(os.Stderr, "usage: swift-http-import <config-file>")
		fmt.Fprintln(os.Stderr, " or: swift-http-import --version")
		os.Exit(1) //nolint:gocritic // it doesn't really matter if we run restoreMaxprocs here or not
	case os.Args[1] == "--version":
		fmt.Println("swift-http-import " + bininfo.VersionOr("dev"))
		os.Exit(0)
	}

	// read configuration; every reported problem is logged before giving up
	cfg, cfgErrs := objects.ReadConfiguration(ctx, os.Args[1])
	for _, cfgErr := range cfgErrs {
		logg.Error(cfgErr.Error())
	}
	if len(cfgErrs) > 0 {
		os.Exit(1)
	}

	// setup the Report actor
	events := make(chan actors.ReportEvent)
	reporter := actors.Report{
		Input:     events,
		Statsd:    cfg.Statsd,
		StartTime: launchedAt,
	}
	var reporterDone sync.WaitGroup
	actors.Start(ctx, &reporter, &reporterDone)

	// do the work
	runPipeline(ctx, cfg, events)

	// shutdown Report actor: close its input, then wait on its wait group
	// before reading the exit code from it
	close(events)
	reporterDone.Wait()
	os.Exit(reporter.ExitCode)
}
// runPipeline starts one Scraper, config.WorkerCounts.Transfer Transferors and
// one Cleaner, wires them together with two buffered channels, and returns
// once all of their wait groups have been released.
//
// ctx is handed to every actor on start, config supplies the jobs and the
// number of transfer workers, and report is the channel given to each actor
// as its Report field.
func runPipeline(ctx context.Context, config *objects.Configuration, report chan<- actors.ReportEvent) {
	var (
		allActors   sync.WaitGroup // passed to every actor started here
		transferors sync.WaitGroup // passed to the Transferors only
	)

	// Scraper -> Transferors; will be closed by scraper when it's done
	scraped := make(chan objects.File, 10)
	// Transferors -> Cleaner; will be closed by us when all transferors are done
	transferred := make(chan actors.FileInfoForCleaner, 10)

	// start the pipeline actors
	scraper := actors.Scraper{
		Jobs:   config.Jobs,
		Output: scraped,
		Report: report,
	}
	actors.Start(ctx, &scraper, &allActors)

	for range config.WorkerCounts.Transfer {
		// a separate Transferor value per worker
		worker := actors.Transferor{
			Input:  scraped,
			Output: transferred,
			Report: report,
		}
		actors.Start(ctx, &worker, &allActors, &transferors)
	}

	cleaner := actors.Cleaner{
		Input:  transferred,
		Report: report,
	}
	actors.Start(ctx, &cleaner, &allActors)

	// wait for transfer phase to finish
	transferors.Wait()
	// signal to cleaner to start its work
	close(transferred)
	// wait for remaining workers to finish
	allActors.Wait()
}