Configuration Options

Configuration Parameters

Key User Configuration Properties

Note

plog.location.uri - the location in which the Dbvisit Replicate Connector will search for and read PLOG files delivered by the Dbvisit Replicate application. This directory must be local to the filesystem on which Kafka Connect is running, or accessible to it via a mounted filesystem; it cannot be a remote filesystem that is not mounted locally.

Note

plog.data.flush.size - for low transactional volumes (and for testing) it is best to change the default value of plog.data.flush.size in the configuration file to a value less than your total number of change records, e.g. for manual testing you can use 1 to verify that each and every record is emitted correctly. This configuration parameter sets the internal PLOG reader’s cache size, which translates at run time to the size of the polled batch of Kafka messages. Under high transactional load it is more efficient to publish to Kafka in batches (particularly in distributed mode, where network latency might be an issue). Please note that with this approach the data from the PLOG reader is only emitted once the cache is full (for the specific Kafka source task) and/or the PLOG is done, i.e. an Oracle redo log switch has occurred. This means that if plog.data.flush.size is greater than the total number of LCRs in the cache, the connector will wait for more data to arrive or for a log switch to occur.
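
For example, a minimal sketch of the relevant lines in a connector properties file for manual testing might look as follows (the PLOG directory path shown is hypothetical):

  # emit every change record immediately (testing / low volume only)
  plog.data.flush.size=1
  # local or locally mounted directory containing the PLOGs produced by MINE
  plog.location.uri=file:/home/oracle/plogs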

All User Configuration Properties

Version 2.8.04

tasks.max

Maximum number of tasks to start for processing PLOGs.

  • Type: string
  • Default: 4
topic.prefix

Prefix for all topic names.

  • Type: string
  • Default:
plog.location.uri

Replicate PLOG location URI, output of Replicate MINE.

plog.data.flush.size

Number of LCRs to cache before flushing; for the connector this is the batch size. Choose this value according to transactional volume: for high throughput to Kafka the default value may suffice, while for low or sporadic volume it should be lowered, e.g. for testing use 1, which bypasses the cache and emits every record immediately.

  • Type: string
  • Default: 1000
plog.interval.time.ms

Time in milliseconds for one wait interval, used by scans and health checks.

  • Type: string
  • Default: 500
plog.scan.interval.count

Number of intervals between scans, e.g. 5 x 0.5s = 2.5s scan wait time.

  • Type: string
  • Default: 5
plog.health.check.interval

Number of intervals between health checks; these are used when initially waiting for MINE to produce PLOGs, e.g. 10 x 0.5s = 5.0s.

  • Type: string
  • Default: 10
plog.scan.offline.interval

Number of health check scans used to decide whether or not Replicate is offline; this is used as a timeout value. NOTE: for testing use 1, i.e. quit after the first health check: 1 x 10 x 0.5s = 5s, where 10 is the plog.health.check.interval value and 0.5s is the plog.interval.time.ms value.

  • Type: string
  • Default: 1000
topic.name.transaction.info

Topic name for the transaction metadata stream.

  • Type: string
  • Default: TX.META
plog.global.scn.cold.start

Global SCN from which to start loading data during a cold start.

  • Type: string
  • Default: 0
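
As a point of reference, the 2.8.04 properties above can be combined into a single connector configuration. The following is a minimal sketch of a standalone-mode properties file (the file name, topic prefix and PLOG path are illustrative only; the equivalent key/value pairs appear in the distributed-mode JSON shown later):

  # dbvisit-replicate-source.properties (illustrative example)
  name=kafka-connect-dbvisitreplicate
  connector.class=com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector
  tasks.max=4
  topic.prefix=REP-
  plog.location.uri=file:/home/oracle/plogs
  plog.data.flush.size=1000
  plog.interval.time.ms=500
  plog.scan.interval.count=5
  plog.health.check.interval=10
  plog.scan.offline.interval=1000
  topic.name.transaction.info=TX.META
  plog.global.scn.cold.start=0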

Version 2.9.00

Includes all configuration options for 2.8.04.

connector.publish.cdc.format

Set the output CDC format of the Replicate Connector. This determines what type of messages are published to Kafka, and the options are:

  1. changerow - complete row, the view of the table record after the action was applied (Default)
  2. changeset - publish the KEY, NEW, OLD and LOB change fields separately
  • Type: string
  • Default: changerow
connector.publish.transaction.info

Set whether or not the transaction info topic (topic.name.transaction.info) should be populated. This includes adding three fields to each Kafka data message to link the individual change message to its parent transaction message:

  • XID - transaction ID
  • TYPE - type of change, e.g. INSERT, UPDATE or DELETE
  • CHANGE_ID - unique ID of change

The options are:

  1. true - publish transaction info messages and add the three fields above to each data message
  2. false - do not publish the extra transaction info and fields
  • Type: string
  • Default: true
connector.publish.keys

Set whether or not keys should be published to all table topics. Keys are derived from either primary key or unique table constraints. When neither of these is available, all columns with character, numeric or date data types are used as the key. The latter is not ideal, so the use of primary key or unique key constraints on the source tables is encouraged.

The options are:

  1. true - publish key schema and values for all Kafka data messages (but not the transaction info messages)
  2. false - do not publish keys
  • Type: string
  • Default: false
connector.publish.no.schema.evolution

If logical data types are used as default values, certain versions of Schema Registry might fail validation due to an issue (see #556). This option is provided to disable schema evolution for BACKWARDS compatible schemas, effectively forcing all messages to conform to the first schema version published, by ignoring all subsequent DDL operations.

The options are:

  1. true - disable schema evolution, ignore all DDL modifications
  2. false - allow schema evolution for Schema Registry version 3.3 and newer
  • Type: string
  • Default: true
topic.static.schemas

Define the source schemas, as a comma separated list of fully qualified source table names, that may be considered static or that only receive sporadic changes. The committed offset of their last message can be safely ignored if the number of days elapsed between the source PLOG of a new message and that of the previous one exceeds topic.static.offsets.age.days.

Example:

  • SCHEMA.TABLE1,SCHEMA.TABLE2
  • Type: string
  • Default: none
topic.static.offsets.age.days

The age, in days, of the last committed offset for a static schema (see topic.static.schemas) beyond which it can be safely ignored during a task restart and stream rewind. A message that originated from a source PLOG older than this will be considered static and will not restart at its original source PLOG stream offset, but instead at its next available message offset. This is intended for static lookup tables that rarely change and whose source PLOGs may have been flushed since their last update. Defaults to 7 days.

  • Type: string
  • Default: 7
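
To illustrate how topic.static.schemas and topic.static.offsets.age.days work together, a hypothetical configuration for a pair of rarely changing lookup tables might be:

  # treat these lookup tables as static (schema and table names are examples only)
  topic.static.schemas=HR.COUNTRIES,HR.REGIONS
  # ignore their committed offsets once the gap between source PLOGs exceeds 14 days
  topic.static.offsets.age.days=14
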
connector.catalog.bootstrap.servers

The Kafka bootstrap servers to use for establishing the initial connection to the Kafka cluster. This is needed for storing internal catalog records for the Dbvisit Replicate source connector.

  • Type: string
  • Default: localhost:9092
connector.catalog.topic.name

The name of the internal catalog topic created by the Dbvisit Replicate Connector for Kafka, used for tracking all replicated schemas emitted in the PLOG stream. Provision is made here to rename it, to avoid naming conflicts with topics for existing tables, but otherwise this topic should not be interfered with.

  • Type: string
  • Default: REPLICATE-INFO
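
Taken together, the 2.9.00 additions might appear in a connector configuration along the following lines; this is only a sketch and all values shown are examples, not recommendations:

  connector.publish.cdc.format=changeset
  connector.publish.transaction.info=true
  connector.publish.keys=true
  connector.publish.no.schema.evolution=true
  connector.catalog.bootstrap.servers=localhost:9092
  connector.catalog.topic.name=REPLICATE-INFO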

Data Types

Oracle Data Type               | Connect Data Type | Default Value                  | Conversion Rule
NUMBER                         | Int32             | -1                             | scale <= 0 and precision - scale < 10
NUMBER                         | Int64             | -1L                            | scale <= 0 and precision - scale > 10 and < 20
NUMBER                         | Decimal           | BigDecimal.ZERO                | scale > 0 or precision - scale > 20
CHAR                           | Type.String       | Empty string (zero length)     | Encoded as UTF8 string
VARCHAR                        | Type.String       | Empty string (zero length)     | Encoded as UTF8 string
VARCHAR2                       | Type.String       | Empty string (zero length)     | Encoded as UTF8 string
LONG                           | Type.String       | Empty string (zero length)     | Encoded as UTF8 string
NCHAR                          | Type.String       | Empty string (zero length)     | Encoded as UTF8; an attempt is made to auto-detect whether the national character set was UTF-16
NVARCHAR                       | Type.String       | Empty string (zero length)     | As for NCHAR
NVARCHAR2                      | Type.String       | Empty string (zero length)     | As for NCHAR
INTERVAL DAY TO SECOND         | Type.String       | Empty string (zero length)     |
INTERVAL YEAR TO MONTH         | Type.String       | Empty string (zero length)     |
CLOB                           | Type.String       | Empty string (zero length)     | UTF8 string
NCLOB                          | Type.String       | Empty string (zero length)     | UTF8 string
DATE                           | Timestamp         | Epoch time                     |
TIMESTAMP                      | Timestamp         | Epoch time                     |
TIMESTAMP WITH TIME ZONE       | Timestamp         | Epoch time                     |
TIMESTAMP WITH LOCAL TIME ZONE | Timestamp         | Epoch time                     |
BLOB                           | Bytes             | Empty byte array (zero length) | Converted from SerialBlob to bytes
RAW                            | Bytes             | Empty byte array (zero length) | No conversion
LONG RAW                       | Bytes             | Empty byte array (zero length) | No conversion
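
Applying the NUMBER rules above, for example: a NUMBER(8,0) column (scale 0, precision - scale = 8) maps to Int32, a NUMBER(18,0) column (precision - scale = 18) maps to Int64, and a NUMBER(10,2) column (scale greater than 0) maps to Decimal.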

Distributed Mode Settings

Use the following as a guide to starting the Dbvisit Replicate Connector for Kafka in distributed mode, once the Kafka Connect worker(s) have been started on the host node(s). Postman is an excellent utility for working with REST calls such as these curl commands.

  ➜ curl -v -H "Content-Type: application/json" -X PUT 'http://localhost:8083/connectors/kafka-connect-dbvisitreplicate/config' -d
'{
  "connector.class": "com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector",
  "tasks.max": "2",
  "topic.prefix": "REP-",
  "plog.location.uri": "file:/foo/bar",
  "plog.data.flush.size": "1",
  "plog.interval.time.ms": "500",
  "plog.scan.interval.count": "5",
  "plog.health.check.interval": "10",
  "plog.scan.offline.interval": "1000",
  "topic.name.transaction.info": "TX.META"
}'

Or save this to a file <json_file>:

{
  "name": "kafka-connect-dbvisitreplicate",
  "config": {
    "connector.class": "com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector",
    "tasks.max": "2",
    "topic.prefix": "REP-",
    "plog.location.uri": "file:/foo/bar",
    "plog.data.flush.size": "1",
    "plog.interval.time.ms": "500",
    "plog.scan.interval.count": "5",
    "plog.health.check.interval": "10",
    "plog.scan.offline.interval": "1000",
    "topic.name.transaction.info": "TX.META"
  }
}

➜ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@<json_file>"
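
Once the connector has been created, the standard Kafka Connect REST API can be used to confirm that it and its tasks are running, for example:

  ➜ curl http://localhost:8083/connectors/kafka-connect-dbvisitreplicate/status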