Custom Streams Configuration
Sometimes you need to pass a custom configuration to Kafka Streams, for example to register a custom timestamp extractor:
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import com.madewithtea.mockedstreams.MockedStreams
val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)
val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .config(props)
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))
mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)
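
The snippet above refers to a `CustomExtractor` class without showing it. As a minimal sketch, assuming it implements Kafka Streams' `TimestampExtractor` interface, it might look like the following. The fallback logic is hypothetical and only illustrates the shape of such a class; the real extractor in your project may read the timestamp from the record value instead.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

// Hypothetical extractor: the original text only names `CustomExtractor`,
// so the body below is an assumption made for illustration.
class CustomExtractor extends TimestampExtractor {
  // Use the timestamp embedded in the record when it is valid,
  // otherwise fall back to the time Kafka Streams passes in.
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], partitionTime: Long): Long = {
    val embedded = record.timestamp()
    if (embedded >= 0) embedded else partitionTime
  }
}
```

Because the extractor is configured by class name, Kafka Streams instantiates it itself, so the class needs a public no-argument constructor.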