Configuration

The available broker configuration options depend on the implementation of ConnectionFactoryProvider being used. Each broker provider defines its own set of options. For example, when using ActiveMqConnectionFactoryProvider from the provider-activemq module, you can configure the broker URL:

import io.github.dyaraev.spark.connector.jms.provider.activemq.ActiveMqConnectionFactoryProvider

df.writeStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  // other options
  .start()
  .awaitTermination()

Each connector implementation has a corresponding format value that must be passed to DataStreamReader and DataStreamWriter, such as jms-v1 or jms-v2. Using the DataSource V2 implementation, selected with the jms-v2 format value, is recommended.
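For the reading side, a matching DataStreamReader can be configured the same way. The sketch below assumes the ActiveMQ provider from the example above; the queue name and interval values are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("jms-reader")
  .getOrCreate()

// Read messages from a JMS queue using the DataSource V2 implementation
val df = spark.readStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  .option("jms.connection.queue", "events")   // illustrative queue name
  .option("jms.messageFormat", "text")        // messages arrive as TextMessage
  .option("jms.commitIntervalMs", "1000")     // illustrative commit interval
  .load()
```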

Connection Options

The following options can be provided to both DataStreamReader and DataStreamWriter:

| Option | Required | Description |
| --- | --- | --- |
| jms.connection.provider | Yes | Name of the ConnectionFactoryProvider implementation |
| jms.connection.queue | Yes | Name of the JMS queue to read messages from or write messages to |
| jms.connection.username | No | Username for JMS connection authentication |
| jms.connection.password | No | Password for JMS connection authentication |
| jms.connection.selector | No | JMS message selector for filtering messages |

Broker-specific options can be passed to the connector using the jms.connection.broker. prefix.
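The following sketch combines the connection options with a broker-specific option. Of the prefixed options, only jms.connection.broker.url appears in this guide; the queue name, credentials, and selector are illustrative:

```scala
df.writeStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  // Options under the jms.connection.broker. prefix are forwarded to the provider
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  .option("jms.connection.queue", "events")             // illustrative queue name
  .option("jms.connection.username", "jms-user")        // illustrative credentials
  .option("jms.connection.password", "secret")
  .option("jms.connection.selector", "JMSPriority > 4") // standard JMS selector syntax
  .option("jms.messageFormat", "text")
  .start()
```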

Source Options

The following options can be provided to DataStreamReader:

| Option | Required | Description |
| --- | --- | --- |
| jms.messageFormat | Yes | Message format, determined by the Message implementation used (TextMessage or BytesMessage): text or binary |
| jms.commitIntervalMs | Yes | Interval in milliseconds at which received messages are stored in the write-ahead log |
| jms.receiveTimeoutMs | No | Timeout in milliseconds passed to Consumer.receive(...); if omitted, Consumer.receiveNoWait() is used instead |
| jms.bufferSize | No | Size of the internal buffer that holds messages before they are written to the write-ahead log (defaults to 5000) |
| jms.numOffsetsToKeep | No | Number of offsets to retain in the Spark metadata directory (defaults to 100) |
| jms.numPartitions | No | Number of partitions to split incoming data into (defaults to spark.sparkContext.defaultParallelism) |
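Putting the source options together, a reader tuned for higher throughput might look like the following sketch. The queue name and all numeric values are illustrative, not recommendations:

```scala
val df = spark.readStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  .option("jms.connection.queue", "events")   // illustrative queue name
  .option("jms.messageFormat", "binary")      // messages arrive as BytesMessage
  .option("jms.commitIntervalMs", "500")      // store received messages in the WAL twice a second
  .option("jms.receiveTimeoutMs", "100")      // block up to 100 ms per receive call
  .option("jms.bufferSize", "10000")          // buffer up to 10000 messages before the WAL write
  .option("jms.numPartitions", "8")           // split incoming data into 8 partitions
  .load()
```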

Sink Options

The following options can be provided to DataStreamWriter:

| Option | Required | Description |
| --- | --- | --- |
| jms.messageFormat | Yes | Message format, determined by the Message implementation used (TextMessage or BytesMessage): text or binary |
| jms.throttlingDelayMs | No | Delay in milliseconds for throttling message writes (by default no throttling is applied) |
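As a sketch, a writer that throttles its output could combine these options as follows; the queue name and delay value are illustrative:

```scala
df.writeStream
  .format("jms-v2")
  .option("jms.connection.provider", "active-mq")
  .option("jms.connection.broker.url", "tcp://localhost:61616")
  .option("jms.connection.queue", "outgoing") // illustrative queue name
  .option("jms.messageFormat", "text")        // send messages as TextMessage
  .option("jms.throttlingDelayMs", "10")      // illustrative 10 ms delay between writes
  .start()
  .awaitTermination()
```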