Configuration
The available broker configuration options depend on the implementation of ConnectionFactoryProvider being used.
Each broker provider defines its own set of options.
For example, when using ActiveMqConnectionFactoryProvider from the provider-activemq module, you can configure the broker URL:
```scala
import io.github.dyaraev.spark.connector.jms.provider.activemq.ActiveMqConnectionFactoryProvider

df.writeStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  // other options
  .start()
  .awaitTermination()
```
The format value passed to DataStreamReader and DataStreamWriter must match the connector implementation in use, such as jms-v1 or jms-v2.
It is recommended to use the DataSource V2 implementation by specifying jms-v2 as the format value.
Connection Options
The following options can be provided to both DataStreamReader and DataStreamWriter:
| Option | Required | Description |
|---|---|---|
| jms.connection.provider | Yes | Name of the ConnectionFactoryProvider implementation |
| jms.connection.queue | Yes | Name of the JMS queue to read messages from |
| jms.connection.username | No | Username for JMS connection authentication |
| jms.connection.password | No | Password for JMS connection authentication |
| jms.connection.selector | No | JMS message selector for filtering messages |
Broker-specific options can be passed to the connector using the jms.connection.broker. prefix, for example jms.connection.broker.url.
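Because the connection options are shared by readers and writers, it can help to collect them in one place. The following is a minimal sketch, not part of the connector: the object name JmsConnectionOptions, the queue name example.queue, and the environment variable names JMS_USERNAME and JMS_PASSWORD are all hypothetical.

```scala
object JmsConnectionOptions {
  // Connection settings shared by sources and sinks.
  // "example.queue" is a hypothetical queue name; the broker URL is the
  // ActiveMQ example value used earlier on this page.
  val base: Map[String, String] = Map(
    "jms.connection.provider"   -> "active-mq",
    "jms.connection.queue"      -> "example.queue",
    "jms.connection.broker.url" -> "tcp://localhost:61616"
  )

  // Optional credentials, added only when both variables are present.
  // JMS_USERNAME and JMS_PASSWORD are hypothetical variable names.
  def credentials(env: Map[String, String]): Map[String, String] =
    (for {
      user <- env.get("JMS_USERNAME")
      pass <- env.get("JMS_PASSWORD")
    } yield Map(
      "jms.connection.username" -> user,
      "jms.connection.password" -> pass
    )).getOrElse(Map.empty)

  // Full option set; defaults to the process environment.
  def all(env: Map[String, String] = sys.env): Map[String, String] =
    base ++ credentials(env)
}
```

The resulting map can be applied in a single call, for example spark.readStream.format("jms-v2").options(JmsConnectionOptions.all()), since both DataStreamReader and DataStreamWriter accept a map of options.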
Source Options
The following options can be provided to DataStreamReader:
| Option | Required | Description |
|---|---|---|
| jms.messageFormat | Yes | Message format that depends on the Message implementation (TextMessage or BytesMessage): text or binary |
| jms.commitIntervalMs | Yes | Interval in milliseconds at which received messages are stored in the write-ahead log |
| jms.receiveTimeoutMs | No | Timeout in milliseconds passed to Consumer.receive(...); if omitted, Consumer.receiveNoWait() is used |
| jms.bufferSize | No | Size of the internal buffer that holds messages before they are written to the write-ahead log (defaults to 5000) |
| jms.numOffsetsToKeep | No | Number of offsets to retain in the Spark metadata directory for tracking (defaults to 100) |
| jms.numPartitions | No | Number of partitions to split incoming data into (defaults to spark.sparkContext.defaultParallelism) |
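As an illustration of how the source options combine with the connection options, here is a sketch of a streaming read. It assumes the connector and the ActiveMQ provider module are on the classpath and a broker is listening on localhost. The queue name, interval, timeout, partition count, and checkpoint path are illustrative values rather than recommendations, and the schema of the resulting DataFrame is not described in this section.

```scala
import org.apache.spark.sql.SparkSession

object JmsSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jms-source-example")
      .master("local[*]")
      .getOrCreate()

    val messages = spark.readStream
      .format("jms-v2")
      // Connection options
      .option("jms.connection.provider", "active-mq")
      .option("jms.connection.queue", "example.queue") // hypothetical queue name
      .option("jms.connection.broker.url", "tcp://localhost:61616")
      // Required source options
      .option("jms.messageFormat", "text")
      .option("jms.commitIntervalMs", "10000") // illustrative value
      // Optional source options
      .option("jms.receiveTimeoutMs", "1000") // omit to use receiveNoWait()
      .option("jms.numPartitions", "4")       // illustrative value
      .load()

    // Print incoming rows to the console; the checkpoint path is a placeholder.
    messages.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/jms-source-example/checkpoint")
      .start()
      .awaitTermination()
  }
}
```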
Sink Options
The following options can be provided to DataStreamWriter:
| Option | Required | Description |
|---|---|---|
| jms.messageFormat | Yes | Message format that depends on the Message implementation (TextMessage or BytesMessage): text or binary |
| jms.throttlingDelayMs | No | Delay in milliseconds for throttling message writes (by default no throttling is applied) |
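A streaming write with the sink options might look like the sketch below. It uses Spark's built-in rate source to generate test rows and assumes the connector and the ActiveMQ provider module are on the classpath. The column layout the sink expects is not described in this section, so the single string column named value is an assumption, as are the queue name, throttling delay, and checkpoint path.

```scala
import org.apache.spark.sql.SparkSession

object JmsSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jms-sink-example")
      .master("local[*]")
      .getOrCreate()

    // Built-in rate source: emits (timestamp, value) rows for testing.
    // Assumption: the sink accepts a single string column named "value".
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    df.writeStream
      .format("jms-v2")
      // Connection options
      .option("jms.connection.provider", "active-mq")
      .option("jms.connection.queue", "example.queue") // hypothetical queue name
      .option("jms.connection.broker.url", "tcp://localhost:61616")
      // Sink options
      .option("jms.messageFormat", "text")
      .option("jms.throttlingDelayMs", "100") // illustrative value; omit for no throttling
      // Placeholder checkpoint path
      .option("checkpointLocation", "/tmp/jms-sink-example/checkpoint")
      .start()
      .awaitTermination()
  }
}
```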