Script Redis
Script Redis
# Available constants:
# They are to assign a type to a field with a value null.
# sdc.NULL_BOOLEAN, sdc.NULL_CHAR, sdc.NULL_BYTE, sdc.NULL_SHORT,
sdc.NULL_INTEGER, sdc.NULL_LONG,
# sdc.NULL_FLOAT, sdc.NULL_DOUBLE, sdc.NULL_DATE, sdc.NULL_DATETIME,
sdc.NULL_TIME, sdc.NULL_DECIMAL,
# sdc.NULL_BYTE_ARRAY, sdc.NULL_STRING, sdc.NULL_LIST, sdc.NULL_MAP
#
# Available Objects:
#
# sdc.records: an array of records to process, depending on Jython processor
# processing mode it may have 1 record or all the records in the batch.
#
# sdc.state: a dict that is preserved between invocations of this script.
# Useful for caching bits of data e.g. counters.
#
# sdc.log.<loglevel>(msg, obj...): use instead of print to send log messages to
the log4j log instead of stdout.
# loglevel is any log4j level: e.g. info, error,
warn, trace.
#
# sdc.output.write(record): writes a record to processor output
#
# sdc.error.write(record, message): sends a record to error
#
# sdc.getFieldNull(Record, 'field path'): Receive a constant defined above
# to check if the field is typed field with value
null
# sdc.createRecord(String recordId): Creates a new record.
# Pass a recordId to uniquely identify the record and
include enough information to track down the record source.
# sdc.createMap(boolean listMap): Create a map for use as a field in a record.
# Pass True to this function to create a list map
(ordered map)
#
# sdc.createEvent(String type, int version): Creates a new event.
# Create new empty event with standard headers.
# sdc.toEvent(Record): Send event to event stream
# Only events created with sdcFunctions.createEvent are
supported.
# sdc.isPreview(): Determine if pipeline is in preview mode.
#
# sdc.userParams: Dictionary of user-specified keys and values (from UI).
#
#
# Available Record Header Variables:
#
# record.attributes: a map of record header attributes.
#
# record.<header name>: get the value of 'header name'.
#
# Add additional module search paths:
# try:
# sdc.importLock()
# import sys
# sys.path.append('/some/other/dir/to/search')
# import something
# finally:
# sdc.importUnlock()
#
# Change record root field value to a MAP value and create an entry
# record.value = { 'V' : 'Hello'}
# Check if the field is NULL_INTEGER(Both '==' and 'is' work). If so, assign a
value
# if sdc.getFieldNull(record, '/null_int') == sdc.NULL_INTEGER:
# record.value['null_int'] = 123
# Direct access to the underlying Data Collector Record. Use for read-only
operations.
# fieldAttr = record.sdcRecord.get('/value').getAttribute('attr')
# Get a record header with field names ex. get sourceId and errorCode
# sourceId = record.sourceId
# errorCode = ''
# if record.errorCode:
# errorCode = record.errorCode
# 只处理现有 ID 字段的记录
rdb = redis.Redis(host="172.16.10.192", port=6379, db=0,
decode_responses=True)
idList = rdb.get("I_ID")
if storeValue['rowDatas']['I_ID'] not in idList:
break
# 不处理 DDL
if storeValue['isDdl']:
eventType = "DDL"
data = {'type' : eventType}
newRecord = sdc.createRecord(record.sourceId + ':newRecordId')
newRecord.value = data
sdc.output.write(newRecord)
break
data = {'type' : eventType}
# 判断有效数据
# INSERT afterColumn record.value = data
# UPDATE afterColumns
# DELETE beforeColumns
columnsName = 'afterColumns'
if eventType == 'DELETE':
columnsName = 'beforeColumns'
# 遍历数据
for row in storeValue['rowDatas']:
columns = row[columnsName]
# 处理每列的值
for column in columns:
if column['value']:
data[column['name']] = column['value']
# 写入 record
newRecord = sdc.createRecord(record.sourceId + ':newRecordId')
newRecord.value =data
sdc.output.write(newRecord)
except Exception as e:
# Send record to error
sdc.error.write(record, str(e))
print(rdb.get("name"))
print("测试成功")
# 输出