Skip to content

feat: support s3 urls for input and output #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checks.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jacocoTestCoverageVerification {
'*.static {...}',
'*.model.*.get*',
'*.service.localencode.LocalEncodeService.moveFile*',
'*.S3Properties*.get*()',
'*RemoteFileService.DefaultHandler.*',
'*QueueService.getQueue*',
'*QueueService.migrateQueues()',
'*.ShutdownHandler.*',
Expand Down
3 changes: 3 additions & 0 deletions encore-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ dependencies {

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j")
implementation(platform("software.amazon.awssdk:bom:2.29.2"))
implementation("software.amazon.awssdk:s3")

testImplementation(project(":encore-web"))
testImplementation("org.springframework.security:spring-security-test")
Expand All @@ -26,6 +28,7 @@ dependencies {
testFixturesImplementation("com.redis:testcontainers-redis:2.2.4")
testFixturesImplementation("io.github.microutils:kotlin-logging:3.0.5")
testFixturesImplementation("org.junit.jupiter:junit-jupiter-api")
testFixturesImplementation("org.testcontainers:localstack:1.20.3")
testFixturesRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
//
// SPDX-License-Identifier: EUPL-1.2

package se.svt.oss.encore

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import se.svt.oss.encore.service.remotefiles.s3.S3Properties
import se.svt.oss.encore.service.remotefiles.s3.S3RemoteFileHandler
import se.svt.oss.encore.service.remotefiles.s3.S3UriConverter
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3Configuration
import software.amazon.awssdk.services.s3.presigner.S3Presigner
import java.net.URI

@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true")
@EnableConfigurationProperties(S3Properties::class)
@Configuration
class S3RemoteFilesConfiguration {

@Bean
fun s3Region() =
Region.of(System.getProperty("aws.region") ?: System.getenv("AWS_REGION") ?: "us-east-1")

@Bean
fun s3Client(s3Region: Region, s3Properties: S3Properties) = S3AsyncClient.builder()
.region(s3Region)
.crossRegionAccessEnabled(true)
.multipartEnabled(!s3Properties.anonymousAccess) // Multipart upload requires credentials
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build(),
)
.credentialsProvider(
if (s3Properties.anonymousAccess) {
AnonymousCredentialsProvider.create()
} else {
DefaultCredentialsProvider.create()
},
)
.apply {
if (!s3Properties.endpoint.isNullOrBlank()) {
endpointOverride(URI.create(s3Properties.endpoint))
}
}
.build()

@Bean
fun s3Presigner(s3Region: Region, s3Properties: S3Properties) = S3Presigner.builder()
.region(s3Region)
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build(),
)
.apply {
if (!s3Properties.endpoint.isNullOrBlank()) {
endpointOverride(URI.create(s3Properties.endpoint))
}
}
.build()

@Bean
fun s3UriConverter(s3Properties: S3Properties, s3Region: Region) = S3UriConverter(s3Properties, s3Region)

@Bean
fun s3RemoteFileHandler(
s3Client: S3AsyncClient,
s3Presigner: S3Presigner,
s3Properties: S3Properties,
s3UriConverter: S3UriConverter,
) =
S3RemoteFileHandler(s3Client, s3Presigner, s3Properties, s3UriConverter)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ sealed interface Input {
@get:Schema(description = "URI of input file", required = true, example = "/path/to/file.mp4")
val uri: String

var accessUri: String

@get:Schema(description = "Input params required to properly decode input", example = """{ "ac": "2" }""")
val params: LinkedHashMap<String, String?>

Expand Down Expand Up @@ -167,6 +169,9 @@ data class AudioInput(
override val type: String
get() = TYPE_AUDIO

@JsonIgnore
override var accessUri: String = uri

override fun withSeekTo(seekTo: Double) = copy(seekTo = seekTo)

val duration: Double
Expand All @@ -188,6 +193,9 @@ data class VideoInput(
override val seekTo: Double? = null,
override val copyTs: Boolean = false,
) : VideoIn {
@JsonIgnore
override var accessUri: String = uri

override val analyzedVideo: VideoFile
@JsonIgnore
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed video for $uri is ${analyzed?.type}")
Expand Down Expand Up @@ -221,6 +229,9 @@ data class AudioVideoInput(
override val copyTs: Boolean = false,
) : VideoIn,
AudioIn {
@JsonIgnore
override var accessUri: String = uri

override val analyzedVideo: VideoFile
@JsonIgnore
get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed audio/video for $uri is ${analyzed?.type}")
Expand All @@ -245,7 +256,7 @@ fun List<Input>.inputParams(readDuration: Double?): List<String> =
(readDuration?.let { listOf("-t", "$it") } ?: emptyList()) +
(input.seekTo?.let { listOf("-ss", "$it") } ?: emptyList()) +
(if (input.copyTs) listOf("-copyts") else emptyList()) +
listOf("-i", input.uri)
listOf("-i", input.accessUri)
}

fun List<Input>.maxDuration(): Double? = maxOfOrNull {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import se.svt.oss.encore.service.callback.CallbackService
import se.svt.oss.encore.service.localencode.LocalEncodeService
import se.svt.oss.encore.service.mediaanalyzer.MediaAnalyzerService
import se.svt.oss.encore.service.queue.QueueService
import se.svt.oss.encore.service.remotefiles.RemoteFileService
import se.svt.oss.mediaanalyzer.file.MediaContainer
import se.svt.oss.mediaanalyzer.file.MediaFile
import java.io.File
Expand All @@ -67,6 +68,7 @@ class EncoreService(
private val localEncodeService: LocalEncodeService,
private val encoreProperties: EncoreProperties,
private val queueService: QueueService,
private val remoteFileService: RemoteFileService,
) {

private val cancelTopicName = "cancel"
Expand Down Expand Up @@ -227,7 +229,7 @@ class EncoreService(
callbackService.sendProgressCallback(encoreJob)
} finally {
redisMessageListerenerContainer.removeMessageListener(cancelListener)
localEncodeService.cleanup(outputFolder)
localEncodeService.cleanup(outputFolder, encoreJob)
}
}

Expand Down Expand Up @@ -270,6 +272,10 @@ class EncoreService(
}

private fun initJob(encoreJob: EncoreJob) {
encoreJob.inputs.forEach { input ->
input.accessUri = remoteFileService.getAccessUri(input.uri)
}

encoreJob.inputs.forEach { input ->
mediaAnalyzerService.analyzeInput(input)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import org.springframework.stereotype.Service
import se.svt.oss.encore.config.EncoreProperties
import se.svt.oss.encore.model.EncoreJob
import se.svt.oss.encore.process.createTempDir
import se.svt.oss.encore.service.remotefiles.RemoteFileService
import se.svt.oss.mediaanalyzer.file.AudioFile
import se.svt.oss.mediaanalyzer.file.ImageFile
import se.svt.oss.mediaanalyzer.file.MediaFile
import se.svt.oss.mediaanalyzer.file.VideoFile
import java.io.File
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
Expand All @@ -23,11 +25,12 @@ private val log = KotlinLogging.logger {}
@Service
class LocalEncodeService(
private val encoreProperties: EncoreProperties,
private val remoteFileService: RemoteFileService,
) {

fun outputFolder(
encoreJob: EncoreJob,
): String = if (encoreProperties.localTemporaryEncode) {
): String = if (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
createTempDir("job_${encoreJob.id}").toString()
} else {
encoreJob.outputFolder
Expand All @@ -38,6 +41,23 @@ class LocalEncodeService(
output: List<MediaFile>,
encoreJob: EncoreJob,
): List<MediaFile> {
if (remoteFileService.isRemoteFile(encoreJob.outputFolder)) {
log.debug { "Moving files to output destination ${encoreJob.outputFolder}, from local temp $outputFolder" }
File(outputFolder).listFiles()?.forEach { localFile ->
val remoteFile = URI.create(encoreJob.outputFolder).resolve(localFile.name).toString()
remoteFileService.upload(localFile.toString(), remoteFile)
}
val files = output.map {
val resolvedPath = URI.create(encoreJob.outputFolder).resolve(Path.of(it.file).fileName.toString()).toString()
when (it) {
is VideoFile -> it.copy(file = resolvedPath)
is AudioFile -> it.copy(file = resolvedPath)
is ImageFile -> it.copy(file = resolvedPath)
else -> throw Exception("Invalid conversion")
}
}
return files
}
if (encoreProperties.localTemporaryEncode) {
val destination = File(encoreJob.outputFolder)
log.debug { "Moving files to correct outputFolder ${encoreJob.outputFolder}, from local temp $outputFolder" }
Expand All @@ -50,8 +70,10 @@ class LocalEncodeService(
return output
}

fun cleanup(tempDirectory: String?) {
if (tempDirectory != null && encoreProperties.localTemporaryEncode) {
fun cleanup(tempDirectory: String?, encoreJob: EncoreJob) {
if (tempDirectory != null &&
(encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder))
) {
File(tempDirectory).deleteRecursively()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import se.svt.oss.mediaanalyzer.ffprobe.SideData
import se.svt.oss.mediaanalyzer.ffprobe.UnknownSideData
import se.svt.oss.mediaanalyzer.ffprobe.UnknownStream
import se.svt.oss.mediaanalyzer.file.AudioFile
import se.svt.oss.mediaanalyzer.file.ImageFile
import se.svt.oss.mediaanalyzer.file.SubtitleFile
import se.svt.oss.mediaanalyzer.file.VideoFile
import se.svt.oss.mediaanalyzer.mediainfo.AudioTrack
import se.svt.oss.mediaanalyzer.mediainfo.GeneralTrack
Expand Down Expand Up @@ -58,20 +60,25 @@ class MediaAnalyzerService(private val mediaAnalyzer: MediaAnalyzer) {
val useFirstAudioStreams = (input as? AudioIn)?.channelLayout?.channels?.size

input.analyzed = mediaAnalyzer.analyze(
file = input.uri,
file = input.accessUri,
probeInterlaced = probeInterlaced,
ffprobeInputParams = input.params,
).let {
val selectedVideoStream = (input as? VideoIn)?.videoStream
val selectedAudioStream = (input as? AudioIn)?.audioStream
when (it) {
is VideoFile -> it.selectVideoStream(selectedVideoStream)
.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
is AudioFile -> it.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
else -> it
)
.let {
val selectedVideoStream = (input as? VideoIn)?.videoStream
val selectedAudioStream = (input as? AudioIn)?.audioStream
when (it) {
is VideoFile -> it.selectVideoStream(selectedVideoStream)
.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
.copy(file = input.uri)
is AudioFile -> it.selectAudioStream(selectedAudioStream)
.trimAudio(useFirstAudioStreams)
.copy(file = input.uri)
is ImageFile -> it.copy(file = input.uri)
is SubtitleFile -> it.copy(file = input.uri)
else -> it
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
//
// SPDX-License-Identifier: EUPL-1.2

package se.svt.oss.encore.service.remotefiles

interface RemoteFileHandler {
fun getAccessUri(uri: String): String
fun upload(localFile: String, remoteFile: String)
val protocols: List<String>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
//
// SPDX-License-Identifier: EUPL-1.2

package se.svt.oss.encore.service.remotefiles

import io.github.oshai.kotlinlogging.KotlinLogging
import org.springframework.stereotype.Service
import java.net.URI

private val log = KotlinLogging.logger {}

@Service
class RemoteFileService(private val remoteFileHandlers: List<RemoteFileHandler>) {

private val defaultHandler = DefaultHandler()

fun isRemoteFile(uriOrPath: String): Boolean {
val uri = URI.create(uriOrPath)
return !(uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file")
}

fun getAccessUri(uriOrPath: String): String {
val uri = URI.create(uriOrPath)
return getHandler(uri).getAccessUri(uriOrPath)
}

fun upload(localFile: String, remoteFile: String) {
val uri = URI.create(remoteFile)
getHandler(uri).upload(localFile, remoteFile)
}

private fun getHandler(uri: URI): RemoteFileHandler {
log.info { "Getting handler for uri $uri. Available protocols: ${remoteFileHandlers.flatMap {it.protocols} }" }
if (uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") {
return defaultHandler
}
val handler = remoteFileHandlers.firstOrNull { it.protocols.contains(uri.scheme) }
if (handler != null) {
return handler
}
log.info { "No remote file handler found for protocol ${uri.scheme}. Using default handler." }
return defaultHandler
}

/** Handler user for protocols where no specific handler is defined. Works for local files and
* any protocols that ffmpeg supports natively */
private class DefaultHandler : RemoteFileHandler {
override fun getAccessUri(uri: String): String = uri

override fun upload(localFile: String, remoteFile: String) {
// Do nothing
}

override val protocols: List<String> = emptyList()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB
//
// SPDX-License-Identifier: EUPL-1.2

package se.svt.oss.encore.service.remotefiles.s3

import org.springframework.boot.context.properties.ConfigurationProperties
import java.time.Duration

@ConfigurationProperties("remote-files.s3")
data class S3Properties(
val enabled: Boolean = false,
val anonymousAccess: Boolean = false,
val endpoint: String = "",
val presignDurationSeconds: Long = Duration.ofHours(12).seconds,
val uploadTimeoutSeconds: Long = Duration.ofHours(1).seconds,
)
Loading